抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

RocketMQ顺序消息

顺序消费

无序消息

无序消息也指普通的消息,Producer 只管发送消息,Consumer 只管接收消息,至于消息和消息之间的顺序并没有保证。

  • Producer 依次发送 orderId 为 1、2、3 的消息

  • Consumer 接到的消息顺序有可能是 1、2、3,也有可能是 2、1、3 等情况,这就是普通消息。

全局顺序

对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费

​ 比如 Producer 发送orderId 1,3,2 的消息, 那么 Consumer 也必须要按照 1,3,2 的顺序进行消费。

全局顺序

局部顺序

在实际开发有些场景中,我并不需要消息完全按照完全按的先进先出,而是某些消息保证先进先出就可以了。

​ 就好比一个订单涉及 订单生成订单支付订单完成。我不用管其它的订单,只保证同样订单ID能保证这个顺序就可以了。

img

Rocket顺序消息

RocketMQ可以严格的保证消息有序。但这个顺序,不是全局顺序,只是分区(queue)顺序。要全局顺序只能一个分区。

​ 之所以出现你这个场景看起来不是顺序的,是因为发送消息的时候,消息发送默认是会采用轮询的方式发送到不通的queue(分区)

实现原理

​ 我们知道 生产的message最终会存放在Queue中,如果一个Topic关联了16个Queue,如果我们不指定消息往哪个队列里放,那么默认是平均分配消息到16个queue,

​ 好比有100条消息,那么这100条消息会平均分配在这16个Queue上,那么每个Queue大概放5~6个左右。这里有一点很重的是:同一个queue,存储在里面的message 是按照先进先出的原则

​ 这个时候思路就来了,好比有orderId=1的3条消息,分别是 订单生产订单付款订单完成。只要保证它们放到同一个Queue那就保证消费者先进先出了。

img

这就保证局部顺序了,即同一订单按照先后顺序放到同一Queue,那么取消息的时候就可以保证先进先取出。

那么全局消息呢

这个就简单啦,你把所有消息都放在一个Queue里,这样不就保证全局消息了。

就这么简单

​ 当然不是,这里还有很关键的一点,好比在一个消费者集群的情况下,消费者1先去Queue拿消息,它拿到了 订单生成,它拿完后,消费者2去queue拿到的是 订单支付

拿的顺序是没毛病了,但关键是先拿到不代表先消费完它。会存在虽然你消费者1先拿到订单生成,但由于网络等原因,消费者2比你真正的先消费消息。这是不是很尴尬了。

订单付款还是可能会比订单生成更早消费的情况。那怎么办。

分布式锁来了

Rocker采用的是分段锁,它不是锁整个Broker而是锁里面的单个Queue,因为只要锁单个Queue就可以保证局部顺序消费了。

Rocker采用的是分段锁,它不是锁整个Broker而是锁里面的单个Queue,因为只要锁单个Queue就可以保证局部顺序消费了。

所以最终的消费者这边的逻辑就是

  • 消费者1去Queue拿 订单生成,它就锁住了整个Queue,只有它消费完成并返回成功后,这个锁才会释放。

  • 然后下一个消费者去拿到 订单支付 同样锁住当前Queue,这样的一个过程来真正保证对同一个Queue能够真正意义上的顺序消费,而不仅仅是顺序取出。

消息类型对比

全局顺序与分区顺序对比
Topic消息类型 支持事务消息 支持定时/延时消息 性能
无序消息(普通、事务、定时/延时) 最高
分区顺序消息
全局顺序消息 一般
发送方式对比
Topic消息类型 支持可靠同步发送 支持可靠异步发送 支持Oneway发送
无序消息(普通、事务、定时/延时)
分区顺序消息
全局顺序消息

注意事项

  1. 顺序消息暂不支持广播模式。
  2. 顺序消息不支持异步发送方式,否则将无法严格保证顺序。
  3. 建议同一个 Group ID 只对应一种类型的 Topic,即不同时用于顺序消息和无序消息的收发。
  4. 对于全局顺序消息,建议创建实例个数 >=2。

代码示例

这里保证两点

  1. 生产端 同一orderID的订单放到同一个queue。
  2. 消费端 同一个queue取出消息的时候锁住整个queue,直到消费后再解锁。

相关代码

实体类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class ProductOrder {
private String orderId;
private String orderName;

public ProductOrder(String orderId, String orderName) {
this.orderId = orderId;
this.orderName = orderName;
}

public String getOrderId() {
return orderId;
}

public void setOrderId(String orderId) {
this.orderId = orderId;
}

public String getOrderName() {
return orderName;
}

public void setOrderName(String orderName) {
this.orderName = orderName;
}
}
Product(生产者)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public class OrderProducer {
private static final List<ProductOrder> orderList = new ArrayList<>();

static {
orderList.add(new ProductOrder("XXX001", "订单创建"));
orderList.add(new ProductOrder("XXX001", "订单付款"));
orderList.add(new ProductOrder("XXX001", "订单完成"));
orderList.add(new ProductOrder("XXX002", "订单创建"));
orderList.add(new ProductOrder("XXX002", "订单付款"));
orderList.add(new ProductOrder("XXX002", "订单完成"));
orderList.add(new ProductOrder("XXX003", "订单创建"));
orderList.add(new ProductOrder("XXX003", "订单付款"));
orderList.add(new ProductOrder("XXX003", "订单完成"));
}

public static void main(String[] args) throws Exception {
//创建一个消息生产者,并设置一个消息生产者组
DefaultMQProducer producer = new DefaultMQProducer("rocket_test_consumer_group");

//指定 NameServer 地址
producer.setNamesrvAddr("127.0.0.1:9876");
//初始化 Producer,整个应用生命周期内只需要初始化一次
producer.start();

for (int i = 0; i < orderList.size(); i++) {
//获取当前order
ProductOrder order = orderList.get(i);
//创建一条消息对象,指定其主题、标签和消息内容
Message message = new Message(
/* 消息主题名 */
"topicTest",
/* 消息标签 */
order.getOrderId(),
/* 消息内容 */
(order.toString()).getBytes(RemotingHelper.DEFAULT_CHARSET)
);

//发送消息并返回结果
SendResult sendResult = producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object args) {
//arg的值其实就是下面传入 orderId
String orderid = (String) args;
//因为订单是String类型,所以通过hashCode转成int类型
int hashCode = orderid.hashCode();
//因为hashCode可能为负数 所以取绝对值
hashCode = Math.abs(hashCode);
//保证同一个订单号 一定分配在同一个queue上
long index = hashCode % mqs.size();
//根据索引选择不同的队列
return mqs.get((int) index);
}
}, order.getOrderId());

System.out.println("product: 发送状态:" + sendResult.getSendStatus() + ",存储queue:" + sendResult.getMessageQueue().getQueueId() + ",orderID:" + order.getOrderId() + ",type:" + order.getType());
}

// 一旦生产者实例不再被使用则将其关闭,包括清理资源,关闭网络连接等
producer.shutdown();
}
}
Consumer(消费者)

上面说过,消费者真正要达到消费顺序,需要分布式锁,所以这里需要将MessageListenerOrderly替换之前的MessageListenerConcurrently,因为它里面实现了分布式锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class OrderConsumer {
private static final Random random = new Random();

public static void main(String[] args) throws Exception {
//创建一个消息消费者,并设置一个消息消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rocket_test_consumer_group");
//指定 NameServer 地址
consumer.setNamesrvAddr("127.0.0.1:9876");
//设置 Consumer 第一次启动时从队列头部开始消费还是队列尾部开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//订阅指定 Topic 下的所有消息
consumer.subscribe("topicTest", "*");

//注册消费的监听 这里注意顺序消费为MessageListenerOrderly 之前并发为ConsumeConcurrentlyContext
consumer.registerMessageListener(new MessageListenerOrderly() {
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext context) {
//默认 list 里只有一条消息,可以通过设置参数来批量接收消息
if (list != null) {
for (MessageExt ext : list) {
try {
try {
//模拟业务逻辑处理中...
TimeUnit.SECONDS.sleep(random.nextInt(10));
} catch (Exception e) {
e.printStackTrace();
}

//获取接收到的消息
String message = new String(ext.getBody(), RemotingHelper.DEFAULT_CHARSET);
//获取队列ID
int queueId = context.getMessageQueue().getQueueId();
//打印消息
System.out.println("Consumer-线程名称=[" + Thread.currentThread().getId() + "],接收queueId:[" + queueId + "],接收时间:[" + new Date().getTime() + "],消息=[" + message + "]");

} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
//消费成功提交偏移量
return ConsumeOrderlyStatus.SUCCESS;
}
});

// 消费者对象在使用之前必须要调用 start 初始化
consumer.start();
System.out.println("消息消费者已启动");
}
}

测试

生产者发送消息

看看生产者有没有把相同订单指定到同一个queue

1
2
3
4
5
6
7
8
9
product: 发送状态:SEND_OK,存储queue:3,orderID:XXX001,type:订单创建
product: 发送状态:SEND_OK,存储queue:3,orderID:XXX001,type:订单付款
product: 发送状态:SEND_OK,存储queue:3,orderID:XXX001,type:订单完成
product: 发送状态:SEND_OK,存储queue:2,orderID:XXX002,type:订单创建
product: 发送状态:SEND_OK,存储queue:2,orderID:XXX002,type:订单付款
product: 发送状态:SEND_OK,存储queue:2,orderID:XXX002,type:订单完成
product: 发送状态:SEND_OK,存储queue:1,orderID:XXX003,type:订单创建
product: 发送状态:SEND_OK,存储queue:1,orderID:XXX003,type:订单付款
product: 发送状态:SEND_OK,存储queue:1,orderID:XXX003,type:订单完成

通过测试结果可以看出:相同订单已经存到同一queue中了

消费者消费消息
单消费者

看看消费结果是不是我们需要的结果

1
2
3
4
5
6
7
8
9
10
消息消费者已启动
Consumer-线程名称=[31],接收queueId:[2],接收时间:[1608114160498],消息=[ProductOrder{orderId='XXX002', type='订单创建'}]
Consumer-线程名称=[29],接收queueId:[3],接收时间:[1608114165490],消息=[ProductOrder{orderId='XXX001', type='订单创建'}]
Consumer-线程名称=[33],接收queueId:[1],接收时间:[1608114167523],消息=[ProductOrder{orderId='XXX003', type='订单创建'}]
Consumer-线程名称=[31],接收queueId:[2],接收时间:[1608114169502],消息=[ProductOrder{orderId='XXX002', type='订单付款'}]
Consumer-线程名称=[29],接收queueId:[3],接收时间:[1608114172492],消息=[ProductOrder{orderId='XXX001', type='订单付款'}]
Consumer-线程名称=[31],接收queueId:[2],接收时间:[1608114173503],消息=[ProductOrder{orderId='XXX002', type='订单完成'}]
Consumer-线程名称=[33],接收queueId:[1],接收时间:[1608114173524],消息=[ProductOrder{orderId='XXX003', type='订单付款'}]
Consumer-线程名称=[29],接收queueId:[3],接收时间:[1608114179493],消息=[ProductOrder{orderId='XXX001', type='订单完成'}]
Consumer-线程名称=[33],接收queueId:[1],接收时间:[1608114182524],消息=[ProductOrder{orderId='XXX003', type='订单完成'}]

MessageListenerOrderly能够保证顺序消费,从图中我们也看到了期望的结果。图中的输出是只启动了一个消费者时的输出,看起来订单号还是混在一起,但是每组订单号之间是有序的。因为消息发送时被分配到了三个队列(参见前面生产者输出日志),那么这三个队列的消息被这唯一消费者消费。

消费异常

如果出现消费异常返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT后面的消息将无法消费。

1

多消费者
  • 如果启动2个消费者: 那么其中一个消费者对应消费2个队列,另一个消费者对应消费剩下的1个队列。

  • 如果启动3个消费者: 那么每个消费者都对应消费1个队列,订单号就区分开了。输出变为这样:

消费者A

1
2
3
Consumer-线程名称=[36],接收queueId:[2],接收时间:[1608115320966],消息=[ProductOrder{orderId='XXX002', type='订单创建'}]
Consumer-线程名称=[36],接收queueId:[2],接收时间:[1608115323966],消息=[ProductOrder{orderId='XXX002', type='订单付款'}]
Consumer-线程名称=[36],接收queueId:[2],接收时间:[1608115323966],消息=[ProductOrder{orderId='XXX002', type='订单完成'}]

消费者B

1
2
3
4
Consumer-线程名称=[37],接收queueId:[3],接收时间:[1608115319873],消息=[ProductOrder{orderId='XXX001', type='订单创建'}]
Consumer-线程名称=[37],接收queueId:[3],接收时间:[1608115326873],消息=[ProductOrder{orderId='XXX001', type='订单付款'}]
Consumer-线程名称=[37],接收queueId:[3],接收时间:[1608115333873],消息=[ProductOrder{orderId='XXX001', type='订单完成'}]

消费者C

1
2
3
4
Consumer-线程名称=[29],接收queueId:[1],接收时间:[1608115326417],消息=[ProductOrder{orderId='XXX003', type='订单创建'}]
Consumer-线程名称=[29],接收queueId:[1],接收时间:[1608115333418],消息=[ProductOrder{orderId='XXX003', type='订单付款'}]
Consumer-线程名称=[29],接收queueId:[1],接收时间:[1608115341418],消息=[ProductOrder{orderId='XXX003', type='订单完成'}]

小结

通过测试结果我们看出

  1. 消费消息的顺序并没有完全按照之前的先进先出,即没有满足全局顺序。
  2. 同一订单来讲,订单的 订单生成、订单支付、订单完成 消费顺序是保证的。

这是局部保证顺序消费就已经满足我们当前实际开发中的需求了。

rocketmq的顺序消息需要满足2点:

  1. Producer端保证发送消息有序,且发送到同一个队列。
  2. consumer端保证消费同一个队列。

评论