rocketmq 里有一种比较特殊的用法,就是顺序消息,比如订单的生命周期里,在创建,支付,签收等状态轮转中,会发出来对应的消息,这里面就比较需要去保证他们的顺序,当然在处理的业务代码也可以做对应的处理,结合消息重投,但是如果这里消息就能保证顺序性了,那么业务代码就能更好的关注业务代码的处理。
首先有一种情况是全局的有序,比如对于一个 topic 里就得发送线程保证只有一个,内部的 queue 也只有一个,消费线程也只有一个,这样就能比较容易的保证全局顺序性了,但是这里的问题就是完全限制了性能,不是很现实,在真实场景里很多都是比如对于同一个订单,需要去保证状态的轮转是按照预期的顺序来,而不必要全局的有序性。
对于这类的有序性,需要在发送和接收方都有对应的处理,在发送消息中,需要去指定 selector,即MessageQueueSelector
,能够以固定的方式是分配到对应的 MessageQueue
比如像 RocketMQ 中的示例1 2 3 4 5 6 7 8 SendResult sendResult = producer.send(msg, new MessageQueueSelector () { @Override public MessageQueue select (List<MessageQueue> mqs, Message msg, Object arg) { Long id = (Long) arg; long index = id % mqs.size(); return mqs.get((int ) index); } }, orderList.get(i).getOrderId());
而在消费侧有几个点比较重要,首先我们要保证一个 MessageQueue只被一个消费者消费,消费队列存在broker端,要保证 MessageQueue 只被一个消费者消费,那么消费者在进行消息拉取消费时就必须向mq服务器申请队列锁 ,消费者申请队列锁的代码存在于RebalanceService消息队列负载的实现代码中。1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 List<PullRequest> pullRequestList = new ArrayList <PullRequest>(); for (MessageQueue mq : mqSet) { if (!this .processQueueTable.containsKey(mq)) { if (isOrder && !this .lock(mq)) { log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed" , consumerGroup, mq); continue ; } this .removeDirtyOffset(mq); ProcessQueue pq = new ProcessQueue (); long nextOffset = this .computePullFromWhere(mq); if (nextOffset >= 0 ) { ProcessQueue pre = this .processQueueTable.putIfAbsent(mq, pq); if (pre != null ) { log.info("doRebalance, {}, mq already exists, {}" , consumerGroup, mq); } else { log.info("doRebalance, {}, add a new mq, {}" , consumerGroup, mq); PullRequest pullRequest = new PullRequest (); pullRequest.setConsumerGroup(consumerGroup); pullRequest.setNextOffset(nextOffset); pullRequest.setMessageQueue(mq); pullRequest.setProcessQueue(pq); pullRequestList.add(pullRequest); changed = true ; } } else { log.warn("doRebalance, {}, add new mq failed, {}" , consumerGroup, mq); } } }
在申请到锁之后会创建 pullRequest 进行消息拉取,消息拉取部分的代码实现在PullMessageService中,1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Override public void run () { log.info(this .getServiceName() + " service started" ); while (!this .isStopped()) { try { PullRequest pullRequest = this .pullRequestQueue.take(); this .pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception" , e); } } log.info(this .getServiceName() + " service end" ); }
消息拉取完后,需要提交到ConsumeMessageService中进行消费,顺序消费的实现为ConsumeMessageOrderlyService,提交消息进行消费的方法为ConsumeMessageOrderlyService#submitConsumeRequest,具体实现如下:1 2 3 4 5 6 7 8 9 10 11 @Override public void submitConsumeRequest ( final List<MessageExt> msgs, final ProcessQueue processQueue, final MessageQueue messageQueue, final boolean dispathToConsume) { if (dispathToConsume) { ConsumeRequest consumeRequest = new ConsumeRequest (processQueue, messageQueue); this .consumeExecutor.submit(consumeRequest); } }
构建了一个ConsumeRequest对象,它是个实现了 runnable 接口的类,并提交给了线程池来并行消费,看下顺序消费的ConsumeRequest的run方法实现:1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 @Override public void run () { if (this .processQueue.isDropped()) { log.warn("run, the message queue not be able to consume, because it's dropped. {}" , this .messageQueue); return ; } final Object objLock = messageQueueLock.fetchLockObject(this .messageQueue); synchronized (objLock) { if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this .defaultMQPushConsumerImpl.messageModel()) || (this .processQueue.isLocked() && !this .processQueue.isLockExpired())) { final long beginTime = System.currentTimeMillis(); for (boolean continueConsume = true ; continueConsume; ) { if (this .processQueue.isDropped()) { log.warn("the message queue not be able to consume, because it's dropped. {}" , this .messageQueue); break ; } if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this .defaultMQPushConsumerImpl.messageModel()) && !this .processQueue.isLocked()) { log.warn("the message queue not locked, so consume later, {}" , this .messageQueue); ConsumeMessageOrderlyService.this .tryLockLaterAndReconsume(this .messageQueue, this .processQueue, 10 ); break ; } if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this .defaultMQPushConsumerImpl.messageModel()) && this .processQueue.isLockExpired()) { log.warn("the message queue lock expired, so consume later, {}" , this .messageQueue); ConsumeMessageOrderlyService.this .tryLockLaterAndReconsume(this .messageQueue, this .processQueue, 10 ); break ; } long interval = System.currentTimeMillis() - beginTime; if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) { ConsumeMessageOrderlyService.this .submitConsumeRequestLater(processQueue, messageQueue, 10 ); break ; } final int consumeBatchSize = ConsumeMessageOrderlyService.this .defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); List<MessageExt> msgs = this .processQueue.takeMessags(consumeBatchSize); defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup()); if (!msgs.isEmpty()) { final ConsumeOrderlyContext context = new ConsumeOrderlyContext (this .messageQueue); ConsumeOrderlyStatus status = null ; ConsumeMessageContext consumeMessageContext = null ; if (ConsumeMessageOrderlyService.this .defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext = new ConsumeMessageContext (); consumeMessageContext .setConsumerGroup(ConsumeMessageOrderlyService.this .defaultMQPushConsumer.getConsumerGroup()); consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace()); consumeMessageContext.setMq(messageQueue); consumeMessageContext.setMsgList(msgs); consumeMessageContext.setSuccess(false ); consumeMessageContext.setProps(new HashMap <String, String>()); ConsumeMessageOrderlyService.this .defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext); } long beginTimestamp = System.currentTimeMillis(); ConsumeReturnType returnType = ConsumeReturnType.SUCCESS; boolean hasException = false ; try { this .processQueue.getLockConsume().lock(); if (this .processQueue.isDropped()) { log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}" , this .messageQueue); break ; } status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}" , RemotingHelper.exceptionSimpleDesc(e), ConsumeMessageOrderlyService.this .consumerGroup, msgs, messageQueue); hasException = true ; } finally { this .processQueue.getLockConsume().unlock(); } if (null == status || ConsumeOrderlyStatus.ROLLBACK == status || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) { log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}" , ConsumeMessageOrderlyService.this .consumerGroup, msgs, messageQueue); } long consumeRT = System.currentTimeMillis() - beginTimestamp; if (null == status) { if (hasException) { returnType = ConsumeReturnType.EXCEPTION; } else { returnType = ConsumeReturnType.RETURNNULL; } } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000 ) { returnType = ConsumeReturnType.TIME_OUT; } else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) { returnType = ConsumeReturnType.FAILED; } else if (ConsumeOrderlyStatus.SUCCESS == status) { returnType = ConsumeReturnType.SUCCESS; } if (ConsumeMessageOrderlyService.this .defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name()); } if (null == status) { status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } if (ConsumeMessageOrderlyService.this .defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext.setStatus(status.toString()); consumeMessageContext .setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status); ConsumeMessageOrderlyService.this .defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext); } ConsumeMessageOrderlyService.this .getConsumerStatsManager() .incConsumeRT(ConsumeMessageOrderlyService.this .consumerGroup, messageQueue.getTopic(), consumeRT); continueConsume = ConsumeMessageOrderlyService.this .processConsumeResult(msgs, status, context, this ); } else { continueConsume = false ; } } } else { if (this .processQueue.isDropped()) { log.warn("the message queue not be able to consume, because it's dropped. {}" , this .messageQueue); return ; } ConsumeMessageOrderlyService.this .tryLockLaterAndReconsume(this .messageQueue, this .processQueue, 100 ); } } }
获取到锁对象后,使用synchronized尝试申请线程级独占锁。
如果加锁成功 ,同一时刻只有一个线程进行消息消费。
如果加锁失败 ,会延迟100ms重新尝试向broker端申请锁定messageQueue,锁定成功后重新提交消费请求
创建消息拉取任务时,消息客户端向broker端申请锁定MessageQueue,使得一个MessageQueue同一个时刻只能被一个消费客户端消费。
消息消费时,多线程针对同一个消息队列的消费先尝试使用synchronized申请独占锁,加锁成功才能进行消费,使得一个MessageQueue同一个时刻只能被一个消费客户端中一个线程消费。 这里其实还有很重要的一点是对processQueue的加锁,这里其实是保证了在 rebalance的过程中如果 processQueue 被分配给了另一个 consumer,但是当前已经被我这个 consumer 再消费,但是没提交,就有可能出现被两个消费者消费,所以得进行加锁保证不受 rebalance 影响。