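1. Introduction
Canal is a Java-based tool that parses a database's incremental logs and provides incremental data subscription and consumption; at present it mainly supports MySQL. Its working principle is fairly simple: it disguises itself as a MySQL slave and synchronizes data from the master.
- canal emulates the MySQL slave interaction protocol, presents itself as a MySQL slave, and sends a dump request to the MySQL master
- The MySQL master receives the dump request and starts pushing its binary log to the slave (that is, canal)
- canal parses the binary log objects (originally a byte stream)
- The parsed data is then handed on for further processing
It helps to first understand how MySQL master-slave replication works:
- The MySQL master writes data changes to its binary log (the records in it are called binary log events and can be inspected with show binlog events)
- The MySQL slave copies the master's binary log events into its relay log
- The MySQL slave replays the events in the relay log, applying the data changes to its own data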
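The three replication steps above can be sketched as a toy model. This is only an illustration under stated assumptions: `Event`, `Master`, and `Slave` are hypothetical classes invented here, the "binlog" is a plain in-memory list, and the position is a list index rather than a real binlog file name and offset.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of binlog-based replication; none of these types exist in MySQL or Canal. */
final class ReplicationSketch {

    /** One row-level change, standing in for a binary log event. */
    static final class Event {
        final String type;   // "INSERT", "UPDATE" or "DELETE"
        final int id;        // primary key of the affected row
        final String value;  // new row content; ignored for DELETE

        Event(String type, int id, String value) {
            this.type = type;
            this.id = id;
            this.value = value;
        }
    }

    /** Step 1: the master appends every change to its binary log. */
    static final class Master {
        final List<Event> binlog = new ArrayList<>();

        void write(Event e) {
            binlog.add(e);
        }

        /** Answers a dump request with every event at or after the given position. */
        List<Event> dump(int fromPosition) {
            return new ArrayList<>(binlog.subList(fromPosition, binlog.size()));
        }
    }

    /** The slave (or canal in its place) copies events, then replays them. */
    static final class Slave {
        final List<Event> relayLog = new ArrayList<>();
        final Map<Integer, String> rows = new LinkedHashMap<>();
        int position = 0; // index of the next binlog event to request

        void sync(Master master) {
            List<Event> fresh = master.dump(position);
            relayLog.addAll(fresh);      // step 2: copy into the relay log
            position += fresh.size();
            for (Event e : fresh) {      // step 3: replay onto local data
                if ("DELETE".equals(e.type)) {
                    rows.remove(e.id);
                } else {
                    rows.put(e.id, e.value);
                }
            }
        }
    }

    public static void main(String[] args) {
        Master master = new Master();
        Slave slave = new Slave();
        master.write(new Event("INSERT", 1, "cc0"));
        master.write(new Event("INSERT", 2, "cc1"));
        slave.sync(master);
        master.write(new Event("UPDATE", 1, "cc0-renamed"));
        master.write(new Event("DELETE", 2, null));
        slave.sync(master);
        // prints: {1=cc0-renamed} at position 4
        System.out.println(slave.rows + " at position " + slave.position);
    }
}
```

Because the slave remembers its position, a second sync only fetches what was written since the first one, which is the property canal relies on when it resumes after a restart.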
2. Preparation
- For a self-hosted MySQL, enable binlog writing and set binlog-format to ROW, so that data changes can be detected row by row
[mysqld]
- Create a canal user and grant it the privileges of a MySQL slave
CREATE USER canal IDENTIFIED BY 'canal';
3. Starting the Canal Service
- Download canal
We use canal.deployer-1.1.4.tar.gz here.
- Extract it
tar zxvf canal.deployer-1.1.4.tar.gz
cd canal.deployer-1.1.4
- Edit the configuration file, mainly the connection details of the MySQL master
Open the file conf/example/instance.properties
################################################
# mysql serverId , v1.0.26+ will autoGen
canal.instance.mysql.slaveId=1

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
canal.instance.tsdb.dbUsername=canal
canal.instance.tsdb.dbPassword=canal

canal.instance.standby.address =
canal.instance.standby.journal.name =
canal.instance.standby.position =
canal.instance.standby.timestamp =
canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.defaultDatabaseName=test_canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
canal.mq.partitionsNum=3
canal.mq.partitionHash=test.table:id^name,.*\\..*
################################################
- canal.instance.connectionCharset is the database character encoding expressed as the corresponding Java charset name, for example UTF-8, GBK, or ISO-8859-1
- If the machine has only one CPU, set canal.instance.parser.parallel to false
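The value of canal.instance.filter.regex in the file above is a comma-separated list of regular expressions matched against schema.table names. The doubled backslash is properties-file escaping, so `.*\\..*` reads as the regex `.*\..*`. The following is a minimal sketch of that matching idea using java.util.regex. `TableFilterSketch` is a hypothetical class written for this post, not Canal's own filter implementation, and details such as case handling may differ in Canal itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Approximates a comma-separated schema.table filter; not Canal's real filter class. */
final class TableFilterSketch {
    private final List<Pattern> patterns = new ArrayList<>();

    /** @param filter comma-separated regexes as they read after properties unescaping */
    TableFilterSketch(String filter) {
        for (String part : filter.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                // Anchor each pattern so it has to match the whole schema.table name
                patterns.add(Pattern.compile("^" + trimmed + "$"));
            }
        }
    }

    /** Returns true if any pattern matches "schema.table". */
    boolean accepts(String schema, String table) {
        String name = schema + "." + table;
        for (Pattern p : patterns) {
            if (p.matcher(name).matches()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // In Java source, "\\." is the regex \. (a literal dot)
        TableFilterSketch all = new TableFilterSketch(".*\\..*");
        TableFilterSketch some = new TableFilterSketch("test_canal\\.test_canal,mytest2\\..*");
        System.out.println(all.accepts("test_canal", "test_canal")); // prints true
        System.out.println(some.accepts("mytest2", "user"));         // prints true
        System.out.println(some.accepts("other", "user"));           // prints false
    }
}
```

In this sketch an empty filter string accepts nothing, which is a choice made here for simplicity and says nothing about how Canal treats an empty value.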
- Start canal
sh bin/startup.sh
- View the server and instance logs
2020-04-23 23:48:59.048 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2020-04-23 23:48:59.097 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2020-04-23 23:48:59.110 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2020-04-23 23:48:59.178 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.0.106(192.168.0.106):11111]
2020-04-23 23:49:00.243 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......

er.setConnectionCharset(java.lang.String)]
2020-04-23 23:48:59.793 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2020-04-23 23:48:59.794 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2020-04-23 23:49:00.192 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2020-04-23 23:49:00.199 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2020-04-23 23:49:00.199 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter :
2020-04-23 23:49:00.206 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
- Stop canal
sh bin/stop.sh
4. Creating the Java Project
- Add the canal and redis dependencies
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.4.2</version>
</dependency>
- Create the RedisUtils utility class
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class RedisUtils {
    // Redis server IP
    private static String ADDR = "127.0.0.1";
    // Redis port
    private static int PORT = 6379;
    // Access password
    private static String AUTH = "";
    // Maximum number of connection instances available; the default is 8.
    // A value of -1 means no limit; once the pool has handed out maxActive jedis instances, it is exhausted.
    private static int MAX_ACTIVE = 1024;
    // Maximum number of idle jedis instances kept in the pool; the default is also 8.
    private static int MAX_IDLE = 200;
    // Maximum time to wait for an available connection, in milliseconds; the default of -1 means never time out.
    // If the wait is exceeded, a JedisConnectionException is thrown.
    private static int MAX_WAIT = 10000;
    // Expiration time, in seconds (24 hours)
    protected static int expireTime = 60 * 60 * 24;
    // Connection pool
    protected static JedisPool pool;

    /**
     * Static initializer; runs only once, on first use of the class
     */
    static {
        JedisPoolConfig config = new JedisPoolConfig();
        // Maximum number of connections
        config.setMaxTotal(MAX_ACTIVE);
        // Maximum number of idle instances
        config.setMaxIdle(MAX_IDLE);
        // Maximum wait time
        config.setMaxWaitMillis(MAX_WAIT);
        // Do not validate connections when they are borrowed
        config.setTestOnBorrow(false);
        pool = new JedisPool(config, ADDR, PORT, 1000);
    }

    /**
     * Get a jedis instance
     */
    protected static synchronized Jedis getJedis() {
        Jedis jedis = null;
        try {
            jedis = pool.getResource();
        } catch (Exception e) {
            e.printStackTrace();
            if (jedis != null) {
                pool.returnBrokenResource(jedis);
            }
        }
        return jedis;
    }

    /**
     * Release a jedis resource
     *
     * @param jedis    the instance to hand back to the pool
     * @param isBroken whether the connection failed while in use
     */
    protected static void closeResource(Jedis jedis, boolean isBroken) {
        if (jedis == null) {
            // getJedis() failed, so there is nothing to return
            return;
        }
        try {
            if (isBroken) {
                pool.returnBrokenResource(jedis);
            } else {
                pool.returnResource(jedis);
            }
        } catch (Exception e) {
            // Ignore failures while returning the connection
        }
    }

    /**
     * Check whether a key exists
     *
     * @param key
     */
    public static boolean existKey(String key) {
        Jedis jedis = null;
        boolean isBroken = false;
        try {
            jedis = getJedis();
            jedis.select(0);
            return jedis.exists(key);
        } catch (Exception e) {
            isBroken = true;
        } finally {
            closeResource(jedis, isBroken);
        }
        return false;
    }

    /**
     * Delete a key
     *
     * @param key
     */
    public static void delKey(String key) {
        Jedis jedis = null;
        boolean isBroken = false;
        try {
            jedis = getJedis();
            jedis.select(0);
            jedis.del(key);
        } catch (Exception e) {
            isBroken = true;
        } finally {
            closeResource(jedis, isBroken);
        }
    }

    /**
     * Get the value of a key (and refresh its expiration)
     *
     * @param key
     */
    public static String stringGet(String key) {
        Jedis jedis = null;
        boolean isBroken = false;
        String lastVal = null;
        try {
            jedis = getJedis();
            jedis.select(0);
            lastVal = jedis.get(key);
            jedis.expire(key, expireTime);
        } catch (Exception e) {
            isBroken = true;
        } finally {
            closeResource(jedis, isBroken);
        }
        return lastVal;
    }

    /**
     * Store a string value
     *
     * @param key
     * @param value
     */
    public static String stringSet(String key, String value) {
        Jedis jedis = null;
        boolean isBroken = false;
        String lastVal = null;
        try {
            jedis = getJedis();
            jedis.select(0);
            lastVal = jedis.set(key, value);
            jedis.expire(key, expireTime);
        } catch (Exception e) {
            e.printStackTrace();
            isBroken = true;
        } finally {
            closeResource(jedis, isBroken);
        }
        return lastVal;
    }

    /**
     * Store a hash field
     *
     * @param key
     * @param field
     * @param value
     */
    public static void hashSet(String key, String field, String value) {
        boolean isBroken = false;
        Jedis jedis = null;
        try {
            jedis = getJedis();
            if (jedis != null) {
                jedis.select(0);
                jedis.hset(key, field, value);
                jedis.expire(key, expireTime);
            }
        } catch (Exception e) {
            isBroken = true;
        } finally {
            closeResource(jedis, isBroken);
        }
    }
}
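One detail of RedisUtils worth spelling out: stringGet calls expire after every read, so a key only disappears once it has gone 24 hours without being read or written (a sliding expiration). The sketch below models that behaviour in memory so it can be tried without a Redis server. `SlidingExpirySketch` and its explicit `nowSeconds` parameter are inventions for this illustration, not part of Jedis.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory model of "set, then expire" plus "get, then expire"; not a Redis client. */
final class SlidingExpirySketch {
    static final long TTL_SECONDS = 60L * 60 * 24; // same 24 hours as expireTime

    private final Map<String, String> values = new HashMap<>();
    private final Map<String, Long> deadlines = new HashMap<>();

    /** Stores the value and starts a fresh 24-hour countdown. */
    void set(String key, String value, long nowSeconds) {
        values.put(key, value);
        deadlines.put(key, nowSeconds + TTL_SECONDS);
    }

    /** Returns the value if it is still alive, and pushes its deadline out again. */
    String get(String key, long nowSeconds) {
        Long deadline = deadlines.get(key);
        if (deadline == null || nowSeconds >= deadline) {
            values.remove(key);
            deadlines.remove(key);
            return null;
        }
        deadlines.put(key, nowSeconds + TTL_SECONDS); // the read refreshes the countdown
        return values.get(key);
    }

    public static void main(String[] args) {
        SlidingExpirySketch cache = new SlidingExpirySketch();
        cache.set("demo:1", "{\"id\":\"1\"}", 0);
        // Read shortly before the first deadline: still there, deadline moves to 172799
        System.out.println(cache.get("demo:1", 86399));
        // A full day after the original deadline the key is still alive thanks to the read
        System.out.println(cache.get("demo:1", 172798));
        // No reads for 24 hours after that: the key is gone
        System.out.println(cache.get("demo:1", 172798 + TTL_SECONDS)); // prints null
    }
}
```

In real Redis the SET and the following EXPIRE are two separate commands, so a crash between them would leave a key without a TTL. As far as I know Jedis also exposes setex, which sets the value and the TTL in one command, and that may be worth considering if this matters for your data.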
- Create CanalClient
import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.*;

public class CanalClient {
    public static void main(String[] args) {
        // Create the connection
        CanalConnector connector = CanalConnectors
                .newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "", "");
        int batchSize = 1000;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            while (true) {
                Message message = connector.getWithoutAck(batchSize); // Fetch up to batchSize entries
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and stop polling
                        Thread.currentThread().interrupt();
                        break;
                    }
                } else {
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // Acknowledge the batch
            }
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entries) {
        for (Entry entry : entries) {
            // Skip transaction begin/end markers; only row changes matter here
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            RowChange rowChange = null;
            try {
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }
            EventType eventType = rowChange.getEventType();
            for (RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    // Delete: use the row as it was before the deletion
                    redisDelete(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    // Insert: use the row as it is after the insert
                    redisSet(rowData.getAfterColumnsList());
                } else {
                    // Update: use the row as it is after the update
                    // printColumn(rowData.getBeforeColumnsList());
                    redisSet(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void redisSet(List<Column> columns) {
        JSONObject json = new JSONObject();
        // Put every column name and value into the JSON object, which becomes the Redis value
        for (Column column : columns) {
            json.put(column.getName(), column.getValue());
        }
        if (columns.size() > 0) {
            System.out.println("set key: " + columns.get(0).getValue() + ", value: " + json.toJSONString());
            // The first column is assumed to be the primary key and is used to build the Redis key
            RedisUtils.stringSet("demo:" + columns.get(0).getValue(), json.toJSONString());
        }
    }

    private static void redisDelete(List<Column> columns) {
        if (columns.size() > 0) {
            System.out.println("delete key: " + columns.get(0).getValue());
            // The first column is assumed to be the primary key
            RedisUtils.delKey("demo:" + columns.get(0).getValue());
        }
    }
}
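The client above always acknowledges a batch, even if handling it throws. CanalConnector also has rollback(batchId), which makes the server deliver the unacknowledged batch again. Below is a minimal sketch of an "ack on success, rollback on failure" loop. To keep it runnable without a canal server, `Source`, `Batch`, `Handler`, and `InMemorySource` are hypothetical stand-ins that only mirror the shape of getWithoutAck / ack / rollback.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

final class AckLoopSketch {

    static final class Batch {
        final long id;               // -1 means "nothing available", as in the client above
        final List<String> entries;
        Batch(long id, List<String> entries) { this.id = id; this.entries = entries; }
    }

    /** Hypothetical stand-in for the three CanalConnector calls used by the client. */
    interface Source {
        Batch getWithoutAck(int batchSize);
        void ack(long batchId);
        void rollback(long batchId);
    }

    interface Handler {
        void handle(String entry) throws Exception;
    }

    /** Pulls one batch and acks it only if every entry was handled. Returns true if acked. */
    static boolean pollOnce(Source source, Handler handler, int batchSize) {
        Batch batch = source.getWithoutAck(batchSize);
        if (batch.id == -1 || batch.entries.isEmpty()) {
            return false;
        }
        try {
            for (String entry : batch.entries) {
                handler.handle(entry);
            }
            source.ack(batch.id);
            return true;
        } catch (Exception e) {
            source.rollback(batch.id); // the whole batch will be delivered again
            return false;
        }
    }

    /** Tiny fake that redelivers rolled-back entries in their original order. */
    static final class InMemorySource implements Source {
        private final Deque<String> pending = new ArrayDeque<>();
        private List<String> inFlight = new ArrayList<>();
        private long nextId = 1;

        InMemorySource(List<String> entries) { pending.addAll(entries); }

        public Batch getWithoutAck(int batchSize) {
            if (pending.isEmpty()) {
                return new Batch(-1, new ArrayList<String>());
            }
            inFlight = new ArrayList<>();
            while (inFlight.size() < batchSize && !pending.isEmpty()) {
                inFlight.add(pending.pollFirst());
            }
            return new Batch(nextId++, inFlight);
        }

        public void ack(long batchId) { inFlight = new ArrayList<>(); }

        public void rollback(long batchId) {
            for (int i = inFlight.size() - 1; i >= 0; i--) {
                pending.addFirst(inFlight.get(i));
            }
            inFlight = new ArrayList<>();
        }
    }

    public static void main(String[] args) {
        final List<String> handled = new ArrayList<>();
        final boolean[] failedOnce = {false};
        Source source = new InMemorySource(Arrays.asList("a", "b", "c"));
        Handler handler = entry -> {
            if (entry.equals("b") && !failedOnce[0]) {
                failedOnce[0] = true;
                throw new IllegalStateException("simulated failure");
            }
            handled.add(entry);
        };
        while (pollOnce(source, handler, 2) || failedOnce[0] && handled.size() < 4) { }
        System.out.println(handled); // prints [a, a, b, c]
    }
}
```

Note that "a" is handled twice: rollback gives at-least-once delivery, so the handler should be idempotent. Overwriting a Redis key with the latest row, as redisSet does, fits that requirement.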
5. Verification
- Start CanalClient.java
- Create the table test_canal
CREATE TABLE `test_canal` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(11) DEFAULT NULL,
  `age` int(11) DEFAULT 0,
  `sex` varchar(11) DEFAULT '男',
  PRIMARY KEY (`id`),
  KEY `idx_name` (`name`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;
- Check that Redis is currently empty
- Insert data
  - Insert three rows
INSERT INTO test_canal(name, age, sex) VALUES('cc0', 3, '男');
INSERT INTO test_canal(name, age, sex) VALUES('cc1', 6, '女');
INSERT INTO test_canal(name, age, sex) VALUES('cc2', 9, '男');
  - Check the console
  - Query the data in Redis
Verification succeeded; the data was synchronized within milliseconds.
- Update data
  - Update the row with id=5
UPDATE test_canal SET sex='11' WHERE id=5;
  - Check the console
  - Query the data in Redis
Verification succeeded; the sex column in Redis now matches the MySQL table.
- Delete data
  - Delete the row with id=5
DELETE FROM test_canal WHERE id=5;
  - Check the console
  - Query the data in Redis
Verification succeeded; "demo:5" has been removed from Redis.
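To make the expected Redis contents concrete, the insert, update, and delete steps above can be replayed against an in-memory map. This is a sketch under assumptions: `SyncScenarioSketch` and `Col` are hypothetical, the map stands in for Redis, the row values for id=5 are made up for illustration, and the JSON is built by hand without escaping (fine for these simple values only). Unlike the client, it picks the key from the column flagged as primary key and falls back to the first column.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SyncScenarioSketch {

    /** Minimal stand-in for a canal column: name, string value, primary-key flag. */
    static final class Col {
        final String name;
        final String value;
        final boolean isKey;
        Col(String name, String value, boolean isKey) {
            this.name = name;
            this.value = value;
            this.isKey = isKey;
        }
    }

    /** Plays the role of Redis in this sketch. */
    final Map<String, String> store = new LinkedHashMap<>();

    static List<Col> row(String id, String name, String age, String sex) {
        return Arrays.asList(new Col("id", id, true), new Col("name", name, false),
                new Col("age", age, false), new Col("sex", sex, false));
    }

    /** Builds "demo:<primary key>"; assumes the row has at least one column. */
    static String keyOf(List<Col> row) {
        for (Col c : row) {
            if (c.isKey) {
                return "demo:" + c.value;
            }
        }
        return "demo:" + row.get(0).value; // same fallback as the client: first column
    }

    /** Hand-rolled JSON with string values only; no escaping of quotes or backslashes. */
    static String toJson(List<Col> row) {
        StringBuilder sb = new StringBuilder("{");
        for (int i = 0; i < row.size(); i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append('"').append(row.get(i).name).append("\":\"").append(row.get(i).value).append('"');
        }
        return sb.append('}').toString();
    }

    /** DELETE removes the key built from the old row; INSERT and UPDATE overwrite it. */
    void apply(String eventType, List<Col> before, List<Col> after) {
        if ("DELETE".equals(eventType)) {
            store.remove(keyOf(before));
        } else {
            store.put(keyOf(after), toJson(after));
        }
    }

    public static void main(String[] args) {
        SyncScenarioSketch sync = new SyncScenarioSketch();
        List<Col> inserted = row("5", "cc0", "3", "\u7537"); // U+7537 is the table's default sex value
        sync.apply("INSERT", null, inserted);
        List<Col> updated = row("5", "cc0", "3", "11");
        sync.apply("UPDATE", inserted, updated);
        System.out.println(sync.store.get("demo:5")); // prints {"id":"5","name":"cc0","age":"3","sex":"11"}
        sync.apply("DELETE", updated, null);
        System.out.println(sync.store.containsKey("demo:5")); // prints false
    }
}
```

All values are strings because canal delivers column values as strings, so numbers such as age end up quoted in the stored JSON.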
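Summary
For more about Canal, see the official documentation (the official Canal introduction). On the source side only MySQL is currently supported; the target side can be any component.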