抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

RocketMQ消息发送

消息的发送

普通消息是指消息队列 RocketMQ 中无特性的消息,区别于有特性的定时/延时消息、顺序消息和事务消息。

RocketMQ支持3种消息发送方式:

  • 同步(sync)
  • 异步(async)
  • 单向(oneway)

单向发送

单向发送,见名知意,就是一种单方向通信方式,也就是说 producer 只负责发送消息,不等待 broker 发回响应结果,而且也没有回调函数触发,这也就意味着 producer 只发送请求不等待响应结果。

​ 单向(Oneway)发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发,简单地说,就是只管发,不在乎消息是否成功存储在消息服务器上,即只发送请求不等待应答。 此方式发送消息的过程耗时非常短,一般在微秒级别。

使用场景

​ 由于单向发送只是简单地发送消息,不需要等待响应,也没有回调接口触发,故发送消息所耗费的时间非常短,同时也意味着消息不可靠。所以这种单向发送比较适用于那些耗时要求非常短,但对可靠性要求并不高的场景,比如说日志收集。

代码演示

调用DefaultMQProducersendOneway方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class OnewayProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("rocket_test_consumer_group");
// 指定 name server 地址
producer.setNamesrvAddr("127.0.0.1:9876");
//初始化 Producer,整个应用生命周期内只需要初始化一次
producer.start();
try {
for (int i = 0; i < 100; i++) {
//创建一条消息对象,指定其主题、标签和消息内容
Message msg = new Message(
/* 消息主题名 */
"topicTest",
/* 消息标签 */
"TagA",
/* 消息内容 */
("Hello Java demo RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);

//发送消息不关心返回
producer.sendOneway(msg);
System.out.println("发送消息" + i);
}
} finally {
//关闭
producer.shutdown();
}
}
}

同步发送

发送者向MQ执行发送消息API时,同步等待,直到消息服务器返回发送结果,会在收到接收方发回响应之后才发下一个数据包的通讯方式,这种方式只有在消息完全发送完成之后才返回结果,此方式存在需要同步等待发送结果的时间代价。

​ 简单来说,同步发送就是指 producer 发送消息后,会在接收到 broker 响应后才继续发下一条消息的通信方式。

使用场景

​ 由于这种同步发送的方式确保了消息的可靠性,同时也能及时得到消息发送的结果,故而适合一些发送比较重要的消息场景,比如说重要的通知邮件、营销短信等等。在实际应用中,这种同步发送的方式还是用得比较多的。

注意事项

​ 这种方式具有内部重试机制,即在主动声明本次消息发送失败之前,内部实现将重试一定次数,默认为2次(DefaultMQProducer#getRetryTimesWhenSendFailed)。 发送的结果存在同一个消息可能被多次发送给给broker,这里需要应用的开发者自己在消费端处理幂等性问题。

代码演示

调用DefaultMQProducersend方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class SyncProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("rocket_test_consumer_group");
// 指定 name server 地址
producer.setNamesrvAddr("127.0.0.1:9876");
//初始化 Producer,整个应用生命周期内只需要初始化一次
producer.start();
for (int i = 0; i < 100; i++) {
//创建一条消息对象,指定其主题、标签和消息内容
Message msg = new Message(
/* 消息主题名 */
"topicTest",
/* 消息标签 */
"TagA",
/* 消息内容 */
("Hello Java demo RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
//发送消息并返回结果
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
// 一旦生产者实例不再被使用则将其关闭,包括清理资源,关闭网络连接等
producer.shutdown();
}
}

异步发送

异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。 MQ 的异步发送,需要用户实现异步发送回调接口(SendCallback

​ 异步发送是指 producer 发出一条消息后,不需要等待 broker 响应,就接着发送下一条消息的通信方式。需要注意的是,不等待 broker 响应,并不意味着 broker 不响应,而是通过回调接口来接收 broker 的响应。所以要记住一点,异步发送同样可以对消息的响应结果进行处理。

使用场景

​ 由于异步发送不需要等待 broker 的响应,故在一些比较注重 RT(响应时间)的场景就会比较适用。比如,在一些视频上传的场景,我们知道视频上传之后需要进行转码,如果使用同步发送的方式来通知启动转码服务,那么就需要等待转码完成才能发回转码结果的响应,由于转码时间往往较长,很容易造成响应超时。此时,如果使用的是异步发送通知转码服务,那么就可以等转码完成后,再通过回调接口来接收转码结果的响应了。

注意事项

注意:RocketMQ内部只对同步模式做了重试,异步发送模式是没有自动重试的,需要自己手动实现

1
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;

timesTotal 是总的重试次数

代码演示

调用DefaultMQProducersend方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class AsyncProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("rocket_test_consumer_group");
// 指定 name server 地址
producer.setNamesrvAddr("127.0.0.1:9876");
//初始化 Producer,整个应用生命周期内只需要初始化一次
producer.start();
//禁止失败重试
producer.setRetryTimesWhenSendAsyncFailed(100);
for (int i = 0; i < 100; i++) {
final int index = i;
//创建一条消息对象,指定其主题、标签和消息内容
Message msg = new Message(
/* 消息主题名 */
"topicTest",
/* 消息标签 */
"TagA",
/* 消息内容 */
("Hello Java demo RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
producer.send(msg, new SendCallback() {
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
}

public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
// 休眠10秒钟,否则当producer关闭时,无法接收mq的异步回调结果
TimeUnit.SECONDS.sleep(10);
// 一旦生产者实例不再被使用则将其关闭,包括清理资源,关闭网络连接等
producer.shutdown();
}
}

发送消息小结

消息发送的权衡

下面通过一张表格,简单总结一下同步发送、异步发送和单向发送的特点。

发送方式 发送 TPS 发送结果反馈 可靠性 适用场景
同步发送 一般 不丢失 重要的通知场景
异步发送 不丢失 比较注重 RT(响应时间)的场景
单向发送 最快 可能丢失 可靠性要求并不高的场景

​ 可以看到,从发送 TPS 来看,由于单向发送不需要等待响应也没有回调接口触发,发送速度非常快,一般都是微秒级的,在消息体大小一样的情况下,其发送 TPS 最大。而同步发送,需要等待响应结果的返回,受网络状况的影响较大,故发送 TPS 就比较小。异步发送不等待响应结果,发送消息时几乎不受网络的影响,故相比同步发送来说,其发送 TPS 要大得多。

​ 关于可靠性,大家需要牢记前面提过的,异步发送并不意味着消息不可靠,异步发送也是会接收到响应结果,也能对响应结果进行处理。即使发送失败,也可以通过一些补偿手段进行消息重发。和同步发送比起来,异步发送的发送 TPS 更大,更适合那些调用链路较长的一些场景。在实际使用中,同步发送和异步发送都是较为常用的两种方式,大家要视具体业务场景进行合理地选择。

使用场景

在实际使用场景中,利用何种发送方式,可以总结如下:

  • 当发送的消息不重要时,采用one-way方式,以提高吞吐量;
  • 当发送的消息很重要是,且对响应时间不敏感的时候采用sync方式;
  • 当发送的消息很重要,且对响应时间非常敏感的时候采用async方式;
各种发送方法整理

各种发送方法整理

批量消息发送

​ 以前我们发送消息的时候,都是一个一个的发送,这样效率比较低下。能不能一次发送多个消息呢?当然是可以的,RocketMQ为我们提供了这样的功能。

使用条件

但是它也有一些使用的条件:

  • 同一批发送的消息的Topic必须相同;
  • 同一批消息的waitStoreMsgOK 必须相同;
  • 批量发送的消息不支持延迟;
  • 同一批次的消息,大小不能超过1MiB;

代码演示

好了,只要我们满足上面的这些限制,就可以使用批量发送了,我们来看看发送端的代码吧,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class BatchProducer {
public static void main(String[] args) throws Exception {
//创建一个消息生产者,并设置一个消息生产者组
DefaultMQProducer producer = new DefaultMQProducer("rocket_test_consumer_group");

//指定 NameServer 地址
producer.setNamesrvAddr("127.0.0.1:9876");

//初始化 Producer,整个应用生命周期内只需要初始化一次
producer.start();
//需要发送消息的集合
List<Message> messageList = new ArrayList<>();
for (int i = 0; i < 100; i++) {
//创建一条消息对象,指定其主题、标签和消息内容
Message msg = new Message(
/* 消息主题名 */
"topicTest",
/* 消息标签 */
"TagA",
/* 消息内容 */
("Hello Java demo RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
//将需要发送的消息装进list中
messageList.add(msg);
}
//批量发送消息并返回结果
SendResult sendResult = producer.send(messageList);
System.out.printf("%s%n", sendResult);

// 一旦生产者实例不再被使用则将其关闭,包括清理资源,关闭网络连接等
producer.shutdown();
}
}

​ 其实批量发送很简单,我们只是把消息放到一个List当中,然后统一的调用send方法发送就可以了,消费端的代码没有任何的变化,正常的接收消息就可以了

MQ的重试机制

​ 由于MQ经常处于复杂的分布式系统中,考虑网络波动、服务宕机、程序异常因素,很有可能出现消息发送或者消费失败的问题。因此,消息的重试就是所有MQ中间件必须考虑到的一个关键点。如果没有消息重试,就可能产生消息丢失的问题,可能对系统产生很大的影响。所以,秉承宁可多发消息,也不可丢失消息的原则,大部分MQ都对消息重试提供了很好的支持。

​ RocketMQ为使用者封装了消息重试的处理流程,无需开发人员手动处理。RocketMQ支持了生产端和消费端两类重试机制。

生产端重试

​ 如果由于网络抖动等原因,Producer程序向Broker发送消息时没有成功,即发送端没有收到Broker的ACK,导致最终Consumer无法消费消息,此时RocketMQ会自动进行重试。

​ DefaultMQProducer可以设置消息发送失败的最大重试次数,并可以结合发送的超时时间来进行重试的处理,具体API如下:

1
2
3
4
5
6
7
8
9
10
11

//设置消息发送失败时的最大重试次数
public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) {
this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
}

//同步发送消息,并指定超时时间
public SendResult send(Message msg,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg, timeout);

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class RetryProducer {
public static void main(String[] args) throws UnsupportedEncodingException, InterruptedException, RemotingException, MQClientException, MQBrokerException {
//创建一个消息生产者,并设置一个消息生产者组
DefaultMQProducer producer = new DefaultMQProducer("rocket_test_consumer_group");

//指定 NameServer 地址
producer.setNamesrvAddr("127.0.0.1:9876");
//设置重试次数(默认2次)
producer.setRetryTimesWhenSendFailed(3000);
//初始化 Producer,整个应用生命周期内只需要初始化一次
producer.start();
Message msg = new Message(
/* 消息主题名 */
"topicTest",
/* 消息标签 */
"TagA",
/* 消息内容 */
("Hello Java demo RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET));
//发送消息并返回结果,设置超时时间 5ms 所以每次都会发送失败
SendResult sendResult = producer.send(msg, 5);

System.out.printf("%s%n", sendResult);
// 一旦生产者实例不再被使用则将其关闭,包括清理资源,关闭网络连接等
producer.shutdown();
}
}

超时重试纠正

超时重试`针对网上说的超时异常会重试的说法都是错误的,百度查的所以文章都说超时异常都会重试,经过产看源码才发现这个问题。

​ 发现这个问题,是因为我上面超时时间设置为5毫秒 ,按照正常肯定会报超时异常,但我设置1次重试和3000次的重试,虽然最终都会报下面异常,但输出错误时间报显然不应该是一个级别。但测试发现无论我设置的多少次的重试次数,报异常的时间都差不多。

1
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
重试解惑

针对这个疑惑,我去看了源码之后,才恍然大悟。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 说明 抽取部分代码
*/
private SendResult sendDefaultImpl(Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout) {

//1、获取当前时间
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev ;
//2、去服务器看下有没有主题消息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
boolean callTimeout = false;
//3、通过这里可以很明显看出 如果不是同步发送消息 那么消息重试只有1次
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
//4、根据设置的重试次数,循环再去获取服务器主题消息
for (times = 0; times < timesTotal; times++) {
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
beginTimestampPrev = System.currentTimeMillis();
long costTime = beginTimestampPrev - beginTimestampFirst;
//5、前后时间对比 如果前后时间差 大于 设置的等待时间 那么直接跳出for循环了 这就说明连接超时是不进行多次连接重试的
if (timeout < costTime) {
callTimeout = true;
break;

}
//6、如果超时直接报错
if (callTimeout) {
throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
}
}
}
重试总结

通过这段源码很明显可以看出以下几点

  1. 如果是异步发送那么重试次数只有1次
  2. 对于同步而言,超时异常也是不会再去重试
  3. 如果发生重试是在一个for 循环里去重试,所以它是立即重试而不是隔一段时间去重试。

RocketMQ使用注意

Broker 故障规避

​ 如果开启的话,发送异常后会根据配置计算一个不可用时间,然后选择broker的时候会判断是否在规避时间内,在的话就不跳过这个broker

禁止自动创建Topic

自动创建topic流程

autoCreateTopicEnable设置为true 标识开启自动创建topic

流程如下:

  1. 消息发送时如果根据topic没有获取到 路由信息,则会根据默认的topic去获取,获取到路由信息后选择一个队列进行发送,发送时报文会带上默认的topic以及默认的队列数量。
  2. 消息到达broker后,broker检测没有topic的路由信息,则查找默认topic的路由信息,查到表示开启了自动创建topic,则会根据消息内容中的默认的队列数量在本broker上创建topic,然后进行消息存储。
  3. broker创建topic后并不会马上同步给namesrv,而是每30进行汇报一次,更新namesrv上的topic路由信息,producer会每30s进行拉取一次topic的路由信息,更新完成后就可以正常发送消息。更新之前一直都是按照默认的topic查找路由信息。
禁用自动创建topic

为什么生产不能开启自动创建topic

​ 上述 broker 中流程会有一个问题,就是在producer更新路由信息之前的这段时间,如果消息只发送到了broker-a,则broker-b上不会创建这个topic的路由信息,broker互相之间不通信。当producer更新之后,获取到的broker列表只有broker-a,就永远不会轮询到broker-b的队列(因为没有路由信息),所以我们生产通常关闭自动创建broker,而是采用手动创建的方式。

消息发布流程

生产者启动流程

DefalutMQProducer 是默认的消息生产者实现类,它实现 MQAdmin 接口。

  1. 检查 productGroup 是否符合要求
  2. 创建 MQClientInstance 实例,整个 JVM 实例中只存在一个 MQClientManager 实例,维护一个 MQClientInstance 缓存表,也就是同一个 clientId 只会创建一个 MQClientInstance。
  3. 启动 MQClientInstance 实例。

消息发送基本流程

消息发送流程主要的步骤:

消息长度验证

​ 默认消息发送以同步方式发送,默认超时时间为 3s。

查找主题路由信息

​ 消息发送之前,需要知道主题的路由信息,以此确定发送具体的 Broker 节点。

选择消息队列

​ 根据路由信息选择消息队列(MessageQueue)。

消息发送

消息发送的 API 核心入口:DefaultMQProducerImpl#sendKernelImpl

  1. 根据 MessageQueue 获取 Broker 的网络地址。
  2. 为消息分配全局唯一 ID,如果消息体默认超过 4K,会对消息体采用 zip 压缩。
  3. 如果注册了消息发送钩子函数,则执行消息发送之前的增强逻辑。
  4. 构建消息发送请求包。
  5. 根据消息发送方式,同步、异步、单向方式进行网络传输。
  6. 如果注册了消息发送钩子函数,执行 after 逻辑。

评论