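Synchronizing a Database with Redis Using Canal

I. Introduction

Canal is a Java tool that parses a database's incremental log and provides incremental data subscription and consumption. It currently mainly supports MySQL. Its working principle is fairly simple: it disguises itself as a MySQL slave and synchronizes data from the master.

  • canal emulates the MySQL slave interaction protocol, presents itself as a MySQL slave, and sends a dump request to the MySQL master
  • The MySQL master receives the dump request and starts pushing its binary log to the slave (that is, canal)
  • canal parses the binary log objects (originally a byte stream)
  • The parsed data is then handed on for further processing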


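First, a look at how MySQL master-slave replication works:

  • The MySQL master writes data changes to its binary log (the records are called binary log events and can be viewed with show binlog events)
  • The MySQL slave copies the master's binary log events into its relay log
  • The MySQL slave replays the events in the relay log, applying the changes to its own data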
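
To make the "replay" step more concrete, here is a minimal sketch in plain Java. It is not MySQL's or canal's actual implementation: `RowEvent`, `Type`, and `replay` are hypothetical names invented for this illustration, and the "replica" is just an in-memory map. The point is only that a consumer that applies row-level events in log order ends up with the same state as the source.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RowReplaySketch {
    // Hypothetical, simplified stand-in for a row-level binlog event.
    enum Type { INSERT, UPDATE, DELETE }

    static final class RowEvent {
        final Type type;
        final String id;   // primary key of the affected row
        final String row;  // row content after the change (unused for DELETE)

        RowEvent(Type type, String id, String row) {
            this.type = type;
            this.id = id;
            this.row = row;
        }
    }

    // Applies the events in log order to an initially empty replica.
    static Map<String, String> replay(List<RowEvent> log) {
        Map<String, String> replica = new LinkedHashMap<>();
        for (RowEvent event : log) {
            switch (event.type) {
                case INSERT:
                case UPDATE:
                    replica.put(event.id, event.row);
                    break;
                case DELETE:
                    replica.remove(event.id);
                    break;
            }
        }
        return replica;
    }

    public static void main(String[] args) {
        List<RowEvent> log = Arrays.asList(
                new RowEvent(Type.INSERT, "1", "cc0"),
                new RowEvent(Type.INSERT, "2", "cc1"),
                new RowEvent(Type.UPDATE, "1", "cc0-renamed"),
                new RowEvent(Type.DELETE, "2", null));
        System.out.println(replay(log)); // prints {1=cc0-renamed}
    }
}
```

The client built later in this article does essentially the same thing, with Redis taking the place of the map.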

II. Preparation

  1. For a self-hosted MySQL, enable binlog writing and set binlog-format to ROW, so that data changes can be detected row by row

    [mysqld]
    log-bin=mysql-bin # enable binlog
    binlog-format=ROW # use ROW mode
    server_id=1 # required for MySQL replication; must not be the same as canal's slaveId
  2. Create a canal user and grant it the privileges needed to act as a MySQL slave

    CREATE USER canal IDENTIFIED BY 'canal';
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    FLUSH PRIVILEGES;

III. Starting the Canal Server

  1. Download canal

    This walkthrough uses canal.deployer-1.1.4.tar.gz

  2. Unpack it

    tar zxvf canal.deployer-1.1.4.tar.gz
    cd canal.deployer-1.1.4
  3. Edit the configuration file, mainly the connection details of the MySQL master

    Open the file conf/example/instance.properties

    #################################################
    ## mysql serverId , v1.0.26+ will autoGen
    ## must differ from the server_id=1 set under [mysqld] above
    canal.instance.mysql.slaveId=1234

    # enable gtid use true/false
    canal.instance.gtidon=false

    # position info
    canal.instance.master.address=127.0.0.1:3306
    canal.instance.master.journal.name=
    canal.instance.master.position=
    canal.instance.master.timestamp=
    canal.instance.master.gtid=

    # rds oss binlog
    canal.instance.rds.accesskey=
    canal.instance.rds.secretkey=
    canal.instance.rds.instanceId=

    # table meta tsdb info
    canal.instance.tsdb.enable=true
    #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
    #canal.instance.tsdb.dbUsername=canal
    #canal.instance.tsdb.dbPassword=canal

    #canal.instance.standby.address =
    #canal.instance.standby.journal.name =
    #canal.instance.standby.position =
    #canal.instance.standby.timestamp =
    #canal.instance.standby.gtid=

    # username/password
    canal.instance.dbUsername=canal
    canal.instance.dbPassword=canal
    canal.instance.defaultDatabaseName=test_canal
    canal.instance.connectionCharset = UTF-8
    # enable druid Decrypt database password
    canal.instance.enableDruid=false
    #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

    # table regex
    canal.instance.filter.regex=.*\\..*
    # table black regex
    canal.instance.filter.black.regex=
    # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
    #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
    # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
    #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

    # mq config
    # canal.mq.topic=example
    # dynamic topic route by schema or table regex
    # canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
    # canal.mq.partition=0
    # hash partition config
    # canal.mq.partitionsNum=3
    # canal.mq.partitionHash=test.table:id^name,.*\\..*
    #################################################
    • canal.instance.connectionCharset is the database encoding expressed as a Java charset name, for example UTF-8, GBK, or ISO-8859-1
    • If the machine has only one CPU, set canal.instance.parser.parallel to false
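
One detail in the file above that is easy to misread is the table filter `.*\\..*`. The doubled backslash is properties-file escaping, so the pattern canal actually sees is `.*\..*` (any schema, a literal dot, any table), which agrees with the `init table filter` line in the instance log further down. The following is a small sketch using only the JDK; `parse` and `matchesAny` are hypothetical helpers written for this illustration, and they use `java.util.regex`, whereas canal's own matcher may differ in details such as case handling.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.regex.Pattern;

class InstanceFilterSketch {
    // Loads properties-format text the same way the JDK loads a .properties file.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns true if "schema.table" fully matches any of the comma-separated patterns.
    static boolean matchesAny(String filter, String schemaDotTable) {
        for (String pattern : filter.split(",")) {
            if (Pattern.matches(pattern.trim(), schemaDotTable)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // The Java literal below is the file text: canal.instance.filter.regex=.*\\..*
        Properties props = parse("canal.instance.filter.regex=.*\\\\..*\n");
        String regex = props.getProperty("canal.instance.filter.regex");
        System.out.println(regex); // prints .*\..*
        System.out.println(matchesAny(regex, "test_canal.test_canal")); // prints true
    }
}
```

To watch a single table only, the same mechanism suggests a narrower value such as `test_canal\\.test_canal` in the properties file.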
  4. Start canal

    sh bin/startup.sh
  5. Check the server and instance logs

    Server log:
    2020-04-23 23:48:59.048 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
    2020-04-23 23:48:59.097 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
    2020-04-23 23:48:59.110 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
    2020-04-23 23:48:59.178 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.0.106(192.168.0.106):11111]
    2020-04-23 23:49:00.243 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......

    Instance log:
    er.setConnectionCharset(java.lang.String)]
    2020-04-23 23:48:59.793 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
    2020-04-23 23:48:59.794 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
    2020-04-23 23:49:00.192 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
    2020-04-23 23:49:00.199 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
    2020-04-23 23:49:00.199 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter :
    2020-04-23 23:49:00.206 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
  6. Stop canal

    sh bin/stop.sh

IV. Creating the Java Project

  1. Add the canal and redis dependencies

    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.4</version>
    </dependency>

    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>2.4.2</version>
    </dependency>
  2. Create the RedisUtils utility class

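    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    import redis.clients.jedis.JedisPoolConfig;

    public class RedisUtils {

        // Redis server IP
        private static String ADDR = "127.0.0.1";

        // Redis port
        private static int PORT = 6379;

        // Access password (empty means no authentication)
        private static String AUTH = "";

        // Maximum number of connection instances, default 8.
        // -1 means unlimited; once the pool has handed out this many jedis instances it is exhausted.
        private static int MAX_ACTIVE = 1024;

        // Maximum number of idle jedis instances kept in the pool, default also 8.
        private static int MAX_IDLE = 200;

        // Maximum time to wait for a free connection, in milliseconds. Default -1 (wait forever).
        // When the wait is exceeded, a JedisConnectionException is thrown.
        private static int MAX_WAIT = 10000;

        // Key expiry time in seconds (one day)
        protected static int expireTime = 60 * 60 * 24;

        // Connection pool
        protected static JedisPool pool;

        /**
         * Static initializer, runs once when the class is first used
         */
        static {
            JedisPoolConfig config = new JedisPoolConfig();
            // Maximum number of connections
            config.setMaxTotal(MAX_ACTIVE);
            // Maximum number of idle instances
            config.setMaxIdle(MAX_IDLE);
            // Maximum wait time when borrowing a connection
            config.setMaxWaitMillis(MAX_WAIT);
            // Do not validate connections when borrowing them
            config.setTestOnBorrow(false);
            // Connection timeout of 1000 ms; pass the password only when one is configured
            pool = AUTH.isEmpty()
                    ? new JedisPool(config, ADDR, PORT, 1000)
                    : new JedisPool(config, ADDR, PORT, 1000, AUTH);
        }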

        /**
         * Borrow a jedis instance from the pool; returns null if none can be obtained
         */
        protected static Jedis getJedis() {
            try {
                return pool.getResource();
            } catch (Exception e) {
                e.printStackTrace();
                return null;
            }
        }

        /**
         * Return a jedis instance to the pool
         *
         * @param jedis    the instance to return (may be null if borrowing failed)
         * @param isBroken whether the connection failed while in use
         */
        protected static void closeResource(Jedis jedis, boolean isBroken) {
            if (jedis == null) {
                return; // nothing was borrowed, so there is nothing to return
            }
            try {
                if (isBroken) {
                    pool.returnBrokenResource(jedis);
                } else {
                    pool.returnResource(jedis);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /**
         * Check whether a key exists
         *
         * @param key the key to look up
         */
        public static boolean existKey(String key) {
            Jedis jedis = null;
            boolean isBroken = false;
            try {
                jedis = getJedis();
                jedis.select(0);
                return jedis.exists(key);
            } catch (Exception e) {
                e.printStackTrace();
                isBroken = true;
            } finally {
                closeResource(jedis, isBroken);
            }
            return false;
        }

        /**
         * Delete a key
         *
         * @param key the key to delete
         */
        public static void delKey(String key) {
            Jedis jedis = null;
            boolean isBroken = false;
            try {
                jedis = getJedis();
                jedis.select(0);
                jedis.del(key);
            } catch (Exception e) {
                e.printStackTrace();
                isBroken = true;
            } finally {
                closeResource(jedis, isBroken);
            }
        }

        /**
         * Get the value of a key (reading also resets the key's expiry time)
         *
         * @param key the key to read
         */
        public static String stringGet(String key) {
            Jedis jedis = null;
            boolean isBroken = false;
            String lastVal = null;
            try {
                jedis = getJedis();
                jedis.select(0);
                lastVal = jedis.get(key);
                jedis.expire(key, expireTime);
            } catch (Exception e) {
                e.printStackTrace();
                isBroken = true;
            } finally {
                closeResource(jedis, isBroken);
            }
            return lastVal;
        }

        /**
         * Store a string value
         *
         * @param key   the key to write
         * @param value the value to store
         */
        public static String stringSet(String key, String value) {
            Jedis jedis = null;
            boolean isBroken = false;
            String lastVal = null;
            try {
                jedis = getJedis();
                jedis.select(0);
                lastVal = jedis.set(key, value);
                jedis.expire(key, expireTime);
            } catch (Exception e) {
                e.printStackTrace();
                isBroken = true;
            } finally {
                closeResource(jedis, isBroken);
            }
            return lastVal;
        }

        /**
         * Store a hash field
         *
         * @param key   the hash key
         * @param field the field inside the hash
         * @param value the value to store
         */
        public static void hashSet(String key, String field, String value) {
            boolean isBroken = false;
            Jedis jedis = null;
            try {
                jedis = getJedis();
                if (jedis != null) {
                    jedis.select(0);
                    jedis.hset(key, field, value);
                    jedis.expire(key, expireTime);
                }
            } catch (Exception e) {
                e.printStackTrace();
                isBroken = true;
            } finally {
                closeResource(jedis, isBroken);
            }
        }

    }
  3. Create CanalClient

    import java.net.InetSocketAddress;
    import java.util.List;

    import com.alibaba.fastjson.JSONObject;
    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.common.utils.AddressUtils;
    import com.alibaba.otter.canal.protocol.Message;
    import com.alibaba.otter.canal.protocol.CanalEntry.*;

    public class CanalClient {

        public static void main(String[] args) {
            // Create the connection to the canal server (destination "example", no username or password)
            CanalConnector connector = CanalConnectors.newSingleConnector(
                    new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "", "");
            int batchSize = 1000;
            try {
                connector.connect();
                connector.subscribe(".*\\..*");
                connector.rollback();
                while (true) {
                    // Fetch up to batchSize entries without acknowledging them
                    Message message = connector.getWithoutAck(batchSize);
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        // Nothing to consume yet, wait briefly before polling again
                        try {
                            Thread.sleep(500);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            break; // stop polling when the thread is interrupted
                        }
                    } else {
                        printEntry(message.getEntries());
                    }
                    connector.ack(batchId); // acknowledge the batch
                }
            } finally {
                connector.disconnect();
            }
        }

        private static void printEntry(List<Entry> entries) {
            for (Entry entry : entries) {
                // Skip transaction begin/end markers, only row data matters here
                if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                        || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                    continue;
                }

                RowChange rowChange = null;
                try {
                    rowChange = RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                            e);
                }

                EventType eventType = rowChange.getEventType();

                for (RowData rowData : rowChange.getRowDatasList()) {
                    if (eventType == EventType.DELETE) {
                        // Delete: use the column values from before the deletion
                        redisDelete(rowData.getBeforeColumnsList());
                    } else if (eventType == EventType.INSERT) {
                        // Insert: use the column values after the insert
                        redisSet(rowData.getAfterColumnsList());
                    } else {
                        // Update: use the column values after the update
                        // printColumn(rowData.getBeforeColumnsList());
                        redisSet(rowData.getAfterColumnsList());
                    }
                }
            }
        }

        private static void redisSet(List<Column> columns) {
            JSONObject json = new JSONObject();
            // Put every column name and value into a JSON object, which becomes the Redis value
            for (Column column : columns) {
                json.put(column.getName(), column.getValue());
            }
            if (columns.size() > 0) {
                System.out.println("set key: " + columns.get(0).getValue() + ", value: " + json.toJSONString());
                // Use the first column's value as the key (this assumes the first column is the primary key)
                RedisUtils.stringSet("demo:" + columns.get(0).getValue(), json.toJSONString());
            }
        }

        private static void redisDelete(List<Column> columns) {
            if (columns.size() > 0) {
                // The key is again derived from the first column, assumed to be the primary key
                System.out.println("delete key: " + columns.get(0).getValue());
                RedisUtils.delKey("demo:" + columns.get(0).getValue());
            }
        }

    }
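
The client above acknowledges every batch after calling printEntry, so if printEntry throws, the loop simply exits and nothing is rolled back. A common refinement is to ack only after the batch has been handled and to roll back otherwise. The sketch below shows that control flow in isolation, using only the JDK: `BatchSource`, `Handler`, and `InMemorySource` are hypothetical stand-ins invented for this illustration (they are not canal classes), and the redelivery behaviour of `InMemorySource` is a simplification of what a real server does.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class AckRollbackSketch {
    // Hypothetical stand-in for the getWithoutAck / ack / rollback part of a connector.
    interface BatchSource {
        long fetch(List<String> out); // fills out, returns the batch id or -1 if nothing is pending
        void ack(long batchId);
        void rollback(long batchId);
    }

    interface Handler {
        void handle(List<String> entries) throws Exception;
    }

    // Processes at most one batch; returns true only if it was handled and acknowledged.
    static boolean pollOnce(BatchSource source, Handler handler) {
        List<String> entries = new ArrayList<>();
        long batchId = source.fetch(entries);
        if (batchId == -1 || entries.isEmpty()) {
            return false; // nothing to do
        }
        try {
            handler.handle(entries);
            source.ack(batchId);      // success: the batch will not be delivered again
            return true;
        } catch (Exception e) {
            source.rollback(batchId); // failure: ask for the batch to be delivered again
            return false;
        }
    }

    // Simplified in-memory source: an unacknowledged batch is redelivered on the next fetch.
    static class InMemorySource implements BatchSource {
        private final Deque<List<String>> pending = new ArrayDeque<>();
        private List<String> inFlight;
        private long nextId = 1;
        int ackedBatches = 0;

        void add(List<String> batch) {
            pending.addLast(batch);
        }

        public long fetch(List<String> out) {
            if (inFlight == null) {
                inFlight = pending.pollFirst();
            }
            if (inFlight == null) {
                return -1;
            }
            out.addAll(inFlight);
            return nextId;
        }

        public void ack(long batchId) {
            inFlight = null;
            nextId++;
            ackedBatches++;
        }

        public void rollback(long batchId) {
            // Keep the in-flight batch so that the next fetch returns it again.
        }
    }
}
```

Applied to the client above, this would mean wrapping printEntry in a try/catch and calling connector.rollback(batchId) in the catch branch instead of acknowledging unconditionally.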

V. Verification

  1. Start CanalClient.java

  2. Create the table test_canal

    CREATE TABLE `test_canal` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `name` varchar(11) DEFAULT NULL,
      `age` int(11) DEFAULT 0,
      `sex` varchar(11) DEFAULT '男',
      PRIMARY KEY (`id`),
      KEY `idx_name` (`name`) USING BTREE
    ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;
  3. Check that Redis is currently empty

  4. Insert data

    • Insert three rows
    INSERT INTO test_canal(name, age, sex) VALUES('cc0', 3, '男');
    INSERT INTO test_canal(name, age, sex) VALUES('cc1', 6, '女');
    INSERT INTO test_canal(name, age, sex) VALUES('cc2', 9, '男');
    • Check the console output

    • Query the data in Redis

    Verification succeeded: the data was synchronized within milliseconds.

  5. Update data

    • Update the row with id=5
    UPDATE test_canal SET sex='11' WHERE id=5;
    • Check the console output
    • Query the data in Redis

    Verification succeeded: the sex field in Redis now matches the value in the MySQL table.

  6. Delete data

    • Delete the row with id=5
    DELETE FROM test_canal WHERE id=5;
    • Check the console output
    • Query the data in Redis

    Verification succeeded: the key "demo:5" has been removed from Redis.
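
The key "demo:5" comes from the client taking the first column of the row as the primary key. That holds for test_canal, where id is declared first, but not for every table. As far as I know, canal's Column also exposes a getIsKey() flag, so the key could be chosen from that instead. The sketch below shows the idea with a hypothetical `Col` class standing in for canal's Column, so that it runs with the JDK alone; `cacheKey` is likewise a name invented for this illustration.

```java
import java.util.Arrays;
import java.util.List;

class CacheKeySketch {
    // Hypothetical stand-in for a row column: name, value, and a primary-key flag.
    static final class Col {
        final String name;
        final String value;
        final boolean isKey;

        Col(String name, String value, boolean isKey) {
            this.name = name;
            this.value = value;
            this.isKey = isKey;
        }
    }

    // Builds the Redis key from the primary-key column, falling back to the first column.
    static String cacheKey(String prefix, List<Col> columns) {
        if (columns.isEmpty()) {
            throw new IllegalArgumentException("row has no columns");
        }
        for (Col column : columns) {
            if (column.isKey) {
                return prefix + column.value;
            }
        }
        return prefix + columns.get(0).value;
    }

    public static void main(String[] args) {
        // The primary key is deliberately not the first column here.
        List<Col> row = Arrays.asList(
                new Col("name", "cc0", false),
                new Col("id", "5", true));
        System.out.println(cacheKey("demo:", row)); // prints demo:5
    }
}
```

This version only handles a single-column primary key; a composite key would need the key column values joined in a fixed order.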

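Summary

For more about Canal, see the official documentation (Canal官方介绍). The source side currently supports only MySQL, while the target side can be any component.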