IndexFile 结构 hash 结构能够通过 key 寻找到对应在 CommitLog 中的位置
IndexFile 的构建则是分发给这个进行处理1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 class CommitLogDispatcherBuildIndex implements CommitLogDispatcher { @Override public void dispatch (DispatchRequest request) { if (DefaultMessageStore.this .messageStoreConfig.isMessageIndexEnable()) { DefaultMessageStore.this .indexService.buildIndex(request); } } } public void buildIndex (DispatchRequest req) { IndexFile indexFile = retryGetAndCreateIndexFile(); if (indexFile != null ) { long endPhyOffset = indexFile.getEndPhyOffset(); DispatchRequest msg = req; String topic = msg.getTopic(); String keys = msg.getKeys(); if (msg.getCommitLogOffset() < endPhyOffset) { return ; } final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); switch (tranType) { case MessageSysFlag.TRANSACTION_NOT_TYPE: case MessageSysFlag.TRANSACTION_PREPARED_TYPE: case MessageSysFlag.TRANSACTION_COMMIT_TYPE: break ; case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: return ; } if (req.getUniqKey() != null ) { indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey())); if (indexFile == null ) { log.error("putKey error commitlog {} uniqkey {}" , req.getCommitLogOffset(), req.getUniqKey()); return ; } } if (keys != null && keys.length() > 0 ) { String[] keyset = keys.split(MessageConst.KEY_SEPARATOR); for (int i = 0 ; i < keyset.length; i++) { String key = keyset[i]; if (key.length() > 0 ) { indexFile = putKey(indexFile, msg, buildKey(topic, key)); if (indexFile == null ) { log.error("putKey error commitlog {} uniqkey {}" , req.getCommitLogOffset(), req.getUniqKey()); return ; } } } } } else { log.error("build index error, stop building index" ); } }
配置的数量1 2 3 private boolean messageIndexEnable = true ;private int maxHashSlotNum = 5000000 ;private int maxIndexNum = 5000000 * 4 ;
最核心的其实是 IndexFile 的结构和如何写入1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 public boolean putKey (final String key, final long phyOffset, final long storeTimestamp) { if (this .indexHeader.getIndexCount() < this .indexNum) { int keyHash = indexKeyHashMethod(key); int slotPos = keyHash % this .hashSlotNum; int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize; FileLock fileLock = null ; try { int slotValue = this .mappedByteBuffer.getInt(absSlotPos); if (slotValue <= invalidIndex || slotValue > this .indexHeader.getIndexCount()) { slotValue = invalidIndex; } long timeDiff = storeTimestamp - this .indexHeader.getBeginTimestamp(); timeDiff = timeDiff / 1000 ; if (this .indexHeader.getBeginTimestamp() <= 0 ) { timeDiff = 0 ; } else if (timeDiff > Integer.MAX_VALUE) { timeDiff = Integer.MAX_VALUE; } else if (timeDiff < 0 ) { timeDiff = 0 ; } int absIndexPos = IndexHeader.INDEX_HEADER_SIZE + this .hashSlotNum * hashSlotSize + this .indexHeader.getIndexCount() * indexSize; this .mappedByteBuffer.putInt(absIndexPos, keyHash); this .mappedByteBuffer.putLong(absIndexPos + 4 , phyOffset); this .mappedByteBuffer.putInt(absIndexPos + 4 + 8 , (int ) timeDiff); this .mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4 , slotValue); this .mappedByteBuffer.putInt(absSlotPos, this .indexHeader.getIndexCount()); if (this .indexHeader.getIndexCount() <= 1 ) { this .indexHeader.setBeginPhyOffset(phyOffset); this .indexHeader.setBeginTimestamp(storeTimestamp); } this .indexHeader.incHashSlotCount(); this .indexHeader.incIndexCount(); this .indexHeader.setEndPhyOffset(phyOffset); this .indexHeader.setEndTimestamp(storeTimestamp); return true ; } catch (Exception e) { log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e); } finally { if (fileLock != null ) { try { fileLock.release(); } catch (IOException e) { log.error("Failed to release the lock" , e); } } } } else { log.warn("Over index file capacity: index count = " + this .indexHeader.getIndexCount() + "; index max num = " + this .indexNum); } return false ; }
具体可以看一下这个简略的示意图