抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

RocketMQ自动选主

Dledger简介

传统部署方式不足

​ 在 RocketMQ 4.5 之前的版本中,部署 RocketMQ 高可用方案一般都会采用多主多从方式,这种方式需要多个 Master 节点与实时备份 Master 节点数据的 Slave 节点,Slave 节点通过同步复制或异步复制的方式去同步 Master 节点数据,但这样的部署模式存在一定缺陷。

​ 比如故障转移方面,如果 Master 点挂了,还需要人为手动对 Master 节点进行重启或者切换,它无法自动的将 Slave 节点转换为 Master 节点,因此,我们希望能有一个新的多副本架构,去解决这个问题。

新技术解决的问题

为了实现新的高可用多副本架构,RockeMQ 最终选用了基于 Raft 协议的 commitlog 存储库 DLedger,新的多副本架构首先需要解决自动故障转移的问题,本质上来说是自动选主的问题,这个问题的解决方案有以下两种方案

  • 利用第三方协调服务集群完成选主,比如 Zookeeper 或者 Etcd,但是这种方案会引入了重量级外部组件,使部署变得复杂,同时也会增加运维对组件的故障诊断成本,比如在维护 RocketMQ 集群还需要维护 Zookeeper 集群,保证 Zookeeper 集群如何高可用,不仅仅如此,如果 zookeeper 集群出现故障也会影响到 RocketMQ 集群。
  • 利用 raft 协议来完成一个自动选主,raft 协议相比前者的优点是不需要引入外部组件,自动选主逻辑集成到各个节点的进程中,节点之间通过通信就可以完成选主。

RocketMQ 最终选择使用 raft 协议来解决这个问题,而 DLedger 就是一个基于 raft 协议的 commitlog 存储库,也是 RocketMQ 实现新的高可用多副本架构的关键。

Dledger 简介

DLedger是什么

DLedger是一个基于raft协议的commitlog存储库

​ Dledger有一套CommitLog机制,如果使用了它,它接到数据第一步就是写入自己的CommitLog,引入Dledger技术,其实就是使用Dledger的CommitLog来替换掉Broker自己的CommitLog,然后Broker仍然可以基于Dledger的CommitLog,把消息的位置信息保存到ConsumeQueue中

自动主从切换

master-slave模式下消息存储是producer发送消息给Broker后,首先将消息写入到CommitLog日志文件中

​ 然后会把具体的MessageQueue中消息的位置信息存储到ConsumeQueue中,因为是master-slave模式是不支持主从切换的,这个时候如果主节点挂掉则不会发生主从切换,消息只能写入另外的主节点里面,而slave只提供消息读取的功能。

​ Dledger模式先Broker为了实现高可用,是有一个Broker组的,包含Master和Slave,Master接收到数据同步给Slave,一旦出现故障,可以实现主从自动切换,而slave选举成为主节点需要使用共识算法,这里RocketMQ选择是的Raft共识算法

Raft协议选举

分布式算法中比较常常听到的是 Paxos 算法,但是由于 Paxos 算法难于理解,且实现比较困难,所以不太受业界欢迎

​ 然后出现新的分布式算法 Raft,其比 Paxos 更容易懂与实现,到如今在实际中运用的也已经很成熟,不同的语言都有对其的实现,Dledger 就是其中一个 Java 语言的实现,其将算法方面的内容全部抽象掉,这样开发人员只需要关系业务即可,大大降低使用难度。

​ 共识算法Raft通过集群中唯一的领导者管理集群其他服务器上的日志复制来保证数据一致性。共识问题在Raft中被分解为leader选举、log复制两个相对独立的子问题。

一致性问题

​ 在分布式系统中,一致性问题(consensus problem)是指对于一组服务器,给定一组操作,我们需要一个协议使得最后它们的结果达成一致。

leader选举机制

Raft集群中每个节点有三种状态:Follower,Candidate,Leader,状态之间是互相转换。

​ 节点启动的初始状态为follower,每个Follower节点上都有一个倒计时器 (随机在 150ms 到 300ms 之间设置Election Timeout时间),倒计时截止后状态转换为Candidate,并开始发起leader选举。

​ Follower节点通过接受leader hearteat或者candidate RequestVote请求重设Election Timeout来维持Follower状态。

选举过程
  1. Follower节点定时器截止后增加当前的term(一段任意的时间序号,相当于一个国家的朝代,每个Term以选举开始,如果选举成功,则集群会在当前Term下由当前的leader管理),节点状态转换为Candidate

  2. Candidate节点首先增加投票计数器,投票给自己作为新的领导者,然后向集群中的其他服务器发送RequestVote RPC请求。

  3. 收到RequestVote的服务器,在同一term中只会按照先到先得投票给一个与自身log一样或者更新的candidate节点。

  4. Candidate 节点获得超过一半的节点投支持票,该节点状态将转换为leader,其它Candidate 节点收到term值等于或大于当前自身term值得leader hearteat后,该节点状态将转换为follower,如果Candidate 节点定时器截止,仍没有选出leader,将由最先截止的Candidate 节点发起下一轮投票(解决多个Candidate同时获取相同选票无法确定leader问题)。

随机设置Election Timeout时间,可避免多个follower同时变为Candidate状态,发起leader投票,总结起来就是,假如一轮投票不能得到结果,那就每个人随机休眠一下,先醒过来的投给自己,后醒过来的尊重大多数人的意见,选举出Leader后,所有的接收消息操作全都由Leader来负责了,Follower只能同步Leader的数据。

Dledger的数据同步机制

现在我们了解了Dledger的投票选举机制,那么Broker接收到消息后,是如何基于Dledger实现数据同步的呢?

​ 同样,Dledger也是通过Raft协议进行多副本同步的,简单来讲,数据同步分为两个阶段,uncommitted阶段和committed阶段。

​ 首先,当Leader接到消息数据后,会先标记消息为uncommitted状态,然后通过Dledger的组件把uncommitted状态的消息发送给Follower上的DledgerServer。

​ 接着Follower接到消息后,会发送一个ack给Leader上的DledgerServer,然后如果Leader发现超过半数的Follower已经给自己返回了ack,那么就认为同步成功了,这时候把状态改为committed,然后再发消息给Follower,将Follower上的状态也改为committed。

​ 如果这个时候Leader宕机后,剩余的Follower重新发起新的一轮的选举了,然后还是按照老规矩,最后一定会选出一个Leader,新的消息就由新选举出来的Leader来接收就可以了。

​ 同时,由于数据同步是分为两阶段同步的,超过半数的机器返回ack才会认为是同步成功,所以就算是在同步过程中Leader宕机了,其实Follower中已经同步了消息,只不过状态是uncommitted而已,新的Leader可以根据这些uncommitted的消息进行数据的恢复操作,更改消息状态为commited。

DLedger 定位

Raft 协议是复制状态机的实现,这种模型应用到消息系统中就会存在问题

image-20220627172905176

​ 对于消息系统来说,它本身是一个中间代理,commitlog 状态是系统最终状态,并不需要状态机再去完成一次状态构建。因此 DLedger 去掉了 raft 协议中状态机的部分,但基于raft协议保证commitlog 是一致的,并且是高可用的。

image-20220627173009647

​ 另一方面 DLedger 又是一个轻量级的 java library,它对外提供的 API 非常简单,append 和 get,Append 向 DLedger 添加数据,并且添加的数据会对应一个递增的索引,而 get 可以根据索引去获得相应的数据,因此 DLedger 是一个 append only 的日志系统。

DLedger 应用场景

image-20220627173033738

​ DLedger 其中一个应用就是在分布式消息系统中,RocketMQ 4.5 版本发布后,可以采用 RocketMQ on DLedger 方式进行部署,DLedger commitlog 代替了原来的 commitlog,使得 commitlog 拥有了选举复制能力,然后通过角色透传的方式,raft 角色透传给外部 broker 角色,leader 对应原来的 master,follower 和 candidate 对应原来的 slave

​ 因此 RocketMQ 的 broker 拥有了自动故障转移的能力,在一组 broker 中, Master 挂了以后,依靠 DLedger 自动选主能力,会重新选出 leader,然后通过角色透传变成新的 Master。

image-20220627173057438

​ DLedger 还可以构建高可用的嵌入式 KV 存储。我们把对一些数据的操作记录到 DLedger 中,然后根据数据量或者实际需求,恢复到hashmap 或者 rocksdb 中,从而构建一致的、高可用的 KV 存储系统,应用到元信息管理等场景。

Dledger部署

DLedger方案简介

RocketMQ-on-DLedger Group 是指一组相同名称的 Broker,组中至少需要 3 个 Broker 节点来保证集群能够运行

image-20220627173411858

​ 在 Broker 启动时候,通过 raft 算法能够自动选举出一个 Broker 为 Leader 节点,其余为 Follower 节点,这种模式下 Leader 和 Follower 之间复制数据以保证高可用,如果 Leader 节点出现问题是可以自动进行容灾切换并保证数据一致性。且不仅仅如此,该模式也支持 Broker 节点水平扩展来增加吞吐量,所以该模式将会是部署 RocketMQ 常用模式之一

配置文件介绍

配置示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
## RocketMQ 基本配置
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerIP1=192.168.245.141
brokerIP2=192.168.145.141
listenPort=30911
namesrvAddr=127.0.0.1:9876
#如果多台机器只需要配置下面storePathRootDir以及storePathCommitLog
storePathRootDir=/tmp/rmqstore/node00
storePathCommitLog=/tmp/rmqstore/node00/commitlog
#如果只有一台机器,下面的路径需要单独配置
storePathConsumeQueue=/tmp/rmqstore/node00/consumequeue
storePathIndex=/tmp/rmqstore/node00/index
storeCheckpoint=/tmp/rmqstore/node00/checkpoint
abortFile=/tmp/rmqstore/node00/abort

## Dledger 配置
enableDLegerCommitLog=true
dLegerGroup=broker-a
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
## must be unique
dLegerSelfId=n0
sendMessageThreadPoolNums=16
基本配置参数说明
参数名称 参数描述 参数示例
brokerClusterName Broker 集群名称 RaftCluster
brokerName Broker 名称 RaftNode00
listenPort Broker 监听端口 30911
namesrvAddr Broker Namesrv 地址 192.168.1.1:9876;192.168.1.2:9876
storePathRootDir Broker 存储目录 /tmp/rmqstore/node00
storePathCommitLog Commitlog 存储目录 /tmp/rmqstore/node00/commitlog
Dledger 配置参数说明
参数名称 参数描述 参数示例
enableDLegerCommitLog 是否启动 DLedger true
dLegerGroup DLedger Raft Group 的名字,建议和 brokerName 保持一致 RaftNode00
dLegerPeers DLedger Group 内各节点的地址与端口信息(同一个 Group 内的各个节点配置必须要保证一致) n0-192.168.1.1:40911;n1-192.168.1.2:40911;n2-192.168.1.3:40911
dLegerSelfId 节点 id, 必须属于 dLegerPeers 中的一个;同 Group 内各个节点要唯一 例如: 第一个节点配置为”n0” 第一个节点配置为”n1” 第一个节点配置为”n2”
sendMessageThreadPoolNums 发送线程个数(建议配置成 CPU 核数) 8

单节点物理部署

下面我们采用物理部署单节点方式来演示DLedger的部署方案

部署架构

image-20220627173430462

节点安排
服务器 部署的应用 存储挂载目录 端口号
192.168.245.141 Namesrv Server 9876
192.168.245.141 Broker node0 /tmp/rmqstore/node0 30911
192.168.245.141 Broker node1 /tmp/rmqstore/node1 30921
192.168.245.141 Broker node2 /tmp/rmqstore/node2 30931
RocketMQ下载
创建目录

下面我们创建RocketMQ环境的目录

1
2
# 创建安装以及存储目录
mkdir -p /usr/local/rocketmq/DledgerCluster/{node0,node1,node2}/storage
下载RocketMQ

接下来就需要下载RocketMQ的安装文件

1
wget https://archive.apache.org/dist/rocketmq/4.7.1/rocketmq-all-4.7.1-bin-release.zip -P /usr/local/rocketmq/DledgerCluster/node0

image-20220627173441540

解压文件

使用unzip命令解压下载的文件

1
2
3
4
# 解压RocketMQ安装文件
unzip rocketmq-all-4.7.1-bin-release.zip
# 删除安装包
rm -rf rocketmq-all-4.7.1-bin-release.zip
RocketMQ启动命令修改

因为我们是要单机启动,需要调小RocketMQ的JVM参数

​ 修改目录 /usr/local/rocketmq/rocketmq-all-4.7.1-bin-release/bin 下的 2个配置文件: runserver.sh、runbroker.sh 不然启动会报insufficient memory

runserver.sh
1
vi runserver.sh

设置 runserver.sh 中此项配置

1
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

image-20220627173452032

runbroker.sh
1
vi runbroker.sh

设置 runbroker.sh 中此项配置

1
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"

image-20220627173500411

复制安装包

下面我们需要将配置好的安装包复制到其他的节点目录

1
2
3
cd /usr/local/rocketmq/DledgerCluster
cp -R node0/rocketmq-all-4.7.1-bin-release/ node1/
cp -R node0/rocketmq-all-4.7.1-bin-release/ node2/
配置文件修改

下面我们修改RocketMQ目录下的broker.conf的配置文件,将配置文件修改为以下内容

node0配置

node0配置文件如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
brokerClusterName = DledgerCluster
brokerName = broker-a
brokerIP1=192.168.245.141
brokerIP2=192.168.145.141

listenPort=30911
namesrvAddr=127.0.0.1:9876
storePathRootDir=/usr/local/rocketmq/DledgerCluster/node0/storage
storePathCommitLog=/usr/local/rocketmq/DledgerCluster/node0/storage/commitlog
storePathConsumeQueue=/usr/local/rocketmq/DledgerCluster/node0/storage/consumequeue
storePathIndex=/usr/local/rocketmq/DledgerCluster/node0/storage/index
storeCheckpoint=/usr/local/rocketmq/DledgerCluster/node0/storage/checkpoint
abortFile=/usr/local/rocketmq/DledgerCluster/node0/storage/abort


enableDLegerCommitLog=true
dLegerGroup=broker-a
# 这里配置的是所有节点的信息,用于选主
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
## must be unique 注意该参数必须是以n开头加上数字
dLegerSelfId=n0
sendMessageThreadPoolNums=16
node1配置

node1配置文件如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
brokerClusterName = DledgerCluster
brokerName = broker-a
brokerIP1=192.168.245.141
brokerIP2=192.168.145.141
listenPort=30921

namesrvAddr=127.0.0.1:9876
storePathRootDir=/usr/local/rocketmq/DledgerCluster/node1/storage
storePathCommitLog=/usr/local/rocketmq/DledgerCluster/node1/storage/commitlog
storePathConsumeQueue=/usr/local/rocketmq/DledgerCluster/node1/storage/consumequeue
storePathIndex=/usr/local/rocketmq/DledgerCluster/node1/storage/index
storeCheckpoint=/usr/local/rocketmq/DledgerCluster/node1/storage/checkpoint
abortFile=/usr/local/rocketmq/DledgerCluster/node1/storage/abort


enableDLegerCommitLog=true
dLegerGroup=broker-a
# 这里配置的是所有节点的信息,用于选主
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
## must be unique 注意该参数必须是以n开头加上数字
dLegerSelfId=n1
sendMessageThreadPoolNums=16
node2配置

node2配置文件如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
brokerClusterName = DledgerCluster
brokerName = broker-a
brokerIP1=192.168.245.141
brokerIP2=192.168.145.141

listenPort=30931
namesrvAddr=127.0.0.1:9876
storePathRootDir=/usr/local/rocketmq/DledgerCluster/node2/storage
storePathCommitLog=/usr/local/rocketmq/DledgerCluster/node2/storage/commitlog
storePathConsumeQueue=/usr/local/rocketmq/DledgerCluster/node2/storage/consumequeue
storePathIndex=/usr/local/rocketmq/DledgerCluster/node2/storage/index
storeCheckpoint=/usr/local/rocketmq/DledgerCluster/node2/storage/checkpoint
abortFile=/usr/local/rocketmq/DledgerCluster/node2/storage/abort


enableDLegerCommitLog=true
dLegerGroup=broker-a
# 这里配置的是所有节点的信息,用于选主
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
## must be unique 注意该参数必须是以n开头加上数字
dLegerSelfId=n2
sendMessageThreadPoolNums=16
启动服务

经过上面配置文件的修改,我们已经配置完成了Dledger的配置,下面就是启动服务了

​ 注意启动broker的时候要通过-c指定配置文件进行启动,否则会使用默认的配置文件

启动NameSrv

启动RocketMQ集群第一步是启动NameSrv服务

1
2
3
cd /usr/local/rocketmq/DledgerCluster/node0/rocketmq-all-4.7.1-bin-release/
#启动NameSrv
nohup sh bin/mqnamesrv &

image-20220627173515562

如下所示,看到 The Name Server boot success. serializeType=JSON 表示启动成功

image-20220627173524220

启动node0

下面我们启动broker的第一个节点

1
2
3
cd /usr/local/rocketmq/DledgerCluster/node0/rocketmq-all-4.7.1-bin-release/
# 启动node0的broker,并且需要指定配置文件
nohup sh bin/mqbroker -c conf/broker.conf &

image-20220627173533311

启动之后同样提示将日志信息追加到了当前目录下的 nohup.out 文件中,如果出现The broker[bogon, 192.168.245.141:10911] boot success. serializeType=JSON则表示启动成功

image-20220627173541425

启动node1,node2

跟上面启动node0的步骤一样

验证服务

当我们全部启动完成后,可以验证以下服务

查看进程

可以通过命令来查看服务的进程

1
ps -ef|grep java

我们发现所有的节点都已经启动了

image-20220627173554420

发送消息
1
2
3
4
# 设置环境变量
export NAMESRV_ADDR=127.0.0.1:9876
# 发送消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

发送消息成功

image-20220627173603160

接收消息

使用命令测试消息接收

1
2
3
4
# 设置环境变量
export NAMESRV_ADDR=127.0.0.1:9876
# 接收消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

发送消息成功,说明我们集群搭建完成,可以进行消息收发

image-20220627173612881

可靠性验证

RocketMQ使用Dledger 模式就是为了保障集群的高可靠性,如果主节点挂掉会重新选举

查看集群状态

可以通过mqadmin来查看RocketMQ的集群情况

1
sh bin/mqadmin clusterList -n 127.0.0.1:9876

其中BID为0的就是主节点

image-20220627173621474

破坏性测试

我们现在将主节点停掉,然后检查是否可以正常选主,并且是否可以正常收发消息

停掉主节点

我们看到BID为0的节点是主节点,根据我们的部署方案知道这个节点是node1

1
ps -ef|grep java

我们需要找到主节点的进程号然后杀死主节点

image-20220627173630397

1
kill -9 27930

image-20220627173637581

查看集群情况

停掉主节点后,我们查看mq集群的情况

1
sh bin/mqadmin clusterList -n 127.0.0.1:9876

我们发现已经选出来了一个主节点,根据端口我们知道当前的主节点是node2

image-20220627173646185

发送消息测试

我们发送消息进行测试,检查选出来的主节点是否能够发送消息

1
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

我们发现已经发送主节点成功,说明集群已经恢复正常

image-20220627173652836

启动挂掉的节点

我们把刚才停掉的node1节点启动,然后在检查集群的情况

1
nohup sh bin/mqbroker -c conf/broker.conf &

image-20220627173701130

检查集群情况
1
sh bin/mqadmin clusterList -n 127.0.0.1:9876

我们发现三个节点已经恢复,刚才启动的节点现在变成了flower节点了

image-20220627173711807

查看集群配置

我们可以通过以下命令来查看broker的配置信息

1
sh bin/mqadmin getBrokerConfig -c DledgerCluster -n 127.0.0.1:9876

因为我们没有配置同步方式,默认应该是异步复制,而这里显示的是同步复制

image-20220627173721224

带者疑问我们分了下源码

image-20220627173730267

可以看到,LEADER角色的Broker,角色会被改成SYNC_MASTER

为什么不支持异步复制

​ 根据一些资料分析可以得出开启DLedger之后,由于使用了Raft协议来保证集群的一致性,由于Raft主从复制的原理,和原来的Master/Slave模式的不一样,如果使用了Raft协议,主从复制是由Master发起,主动向每个从节点同步消息,而异步的方式是由从节点发起,向主节点上报offset,主节点再根据offset向从节点同步,所以开启了DLedger后,配置了ASNYC_MASTER会不生效,在最新的4.8.0中使用了Pipeline模式和批量复制,性能有所提升

评论