抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

RabbitMQ-普通集群

概述

​ 一般来说,如果只是为了学习RabbitMQ或者验证业务工程的正确性那么在本地环境或者测试环境上使用其单实例部署就可以了,但是出于MQ中间件本身的可靠性、并发性、吞吐量和消息堆积能力等问题的考虑,在生产环境上一般都会考虑使用RabbitMQ的集群方案。

​ 对于RabbitMQ这么成熟的消息队列产品来说,搭建它并不难并且也有不少童鞋写过如何搭建RabbitMQ消息队列集群的博文,但可能仍然有童鞋并不了解其背后的原理,这会导致其遇到性能问题时无法对集群进行进一步的调优。本篇主要介绍RabbitMQ集群方案的原理,如何搭建具备负载均衡能力的中小规模RabbitMQ集群,并最后给出生产环境构建一个能够具备高可用、高可靠和高吞吐量的中小规模RabbitMQ集群设计方案。

​ RabbitMQ在设计之初,就内置了一个內建集群,该集群可以允许消费者和生产者在节点崩溃的情况下继续运行;并且还可以通过添加节点线性扩展消息通信的吞吐量。

​ 但是其不能可以保证消息的可靠性,即当一个节点崩溃时,该节点上队列的消息也会消失,RabbitMQ默认不会将队列的消息复制到整个集群上。

RabbitMQ默认集群原理

​ RabbitMQ这款消息队列中间件产品本身是基于Erlang编写,Erlang语言天生具备分布式特性(通过同步Erlang集群各节点的magic cookie来实现)。因此,RabbitMQ天然支持Clustering。这使得RabbitMQ本身不需要像ActiveMQ、Kafka那样通过ZooKeeper分别来实现HA方案和保存集群的元数据。集群是保证可靠性的一种方式,同时可以通过水平扩展以达到增加消息吞吐量能力的目的。 下面先来看下RabbitMQ集群的整体方案:

​ 上面图中采用三个节点组成了一个RabbitMQ的集群,Exchange A(交换器,对于RabbitMQ基础概念不太明白的童鞋可以看下基础概念)的元数据信息在所有节点上是一致的,而Queue(存放消息的队列)的完整数据则只会存在于它所创建的那个节点上。,其他节点只知道这个queue的metadata信息和一个指向queue的owner node的指针。

RabbitMQ集群元数据的同步

RabbitMQ集群会始终同步四种类型的内部元数据(类似索引):

  • 队列元数据:队列名称和它的属性;
  • 交换器元数据:交换器名称、类型和属性;
  • 绑定元数据:一张简单的表格展示了如何将消息路由到队列;
  • vhost元数据:为vhost内的队列、交换器和绑定提供命名空间和安全属性;

因此,当用户访问其中任何一个RabbitMQ节点时,通过rabbitmqctl查询到的queue/user/exchange/vhost等信息都是相同的。

为何RabbitMQ集群仅采用元数据同步的方式

​ 我想肯定有不少同学会问,想要实现HA方案,那将RabbitMQ集群中的所有Queue的完整数据在所有节点上都保存一份不就可以了么?(可以类似MySQL的主主模式嘛)这样子,任何一个节点出现故障或者宕机不可用时,那么使用者的客户端只要能连接至其他节点能够照常完成消息的发布和订阅嘛。
​ 我想RabbitMQ的作者这么设计主要还是基于集群本身的性能和存储空间上来考虑。

  • 第一,存储空间,如果每个集群节点都拥有所有Queue的完全数据拷贝,那么每个节点的存储空间会非常大,集群的消息积压能力会非常弱(无法通过集群节点的扩容提高消息积压能力);
  • 第二,性能,消息的发布者需要将消息复制到每一个集群节点,对于持久化消息,网络和磁盘同步复制的开销都会明显增加,所以其他节点接收到不属于该节点的队列的消息时会将该消息传递给该队列的所有者节点上。
  • 第三,另外当我们开启了消息的发送发确认模式,如果消息需要复制到集群中每个节点,那么就必须等到该消息复制到集群上的所有节点后,才会进行确认应答。

RabbitMQ集群发送/订阅消息的基本原理

客户端直接连接队列所在节点

​ 如果有一个消息生产者或者消息消费者通过amqp-client的客户端连接至节点1进行消息的发布或者订阅,那么此时的集群中的消息收发只与节点1相关,这个没有任何问题;如果客户端相连的是节点2或者节点3(队列1数据不在该节点上),那么情况又会是怎么样呢?

客户端连接的是非队列数据所在节点

​ 如果消息生产者所连接的是节点2或者节点3,此时队列1的完整数据不在该两个节点上,那么在发送消息过程中这两个节点主要起了一个路由转发作用,根据这两个节点上的元数据(也就是上文提到的:指向queue的owner node的指针)转发至节点1上,最终发送的消息还是会存储至节点1的队列1上。
​ 同样,如果消息消费者所连接的节点2或者节点3,那这两个节点也会作为路由节点起到转发作用,将会从节点1的队列1中拉取消息进行消费。

高可用集群相关概念

设计集群的目的

  • 允许消费者和生产者在 RabbitMQ 节点崩溃的情况下继续运行。
  • 通过增加更多的节点来扩展消息通信的吞吐量。

集群配置方式

cluster

​ 不支持跨网段,用于同一个网段内的局域网;可以随意的动态增加或者减少;节点之间需要运行相同版本的 RabbitMQ 和 Erlang。

federation

​ 应用于广域网,允许单台服务器上的交换机或队列接收发布到另一台服务器上交换机或队列的消息,可以是单独机器或集群。federation 队列类似于单向点对点连接,消息会在联盟队列之间转发任意次,直到被消费者接受。通常使用 federation 来连接 internet 上的中间服务器,用作订阅分发消息或工作队列。

shovel

​ 连接方式与 federation 的连接方式类似,但它工作在更低层次。可以应用于广域网。

集群节点类型

​ RabbitMQ中的每一个节点,不管是单一节点系统或者集群中的一部分,要么是内存节点,要么是磁盘节点。内存节点将所有的队列、交换机、绑定关系、用户、权限和vhost的元数据定义都存储在内存中,而磁盘节点则将这些信息存储到磁盘中。单节点的集群中必然只有磁盘类型的节点,否则当重启RabbitMQ之后,所有关于系统的配置信息都会丢失。不过在集群中,可以选择配置部分节点为内存节点,这样可以获得更高的性能

​ RabbitMQ只要求在集群中至少有一个磁盘节点,所有其他节点可以是内存节点。当节点加入或者离开集群时,它们必须将变更通知到至少一个磁盘节点。如果只有一个磁盘节点,而且刚好是它崩溃了,那么集群可以继续发送或者接收消息,但是不能执行创建队列、交换机、绑定关系、用户,以及更改权限、添加或者删除集群节点的操作了。也就是说,如果集群中唯一的磁盘节点崩溃,集群仍然可以保持运行,但是直到将该节点恢复到集群前,无法更改任何东西。所以在建立集群的时候应该保证有两个或者多个磁盘节点的存在

​ 在内存节点重启后,它们会连接到预先配置的磁盘节点,下载当前集群元数据的副本。当在集群中添加内存节点时,确保告知其所有的磁盘节点。只要内存节点可以找到至少一个磁盘节点,那么它就能在重启后重新加入集群中

​ 为了确保集群信息的可靠性,建议全部使用磁盘节点

磁盘节点

​ 将配置信息和元信息存储在磁盘上(单节点系统必须是磁盘节点,否则每次重启RabbitMQ之后所有的系统配置信息都会丢失)。

内存节点

将配置信息和元信息存储在内存中。性能是优于磁盘节点的。

​ RabbitMQ要求集群中至少有一个磁盘节点,当节点加入和离开集群时,必须通知磁盘节点(如果集群中唯一的磁盘节点崩溃了,则不能进行创建队列、创建交换器、创建绑定、添加用户、更改权限、添加和删除集群节点)。总之如果唯一磁盘的磁盘节点崩溃,集群是可以保持运行的,但不能更改任何东西。因此建议在集群中设置两个磁盘节点,只要一个可以,就能正常操作。

Erlang Cookie 是保证不同节点可以相互通信的密钥,要保证集群中的不同节点相互通信必须共享相同的 Erlang Cookie。具体的目录存放在/var/lib/rabbitmq/.erlang.cookie

​ 这就要从 rabbitmqctl 命令的工作原理说起,RabbitMQ 底层是通过 Erlang 架构来实现的,所以 rabbitmqctl 会启动 Erlang 节点,并基于 Erlang 节点来使用 Erlang 系统连接 RabbitMQ 节点,在连接过程中需要正确的 Erlang Cookie 和节点名称,Erlang 节点通过交换 Erlang Cookie 以获得认证。

为什幺配置相同的Erlang Cookie?

​ 因为RabbitMQ使用Erlang实现的,Erlang Cookie相当于不同节点之间相互通讯的秘钥,Erlang节点通过交换Erlang Cookie获得认证

Erlang Cookie的位置

首先要获取RabbitMQ启动日志里面的home dir路径,作为根路径。使用”docker logs 容器名称“查看

1
2
3
4
5
6
7
8
=INFO REPORT==== 3-May-2019::00:40:56 ===
node : rabbit@rabbit1
home dir : /var/lib/rabbitmq
config file(s) : /etc/rabbitmq/rabbitmq.config
cookie hash : l7FRc4s6MFrXQLBiUlLnOA==
log : tty
sasl log : tty
database dir : /var/lib/rabbitmq/mnesia/rabbit@rabbit1

所以Erlang Cookie的全部路径是“/var/lib/rabbitmq/.erlang.cookie”

1
2
root@rabbit1:/# cat /var/lib/rabbitmq/.erlang.cookie
rabbitcookie

rabbitmq集群模式

rabbitmq有3种模式,但集群模式是2种。详细如下

单一模式

​ 即单机情况不做集群,就单独运行一个rabbitmq而已。

普通模式

​ 默认模式,以两个节点(rabbit01、rabbit02)为例来进行说明。对于Queue来说,消息实体只存在于其中一个节点rabbit01(或者rabbit02),rabbit01和rabbit02两个节点仅有相同的元数据,即队列的结构。当消息进入rabbit01节点的Queue后,consumer从rabbit02节点消费时,RabbitMQ会临时在rabbit01、rabbit02间进行消息传输,把A中的消息实体取出并经过B发送给consumer。所以consumer应尽量连接每一个节点,从中取消息。即对于同一个逻辑队列,要在多个节点建立物理Queue。否则无论consumer连rabbit01或rabbit02,出口总在rabbit01,会产生瓶颈。当rabbit01节点故障后,rabbit02节点无法取到rabbit01节点中还未消费的消息实体。如果做了消息持久化,那么得等rabbit01节点恢复,然后才可被消费;如果没有持久化的话,就会产生消息丢失的现象。

镜像模式

​ 的队列做成镜像队列,存在与多个节点属于**RabbitMQ的HA方案。**该模式解决了普通模式中的问题,其实质和普通模式不同之处在于,消息实体会主动在镜像节点间同步,而不是在客户端取数据时临时拉取。该模式带来的副作用也很明显,除了降低系统性能外,如果镜像队列数量过多,加之大量的消息进入,集群内部的网络带宽将会被这种同步通讯大大消耗掉。所以在对可靠性要求较高的场合中适用。

镜像队列实现了 RabbitMQ 的高可用性(HA),具体的实现策略如下所示:

ha-mode ha-params 功能
all 镜像队列将会在整个集群中复制。当一个新的节点加入后,也会在这 个节点上复制一份。
exactly count 镜像队列将会在集群上复制 count 份。如果集群数量少于 count 时候,队列会复制到所有节点上。如果大于 Count 集群,有一个节点 crash 后,新进入节点也不会做新的镜像。
nodes node name 镜像队列会在 node name 中复制。如果这个名称不是集群中的一个,这不会触发错误。如果在这个 node list 中没有一个节点在线,那么这个 queue 会被声明在 client 连接的节点。

集群搭建

使用docker方式进行集群搭建,我们就来搭建一个rabbitmq的集群,本次我们搭建一个具有3个节点的rabbitmq集群

安装知识点

  • 建立集群时,节点中的 Erlang Cookie 值要一致,默认情况下,文件在 /var/lib/rabbitmq/.erlang.cookie
  • erlang 是通过主机名来连接服务,必须保证各个主机名之间可以 ping 通。可以通过编辑 /etc/hosts 来手工添加主机名和 IP 对应关系。如果主机名 ping 不通,rabbitmq 服务启动会失败。
1
2
3
4
rabbitmqctl stop_app # 关闭应用(关闭当前启动的节点)
rabbitmqctl reset #从管理数据库中移除所有数据,例如配置过的用户和虚拟宿主, 删除所有持久化的消息(这个命令要在rabbitmqctl stop_app之后使用)
rabbitmqctl join_cluster rabbit@node1 # node2和node1构成集群, node2必须能通过node1的主机名ping通
rabbitmqctl start_app # 启动应用,和上述关闭命令配合使用,达到清空队列的目的

–ram 指的是作为内存节点,要是想做为磁盘节点的话,就不用加 –ram 这个参数了。在 RabbitMQ 集群里,必须至少有一个磁盘节点存在

1
rabbitmqctl join_cluster --ram rabbit@node1

更改节点属性

1
2
3
4
5
6
rabbitmqctl stop_app  # 停止rabbitmq服务
rabbitmqctl change_cluster_node_type ram # 更改节点为内存节点
Turning rabbit@node2 into a ram node
rabbitmqctl change_cluster_node_type disc # 更改节点为磁盘节点
Turning rabbit@node2 into a disc node
rabbitmqctl start_app # 开启rabbitmq服务

查看集群的状态

1
rabbitmqctl cluster_status

拉取rabbitmq镜像

在centos窗口中,执行如下命令:

1
docker pull rabbitmq:management

创建映射数据卷目录

1
2
3
mkdir rabbitmqcluster
cd rabbitmqcluster/
mkdir rabbitmq01 rabbitmq02 rabbitmq03

配置网络

创建网络

在docker容器创建网络

1
docker network create rabbitmq_net

查看docker网络网关

1
docker inspect rabbitmq_net

使用该网络的docker容器可以分配172.18.0.1 以后的IP

docker 服务说明

为docker容器分配的IP

服务器说明 IP地址 节点说明
rabbitmq01 172.18.0.2 主节点
rabbitmq02 172.18.0.3 从节点
rabbitmq03 172.18.0.4 从节点

可以通过指定网络方式指定IP

1
--network rabbitmq_net --ip 172.18.0.2
配置HOST

配置hosts文件,让各个节点都能互相识别对方的存在,使用挂载的方式进行简化配置

创建host文件

1
2
3
4
127.0.0.1	localhost
172.18.0.2 rabbitmq01
172.18.0.3 rabbitmq02
172.18.0.4 rabbitmq03

使用挂载的方式挂载host文件

1
-v /tmp/etc/hosts:/etc/hosts

创建容器

启动多个RabbitMQ

注:请读者自行找寻创建数据映射目录。

创建完成映射目录后,在centos窗口中,执行如下命令创建容器:

1
2
3
4
5
docker run -d --hostname rabbitmq01 --name rabbitmqCluster01 --network rabbitmq_net --ip 172.18.0.2 -v /tmp/etc/hosts:/etc/hosts -v /tmp/rabbitmqcluster/rabbitmq01:/var/lib/rabbitmq -p 15672:15672 -p 5672:5672 -e RABBITMQ_ERLANG_COOKIE='rabbitmqCookie' rabbitmq:management

docker run -d --hostname rabbitmq02 --name rabbitmqCluster02 --network rabbitmq_net --ip 172.18.0.3 -v /tmp/etc/hosts:/etc/hosts -v /tmp/rabbitmqcluster/rabbitmq02:/var/lib/rabbitmq -p 15673:15672 -p 5673:5672 -e RABBITMQ_ERLANG_COOKIE='rabbitmqCookie' rabbitmq:management

docker run -d --hostname rabbitmq03 --name rabbitmqCluster03 --network rabbitmq_net --ip 172.18.0.4 -v /tmp/etc/hosts:/etc/hosts -v /tmp/rabbitmqcluster/rabbitmq03:/var/lib/rabbitmq -p 15674:15672 -p 5674:5672 -e RABBITMQ_ERLANG_COOKIE='rabbitmqCookie' rabbitmq:management

注:

  • –hostname 设置容器的主机名
  • RABBITMQ_ERLANG_COOKIE 节点认证作用,部署集成时 需要同步该值
  • 多个容器之间用--link连接
复制Erlang Cookie到其他RabbitMQ节点

物理机和容器之间复制命令如下:

  • 容器复制文件到物理机:docker cp 容器名称:容器目录 物理机目录
  • 物理机复制文件到容器:docker cp 物理机目录 容器名称:容器目录

设置Erlang Cookie文件权限:“chmod 600 /var/lib/rabbitmq/.erlang.cookie”

访问页面

启动容器成功后,读者可以访问

1
2
3
http://192.168.64.134:15672/#/
http://192.168.64.134:15673/#/
http://192.168.64.134:15674/#/

查看是否正常启动成功。账号/密码:guest / guest
读者登陆后,查看overview Tab页,可看到节点信息。

容器节点加入集群

第一个节点操作

首先在centos窗口中,执行如下命令,进入第一个rabbitmq节点容器:

1
docker exec -it rabbitmqCluster01 bash

进入容器后,操作rabbitmq,执行如下命令:

1
2
3
4
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
exit

第二个节点操作

接下来,进入第二个rabbitmq节点容器,执行如下命令:

1
2
3
4
5
6
docker exec -it rabbitmqCluster02 bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster --ram rabbit@rabbitmq01
rabbitmqctl start_app
exit

第三个节点操作

最后,进入第三个rabbitmq节点容器,执行如下命令:

1
2
3
4
5
6
docker exec -it rabbitmqCluster03 bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster --ram rabbit@rabbitmq01
rabbitmqctl start_app
exit

查看节点信息

执行上述操作,这时候 再查看 http://192.168.64.134:15672/#/overview面板中的Nodes信息,可查看到节点信息。

SpringBoot 集成RabbitMQ集群

导入坐标

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置yml文件

1
2
3
4
5
6
7
8
9
10
11
12
spring:
rabbitmq:
addresses: 192.168.64.135:5672,192.168.64.135:5673,192.168.64.135:5674
username: guest
password: guest
###开启消息确认机制 confirms
virtual-host: /
publisher-returns: true
#采用手动应答
listener:
simple:
acknowledge-mode: manual

创建启动类

1
2
3
4
5
6
7
@SpringBootApplication
public class Application {

public static void main(String[] args) {
SpringApplication.run(Application.class);
}
}

创建相关类

RabbitMQConfig
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

@Configuration
public class RabbitMQConfig {
public static final String QUEUE_NAME = "test.queue.name";
public static final String EXCHANGE_NAME = "test.exchange.name";
public static final String ROUTING_KEY = "test.#";

@Bean
public Queue initQueue() {
return new Queue(QUEUE_NAME);
}

@Bean
public TopicExchange topicExchange() {
return new TopicExchange(EXCHANGE_NAME);
}

@Bean
Binding bindingTopic() {
return BindingBuilder.bind(initQueue()).
to(topicExchange()).
with(ROUTING_KEY);
}
}
QueueConsumer
1
2
3
4
5
6
7
8
9
10
11
12
13
14

@Component
public class QueueConsumer {

private static final Logger logger = LoggerFactory.getLogger(QueueConsumer.class);

@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
public void receiveDelay(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());

logger.info("接收到消息:{}", msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
MessageSender
1
2
3
4
5
6
7
8
9
10
11
12
13

@Component
public class MessageSender {

private static final Logger logger = LoggerFactory.getLogger(MessageSender.class);

@Autowired
private RabbitTemplate rabbitTemplate;

public void sendMsg(String msg) {
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "test.topic", msg);
}
}
RabbitmqController
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

@RestController
@RequestMapping("/rabbitmq")
public class RabbitmqController {

@Autowired
private MessageSender msgProducer;

/**
* 发送测试数据
*
* @param message
* @return
*/
@RequestMapping("/send")
public String send(String message) {
msgProducer.sendMsg(message);
return "OK";
}
}

测试

启动

发送消息

通过页面能够正常接收到消息

1
2
2020-10-13 16:10:07.380  INFO 14820 --- [ntContainer#0-1] c.heima.rabbitmq.consumer.QueueConsumer  : 接收到消息:test60
2020-10-13 16:15:14.293 INFO 14820 --- [ntContainer#0-1] c.heima.rabbitmq.consumer.QueueConsumer : 接收到消息:testMessage
停止从节点

停止一个从节点然后测试

1
docker stop rabbitmqCluster02

查询集群状态

队列状态是可用的

发送消息测试

1
2
3
4
2020-10-13 16:24:25.542  INFO 5584 --- [ntContainer#0-1] c.heima.rabbitmq.consumer.QueueConsumer  : 接收到消息:testMessage
2020-10-13 16:24:25.691 INFO 5584 --- [ntContainer#0-1] c.heima.rabbitmq.consumer.QueueConsumer : 接收到消息:testMessage
2020-10-13 16:24:25.848 INFO 5584 --- [ntContainer#0-1] c.heima.rabbitmq.consumer.QueueConsumer : 接收到消息:testMessage
2020-10-13 16:24:25.997 INFO 5584 --- [ntContainer#0-1] c.heima.rabbitmq.consumer.QueueConsumer : 接收到消息:testMessage

发现能够正常发送给消息,对系统不影响

停止主节点

停止主节点然后测试

1
docker stop rabbitmqCluster01

发现直接报错,测试发送消息也没有任何响应,并且队列状态是不可用的。

发现问题
  1. 使用springboot默认是一个连接到节点中的,如果该节点不宕机,那么会一直使用这个节点连接操作(导致该节点的压力过大)
  2. 使用这种配置将所有的节点目的地的ip都暴露出来了
  3. 使用这种方式的时候如果,主节点宕机那么会导致所有的节点都不可用(整个集群问题)
  4. 这就是为什么需要使用haproxy来实现,因为haproxy是负载均衡模式的(并且只暴露一个ip地址)

评论