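RocketMQ Source Code Analysis
RocketMQ Overall Architecture
RocketMQ's main functionality is concentrated in four modules: NameServer (rocketmq-namesrv), rocketmq-broker, rocketmq-remoting, and rocketmq-store. The architecture diagram is shown below:
The project structure is as follows.
The names in the diagram correspond to the artifactId of each project.
The most important modules are listed below.
rocketmq-namesrv
The naming service: it keeps broker registrations up to date and provides route discovery.
NameServer's main job is to give message producers and message consumers the routing information for each topic. Besides storing the basic routing data, NameServer must also manage broker nodes, which includes route registration and route removal.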
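rocketmq-broker
The core of the MQ.
It receives requests from producers and consumers and calls the store layer to process messages. It is the basic unit of the HA service and supports modes such as synchronous double write and asynchronous double write.
rocketmq-store
The storage layer implementation, which also includes the index service and the high-availability (HA) service implementation.
rocketmq-remoting
The low-level communication implementation based on Netty. All interaction between services goes through this module.
rocketmq-common
Functionality shared across modules, such as configuration files and constants.
rocketmq-client
The Java MQ client implementation.
rocketmq-filter
The message filtering service, which effectively adds a filter proxy between the broker and the consumer.
rocketmq-srvutil
The ServerUtil utility class for parsing the command line.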
rocketmq-tools
MQ cluster management tools, providing features such as message query.
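To make NameServer's role (route registration, route removal, and route discovery) more concrete, here is a minimal in-memory sketch. The class RouteTableSketch and its methods are hypothetical names invented for illustration; they are not RocketMQ's actual classes, and the real route management also has to track things such as broker liveness and queue data.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration only: a tiny topic -> broker-address route table.
class RouteTableSketch {
    private final Map<String, Set<String>> topicToBrokers = new ConcurrentHashMap<>();

    // Route registration: a broker announces that it serves a topic.
    void registerBroker(String topic, String brokerAddr) {
        topicToBrokers
            .computeIfAbsent(topic, t -> ConcurrentHashMap.newKeySet())
            .add(brokerAddr);
    }

    // Route removal: drop a broker from every topic, e.g. after it goes offline.
    void unregisterBroker(String brokerAddr) {
        for (Set<String> brokers : topicToBrokers.values()) {
            brokers.remove(brokerAddr);
        }
        // Forget topics that no longer have any broker.
        topicToBrokers.values().removeIf(Set::isEmpty);
    }

    // Route discovery: producers and consumers ask which brokers serve a topic.
    Set<String> findBrokers(String topic) {
        Set<String> brokers = topicToBrokers.get(topic);
        return brokers == null ? Collections.<String>emptySet() : new TreeSet<>(brokers);
    }
}
```

A producer would call findBrokers before sending, which corresponds roughly to the topic route lookup that appears later in this analysis.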
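RocketMQ Service Startup
This class runs when the service starts. It initializes the thread pools and listeners for sending messages, consuming messages, cleaning up expired requests, and so on.
Now that we have seen how the MQ service starts, we follow a single message from the moment the client sends it until it reaches the server's storage layer and is written to disk, and analyze the source along that call chain to see how a message is processed.
Source Analysis: Message Production
RocketMQ is message middleware, and the most important job of message middleware is handling messages, so we analyze the source from the message's point of view and trace where a message comes from and where it goes. Because RocketMQ itself is decoupled, we approach it in two independent parts: message production and message consumption.
Message Sending in the Client
Source Trace
As mentioned earlier, a producer can send in three ways: one-way, reliable synchronous, and reliable asynchronous. Here we analyze the interface code for reliable synchronous sending. See the code under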
rocketmq-all-4.4.0\client\
DefaultMQProducer.send();
DefaultMQProducerImpl.send();
DefaultMQProducerImpl.sendDefaultImpl();
Core analysis of the sendDefaultImpl method

    private SendResult sendDefaultImpl(Message msg, final CommunicationMode communicationMode,
            final SendCallback sendCallback, final long timeout)
            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);
        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        // Look up the routing information for the topic.
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            // Only synchronous sends are retried in this loop.
            int timesTotal = communicationMode == CommunicationMode.SYNC
                ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
                        long costTime = beginTimestampPrev - beginTimestampFirst;
                        if (timeout < costTime) {
                            callTimeout = true;
                            break;
                        }
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback,
                            topicPublishInfo, timeout - costTime);
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }
                                return sendResult;
                            default:
                                break;
                        }
                    } catch (RemotingException e) {
                    }
                    // The excerpt ends here; the rest of the method is not shown.
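1) Obtain the routing information for the topic.
2) Send in a for loop (the number of attempts is retryTimesWhenSendFailed + 1).
3) Call sendKernelImpl, which is analyzed in detail below.
The sendKernelImpl method
sendKernelImpl is the core sending method.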
    private SendResult sendKernelImpl(final Message msg,
            final MessageQueue mq,
            final CommunicationMode communicationMode,
            final SendCallback sendCallback,
            final TopicPublishInfo topicPublishInfo,
            final long timeout)
            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        if (null == brokerAddr) {
            tryToFindTopicPublishInfo(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }
        SendMessageContext context = null;
        if (brokerAddr != null) {
            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
            byte[] prevBody = msg.getBody();
            try {
                if (!(msg instanceof MessageBatch)) {
                    MessageClientIDSetter.setUniqID(msg);
                }
                int sysFlag = 0;
                boolean msgBodyCompressed = false;
                if (this.tryToCompressMessage(msg)) {
                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                    msgBodyCompressed = true;
                }
                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
                }
                if (hasCheckForbiddenHook()) {
                    CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                    checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                    checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                    checkForbiddenContext.setCommunicationMode(communicationMode);
                    checkForbiddenContext.setBrokerAddr(brokerAddr);
                    checkForbiddenContext.setMessage(msg);
                    checkForbiddenContext.setMq(mq);
                    checkForbiddenContext.setUnitMode(this.isUnitMode());
                    this.executeCheckForbiddenHook(checkForbiddenContext);
                }
                if (this.hasSendMessageHook()) {
                    context = new SendMessageContext();
                    context.setProducer(this);
                    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                    context.setCommunicationMode(communicationMode);
                    context.setBornHost(this.defaultMQProducer.getClientIP());
                    context.setBrokerAddr(brokerAddr);
                    context.setMessage(msg);
                    context.setMq(mq);
                    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    if (isTrans != null && isTrans.equals("true")) {
                        context.setMsgType(MessageType.Trans_Msg_Half);
                    }
                    if (msg.getProperty("__STARTDELIVERTIME") != null
                        || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                        context.setMsgType(MessageType.Delay_Msg);
                    }
                    this.executeSendMessageHookBefore(context);
                }
                // Build the request header that travels with the message.
                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                requestHeader.setTopic(msg.getTopic());
                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                requestHeader.setQueueId(mq.getQueueId());
                requestHeader.setSysFlag(sysFlag);
                requestHeader.setBornTimestamp(System.currentTimeMillis());
                requestHeader.setFlag(msg.getFlag());
                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                requestHeader.setReconsumeTimes(0);
                requestHeader.setUnitMode(this.isUnitMode());
                requestHeader.setBatch(msg instanceof MessageBatch);
                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                    if (reconsumeTimes != null) {
                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                    }
                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                    if (maxReconsumeTimes != null) {
                        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                    }
                }
                SendResult sendResult = null;
                switch (communicationMode) {
                    case ASYNC:
                        Message tmpMessage = msg;
                        if (msgBodyCompressed) {
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            msg.setBody(prevBody);
                        }
                        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeAsync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            tmpMessage,
                            requestHeader,
                            timeout - costTimeAsync,
                            communicationMode,
                            sendCallback,
                            topicPublishInfo,
                            this.mQClientFactory,
                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                            context,
                            this);
                        break;
                    case ONEWAY:
                    case SYNC:
                        long costTimeSync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeSync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            msg,
                            requestHeader,
                            timeout - costTimeSync,
                            communicationMode,
                            context,
                            this);
                        break;
                    default:
                        assert false;
                        break;
                }
                if (this.hasSendMessageHook()) {
                    context.setSendResult(sendResult);
                    this.executeSendMessageHookAfter(context);
                }
                return sendResult;
            } catch (RemotingException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } catch (MQBrokerException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } catch (InterruptedException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } finally {
                // Restore the original (uncompressed) body on the caller's message.
                msg.setBody(prevBody);
            }
        }
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }
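3.1) Obtain the broker address from the route table (if it is missing, the route is fetched from NameServer).
3.2) Different arguments are set depending on the send mode, but every path ends up calling the sendMessage method of MQClientAPIImpl. The synchronous sendMessage path is analyzed below.
3.2.1) sendMessageSync -> NettyRemotingClient.invokeSync() completes the send.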
The sendMessageSync method

    private SendResult sendMessageSync(final String addr, final String brokerName,
            final Message msg, final long timeoutMillis, final RemotingCommand request)
            throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        assert response != null;
        return this.processSendResponse(brokerName, msg, response);
    }
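invokeSyncImpl in NettyRemotingAbstract relies heavily on Netty to perform the call (the Netty version is 4.0.42.Final).
The sendResult is handled differently for each send mode.
Key Points
Message retry
Why is the retry count in RocketMQ 2? Because, counting the first attempt, a message is normally sent at most three times (in line with the proverb that nothing should be tried more than three times).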
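The relationship between the retry count and the total number of attempts can be sketched as follows. This is a simplified illustration, not RocketMQ's code: RetrySendSketch, sendWithRetry, and selectBroker are hypothetical names, the broker list is assumed to be non-empty, and the "send" is a caller-supplied predicate standing in for the real network call. Like the loop in sendDefaultImpl, it makes 1 + retryTimes attempts and tries to avoid the broker used in the previous attempt.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical illustration of the retry loop; not RocketMQ's real code.
class RetrySendSketch {
    // Tries up to (1 + retryTimes) times and returns how many attempts were made.
    // "send" returns true when the simulated send to a broker succeeds.
    static int sendWithRetry(List<String> brokers, int retryTimes, Predicate<String> send) {
        int timesTotal = 1 + retryTimes;
        String lastBroker = null;
        int attempts = 0;
        for (int times = 0; times < timesTotal; times++) {
            String broker = selectBroker(brokers, times, lastBroker);
            attempts++;
            if (send.test(broker)) {
                return attempts; // success: stop retrying
            }
            lastBroker = broker;
        }
        return attempts; // every attempt failed
    }

    // Round-robin selection that skips the previously used broker when another one exists.
    static String selectBroker(List<String> brokers, int index, String lastBroker) {
        for (int i = 0; i < brokers.size(); i++) {
            String candidate = brokers.get((index + i) % brokers.size());
            if (!candidate.equals(lastBroker)) {
                return candidate;
            }
        }
        // Only one broker is available, so reuse it.
        return brokers.get(index % brokers.size());
    }
}
```

With retryTimes set to 2 and a send that always fails, the loop runs three times in total, which is the "three sends" described above.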
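Netty:
For the details of how Netty sends the data, see the separate topic "Network Protocols and Netty".
Why use Netty as the high-performance communication library?
Netty's programming API is simple to use and has a low barrier to entry; developers do not need to learn much about the NIO programming model and its concepts.
Developers can customize it to business requirements, extending the communication framework flexibly through Netty's ChannelHandler.
Netty itself supports packet splitting and reassembly, exception detection, and similar mechanisms, so developers are freed from the tedious details of Java NIO and can focus on business logic.
Netty solves (more precisely, works around in a different way) the JDK NIO epoll bug, which makes the Selector spin on empty polls and eventually drives CPU usage to 100%.
Netty fine-tunes threads and selectors internally, and its carefully designed multi-threaded reactor model handles concurrency very efficiently.
Netty has been thoroughly proven in several open source projects (for example avro, the RPC framework from the Hadoop ecosystem, uses Netty as its communication framework), so its robustness and reliability are good.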
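To show what the packet splitting and reassembly mentioned above involves when done by hand, here is a small sketch of length-prefixed framing using only java.nio.ByteBuffer. FrameCodecSketch is a hypothetical helper, not part of Netty or RocketMQ, and the 4-byte length prefix is an assumption chosen for illustration. Netty ships decoders that take care of this kind of bookkeeping, which is part of why it is easier to build on than raw NIO.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of manual framing; not Netty's or RocketMQ's API.
class FrameCodecSketch {
    // Encodes a payload as [4-byte length][payload bytes].
    static byte[] encode(String payload) {
        byte[] body = payload.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + body.length);
        buf.putInt(body.length);
        buf.put(body);
        return buf.array();
    }

    // Decodes every complete frame in the buffer (which must be in read mode).
    // An incomplete trailing frame is left unread so later data can complete it.
    static List<String> decode(ByteBuffer in) {
        List<String> frames = new ArrayList<>();
        while (in.remaining() >= 4) {
            in.mark();
            int length = in.getInt();
            if (in.remaining() < length) {
                in.reset(); // only part of a frame has arrived: wait for more bytes
                break;
            }
            byte[] body = new byte[length];
            in.get(body);
            frames.add(new String(body, StandardCharsets.UTF_8));
        }
        return frames;
    }
}
```

If two frames arrive back to back but the second is cut short, decode returns only the first and leaves the partial bytes in the buffer for the next read.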
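Summary
The client-side send flow is fairly simple: the client first wraps the message, then combines it with the routing information returned by NameServer into a single request, and finally calls the Remoting module, which uses Netty to send the message to the broker. The flow covers several send modes and also includes a resend mechanism.
Message Production in the Broker
The broker startup flow involves very complex wrapping, so there is no need to walk through it here. Put simply, the broker's core job is to receive messages and store them, so we analyze how the broker processes a message.
Source Trace
1) When a sent message arrives at the broker, the processRequest() method of org.apache.rocketmq.broker.processor.SendMessageProcessor is called, and processRequest() calls sendMessage().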
The processRequest method

    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx,
            RemotingCommand request) throws RemotingCommandException {
        SendMessageContext mqtraceContext;
        switch (request.getCode()) {
            case RequestCode.CONSUMER_SEND_MSG_BACK:
                return this.consumerSendMsgBack(ctx, request);
            default:
                SendMessageRequestHeader requestHeader = parseRequestHeader(request);
                if (requestHeader == null) {
                    return null;
                }
                mqtraceContext = buildMsgContext(ctx, requestHeader);
                this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
                RemotingCommand response;
                if (requestHeader.isBatch()) {
                    response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
                } else {
                    response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
                }
                this.executeSendMessageHookAfter(response, mqtraceContext);
                return response;
        }
    }
Non-batch messages are handled by sendMessage().
sendMessage

    private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
            final RemotingCommand request,
            final SendMessageContext sendMessageContext,
            final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
        response.setOpaque(request.getOpaque());
        response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
        response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH,
            String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
        log.debug("receive SendMessage request command, {}", request);
        final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
        if (this.brokerController.getMessageStore().now() < startTimstamp) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(String.format("broker unable to service, until %s",
                UtilAll.timeMillisToHumanString2(startTimstamp)));
            return response;
        }
        response.setCode(-1);
        super.msgCheck(ctx, requestHeader, response);
        if (response.getCode() != -1) {
            return response;
        }
        final byte[] body = request.getBody();
        int queueIdInt = requestHeader.getQueueId();
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
        // A negative queue id means the client left the choice to the broker.
        if (queueIdInt < 0) {
            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
        }
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(requestHeader.getTopic());
        msgInner.setQueueId(queueIdInt);
        if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
            return response;
        }
        msgInner.setBody(body);
        msgInner.setFlag(requestHeader.getFlag());
        MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
        msgInner.setPropertiesString(requestHeader.getProperties());
        msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
        msgInner.setBornHost(ctx.channel().remoteAddress());
        msgInner.setStoreHost(this.getStoreHost());
        msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
        PutMessageResult putMessageResult = null;
        Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
        String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        if (traFlag != null && Boolean.parseBoolean(traFlag)) {
            if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark(
                    "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                        + "] sending transaction message is forbidden");
                return response;
            }
            putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
        } else {
            putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
        }
        return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader,
            sendMessageContext, ctx, queueIdInt);
    }
2) Message storage (note that only the commitlog is written at this point), performed by calling DefaultMessageStore.putMessage().
The putMessage method

    public PutMessageResult putMessage(MessageExtBrokerInner msg) {
        if (this.shutdown) {
            log.warn("message store has shutdown, so putMessage is forbidden");
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        }
        // A slave broker does not accept writes from producers.
        if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
            long value = this.printTimes.getAndIncrement();
            if ((value % 50000) == 0) {
                log.warn("message store is slave mode, so putMessage is forbidden ");
            }
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        }
        if (!this.runningFlags.isWriteable()) {
            long value = this.printTimes.getAndIncrement();
            if ((value % 50000) == 0) {
                log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
            }
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        } else {
            this.printTimes.set(0);
        }
        if (msg.getTopic().length() > Byte.MAX_VALUE) {
            log.warn("putMessage message topic length too long " + msg.getTopic().length());
            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
        }
        if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
            log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
            return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
        }
        if (this.isOSPageCacheBusy()) {
            return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
        }
        long beginTime = this.getSystemClock().now();
        // Delegate the actual write to the commit log.
        PutMessageResult result = this.commitLog.putMessage(msg);
        long eclipseTime = this.getSystemClock().now() - beginTime;
        if (eclipseTime > 500) {
            log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length);
        }
        this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);
        if (null == result || !result.isOk()) {
            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
        }
        return result;
    }
2.1) The commitlog write happens here, by calling CommitLog.putMessage().
The putMessage method

    public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
        msg.setStoreTimestamp(System.currentTimeMillis());
        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
        AppendMessageResult result = null;
        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
        String topic = msg.getTopic();
        int queueId = msg.getQueueId();
        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            // Delayed messages are redirected to the schedule topic; the real topic and queue are kept as properties.
            if (msg.getDelayTimeLevel() > 0) {
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }
                topic = ScheduleMessageService.SCHEDULE_TOPIC;
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }
        long eclipseTimeInLock = 0;
        MappedFile unlockMappedFile = null;
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
        putMessageLock.lock();
        try {
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;
            msg.setStoreTimestamp(beginLockTimestamp);
            if (null == mappedFile || mappedFile.isFull()) {
                mappedFile = this.mappedFileQueue.getLastMappedFile(0);
            }
            if (null == mappedFile) {
                log.error("create mapped file1 error, topic: " + msg.getTopic()
                    + " clientAddr: " + msg.getBornHostString());
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
            }
            result = mappedFile.appendMessage(msg, this.appendMessageCallback);
            switch (result.getStatus()) {
                case PUT_OK:
                    break;
                case END_OF_FILE:
                    // The current file is full: roll to a new mapped file and append again.
                    unlockMappedFile = mappedFile;
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                    if (null == mappedFile) {
                        log.error("create mapped file2 error, topic: " + msg.getTopic()
                            + " clientAddr: " + msg.getBornHostString());
                        beginTimeInLock = 0;
                        return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                    }
                    result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                    break;
                case MESSAGE_SIZE_EXCEEDED:
                case PROPERTIES_SIZE_EXCEEDED:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
                case UNKNOWN_ERROR:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
                default:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            }
            eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            beginTimeInLock = 0;
        } finally {
            putMessageLock.unlock();
        }
        if (eclipseTimeInLock > 500) {
            log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}",
                eclipseTimeInLock, msg.getBody().length, result);
        }
        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
        }
        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
        // Flush to disk and replicate to the slave according to the configuration.
        handleDiskFlush(result, putMessageResult, msg);
        handleHA(result, putMessageResult, msg);
        return putMessageResult;
    }
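2.1.1) All storage operations go through the MappedFile class, which ultimately calls its appendMessage method. The file append itself is performed in appendMessagesInner (DefaultAppendMessageCallback, the implementation of the AppendMessageCallback interface, is an inner class of CommitLog).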
The appendMessagesInner method

    public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
        assert messageExt != null;
        assert cb != null;
        int currentPos = this.wrotePosition.get();
        if (currentPos < this.fileSize) {
            ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
            byteBuffer.position(currentPos);
            AppendMessageResult result = null;
            if (messageExt instanceof MessageExtBrokerInner) {
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                    (MessageExtBrokerInner) messageExt);
            } else if (messageExt instanceof MessageExtBatch) {
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                    (MessageExtBatch) messageExt);
            } else {
                return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
            }
            // Advance the write position by the number of bytes just appended.
            this.wrotePosition.addAndGet(result.getWroteBytes());
            this.storeTimestamp = result.getStoreTimestamp();
            return result;
        }
        log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }
2.2) The doAppend method in the CommitLog class does the commitlog processing, again by operating on a ByteBuffer.
    public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer,
            final int maxBlank, final MessageExtBrokerInner msgInner);
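2.3) doAppend then returns: the position at which the message was written in memory and the time the write took are wrapped in an AppendMessageResult object and returned.
Key Points
CommitLog: after RocketMQ receives a message, it first writes it to the CommitLog file, sequentially, using NIO. At the end of CommitLog.putMessage, the configured flush type and master-slave synchronization type determine how the message is flushed to disk and replicated.
Summary
Thanks to Java NIO, I/O performance is very high: when a message arrives, it is appended sequentially to the CommitLog.
RocketMQ has three main kinds of large files: commitlog files, Index files, and consumequeue files. For all three, NIO's MappedByteBuffer class is used to improve read and write performance (mainly in the flush methods). This class provides memory-mapped file access and supports random reads and sequential writes. In RocketMQ it is wrapped in the MappedFile class.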
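The sequential-write and random-read pattern on a memory-mapped file can be sketched with the JDK alone. MappedLogSketch below is a hypothetical, heavily simplified stand-in for the idea behind MappedFile, not RocketMQ's implementation; the record layout (a 4-byte length followed by the body) is an assumption made for the example.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;

// Hypothetical, simplified illustration of appending to a memory-mapped file.
class MappedLogSketch implements AutoCloseable {
    private final RandomAccessFile file;
    private final MappedByteBuffer buffer;
    private int wrotePosition = 0; // next free byte, like a write pointer

    MappedLogSketch(Path path, int fileSize) throws IOException {
        this.file = new RandomAccessFile(path.toFile(), "rw");
        this.buffer = file.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
    }

    // Sequential write: append [4-byte length][body] at the write position.
    // Returns the offset where the record starts, or -1 if the file is full.
    int append(String message) {
        byte[] body = message.getBytes(StandardCharsets.UTF_8);
        if (wrotePosition + 4 + body.length > buffer.capacity()) {
            return -1;
        }
        int offset = wrotePosition;
        buffer.position(offset);
        buffer.putInt(body.length);
        buffer.put(body);
        wrotePosition = buffer.position();
        return offset;
    }

    // Random read: fetch the record stored at a known offset using absolute gets.
    String read(int offset) {
        int length = buffer.getInt(offset);
        byte[] body = new byte[length];
        for (int i = 0; i < length; i++) {
            body[i] = buffer.get(offset + 4 + i);
        }
        return new String(body, StandardCharsets.UTF_8);
    }

    // Flush: ask the OS to write the mapped region's changes to the storage device.
    void flush() {
        buffer.force();
    }

    @Override
    public void close() throws IOException {
        file.close();
    }
}
```

The offset returned by append plays the same role as the write position reported in AppendMessageResult: it is all a reader needs in order to find the record again later.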
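Updating the Consume Queue and Index Files in the Broker
Getting a message into the CommitLog file is not enough, because consumers need to see the ConsumeQueue and the IndexFile (the ConsumeQueue because consumption proceeds queue by queue, and the IndexFile because without it, looking up a message would be very slow). RocketMQ therefore starts a thread, ReputMessageService, that watches the CommitLog for updates and, whenever new messages appear, promptly updates the ConsumeQueue and IndexFile files.
Source Trace
ReputMessageService, an inner class of DefaultMessageStore, is dedicated to this task.
The run() method of ReputMessageService does one round of work every 1 millisecond by default (moving data from the CommitLog to the ConsumeQueue and Index).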
ReputMessageService#run

    @Override
    public void run() {
        DefaultMessageStore.log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            try {
                Thread.sleep(1);
                this.doReput();
            } catch (Exception e) {
                DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }
        DefaultMessageStore.log.info(this.getServiceName() + " service end");
    }
The doReput() method of ReputMessageService.
ReputMessageService#doReput

    private void doReput() {
        for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
            if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
                && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
                break;
            }
            SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
            if (result != null) {
                try {
                    this.reputFromOffset = result.getStartOffset();
                    for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                        DispatchRequest dispatchRequest =
                            DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                        int size = dispatchRequest.getMsgSize();
                        if (dispatchRequest.isSuccess()) {
                            if (size > 0) {
                                DefaultMessageStore.this.doDispatch(dispatchRequest);
                                if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
                                    && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
                                    DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                        dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                        dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                        dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                                }
                                this.reputFromOffset += size;
                                readSize += size;
                                if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
                                    DefaultMessageStore.this.storeStatsService
                                        .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
                                    DefaultMessageStore.this.storeStatsService
                                        .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
                                        .addAndGet(dispatchRequest.getMsgSize());
                                }
                            } else if (size == 0) {
                                this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                                readSize = result.getSize();
                            }
                        } else if (!dispatchRequest.isSuccess()) {
                            if (size > 0) {
                                log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
                                this.reputFromOffset += size;
                            } else {
                                doNext = false;
                                if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
                                    log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",
                                        this.reputFromOffset);
                                    this.reputFromOffset += result.getSize() - readSize;
                                }
                            }
                        }
                    }
                } finally {
                    result.release();
                }
            } else {
                doNext = false;
            }
        }
    }
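In the doReput() method of ReputMessageService, doDispatch ultimately performs two builds: (building the message consume queue) and (building the index file).
Key Points
The conversion of stored messages is handled by a periodic task; the core class is ReputMessageService, an inner class of DefaultMessageStore.
Summary
A background task that runs every millisecond turns new CommitLog entries into ConsumeQueue and IndexFile entries, and the class that does this work is ReputMessageService inside DefaultMessageStore.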
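The dispatch idea can be illustrated with in-memory collections. In this sketch a list plays the role of the commit log, and each call to doReput picks up where the previous one stopped, adding every new entry to a per-queue list of offsets and to a key index. ReputSketch, LogEntry, and the field names are hypothetical and chosen only for illustration; the real implementation works on mapped files, handles file rollover and errors, and runs on its own thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the "reput" idea; not RocketMQ's data structures.
class ReputSketch {
    // One commit-log entry: topic, queue id, and an optional message key.
    static final class LogEntry {
        final String topic;
        final int queueId;
        final String key;

        LogEntry(String topic, int queueId, String key) {
            this.topic = topic;
            this.queueId = queueId;
            this.key = key;
        }
    }

    final List<LogEntry> commitLog = new ArrayList<>();               // append-only log
    final Map<String, List<Integer>> consumeQueues = new HashMap<>(); // "topic-queueId" -> log offsets
    final Map<String, Integer> keyIndex = new HashMap<>();            // message key -> log offset
    private int reputFromOffset = 0;                                  // next entry to dispatch

    void append(LogEntry entry) {
        commitLog.add(entry);
    }

    // Dispatches every entry appended since the last call and returns how many were handled.
    int doReput() {
        int dispatched = 0;
        while (reputFromOffset < commitLog.size()) {
            LogEntry entry = commitLog.get(reputFromOffset);
            // Build the consume queue: consumers read offsets queue by queue.
            consumeQueues
                .computeIfAbsent(entry.topic + "-" + entry.queueId, k -> new ArrayList<>())
                .add(reputFromOffset);
            // Build the index: lets a message be found by key without scanning the log.
            if (entry.key != null) {
                keyIndex.put(entry.key, reputFromOffset);
            }
            reputFromOffset++;
            dispatched++;
        }
        return dispatched;
    }
}
```

Calling doReput in a loop with a short sleep between rounds would mirror the structure of the run() method shown earlier.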
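Message Consumption
Message consumption has two modes, push and pull. This section focuses on push mode.
Consumer Startup Flow in the Client
The entry point is the DefaultMQPushConsumerImpl#start() method; the key call is consumer.start().
DefaultMQPushConsumer.start() -> DefaultMQPushConsumerImpl.start()
Source trace steps
Check the configuration: this.checkConfig();
Process the subscription information (in addition, if the message model is clustering, a retry topic must also be created for the consumer group).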
The copySubscription method

    private void copySubscription() throws MQClientException {
        try {
            Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
            if (sub != null) {
                for (final Map.Entry<String, String> entry : sub.entrySet()) {
                    final String topic = entry.getKey();
                    final String subString = entry.getValue();
                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(
                        this.defaultMQPushConsumer.getConsumerGroup(), topic, subString);
                    this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
                }
            }
            if (null == this.messageListenerInner) {
                this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
            }
            switch (this.defaultMQPushConsumer.getMessageModel()) {
                case BROADCASTING:
                    break;
                case CLUSTERING:
                    // In clustering mode the group also subscribes to its own retry topic.
                    final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(
                        this.defaultMQPushConsumer.getConsumerGroup(), retryTopic, SubscriptionData.SUB_ALL);
                    this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
                    break;
                default:
                    break;
            }
        } catch (Exception e) {
            throw new MQClientException("subscription exception", e);
        }
    }
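The retry topic is derived from the consumer group name. The sketch below shows the naming scheme as I understand it: the group name is appended to a fixed prefix. RetryTopicSketch is a hypothetical class, and the prefix value "%RETRY%" is an assumption about what MixAll.RETRY_GROUP_TOPIC_PREFIX holds, so check it against the RocketMQ source you are reading.

```java
// Hypothetical illustration of retry topic naming; verify the prefix against MixAll.
class RetryTopicSketch {
    // Assumed to match MixAll.RETRY_GROUP_TOPIC_PREFIX.
    static final String RETRY_GROUP_TOPIC_PREFIX = "%RETRY%";

    // One retry topic per consumer group.
    static String retryTopicFor(String consumerGroup) {
        return RETRY_GROUP_TOPIC_PREFIX + consumerGroup;
    }

    // The same prefix check that sendKernelImpl applies to the request topic.
    static boolean isRetryTopic(String topic) {
        return topic != null && topic.startsWith(RETRY_GROUP_TOPIC_PREFIX);
    }
}
```

This also explains the startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) check seen earlier in sendKernelImpl: messages on a retry topic carry reconsume counters that are copied into the request header.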
Create the MQClientInstance instance
The getAndCreateMQClientInstance method

    public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
        String clientId = clientConfig.buildMQClientId();
        MQClientInstance instance = this.factoryTable.get(clientId);
        if (null == instance) {
            instance = new MQClientInstance(clientConfig.cloneClientConfig(),
                this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
            // putIfAbsent keeps the first instance if another thread registered one concurrently.
            MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev;
                log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
            } else {
                log.info("Created new MQClientInstance for clientId:[{}]", clientId);
            }
        }
        return instance;
    }
Load balancing

    this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
    this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
Default queue allocation algorithm

    this.rebalanceImpl.setAllocateMessageQueueStrategy(defaultMQPushConsumer.getAllocateMessageQueueStrategy());
    this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
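To give a feel for what a queue allocation strategy does, here is a minimal sketch of an "average" allocation: queues are split into contiguous, nearly equal ranges, one range per consumer in the group. AverageAllocationSketch is a hypothetical class written for this article and is not the library's strategy implementation; queues are represented simply by their ids 0..queueCount-1.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of averaging queues across the consumers of one group.
class AverageAllocationSketch {
    // Returns the queue ids assigned to currentId, given all consumer ids in a stable order.
    static List<Integer> allocate(int queueCount, List<String> consumerIds, String currentId) {
        List<Integer> result = new ArrayList<>();
        int index = consumerIds.indexOf(currentId);
        if (index < 0 || queueCount <= 0) {
            return result; // unknown consumer or nothing to allocate
        }
        int base = queueCount / consumerIds.size();      // queues every consumer gets
        int remainder = queueCount % consumerIds.size(); // the first few consumers get one extra
        int size = base + (index < remainder ? 1 : 0);
        int start = index * base + Math.min(index, remainder);
        for (int i = 0; i < size; i++) {
            result.add(start + i);
        }
        return result;
    }
}
```

Because every consumer computes the same split from the same sorted inputs, the group reaches a consistent assignment without any coordinator. When there are more consumers than queues, the extra consumers simply receive no queues.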
pullAPIWrapper for pulling messages

    this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory,
        defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
    this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
Consumption progress (offset) storage

    if (this.defaultMQPushConsumer.getOffsetStore() != null) {
        this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
    } else {
        switch (this.defaultMQPushConsumer.getMessageModel()) {
            case BROADCASTING:
                // Broadcasting: each consumer keeps its own progress in a local file.
                this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory,
                    this.defaultMQPushConsumer.getConsumerGroup());
                break;
            case CLUSTERING:
                // Clustering: progress is shared by the group and stored on the broker.
                this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory,
                    this.defaultMQPushConsumer.getConsumerGroup());
                break;
            default:
                break;
        }
        this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
    }
Load the consumption progress

    this.offsetStore.load();

    // The load() implementation that reads the locally stored offsets:
    @Override
    public void load() throws MQClientException {
        OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
        if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
            offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());
            for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) {
                AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
                log.info("load consumer's offset, {} {} {}", this.groupName, mq, offset.get());
            }
        }
    }
Determine whether consumption is orderly or concurrent

    if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
        this.consumeOrderly = true;
        this.consumeMessageService = new ConsumeMessageOrderlyService(this,
            (MessageListenerOrderly) this.getMessageListenerInner());
    } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
        this.consumeOrderly = false;
        this.consumeMessageService = new ConsumeMessageConcurrentlyService(this,
            (MessageListenerConcurrently) this.getMessageListenerInner());
    }
Orderly consumption: the ConsumeMessageOrderlyService constructor
public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
    MessageListenerOrderly messageListener) {
    this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
    this.messageListener = messageListener;
    this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
    this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
    this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
    // Thread pool that runs the consume requests
    this.consumeExecutor = new ThreadPoolExecutor(
        this.defaultMQPushConsumer.getConsumeThreadMin(),
        this.defaultMQPushConsumer.getConsumeThreadMax(),
        1000 * 60,
        TimeUnit.MILLISECONDS,
        this.consumeRequestQueue,
        new ThreadFactoryImpl("ConsumeMessageThread_"));
    this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
        new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
}
Concurrent consumption: the ConsumeMessageConcurrentlyService constructor
public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
    MessageListenerConcurrently messageListener) {
    this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
    this.messageListener = messageListener;
    this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
    this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
    this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
    // Thread pool that runs the consume requests
    this.consumeExecutor = new ThreadPoolExecutor(
        this.defaultMQPushConsumer.getConsumeThreadMin(),
        this.defaultMQPushConsumer.getConsumeThreadMax(),
        1000 * 60,
        TimeUnit.MILLISECONDS,
        this.consumeRequestQueue,
        new ThreadFactoryImpl("ConsumeMessageThread_"));
    this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
        new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
    // Extra single-thread scheduler that cleans up expired messages
    this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(
        new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
}
Start the message consumption service
Orderly consumption: ConsumeMessageOrderlyService.start()
public void start() {
    // In clustering mode, periodically lock the assigned queues on the broker
    if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                ConsumeMessageOrderlyService.this.lockMQPeriodically();
            }
        }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
    }
}
Concurrent consumption: ConsumeMessageConcurrentlyService.start()
public void start() {
    // Periodically clean up expired messages; the period is the consume timeout, in minutes
    this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            cleanExpireMsg();
        }
    }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
}
Register the consumer
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
    this.serviceState = ServiceState.CREATE_JUST;
    this.consumeMessageService.shutdown();
    .....
}
The MQClientInstance.registerConsumer() method
public boolean registerConsumer(final String group, final MQConsumerInner consumer) {
    if (null == group || null == consumer) {
        return false;
    }
    // A consumer group can be registered only once per client instance
    MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer);
    if (prev != null) {
        log.warn("the consumer group[" + group + "] exist already.");
        return false;
    }
    return true;
}
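The registration above relies on the atomic putIfAbsent of a concurrent map. The following is a minimal sketch of that pattern only; GroupRegistry and its register method are hypothetical names for illustration, not part of RocketMQ.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical registry; it mirrors only the putIfAbsent pattern shown above.
class GroupRegistry {
    private final ConcurrentMap<String, Object> table = new ConcurrentHashMap<>();

    /** Returns true only for the first registration of a group name. */
    boolean register(String group, Object consumer) {
        if (group == null || consumer == null) {
            return false;
        }
        // putIfAbsent returns the previous value, or null if the key was free,
        // so two threads racing on the same group cannot both succeed.
        return table.putIfAbsent(group, consumer) == null;
    }
}
```

A second register call with the same group returns false, which corresponds to the "exist already" warning branch.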
Start MQClientInstance
public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // No NameServer address configured: fetch it
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start the request/response channel
                this.mQClientAPIImpl.start();
                // Start the scheduled tasks
                this.startScheduledTask();
                // Start the pull service
                this.pullMessageService.start();
                // Start the rebalance service
                this.rebalanceService.start();
                // Start the internal producer
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
                break;
            case SHUTDOWN_ALREADY:
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}
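12.1) Scheduled tasks: startScheduledTask()
12.1.1) Every 2 minutes, try to fetch the NameServer address (only when no address is configured)
12.1.2) Every 30 seconds by default, try to update the topic route information
12.1.3) Every 30 seconds by default, clean up offline brokers and send heartbeats to all brokers
12.1.4) Every 5 seconds by default, persist the consume offsets
12.1.5) Every 1 minute, check whether the thread pools need adjusting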
private void startScheduledTask() {
    // 12.1.1) fetch the NameServer address
    if (null == this.clientConfig.getNamesrvAddr()) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                } catch (Exception e) {
                    log.error("ScheduledTask fetchNameServerAddr exception", e);
                }
            }
        }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
    }

    // 12.1.2) update the topic route information
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.updateTopicRouteInfoFromNameServer();
            } catch (Exception e) {
                log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
            }
        }
    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

    // 12.1.3) clean up offline brokers and send heartbeats
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.cleanOfflineBroker();
                MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
            } catch (Exception e) {
                log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
            }
        }
    }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

    // 12.1.4) persist the consume offsets
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {
                log.error("ScheduledTask persistAllConsumerOffset exception", e);
            }
        }
    }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

    // 12.1.5) adjust the thread pools
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.adjustThreadPool();
            } catch (Exception e) {
                log.error("ScheduledTask adjustThreadPool exception", e);
            }
        }
    }, 1, 1, TimeUnit.MINUTES);
}
12.2) Start the message pull service (thread)
@Override
public void run() {
    log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
            // Block until a pull request is available, then execute it
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }
    log.info(this.getServiceName() + " service end");
}

private void pullMessage(final PullRequest pullRequest) {
    final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    if (consumer != null) {
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
        impl.pullMessage(pullRequest);
    } else {
        log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
    }
}
The key method for pulling messages: DefaultMQPushConsumerImpl.pullMessage()
public void pullMessage(final PullRequest pullRequest) {
    final ProcessQueue processQueue = pullRequest.getProcessQueue();
    if (processQueue.isDropped()) {
        log.info("the pull request[{}] is dropped.", pullRequest.toString());
        return;
    }

    pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());

    try {
        this.makeSureStateOK();
    } catch (MQClientException e) {
        log.warn("pullMessage exception, consumer state not ok", e);
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
        return;
    }

    // The consumer is paused: retry later
    if (this.isPause()) {
        log.warn("consumer was paused, execute pull request later. instanceName={}, group={}",
            this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
        return;
    }

    long cachedMessageCount = processQueue.getMsgCount().get();
    long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);

    // Flow control 1: too many unconsumed messages cached in the ProcessQueue
    if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
        if ((queueFlowControlTimes++ % 1000) == 0) {
            log.warn(
                "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(),
                processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
        }
        return;
    }

    // Flow control 2: the cached messages take up too much memory
    if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
        if ((queueFlowControlTimes++ % 1000) == 0) {
            log.warn(
                "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(),
                processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
        }
        return;
    }

    if (!this.consumeOrderly) {
        // Flow control 3 (concurrent consumption only): the offset span exceeds the default of 2000
        if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                    processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                    pullRequest, queueMaxSpanFlowControlTimes);
            }
            return;
        }
    } else {
        // Orderly consumption: the queue must be locked on the broker before pulling
        if (processQueue.isLocked()) {
            if (!pullRequest.isLockedFirst()) {
                final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
                boolean brokerBusy = offset < pullRequest.getNextOffset();
                log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                    pullRequest, offset, brokerBusy);
                if (brokerBusy) {
                    log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                        pullRequest, offset);
                }

                pullRequest.setLockedFirst(true);
                pullRequest.setNextOffset(offset);
            }
        } else {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            log.info("pull message later because not locked in broker, {}", pullRequest);
            return;
        }
    }

    final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
    if (null == subscriptionData) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
        log.warn("find the consumer's subscription failed, {}", pullRequest);
        return;
    }

    final long beginTimestamp = System.currentTimeMillis();

    // Callback invoked when the asynchronous pull completes
    PullCallback pullCallback = new PullCallback() {
        @Override
        public void onSuccess(PullResult pullResult) {
            ......
        }

        @Override
        public void onException(Throwable e) {
            ......
        }
    };

    // In clustering mode, piggyback the in-memory consume offset on the pull request
    boolean commitOffsetEnable = false;
    long commitOffsetValue = 0L;
    if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
        commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
        if (commitOffsetValue > 0) {
            commitOffsetEnable = true;
        }
    }

    String subExpression = null;
    boolean classFilter = false;
    SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
    if (sd != null) {
        if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
            subExpression = sd.getSubString();
        }
        classFilter = sd.isClassFilterMode();
    }

    int sysFlag = PullSysFlag.buildSysFlag(
        commitOffsetEnable, // commitOffset
        true, // suspend
        subExpression != null, // subscription
        classFilter // class filter
    );
    try {
        this.pullAPIWrapper.pullKernelImpl(
            pullRequest.getMessageQueue(),
            subExpression,
            subscriptionData.getExpressionType(),
            subscriptionData.getSubVersion(),
            pullRequest.getNextOffset(),
            this.defaultMQPushConsumer.getPullBatchSize(),
            sysFlag,
            commitOffsetValue,
            BROKER_SUSPEND_MAX_TIME_MILLIS,
            CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
            CommunicationMode.ASYNC,
            pullCallback
        );
    } catch (Exception e) {
        log.error("pullKernelImpl exception", e);
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
    }
}
12.3) Rebalance service (thread)
@Override
public void run() {
    log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        // Wait for the interval (or an explicit wakeup), then rebalance
        this.waitForRunning(waitInterval);
        this.mqClientFactory.doRebalance();
    }
    log.info(this.getServiceName() + " service end");
}

// MQClientInstance.doRebalance(): rebalance every consumer registered on this client
public void doRebalance() {
    for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
        MQConsumerInner impl = entry.getValue();
        if (impl != null) {
            try {
                impl.doRebalance();
            } catch (Throwable e) {
                log.error("doRebalance exception", e);
            }
        }
    }
}
Pull mode: DefaultMQPullConsumerImpl.doRebalance()
@Override
public void doRebalance() {
    if (this.rebalanceImpl != null) {
        this.rebalanceImpl.doRebalance(false);
    }
}
Push mode: DefaultMQPushConsumerImpl.doRebalance()
@Override
public void doRebalance() {
    if (!this.pause) {
        this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
    }
}
Whether in pull or push mode, both end up calling the following method:
public void doRebalance(final boolean isOrder) {
    Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
    if (subTable != null) {
        // Rebalance each subscribed topic in turn
        for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
            final String topic = entry.getKey();
            try {
                this.rebalanceByTopic(topic, isOrder);
            } catch (Throwable e) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("rebalanceByTopic Exception", e);
                }
            }
        }
    }

    this.truncateMessageQueueNotMyTopic();
}
Update the TopicRouteData
this.updateTopicSubscribeInfoWhenSubscriptionChanged();

private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
    Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
    if (subTable != null) {
        for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
            final String topic = entry.getKey();
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        }
    }
}
Check the client configuration against the broker (each request uses a 3-second timeout)
this.mQClientFactory.checkClientInBroker();

public void checkClientInBroker() throws MQClientException {
    Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();

    while (it.hasNext()) {
        Entry<String, MQConsumerInner> entry = it.next();
        Set<SubscriptionData> subscriptionInner = entry.getValue().subscriptions();
        if (subscriptionInner == null || subscriptionInner.isEmpty()) {
            return;
        }

        for (SubscriptionData subscriptionData : subscriptionInner) {
            // Tag-based subscriptions need no check on the broker
            if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
                continue;
            }
            String addr = findBrokerAddrByTopic(subscriptionData.getTopic());

            if (addr != null) {
                try {
                    // The last argument is the request timeout: 3 seconds
                    this.getMQClientAPIImpl().checkClientInBroker(
                        addr, entry.getKey(), this.clientId, subscriptionData, 3 * 1000
                    );
                } catch (Exception e) {
                    if (e instanceof MQClientException) {
                        throw (MQClientException) e;
                    } else {
                        throw new MQClientException(....);
                    }
                }
            }
        }
    }
}
The MQClientAPIImpl.checkClientInBroker() method:
public void checkClientInBroker(final String brokerAddr, final String consumerGroup,
    final String clientId, final SubscriptionData subscriptionData,
    final long timeoutMillis)
    throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
    RemotingConnectException, MQClientException {
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHECK_CLIENT_CONFIG, null);

    CheckClientRequestBody requestBody = new CheckClientRequestBody();
    requestBody.setClientId(clientId);
    requestBody.setGroup(consumerGroup);
    requestBody.setSubscriptionData(subscriptionData);

    request.setBody(requestBody.encode());

    // Synchronous RPC to the broker
    RemotingCommand response = this.remotingClient.invokeSync(
        MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis);

    assert response != null;

    if (ResponseCode.SUCCESS != response.getCode()) {
        throw new MQClientException(response.getCode(), response.getRemark());
    }
}
NettyRemotingClient.invokeSync completes the RPC call over Netty:
@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
    throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
    long beginStartTime = System.currentTimeMillis();
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
        try {
            doBeforeRpcHooks(addr, request);
            // Deduct the time already spent from the remaining timeout
            long costTime = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTime) {
                throw new RemotingTimeoutException("invokeSync call timeout");
            }
            RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
            doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
            return response;
        } catch (RemotingSendRequestException e) {
            log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
            this.closeChannel(addr, channel);
            throw e;
        } catch (RemotingTimeoutException e) {
            if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                this.closeChannel(addr, channel);
                log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
            }
            log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}
NettyRemotingAbstract.invokeSyncImpl is where the request is actually sent over the network (socket handling):
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
    final long timeoutMillis)
    throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
    // The opaque id correlates this request with its response
    final int opaque = request.getOpaque();

    try {
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
        this.responseTable.put(opaque, responseFuture);
        final SocketAddress addr = channel.remoteAddress();
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                if (f.isSuccess()) {
                    responseFuture.setSendRequestOK(true);
                    return;
                } else {
                    responseFuture.setSendRequestOK(false);
                }

                // The write failed: release the waiting caller immediately
                responseTable.remove(opaque);
                responseFuture.setCause(f.cause());
                responseFuture.putResponse(null);
                log.warn("send a request command to channel <" + addr + "> failed.");
            }
        });

        // Block until the response arrives or the timeout elapses
        RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
        if (null == responseCommand) {
            if (responseFuture.isSendRequestOK()) {
                throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                    responseFuture.getCause());
            } else {
                throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
            }
        }

        return responseCommand;
    } finally {
        this.responseTable.remove(opaque);
    }
}
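The synchronous call above works by parking the caller on a future stored under the request's opaque id until the I/O thread delivers the matching response. Below is a minimal sketch of that correlation idea, assuming a CompletableFuture in place of RocketMQ's own ResponseFuture; PendingResponses and its methods are hypothetical names used only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of opaque-based request/response matching.
class PendingResponses {
    private final ConcurrentMap<Integer, CompletableFuture<String>> table = new ConcurrentHashMap<>();

    /** Called by the sender before the request is written to the network. */
    CompletableFuture<String> register(int opaque) {
        CompletableFuture<String> future = new CompletableFuture<>();
        table.put(opaque, future);
        return future;
    }

    /** Called by the I/O thread when a response carrying this opaque arrives. */
    boolean complete(int opaque, String response) {
        CompletableFuture<String> future = table.remove(opaque);
        return future != null && future.complete(response);
    }

    /** Blocks until the response arrives; returns null on timeout or failure. */
    String await(int opaque, CompletableFuture<String> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            // Always drop the table entry, like the finally block in invokeSyncImpl.
            table.remove(opaque);
        }
    }
}
```

A response that arrives after the caller has timed out finds no entry in the table and is dropped, which is why the cleanup sits in a finally block.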
Send heartbeats
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

public void sendHeartbeatToAllBrokerWithLock() {
    // tryLock: if another heartbeat is already in progress, skip this round
    if (this.lockHeartbeat.tryLock()) {
        try {
            this.sendHeartbeatToAllBroker();
            this.uploadFilterClassSource();
        } catch (final Exception e) {
            log.error("sendHeartbeatToAllBroker exception", e);
        } finally {
            this.lockHeartbeat.unlock();
        }
    } else {
        log.warn("lock heartBeat, but failed.");
    }
}
Rebalance immediately
this.mQClientFactory.rebalanceImmediately();

public void rebalanceImmediately() {
    // Wake the rebalance thread so it does not wait out the full interval
    this.rebalanceService.wakeup();
}

@Override
public void run() {
    log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        this.waitForRunning(waitInterval);
        this.mqClientFactory.doRebalance();
    }
    log.info(this.getServiceName() + " service end");
}
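Key points
In RocketMQ, push mode is still implemented on top of pull. The scheduled tasks started in step 12, during MQClientInstance startup, cover the housekeeping that consumption depends on (NameServer address, topic routes, heartbeats, offset persistence, thread pool adjustment), and all of it is handled periodically.
Summary
Compared with pull mode, push mode in RocketMQ wraps a great deal of functionality, such as load balancing, queue allocation, consumption progress storage, orderly consumption, and heartbeat detection.
Message pulling
Let us analyze the message pull code for clustering mode under PUSH consumption.
A consumer group contains multiple consumers, and a topic has multiple consume queues, so how do the consumers end up pulling from the queues assigned to them? Recall from the startup code above that MQClientInstance#start starts the pullMessageService thread. This service thread is responsible for pulling messages, so let us look at its run method: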
Source trace
MQClientInstance.start() -> PullMessageService.start()
PullMessageService.pullMessage() -> DefaultMQPushConsumerImpl.pullMessage()
The pullMessage method contains the core of message pulling, including pause handling, flow control, and the pull callback.
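Key points: flow control
When the number of unconsumed messages cached in the processQueue exceeds the threshold (1000 by default), flow control is triggered.
When the total body size of the unconsumed messages in the processQueue exceeds the threshold (100 MiB by default), flow control is triggered.
For concurrent consumption, when the difference between the largest and smallest cached offsets exceeds the default of 2000, flow control is triggered.
Summary
Flow control is the heart of message pulling. It is one way to handle the mismatch between how fast the broker can deliver messages and how fast the client can consume them.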
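The three checks above can be condensed into a single decision function. This is a minimal sketch under stated assumptions: FlowControl and throttleReason are hypothetical names, and the thresholds are passed in as plain parameters rather than read from a real consumer configuration.

```java
// Hypothetical helper that condenses the three flow-control checks.
class FlowControl {
    /**
     * Returns the reason the next pull should be delayed, or null to pull now.
     * The span check applies only to concurrent (non-orderly) consumption.
     */
    static String throttleReason(long cachedCount, long cachedSizeMiB, long offsetSpan,
                                 boolean orderly,
                                 long countLimit, long sizeLimitMiB, long spanLimit) {
        if (cachedCount > countLimit) {
            return "count"; // too many unconsumed messages cached
        }
        if (cachedSizeMiB > sizeLimitMiB) {
            return "size"; // cached messages use too much memory
        }
        if (!orderly && offsetSpan > spanLimit) {
            return "span"; // one slow message is holding back the progress
        }
        return null;
    }
}
```

All comparisons are strict, matching the `>` used in pullMessage, so a queue sitting exactly at a threshold is still pulled.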
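Message consumption
Once messages have been pulled, the consumer has to consume them. Consumption is mainly done by the consumeMessageService, so let us first look at its constructor.
Source trace
1) The ConsumeMessageConcurrentlyService constructor creates a thread pool named consumeExecutor. In concurrent consumption mode, this thread pool is what actually consumes the messages.
2) The pull callback hands the pulled messages to consumeMessageService (the ConsumeMessageConcurrentlyService implementation) through the submitConsumeRequest method, which splits the messages into batches and submits them to the thread pool.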
submitConsumeRequest
@Override
public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispatchToConsume) {
    final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    if (msgs.size() <= consumeBatchSize) {
        // Fits in one batch: submit directly
        ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
        try {
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            this.submitConsumeRequestLater(consumeRequest);
        }
    } else {
        // Split into batches of at most consumeBatchSize messages
        for (int total = 0; total < msgs.size(); ) {
            List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
            for (int i = 0; i < consumeBatchSize; i++, total++) {
                if (total < msgs.size()) {
                    msgThis.add(msgs.get(total));
                } else {
                    break;
                }
            }

            ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                // Pool is full: put the remaining messages into this request and retry later
                for (; total < msgs.size(); total++) {
                    msgThis.add(msgs.get(total));
                }

                this.submitConsumeRequestLater(consumeRequest);
            }
        }
    }
}
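The batching loop in submitConsumeRequest is an ordinary fixed-size partition of a list. The sketch below shows just that partitioning step in isolation; Batcher and split are hypothetical names, and the rejection-handling path of the original is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper showing only the fixed-size partitioning step.
class Batcher {
    /** Splits items into consecutive batches of at most batchSize elements. */
    static <T> List<List<T>> split(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            // Copy the slice so each batch is independent of the source list.
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }
}
```

For five messages and a batch size of two, this yields three batches of sizes 2, 2 and 1, each of which would become one ConsumeRequest.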
3) Next, go straight to the run method of ConsumeRequest, an inner class of ConsumeMessageConcurrentlyService
ConsumeRequest
class ConsumeRequest implements Runnable {
    private final List<MessageExt> msgs;
    private final ProcessQueue processQueue;
    private final MessageQueue messageQueue;

    public ConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue) {
        this.msgs = msgs;
        this.processQueue = processQueue;
        this.messageQueue = messageQueue;
    }

    public List<MessageExt> getMsgs() {
        return msgs;
    }

    public ProcessQueue getProcessQueue() {
        return processQueue;
    }

    @Override
    public void run() {
        // 3.1) skip queues that have been dropped
        if (this.processQueue.isDropped()) {
            log.info("the message queue not be able to consume, because it's dropped. group={} {}",
                ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
            return;
        }

        // 3.2) the listener defined by the business system
        MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
        ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
        ConsumeConcurrentlyStatus status = null;

        // 3.3) run the before-hooks, if any
        ConsumeMessageContext consumeMessageContext = null;
        if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
            consumeMessageContext = new ConsumeMessageContext();
            consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
            consumeMessageContext.setProps(new HashMap<String, String>());
            consumeMessageContext.setMq(messageQueue);
            consumeMessageContext.setMsgList(msgs);
            consumeMessageContext.setSuccess(false);
            ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
        }

        long beginTimestamp = System.currentTimeMillis();
        boolean hasException = false;
        ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
        try {
            // 3.4) reset the retry topic of the messages
            ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
            if (msgs != null && !msgs.isEmpty()) {
                for (MessageExt msg : msgs) {
                    MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
                }
            }
            // 3.5) hand the messages to the business listener
            status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
        } catch (Throwable e) {
            log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                RemotingHelper.exceptionSimpleDesc(e),
                ConsumeMessageConcurrentlyService.this.consumerGroup,
                msgs,
                messageQueue);
            hasException = true;
        }
        long consumeRT = System.currentTimeMillis() - beginTimestamp;
        if (null == status) {
            if (hasException) {
                returnType = ConsumeReturnType.EXCEPTION;
            } else {
                returnType = ConsumeReturnType.RETURNNULL;
            }
        } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
            returnType = ConsumeReturnType.TIME_OUT;
        } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
            returnType = ConsumeReturnType.FAILED;
        } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
            returnType = ConsumeReturnType.SUCCESS;
        }

        if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
            consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
        }

        // A null status (exception or null return) is treated as "consume later"
        if (null == status) {
            log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
                ConsumeMessageConcurrentlyService.this.consumerGroup,
                msgs,
                messageQueue);
            status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }

        if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
            consumeMessageContext.setStatus(status.toString());
            consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
            ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
        }

        ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
            .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

        // 3.6) process the result only if the queue is still owned by this consumer
        if (!processQueue.isDropped()) {
            ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
        } else {
            log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
        }
    }

    public MessageQueue getMessageQueue() {
        return messageQueue;
    }
}
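3.1) Check the dropped flag of the processQueue
3.2) Get the message listener defined by the business system
3.3) If hooks are registered, execute their before method
3.4) Call resetRetryTopic to reset the retry topic of the messages
3.5) Execute listener.consumeMessage, where the business system actually consumes the messages
3.6) Handle the consumption result in the processConsumeResult method
3.6.1) In clustering mode
3.6.2) Failed messages are collected into a List
3.6.3) A failed message is re-created as a new message and, via a scheduled task, goes back through step 6 of the source trace in "Consumer startup flow in the Client" (a retry creates a new message rather than reusing the old one)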
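Between steps 3.5 and 3.6, the run method classifies the listener's outcome into a return type that is reported to the hooks. A minimal sketch of that classification follows; the enums and the ReturnTypeClassifier class are simplified, hypothetical stand-ins for the RocketMQ types, and the timeout is taken directly in milliseconds.

```java
// Simplified, hypothetical stand-ins for the RocketMQ enums.
enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

enum ReturnType { SUCCESS, TIME_OUT, EXCEPTION, RETURNNULL, FAILED }

class ReturnTypeClassifier {
    /** Mirrors the if/else chain in ConsumeRequest.run(). */
    static ReturnType classify(Status status, boolean hasException,
                               long costMillis, long timeoutMillis) {
        if (status == null) {
            // The listener either threw or returned null.
            return hasException ? ReturnType.EXCEPTION : ReturnType.RETURNNULL;
        }
        if (costMillis >= timeoutMillis) {
            return ReturnType.TIME_OUT; // checked before the status itself
        }
        return status == Status.RECONSUME_LATER ? ReturnType.FAILED : ReturnType.SUCCESS;
    }
}
```

Note that the timeout check takes precedence over the returned status, so a listener that succeeds too slowly is still reported as TIME_OUT.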
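Key points
A message that fails to be consumed is re-created as a new message and retried through a scheduled task. The retry uses this new message, not the original one.
Summary
Whether consumption succeeds or fails, the consumption progress is updated. First, all successfully consumed messages are removed from the processQueue and an offset is returned. Note that this offset is the smallest key in the processQueue's msgTreeMap. Why? Progress is advanced by offset, and because messages are consumed by a thread pool, there is no guarantee that the messages with smaller offsets finish first. Progress can therefore only advance to the offset of the smallest message still pending, which means messages may be consumed again after the consumer restarts.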
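The "commit the smallest pending offset" rule can be illustrated with a sorted map. This is a sketch under stated assumptions: ProgressTracker is a hypothetical class, and returning the largest seen offset plus one when nothing is pending is an assumption made here to keep the example complete.

```java
import java.util.TreeMap;

// Hypothetical tracker for messages that are pulled but not yet consumed.
class ProgressTracker {
    private final TreeMap<Long, String> pending = new TreeMap<>();
    private long maxOffsetSeen = -1;

    synchronized void add(long offset, String body) {
        pending.put(offset, body);
        maxOffsetSeen = Math.max(maxOffsetSeen, offset);
    }

    /** Removes a consumed message and returns the offset that is safe to commit. */
    synchronized long remove(long offset) {
        pending.remove(offset);
        // Everything below the smallest pending offset has been consumed.
        // Assumption: with nothing pending, progress moves past the largest offset seen.
        return pending.isEmpty() ? maxOffsetSeen + 1 : pending.firstKey();
    }
}
```

With offsets 10, 11 and 12 pending, finishing 12 first still commits 10. If the consumer restarts at that point, pulling resumes from 10 and message 12 is delivered a second time, which is why listeners should be idempotent.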