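RocketMQ Source Code Analysis

RocketMQ Overall Architecture

RocketMQ's main functionality is concentrated in four modules: NameServer, rocketmq-broker, rocketmq-remoting, and rocketmq-store.

[Figure: RocketMQ architecture diagram]

[Figure: project structure]

The names in the figure correspond to the artifactId of each project.

The most important modules are described below.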

rocketmq-namesrv

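The naming service: it keeps broker registrations up to date and provides route discovery.

The main role of NameServer is to give message producers and consumers the routing information for each Topic. Besides storing the basic routing data, NameServer must also manage Broker nodes, including route registration and route removal.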
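
To make route registration, route removal, and route discovery concrete, here is a minimal in-memory sketch. The class RouteTableSketch and its method names are hypothetical, chosen only for illustration; the real NameServer tracks considerably more state than a topic-to-broker map.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical, simplified route table: topic -> names of the brokers serving it. */
class RouteTableSketch {
    private final Map<String, Set<String>> topicToBrokers = new HashMap<>();

    /** Route registration: record that a broker serves the given topics. */
    void registerBroker(String brokerName, Collection<String> topics) {
        for (String topic : topics) {
            topicToBrokers.computeIfAbsent(topic, t -> new TreeSet<>()).add(brokerName);
        }
    }

    /** Route removal: drop the broker everywhere, then delete topics left without brokers. */
    void unregisterBroker(String brokerName) {
        topicToBrokers.values().forEach(brokers -> brokers.remove(brokerName));
        topicToBrokers.values().removeIf(Set::isEmpty);
    }

    /** Route discovery: the lookup a producer or consumer would perform for a topic. */
    Set<String> findRoute(String topic) {
        return topicToBrokers.getOrDefault(topic, Collections.<String>emptySet());
    }

    public static void main(String[] args) {
        RouteTableSketch table = new RouteTableSketch();
        table.registerBroker("broker-a", Arrays.asList("TopicTest"));
        System.out.println(table.findRoute("TopicTest"));
    }
}
```

A plain map is enough to show the three operations; it deliberately ignores broker addresses, heartbeats, and concurrency.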

rocketmq-broker

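The core of the MQ.

It accepts requests from producers and consumers and calls the store layer to process messages. It is also the basic unit of the HA service and supports modes such as synchronous double write and asynchronous double write.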

rocketmq-store

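The storage layer implementation, which also contains the index service and the high-availability (HA) service.

rocketmq-remoting

The low-level communication layer built on Netty; all interaction between services goes through this module.

rocketmq-common

Utility classes shared across modules, such as configuration classes and constants.

rocketmq-client

The Java implementation of the MQ client.

rocketmq-filter

The message filtering service, which effectively inserts a filter proxy between the broker and the consumer.

rocketmq-srvutil

Contains ServerUtil, a utility class for parsing command-line arguments.

rocketmq-tools

Management tools for the MQ cluster, providing features such as message query.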

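RocketMQ Service Startup

The startup class runs when the service starts. It initializes the thread pools and event listeners used for sending messages, consuming messages, cleaning up expired requests, and so on.

With the startup process covered, we next follow a single message from the client to the server's storage layer and onto disk, and analyze the source code along this call chain to see how one message is handled.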

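Source Code Analysis: Message Production

RocketMQ is message middleware, and the central job of message middleware is handling messages. We therefore analyze the source code from the point of view of a message and trace its whole path. Because RocketMQ decouples producers from consumers, we look at two independent parts: message production and message consumption.

Message Sending in the Client

Source Code Walkthrough

As mentioned earlier, a producer can send in three ways: one-way, reliable synchronous, and reliable asynchronous. Here we analyze the interface code for reliable synchronous sending. The code is under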

rocketmq-all-4.4.0\client\

DefaultMQProducer.send();

DefaultMQProducerImpl.send();

DefaultMQProducerImpl.sendDefaultImpl();

Core analysis of the sendDefaultImpl method

// Client-side message sending: the core sending method
private SendResult sendDefaultImpl(Message msg, final CommunicationMode communicationMode,
        final SendCallback sendCallback, final long timeout)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    this.makeSureStateOK();
    Validators.checkMessage(msg, this.defaultMQProducer);
    final long invokeID = random.nextLong();
    // Timestamp of the first send attempt
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;
    // Step 1: obtain the routing information for the topic
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    // Continue only if the topic information exists and is in an OK state
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        boolean callTimeout = false;
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;
        // Total attempts: for SYNC, 1 + the default retry count (2) = 3; otherwise a single attempt
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        String[] brokersSent = new String[timesTotal];
        // Step 2: send in a for loop; the number of attempts is retryTimesWhenSendFailed + 1
        for (; times < timesTotal; times++) {
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            // Select a queue
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (mqSelected != null) {
                mq = mqSelected;
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    if (timeout < costTime) {
                        callTimeout = true;
                        break;
                    }
                    // Step 3: call sendKernelImpl with the remaining time budget
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    // Step 4: the sendResult is handled differently for each send mode
                    switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }
                            return sendResult;
                        default:
                            break;
                    }
                } catch (RemotingException e) { }
                // ... the excerpt ends here; the remaining catch blocks and failure handling are not shown
  1. Obtain the routing information for the topic.

  2. Send in a for loop (the number of attempts is retryTimesWhenSendFailed + 1).

  3. Call the sendKernelImpl method, which is analyzed in detail below.
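
The retry loop in steps 2 and 3 can be modeled in a few lines. The sketch below is a simplification under stated assumptions: RetryLoopSketch, selectBroker, and the Predicate standing in for the network call are hypothetical names, and the broker selection is a plain round-robin that avoids the broker used last, not RocketMQ's actual selectOneMessageQueue logic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/** Simplified, hypothetical model of the retry loop in sendDefaultImpl. */
class RetryLoopSketch {
    enum Mode { SYNC, ASYNC, ONEWAY }

    /** Only synchronous sends are retried inside the loop; other modes get one attempt. */
    static int timesTotal(Mode mode, int retryTimesWhenSendFailed) {
        return mode == Mode.SYNC ? 1 + retryTimesWhenSendFailed : 1;
    }

    /** Round-robin selection that skips the broker used on the previous attempt when possible. */
    static String selectBroker(List<String> brokers, String lastBroker, int index) {
        for (int i = 0; i < brokers.size(); i++) {
            String candidate = brokers.get((index + i) % brokers.size());
            if (!candidate.equals(lastBroker)) {
                return candidate;
            }
        }
        return brokers.get(index % brokers.size()); // only one broker available
    }

    /** Runs the attempts and returns the brokers contacted, in order. */
    static List<String> send(List<String> brokers, Mode mode, int retries, Predicate<String> sendOk) {
        List<String> brokersSent = new ArrayList<>();
        String last = null;
        int total = timesTotal(mode, retries);
        for (int times = 0; times < total; times++) {
            String broker = selectBroker(brokers, last, times);
            brokersSent.add(broker);
            last = broker;
            if (sendOk.test(broker)) {
                break; // success, so no further attempts
            }
        }
        return brokersSent;
    }

    public static void main(String[] args) {
        List<String> brokers = Arrays.asList("broker-a", "broker-b");
        // Pretend broker-a is down: the retry moves on to broker-b.
        System.out.println(send(brokers, Mode.SYNC, 2, b -> b.equals("broker-b")));
    }
}
```

Passing the send operation in as a Predicate keeps the loop testable without any network code.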

sendKernelImpl method

sendKernelImpl is the core sending method.

// Client-side message sending, step 3: sendKernelImpl, the core sending method
private SendResult sendKernelImpl(final Message msg,
    final MessageQueue mq,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    // Step 3.1: look up the broker address in the local route table (fetched from NameServer if missing)
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    if (null == brokerAddr) {
        // Try to refresh the route information from NameServer
        tryToFindTopicPublishInfo(mq.getTopic());
        // Look up the address again
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }

    SendMessageContext context = null;
    if (brokerAddr != null) {
        // VIP channel: when enabled, send over the broker's higher-priority channel
        brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

        byte[] prevBody = msg.getBody();
        try {
            //for MessageBatch,ID has been set in the generating process
            if (!(msg instanceof MessageBatch)) {
                MessageClientIDSetter.setUniqID(msg);
            }
            int sysFlag = 0;
            boolean msgBodyCompressed = false;
            if (this.tryToCompressMessage(msg)) {
                sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                msgBodyCompressed = true;
            }

            final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
            }
            // If a check-forbidden hook is registered, invoke it
            if (hasCheckForbiddenHook()) {
                CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                checkForbiddenContext.setCommunicationMode(communicationMode);
                checkForbiddenContext.setBrokerAddr(brokerAddr);
                checkForbiddenContext.setMessage(msg);
                checkForbiddenContext.setMq(mq);
                checkForbiddenContext.setUnitMode(this.isUnitMode());
                this.executeCheckForbiddenHook(checkForbiddenContext);
            }
            // Send-message hook (before send)
            if (this.hasSendMessageHook()) {
                context = new SendMessageContext();
                context.setProducer(this);
                context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                context.setCommunicationMode(communicationMode);
                context.setBornHost(this.defaultMQProducer.getClientIP());
                context.setBrokerAddr(brokerAddr);
                context.setMessage(msg);
                context.setMq(mq);
                String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (isTrans != null && isTrans.equals("true")) {
                    context.setMsgType(MessageType.Trans_Msg_Half);
                }

                if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                    context.setMsgType(MessageType.Delay_Msg);
                }
                this.executeSendMessageHookBefore(context);
            }
            // Build the request header that wraps the message
            SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
            requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
            requestHeader.setTopic(msg.getTopic());
            requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
            requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setSysFlag(sysFlag);
            requestHeader.setBornTimestamp(System.currentTimeMillis());
            requestHeader.setFlag(msg.getFlag());
            requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
            requestHeader.setReconsumeTimes(0);
            requestHeader.setUnitMode(this.isUnitMode());
            requestHeader.setBatch(msg instanceof MessageBatch);
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                if (reconsumeTimes != null) {
                    requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                }

                String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                if (maxReconsumeTimes != null) {
                    requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                }
            }
            // Step 3.2: branch on the send mode; every branch ends in MQClientAPIImpl.sendMessage
            SendResult sendResult = null;
            switch (communicationMode) {
                case ASYNC:
                    Message tmpMessage = msg;
                    if (msgBodyCompressed) {
                        //If msg body was compressed, msgbody should be reset using prevBody.
                        //Clone new message using commpressed message body and recover origin massage.
                        //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                        tmpMessage = MessageAccessor.cloneMessage(msg);
                        msg.setBody(prevBody);
                    }
                    long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeAsync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        tmpMessage,
                        requestHeader,
                        timeout - costTimeAsync,
                        communicationMode,
                        sendCallback,
                        topicPublishInfo,
                        this.mQClientFactory,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                        context,
                        this);
                    break;
                case ONEWAY:
                case SYNC:
                    long costTimeSync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeSync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    // Step 3.2: synchronous (and one-way) send, also via MQClientAPIImpl.sendMessage
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        msg,
                        requestHeader,
                        timeout - costTimeSync,
                        communicationMode,
                        context,
                        this);
                    break;
                default:
                    assert false;
                    break;
            }

            if (this.hasSendMessageHook()) {
                context.setSendResult(sendResult);
                this.executeSendMessageHookAfter(context);
            }

            return sendResult;
        } catch (RemotingException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (MQBrokerException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (InterruptedException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } finally {
            msg.setBody(prevBody);
        }
    }

    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}

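3.1) Look up the broker address in the route table (if it is missing, fetch it from NameServer).

3.2) The arguments differ by send mode, but every path ends up calling the sendMessage method of MQClientAPIImpl. The synchronous sendMessage call is analyzed below.

3.2.1) sendMessageSync -> NettyRemotingClient.invokeSync() completes the send.

sendMessageSync method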

// Step 3.2.1: a synchronous send ends up in sendMessageSync
private SendResult sendMessageSync(final String addr, final String brokerName,
    final Message msg, final long timeoutMillis, final RemotingCommand request
) throws RemotingException, MQBrokerException, InterruptedException {
    // Step 3.2.1: invokeSync in the remoting module performs the send through Netty
    RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
    assert response != null;
    return this.processSendResponse(brokerName, msg, response);
}

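invokeSyncImpl in NettyRemotingAbstract makes heavy use of Netty to perform the call (the Netty version is 4.0.42.Final).

  4. The sendResult is handled differently for each send mode.
Key Points
Message Retry

Why is the default retry count in RocketMQ 2? Because a message is normally sent at most three times in total: one initial attempt plus two retries (as the saying goes, no more than three times).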

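Netty:

For the details of how Netty sends the data, see the separate topic "Network Protocols and Netty".

Why use Netty as the high-performance communication library?
  • Netty's programming API is simple and has a low barrier to entry; developers do not need to know much about the NIO programming model and its concepts;

  • Developers can customize it to business requirements, extending the communication framework flexibly through Netty's ChannelHandler;

  • Netty itself supports packet framing and decoding, exception detection, and similar mechanisms, which frees developers from the tedious details of Java NIO so they can focus on business logic;

  • Netty solves (more precisely, works around in a different way) the JDK NIO epoll bug, which makes the Selector spin on empty polls and eventually drives CPU usage to 100%;

  • Netty optimizes many details of threads and selectors internally; its carefully designed multi-threaded reactor model allows very efficient concurrent processing;

  • Netty has been well proven in many open-source projects (for example, Avro, the RPC framework from the Hadoop ecosystem, uses Netty for communication), so its robustness and reliability are good.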

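Summary

The client-side sending flow is fairly simple: the client wraps the message, picks a target queue and broker from the routing information returned by NameServer, assembles the message and its request header into one request, and finally calls the remoting module, which uses Netty to send it to the Broker. The flow covers several send modes and also includes a resend (retry) mechanism.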

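Message Production in the Broker

The Broker startup flow involves a lot of intricate wiring that is not worth walking through here. The Broker's core job is simply to receive a message and store it, so we analyze how the Broker processes an incoming message.

Source Code Walkthrough

When a sent message reaches the broker, the processRequest() method of org.apache.rocketmq.broker.processor.SendMessageProcessor is called, and processRequest() in turn calls sendMessage().

processRequest method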
// Request entry point: this is where a message sent to the broker arrives
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    SendMessageContext mqtraceContext;
    switch (request.getCode()) {
        case RequestCode.CONSUMER_SEND_MSG_BACK:
            return this.consumerSendMsgBack(ctx, request);
        default:
            // Parse the request header
            SendMessageRequestHeader requestHeader = parseRequestHeader(request);
            if (requestHeader == null) {
                return null;
            }

            mqtraceContext = buildMsgContext(ctx, requestHeader);
            // Run the before-send hooks
            this.executeSendMessageHookBefore(ctx, request, mqtraceContext);

            RemotingCommand response;
            if (requestHeader.isBatch()) { // the message is a batch
                response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
            } else {
                // Message production in the Broker, step 1: non-batch message
                response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
            }
            // Run the after-send hooks
            this.executeSendMessageHookAfter(response, mqtraceContext);
            return response;
    }
}

  1. A non-batch message is handled by sendMessage().
sendMessage method
private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
    final RemotingCommand request,
    final SendMessageContext sendMessageContext,
    final SendMessageRequestHeader requestHeader) throws RemotingCommandException {

    // Create the response command
    final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
    final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
    // Set some response fields
    response.setOpaque(request.getOpaque());

    response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
    response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));

    log.debug("receive SendMessage request command, {}", request);

    final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
    if (this.brokerController.getMessageStore().now() < startTimstamp) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
        return response;
    }

    response.setCode(-1);
    super.msgCheck(ctx, requestHeader, response);
    if (response.getCode() != -1) {
        return response;
    }

    final byte[] body = request.getBody();

    int queueIdInt = requestHeader.getQueueId();
    // Fetch the configuration of the target topic
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    // Pick a valid queue id if the request did not specify one
    if (queueIdInt < 0) {
        queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
    }
    // Create the internal message object
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(requestHeader.getTopic());
    msgInner.setQueueId(queueIdInt);

    if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
        return response;
    }
    // Copy the remaining fields from the request
    msgInner.setBody(body);
    msgInner.setFlag(requestHeader.getFlag());
    MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
    msgInner.setPropertiesString(requestHeader.getProperties());
    msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
    msgInner.setBornHost(ctx.channel().remoteAddress());
    msgInner.setStoreHost(this.getStoreHost());
    msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
    PutMessageResult putMessageResult = null;
    Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
    String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (traFlag != null && Boolean.parseBoolean(traFlag)) {
        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark(
                "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                    + "] sending transaction message is forbidden");
            return response;
        }
        putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
    } else {
        // Message production in the Broker, step 2: the normal path stores the message (commitlog only at this point) and returns the result
        putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
    }

    return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);

}

2) Store the message (note that only the commitlog is written here) by calling DefaultMessageStore.putMessage().
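
Before storing, sendMessage picks a queue id when the request carries a negative one. The expression from the listing can be isolated into a small helper; QueueIdSketch and resolveQueueId are hypothetical names used only for this illustration.

```java
import java.util.Random;

/** Hypothetical helper isolating the queue-id fallback used in sendMessage. */
class QueueIdSketch {
    /**
     * Keeps a non-negative requested id; otherwise picks a pseudo-random id
     * in the range [0, writeQueueNums), using the same expression as the listing.
     */
    static int resolveQueueId(int requestedQueueId, int writeQueueNums, Random random) {
        if (requestedQueueId >= 0) {
            return requestedQueueId;
        }
        // The inner modulo keeps the value well inside the int range, so Math.abs cannot overflow.
        return Math.abs(random.nextInt() % 99999999) % writeQueueNums;
    }

    public static void main(String[] args) {
        Random random = new Random();
        System.out.println(resolveQueueId(-1, 4, random)); // some value from 0 to 3
        System.out.println(resolveQueueId(2, 4, random));  // the requested id is kept
    }
}
```

Taking the modulo before Math.abs matters: Math.abs(Integer.MIN_VALUE) is still negative, and the inner modulo rules that value out.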

putMessage method (DefaultMessageStore)

// This is the method that stores the message
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
    if (this.shutdown) {
        log.warn("message store has shutdown, so putMessage is forbidden");
        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    }
    if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
        long value = this.printTimes.getAndIncrement();
        if ((value % 50000) == 0) {
            log.warn("message store is slave mode, so putMessage is forbidden ");
        }
        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    }
    if (!this.runningFlags.isWriteable()) {
        long value = this.printTimes.getAndIncrement();
        if ((value % 50000) == 0) {
            log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
        }
        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    } else {
        this.printTimes.set(0);
    }
    if (msg.getTopic().length() > Byte.MAX_VALUE) {
        log.warn("putMessage message topic length too long " + msg.getTopic().length());
        return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
    }
    if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
        log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
        return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
    }
    if (this.isOSPageCacheBusy()) {
        return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
    }
    long beginTime = this.getSystemClock().now();
    // Message production in the Broker, step 2.1: hand the message to the commitlog
    PutMessageResult result = this.commitLog.putMessage(msg);
    long eclipseTime = this.getSystemClock().now() - beginTime;
    if (eclipseTime > 500) {
        log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length);
    }
    this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);
    if (null == result || !result.isOk()) {
        this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
    }
    return result;
}

2.1) The message is handed to the commitlog here by calling CommitLog.putMessage().
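
Before reaching CommitLog.putMessage(), the method above runs a series of guard checks. They can be summarized as a pure function; StorePrecheckSketch and its Status enum are hypothetical stand-ins for the real PutMessageStatus handling, and the logging and counters of the original are left out.

```java
/** Hypothetical model of the guard checks at the top of DefaultMessageStore.putMessage. */
class StorePrecheckSketch {
    enum Status { OK, SERVICE_NOT_AVAILABLE, MESSAGE_ILLEGAL, PROPERTIES_SIZE_EXCEEDED, OS_PAGECACHE_BUSY }

    static Status precheck(boolean shutdown, boolean slave, boolean writeable,
                           String topic, String properties, boolean pageCacheBusy) {
        // A stopped store, a slave broker, or a non-writeable store all reject writes.
        if (shutdown || slave || !writeable) {
            return Status.SERVICE_NOT_AVAILABLE;
        }
        // The topic length must fit in a signed byte (at most 127 characters).
        if (topic.length() > Byte.MAX_VALUE) {
            return Status.MESSAGE_ILLEGAL;
        }
        // The properties string must fit in a signed short (at most 32767 characters).
        if (properties != null && properties.length() > Short.MAX_VALUE) {
            return Status.PROPERTIES_SIZE_EXCEEDED;
        }
        if (pageCacheBusy) {
            return Status.OS_PAGECACHE_BUSY;
        }
        return Status.OK;
    }

    /** Small helper to build a string of n identical characters. */
    static String repeat(char c, int n) {
        char[] chars = new char[n];
        java.util.Arrays.fill(chars, c);
        return new String(chars);
    }

    public static void main(String[] args) {
        System.out.println(precheck(false, false, true, "TopicTest", null, false));
        System.out.println(precheck(false, false, true, repeat('t', 128), null, false));
    }
}
```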

putMessage method (CommitLog)

// RocketMQ has commitlog, consumequeue, and index files to handle (all three use NIO's MappedByteBuffer to improve read/write performance)
// Only the commitlog is handled here
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    msg.setStoreTimestamp(System.currentTimeMillis());
    // Set the message body BODY CRC (consider the most appropriate setting on the client)
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    AppendMessageResult result = null;
    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
    String topic = msg.getTopic();
    int queueId = msg.getQueueId();
    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        // Delay Delivery
        if (msg.getDelayTimeLevel() > 0) {
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }
            topic = ScheduleMessageService.SCHEDULE_TOPIC;
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
            // Backup real topic, queueId
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }
    long eclipseTimeInLock = 0;
    MappedFile unlockMappedFile = null;
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
    // A lock is taken here
    putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
    try {
        long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
        this.beginTimeInLock = beginLockTimestamp;
        // Here settings are stored timestamp, in order to ensure an orderly global
        msg.setStoreTimestamp(beginLockTimestamp);
        if (null == mappedFile || mappedFile.isFull()) {
            mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
        }
        if (null == mappedFile) {
            log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
            beginTimeInLock = 0;
            return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
        }
        // This is the key file-handling step: it goes through mappedFile (the MappedFile class holds a MappedByteBuffer)
        // appendMessage appends, i.e. writes sequentially; the commitlog is handled first
        result = mappedFile.appendMessage(msg, this.appendMessageCallback);
        switch (result.getStatus()) {
            case PUT_OK:
                break;
            case END_OF_FILE:
                unlockMappedFile = mappedFile;
                // Create a new file, re-write the message
                mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                if (null == mappedFile) { // XXX: warn and notify me
                    log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                }
                result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                break;
            case MESSAGE_SIZE_EXCEEDED:
            case PROPERTIES_SIZE_EXCEEDED:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
            case UNKNOWN_ERROR:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            default:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
        }
        eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
        beginTimeInLock = 0;
    } finally {
        putMessageLock.unlock();
    }
    if (eclipseTimeInLock > 500) {
        log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
    }
    if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
        this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
    }
    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
    // Statistics
    storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
    storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
    // Flush to disk
    handleDiskFlush(result, putMessageResult, msg);
    // HA: master-slave replication
    handleHA(result, putMessageResult, msg);
    return putMessageResult;
}

2.1.1) All storage writes go through the MappedFile class and end up in its appendMessage method. Inside appendMessagesInner the message is appended to the file (the AppendMessageCallback implementation, DefaultAppendMessageCallback, is an inner class of CommitLog).

appendMessagesInner method
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
    assert messageExt != null;
    assert cb != null;
    int currentPos = this.wrotePosition.get();
    if (currentPos < this.fileSize) {
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
        byteBuffer.position(currentPos);
        AppendMessageResult result = null;
        if (messageExt instanceof MessageExtBrokerInner) {
            // Append to the file here (the AppendMessageCallback implementation, DefaultAppendMessageCallback, is an inner class of CommitLog)
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
        } else if (messageExt instanceof MessageExtBatch) {
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
        } else {
            return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
        }
        this.wrotePosition.addAndGet(result.getWroteBytes());
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }
    log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}

2.2) The doAppend method in the CommitLog class writes the commitlog entry, again by operating on a ByteBuffer.

public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,final MessageExtBrokerInner msgInner);

2.3) doAppend then returns: the in-memory position where the message was written and the time the write took are wrapped in an AppendMessageResult object.
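
The append path in steps 2.1.1 to 2.3 boils down to sequential writes into a memory-mapped buffer at a tracked write position. The following is a minimal sketch of that idea using only the JDK. MappedAppendSketch, its length-prefixed record layout, and openTemp are assumptions made for illustration and do not reproduce the real commitlog format.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical append-only file backed by a MappedByteBuffer. */
class MappedAppendSketch {
    private final MappedByteBuffer mapped;
    private final int fileSize;
    private final AtomicInteger wrotePosition = new AtomicInteger(0);

    MappedAppendSketch(Path file, int fileSize) throws IOException {
        this.fileSize = fileSize;
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // The mapping stays valid after the channel is closed.
            this.mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
        }
    }

    /** Creates a temporary backing file; wraps the checked exception for convenience. */
    static MappedAppendSketch openTemp(int fileSize) {
        try {
            Path file = Files.createTempFile("commitlog-sketch", ".bin");
            file.toFile().deleteOnExit();
            return new MappedAppendSketch(file, fileSize);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Appends [4-byte length][body]; returns the start offset, or -1 if the file is too full. */
    int append(byte[] body) {
        int currentPos = wrotePosition.get();
        int needed = 4 + body.length;
        if (needed > fileSize - currentPos) {
            return -1; // the caller would roll over to a new file
        }
        ByteBuffer slice = mapped.slice();
        slice.position(currentPos);
        slice.putInt(body.length);
        slice.put(body);
        wrotePosition.addAndGet(needed);
        return currentPos;
    }

    /** Reads back the record that starts at the given offset. */
    byte[] read(int offset) {
        ByteBuffer slice = mapped.slice();
        slice.position(offset);
        byte[] body = new byte[slice.getInt()];
        slice.get(body);
        return body;
    }

    int getWrotePosition() {
        return wrotePosition.get();
    }

    public static void main(String[] args) {
        MappedAppendSketch file = openTemp(1024);
        int offset = file.append("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(file.read(offset), StandardCharsets.UTF_8));
    }
}
```

Like the listing, the sketch slices the mapped buffer and positions the slice at the current write offset, so the shared buffer's own position is never moved. It is not thread-safe on its own; the original serializes appends with putMessageLock.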

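Key Points

CommitLog: after RocketMQ receives a message, it first writes it to the CommitLog file, sequentially, using NIO. At the end of CommitLog.putMessage, the configured flush type and master-slave synchronization type determine how the message is flushed to disk and replicated.

Summary

Thanks to Java NIO, I/O performance is very high: when a message arrives, it is written sequentially to the CommitLog.

RocketMQ has three kinds of large files: commitlog files, index files, and consumequeue files. All three use NIO's MappedByteBuffer class to improve read/write performance (mainly in the flush methods). This class provides memory-mapped file access and supports random reads and sequential writes. In RocketMQ it is wrapped in the MappedFile class.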

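Updating Consume Queues and Index Files in the Broker

Writing a message to the CommitLog file is not enough, because consumers need the ConsumeQueue and the IndexFile (consumption proceeds queue by queue, hence the ConsumeQueue; and without the IndexFile, looking up a message would be very slow). RocketMQ therefore starts a thread, ReputMessageService, that watches the CommitLog for updates; when new messages arrive, it promptly updates the ConsumeQueue and IndexFile files.

Source Code Walkthrough
  1. The inner class ReputMessageService of DefaultMessageStore is dedicated to this task.

  2. The run() method of ReputMessageService does one round of work every 1 millisecond by default (moving data from the CommitLog into the ConsumeQueue and Index).

ReputMessageService#run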
@Override
public void run() {
    DefaultMessageStore.log.info(this.getServiceName() + " service started");
    // Updating consume queues and index files, step 2: one round every 1 ms by default (CommitLog -> ConsumeQueue and Index)
    while (!this.isStopped()) {
        try {
            // Sleep only 1 ms so that new messages are picked up quickly
            Thread.sleep(1);
            this.doReput(); // the method that does the actual work
        } catch (Exception e) {
            DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    DefaultMessageStore.log.info(this.getServiceName() + " service end");
}
  3. The doReput() method of ReputMessageService.
ReputMessageService#doReput
// Step 3: update the consume queues and index files in near real time
private void doReput() {
    for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
        if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
            && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
            break;
        }
        // Read data from the commitlog
        SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
        if (result != null) {
            try {
                this.reputFromOffset = result.getStartOffset();
                for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                    DispatchRequest dispatchRequest =
                        DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                    int size = dispatchRequest.getMsgSize();
                    if (dispatchRequest.isSuccess()) {
                        if (size > 0) {
                            // Step 4: doDispatch in DefaultMessageStore eventually builds the consume queue and the index file
                            DefaultMessageStore.this.doDispatch(dispatchRequest);
                            if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
                                && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
                                DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                    dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                    dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                    dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                            }
                            this.reputFromOffset += size;
                            readSize += size;
                            if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
                                DefaultMessageStore.this.storeStatsService
                                    .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
                                DefaultMessageStore.this.storeStatsService
                                    .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
                                    .addAndGet(dispatchRequest.getMsgSize());
                            }
                        } else if (size == 0) {
                            this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                            readSize = result.getSize();
                        }
                    } else if (!dispatchRequest.isSuccess()) {
                        if (size > 0) {
                            log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
                            this.reputFromOffset += size;
                        } else {
                            doNext = false;
                            if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
                                log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",
                                    this.reputFromOffset);
                                this.reputFromOffset += result.getSize() - readSize;
                            }
                        }
                    }
                }
            } finally {
                result.release();
            }
        } else {
            doNext = false;
        }
    }
}

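In the doReput() method of the ReputMessageService class, doDispatch ultimately builds the consume queue and the index file.

Key points

The conversion of stored messages from the CommitLog into consume queues and index files is handled by a background task. The core class is ReputMessageService, an inner class of DefaultMessageStore.

Summary

Messages are first written to the CommitLog; ReputMessageService then reads them back from reputFromOffset and dispatches each one to the consume queue and the index file.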
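
The dispatch step can be pictured with a small sketch. Everything in it (ReputSketch, Entry, Dispatcher, the string-based "files") is a hypothetical stand-in for illustration and not RocketMQ's real classes; it only shows the idea of replaying log entries from an offset and fanning each one out to a consume-queue builder and an index builder.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins; these are NOT RocketMQ's real classes.
class ReputSketch {
    /** One stored message as seen by the dispatcher (illustrative fields only). */
    static final class Entry {
        final String topic;
        final int queueId;
        final long commitLogOffset;
        final int size;

        Entry(String topic, int queueId, long commitLogOffset, int size) {
            this.topic = topic;
            this.queueId = queueId;
            this.commitLogOffset = commitLogOffset;
            this.size = size;
        }
    }

    interface Dispatcher {
        void dispatch(Entry entry);
    }

    final List<String> consumeQueue = new ArrayList<>(); // stands in for the consume queue files
    final List<String> index = new ArrayList<>();        // stands in for the index file
    private final List<Dispatcher> dispatchers = new ArrayList<>();
    long reputFromOffset = 0;

    ReputSketch() {
        dispatchers.add(e -> consumeQueue.add(e.topic + "-" + e.queueId + "@" + e.commitLogOffset));
        dispatchers.add(e -> index.add(e.topic + "#" + e.commitLogOffset));
    }

    /** Replays every entry at or after reputFromOffset through all dispatchers. */
    void doReput(List<Entry> commitLog) {
        for (Entry e : commitLog) {
            if (e.commitLogOffset < reputFromOffset) {
                continue; // already dispatched in an earlier pass
            }
            for (Dispatcher d : dispatchers) {
                d.dispatch(e);
            }
            reputFromOffset = e.commitLogOffset + e.size; // advance past this entry
        }
    }
}
```

Calling doReput twice with the same list dispatches each entry only once, because reputFromOffset has already moved past it.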

Message consumption

Message consumption comes in two modes, push and pull; this part focuses on push mode.

Consumer startup flow in the client

The entry point is consumer.start(), which delegates to DefaultMQPushConsumerImpl#start():

DefaultMQPushConsumer.start() -> DefaultMQPushConsumerImpl.start()

Source walkthrough
  1. Check the configuration: this.checkConfig();

  2. Process the subscription data. (In addition, if the consumption mode is clustering, a retry topic is created for the consumer group.)

copySubscription method

//TODO Build the subscription data (topic information)
private void copySubscription() throws MQClientException {
    try {
        //TODO 2.1 Read the subscriptions (a Map) from defaultMQPushConsumer
        Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
        if (sub != null) {
            for (final Map.Entry<String, String> entry : sub.entrySet()) {
                final String topic = entry.getKey();
                final String subString = entry.getValue();
                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                    topic, subString);
                //TODO 2.2 Store the assembled subscription data in rebalanceImpl
                this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            }
        }
        if (null == this.messageListenerInner) {
            this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
        }
        switch (this.defaultMQPushConsumer.getMessageModel()) {
            case BROADCASTING:
                break;
            case CLUSTERING:
                //TODO 2.3 In clustering mode, also subscribe to the group's retry topic, named "%RETRY%" + consumerGroup
                final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                    retryTopic, SubscriptionData.SUB_ALL);
                this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
                break;
            default:
                break;
        }
    } catch (Exception e) {
        throw new MQClientException("subscription exception", e);
    }
}
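
As a rough illustration of step 2, the sketch below builds the inner subscription table from the user's subscriptions and adds the retry topic only in clustering mode. SubscriptionSketch and its Mode enum are hypothetical names, and plain strings stand in for SubscriptionData; the "%RETRY%" prefix and the "*" (subscribe-all) expression mirror MixAll.RETRY_GROUP_TOPIC_PREFIX and SubscriptionData.SUB_ALL.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch; plain strings stand in for SubscriptionData.
class SubscriptionSketch {
    static final String RETRY_PREFIX = "%RETRY%"; // prefix of a group's retry topic
    static final String SUB_ALL = "*";            // "match every tag" expression

    enum Mode { BROADCASTING, CLUSTERING }

    /** Returns topic -> sub-expression, including the retry topic when clustering. */
    static Map<String, String> build(String consumerGroup, Map<String, String> userSubs, Mode mode) {
        Map<String, String> inner = new LinkedHashMap<>(userSubs);
        if (mode == Mode.CLUSTERING) {
            // Failed messages are re-delivered through this per-group topic.
            inner.put(RETRY_PREFIX + consumerGroup, SUB_ALL);
        }
        return inner;
    }
}
```

In broadcasting mode the table contains only the user's topics, which matches the empty BROADCASTING branch in copySubscription.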
  3. Create the MQClientInstance instance
getAndCreateMQClientInstance method
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
    String clientId = clientConfig.buildMQClientId();
    MQClientInstance instance = this.factoryTable.get(clientId);
    if (null == instance) {
        instance = new MQClientInstance(clientConfig.cloneClientConfig(),
                this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
        MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
        if (prev != null) {
            instance = prev;
            log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
        } else {
            log.info("Created new MQClientInstance for clientId:[{}]", clientId);
        }
    }
    return instance;
}
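
The get-or-create idiom used here (and again in registerConsumer) relies on ConcurrentMap.putIfAbsent so that concurrent callers end up sharing one instance per key. A minimal sketch, with ClientRegistrySketch and Client as hypothetical names:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of the putIfAbsent-based get-or-create idiom.
class ClientRegistrySketch {
    static final class Client {
        final String id;

        Client(String id) {
            this.id = id;
        }
    }

    private final ConcurrentMap<String, Client> table = new ConcurrentHashMap<>();

    Client getOrCreate(String clientId) {
        Client client = table.get(clientId);
        if (client == null) {
            Client created = new Client(clientId);
            // putIfAbsent returns the previous value if another thread won the race.
            Client prev = table.putIfAbsent(clientId, created);
            client = (prev != null) ? prev : created;
        }
        return client;
    }
}
```

The instance created by the losing thread is simply discarded, which is acceptable as long as construction has no side effects.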
  4. Load balancing
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
  5. Default queue allocation strategy
this.rebalanceImpl.setAllocateMessageQueueStrategy(defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
  6. pullAPIWrapper, used to pull messages
this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory,defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
  7. Consume offset (progress) storage

if (this.defaultMQPushConsumer.getOffsetStore() != null) {
    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
    switch (this.defaultMQPushConsumer.getMessageModel()) {
        case BROADCASTING:
            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
            break;
        case CLUSTERING:
            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
            break;
        default:
            break;
    }
    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
  8. Load the consume offsets
this.offsetStore.load();

@Override
public void load() throws MQClientException {
    OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
    if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
        offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());
        for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) {
            AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
            log.info("load consumer's offset, {} {} {}",this.groupName, mq, offset.get());
        }
    }
}
  9. Determine whether consumption is orderly or concurrent
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
    this.consumeOrderly = true;
    this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
    this.consumeOrderly = false;
    //TODO Concurrent messages
    this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
Constructor of ConsumeMessageOrderlyService (orderly messages)

public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
    MessageListenerOrderly messageListener) {
    this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
    this.messageListener = messageListener;
    this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
    this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
    this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
    this.consumeExecutor = new ThreadPoolExecutor(
        this.defaultMQPushConsumer.getConsumeThreadMin(),
        this.defaultMQPushConsumer.getConsumeThreadMax(),
        1000 * 60,
        TimeUnit.MILLISECONDS,
        this.consumeRequestQueue,
        new ThreadFactoryImpl("ConsumeMessageThread_"));
    this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
}
Constructor of ConsumeMessageConcurrentlyService (concurrent messages)
//TODO 1 Constructor
public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
    MessageListenerConcurrently messageListener) {
    this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
    this.messageListener = messageListener;
    this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
    this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
    this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
    //TODO Thread pool that runs the consume requests
    this.consumeExecutor = new ThreadPoolExecutor(
        //TODO Core (resident) pool size: consumeThreadMin
        this.defaultMQPushConsumer.getConsumeThreadMin(),
        //TODO Maximum pool size: consumeThreadMax
        this.defaultMQPushConsumer.getConsumeThreadMax(),
        1000 * 60,
        TimeUnit.MILLISECONDS,
        this.consumeRequestQueue,
        new ThreadFactoryImpl("ConsumeMessageThread_"));
    this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
    this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
}
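
One detail of this constructor is worth a small experiment. A java.util.concurrent.ThreadPoolExecutor only creates threads beyond its core size when the work queue rejects a task, and an unbounded LinkedBlockingQueue never does. The sketch below (PoolSizingSketch is a hypothetical name) uses the same constructor shape with illustrative sizes and reports the pool size while more tasks than core threads are pending.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; the sizes passed in are illustrative, not RocketMQ defaults.
class PoolSizingSketch {
    /** Submits `tasks` blocking tasks and returns the pool size seen while they are pending. */
    static int poolSizeUnderLoad(int core, int max, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            core, max, 1000 * 60, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // keep the worker busy until released
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int size = pool.getPoolSize(); // workers created so far
        release.countDown();
        pool.shutdown();
        return size;
    }
}
```

With an unbounded queue the extra tasks wait in the queue instead of causing new threads, so under this configuration consumeThreadMin is the setting that determines how many consume threads actually run.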
  10. Start the message consume service
ConsumeMessageOrderlyService (orderly messages)

public void start() {
    if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                ConsumeMessageOrderlyService.this.lockMQPeriodically();
            }
        }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
    }
}
ConsumeMessageConcurrentlyService (concurrent messages)

public void start() {
    this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            cleanExpireMsg();
        }
    }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
}
  11. Register the consumer

boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
    this.serviceState = ServiceState.CREATE_JUST;
    this.consumeMessageService.shutdown();
    .....
}
MQClientInstance.registerConsumer() method

public boolean registerConsumer(final String group, final MQConsumerInner consumer) {
    if (null == group || null == consumer) {
        return false;
    }
    MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer);
    if (prev != null) {
        log.warn("the consumer group[" + group + "] exist already.");
        return false;
    }
    return true;
}
  12. Start MQClientInstance

// TODO 12. Start MQClientInstance
public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                this.mQClientAPIImpl.start(); // Start request-response channel
                //TODO 12.1 Scheduled tasks
                this.startScheduledTask();
                //TODO 12.2 Start the pull-message service (thread)
                this.pullMessageService.start();
                //TODO 12.3 Start the rebalance service (thread)
                this.rebalanceService.start();
                // Start push service
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
                break;
            case SHUTDOWN_ALREADY:
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

12.1) Scheduled tasks: startScheduledTask();

12.1.1) Every 2 minutes, try to fetch the NameServer address (only when no address is configured)

12.1.2) Every 30 s, try to update the topic route information

12.1.3) Every 30 s, clean up offline brokers and send heartbeats to all brokers

12.1.4) By default every 5 s, persist the consume offsets

12.1.5) Every 1 minute, check whether the consume thread pools need adjusting

//TODO 12.1 Scheduled tasks
private void startScheduledTask() {
    //TODO 12.1.1 Every 2 minutes, try to fetch the NameServer address
    if (null == this.clientConfig.getNamesrvAddr()) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                } catch (Exception e) {
                    log.error("ScheduledTask fetchNameServerAddr exception", e);
                }
            }
        }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
    }
    //TODO 12.1.2 Every 30 s, try to update the topic route information
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.updateTopicRouteInfoFromNameServer();
            } catch (Exception e) {
                log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
            }
        }
    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
    //TODO 12.1.3 Every 30 s, clean up offline brokers and send heartbeats to all brokers
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.cleanOfflineBroker();
                MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
            } catch (Exception e) {
                log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
            }
        }
    }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
    //TODO 12.1.4 By default every 5 s, persist the consume offsets
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {
                log.error("ScheduledTask persistAllConsumerOffset exception", e);
            }
        }
    }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
    //TODO 12.1.5 Every 1 minute, check whether the consume thread pools need adjusting
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.adjustThreadPool();
            } catch (Exception e) {
                log.error("ScheduledTask adjustThreadPool exception", e);
            }
        }
    }, 1, 1, TimeUnit.MINUTES);
}
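
Every task above wraps its body in try/catch. That matters because ScheduledExecutorService.scheduleAtFixedRate suppresses all later executions once a run throws. The sketch below (ScheduledTaskSketch is a hypothetical name, and the 10 ms period is only there to keep the demo short) shows a task that fails on every run yet keeps being rescheduled because the exception never escapes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the "catch inside the periodic task" pattern.
class ScheduledTaskSketch {
    /** Schedules a task that fails on every run; returns how many runs happened. */
    static int runsDespiteFailures(int wantedRuns) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(wantedRuns);
        scheduler.scheduleAtFixedRate(() -> {
            try {
                runs.incrementAndGet();
                throw new IllegalStateException("simulated failure");
            } catch (Exception e) {
                // Swallowed (the real code logs it) so that the next run still happens.
            } finally {
                done.countDown();
            }
        }, 0, 10, TimeUnit.MILLISECONDS);
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return runs.get();
    }
}
```

Without the catch block the first failure would silently end the periodic task, which for heartbeats or offset persistence would be hard to notice.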

12.2) Start the pull-message service (thread)

//TODO Pulling messages
@Override
public void run() {
    log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
            //TODO Take a pullRequest from the LinkedBlockingQueue (blocks while the queue is empty)
            PullRequest pullRequest = this.pullRequestQueue.take();
            //TODO Hand it to pullMessage
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }
    log.info(this.getServiceName() + " service end");
}

private void pullMessage(final PullRequest pullRequest) {
    final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    if (consumer != null) {
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
        //TODO The implementation class is DefaultMQPushConsumerImpl
        impl.pullMessage(pullRequest);
    } else {
        log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
    }
}
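
The service thread is a plain consumer of a blocking queue, and "pull later" simply means re-enqueuing the same request after a delay. A minimal sketch of that shape follows; PullLoopSketch is a hypothetical name and strings stand in for PullRequest objects.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; strings stand in for PullRequest objects.
class PullLoopSketch {
    private final BlockingQueue<String> requests = new LinkedBlockingQueue<>();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    /** Makes the request available to the pull loop right away. */
    void executeNow(String request) {
        requests.offer(request);
    }

    /** Re-enqueues the request after a delay, e.g. when flow control kicks in. */
    void executeLater(String request, long delayMillis) {
        timer.schedule(() -> requests.offer(request), delayMillis, TimeUnit.MILLISECONDS);
    }

    /** Waits up to timeoutMillis for the next request; returns null on timeout. */
    String next(long timeoutMillis) {
        try {
            return requests.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    void shutdown() {
        timer.shutdownNow();
    }
}
```

Because a request is put back after every attempt, one request per message queue circulates through the loop for as long as the queue stays assigned to this consumer.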
Key message-pulling method: DefaultMQPushConsumerImpl.pullMessage()

//TODO Key method for pulling messages
public void pullMessage(final PullRequest pullRequest) {
    //TODO 1. Get the ProcessQueue of this request
    final ProcessQueue processQueue = pullRequest.getProcessQueue();
    //TODO 2. If the queue has been dropped (dropped == true), return
    if (processQueue.isDropped()) {
        log.info("the pull request[{}] is dropped.", pullRequest.toString());
        return;
    }
    //TODO 3. Update the last pull timestamp of this queue
    pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
    try {
        //TODO 4. If the consumer's service state is not ServiceState.RUNNING, retry the pull request after a delay (3 s by default)
        this.makeSureStateOK();
    } catch (MQClientException e) {
        log.warn("pullMessage exception, consumer state not ok", e);
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
        return;
    }
    //TODO 5. If the consumer is paused, retry the pull request after PULL_TIME_DELAY_MILLS_WHEN_SUSPEND
    if (this.isPause()) {
        log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
        return;
    }
    //TODO 6. Pulling is flow-controlled: when the number of unconsumed messages cached in processQueue exceeds the threshold (1000 by default), flow control is triggered
    long cachedMessageCount = processQueue.getMsgCount().get();
    long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
    if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
        //TODO 6.1 Re-enqueue the PullRequest into the LinkedBlockingQueue after a 50 ms delay; log a warning once every 1000 occurrences
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
        if ((queueFlowControlTimes++ % 1000) == 0) {
            log.warn(
                "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
        }
        return;
    }
    //TODO 7. When the total body size of unconsumed messages in processQueue exceeds the threshold (100 MiB by default), trigger flow control
    if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
        if ((queueFlowControlTimes++ % 1000) == 0) {
            log.warn(
                "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
        }
        return;
    }
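    //TODO 8. For concurrent (non-orderly) consumption, check the maximum span in processQueue, i.e. the difference between the largest and smallest cached offsets; if it exceeds the default of 2000, trigger flow control
    if (!this.consumeOrderly) {
        if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                    processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                    pullRequest, queueMaxSpanFlowControlTimes);
            }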
            return;
        }
    } else {
        if (processQueue.isLocked()) {
            if (!pullRequest.isLockedFirst()) {
                final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
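                boolean brokerBusy = offset < pullRequest.getNextOffset();
                log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                    pullRequest, offset, brokerBusy);
                if (brokerBusy) {
                    log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                        pullRequest, offset);
                }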
                pullRequest.setLockedFirst(true);
                pullRequest.setNextOffset(offset);
            }
        } else {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            log.info("pull message later because not locked in broker, {}", pullRequest);
            return;
        }
    }
    //TODO 9. Get the subscription data for the topic
    final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
    if (null == subscriptionData) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
        log.warn("find the consumer's subscription failed, {}", pullRequest);
        return;
    }
    final long beginTimestamp = System.currentTimeMillis();
    //TODO 10. Create the callback that is invoked once the pull from the broker completes
    PullCallback pullCallback = new PullCallback() {
        @Override
        public void onSuccess(PullResult pullResult) {......  }
        @Override
        public void onException(Throwable e) { ......        }
    };
    //TODO 11. In clustering mode, read this MessageQueue's consume offset from memory
    boolean commitOffsetEnable = false;
    long commitOffsetValue = 0L;
    if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
        commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
        if (commitOffsetValue > 0) {
            commitOffsetEnable = true;
        }
    }
    String subExpression = null;
    boolean classFilter = false;
    SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
    if (sd != null) {
        if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
            subExpression = sd.getSubString();
        }
        //TODO Class filter mode is deprecated
        classFilter = sd.isClassFilterMode();
    }
    int sysFlag = PullSysFlag.buildSysFlag(
        commitOffsetEnable, // commitOffset
        true, // suspend
        subExpression != null, // subscription
        classFilter // class filter
    );
    try {
        //TODO 14. Call the pull API to pull messages
        this.pullAPIWrapper.pullKernelImpl(
            pullRequest.getMessageQueue(), // message queue to pull from
            subExpression,  // subscription sub-expression, as in subscribe(topicName, "expression")
            subscriptionData.getExpressionType(),
            subscriptionData.getSubVersion(),  // subscription version
            pullRequest.getNextOffset(),    // offset to pull from
            this.defaultMQPushConsumer.getPullBatchSize(),  // maximum number of messages to pull from the broker per request
            sysFlag,
            commitOffsetValue,  // consume offset of this queue as currently held in memory
            BROKER_SUSPEND_MAX_TIME_MILLIS, // how long the broker may suspend the request, in milliseconds (15 s by default)
            CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // client-side timeout (30 s by default)
            CommunicationMode.ASYNC,    // invocation mode: asynchronous
            pullCallback  // pull callback
        );
    } catch (Exception e) {
        log.error("pullKernelImpl exception", e);
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
    }
}
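
The sysFlag argument packs four booleans into one int. The sketch below shows the general bit-flag technique; PullSysFlagSketch is a hypothetical class, and the bit positions are an assumption made for illustration that should be checked against PullSysFlag in the RocketMQ source before relying on them.

```java
// Hypothetical sketch of packing booleans into a flag word.
// The bit positions are assumed for illustration only.
class PullSysFlagSketch {
    static final int FLAG_COMMIT_OFFSET = 0x1;
    static final int FLAG_SUSPEND = 0x1 << 1;
    static final int FLAG_SUBSCRIPTION = 0x1 << 2;
    static final int FLAG_CLASS_FILTER = 0x1 << 3;

    static int build(boolean commitOffset, boolean suspend, boolean subscription, boolean classFilter) {
        int flag = 0;
        if (commitOffset) {
            flag |= FLAG_COMMIT_OFFSET;
        }
        if (suspend) {
            flag |= FLAG_SUSPEND;
        }
        if (subscription) {
            flag |= FLAG_SUBSCRIPTION;
        }
        if (classFilter) {
            flag |= FLAG_CLASS_FILTER;
        }
        return flag;
    }

    /** True when every bit of `bit` is set in `sysFlag`. */
    static boolean has(int sysFlag, int bit) {
        return (sysFlag & bit) == bit;
    }
}
```

In pullMessage the suspend flag is always passed as true, which is what allows the broker to hold the request (long polling) when no message is available yet.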

12.3) Rebalance service (thread)

@Override
public void run() {
    log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        this.waitForRunning(waitInterval);
        this.mqClientFactory.doRebalance();
    }
    log.info(this.getServiceName() + " service end");
}

public void doRebalance() {
    for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
        MQConsumerInner impl = entry.getValue();
        if (impl != null) {
            try {
                impl.doRebalance();
            } catch (Throwable e) {
                log.error("doRebalance exception", e);
            }
        }
    }
}
Pull mode: DefaultMQPullConsumerImpl.doRebalance()
@Override
public void doRebalance() {
    if (this.rebalanceImpl != null) {
        this.rebalanceImpl.doRebalance(false);
    }
}
Push mode: DefaultMQPushConsumerImpl.doRebalance()
@Override
public void doRebalance() {
    if (!this.pause) {
        this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
    }
}

Whether pull or push mode is used, both end up calling the following method:

public void doRebalance(final boolean isOrder) {
    Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
    if (subTable != null) {
        for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
            final String topic = entry.getKey();
            try {
                this.rebalanceByTopic(topic, isOrder);
            } catch (Throwable e) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("rebalanceByTopic Exception", e);
                }
            }
        }
    }
    this.truncateMessageQueueNotMyTopic();
}
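
rebalanceByTopic is where queues get divided among the consumers of a group. As a rough picture of what an "average" allocation looks like, here is a simplified sketch; AverageAllocationSketch is a hypothetical class and not the library's AllocateMessageQueueAveragely implementation. It assumes every consumer sees the same sorted list of consumer ids, so that each one computes a disjoint slice on its own.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of an even, contiguous split of queue indexes.
class AverageAllocationSketch {
    /** Returns the queue indexes (0-based) assigned to `self`. */
    static List<Integer> allocate(int queueCount, List<String> sortedConsumerIds, String self) {
        List<Integer> mine = new ArrayList<>();
        int index = sortedConsumerIds.indexOf(self);
        if (index < 0 || queueCount <= 0) {
            return mine; // unknown consumer or nothing to assign
        }
        int consumers = sortedConsumerIds.size();
        int base = queueCount / consumers;
        int extra = queueCount % consumers; // the first `extra` consumers get one more queue
        int size = base + (index < extra ? 1 : 0);
        int start = index * base + Math.min(index, extra);
        for (int i = 0; i < size; i++) {
            mine.add(start + i);
        }
        return mine;
    }
}
```

With 8 queues and 3 consumers the slices are [0, 1, 2], [3, 4, 5] and [6, 7]; with more consumers than queues, the surplus consumers receive nothing.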
  13. Update TopicRouteData
//TODO 13. Update TopicRouteData
this.updateTopicSubscribeInfoWhenSubscriptionChanged();

private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
    Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
    if (subTable != null) {
        for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
            final String topic = entry.getKey();
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        }
    }
}
  14. Check the client against the broker (a synchronous request with a 3-second timeout)

//TODO 14. Check the client against the broker
this.mQClientFactory.checkClientInBroker();

public void checkClientInBroker() throws MQClientException {
    Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, MQConsumerInner> entry = it.next();
        Set<SubscriptionData> subscriptionInner = entry.getValue().subscriptions();
        if (subscriptionInner == null || subscriptionInner.isEmpty()) {
            return;
        }
        for (SubscriptionData subscriptionData : subscriptionInner) {
            if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
                continue;
            }
            // may need to check one broker every cluster...
            // assume that the configs of every broker in cluster are the same.
            String addr = findBrokerAddrByTopic(subscriptionData.getTopic());
            if (addr != null) {
                try {
                    this.getMQClientAPIImpl().checkClientInBroker(
                        addr, entry.getKey(), this.clientId, subscriptionData, 3 * 1000
                    );
                } catch (Exception e) {
                    if (e instanceof MQClientException) {
                        throw (MQClientException) e;
                    } else {
                        throw new MQClientException(....)
                    }
                }
            }
        }
    }
}
MQClientAPIImpl.checkClientInBroker() method:
public void checkClientInBroker(final String brokerAddr, final String consumerGroup,
    final String clientId, final SubscriptionData subscriptionData,
    final long timeoutMillis)
    throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
    RemotingConnectException, MQClientException {
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHECK_CLIENT_CONFIG, null);
    CheckClientRequestBody requestBody = new CheckClientRequestBody();
    requestBody.setClientId(clientId);
    requestBody.setGroup(consumerGroup);
    requestBody.setSubscriptionData(subscriptionData);
    request.setBody(requestBody.encode());
    RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis);
    assert response != null;
    if (ResponseCode.SUCCESS != response.getCode()) {
        throw new MQClientException(response.getCode(), response.getRemark());
    }
}
NettyRemotingClient.invokeSync performs the RPC call over Netty:

//TODO (Message sending in the client) 3.2.1: remoting-layer functionality; the RPC call itself goes through Netty
@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
    throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
    long beginStartTime = System.currentTimeMillis();
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
        try {
            //TODO Run the registered RPC hooks (notifications before and after the call)
            doBeforeRpcHooks(addr, request);
            long costTime = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTime) {
                throw new RemotingTimeoutException("invokeSync call timeout");
            }
            //TODO Synchronous call implementation
            RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
            doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
            return response;
        } catch (RemotingSendRequestException e) {
            log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
            this.closeChannel(addr, channel);
            throw e;
        } catch (RemotingTimeoutException e) {
            if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                this.closeChannel(addr, channel);
                log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
            }
            log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}
NettyRemotingAbstract.invokeSyncImpl is where the request is actually written to the network (socket):
//TODO This is where the request is actually sent (network/socket handling)
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
    final long timeoutMillis)
    throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
    final int opaque = request.getOpaque();

    try {
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
        this.responseTable.put(opaque, responseFuture);
        final SocketAddress addr = channel.remoteAddress();
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                if (f.isSuccess()) {
                    responseFuture.setSendRequestOK(true);
                    return;
                } else {
                    responseFuture.setSendRequestOK(false);
                }
                responseTable.remove(opaque);
                responseFuture.setCause(f.cause());
                responseFuture.putResponse(null);
                log.warn("send a request command to channel <" + addr + "> failed.");
            }
        });
        RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
        if (null == responseCommand) {
            if (responseFuture.isSendRequestOK()) {
                throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                    responseFuture.getCause());
            } else {
                throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
            }
        }
        return responseCommand;
    } finally {
        this.responseTable.remove(opaque);
    }
}
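The shape of invokeSyncImpl (register a future under the request's opaque id, write the request, block until a reply or a timeout, and always clean up in finally) can be sketched without Netty. This is only a minimal illustration: PendingResponse, PendingRequests, call and complete are hypothetical names standing in for ResponseFuture and responseTable, and the payload is simplified to a String.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for ResponseFuture: a one-shot slot the caller blocks on.
class PendingResponse {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile String response;

    // Called by the I/O side when a reply (or a null failure marker) is available.
    void put(String response) {
        this.response = response;
        latch.countDown();
    }

    // Blocks up to timeoutMillis and returns whatever has arrived by then (null if nothing).
    String await(long timeoutMillis) {
        try {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return response;
    }
}

// Hypothetical stand-in for responseTable: correlates replies with callers by opaque id.
class PendingRequests {
    private final ConcurrentMap<Integer, PendingResponse> table = new ConcurrentHashMap<>();

    // Same shape as invokeSyncImpl: register, send, wait, and always remove the entry.
    String call(int opaque, Runnable send, long timeoutMillis) {
        PendingResponse pending = new PendingResponse();
        table.put(opaque, pending);
        try {
            send.run();
            return pending.await(timeoutMillis);
        } finally {
            table.remove(opaque);
        }
    }

    // Called when a reply arrives; returns false if no caller is waiting for this id.
    boolean complete(int opaque, String response) {
        PendingResponse pending = table.get(opaque);
        if (pending == null) {
            return false;
        }
        pending.put(response);
        return true;
    }

    int pendingCount() {
        return table.size();
    }
}
```

Because the entry is removed in finally, a reply that arrives after the timeout finds nothing in the table and is simply dropped, which is why complete reports whether anyone was still waiting.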
  15. Send heartbeats

//TODO 15. Send heartbeats
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

public void sendHeartbeatToAllBrokerWithLock() {
    if (this.lockHeartbeat.tryLock()) {
        try {
            this.sendHeartbeatToAllBroker();
            this.uploadFilterClassSource();
        } catch (final Exception e) {
            log.error("sendHeartbeatToAllBroker exception", e);
        } finally {
            this.lockHeartbeat.unlock();
        }
    } else {
        log.warn("lock heartBeat, but failed.");
    }
}
  16. Rebalance
//TODO 16. Rebalance
this.mQClientFactory.rebalanceImmediately();

public void rebalanceImmediately() {
    this.rebalanceService.wakeup();
}

@Override
public void run() {
    log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        this.waitForRunning(waitInterval);
        this.mqClientFactory.doRebalance();
    }
    log.info(this.getServiceName() + " service end");
}
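The wakeup() / waitForRunning() pair above amounts to "run periodically, but let another thread cut the wait short". A minimal sketch of that idea with a plain monitor follows; WakeableLoop is a hypothetical class for illustration and is not how RocketMQ's ServiceThread is implemented internally.

```java
// Hypothetical sketch of a service loop that runs periodically but can be woken early.
class WakeableLoop implements Runnable {
    private final Object monitor = new Object();
    private final long waitIntervalMillis;
    private final Runnable task;
    private boolean notified = false;          // guarded by monitor
    private volatile boolean stopped = false;

    WakeableLoop(long waitIntervalMillis, Runnable task) {
        this.waitIntervalMillis = waitIntervalMillis;
        this.task = task;
    }

    // Counterpart of rebalanceImmediately(): cut the current wait short.
    void wakeup() {
        synchronized (monitor) {
            notified = true;
            monitor.notifyAll();
        }
    }

    void stop() {
        stopped = true;
        wakeup();
    }

    // Waits until wakeup() is called or the interval elapses, whichever comes first.
    private void waitForRunning() {
        synchronized (monitor) {
            long deadline = System.currentTimeMillis() + waitIntervalMillis;
            long remaining = waitIntervalMillis;
            while (!notified && remaining > 0) {
                try {
                    monitor.wait(remaining);
                } catch (InterruptedException e) {
                    stopped = true; // treat interruption as a shutdown request
                    break;
                }
                remaining = deadline - System.currentTimeMillis();
            }
            notified = false;
        }
    }

    @Override
    public void run() {
        while (!stopped) {
            waitForRunning();
            if (!stopped) {
                task.run();
            }
        }
    }
}
```

Keeping the notified flag under the same monitor means a wakeup() issued before the loop starts waiting is not lost: the next waitForRunning() sees the flag and returns immediately.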
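Key points

In RocketMQ, push mode is still implemented on top of pull. The scheduled tasks started in step 12 of the MQClientInstance startup sequence cover the various pieces of state needed during consumption, and all of that work is done periodically.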

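Summary

Compared with pull mode, push mode in RocketMQ wraps a great deal of functionality: load balancing, queue allocation, consumption progress storage, ordered messages, heartbeat detection, and so on.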

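Pulling messages

Let's walk through the message-pull code for PUSH mode with clustering consumption.

A consumer group contains multiple consumers, and a topic has multiple consume queues, so how do the consumers share those queues? Assignment is the job of the rebalance step above. The pulling itself is done by the pullMessageService thread, which, as you may recall from the startup code, is launched in MQClientInstance#start. Its job is to pull messages, so we start from its run method:

Source trace
  1. MQClientInstance.start() -> PullMessageService.start()

  2. PullMessageService.pullMessage() -> DefaultMQPushConsumerImpl.pullMessage()

  3. pullMessage contains the core of message pulling, including pause handling, flow control, and callbacks.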

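Key points
Flow control
  1. Flow control is triggered when the number of unconsumed messages in the processQueue exceeds the threshold (1000 by default).

  2. Flow control is triggered when the total body size of unconsumed messages in the processQueue exceeds the threshold (100 MiB by default).

  3. Flow control is triggered when the span between the largest and smallest offsets of the cached messages exceeds the threshold (2000 by default).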
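The three checks above can be sketched as one decision function. This is an illustrative sketch only: QueueBacklog, PullFlowControl and the Decision values are hypothetical names, and the constants simply restate the defaults listed above rather than reading the real consumer configuration.

```java
// Hypothetical snapshot of one processQueue's backlog; field names are illustrative.
class QueueBacklog {
    final long cachedMsgCount;      // messages pulled but not yet consumed
    final long cachedMsgSizeBytes;  // total body size of those messages
    final long minOffset;           // smallest cached queue offset
    final long maxOffset;           // largest cached queue offset

    QueueBacklog(long cachedMsgCount, long cachedMsgSizeBytes, long minOffset, long maxOffset) {
        this.cachedMsgCount = cachedMsgCount;
        this.cachedMsgSizeBytes = cachedMsgSizeBytes;
        this.minOffset = minOffset;
        this.maxOffset = maxOffset;
    }
}

class PullFlowControl {
    // Defaults as listed in the text above.
    static final long MAX_CACHED_COUNT = 1000;
    static final long MAX_CACHED_SIZE_BYTES = 100L * 1024 * 1024;
    static final long MAX_OFFSET_SPAN = 2000;

    enum Decision { PULL_NOW, DELAY_COUNT, DELAY_SIZE, DELAY_SPAN }

    // Returns PULL_NOW when no threshold is exceeded, otherwise the first reason to delay.
    static Decision check(QueueBacklog backlog) {
        if (backlog.cachedMsgCount > MAX_CACHED_COUNT) {
            return Decision.DELAY_COUNT;
        }
        if (backlog.cachedMsgSizeBytes > MAX_CACHED_SIZE_BYTES) {
            return Decision.DELAY_SIZE;
        }
        if (backlog.maxOffset - backlog.minOffset > MAX_OFFSET_SPAN) {
            return Decision.DELAY_SPAN;
        }
        return Decision.PULL_NOW;
    }
}
```

The span check catches a case the other two miss: a single slow message with a small offset can block progress even when few messages are cached.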

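Summary

The core of message pulling is flow control, which is one way to cope with a mismatch between how fast the broker can deliver and how fast the client can consume.

Consuming messages

Once messages have been pulled, the consumer has to consume them. Consumption is driven mainly by consumeMessageService, so we first look at its constructor.

Source trace

1) The ConsumeMessageConcurrentlyService constructor creates a thread pool named consumeExecutor. In concurrent consumption mode, this thread pool is what actually runs message consumption.

2) Through a callback, the pulled messages are handed to consumeMessageService (implemented by ConsumeMessageConcurrentlyService) via the submitConsumeRequest method, which submits them to the thread pool for consumption.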

submitConsumeRequest
//TODO Submit pulled messages for consumption
@Override
public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispatchToConsume) {
    final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    if (msgs.size() <= consumeBatchSize) {
        // Small enough to fit in a single batch
        ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
        try {
            //TODO Submit to the consumer thread pool
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            this.submitConsumeRequestLater(consumeRequest);
        }
    } else {
        // Split into batches of at most consumeBatchSize messages
        for (int total = 0; total < msgs.size(); ) {
            List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
            for (int i = 0; i < consumeBatchSize; i++, total++) {
                if (total < msgs.size()) {
                    msgThis.add(msgs.get(total));
                } else {
                    break;
                }
            }
            //TODO Submit to the consumer thread pool
            ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                // Pool is full: fold all remaining messages into this request and retry later
                for (; total < msgs.size(); total++) {
                    msgThis.add(msgs.get(total));
                }

                this.submitConsumeRequestLater(consumeRequest);
            }
        }
    }
}
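The batching in submitConsumeRequest comes down to cutting the pulled list into chunks of at most consumeBatchSize messages, each of which becomes one task for the thread pool. A minimal, generic sketch of just that partitioning step follows; BatchSplitter is a hypothetical helper, not part of RocketMQ, and unlike the method above it returns no batch at all for an empty input.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that isolates the partitioning step of submitConsumeRequest.
class BatchSplitter {
    // Splits msgs into consecutive batches holding at most batchSize elements each.
    static <T> List<List<T>> split(List<T> msgs, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < msgs.size(); start += batchSize) {
            int end = Math.min(start + batchSize, msgs.size());
            // Copy the slice so each batch is independent of the source list.
            batches.add(new ArrayList<>(msgs.subList(start, end)));
        }
        return batches;
    }
}
```

With five messages and a batch size of two, this yields three batches of sizes 2, 2 and 1, so the last task may carry fewer messages than the others.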

3) Next, go straight to the run method of ConsumeRequest, an inner class of ConsumeMessageConcurrentlyService.

ConsumeRequest
//TODO Task that processes one consume request
class ConsumeRequest implements Runnable {
    private final List<MessageExt> msgs;
    private final ProcessQueue processQueue;
    private final MessageQueue messageQueue;

    public ConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue) {
        this.msgs = msgs;
        this.processQueue = processQueue;
        this.messageQueue = messageQueue;
    }

    public List<MessageExt> getMsgs() {
        return msgs;
    }

    public ProcessQueue getProcessQueue() {
        return processQueue;
    }

    @Override
    public void run() {
        //TODO 1: Check the processQueue's dropped flag (set during rebalance) to decide whether the messages pulled for this processQueue should still be consumed
        if (this.processQueue.isDropped()) {
            log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
            return;
        }
        //TODO 2: Get the message listener defined by the business application
        MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
        ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
        ConsumeConcurrentlyStatus status = null;
        //TODO 3: If hooks are registered, run their before method
        ConsumeMessageContext consumeMessageContext = null;
        if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
            consumeMessageContext = new ConsumeMessageContext();
            consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
            consumeMessageContext.setProps(new HashMap<String, String>());
            consumeMessageContext.setMq(messageQueue);
            consumeMessageContext.setMsgList(msgs);
            consumeMessageContext.setSuccess(false);
            ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
        }
        //TODO 4: Call resetRetryTopic to reset the topic on retried messages
        long beginTimestamp = System.currentTimeMillis();
        boolean hasException = false;
        ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
        try {
            ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
            if (msgs != null && !msgs.isEmpty()) {
                for (MessageExt msg : msgs) {
                    MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
                }
            }
            //TODO 5: Run listener.consumeMessage, where the business application consumes the messages. It returns CONSUME_SUCCESS on success,
            // or RECONSUME_LATER if something went wrong and the messages should be retried
            status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
        } catch (Throwable e) {
            log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                RemotingHelper.exceptionSimpleDesc(e),
                ConsumeMessageConcurrentlyService.this.consumerGroup,
                msgs,
                messageQueue);
            hasException = true;
        }
        long consumeRT = System.currentTimeMillis() - beginTimestamp;
        if (null == status) {
            if (hasException) {
                returnType = ConsumeReturnType.EXCEPTION;
            } else {
                returnType = ConsumeReturnType.RETURNNULL;
            }
        } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
            // getConsumeTimeout() is in minutes
            returnType = ConsumeReturnType.TIME_OUT;
        } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
            returnType = ConsumeReturnType.FAILED;
        } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
            returnType = ConsumeReturnType.SUCCESS;
        }

        if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
            consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
        }

        // A null status (listener threw or returned null) is treated as RECONSUME_LATER
        if (null == status) {
            log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
                ConsumeMessageConcurrentlyService.this.consumerGroup,
                msgs,
                messageQueue);
            status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }

        if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
            consumeMessageContext.setStatus(status.toString());
            consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
            ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
        }

        ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
            .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
        //TODO 6: Handle the consumption result, unless the queue was dropped in the meantime
        if (!processQueue.isDropped()) {
            ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
        } else {
            log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
        }
    }

    public MessageQueue getMessageQueue() {
        return messageQueue;
    }

}

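3.1) Check the dropped flag of the processQueue.

3.2) Get the message listener defined by the business application.

3.3) If hooks are registered, run their before method.

3.4) Call resetRetryTopic to reset the topic on retried messages.

3.5) Run listener.consumeMessage, where the business application actually consumes the messages.

3.6) Handle the consumption result in the processConsumeResult method.

3.6.1) In clustering mode:

3.6.2) Failed messages are collected into a List.

3.6.3) For a message that failed, a new message is created and a scheduled task resubmits it, which leads back to step 6 of the source trace in the consumer startup flow in the Client. (A retry creates a new message rather than reusing the old one.)

Key points

A message that fails consumption is re-created as a new message and retried through a scheduled task. The retry does not reuse the original message.

Summary

Consumption progress is updated whether the messages were consumed successfully or not. First, the consumed messages are removed from the processQueue and an offset is returned. Note that this offset is the smallest key remaining in the processQueue's msgTreeMap. Why? Progress is advanced by offset, and because messages are consumed by a thread pool, there is no guarantee that messages with smaller offsets finish first. Progress can therefore only advance to the smallest offset still in flight, which means some messages may be consumed again after the consumer restarts.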
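The "commit the smallest offset still in flight" rule can be illustrated with a TreeMap keyed by queue offset. This is a minimal sketch under the assumptions described above: OffsetWindow is a hypothetical class, message bodies are simplified to strings, and the "largest offset plus one once nothing is left" behaviour is modeled on how the author describes the progress update rather than copied from the source.

```java
import java.util.List;
import java.util.TreeMap;

// Hypothetical sketch of a msgTreeMap-like window of messages that are still being consumed.
class OffsetWindow {
    private final TreeMap<Long, String> inFlight = new TreeMap<>();
    private long maxOffsetSeen = -1;

    // Track a pulled message until a consumer thread finishes it.
    synchronized void put(long offset, String body) {
        inFlight.put(offset, body);
        maxOffsetSeen = Math.max(maxOffsetSeen, offset);
    }

    // Removes finished offsets and returns the offset that is safe to commit,
    // or -1 if nothing was being tracked.
    synchronized long remove(List<Long> finished) {
        if (inFlight.isEmpty()) {
            return -1;
        }
        for (Long offset : finished) {
            inFlight.remove(offset);
        }
        // Anything below the smallest remaining key is done; if nothing remains,
        // everything up to the largest offset seen is done.
        return inFlight.isEmpty() ? maxOffsetSeen + 1 : inFlight.firstKey();
    }
}
```

If offsets 0 to 4 are in flight and 3 and 4 finish first, the committable offset stays at 0. A consumer that restarts at that point pulls from 0 again and sees 3 and 4 a second time, which is the duplicate consumption the summary warns about, so the listener should be idempotent.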
