result = result && this.consumerOffsetManager.load(); result = result && this.subscriptionGroupManager.load(); result = result && this.consumerFilterManager.load();
if (this.brokerConfig.getNamesrvAddr() != null) { this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr()); log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr()); } elseif (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) { this.scheduledExecutorService.scheduleAtFixedRate(newRunnable() {
lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes())); lockFile.getChannel().force(true); { /** * 1. Make sure the fast-forward messages to be truncated during the recovering according to the max physical offset of the commitlog; * 2. DLedger committedPos may be missing, so the maxPhysicalPosInLogicQueue maybe bigger that maxOffset returned by DLedgerCommitLog, just let it go; * 3. Calculate the reput offset according to the consume queue; * 4. Make sure the fall-behind messages to be dispatched before starting the commitlog, especially when the broker role are automatically changed. */ longmaxPhysicalPosInLogicQueue= commitLog.getMinOffset(); for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) { for (ConsumeQueue logic : maps.values()) { if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) { maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset(); } } } if (maxPhysicalPosInLogicQueue < 0) { maxPhysicalPosInLogicQueue = 0; } if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) { maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset(); /** * This happens in following conditions: * 1. If someone removes all the consumequeue files or the disk get damaged. * 2. Launch a new broker, and copy the commitlog from other brokers. * * All the conditions has the same in common that the maxPhysicalPosInLogicQueue should be 0. * If the maxPhysicalPosInLogicQueue is gt 0, there maybe something wrong. */ log.warn("[TooSmallCqOffset] maxPhysicalPosInLogicQueue={} clMinOffset={}", maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset()); } log.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMinOffset={} clMaxOffset={} clConfirmedOffset={}", maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset(), this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset()); this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue); this.reputMessageService.start();
/** * 1. Finish dispatching the messages fall behind, then to start other services. * 2. DLedger committedPos may be missing, so here just require dispatchBehindBytes <= 0 */ while (true) { if (dispatchBehindBytes() <= 0) { break; } Thread.sleep(1000); log.info("Try to finish doing reput the messages fall behind during the starting, reputOffset={} maxOffset={} behind={}", this.reputMessageService.getReputFromOffset(), this.getMaxPhyOffset(), this.dispatchBehindBytes()); } this.recoverTopicQueueTable(); }
if (!messageStoreConfig.isEnableDLegerCommitLog()) { this.haService.start(); this.handleScheduleMessageService(messageStoreConfig.getBrokerRole()); }
if (null == namesrvConfig.getRocketmqHome()) { System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV); System.exit(-2); }
Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator(); while (it.hasNext()) { Entry<Long, String> entry = it.next(); LongbrokerId= entry.getKey(); StringbrokerAddr= entry.getValue(); if (brokerAddr.equals(brokerAddrFound)) { brokerNameFound = brokerData.getBrokerName(); it.remove(); log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed", brokerId, brokerAddr); break; } }
if (brokerData.getBrokerAddrs().isEmpty()) { removeBrokerName = true; itBrokerAddrTable.remove(); log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed", brokerData.getBrokerName()); } }
if (brokerNameFound != null && removeBrokerName) { Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, Set<String>> entry = it.next(); StringclusterName= entry.getKey(); Set<String> brokerNames = entry.getValue(); booleanremoved= brokerNames.remove(brokerNameFound); if (removed) { log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed", brokerNameFound, clusterName);
if (brokerNames.isEmpty()) { log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster", clusterName); it.remove(); }
switch (request.getCode()) { case RequestCode.PUT_KV_CONFIG: returnthis.putKVConfig(ctx, request); case RequestCode.GET_KV_CONFIG: returnthis.getKVConfig(ctx, request); case RequestCode.DELETE_KV_CONFIG: returnthis.deleteKVConfig(ctx, request); case RequestCode.QUERY_DATA_VERSION: return queryBrokerTopicConfig(ctx, request); case RequestCode.REGISTER_BROKER: VersionbrokerVersion= MQVersion.value2Version(request.getVersion()); if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) { returnthis.registerBrokerWithFilterServer(ctx, request); } else { returnthis.registerBroker(ctx, request); } case RequestCode.UNREGISTER_BROKER: returnthis.unregisterBroker(ctx, request); case RequestCode.GET_ROUTEINTO_BY_TOPIC: returnthis.getRouteInfoByTopic(ctx, request); case RequestCode.GET_BROKER_CLUSTER_INFO: returnthis.getBrokerClusterInfo(ctx, request); case RequestCode.WIPE_WRITE_PERM_OF_BROKER: returnthis.wipeWritePermOfBroker(ctx, request); case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER: return getAllTopicListFromNameserver(ctx, request); case RequestCode.DELETE_TOPIC_IN_NAMESRV: return deleteTopicInNamesrv(ctx, request); case RequestCode.GET_KVLIST_BY_NAMESPACE: returnthis.getKVListByNamespace(ctx, request); case RequestCode.GET_TOPICS_BY_CLUSTER: returnthis.getTopicsByCluster(ctx, request); case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS: returnthis.getSystemTopicListFromNs(ctx, request); case RequestCode.GET_UNIT_TOPIC_LIST: returnthis.getUnitTopicList(ctx, request); case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST: returnthis.getHasUnitSubTopicList(ctx, request); case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST: returnthis.getHasUnitSubUnUnitTopicList(ctx, request); case RequestCode.UPDATE_NAMESRV_CONFIG: returnthis.updateConfig(ctx, request); case RequestCode.GET_NAMESRV_CONFIG: returnthis.getConfig(ctx, request); default: break; } returnnull; }
以broker注册为例,
1 2 3 4 5 6 7
case RequestCode.REGISTER_BROKER: VersionbrokerVersion= MQVersion.value2Version(request.getVersion()); if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) { returnthis.registerBrokerWithFilterServer(ctx, request); } else { returnthis.registerBroker(ctx, request); }
public RegisterBrokerResult registerBroker( final String clusterName, final String brokerAddr, final String brokerName, finallong brokerId, final String haServerAddr, final TopicConfigSerializeWrapper topicConfigWrapper, final List<String> filterServerList, final Channel channel) { RegisterBrokerResultresult=newRegisterBrokerResult(); try { try { this.lock.writeLock().lockInterruptibly();
response.setCode(ResponseCode.TOPIC_NOT_EXIST); response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic() + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)); return response; }
/* * Instantiate with specified consumer group name. * 首先是new 一个对象出来,然后指定 Consumer 的 Group * 同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。 */ DefaultMQPushConsumerconsumer=newDefaultMQPushConsumer("please_rename_unique_group_name_4");
/* * Specify name server addresses. * <p/> * 这里可以通知指定环境变量或者设置对象参数的形式指定名字空间服务的地址 * * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR * <pre> * {@code * consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876"); * } * </pre> */
/* * Specify where to start in case the specified consumer group is a brand new one. * 指定消费起始点 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
/* * Subscribe one more more topics to consume. * 指定订阅的 topic 跟 tag,注意后面的是个表达式,可以以 tag1 || tag2 || tag3 传入 */ consumer.subscribe("TopicTest", "*");
/* * Register callback to execute on arrival of messages fetched from brokers. * 注册具体获得消息后的处理方法 */ consumer.registerMessageListener(newMessageListenerConcurrently() {
@Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } });
/* * Launch the consumer instance. * 启动消费者 */ consumer.start();
System.out.printf("Consumer Started.%n"); }
然后就是看看 start 的过程了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/** * This method gets internal infrastructure readily to serve. Instances must call this method after configuration. * * @throws MQClientException if there is any client error. */ @Override publicvoidstart()throws MQClientException { setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup)); this.defaultMQPushConsumerImpl.start(); if (null != traceDispatcher) { try { traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel()); } catch (MQClientException e) { log.warn("trace dispatcher start failed ", e); } } }
/** * Internal implementation. Most of the functions herein are delegated to it. */ protectedfinaltransient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
booleanregisterOK= mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; this.consumeMessageService.shutdown(); thrownewMQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null); }
mQClientFactory.start(); log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup()); this.serviceState = ServiceState.RUNNING; break; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: thrownewMQClientException("The PushConsumer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: break; }
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); break; case NO_MATCHED_MSG: pullRequest.setNextOffset(pullResult.getNextBeginOffset());
public PullResult pullKernelImpl( final MessageQueue mq, final String subExpression, final String expressionType, finallong subVersion, finallong offset, finalint maxNums, finalint sysFlag, finallong commitOffset, finallong brokerSuspendMaxTimeMillis, finallong timeoutMillis, final CommunicationMode communicationMode, final PullCallback pullCallback )throws MQClientException, RemotingException, MQBrokerException, InterruptedException { FindBrokerResultfindBrokerResult= this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), this.recalculatePullFromWhichNode(mq), false); if (null == findBrokerResult) { this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), this.recalculatePullFromWhichNode(mq), false); }
if (findBrokerResult != null) { { // check version if (!ExpressionType.isTagType(expressionType) && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) { thrownewMQClientException("The broker[" + mq.getBrokerName() + ", " + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null); } } intsysFlagInner= sysFlag;
if (findBrokerResult.isSlave()) { sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner); }
public PullResult pullMessage( final String addr, final PullMessageRequestHeader requestHeader, finallong timeoutMillis, final CommunicationMode communicationMode, final PullCallback pullCallback )throws RemotingException, MQBrokerException, InterruptedException { RemotingCommandrequest= RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
switch (communicationMode) { case ONEWAY: assertfalse; returnnull; case ASYNC: this.pullMessageAsync(addr, request, timeoutMillis, pullCallback); returnnull; case SYNC: returnthis.pullMessageSync(addr, request, timeoutMillis); default: assertfalse; break; }