阿里canal安装和简介及canal-php使用

本文主要记录安装canal和canal-php链接canal使用!

canal简介

在这里插入图片描述
canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括

  • 数据库镜像

  • 数据库实时备份

  • 索引构建和实时维护(拆分异构索引、倒排索引等)

  • 业务 cache 刷新

  • 带业务逻辑的增量数据处理

  • 当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

    工作原理

    1、MySQL主备复制原理
    在这里插入图片描述

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)

  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)

  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

2、canal 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议

  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )

  • canal 解析 binary log 对象(原始为 byte 流)

    准备

  • 对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

1
2
3
4
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重

注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步

  • 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant
1
2
3
4
CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

安装Canal并配置

  • 下载 canal, 访问 release 页面 , 选择需要的包下载, 如以 1.1.7 版本为例(服务器下载慢可以本地下载好上传至服务器)
1
wget https://github.com/alibaba/canal/releases/download/canal-1.1.7-alpha-2/canal.deployer-1.1.7-SNAPSHOT.tar.gz
  • 解压缩
1
2
mkdir /tmp/canal
tar zxvf canal.deployer-$version.tar.gz -C /tmp/canal
  • 配置修改

    1、首先获取mysql binlog日志名称binlog偏移量 参数值
    命令:

    1
    show master status

    截图:
    在这里插入图片描述
    2、修改canal配置

    1
    vim conf/example/instance.properties
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    ## mysql serverId
    canal.instance.mysql.slaveId = 1234
    #position info,需要改成自己的数据库信息
    canal.instance.master.address = 127.0.0.1:3306
    # binlog日志名称
    canal.instance.master.journal.name=mysql-bin.000009
    # mysql主库链接时起始的binlog偏移量
    canal.instance.master.position = 154
    canal.instance.master.timestamp =
    #canal.instance.standby.address =
    #canal.instance.standby.journal.name =
    #canal.instance.standby.position =
    #canal.instance.standby.timestamp =
    #username/password,需要改成自己的数据库信息(上面授权的canal账号)
    canal.instance.dbUsername = canal
    canal.instance.dbPassword = canal
    canal.instance.defaultDatabaseName =
    canal.instance.connectionCharset = UTF-8
    #table regex
    canal.instance.filter.regex = .\*\\\\..\*

    3、启动

    1
    sh bin/startup.sh
    • 查看 server 日志
      1
      cat logs/canal/canal.log
      1
      2
      3
      4
      5
      2023-03-28 20:23:27.202 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
      2023-03-28 20:23:27.211 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
      2023-03-28 20:23:27.223 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
      2023-03-28 20:23:27.263 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.17.0.1(172.17.0.1):11111]
      2023-03-28 20:23:28.763 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
    • 查看 instance 的日志
      1
      cat logs/example/example.log
      1
      2
      3
      4
      2023-03-28 20:23:28.710 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
      2023-03-28 20:23:28.720 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
      2023-03-28 20:23:28.720 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
      2023-03-28 20:23:28.724 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....

    4、关闭

    1
    sh bin/stop.sh

php客户端 canal-php 链接Canal 测试

使用canal php 客户端: canal-php

1、构建canal php客户端

1
2
3
4
5
$ composer require xingwenge/canal_php
or
$ git clone https://github.com/xingwenge/canal-php.git
$ cd canal-php
$ composer update

2、建立与Canal的连接

src/sample/client.php 文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
try {
$client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE);
# $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SWOOLE);

$client->connect("127.0.0.1", 11111);
$client->checkValid();
$client->subscribe("1001", "example", ".*\\..*"); # 不设置过滤
# $client->subscribe("1001", "example", "db_name.tb_name"); # 设置过滤(指定db_name数据库下的tb_name表)
# $client->subscribe("1001", "example", "test\..*"); # 设置过滤(指定test数据库)

while (true) {
$message = $client->get(100);
if ($entries = $message->getEntries()) {
foreach ($entries as $entry) {
Fmt::println($entry);
}
}
sleep(1);
}

$client->disConnect();
} catch (\Exception $e) {
echo $e->getMessage(), PHP_EOL;
}

3、测试
首先运行client.php:

1
php src/sample/client.php

其次,我们执行sql修改一条数据:
UPDATE test222.user SET name = ‘李四888’ WHERE id = 2 AND name = ‘李四’ LIMIT 1

控制台可以看到如下信息:
在这里插入图片描述
至此,canal-php链接canal成功并拿到对应的binlog日志变更记录,可以进行业务逻辑操作。

总结:

Canal的好处在于对业务代码没有侵入,因为是基于监听binlog日志去进行同步数据的。实时性也能做到准实时,其实是很多企业一种比较常见的数据同步的方案。

通过上面的学习之后,我们应该都明白canal如何安装和canal-php用法。实际上这仅仅只是入门,因为实际项目中我们不是这样玩的…

实际项目我们是配置MQ模式,配合RocketMQ或者Kafka,canal会把数据发送到MQ的topic中,然后通过消息队列的消费者进行处理。