Canal部署

  |  

Canal简介

Mysql初始化

初始化数据库表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
/*
SQLyog Ultimate v13.1.1 (64 bit)
MySQL - 5.7.32-log : Database - test
*********************************************************************
*/

/*!40101 SET NAMES utf8 */;

/*!40101 SET SQL_MODE=''*/;

/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
CREATE DATABASE /*!32312 IF NOT EXISTS*/`test` /*!40100 DEFAULT CHARACTER SET latin1 */;

USE `test`;

/*Table structure for table `activity_info` */

DROP TABLE IF EXISTS `activity_info`;

CREATE TABLE `activity_info` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(100) DEFAULT NULL COMMENT '活动名称',
`desc` varchar(3000) DEFAULT NULL COMMENT '活动介绍',
`starttime` datetime DEFAULT NULL,
`endtime` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;

/*Data for the table `activity_info` */

insert into `activity_info`(`id`,`name`,`desc`,`starttime`,`endtime`) values
(1,'双十一红包雨活动','10亿红包等你拿,活动只有1小时!10亿红包等你拿,活动只有1小时!10亿红包等你拿,活动只有1小时!10亿红包等你拿,活动只有1小时!10亿红包等你拿,活动只有1小时!10亿红包等你拿,活动只有1小时!10亿红包等你拿,活动只有1小时!10亿红包等你拿,活动只有1小时!10亿红包等你拿,活动只有1小时!10亿红包等你拿,活动只有1小时!10亿红包等你拿,活动只有1小时!10亿红包等你拿,活动只有1小时!10亿红包等你拿,活动只有1小时!10亿红包等你拿,活动只有1小时!10亿红包等你拿,活动只有1小时!10亿红包等你拿,活动只有1小时!10亿红包等你拿,活动只有1小时!10亿红包等你拿,活动只有1小时!10亿红包等你拿,活动只有1小时!10亿红包等你拿,活动只有1小时!10亿红包等你拿,活动只有1小时!10亿红包等你拿,活动只有1小时!10亿红包等你拿,活动只有1小时!10亿红包等你拿,活动只有1小时!10亿红包等你拿,活动只有1小时!10亿红包等你拿,活动只有1小时!10亿红包等你拿,活动只有1小时!10亿红包等你拿,活动只有1小时!10亿红包等你拿,活动只有1小时!10亿红包等你拿,活动只有1小时!10亿红包等你拿,活动只有1小时!10亿红包等你拿,活动只有1小时!10亿红包等你拿,活动只有1小时!10亿红包等你拿,活动只有1小时!10亿红包等你拿,活动只有1小时!10亿红包等你拿,活动只有1小时!10亿红包等你拿,活动只有1小时!10亿红包等你拿,活动只有1小时!10亿红包等你拿,活动只有1小时!10亿红包等你拿,活动只有1小时!10亿红包等你拿11,2活动只有1小时!10亿红包等你拿,活动只有1小时!','2020-06-30 23:19:49','2020-05-26 23:19:56');

/*Table structure for table `money_log` */

DROP TABLE IF EXISTS `money_log`;

CREATE TABLE `money_log` (
`id` varchar(60) NOT NULL,
`money` double DEFAULT NULL COMMENT '抢到的金额',
`createtime` datetime DEFAULT NULL COMMENT '抢到的时间',
`username` varchar(20) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='抢红包记录';

/*Data for the table `money_log` */

insert into `money_log`(`id`,`money`,`createtime`,`username`) values
('0bd5b66e-d94a-43e9-8a9f-26cfcac60d8a',10,'2020-08-23 17:53:47','zhangsan'),
('6dc4d8ce-1548-4285-b9e7-da441a304a64',57,'2020-08-23 17:48:11','zhangsan'),
('8afbc7b9-11fa-4840-bc3a-086d6a1ce0d4',6,'2020-08-23 17:53:45','zhangsan'),
('ce40d6cb-3763-403a-9c8e-9be371aa1318',24,'2020-08-23 17:47:38','zhangsan'),
('ttttt',15,'2020-08-23 17:48:41','zhangsan');

/*Table structure for table `money_package` */

DROP TABLE IF EXISTS `money_package`;

CREATE TABLE `money_package` (
`id` int(11) NOT NULL COMMENT '主键ID',
`money` int(11) NOT NULL COMMENT '红包总金额',
`count` int(11) NOT NULL COMMENT '红包数量',
`sort` int(2) DEFAULT NULL COMMENT '发红包顺序',
`type` int(11) DEFAULT '1' COMMENT '红包发放类型 1:延时发放 2:立即发放',
`hasload` int(1) DEFAULT '1' COMMENT '加载位置 1:未加载 2:加载到程序 3:加载到Redis缓存',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

/*Data for the table `money_package` */

insert into `money_package`(`id`,`money`,`count`,`sort`,`type`,`hasload`) values
(1,9000,20,1,1,3),
(2,1500,11,2,1,3),
(3,1600,100,3,1,3),
(4,1900,10,4,1,3),
(5,1500,11,5,1,3),
(6,111,11,6,1,3),
(7,9999,10,7,1,3),
(8,2,2,8,1,3),
(9,222,2,9,1,3);

/*Table structure for table `user_info` */

DROP TABLE IF EXISTS `user_info`;

CREATE TABLE `user_info` (
`username` varchar(20) NOT NULL COMMENT '用户名',
`sex` varchar(2) DEFAULT NULL,
`name` varchar(20) DEFAULT NULL,
`level` int(1) NOT NULL DEFAULT '1' COMMENT '会员等级',
`age` int(3) DEFAULT NULL,
PRIMARY KEY (`username`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

/*Data for the table `user_info` */

insert into `user_info`(`username`,`sex`,`name`,`level`,`age`) values
('c','男','C3',1,34),
('lisi','男','张三',1,31),
('wangwu','男','王五',1,29);

/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;

设置日志模式

对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

1
2
3
4
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步

创建Canal用户

canal的原理是模拟自己为mysql slave,所以这里一定需要做为mysql slave的相关权限

1
2
3
4
CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

针对已有的账户可直接通过grant

Linux 部署

配置Canal

安装

下载所需版本的Canal:下载地址

解压下载的文件

1
tar -xzf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /tmp/canal

配置修改

应用参数
1
vi conf/example/instance.properties
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#################################################
## mysql serverId
canal.instance.mysql.slaveId = 1234

#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =

#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal

canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8

#table regex
canal.instance.filter.regex = .*\..*

#################################################

说明

  • canal.instance.connectionCharset 代表数据库的编码方式对应到java中的编码类型,比如UTF-8,GBK , ISO-8859-1

  • 如果系统是1个cpu,需要将canal.instance.parser.parallel设置为false

启动

1
sh bin/startup.sh

查看日志

1
vi logs/canal/canal.log
1
2
3
2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
2013-02-05 22:45:28.210 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

具体instance的日志:

1
vi logs/example/example.log
1
2
3
4
2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2013-02-05 22:50:45.810 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....

关闭

1
sh bin/stop.sh

Docker 单机部署

本次使用 canal+mysql+rabbitmq方式进行部署

Docker部署情况

以下是Docker 部署各个服务的一个详情

容器服务IP暴漏端口
Canal172.18.0.2011111
MySQL172.18.0.103306
RabbitMQ172.18.0.30567,215,672

准备工作

MySql配置文件设置

配置mysql配置文件 my.cnf的挂载文件

1
2
3
4
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
Cana相关配置
配置canal.properties

配置Canal配置文件canal.properties的挂载文件

1
2
3
4
5
6
7
8
9
10
.....
# 指定rabbitmq
canal.serverMode = rabbitmq
...
# rabbitmq 设置
canal.mq.servers = 172.18.0.30 ## 注意不要加端口号,不然会报IPV6错误。
canal.mq.vhost=/
canal.mq.exchange=canal
canal.mq.username=guest
canal.mq.password=guest

完整配置文件如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
#################################################
######### common argument #############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441

canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = rabbitMQ
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size = 1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

#################################################
######### destinations #############
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
######### MQ Properties #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=

canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local

canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8

##################################################
######### Kafka #############
##################################################
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0

kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"

##################################################
######### RocketMQ #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false

##################################################
######### RabbitMQ #############
##################################################
rabbitmq.host = 172.18.0.30
rabbitmq.virtual.host = /
rabbitmq.exchange = canal
rabbitmq.username = guest
rabbitmq.password = guest
配置 instance.properties

配置Canal的数据源配置文件instance.properties的挂载文件,路径在example/instance.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#################################################
## mysql serverId , v1.0.26+ will autoGen
## mysql slaveId v1.0.26 后的版本支持自动生成 可以不需要配置
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
## 配置连接数据库的地址
canal.instance.master.address=172.18.0.10:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://172.18.0.10:3306/test
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
# 配置数据库用户名密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 配置 连接数据库的编码格式
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
## canal 收集表的 过滤正则表达式 这个表示收集所有表数据
canal.instance.filter.regex=.*\\..*
# table black regex
## canal 收集表的黑名单
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

#################################################
创建Canal日志文件目录

创建一个Canal的日志文件的挂载目录

1
mkdir -p /tmp/data/canal/logs
编写Canal消费者
导入POM文件
1
2
3
4
5
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
代码编写
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class CanalConsumer {
public final static String EXCHANGE_NAME = "canal";//direct交换器名称
public final static String DIRECT_QUEUE = "direct_queue";

public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂,连接RabbitMQ
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.64.144");//端口号、用户名、密码可以使用默认的
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setPort(5672);
//创建连接
Connection connection = connectionFactory.newConnection();
//创建信道
Channel channel = connection.createChannel();
//在信道中设置交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//声明队列
channel.queueDeclare(DIRECT_QUEUE, false, false, false, null);
//交换器和队列绑定
channel.queueBind(DIRECT_QUEUE, EXCHANGE_NAME,"");
System.out.println("等待 message.....");
//声明消费者
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "utf-8");
System.out.println("Received:" + envelope.getRoutingKey() + "========" + message);

}
};
//消费者在指定的对队列上消费
channel.basicConsume(DIRECT_QUEUE, true, consumer);
}
}

Docker 任务编排

编写docker-compose配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
version: '2'   
services:
mysql:
image: mysql:5.7
hostname: mysql
container_name: mysql
networks:
docker-network:
ipv4_address: 172.18.0.10
ports:
- "3306:3306"
environment:
MYSQL_ROOT_PASSWORD: root
volumes:
- "/tmp/etc/mysql:/etc/mysql/conf.d"
- "/tmp/data/mysql:/var/lib/mysql"
rabbitMQ:
image: rabbitmq:management
hostname: rabbitMQ
container_name: rabbitMQ
networks:
docker-network:
ipv4_address: 172.18.0.30
ports:
- "5672:5672"
- "15672:15672"
canal:
image: canal/canal-server
hostname: canal
container_name: canal
restart: always
networks:
docker-network:
ipv4_address: 172.18.0.20
ports:
- "11111:11111"
volumes:
- "/tmp/etc/canal/canal.properties:/home/admin/canal-server/conf/canal.properties"
- "/tmp/etc/canal/example:/home/admin/canal-server/conf/example"
- "/tmp/data/canal/logs:/home/admin/canal-server/logs"
depends_on:
- mysql
- rabbitMQ

networks:
docker-network:
ipam:
config:
- subnet: 172.18.0.0/16
gateway: 172.18.0.1

启动Docker

1
docker-compose up -d

检查环境

检查MySQL

可以使用远程工具连接MySql检查是否能够正常连接

Mysql初始化

参考上面的【Mysql初始化】

检查RabbitMQ

登录RabbitMQ管理界面检查是否能够登录

检查Canal服务

因为是启动后才初始化的 mysql用户,Canal启动连接数据库会失败,因为配置了restart: always,docker容器会不断重启重试。

1
tail -f /tmp/data/canal/logs/canal/canal.log

出现如下日志表示Canal 启动成功

测试服务

启动消费者测试

修改一条数据检查能够监听到

1
2
等待 message.....
Received:========{"data":[{"username":"c","sex":"男","name":"C4","level":"1","age":"34"}],"database":"test","es":1604310855000,"id":3,"isDdl":false,"mysqlType":{"username":"varchar(20)","sex":"varchar(2)","name":"varchar(20)","level":"int(1)","age":"int(3)"},"old":[{"name":"C3"}],"pkNames":["username"],"sql":"","sqlType":{"username":12,"sex":12,"name":12,"level":4,"age":4},"table":"user_info","ts":1604310855785,"type":"UPDATE"}

可以测试通过

关闭消费者测试

关闭消费者,修改数据后,检查修改后数据是否推送到了MQ

发现有数据堆积,启动消费者后就可以消费

Docker 集群部署

Docker部署情况

以下是Docker 部署各个服务的一个详情

容器服务IP暴漏端口
Canal01172.18.0.2011111
Canal02172.18.0.2111112
zookeeper172.18.0.52181
MySQL172.18.0.103306
RabbitMQ172.18.0.30567,215,672

准备工作

安装Zookeeper

​ Canal和Kafka集群都依赖于Zookeeper做服务协调,为了方便管理,一般会独立部署Zookeeper服务或者Zookeeper集群。

修改配置文件

参考下Docker单机版【Cana相关配置】

canal.properties修改

因为canal集群需要修改canal的配置

1
2
## canal zk配置,集群用逗号分隔
canal.zkServers=172.18.0.5:2181
编写Canal消费者

参考上面配置

Docker 任务编排

编写docker-compose配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
version: '2'   
services:
zookeeper:
image: zookeeper
container_name: zookeeper
privileged: true
networks:
docker_network:
ipv4_address: 172.18.0.5
ports:
- "2181:2181"
mysql:
image: mysql:5.7
hostname: mysql
container_name: mysql
networks:
docker-network:
ipv4_address: 172.18.0.10
ports:
- "3306:3306"
environment:
MYSQL_ROOT_PASSWORD: root
volumes:
- "/tmp/etc/mysql:/etc/mysql/conf.d"
- "/tmp/data/mysql:/var/lib/mysql"
rabbitMQ:
image: rabbitmq:management
hostname: rabbitMQ
container_name: rabbitMQ
networks:
docker-network:
ipv4_address: 172.18.0.30
ports:
- "5672:5672"
- "15672:15672"
canal:
image: canal/canal-server
hostname: canal01
container_name: canal01
restart: always
networks:
docker-network:
ipv4_address: 172.18.0.20
ports:
- "11111:11111"
volumes:
- "/tmp/etc/canal/canal.properties:/home/admin/canal-server/conf/canal.properties"
- "/tmp/etc/canal/example:/home/admin/canal-server/conf/example"
- "/tmp/logs/canal01:/home/admin/canal-server/logs"
depends_on:
- mysql
- rabbitMQ
- zookeeper
canal:
image: canal/canal-server
hostname: canal02
container_name: canal02
restart: always
networks:
docker-network:
ipv4_address: 172.18.0.21
ports:
- "11112:11111"
volumes:
- "/tmp/etc/canal/canal.properties:/home/admin/canal-server/conf/canal.properties"
- "/tmp/etc/canal/example:/home/admin/canal-server/conf/example"
- "/tmp/logs/canal02:/home/admin/canal-server/logs"
depends_on:
- mysql
- rabbitMQ
- zookeeper

networks:
docker-network:
ipam:
config:
- subnet: 172.18.0.0/16
gateway: 172.18.0.1

启动Docker

1
docker-compose up -d

检查环境

参考上文检查环境

测试服务

启动消费者测试

修改一条数据检查能够监听到

1
2
等待 message.....
Received:========{"data":[{"username":"c","sex":"男","name":"C1","level":"1","age":"34"}],"database":"test","es":1604385903000,"id":5,"isDdl":false,"mysqlType":{"username":"varchar(20)","sex":"varchar(2)","name":"varchar(20)","level":"int(1)","age":"int(3)"},"old":[{"name":"C6"}],"pkNames":["username"],"sql":"","sqlType":{"username":12,"sex":12,"name":12,"level":4,"age":4},"table":"user_info","ts":1604385903480,"type":"UPDATE"}

可以测试通过

关闭消费者测试

关闭消费者,修改数据后,检查修改后数据是否推送到了MQ

发现有数据堆积,启动消费者后就可以消费

破坏性测试
停掉Canal01

停掉一个Canal01进行测试

1
2
>docker-compose stop canal01
Stopping canal01 ... done

测试是否可以正常监听到

停掉Canal02

关闭第二台Canal

1
2
>docker-compose stop canal02
Stopping canal02 ... done

修改数据不能够进行同步

启动Canal01

启动第一台Canal

1
docker-compose start canal01

修改数据发现能够正常同步

启动Canal02
1
docker-compose start canal01

修改数据发现能够正常同步

 评论