抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

RabbitMQ-原生编程

添加RabbitMQ依赖

1
2
3
4
5
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>

com.rabbitmq:amqp-client使用slf4j日志框架,需要添加一个具体的日志实现,如下所示。(当然也可以使用其他的日志实现,如 log4j等)

1
2
3
4
5
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.26</version>
</dependency>

direct交换器

绑定单个KEY

生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
public final static String EXCHANGE_NAME = "direct_exchange";//direct交换器名称
public final static Integer SEND_NUM = 10;//发送消息次数

public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂,连接RabbitMQ
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.64.131");//端口号、用户名、密码可以使用默认的
connectionFactory.setUsername("root");
connectionFactory.setPassword("root");
connectionFactory.setPort(5672);
//创建连接
Connection connection = connectionFactory.newConnection();
//创建信道
Channel channel = connection.createChannel();
//在信道中设置交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//交换器和队列绑定放到消费者进行
//自定义路由键
String[] keys = new String[]{"key1", "key2", "key3"};
//发送消息

for (int i = 0; i < SEND_NUM; i++) {
String key = keys[i % keys.length];
String message = "hello 发送rabitmq消息" + i;
//消息进行发送
channel.basicPublish(EXCHANGE_NAME, key, null, message.getBytes("UTF-8"));
System.out.println("sendMessage:" + key + "===" + message);
}
//关闭信道
channel.close();
//关闭连接
connection.close();
}
}
消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
public final static String EXCHANGE_NAME = "direct_exchange";//direct交换器名称
public final static String DIRECT_QUEUE = "direct_queue";

public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂,连接RabbitMQ
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.64.131");//端口号、用户名、密码可以使用默认的
connectionFactory.setUsername("root");
connectionFactory.setPassword("root");
connectionFactory.setPort(5672);
//创建连接
Connection connection = connectionFactory.newConnection();
//创建信道
Channel channel = connection.createChannel();
//在信道中设置交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//声明队列
channel.queueDeclare(DIRECT_QUEUE, false, false, false, null);
//交换器和队列绑定
channel.queueBind(DIRECT_QUEUE, EXCHANGE_NAME, "key1");
System.out.println("等待 message.....");
//声明消费者
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "utf-8");
System.out.println("Received:" + envelope.getRoutingKey() + "========" + message);

}
};
//消费者在指定的对队列上消费
channel.basicConsume(DIRECT_QUEUE, true, consumer);
}
}
运行结果

绑定单个KEY 我们发现消费者直接受到绑定KEY的数据,未绑定KEY的数据被丢弃

生产者
1
2
3
4
5
6
7
8
9
10
sendMessage:key1===hello 发送rabitmq消息0
sendMessage:key2===hello 发送rabitmq消息1
sendMessage:key3===hello 发送rabitmq消息2
sendMessage:key1===hello 发送rabitmq消息3
sendMessage:key2===hello 发送rabitmq消息4
sendMessage:key3===hello 发送rabitmq消息5
sendMessage:key1===hello 发送rabitmq消息6
sendMessage:key2===hello 发送rabitmq消息7
sendMessage:key3===hello 发送rabitmq消息8
sendMessage:key1===hello 发送rabitmq消息9
消费者
1
2
3
4
5
等待 message.....
Received:key1========hello 发送rabitmq消息0
Received:key1========hello 发送rabitmq消息3
Received:key1========hello 发送rabitmq消息6
Received:key1========hello 发送rabitmq消息9

绑定多个KEY

生产者

同上

消费者

只需要修改消费者交换器绑定的部分

1
2
3
4
5
6
7
8
....
//交换器和队列绑定
// channel.queueBind(DIRECT_QUEUE, EXCHANGE_NAME, "key1");
String[] keys = new String[]{"key1", "key2", "key3"};
for (String key : keys) {
channel.queueBind(DIRECT_QUEUE, EXCHANGE_NAME, key);
}
....
运行结果

监听多个KEY 我们发现能够接收到所有的数据了

生产者
1
2
3
4
5
6
7
8
9
10
sendMessage:key1===hello 发送rabitmq消息0
sendMessage:key2===hello 发送rabitmq消息1
sendMessage:key3===hello 发送rabitmq消息2
sendMessage:key1===hello 发送rabitmq消息3
sendMessage:key2===hello 发送rabitmq消息4
sendMessage:key3===hello 发送rabitmq消息5
sendMessage:key1===hello 发送rabitmq消息6
sendMessage:key2===hello 发送rabitmq消息7
sendMessage:key3===hello 发送rabitmq消息8
sendMessage:key1===hello 发送rabitmq消息9
消费者
1
2
3
4
5
6
7
8
9
10
11
等待 message.....
Received:key1========hello 发送rabitmq消息0
Received:key2========hello 发送rabitmq消息1
Received:key3========hello 发送rabitmq消息2
Received:key1========hello 发送rabitmq消息3
Received:key2========hello 发送rabitmq消息4
Received:key3========hello 发送rabitmq消息5
Received:key1========hello 发送rabitmq消息6
Received:key2========hello 发送rabitmq消息7
Received:key3========hello 发送rabitmq消息8
Received:key1========hello 发送rabitmq消息9

多个信道发送消息

生产者

生产者采用多线程的方式模拟多信道发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;

public class Producer {
public final static String EXCHANGE_NAME = "direct_exchange";//direct交换器名称
public final static Integer SEND_NUM = 10;//发送消息次数
//创建线程池
private static final Executor executor = Executors.newFixedThreadPool(10);

public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂,连接RabbitMQ
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.64.131");//端口号、用户名、密码可以使用默认的
connectionFactory.setUsername("root");
connectionFactory.setPassword("root");
connectionFactory.setPort(5672);
//创建连接
Connection connection = connectionFactory.newConnection();
//多个线程发送消息
for (int i = 0; i < 3; i++) {
//异步线程发送消息
executor.execute(() -> {
try {
sendMessage(connection);
} catch (Exception e) {
e.printStackTrace();
}
});

}

//关闭连接
// connection.close();
}

//发送消息
public static void sendMessage(Connection connection) throws IOException, TimeoutException {
//创建信道
Channel channel = connection.createChannel();
//在信道中设置交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//交换器和队列绑定放到消费者进行
//自定义路由键
String[] keys = new String[]{"key1", "key2", "key3"};
//发送消息

for (int i = 0; i < SEND_NUM; i++) {
String key = keys[i % keys.length];
String message = "hello 发送rabitmq消息" + i;
//消息进行发送
channel.basicPublish(EXCHANGE_NAME, key, null, message.getBytes("UTF-8"));
System.out.println("sendMessage:" + key + "===" + message + ",线程号:" + Thread.currentThread().getId());
}
//关闭信道
channel.close();
}
}
消费者

同上

运行结果
生产者

我们发现多个信道发送,将数据发送了三次

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
sendMessage:key1===hello 发送rabitmq消息0,线程号:16
sendMessage:key1===hello 发送rabitmq消息0,线程号:15
sendMessage:key1===hello 发送rabitmq消息0,线程号:14
sendMessage:key2===hello 发送rabitmq消息1,线程号:16
sendMessage:key2===hello 发送rabitmq消息1,线程号:15
sendMessage:key2===hello 发送rabitmq消息1,线程号:14
sendMessage:key3===hello 发送rabitmq消息2,线程号:16
sendMessage:key3===hello 发送rabitmq消息2,线程号:15
sendMessage:key3===hello 发送rabitmq消息2,线程号:14
sendMessage:key1===hello 发送rabitmq消息3,线程号:16
sendMessage:key1===hello 发送rabitmq消息3,线程号:15
sendMessage:key1===hello 发送rabitmq消息3,线程号:14
sendMessage:key2===hello 发送rabitmq消息4,线程号:16
sendMessage:key2===hello 发送rabitmq消息4,线程号:15
sendMessage:key2===hello 发送rabitmq消息4,线程号:14
sendMessage:key3===hello 发送rabitmq消息5,线程号:16
sendMessage:key3===hello 发送rabitmq消息5,线程号:15
sendMessage:key3===hello 发送rabitmq消息5,线程号:14
sendMessage:key1===hello 发送rabitmq消息6,线程号:16
sendMessage:key1===hello 发送rabitmq消息6,线程号:15
sendMessage:key1===hello 发送rabitmq消息6,线程号:14
sendMessage:key2===hello 发送rabitmq消息7,线程号:16
sendMessage:key2===hello 发送rabitmq消息7,线程号:15
sendMessage:key2===hello 发送rabitmq消息7,线程号:14
sendMessage:key3===hello 发送rabitmq消息8,线程号:16
sendMessage:key3===hello 发送rabitmq消息8,线程号:15
sendMessage:key3===hello 发送rabitmq消息8,线程号:14
sendMessage:key1===hello 发送rabitmq消息9,线程号:16
sendMessage:key1===hello 发送rabitmq消息9,线程号:15
sendMessage:key1===hello 发送rabitmq消息9,线程号:14
消费者

消费者也消费了三次

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
等待 message.....
Received:key1========hello 发送rabitmq消息0
Received:key2========hello 发送rabitmq消息1
Received:key3========hello 发送rabitmq消息2
Received:key1========hello 发送rabitmq消息3
Received:key2========hello 发送rabitmq消息4
Received:key3========hello 发送rabitmq消息5
Received:key1========hello 发送rabitmq消息6
Received:key2========hello 发送rabitmq消息7
Received:key3========hello 发送rabitmq消息8
Received:key1========hello 发送rabitmq消息9
Received:key1========hello 发送rabitmq消息0
Received:key2========hello 发送rabitmq消息1
Received:key3========hello 发送rabitmq消息2
Received:key1========hello 发送rabitmq消息3
Received:key2========hello 发送rabitmq消息4
Received:key3========hello 发送rabitmq消息5
Received:key1========hello 发送rabitmq消息6
Received:key2========hello 发送rabitmq消息7
Received:key3========hello 发送rabitmq消息8
Received:key1========hello 发送rabitmq消息9
Received:key1========hello 发送rabitmq消息0
Received:key2========hello 发送rabitmq消息1
Received:key3========hello 发送rabitmq消息2
Received:key1========hello 发送rabitmq消息3
Received:key2========hello 发送rabitmq消息4
Received:key3========hello 发送rabitmq消息5
Received:key1========hello 发送rabitmq消息6
Received:key2========hello 发送rabitmq消息7
Received:key3========hello 发送rabitmq消息8
Received:key1========hello 发送rabitmq消息9

多个信道接收消息

生产者

采用单个信道发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public final static String EXCHANGE_NAME = "direct_exchange";//direct交换器名称
public final static Integer SEND_NUM = 10;//发送消息次数

public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂,连接RabbitMQ
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.64.131");//端口号、用户名、密码可以使用默认的
connectionFactory.setUsername("root");
connectionFactory.setPassword("root");
connectionFactory.setPort(5672);
//创建连接
Connection connection = connectionFactory.newConnection();
//创建信道
Channel channel = connection.createChannel();
//在信道中设置交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//交换器和队列绑定放到消费者进行
//自定义路由键
String[] keys = new String[]{"key1", "key2", "key3"};
//发送消息

for (int i = 0; i < SEND_NUM; i++) {
String key = keys[i % keys.length];
String message = "hello 发送rabitmq消息" + i;
//消息进行发送
channel.basicPublish(EXCHANGE_NAME, key, null, message.getBytes("UTF-8"));
System.out.println("sendMessage:" + key + "===" + message);
}
//关闭信道
channel.close();
//关闭连接
connection.close();
}
}

消费者

使用多线程创建多个信道监听消息发送,我们这里只监听一个key来查看消息接收情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;

public class Consumer {
public final static String EXCHANGE_NAME = "direct_exchange";//direct交换器名称
public final static String DIRECT_QUEUE = "direct_queue";

private static final Executor executor = Executors.newFixedThreadPool(10);


public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂,连接RabbitMQ
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.64.131");//端口号、用户名、密码可以使用默认的
connectionFactory.setUsername("root");
connectionFactory.setPassword("root");
connectionFactory.setPort(5672);
//创建连接
Connection connection = connectionFactory.newConnection();
for (int i = 0; i < 3; i++) {
executor.execute(() -> {
try {
receiveMessage(connection);
} catch (IOException e) {
e.printStackTrace();
}
});
}

}

public static void receiveMessage(Connection connection) throws IOException {
//创建信道
Channel channel = connection.createChannel();
//在信道中设置交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//声明队列
channel.queueDeclare(DIRECT_QUEUE, false, false, false, null);
String[] keys = new String[]{"key1"};
for (String key : keys) {
channel.queueBind(DIRECT_QUEUE, EXCHANGE_NAME, key);
}
System.out.println("等待 message.....");
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "utf-8");
System.out.println("Received:" + envelope.getRoutingKey() + "========" + message + ",ThreadId:" + Thread.currentThread().getId());

}
};
//消费者在指定的对队列上消费
channel.basicConsume(DIRECT_QUEUE, true, consumer);
}
}
运行结果
生产者
1
2
3
4
5
6
7
8
9
10
sendMessage:key1===hello 发送rabitmq消息0
sendMessage:key2===hello 发送rabitmq消息1
sendMessage:key3===hello 发送rabitmq消息2
sendMessage:key1===hello 发送rabitmq消息3
sendMessage:key2===hello 发送rabitmq消息4
sendMessage:key3===hello 发送rabitmq消息5
sendMessage:key1===hello 发送rabitmq消息6
sendMessage:key2===hello 发送rabitmq消息7
sendMessage:key3===hello 发送rabitmq消息8
sendMessage:key1===hello 发送rabitmq消息9
消费者

因为我们值监听了key1 这个key 我们发现多个信道接收到的消息是轮询来接受的

1
2
3
4
5
6
7
等待 message.....
等待 message.....
等待 message.....
Received:key1========hello 发送rabitmq消息0,ThreadId:21
Received:key1========hello 发送rabitmq消息3,ThreadId:22
Received:key1========hello 发送rabitmq消息6,ThreadId:23
Received:key1========hello 发送rabitmq消息9,ThreadId:24

评论