RocketMQ主从同步
概要
高可用(HA 机制)特性是目前分布式系统中必备的特性之一,对一个中间件来说没有HA机制必然是一个重大的缺陷。RocketMQ的Broker 分为 Master (主)和 Slave(从)两个角色,为了保证高可用性,Master 角色的机器接收到消息后 ,要把内容同步到 Slave 机器上,这样一旦 Master 宕机,Slave 机 器依然可以提供服务。这个就是 RocketMQ 实现高可用(HA 机制)的原理。
为了提高消息消费的高可用性,避免 Broker 发生单点故障引起存储在 Broker 上的消息无法及时消费, RocketMQ 引入了 Broker 主从机制:即消息 消费到达主服务器(Master)后需要消息同步到消息从服务器(Slave),如果主服务器 Broker 宕机后,消息消费者可以从从服务器拉取消息。
同时 RocketMQ 依赖 NameServer, 所以为了确保高可用,同时要确保 NameServer 的高可用,一般通过部署多台 NamesServer 服务器来实现,但彼此 之间互不通信,也就是 NameServer 务器之间在某一时刻的数据并不会完全相同,但这对消息发送不会造成任何影响,这也是 RocketMQ NameServer 设计的一个亮点。
RocketMQ集群部署模式
集群部署模式
单master模式
也就是只有一个 master 节点,称不上是集群,一旦这个 master 节点宕机,那么整个服务就不可用
多master模式
一个集群无Slave,全是Master,例如2个Master或者3个Master,这种模式的优缺点如下:
优点
配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
缺点
单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。
注意:使用同步刷盘可以保证消息不丢失,同时 Topic 相对应的 queue 应该分布在集群中各个节点,而不是只在某各节点上,否则,该节点宕机会 对订阅该 topic 的应用造成影响。
多Master多Slave模式-异步复制
在多 master 模式的基础上,每个 master 节点都有至少一个对应的 slave。master 节点可读可写,但是 slave 只能读不能写,类似于 mysql 的主备 模式。
优点
即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,一般情况下都是 master 消费,在 master 宕机或超过负载时,消费者可以从 slave 读取消息,消息的实时性不会受影响,性能几乎和多 master 一样。
缺点
使用异步复制的同步方式有可能会有消息丢失的问题。
多master多slave同步-双写模式
同多 master 多 slave 异步复制模式类似,区别在于 master 和 slave 之间的数据同步方式。
优点
同步双写的同步模式能保证数据不丢失。,数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;
缺点
性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。
同步方式
同步双写和异步复制(指的一组 master 和 slave 之间数据的同步)
刷盘策略
同步刷盘和异步刷盘(指的是节点自身数据是同步还是异步存储进入磁盘)
注意:对数据要求较高的场景,建议的持久化策略是主 broker 和从 broker 采用同步复制方式,主从 broker 都采用异步刷盘方式。通过同步复制方式,保存数据热备份,通过异步刷盘方式,保证 rocketMQ 高吞吐量。
安装部署过程
注意事项
注意,默认RocketMQ会吃8G,所以需要修改默认加载内存设置。
修改broker启动脚本runbroker.sh里面的jvm参数
JAVA_OPT=”${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g”改为
JAVA_OPT=”${JAVA_OPT} -server -Xms128m -Xmx256m -Xmn256m”
RocketMQ提供了初始的集群部署模式下的配置文件,如下图:
双主集群安装
服务器相关配置信息
NameServer集群
192.168.56.102
192.168.56.103
Broker服务器
192.168.56.102(主A)
192.168.56.103(主B)
注意,因为RocketMQ使用外网地址,所以配置文件(MQ文件夹/conf/2m-noslave/)需要修改(同时修改nameserver地址为集群地址):
配置文件配置
使用2m-noslave配置模板
broker-a.properties 配置
192.168.56.102(主A)的broker-a.properties 增加:
1 | brokerIP1=192.168.56.102 |
broker-b.properties
192.168.56.103 (主B)broker-b.properties 增加:
1 | brokerIP1=192.168.56.103 |
启动步骤
记得关闭防火墙或者要开通9876端口
启动NameServer集群
这里使用102和103两台作为集群即可
在机器A,启动第1台NameServer:102服务器进入至MQ文件夹/bin下:然后执行:
1
nohup sh mqnamesrv &
在机器B,启动第2台NameServer:103服务器进入至MQ文件夹/bin下:然后执行:
1
nohup sh mqnamesrv &
启动双主集群
顺序是先启动主,然后启动从
启动主A
启动主A:102服务器进入至MQ文件夹/bin下,执行以下命令(autoCreateTopicEnable=true测试环境开启,生产环境建议关闭):
1 | nohup sh mqbroker -c ../conf/2m-noslave/broker-a.properties autoCreateTopicEnable=true & tail -f ~/logs/rocketmqlogs/broker.log |
启动主B
启动主B:103服务器进入至MQ文件夹\bin下,执行以下命令:
1 | nohup sh mqbroker -c ../conf/2m-noslave/broker-b.properties autoCreateTopicEnable=true & tail -f ~/logs/rocketmqlogs/broker.log |
查看日志
每台服务器查看日志
1 | tail -f ~/logs/rocketmqlogs/broker.log |
启动控制台
如果是要启动控制台,则需要重新打包:
进入\rocketmq-console\src\main\resources文件夹,打开application.properties进行配置(多个NameServer使用;分隔)。
rocketmq.config.namesrvAddr=192.168.56.102:9876;192.168.56.103:9876
进入\rocketmq-externals\rocketmq-console文件夹,执行mvn clean package -Dmaven.test.skip=true,编译生成。在把编译后的jar包丢上服务器:
启动命令
1 | nohup java -jar rocketmq-console-ng-1.0.1.jar & |
双主双从同步集群安装
服务器相关配置信息
NameServer集群
192.168.56.102
192.168.56.103
Broker服务器
192.168.56.102(主A)
192.168.56.103(主B)
192.168.56.104(从A)
192.168.56.105(从B)
注意,因为RocketMQ使用外网地址,所以配置文件(MQ文件夹/conf/2m-2s-sync/)需要修改(同时修改nameserver地址为集群地址):
配置文件配置
使用2m-2s-sync配置模板
broker-a.properties
192.168.56.102(主A)的broker-a.properties增加:
1 | brokerIP1=192.168.56.102 |
broker-b.properties
192.168.56.103(主B)的broker-b.properties增加:
1 | brokerIP1=192.168.56.103 |
broker-a-s.properties
192.168.56.104从A)的broker-a-s.properties增加:
1 | brokerIP1=192.168.56.104 |
broker-b-s.properties
192.168.56.105(从B)broker-b-s.properties增加:
1 | brokerIP1=192.168.56.105 |
启动步骤
记得关闭防火墙或者要开通9876端口
启动NameServer集群
这里使用102和103两台作为集群即可
启动机器A
在机器A,启动第1台NameServer:102服务器进入至MQ文件夹/bin下,然后执行:
1 | nohup sh mqnamesrv & |
启动机器B
在机器B,启动第2台NameServer:103服务器进入至MQ文件夹/bin下,然后执行:
1 | nohup sh mqnamesrv & |
启动双主双从同步集群
顺序是先启动主,然后启动从
启动主A
启动主A:,102服务器进入至MQ文件夹/bin下:执行以下命令(autoCreateTopicEnable=true测试环境开启,生产环境建议关闭):
1 | nohup sh mqbroker -c ../conf/2m-2s-sync/broker-a.properties autoCreateTopicEnable=true & |
启动主B
启动主B,103服务器进入至MQ文件夹\bin下,执行以下命令:
1 | nohup sh mqbroker -c ../conf/2m-2s-sync/broker-b.properties autoCreateTopicEnable=true & |
启动从A
启动从A,104服务器进入至MQ文件夹\bin下,执行以下命令:
1 | nohup sh mqbroker -c ../conf/2m-2s-sync/broker-a-s.properties autoCreateTopicEnable=true & |
启动从B
启动从B,105服务器进入至MQ文件夹\bin下,执行以下命令:
1 | nohup sh mqbroker -c ../conf/2m-2s-sync/broker-b-s.properties autoCreateTopicEnable=true & |
查看日志
每台服务器查看日志:
1 | tail -f ~/logs/rocketmqlogs/broker.log |
双主双从异步集群安装
服务器相关配置信息
NameServer集群
192.168.56.102
192.168.56.103
Broker服务器
192.168.56.102(主A)
192.168.56.103(主B)
192.168.56.104(从A)
192.168.56.105(从B)
配置文件配置
使用2m-2s-async配置模板
注意:因为RocketMQ使用外网地址,所以配置文件(MQ文件夹/conf/2m-2s-async/)需要修改(同时修改nameserver地址为集群地址):
broker-a.properties
192.168.56.102(主A)的broker-a.properties增加:
1 | brokerIP1=192.168.56.102 |
broker-b.properties
192.168.56.103(主B)的broker-b.properties增加:
1 | brokerIP1=192.168.56.103 |
broker-a-s.properties
192.168.56.104(从A)的broker-a-s.properties增加:
1 | brokerIP1=192.168.56.104 |
broker-b-s.properties
192.168.56.105(从B)的broker-b-s.properties增加:
1 | brokerIP1=192.168.56.105 |
启动步骤
记得关闭防火墙或者要开通9876端口
启动NameServer集群
这里使用102和103两台作为集群即可
启动机器A
在机器A,启动第1台NameServer: 102服务器进入至MQ文件夹/bin下,然后执行:
1 | nohup sh mqnamesrv & |
启动机器B
在机器B,启动第2台NameServer: 103服务器进入至MQ文件夹/bin下,然后执行:
1 | nohup sh mqnamesrv & |
启动双主双从同步集群
顺序是先启动主,然后启动从
启动主A
启动主A: 102服务器进入至MQ文件夹/bin下:执行以下命令(autoCreateTopicEnable=true 测试环境开启,生产环境建议关闭):
1 | nohup sh mqbroker -c ../conf/2m-2s-async/broker-a.properties autoCreateTopicEnable=true & |
启动主B
启动主B:103服务器进入至MQ文件夹\bin下:执行以下命令:
1 | nohup sh mqbroker -c ../conf/2m-2s-async/broker-b.properties autoCreateTopicEnable=true & |
启动从A
启动从A: 104服务器进入至MQ文件夹\bin下:执行以下命令:
1 | nohup sh mqbroker -c ../conf/2m-2s-async/broker-a-s.properties autoCreateTopicEnable=true & |
启动从B
启动从B:105服务器进入至MQ文件夹\bin下:执行以下命令:
1 | nohup sh mqbroker -c ../conf/2m-2s-async/broker-b-s.properties autoCreateTopicEnable=true & |
查看日志
每台服务器查看日志
1 | tail -f ~/logs/rocketmqlogs/broker.log |
启动控制台
如果是要启动控制台,则需要重新打包:
进入\rocketmq-console\src\main\resources文件夹,打开application.properties进行配置(多个NameServer使用;分隔)。
例如:rocketmq.config.namesrvAddr=192.168.56.102:9876;192.168.56.103:9876
进入\rocketmq-externals\rocketmq-console文件夹,执行mvn clean package -Dmaven.test.skip=true编译生成jar包,在把编译后的jar包丢上服务器:
启动命令
1 | nohup java -jar rocketmq-console-ng-1.0.1.jar & |
主从复制原理
详细参见主从复制原理
RocketMQ 主从同步(HA)实现过程如下:
主服务器启动,并在特定端口上监听从服务器的连接。
从服务器主动连接主服务器,主服务器接受客户端的连接,并建立相关 TCP 连接。
从服务器主动向服务器发送待拉取消息偏移 ,主服务器解析请求并返回消息给从服务器。
从服务器保存消息并继续发送新的消息同步请求
核心实现
从服务器在启动的时候主动向主服务器建立 TCP 长连接,然后获取服务器的 commitlog 最大偏移,以此偏移向主服务器主动拉取消息,主服务器根 据偏移量,与自身 commitlog 文件的最大偏移进行比较,如果大于从服务器 commitlog 偏移,主服务器将向从服务器返回一定数量的消息,该过程循环进行,达到主从服务器数据同步。
读写分离机制
RocketMQ 读写分离与他中间件的实现方式完全不同, RocketMQ 是消费者首先服务器发起拉取消息请求,然后主服务器返回一批消息,然后会根据主服务器负载压力与主从同步情况,向从服务器建议下次消息拉取是从主服务器还是从从服务器拉取。
那消息服务端是根据何种规则来建议哪个消息消费队列该从哪台 Broker 服务器上拉取消息呢?
一般都是从主服务器拉取,如果主阶段拉取的消息已经超出了常驻内存的大小,表示主服务器繁忙,此时从从服务器拉取。
如果主服务器繁忙则建议下 次从从服务器拉取消息,设置 suggestWhichBrokerld 配置文件中 whichBrokerWhenConsumeSlowly 属性,默认为 1。如 果一个 Master 拥有多台 Slave 服务器,参与消息拉取负载的从服务器只会是其中一个。
与Spring集成
pom文件
1 | <!--RocketMQ--> |
生产者
生产者配置信息
applicationContext.xml
1 | <!-- 生产者配置 --> |
生产者代码实现
发送入口代码
1 |
|
生产者代码封装
1 |
|
消费者
消费者监听配置
applicationContext.xml中使用监听器的方式
1 | <!-- 消费者监听1 --> |
消费者代码实现
监听消息代码
1 |
|
与SpringBoot集成
配置信息
跟Spring和原生非常类似,maven配置如下:
POM配置
1 | <dependency> |
application.properties配置
1 | #============== rocket =================== |
生产者代码实现
发送入口代码
1 |
|
生产者代码封装
1 |
|
消费者
消费者代码
1 |
|