抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

RabbitMQ-消息消费均衡

消息的获取方式

消息的消费者在获取消息时,主要有两种方式来获取消息

推送Consume

​ 其中消息推送Consume,属于一种推送模型。即在注册一个消费者后,RabbitMQ会在消息可用时,自动将消息进行推送给消费者。

​ 这种模式我们已经使用过很多次了,如下:

拉取Get

​ 消息的拉取Get方式,属于一种轮询模型,发送一次get请求,获得一个消息。如果此时RabbitMQ中没有消息,会获得一个表示空的回复。

​ 这种方式性能比较差,每获得一条消息,都要和RabbitMQ进行网络通信发出请求。而且对RabbitMQ来说,RabbitMQ无法进行任何优化,因为它永远不知道应用程序何时会发出请求。

​ 既然每发送一次get请求,只能获得一个消息,那么我们实现的时候,则要在一个循环里,不断去服务器get消息,如下:

消息的应答

​ 消费者收到的每一条消息都必须进行确认。消息确认后,RabbitMQ才会从队列删除这条消息,RabbitMQ不会为未确认的消息设置超时时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开。这么设计的原因是RabbitMQ允许消费者消费一条消息的时间可以很久很久。

自动确认

​ 消费者在声明队列时,可以指定autoAck参数,当autoAck=true时,一旦消费者接收到了消息,就视为自动确认了消息

​ 比如上面消息的获取方式中,介绍的消息推送Consume、消息拉取Get都是采用的自动确认方式,如下:

​ 上述也注重说明了一旦消费者接收到了消息,就视为自动确认了消息。这里和ActiveMQ有些不同,所以需要进行区分,在ActiveMQ消息的可靠性中采用自动确认且是异步的情况下,其默认是在业务逻辑处理完成后进行自动确认,即在消息监听器最后异步进行确认,如下:

​ 但是在RabbitMQ中声明消费者处理业务逻辑之前自动确认就已经完成了(推送Consume、消息拉取Ge都是如此),可由下面代码进行测试,如下:

​ 上述代码我们采用了自动确认,并且在业务处理的最后手动抛出异常,然后我们进行测试,发现重启消费者后不会再收到消息,说明该消息已经被消费者消费掉并确认了,RabbitMQ已经将其移除了。

手动确认

​ 从上述的介绍,我们发现在自动确认中间,如果消费者在处理消息的过程中,出了错,就没有什么办法重新处理这条消息,所以我们很多时候,需要在消息处理成功后,再确认消息,这就需要手动确认。

​ 当autoAck=false时,RabbitMQ会等待消费者显式发回ack信号后才从内存(和磁盘,如果是持久化消息的话)中移除消息。

​ 采用消息确认机制后,只要令autoAck=false,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为RabbitMQ会一直持有消息直到消费者显式调用channel.basicAck为止。

​ 当autoAck=false时,对于RabbitMQ服务器端而言,如果服务器端一直没有收到消费者的ack信号,并且消费此消息的消费者已经断开连接,则服务器端会安排该消息重新进入队列,等待投递给下一个消费者(也可能还是原来的那个消费者)。

单个确认

​ 这里我们启动了手动确认后,就必须调用channel.basicAck方法进行确认,否则的话RabbitMQ会一直进行等待,当我们这个消费者关闭后,RabbitMQ会将该条消息再发给对应的消费者进行消费,直到有消费者对该条消息进行消费并应答完成。

​ 看到上述对消息的处理后,我们还发现一个问题,如果channel.basicAck收到确认前的代码有问题,会抛出异常无法进行手动确认怎么办,一般消费者也不会连接中断,那么该消息就一直无法被处理,连被其他消费者处理的机会都没有,所以一般我们会进行try-catch处理,处理成功则手动确认,失败或有异常则拒绝。

​ 最后我们再看看上述手动确认的basicAck方法中的两个参数,如下:

​ 该参数我们在[RabbitMQ消息发布之发送方确认]的异步监听发送方确认模式中也介绍过,第一个参数deliveryTag为消息的ID,第二个参数multiple则为是否批量确认,这里我们都是逐条消息读取并确认的,所以都为false。

批量确认

​ 如果是批量确认的话,这里第一个参数deliveryTag为最后一条消息的ID,第二个参数为true,其批量确认代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class BatchAckConsumer {
public final static String EXCHANGE_NAME = "direct_exchange";//direct交换器名称
public final static String DIRECT_QUEUE = "direct_queue";
private static AtomicInteger messageCount = new AtomicInteger(0);

private static final Executor executor = Executors.newFixedThreadPool(10);


public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂,连接RabbitMQ
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.64.132");//端口号、用户名、密码可以使用默认的
connectionFactory.setUsername("root");
connectionFactory.setPassword("root");
connectionFactory.setPort(5672);
//创建连接
Connection connection = connectionFactory.newConnection();
executor.execute(() -> {
try {
receiveMessage(connection);
} catch (IOException e) {
e.printStackTrace();
}
});

}

public static void receiveMessage(Connection connection) throws IOException {
//创建信道
Channel channel = connection.createChannel();
//在信道中设置主交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//声明队列
channel.queueDeclare(DIRECT_QUEUE, false, false, false, null);
String[] keys = new String[]{"key1"};
for (String key : keys) {
channel.queueBind(DIRECT_QUEUE, EXCHANGE_NAME, key);
}
System.out.println("主交换器等待 message.....");
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "utf-8");
System.out.println("主交换器 Received:" + envelope.getRoutingKey() + "========" + message + ",ThreadId:" + Thread.currentThread().getId());
messageCount.incrementAndGet();
if (messageCount.get() % 100 == 0) {
channel.basicAck(envelope.getDeliveryTag(), true);
} else if (message.equals("end")) {
this.getChannel().basicAck(envelope.getDeliveryTag(), true);
}
}
};
//消费者在指定的对队列上消费
channel.basicConsume(DIRECT_QUEUE, false, consumer);
}
}

​ 上述采用了100条消息进批量确认,只有上述代码的最后判断的if (message.equals("end")),是因为消息的条数很少是我们批量确认数量的整数倍,所以这里要求消息的生产者在所有消息发送完成后,需要额外的发送一条消息内容为end的消息,进行告诉消费者所有消息已发送完成,进行最后剩余消息的批量确认。

注意: 如果采用了上述的批量确认的方式,如果消费者应用程序在确认消息之前崩溃,则所有未确认的消息(包括已成功处理,未被确认)将被重新发送给其他消费者。所以这里存在着一定程度上的可靠性风险。

QOS预取模式

​ RabbitMQ提供了一种QOS(Quality of Service ,服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于consume或者channel设置QOS的值)未被确认前,不进行消费新的消息

​ 在RabbitMQ消息的获取方式中,无论是推送Consume、拉取Get哪一种方式,只要不进行消息确认,那么RabbitMQ会认为该消息未被成功处理,则不会将其移除,一般情况下我们让消费者处理一批指定数量的消息,然后再进行批量确认,其用法在[RabbitMQ消息应答]的最后手动批量确认中,已有相关介绍。

​ 这里假设我们忘记了进行批量确认,或者批量确认出现了问题,那么消费者一直在消费消息且不给予消息确认,那么服务最后就会出现问题,这里就可以利用RabbitMQ的QOS预取模式,来对消费者进行流控,从而限制未ack的消息数量。

注意: 消费确认模式必须是非自动确认ACK机制(这个是使用RabbitMQ的QOS预取模式的前提条件,否则会QOS不生效)

使用

​ 其使用的其实比较的简单,首先需要设置QOS的值;另外,还可以基于消费者Consume和信道Channel的粒度进行设置(global),如下:

参数解释

其中basicOos()有多个参数,其参数含义如下:

  • prefetchSize: 最多传输的内容的大小的限制,0为不限制,但据说prefetchSize参数,RabbitMQ暂未对其没有实现。
  • prefetchCount: 会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该消费者Consumer将阻塞block掉,直到有消息进行ack确认
  • global: true/false,表示是否将上面设置应用于channel,简单点说,就是上面限制是信道channel级别的还是消费者consumer级别。

测试

提供者发送消息,RabbitMQ有接收有4条消息未消费,然后再消息的消费者端,我们进行消息的消费,这里采用手动确认ack的机制,但是我们不进行确认,进行测试

控制台打印信息

1
2
3
4
5
主交换器等待 message.....
主交换器 Received:key1========hello 发送rabitmq消息0,ThreadId:17
主交换器 Received:key1========hello 发送rabitmq消息3,ThreadId:17
主交换器 Received:key1========hello 发送rabitmq消息6,ThreadId:17
主交换器 Received:key1========hello 发送rabitmq消息9,ThreadId:17

​ 上述这三条消息被消费者消费了,但是并未确认,所以我们关闭该消费者,再次启动该队列的消费者还是可以收到三条消息的。

​ 这里我们然后再使用RabbitMQ的QOS预取模式,对其进行流控,从而限制未ack的消息数量。如我们限制一个信道最多只能有2个未确认的消息,如下:

控制台打印信息

1
2
3
主交换器等待 message.....
主交换器 Received:key1========hello 发送rabitmq消息0,ThreadId:17
主交换器 Received:key1========hello 发送rabitmq消息3,ThreadId:17

​ 另外对于basicOos()的参数global,上述进行了介绍,true表示限制信道,false表示限制单独的消费者,但是如果我们同时进行设置的话,如下:

1
2
3
//开启QOS
channel.basicQos(100, true);
channel.basicQos(50, false);

​ RabbitMQ将此解释为意味着两个预取限制应该彼此独立地强制执行,即两个条件都需要进行满足

​ 之前我们介绍了一个连接可以有多个信道,其实我们一个信道中肯定也是可以有多个消费者的,我们只需要在其中声明多个消费者即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private static DefaultConsumer getConsumer(Channel channel) {
return new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "utf-8");
System.out.println("主交换器 Received:" + envelope.getRoutingKey() + "========" + message + ",ThreadId:"
+ Thread.currentThread().getId());
//手动确认
// channel.basicAck(envelope.getDeliveryTag(),false);
}
};
}

public static void receiveMessage(Connection connection) throws IOException {
//创建信道
Channel channel = connection.createChannel();
//在信道中设置主交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//开启QOS
channel.basicQos(100, true);
channel.basicQos(50, false);
//声明队列
channel.queueDeclare(DIRECT_QUEUE, false, false, false, null);
String[] keys = new String[]{"key1"};
for (String key : keys) {
channel.queueBind(DIRECT_QUEUE, EXCHANGE_NAME, key);
}
System.out.println("主交换器等待 message.....");

//消费者在指定的对队列上消费
channel.basicConsume(DIRECT_QUEUE, false, getConsumer(channel));
channel.basicConsume(DIRECT_QUEUE, false, getConsumer(channel));
}

​ 这样的话,一个信道中就存在两个消费者了,其也是轮询进行消费消息的,那么上述的设置的效果为:每个消费者最多可以有50个未确认的消费,但是该信道所有的消费者加起来最多只能有100个未确认的消息。

消费者事务

​ 在[RabbitMQ消息发布之事务]中,我们介绍了消息生产者在发布消息时,可以使用其事务操作,其实在RabbitMQ中,消费者在消费者事务时,也是可以使用事务的,其使用方法和生产者一致。

​ 如下,在手动确认模式中,当我们开启了事务,即使我们进行了手动确认消息,但是如果事务不进行提交,或者事务进行回滚了,那么上述的消息还是被RabbitMQ视为未成功消费的,当该消费者断开时,其余该队列的消费者仍然可以继续消费该消息,所以必须进行commit提交

​ 如上,开启事务后后,即使手动确认了消息,也是必须进行事务提交的,那么如果是自动确认呢?其事务还会起作用么?

​ 这里如果是自动确认,即autoAck=true是不支持事务的,也就是说你即使在收到消息之后在回滚事务也是于事无补的,队列已经把消息移除了。

消息的拒绝

Reject

​ 在[RabbitMQ消息应答]中, 我们在采用手动确认时,为了避免因为异常导致消息无法被处理,就曾使用过channel.basicRject()方法,如下:

我们可以看看该方法,其中主要有两个参数:

basicReject在拒绝消息时,需要指定上述两个参数,第一个deliveryTag,即消息的ID;第二个requeue,表示该消息是否需要重新发送给消费者。

​ 当deliveryTag标识设置为true时,消息被拒绝后,会一直进行重发,直到消息能够被正常的消费并确认,如果deliveryTag标识设置为false时,那么消息被拒绝后不会再进行重发。

Nack

而Nack则是RabbitMQ对AMQP规范的一个扩展,其可以一次性拒绝多个消息。

其中多了一个参数multiple,其表示是是否为批量拒绝,这个参数之前在批量确认中也是有的。

死信交换器DLX

​ 在[RabbiteMQ消息的拒绝]中,我们是可以利用Reject和Nack去拒绝消息的,消息在被拒绝的时候是可以指定RabbitMQ是否需要重新发送给消费者。

​ 如何我们消息被拒绝,且不重新投递的情况下,那么这条消息就无法被处理了么?在RabbitMQ中也有类似的处理——死信交换器。

​ 死信交换器不是默认的设置,这里是被投递消息被拒绝后的一个可选行为,是在创建队列的时进行声明的,往往用在对问题消息的诊断上。

消息进入死信交换器一般是以下几种情况:

  • 消息被拒绝,并且设置requeue参数为 false
  • 消息过期
  • 队列达到最大长度

​ 死信交换器仍然只是一个普通的交换器,创建时并没有特别要求和操作。在创建队列的时候,声明该交换器将用作保存被拒绝的消息即可,相关的参数是x-dead-letter-exchange

和备用交换器的区别

​ 死信交换器的设置其实和[RabbitMQ消息发布之备用交换器]比较类似,不过备用交换器是消息生产者在发布消息时无法路由消息,消息将被路由到备用交换器,所以备用交换器配置在消息的生产者。而死信交换器则是接收过期或者被拒绝的消息,所以肯定是配置在消息的消费者。

​ 场景分析:备用交换器一般是用于生产者生产消息时,确保消息可以尽量进入 RabbitMQ,而死信交换器主要是用于消费者消费消息的万不一失性的场景(比如消息过期,队列满了,消息拒绝且不重新投递)

消息的发布者配置

首先我们先看看消息的发布者,这里和之前介绍的用法一样,就是发布了key1、key2、key3三条消息,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

public class Producer {
public final static String EXCHANGE_NAME = "topic_exchange";//topic交换器名称
public final static Integer SEND_NUM = 10;//发送消息次数

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//创建连接工厂,连接RabbitMQ
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.64.133");//端口号、用户名、密码可以使用默认的
connectionFactory.setUsername("root");
connectionFactory.setPassword("root");
connectionFactory.setPort(5672);
//创建连接
Connection connection = connectionFactory.newConnection();
//创建信道
Channel channel = connection.createChannel();
//在信道中设置主交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);


//交换器和队列绑定放到消费者进行
//自定义路由键
String[] keys = new String[]{"key1", "key2", "key3"};
//发送消息

for (int i = 0; i < SEND_NUM; i++) {
String key = keys[i % keys.length];
String message = "hello 发送rabitmq消息" + i;
//消息进行发送 并添加mandatory为true
channel.basicPublish(EXCHANGE_NAME, key, null, message.getBytes("UTF-8"));

System.out.println("sendMessage:" + key + "===" + message);
}


//关闭信道
channel.close();
//关闭连接
connection.close();
}
}

消息接收者配置

接收全部数据

​ 然后我们给生产者生成的三条消息,并对消费者配置死信交换器,消费者正常ack确认key1,key2 但是对于key3 进行拒接并不重新投递,正常情况下key3 的数据将会被死信交换器接收到,这个时候需要配死信交换器了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class Consumer {
public final static String EXCHANGE_NAME = "topic_exchange";//topic交换器名称
public final static String DLX_EXCHANGE_NAME = "dlx_topict_exchange"; //死信交换器
private static final Executor executor = Executors.newFixedThreadPool(10);


public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂,连接RabbitMQ
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.64.133");//端口号、用户名、密码可以使用默认的
connectionFactory.setUsername("root");
connectionFactory.setPassword("root");
connectionFactory.setPort(5672);
//创建连接
Connection connection = connectionFactory.newConnection();
executor.execute(() -> {
try {
receiveMessage(connection);
} catch (IOException e) {
e.printStackTrace();
}
});

}

public static void receiveMessage(Connection connection) throws IOException {
//创建信道
Channel channel = connection.createChannel();
//在信道中设置主交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//创建死信交换器
Map<String, Object> dlxMap = new HashMap<>();
dlxMap.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);
String queueName = "all_key_queue_name";
//声明队列
channel.queueDeclare(queueName, false, false, false, dlxMap);
//将队列和交换器通过路由键进行绑定
channel.queueBind(queueName, EXCHANGE_NAME, "#");

System.out.println("主交换器等待 message.....");
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "utf-8");
System.out.println("主交换器 Received:" + envelope.getRoutingKey() + "========" + message + ",ThreadId:"
+ Thread.currentThread().getId());
if ("key3".equals(envelope.getRoutingKey())) {
// key3 的数据 拒绝并且不重新投递
channel.basicReject(envelope.getDeliveryTag(), false);
} else {
//其他的Key ack确认
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//消费者在指定的对队列上消费
channel.basicConsume(queueName, false, consumer);
}
}

接收key2数据

​ 另外一个消费者,我们只接收其key2消息,并且也是拒绝并不重新投递,不同的是我们将其送到死信交换器的时候,将其路由键进行重新定义了(死信交换器重新定义路由键,可以将其不同的消息,发送到同一个队列之类,可以进行统一处理失败的消息),代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class Key2Consumer {
public final static String EXCHANGE_NAME = "topic_exchange";//topic交换器名称
public final static String DLX_EXCHANGE_NAME = "dlx_topict_exchange"; //死信交换器
private static final Executor executor = Executors.newFixedThreadPool(10);


public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂,连接RabbitMQ
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.64.133");//端口号、用户名、密码可以使用默认的
connectionFactory.setUsername("root");
connectionFactory.setPassword("root");
connectionFactory.setPort(5672);
//创建连接
Connection connection = connectionFactory.newConnection();
executor.execute(() -> {
try {
receiveMessage(connection);
} catch (IOException e) {
e.printStackTrace();
}
});

}

public static void receiveMessage(Connection connection) throws IOException {
//创建信道
Channel channel = connection.createChannel();
//在信道中设置主交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//创建死信交换器
Map<String, Object> dlxMap = new HashMap<>();
dlxMap.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);
//死信路由键,会替换消息原来的路由键
dlxMap.put("x-dead-letter-routing-key", "dead_key2");
String queueName = "key2_queue_name";
//声明队列
channel.queueDeclare(queueName, false, false, false, dlxMap);
//将队列和交换器通过路由键进行绑定
channel.queueBind(queueName, EXCHANGE_NAME, "key2");

System.out.println("主交换器等待 message.....");
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "utf-8");
System.out.println("主交换器 Received:" + envelope.getRoutingKey() + "========" + message + ",ThreadId:"
+ Thread.currentThread().getId());
// key2 的数据 拒绝并且不重新投递
channel.basicReject(envelope.getDeliveryTag(), false);
}
};
//消费者在指定的对队列上消费
channel.basicConsume(queueName, false, consumer);
}
}

死信消息接收

​ 上述两个消费者使用的是同一个死信交换器——dlx,两个消费者失败的消息都会进入同一个死信交换器中。当然我们也可以为每个消费者都定义一个单独的死信交换器,这样多个消费者失败的消息就会进入不同的死信交换器,我们在处理的时候就需要对多个死信交换器进行处理,这里就需要看其业务的要求的。

全部死信消息接收

​ 另外上述我们发现我们还没有创建死信交换器(当然我们也是可以在两个消费者中进行创建的,只要保证其参数相同),这里我们就是其死信交换器的消费者中进行创建的,这里同样我们为死信交换器也创建了两个消费者,一个可以消费其中的所有消息,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class DlxConsumer {

public final static String DLX_EXCHANGE_NAME = "dlx_topict_exchange"; //死信交换器
private static final Executor executor = Executors.newFixedThreadPool(10);


public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂,连接RabbitMQ
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.64.133");//端口号、用户名、密码可以使用默认的
connectionFactory.setUsername("root");
connectionFactory.setPassword("root");
connectionFactory.setPort(5672);
//创建连接
Connection connection = connectionFactory.newConnection();
executor.execute(() -> {
try {
receiveMessage(connection);
} catch (IOException e) {
e.printStackTrace();
}
});

}

public static void receiveMessage(Connection connection) throws IOException {
//创建信道
Channel channel = connection.createChannel();
//在信道中设置主交换器
channel.exchangeDeclare(DLX_EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

//获取临时队列
String queueName = channel.queueDeclare().getQueue();

//将队列和交换器通过路由键进行绑定
channel.queueBind(queueName, DLX_EXCHANGE_NAME, "#");

System.out.println("死信换器等待 message.....");
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "utf-8");
System.out.println("死信交换器 Received:" + envelope.getRoutingKey() + "========" + message + ",ThreadId:"
+ Thread.currentThread().getId());
//拒绝提交
channel.basicReject(envelope.getDeliveryTag(), false);

}
};
//消费者在指定的对队列上消费
channel.basicConsume(queueName, false, consumer);
}
}
key2 死信数据接收

​ 另一个死信交换器消费者只用来处理那个被我们修改了路由键的key2消息,上述中我们将其路由键有key2修改为了dead_key2,其代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class Key2DlxConsumer {

public final static String DLX_EXCHANGE_NAME = "dlx_topict_exchange"; //死信交换器
private static final Executor executor = Executors.newFixedThreadPool(10);


public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂,连接RabbitMQ
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.64.133");//端口号、用户名、密码可以使用默认的
connectionFactory.setUsername("root");
connectionFactory.setPassword("root");
connectionFactory.setPort(5672);
//创建连接
Connection connection = connectionFactory.newConnection();
executor.execute(() -> {
try {
receiveMessage(connection);
} catch (IOException e) {
e.printStackTrace();
}
});

}

public static void receiveMessage(Connection connection) throws IOException {
//创建信道
Channel channel = connection.createChannel();
//在信道中设置主交换器
channel.exchangeDeclare(DLX_EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

//获取临时队列
String queueName = channel.queueDeclare().getQueue();

//将队列和交换器通过路由键进行绑定
channel.queueBind(queueName, DLX_EXCHANGE_NAME, "dead_key2");

System.out.println("死信换器key2等待 message.....");
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "utf-8");
System.out.println("死信交换器key2 Received:" + envelope.getRoutingKey() + "========" + message + ",ThreadId:"
+ Thread.currentThread().getId());
//拒绝提交
channel.basicReject(envelope.getDeliveryTag(), false);
}
};
//消费者在指定的对队列上消费
channel.basicConsume(queueName, false, consumer);
}
}

测试

生产者生产数据

生产者生成了10 条数据 对 key1,key2,key3

1
2
3
4
5
6
7
8
9
10
sendMessage:key1===hello 发送rabitmq消息0
sendMessage:key2===hello 发送rabitmq消息1
sendMessage:key3===hello 发送rabitmq消息2
sendMessage:key1===hello 发送rabitmq消息3
sendMessage:key2===hello 发送rabitmq消息4
sendMessage:key3===hello 发送rabitmq消息5
sendMessage:key1===hello 发送rabitmq消息6
sendMessage:key2===hello 发送rabitmq消息7
sendMessage:key3===hello 发送rabitmq消息8
sendMessage:key1===hello 发送rabitmq消息9
Consumer接收全部数据

Consumer 消费者接收全部数据 并且 ack确认key1,key2,key3拒绝并且不进行重新投递,key3 的数据将会发送到死信队列

1
2
3
4
5
6
7
8
9
10
11
主交换器等待 message.....
主交换器 Received:key1========hello 发送rabitmq消息0,ThreadId:17
主交换器 Received:key2========hello 发送rabitmq消息1,ThreadId:18
主交换器 Received:key3========hello 发送rabitmq消息2,ThreadId:18
主交换器 Received:key1========hello 发送rabitmq消息3,ThreadId:18
主交换器 Received:key2========hello 发送rabitmq消息4,ThreadId:18
主交换器 Received:key3========hello 发送rabitmq消息5,ThreadId:18
主交换器 Received:key1========hello 发送rabitmq消息6,ThreadId:18
主交换器 Received:key2========hello 发送rabitmq消息7,ThreadId:18
主交换器 Received:key3========hello 发送rabitmq消息8,ThreadId:18
主交换器 Received:key1========hello 发送rabitmq消息9,ThreadId:18
Key2Consumer 接收key2数据

Key2Consumer 用另一个队列接收路由键key2 的数据并且拒绝并且不进行重新投递,但是key2 设置死信队列,并且将路由键改为了dead_key2

1
2
3
4
主交换器等待 message.....
主交换器 Received:key2========hello 发送rabitmq消息1,ThreadId:17
主交换器 Received:key2========hello 发送rabitmq消息4,ThreadId:17
主交换器 Received:key2========hello 发送rabitmq消息7,ThreadId:17
DlxConsumer 死信队列消费

因为DlxConsumer接收所有的死信队列数据,所以Consumer和Key2Consumer 拒绝的数据DlxConsumer 都会接收到,所以应该会接收到 Consumer拒绝的 key3 路由键的数据以及Key2Consumer 拒绝的key2 数据,但是Key2Consumer 会将路由键改为dead_key2

1
2
3
4
5
6
7
死信换器等待 message.....
死信交换器 Received:key3========hello 发送rabitmq消息2,ThreadId:17
死信交换器 Received:key3========hello 发送rabitmq消息5,ThreadId:18
死信交换器 Received:key3========hello 发送rabitmq消息8,ThreadId:18
死信交换器 Received:dead_key2========hello 发送rabitmq消息1,ThreadId:18
死信交换器 Received:dead_key2========hello 发送rabitmq消息4,ThreadId:18
死信交换器 Received:dead_key2========hello 发送rabitmq消息7,ThreadId:18
Key2DlxConsumer 死信队列消费

Key2DlxConsumer 只会接收路由键为dead_key2的死信数据

1
2
3
4
死信换器key2等待 message.....
死信交换器key2 Received:dead_key2========hello 发送rabitmq消息1,ThreadId:17
死信交换器key2 Received:dead_key2========hello 发送rabitmq消息4,ThreadId:18
死信交换器key2 Received:dead_key2========hello 发送rabitmq消息7,ThreadId:18

评论