A Look at the RocketMQ Broker Source Code

The broker starts up in much the same way as the NameServer: both are server-style processes, quite different from a Consumer.

The entry point is the main function in org.apache.rocketmq.broker.BrokerStartup. org.apache.rocketmq.broker.BrokerStartup#createBrokerController is mostly just parameter parsing, but I almost overlooked that it also performs the most important step, the controller initialization:

final BrokerController controller = new BrokerController(
    brokerConfig,
    nettyServerConfig,
    nettyClientConfig,
    messageStoreConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);

boolean initResult = controller.initialize();

The BrokerController is instantiated with the broker config, the Netty server and client configs, and the message store config, and then initialized:

public boolean initialize() throws CloneNotSupportedException {
    boolean result = this.topicConfigManager.load();

    result = result && this.consumerOffsetManager.load();
    result = result && this.subscriptionGroupManager.load();
    result = result && this.consumerFilterManager.load();

Those lines load the various config managers. Next comes what I consider the more important part: instantiating the messageStore.

if (result) {
    try {
        this.messageStore =
            new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
                this.brokerConfig);
        if (messageStoreConfig.isEnableDLegerCommitLog()) {
            DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
            ((DLedgerCommitLog) ((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
        }
        this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
        // load plugin
        MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
        this.messageStore = MessageStoreFactory.build(context, this.messageStore);
        this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
    } catch (IOException e) {
        result = false;
        log.error("Failed to initialize", e);
    }
}

result = result && this.messageStore.load();

First the instantiation; the constructor code matters, so let's look at it closely:

public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
    final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
    this.messageArrivingListener = messageArrivingListener;
    this.brokerConfig = brokerConfig;
    this.messageStoreConfig = messageStoreConfig;
    this.brokerStatsManager = brokerStatsManager;
    this.allocateMappedFileService = new AllocateMappedFileService(this);
    if (messageStoreConfig.isEnableDLegerCommitLog()) {
        this.commitLog = new DLedgerCommitLog(this);
    } else {
        this.commitLog = new CommitLog(this);
    }
    this.consumeQueueTable = new ConcurrentHashMap<>(32);

    this.flushConsumeQueueService = new FlushConsumeQueueService();
    this.cleanCommitLogService = new CleanCommitLogService();
    this.cleanConsumeQueueService = new CleanConsumeQueueService();
    this.storeStatsService = new StoreStatsService();
    this.indexService = new IndexService(this);
    if (!messageStoreConfig.isEnableDLegerCommitLog()) {
        this.haService = new HAService(this);
    } else {
        this.haService = null;
    }
    this.reputMessageService = new ReputMessageService();

    this.scheduleMessageService = new ScheduleMessageService(this);

    this.transientStorePool = new TransientStorePool(messageStoreConfig);

    if (messageStoreConfig.isTransientStorePoolEnable()) {
        this.transientStorePool.init();
    }

    this.allocateMappedFileService.start();

    this.indexService.start();

    this.dispatcherList = new LinkedList<>();
    this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
    this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());

    File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir()));
    MappedFile.ensureDirOK(file.getParent());
    lockFile = new RandomAccessFile(file, "rw");
}

Quite a few classes show up here; setting aside the ones simply passed in through the constructor, the interesting services are:

- AllocateMappedFileService: readers of the earlier posts may guess from the code above that this is yet another ServiceThread, and anyone familiar with RocketMQ can tell from the name that it is part of how messages get to disk. When a new MappedFile is needed (in MappedFileQueue#getLastMappedFile), an AllocateRequest is put into this thread's requestQueue; the thread watches that queue in the background and creates the MappedFile object there, which also creates the physical file.
- IndexService: a service thread that builds the message index.
- FlushConsumeQueueService: flushes the ConsumeQueue to disk.
- CleanCommitLogService: cleans up expired CommitLog files, by default those older than 72 hours.
- CleanConsumeQueueService: removes ConsumeQueue data whose offsets fall below the CommitLog's minimum offset.
- StoreStatsService: storage statistics.
- HAService: master/slave replication of the CommitLog.
- ScheduleMessageService: delayed (scheduled) messages.
- ReputMessageService: extremely important; this is the thread that builds the ConsumeQueue from the CommitLog along the topic + queue dimension.
- TransientStorePool: the write buffer used for asynchronous flushing, which the following check also makes clear:

public boolean isTransientStorePoolEnable() {
    return transientStorePoolEnable && FlushDiskType.ASYNC_FLUSH == getFlushDiskType()
        && BrokerRole.SLAVE != getBrokerRole();
}
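
The TransientStorePool idea is easy to picture outside RocketMQ: pre-allocate a fixed set of off-heap buffers once, lend one out for each async write, and take it back after the data is committed to the file channel. A minimal sketch of that pattern (my own simplification; the real class also page-locks the buffers via JNA, and the pool/file sizes below are made up):

import java.nio.ByteBuffer;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;

// Minimal sketch of the transient-store-pool pattern: a fixed set of
// pre-allocated direct buffers, borrowed for writes and returned afterwards.
public class SimpleTransientStorePool {
    private final Deque<ByteBuffer> availableBuffers = new ConcurrentLinkedDeque<>();

    public SimpleTransientStorePool(int poolSize, int fileSize) {
        for (int i = 0; i < poolSize; i++) {
            // RocketMQ additionally locks these pages in memory (mlock via JNA)
            availableBuffers.offer(ByteBuffer.allocateDirect(fileSize));
        }
    }

    public ByteBuffer borrowBuffer() {
        return availableBuffers.pollFirst(); // null when the pool is exhausted
    }

    public void returnBuffer(ByteBuffer buffer) {
        buffer.position(0);
        buffer.limit(buffer.capacity());
        availableBuffers.offerFirst(buffer);
    }
}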

After that, two of those service threads (allocateMappedFileService and indexService) are started right away, and the dispatcherList is set up to forward CommitLog entries onward. That is roughly all the constructor does.
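
Each entry in that list implements the CommitLogDispatcher interface; CommitLogDispatcherBuildConsumeQueue and CommitLogDispatcherBuildIndex are the two built-in ones added in the constructor. A hypothetical extra dispatcher, written against the 4.x interface as I remember it, could be as small as this (my own example, counting dispatched entries per topic):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

import org.apache.rocketmq.store.CommitLogDispatcher;
import org.apache.rocketmq.store.DispatchRequest;

// Hypothetical dispatcher: counts how many CommitLog entries have been
// dispatched for each topic.
public class CountingDispatcher implements CommitLogDispatcher {
    private final Map<String, LongAdder> counters = new ConcurrentHashMap<>();

    @Override
    public void dispatch(DispatchRequest request) {
        counters.computeIfAbsent(request.getTopic(), t -> new LongAdder()).increment();
    }
}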

Back in the outer method: next come the DLedger master/slave switchover hook (the role-change handler), then broker statistics, then loading the message store plugins, and finally one more dispatcher, a filter bitmap calculator, is added to the front of the dispatcher list:

if (messageStoreConfig.isEnableDLegerCommitLog()) {
    DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
    ((DLedgerCommitLog) ((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
}
this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
// load plugin
MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
this.messageStore = MessageStoreFactory.build(context, this.messageStore);
this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));

Next comes the org.apache.rocketmq.store.MessageStore#load process:

  1. ScheduleMessageService.load initializes the delay levels. The levels ("1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h") are parsed into delayLevelTable: ConcurrentHashMap<Integer /* level */, Long /* delay timeMillis */>; for example, 1s becomes the entry 1:1000 and 5s becomes 2:5000, with keys counting up from there. Each delay level is backed by its own queue. (A parsing sketch follows this list.)

  2. CommitLog.load calls MappedFileQueue.load, which loads every file under $HOME/store/commitlog into the MappedFileQueue's internal list.

  3. DefaultMessageStore.loadConsumeQueue loads the consumequeue file data into the DefaultMessageStore.consumeQueueTable map.

  4. The StoreCheckPoint object is initialized by loading $HOME/store/checkpoint, which records three timestamps: the physical queue (commit log) message timestamp, the logical queue (consume queue) message timestamp, and the index message timestamp.

  5. IndexService.load loads the files under $HOME/store/index, creating an IndexFile object for each and calling IndexFile.load to read the IndexHeader into memory. It then checks for an abort file; if one exists, the broker exited abnormally last time, so if the checkpoint's indexMsgTimestamp (the time of the last flush) is smaller than the IndexHeader's endTimestamp, indexes were written to the file after the last flush and the index file is deleted. Otherwise the IndexFile is added to the indexFileList collection.
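
To make item 1 concrete, here is a runnable sketch of that parsing, simplified from ScheduleMessageService#parseDelayLevel (error handling and the maxDelayLevel bookkeeping left out):

import java.util.concurrent.ConcurrentHashMap;

// Simplified sketch of how the messageDelayLevel string becomes
// delayLevelTable (level -> delay in millis).
public class DelayLevelParser {
    public static ConcurrentHashMap<Integer, Long> parse(String messageDelayLevel) {
        ConcurrentHashMap<String, Long> timeUnitTable = new ConcurrentHashMap<>();
        timeUnitTable.put("s", 1000L);
        timeUnitTable.put("m", 1000L * 60);
        timeUnitTable.put("h", 1000L * 60 * 60);
        timeUnitTable.put("d", 1000L * 60 * 60 * 24);

        ConcurrentHashMap<Integer, Long> delayLevelTable = new ConcurrentHashMap<>();
        String[] levels = messageDelayLevel.split(" ");
        for (int i = 0; i < levels.length; i++) {
            String value = levels[i];
            String unit = value.substring(value.length() - 1);
            long num = Long.parseLong(value.substring(0, value.length() - 1));
            delayLevelTable.put(i + 1, num * timeUnitTable.get(unit)); // levels are 1-based
        }
        return delayLevelTable;
    }

    public static void main(String[] args) {
        // prints {1=1000, 2=5000, ...} exactly as described above
        System.out.println(parse("1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"));
    }
}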

Then org.apache.rocketmq.store.DefaultMessageStore#recover is called. Earlier, boolean lastExitOK = !this.isTempFileExist(); used the presence of the abort temp file to decide whether the last shutdown was clean, and that flag selects the recovery strategy.
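
Paraphrasing the 4.x source from memory (method names and signatures drift a little between versions), the strategy selection looks roughly like this:

// Paraphrased from DefaultMessageStore (4.x), not copied verbatim.
private void recover(final boolean lastExitOK) {
    // First verify/rebuild the consume queues and find their max physical offset
    long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();

    if (lastExitOK) {
        // Clean shutdown: only the tail of the commit log needs checking
        this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
    } else {
        // Crash: scan back from the last files to find the last valid message
        this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
    }

    this.recoverTopicQueueTable();
}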

Next the Netty servers are initialized, along with the thread pools: sendMessageExecutor for sending messages, pullMessageExecutor for pulling messages, adminBrokerExecutor for broker administration, clientManageExecutor for client management, and several more. Then the request processors are registered: SendMessageProcessor for sends, PullMessageProcessor for pulls, QueryMessageProcessor for message queries, ClientManageProcessor for client management (heartbeats, unregistration, fetching the consumer list), ConsumerManageProcessor for updating and querying consume progress (consumerOffset), EndTransactionProcessor for ending transactions, and AdminBrokerProcessor as the default processor. After that come the scheduled tasks:

BrokerController.this.getBrokerStats().record(); // record broker stats (scheduled daily)

BrokerController.this.consumerOffsetManager.persist(); // persist consume offsets (consumerOffset)

BrokerController.this.consumerFilterManager.persist(); // persist consumer filters

BrokerController.this.protectBroker(); // protect the broker: stop delivering to consumer groups that fall too far behind

BrokerController.this.printWaterMark(); // print the queue water marks

log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes()); // check how far dispatch lags behind the commit log

BrokerController.this.brokerOuterAPI.fetchNameServerAddr(); // periodically fetch NameServer addresses

BrokerController.this.printMasterAndSlaveDiff(); // print the master/slave offset diff

Then come TLS (a FileWatchService that reloads the SSL context when certificates change), transactional message initialization, ACL initialization, and RPC hook initialization.
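
RPCHook is the standard remoting extension point here; for illustration, a hook of my own that just logs request and response codes would look like this:

import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;

// Hypothetical RPCHook: logs every request/response code passing through.
public class LoggingRpcHook implements RPCHook {
    @Override
    public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
        System.out.printf("request code=%d from %s%n", request.getCode(), remoteAddr);
    }

    @Override
    public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
        System.out.printf("response code=%d to %s%n", response.getCode(), remoteAddr);
    }
}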

Go ahead and type "scared" into the chat 🤦‍♂️; still, the thread pool names and the methods they call should make most of their purposes clear:

if (result) {  // still inside initialize(), picking up after messageStore.load()
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
    NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
    fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
    this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
    this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
        this.brokerConfig.getSendMessageThreadPoolNums(),
        this.brokerConfig.getSendMessageThreadPoolNums(),
        1000 * 60,
        TimeUnit.MILLISECONDS,
        this.sendThreadPoolQueue,
        new ThreadFactoryImpl("SendMessageThread_"));

    this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
        this.brokerConfig.getPullMessageThreadPoolNums(),
        this.brokerConfig.getPullMessageThreadPoolNums(),
        1000 * 60,
        TimeUnit.MILLISECONDS,
        this.pullThreadPoolQueue,
        new ThreadFactoryImpl("PullMessageThread_"));

    this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
        this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
        this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
        1000 * 60,
        TimeUnit.MILLISECONDS,
        this.replyThreadPoolQueue,
        new ThreadFactoryImpl("ProcessReplyMessageThread_"));

    this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
        this.brokerConfig.getQueryMessageThreadPoolNums(),
        this.brokerConfig.getQueryMessageThreadPoolNums(),
        1000 * 60,
        TimeUnit.MILLISECONDS,
        this.queryThreadPoolQueue,
        new ThreadFactoryImpl("QueryMessageThread_"));

    this.adminBrokerExecutor =
        Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
            "AdminBrokerThread_"));

    this.clientManageExecutor = new ThreadPoolExecutor(
        this.brokerConfig.getClientManageThreadPoolNums(),
        this.brokerConfig.getClientManageThreadPoolNums(),
        1000 * 60,
        TimeUnit.MILLISECONDS,
        this.clientManagerThreadPoolQueue,
        new ThreadFactoryImpl("ClientManageThread_"));

    this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
        this.brokerConfig.getHeartbeatThreadPoolNums(),
        this.brokerConfig.getHeartbeatThreadPoolNums(),
        1000 * 60,
        TimeUnit.MILLISECONDS,
        this.heartbeatThreadPoolQueue,
        new ThreadFactoryImpl("HeartbeatThread_", true));

    this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
        this.brokerConfig.getEndTransactionThreadPoolNums(),
        this.brokerConfig.getEndTransactionThreadPoolNums(),
        1000 * 60,
        TimeUnit.MILLISECONDS,
        this.endTransactionThreadPoolQueue,
        new ThreadFactoryImpl("EndTransactionThread_"));

    this.consumerManageExecutor =
        Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
            "ConsumerManageThread_"));

    this.registerProcessor();

    final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();
    final long period = 1000 * 60 * 60 * 24;
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                BrokerController.this.getBrokerStats().record();
            } catch (Throwable e) {
                log.error("schedule record error.", e);
            }
        }
    }, initialDelay, period, TimeUnit.MILLISECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                BrokerController.this.consumerOffsetManager.persist();
            } catch (Throwable e) {
                log.error("schedule persist consumerOffset error.", e);
            }
        }
    }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                BrokerController.this.consumerFilterManager.persist();
            } catch (Throwable e) {
                log.error("schedule persist consumer filter error.", e);
            }
        }
    }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                BrokerController.this.protectBroker();
            } catch (Throwable e) {
                log.error("protectBroker error.", e);
            }
        }
    }, 3, 3, TimeUnit.MINUTES);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                BrokerController.this.printWaterMark();
            } catch (Throwable e) {
                log.error("printWaterMark error.", e);
            }
        }
    }, 10, 1, TimeUnit.SECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
            } catch (Throwable e) {
                log.error("schedule dispatchBehindBytes error.", e);
            }
        }
    }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);

    if (this.brokerConfig.getNamesrvAddr() != null) {
        this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
        log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
    } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
                } catch (Throwable e) {
                    log.error("ScheduledTask fetchNameServerAddr exception", e);
                }
            }
        }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
    }

    if (!messageStoreConfig.isEnableDLegerCommitLog()) {
        if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
            if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
                this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
                this.updateMasterHAServerAddrPeriodically = false;
            } else {
                this.updateMasterHAServerAddrPeriodically = true;
            }
        } else {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.printMasterAndSlaveDiff();
                    } catch (Throwable e) {
                        log.error("schedule printMasterAndSlaveDiff error.", e);
                    }
                }
            }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
        }
    }

    if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
        // Register a listener to reload SslContext
        try {
            fileWatchService = new FileWatchService(
                new String[] {
                    TlsSystemConfig.tlsServerCertPath,
                    TlsSystemConfig.tlsServerKeyPath,
                    TlsSystemConfig.tlsServerTrustCertPath
                },
                new FileWatchService.Listener() {
                    boolean certChanged, keyChanged = false;

                    @Override
                    public void onChanged(String path) {
                        if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                            log.info("The trust certificate changed, reload the ssl context");
                            reloadServerSslContext();
                        }
                        if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                            certChanged = true;
                        }
                        if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                            keyChanged = true;
                        }
                        if (certChanged && keyChanged) {
                            log.info("The certificate and private key changed, reload the ssl context");
                            certChanged = keyChanged = false;
                            reloadServerSslContext();
                        }
                    }

                    private void reloadServerSslContext() {
                        ((NettyRemotingServer) remotingServer).loadSslContext();
                        ((NettyRemotingServer) fastRemotingServer).loadSslContext();
                    }
                });
        } catch (Exception e) {
            log.warn("FileWatchService created error, can't load the certificate dynamically");
        }
    }
    initialTransaction();
    initialAcl();
    initialRpcHooks();
}
return result;
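
Buried in the middle of that block is this.registerProcessor(), which is where request codes get mapped to those processors and thread pools. From memory, the 4.x version does roughly the following for sends, and the same pattern repeats for pulls, queries, client management, and the rest (abbreviated, not verbatim):

// Abbreviated from BrokerController#registerProcessor (4.x)
SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
sendProcessor.registerSendMessageHook(sendMessageHookList);
sendProcessor.registerConsumeMessageHook(consumeMessageHookList);

this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);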

The Broker startup process

Here is the code:

public void start() throws Exception {
    if (this.messageStore != null) {
        this.messageStore.start();
    }

    if (this.remotingServer != null) {
        this.remotingServer.start();
    }

    if (this.fastRemotingServer != null) {
        this.fastRemotingServer.start();
    }

    if (this.fileWatchService != null) {
        this.fileWatchService.start();
    }

    if (this.brokerOuterAPI != null) {
        this.brokerOuterAPI.start();
    }

    if (this.pullRequestHoldService != null) {
        this.pullRequestHoldService.start();
    }

    if (this.clientHousekeepingService != null) {
        this.clientHousekeepingService.start();
    }

    if (this.filterServerManager != null) {
        this.filterServerManager.start();
    }

    if (!messageStoreConfig.isEnableDLegerCommitLog()) {
        startProcessorByHa(messageStoreConfig.getBrokerRole());
        handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
        this.registerBrokerAll(true, false, true);
    }

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
            } catch (Throwable e) {
                log.error("registerBrokerAll Exception", e);
            }
        }
    }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

    if (this.brokerStatsManager != null) {
        this.brokerStatsManager.start();
    }

    if (this.brokerFastFailure != null) {
        this.brokerFastFailure.start();
    }
}

First the messageStore is started via its start method, which in turn does quite a bit:

public void start() throws Exception {

    lock = lockFile.getChannel().tryLock(0, 1, false);
    if (lock == null || lock.isShared() || !lock.isValid()) {
        throw new RuntimeException("Lock failed,MQ already started");
    }

    lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes()));
    lockFile.getChannel().force(true);
    {
        /**
         * 1. Make sure the fast-forward messages to be truncated during the recovering according to the max physical offset of the commitlog;
         * 2. DLedger committedPos may be missing, so the maxPhysicalPosInLogicQueue maybe bigger that maxOffset returned by DLedgerCommitLog, just let it go;
         * 3. Calculate the reput offset according to the consume queue;
         * 4. Make sure the fall-behind messages to be dispatched before starting the commitlog, especially when the broker role are automatically changed.
         */
        long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
        for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
            for (ConsumeQueue logic : maps.values()) {
                if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
                    maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
                }
            }
        }
        if (maxPhysicalPosInLogicQueue < 0) {
            maxPhysicalPosInLogicQueue = 0;
        }
        if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) {
            maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
            /**
             * This happens in following conditions:
             * 1. If someone removes all the consumequeue files or the disk get damaged.
             * 2. Launch a new broker, and copy the commitlog from other brokers.
             *
             * All the conditions has the same in common that the maxPhysicalPosInLogicQueue should be 0.
             * If the maxPhysicalPosInLogicQueue is gt 0, there maybe something wrong.
             */
            log.warn("[TooSmallCqOffset] maxPhysicalPosInLogicQueue={} clMinOffset={}", maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset());
        }
        log.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMinOffset={} clMaxOffset={} clConfirmedOffset={}",
            maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset(), this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset());
        this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
        this.reputMessageService.start();

        /**
         * 1. Finish dispatching the messages fall behind, then to start other services.
         * 2. DLedger committedPos may be missing, so here just require dispatchBehindBytes <= 0
         */
        while (true) {
            if (dispatchBehindBytes() <= 0) {
                break;
            }
            Thread.sleep(1000);
            log.info("Try to finish doing reput the messages fall behind during the starting, reputOffset={} maxOffset={} behind={}", this.reputMessageService.getReputFromOffset(), this.getMaxPhyOffset(), this.dispatchBehindBytes());
        }
        this.recoverTopicQueueTable();
    }

    if (!messageStoreConfig.isEnableDLegerCommitLog()) {
        this.haService.start();
        this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
    }

    this.flushConsumeQueueService.start();
    this.commitLog.start();
    this.storeStatsService.start();

    this.createTempFile();
    this.addScheduleTask();
    this.shutdown = false;
}

DefaultMessageStore.start launches the service threads inside the DefaultMessageStore object:

  1. Start the ReputMessageService thread, after pointing its reputFromOffset at the max physical offset recorded in the consume queues;
  2. Start the FlushConsumeQueueService thread;
  3. Call CommitLog.start, which starts the CommitLog's FlushCommitLogService: GroupCommitService for synchronous flush (SYNC_FLUSH), FlushRealTimeService for asynchronous flush (ASYNC_FLUSH), as sketched after this list;
  4. Start the StoreStatsService thread;
  5. Start the scheduled cleanup tasks.
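
The choice in item 3 is made once, in the CommitLog constructor; paraphrased from the 4.x source:

// Paraphrased from the CommitLog constructor (4.x): pick the flush service
// according to the configured flush disk type.
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
    this.flushCommitLogService = new GroupCommitService();   // sync: the producer waits for the flush
} else {
    this.flushCommitLogService = new FlushRealTimeService(); // async: flushed on a timer in the background
}
this.commitLogService = new CommitRealTimeService();         // used together with the transient store pool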

Back in BrokerController.start, the Netty server and its fast twin are started next (both were constructed with ClientHousekeepingService as their channel-event listener); then fileWatchService, the certificate watcher; then the NettyRemotingClient inside BrokerOuterAPI, which connects to the NameServers and carries the broker's outbound RPC calls: fetching NameServer addresses, registering and unregistering the broker, fetching topic configs, consume progress, subscription relations, and so on. Then the PullRequestHoldService thread, which implements long polling; the ClientHousekeepingService "housekeeper", which scans for inactive producers, consumers, and filter servers; and the FilterServerManager. A scheduled task then calls org.apache.rocketmq.broker.BrokerController#registerBrokerAll to register this broker with every NameServer, and finally org.apache.rocketmq.store.stats.BrokerStatsManager and org.apache.rocketmq.broker.latency.BrokerFastFailure are started as needed. That essentially completes the startup process.
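
Since PullRequestHoldService keeps coming up as the long-polling piece, here is a stripped-down sketch of the hold-and-notify idea behind it (my own simplification, not the actual class):

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Stripped-down long-polling sketch: pull requests that find no new message
// are parked per topic@queueId; a new message arrival (or a periodic sweep)
// wakes them up to retry the read.
public class SimplePullHoldService {
    // key: "topic@queueId" -> parked pull requests
    private final Map<String, List<Runnable>> held = new ConcurrentHashMap<>();

    public void suspendPullRequest(String topic, int queueId, Runnable retryRead) {
        held.computeIfAbsent(topic + "@" + queueId, k -> new CopyOnWriteArrayList<>())
            .add(retryRead);
    }

    // In the real broker this is driven by the MessageArrivingListener that
    // ReputMessageService calls when it dispatches a new message.
    public void notifyMessageArriving(String topic, int queueId) {
        List<Runnable> waiters = held.remove(topic + "@" + queueId);
        if (waiters != null) {
            waiters.forEach(Runnable::run); // re-drive each held pull request
        }
    }
}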