抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

RabbitMQ-延迟队列

什么是延迟队列

延时队列,首先,它是一种队列,队列意味着内部的元素是有序的,元素出队和入队是有方向性的,元素从一端进入,从另一端取出。

​ 其次,延时队列,最重要的特性就体现在它的延时属性上,跟普通的队列不一样的是,普通队列中的元素总是等着希望被早点取出处理,而延时队列中的元素则是希望被在指定时间得到取出和处理,所以延时队列中的元素是都是带时间属性的,通常来说是需要被处理的消息或者任务。

​ 简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。

延时队列使用场景

那么什么时候需要用延时队列呢?考虑一下以下场景:

  1. 订单在十分钟之内未支付则自动取消。
  2. 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
  3. 账单在一周内未支付,则自动结算。
  4. 用户注册成功后,如果三天内没有登陆则进行短信提醒。
  5. 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
  6. 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。

​ 这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如:发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;发生店铺创建事件,十天后检查该店铺上新商品数,然后通知上新数为0的商户;发生账单生成事件,检查账单支付状态,然后自动结算未支付的账单;发生新用户注册事件,三天后检查新注册用户的活动数据,然后通知没有任何活动记录的用户;发生退款事件,在三天之后检查该订单是否已被处理,如仍未被处理,则发送消息给相关运营人员;发生预定会议事件,判断离会议开始是否只有十分钟了,如果是,则通知各个与会人员。

​ 看起来似乎使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?如果数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求,如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景,如:“订单十分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。

​ 这时候,延时队列就可以闪亮登场了,以上场景,正是延时队列的用武之地。

​ 既然延时队列可以解决很多特定场景下,带时间属性的任务需求,那么如何构造一个延时队列呢?接下来,本文将介绍如何用RabbitMQ来实现延时队列。

RabbitMQ中的TTL

​ 在介绍延时队列之前,还需要先介绍一下RabbitMQ中的一个高级特性——TTL(Time To Live)

TTL是什么呢?TTL是RabbitMQ中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。换句话说,如果一条消息设置了TTL属性或者进入了设置TTL属性的队列,那么这条消息如果在TTL设置的时间内没有被消费,则会成为“死信”(至于什么是死信,请翻看上一篇)。如果同时配置了队列的TTL和消息的TTL,那么较小的那个值将会被使用。

那么,如何设置这个TTL值呢?有两种方式,第一种是在创建队列的时候设置队列的“x-message-ttl”属性,如下:

1
2
3
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);

这样所有被投递到该队列的消息都最多不会存活超过6s。

另一种方式便是针对每条消息设置TTL,代码如下:

1
2
3
4
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("6000");
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg body".getBytes());

这样这条消息的过期时间也被设置成了6s。

​ 但这两种方式是有区别的,如果设置了队列的TTL属性,那么一旦消息过期,就会被队列丢弃,而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间。

​ 另外,还需要注意的一点是,如果不设置TTL,表示消息永远不会过期,如果将TTL设置为0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。

实现延时队列

​ 前一篇里介绍了如果设置死信队列,前文中又介绍了TTL,至此,利用RabbitMQ实现延时队列的两大要素已经集齐,接下来只需要将它们进行调和,再加入一点点调味料,延时队列就可以新鲜出炉了。

​ 想想看,延时队列,不就是想要消息延迟多久被处理吗,TTL则刚好能让消息在延迟多久之后成为死信,另一方面,成为死信的消息都会被投递到死信队列里,这样只需要消费者一直消费死信队列里的消息就万事大吉了,因为里面的消息都是希望被立即处理的消息。

从下图可以大致看出消息的流向

​ 生产者生产一条延时消息,根据需要延时时间的不同,利用不同的routingkey将消息路由到不同的延时队列,每个队列都设置了不同的TTL属性,并绑定在同一个死信交换机中,消息过期后,根据routingkey的不同,又会被路由到不同的死信队列中,消费者只需要监听对应的死信队列进行处理即可。

配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
server:
port: 8080

spring:
rabbitmq:
host: 192.168.64.133
port: 5672
username: root
password: root
###开启消息确认机制 confirms
virtual-host: /
publisher-returns: true
#采用手动应答
listener:
simple:
acknowledge-mode: manual

配置类

交换器配置

先声明交换机、队列以及他们的绑定关系:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
@Configuration
public class RabbitMQConfig {

public static final String DELAY_EXCHANGE_NAME = "delay.queue.demo.business.exchange";
public static final String DELAY_QUEUE_A_NAME = "delay.queue.demo.business.queuea";
public static final String DELAY_QUEUE_B_NAME = "delay.queue.demo.business.queueb";
public static final String DELAY_QUEUE_A_ROUTING_KEY = "delay.queue.demo.business.queuea.routingkey";
public static final String DELAY_QUEUE_B_ROUTING_KEY = "delay.queue.demo.business.queueb.routingkey";
public static final String DEAD_LETTER_EXCHANGE = "delay.queue.demo.deadletter.exchange";
public static final String DEAD_LETTER_QUEUE_A_ROUTING_KEY = "delay.queue.demo.deadletter.delay_10s.routingkey";
public static final String DEAD_LETTER_QUEUE_B_ROUTING_KEY = "delay.queue.demo.deadletter.delay_60s.routingkey";
public static final String DEAD_LETTER_QUEUE_A_NAME = "delay.queue.demo.deadletter.queuea";
public static final String DEAD_LETTER_QUEUE_B_NAME = "delay.queue.demo.deadletter.queueb";

// 声明延时Exchange
@Bean("delayExchange")
public DirectExchange delayExchange() {
return new DirectExchange(DELAY_EXCHANGE_NAME);
}

// 声明死信Exchange
@Bean("deadLetterExchange")
public DirectExchange deadLetterExchange() {
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}

// 声明延时队列A 延时10s
// 并绑定到对应的死信交换机
@Bean("delayQueueA")
public Queue delayQueueA() {
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// x-dead-letter-routing-key 这里声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_A_ROUTING_KEY);
// x-message-ttl 声明队列的TTL
args.put("x-message-ttl", 1000 * 10);
return QueueBuilder.durable(DELAY_QUEUE_A_NAME).withArguments(args).build();
}

// 声明延时队列B 延时 60s
// 并绑定到对应的死信交换机
@Bean("delayQueueB")
public Queue delayQueueB() {
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// x-dead-letter-routing-key 这里声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_B_ROUTING_KEY);
// x-message-ttl 声明队列的TTL
args.put("x-message-ttl", 1888 * 60);
return QueueBuilder.durable(DELAY_QUEUE_B_NAME).withArguments(args).build();
}

// 声明死信队列A 用于接收延时10s处理的消息
@Bean("deadLetterQueueA")
public Queue deadLetterQueueA() {
return new Queue(DEAD_LETTER_QUEUE_A_NAME);
}

// 声明死信队列B 用于接收延时60s处理的消息
@Bean("deadLetterQueueB")
public Queue deadLetterQueueB() {
return new Queue(DEAD_LETTER_QUEUE_B_NAME);
}

// 声明延时队列A绑定关系
@Bean
public Binding delayBindingA(@Qualifier("delayQueueA") Queue queue,
@Qualifier("delayExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_A_ROUTING_KEY);
}

// 声明业务队列B绑定关系
@Bean
public Binding delayBindingB(@Qualifier("delayQueueB") Queue queue,
@Qualifier("delayExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_B_ROUTING_KEY);
}

// 声明死信队列A绑定关系
@Bean
public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUE_A_ROUTING_KEY);
}

// 声明死信队列B绑定关系
@Bean
public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUE_B_ROUTING_KEY);
}
}
枚举类

定义相关枚举属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

public enum DelayTypeEnum {
DELAY_10s(10), DELAY_60s(60);

private int duration = 0;

DelayTypeEnum(int duration) {
this.duration = duration;
}

public int getDuration() {
return duration;
}

public static DelayTypeEnum getType(int duration) {
return Stream.of(DelayTypeEnum.values()).filter((type) -> type.duration == duration).findFirst().get();
}
}

消费者

接下来,创建两个消费者,分别对两个死信队列的消息进行消费:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Component
public class DeadLetterQueueConsumer {

private static final Logger logger = LoggerFactory.getLogger(DeadLetterQueueConsumer.class);

@RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUE_A_NAME)
public void receiveA(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
logger.info("当前时间:{},死信队列A收到消息:{}", System.currentTimeMillis(), msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

@RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUE_B_NAME)
public void receiveB(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
logger.info("当前时间:{},死信队列B收到消息:{}", new Date().toString(), msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Component
public class DelayMessageSender {

private static final Logger logger = LoggerFactory.getLogger(DelayMessageSender.class);

@Autowired
private RabbitTemplate rabbitTemplate;

public void sendMsg(String msg, DelayTypeEnum type) {
logger.info("发送延时消息,message:{}, duration:{},当前时间:", msg, type.getDuration(), System.currentTimeMillis());
switch (type) {
case DELAY_10s:
rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE_NAME, RabbitMQConfig.DELAY_QUEUE_A_ROUTING_KEY, msg);
break;
case DELAY_60s:
rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE_NAME, RabbitMQConfig.DELAY_QUEUE_B_ROUTING_KEY, msg);
break;
default:
rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE_NAME, RabbitMQConfig.DELAY_QUEUE_A_ROUTING_KEY, msg);
break;
}
}
}

请求接口

接下来,我们暴露一个web接口来生产消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
RestController
@RequestMapping("/rabbitmq")
public class RabbitmqController {

@Autowired
private DelayMessageSender msgProducer;

/**
* 发送测试数据
*
* @param type 交换器类型
* @param message
* @return
*/
@RequestMapping("/send")
public String send(String message, int type) {
msgProducer.sendMsg(message, DelayTypeEnum.getType(type));
return "OK";
}
}

RabbitMQ控制台

打开rabbitMQ的管理后台,可以看到我们刚才创建的交换机和队列信息:

测试

接下来,我们来发送几条消息

1
2
3
4
5
6
7
8
2020-09-22 16:55:40.977  INFO 452 --- [nio-8080-exec-1] c.h.r.consumer.DelayMessageSender        : 发送延时消息,message:xxxxxxx, duration:60,当前时间:
2020-09-22 16:55:45.716 INFO 452 --- [nio-8080-exec-2] c.h.r.consumer.DelayMessageSender : 发送延时消息,message:xxxxxxx, duration:10,当前时间:
2020-09-22 16:55:53.164 INFO 452 --- [nio-8080-exec-3] c.h.r.consumer.DelayMessageSender : 发送延时消息,message:延时10s, duration:10,当前时间:
2020-09-22 16:55:55.746 INFO 452 --- [ntContainer#1-1] c.h.r.consumer.DeadLetterQueueConsumer : 当前时间:1600764955745,死信队列A收到消息:xxxxxxx
2020-09-22 16:56:01.333 INFO 452 --- [nio-8080-exec-4] c.h.r.consumer.DelayMessageSender : 发送延时消息,message:延时60s, duration:60,当前时间:
2020-09-22 16:56:03.170 INFO 452 --- [ntContainer#1-2] c.h.r.consumer.DeadLetterQueueConsumer : 当前时间:1600764963170,死信队列A收到消息:延时10s
2020-09-22 16:57:34.268 INFO 452 --- [ntContainer#0-1] c.h.r.consumer.DeadLetterQueueConsumer : 当前时间:Tue Sep 22 16:57:34 CST 2020,死信队列B收到消息:xxxxxxx
2020-09-22 16:57:54.619 INFO 452 --- [ntContainer#0-2] c.h.r.consumer.DeadLetterQueueConsumer : 当前时间:Tue Sep 22 16:57:54 CST 2020,死信队列B收到消息:延时60s

​ 第一条消息在6s后变成了死信消息,然后被消费者消费掉,第二条消息在60s之后变成了死信消息,然后被消费掉,这样,一个还算ok的延时队列就打造完成了。

​ 不过,等等,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有6s和60s两个时间选项,如果需要一个小时后处理,那么就需要增加TTL为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求??

RabbitMQ延时队列优化

​ 显然,需要一种更通用的方案才能满足需求,那么就只能将TTL设置在消息属性里了。我们来试一试。

配置类

增加一个延时队列,用于接收设置为任意延时时长的消息,增加一个相应的死信队列和routingkey:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@Configuration
public class RabbitMQConfig {

public static final String DELAY_EXCHANGE_NAME = "delay.queue.demo.business.exchange";
public static final String DELAY_QUEUEC_NAME = "delay.queue.demo.business.queuec";
public static final String DELAY_QUEUEC_ROUTING_KEY = "delay.queue.demo.business.queuec.routingkey";
public static final String DEAD_LETTER_EXCHANGE = "delay.queue.demo.deadletter.exchange";
public static final String DEAD_LETTER_QUEUEC_ROUTING_KEY = "delay.queue.demo.deadletter.delay_anytime.routingkey";
public static final String DEAD_LETTER_QUEUEC_NAME = "delay.queue.demo.deadletter.queuec";

// 声明延时Exchange
@Bean("delayExchange")
public DirectExchange delayExchange(){
return new DirectExchange(DELAY_EXCHANGE_NAME);
}

// 声明死信Exchange
@Bean("deadLetterExchange")
public DirectExchange deadLetterExchange(){
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}

// 声明延时队列C 不设置TTL
// 并绑定到对应的死信交换机
@Bean("delayQueueC")
public Queue delayQueueC(){
Map<String, Object> args = new HashMap<>(3);
// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// x-dead-letter-routing-key 这里声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEC_ROUTING_KEY);
return QueueBuilder.durable(DELAY_QUEUEC_NAME).withArguments(args).build();
}

// 声明死信队列C 用于接收延时任意时长处理的消息
@Bean("deadLetterQueueC")
public Queue deadLetterQueueC(){
return new Queue(DEAD_LETTER_QUEUEC_NAME);
}

// 声明延时列C绑定关系
@Bean
public Binding delayBindingC(@Qualifier("delayQueueC") Queue queue,
@Qualifier("delayExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEC_ROUTING_KEY);
}

// 声明死信队列C绑定关系
@Bean
public Binding deadLetterBindingC(@Qualifier("deadLetterQueueC") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEC_ROUTING_KEY);
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Component
public class DeadLetterQueueConsumer {

private static final Logger logger = LoggerFactory.getLogger(DeadLetterQueueConsumer.class);
DateFormat dateFormat = DateFormat.getDateTimeInstance();

@RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUEC_NAME)
public void receiveC(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
Date sendDate = message.getMessageProperties().getTimestamp();
//延迟的秒数
long durationSec = (System.currentTimeMillis() - sendDate.getTime()) / 1000;
logger.info("接收到延时消息,消息:{},延时秒数:{},当前时间:{}", msg, durationSec, dateFormat.format(new Date()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}

生产者

使用spring的setExpiration方法来设置消息的过期时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Component
public class DelayMessageSender {

private static final Logger logger = LoggerFactory.getLogger(DelayMessageSender.class);
DateFormat dateFormat = DateFormat.getDateTimeInstance();

@Autowired
private RabbitTemplate rabbitTemplate;

public void sendMsg(String msg, int delayTime) {
logger.info("发送延时消息,message:{}, duration:{},当前时间:{}", msg, delayTime, dateFormat.format(new Date()));
rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE_NAME, RabbitMQConfig.DELAY_QUEUEC_ROUTING_KEY, msg, x -> {
MessageProperties messageProperties = x.getMessageProperties();
messageProperties.setExpiration(String.valueOf(delayTime));
messageProperties.setTimestamp(new Date());
return x;
});
}
}

请求接口

这个接口可以传递延迟任意时间消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@RestController
@RequestMapping("/rabbitmq")
public class RabbitmqController {

@Autowired
private DelayMessageSender msgProducer;

/**
* 发送测试数据
*
* @param delayTime 交换器类型
* @param message
* @return
*/
@RequestMapping("/send")
public String send(String message, int delayTime) {
msgProducer.sendMsg(message, delayTime);
return "OK";
}
}

测试

正常测试

请求地址

1
2
http://localhost:8080/rabbitmq/send?message=test30s&delayTime=30000
http://localhost:8080/rabbitmq/send?message=test60s&delayTime=60000

测试数据

1
2
3
4
2020-09-23 10:17:41.397  INFO 17924 --- [nio-8080-exec-4] c.h.r.consumer.DelayMessageSender        : 发送延时消息,message:test30s, duration:30000,当前时间:2020-9-23 10:17:41
2020-09-23 10:17:47.317 INFO 17924 --- [nio-8080-exec-7] c.h.r.consumer.DelayMessageSender : 发送延时消息,message:test60s, duration:60000,当前时间:2020-9-23 10:17:47
2020-09-23 10:18:11.402 INFO 17924 --- [ntContainer#0-1] c.h.r.consumer.DeadLetterQueueConsumer : 接收到延时消息,消息:test30s,延时秒数:30,当前时间:2020-9-23 10:18:11
2020-09-23 10:18:47.321 INFO 17924 --- [ntContainer#0-1] c.h.r.consumer.DeadLetterQueueConsumer : 接收到延时消息,消息:test60s,延时秒数:60,当前时间:2020-9-23 10:18:47
非正常测试

​ 看起来似乎没什么问题,但不要高兴的太早,在最开始的时候,就介绍过,如果使用在消息属性上设置TTL的方式,消息可能并不会按时“死亡“,因为RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,索引如果第一个消息的延时时长很长,而第二个消息的延时时长很短,则第二个消息并不会优先得到执行。

请求地址

1
2
http://localhost:8080/rabbitmq/send?message=test60s&delayTime=60000
http://localhost:8080/rabbitmq/send?message=test30s&delayTime=30000

测试数据

1
2
3
4
2020-09-23 10:19:30.793  INFO 17924 --- [io-8080-exec-10] c.h.r.consumer.DelayMessageSender        : 发送延时消息,message:test60s, duration:60000,当前时间:2020-9-23 10:19:30
2020-09-23 10:19:36.694 INFO 17924 --- [nio-8080-exec-1] c.h.r.consumer.DelayMessageSender : 发送延时消息,message:test30s, duration:30000,当前时间:2020-9-23 10:19:36
2020-09-23 10:20:30.797 INFO 17924 --- [ntContainer#0-1] c.h.r.consumer.DeadLetterQueueConsumer : 接收到延时消息,消息:test60s,延时秒数:60,当前时间:2020-9-23 10:20:30
2020-09-23 10:20:30.797 INFO 17924 --- [ntContainer#0-1] c.h.r.consumer.DeadLetterQueueConsumer : 接收到延时消息,消息:test30s,延时秒数:54,当前时间:2020-9-23 10:20:30

我们先发了一个延时时长为60s的消息,然后发了一个延时时长为30s的消息,结果显示,第二个消息会在等第一个消息成为死信后才会“死亡“。

利用RabbitMQ插件实现延迟队列

​ 上文中提到的问题,确实是一个硬伤,如果不能实现在消息粒度上添加TTL,并使其在设置的TTL时间及时死亡,就无法设计成一个通用的延时队列。

安装延时插件

linux安装
下载插件

​ 那如何解决这个问题呢?不要慌,安装一个插件即可:https://www.rabbitmq.com/community-plugins.html ,下载rabbitmq_delayed_message_exchange插件,然后解压放置到RabbitMQ的插件目录。

1
wget https://dl.bintray.com/rabbitmq/community-plugins/3.7.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-20171201-3.7.x.zip
安装插件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
unzip rabbitmq_delayed_message_exchange-20171201-3.7.x.zip -d /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.4/plugins/

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

The following plugins have been configured:
rabbitmq_delayed_message_exchange
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@192-168-x-xx...
The following plugins have been enabled:
rabbitmq_delayed_message_exchange

started 1 plugins.
docker 安装
下载插件

https://www.rabbitmq.com/community-plugins.html 下载ez文件

1
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez
安装插件

然后拷贝到docker里面去,找到MQ插件的路径,我的是 /opt/rabbitmq/plugins

1
docker cp rabbitmq_delayed_message_exchange-3.8.0.ez 585512ab3fdd:/opt/rabbitmq/plugins

启用插件

然后 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 即可 注意不要带上版本和.ez

1
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

rabbitmq-plugins list 命令查看已安装插件

配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Configuration
public class RabbitMQConfig {
public static final String DELAYED_QUEUE_NAME = "delay.queue.demo.delay.queue";
public static final String DELAYED_EXCHANGE_NAME = "delay.queue.demo.delay.exchange";
public static final String DELAYED_ROUTING_KEY = "delay.queue.demo.delay.routingkey";

@Bean
public Queue immediateQueue() {
return new Queue(DELAYED_QUEUE_NAME);
}

@Bean
public CustomExchange customExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
}

@Bean
public Binding bindingNotify(@Qualifier("immediateQueue") Queue queue,
@Qualifier("customExchange") CustomExchange customExchange) {
return BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Component
public class DeadLetterQueueConsumer {

private static final Logger logger = LoggerFactory.getLogger(DeadLetterQueueConsumer.class);
DateFormat dateFormat = DateFormat.getDateTimeInstance();

@RabbitListener(queues = RabbitMQConfig.DELAYED_QUEUE_NAME)
public void receiveDelay(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
Date sendDate = message.getMessageProperties().getTimestamp();
//延迟的秒数
long durationSec = (System.currentTimeMillis() - sendDate.getTime()) / 1000;
logger.info("接收到延时消息,消息:{},延时秒数:{},当前时间:{}", msg, durationSec, dateFormat.format(new Date()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}

生产者

使用spring的setDelay方法来设置消息的过期时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Component
public class DelayMessageSender {

private static final Logger logger = LoggerFactory.getLogger(DelayMessageSender.class);
DateFormat dateFormat = DateFormat.getDateTimeInstance();

@Autowired
private RabbitTemplate rabbitTemplate;

public void sendMsg(String msg, int delayTime) {
logger.info("发送延时消息,message:{}, duration:{},当前时间:{}", msg, delayTime, dateFormat.format(new Date()));
rabbitTemplate.convertAndSend(RabbitMQConfig.DELAYED_EXCHANGE_NAME, RabbitMQConfig.DELAYED_ROUTING_KEY, msg, x -> {
MessageProperties messageProperties = x.getMessageProperties();
messageProperties.setDelay(delayTime);
messageProperties.setTimestamp(new Date());
return x;
});
}
}

请求接口

这个接口可以传递延迟任意时间消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@RestController
@RequestMapping("/rabbitmq")
public class RabbitmqController {

@Autowired
private DelayMessageSender msgProducer;

/**
* 发送测试数据
*
* @param delayTime 交换器类型
* @param message
* @return
*/
@RequestMapping("/send")
public String send(String message, int delayTime) {
msgProducer.sendMsg(message, delayTime);
return "OK";
}
}

测试

正常测试

请求地址

1
2
http://localhost:8080/rabbitmq/send?message=test30s&delayTime=30000
http://localhost:8080/rabbitmq/send?message=test60s&delayTime=60000

测试数据

1
2
3
4
2020-09-23 11:06:15.676  INFO 11072 --- [nio-8080-exec-5] c.h.r.consumer.DelayMessageSender        : 发送延时消息,message:test60s, duration:60000,当前时间:2020-9-23 11:06:15
2020-09-23 11:06:17.308 INFO 11072 --- [nio-8080-exec-6] c.h.r.consumer.DelayMessageSender : 发送延时消息,message:test60s, duration:60000,当前时间:2020-9-23 11:06:17
2020-09-23 11:07:15.680 INFO 11072 --- [ntContainer#0-1] c.h.r.consumer.DeadLetterQueueConsumer : 接收到延时消息,消息:test60s,延时秒数:60,当前时间:2020-9-23 11:07:15
2020-09-23 11:07:17.312 INFO 11072 --- [ntContainer#0-1] c.h.r.consumer.DeadLetterQueueConsumer : 接收到延时消息,消息:test60s,延时秒数:60,当前时间:2020-9-23 11:07:17
非正常测试

请求地址

1
2
http://localhost:8080/rabbitmq/send?message=test60s&delayTime=60000
http://localhost:8080/rabbitmq/send?message=test30s&delayTime=30000

测试数据

1
2
3
4
2020-09-23 11:09:45.823  INFO 11072 --- [nio-8080-exec-4] c.h.r.consumer.DelayMessageSender        : 发送延时消息,message:test60s, duration:60000,当前时间:2020-9-23 11:09:45
2020-09-23 11:09:47.402 INFO 11072 --- [nio-8080-exec-5] c.h.r.consumer.DelayMessageSender : 发送延时消息,message:test30s, duration:30000,当前时间:2020-9-23 11:09:47
2020-09-23 11:10:17.408 INFO 11072 --- [ntContainer#0-1] c.h.r.consumer.DeadLetterQueueConsumer : 接收到延时消息,消息:test30s,延时秒数:30,当前时间:2020-9-23 11:10:17
2020-09-23 11:10:45.826 INFO 11072 --- [ntContainer#0-1] c.h.r.consumer.DeadLetterQueueConsumer : 接收到延时消息,消息:test60s,延时秒数:60,当前时间:2020-9-23 11:10:45

第二个消息被先消费掉了,符合预期。至此,RabbitMQ实现延时队列的部分就完结了。

总结

​ 延时队列在需要延时处理的场景下非常有用,使用RabbitMQ来实现延时队列可以很好的利用RabbitMQ的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过RabbitMQ集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。

​ 当然,延时队列还有很多其它选择,比如利用Java的DelayQueu,利用Redis的zset,利用Quartz或者利用kafka的时间轮,这些方式各有特点,但就像炉石传说一般,这些知识就好比手里的卡牌,知道的越多,可以用的卡牌也就越多,遇到问题便能游刃有余,所以需要大量的知识储备和经验积累才能打造出更出色的卡牌组合,让自己解决问题的能力得到更好的提升。

​ 但另一方面,随着时间的流逝和阅历的增长,越来越感觉到自己的能力有限,无法独自面对纷繁复杂且多变的业务需求,在很多方面需要其他人的协助才能很好的完成任务。也知道闻道有先后,术业有专攻,不会再狂妄自大,觉得自己能把所有事情都搞定,也将重心慢慢转移到研究如何有效的进行团队合作上来,我相信一个高度协调的团队永远比一个人战斗要更有价值。

评论