通过canal实现把MySQL数据实时增量到kafka

2023-06-15,,

说明:我们有一个业务需要把mysql中一些表实时同步到大数据集群hbase上面,我们先通过sqoop把表中数据全量导入到hbase中,然后再通过canal定位的某个binlog的position,来实现增量同步,canal官网提供了java/go接口,直接写入到Kafka,然后通过sparkstreaming实时写入到hbase中

一. 通过sqoop把mysql表中的数据全量导入到hbase中(需要安装sqoop)

sqoop import \
--connect jdbc:mysql://ip:port/database \
--username username \
--password password \
--table user_info \
--hbase-create-table \
--hbase-table user_info \
--hbase-row-key id \
--column-family order_info

二. 精确定位到binlog位点,进行启动

1. 查看当前数据库的binlog日志,在数据库中通过show binary logs 查看

mysql> show binary logs;
+------------------+-----------+
| Log_name | File_size |
+------------------+-----------+
| mysql-bin. | |
| mysql-bin. | |
| mysql-bin. | |
| mysql-bin. |

2. 查看当前binlog日志的position(一般最大的binlog以及最大的position)

show binlog events in 'mysql-bin.001115';

3. 需要先重启canal服务(如果之前正运行canal)

cd /usr/local/canal/bin && ./stop.sh && ./startup.sh

4. 修改zookeeper对应destination里面的配置

说明:我这边是在配置文件里面配置了zookeeper,如果没有配置,则会在你相应的destination目录下面生成meta.dat文件,也只需修改到对应的binlog和position即可

1)连接zookeeper

./zkCli.sh -server zk-address:

2)查看对应destination的配置(其中test为destination的名称)

(CONNECTED) 2] get /otter/canal/destinations/test//cursor
"journalName":"mysqlbin.002908"
"position":

3)把上面配置中journalName和position修改为自己需要的binlog日志和偏移量

 (CONNECTED) ]set /otter/canal/destinations/d_aura_jike//cursor {xxx}

5. 修改一下对应destination的配置文件(主要是触发使其生效)

vim /usr/local/canal/conf/test/instance.properties

6. 通过canal提供的java/go接口,来测试数据的同步(官网有例子https://github.com/alibaba/canal)

通过canal实现把MySQL数据实时增量到kafka的相关教程结束。

《通过canal实现把MySQL数据实时增量到kafka.doc》

下载本文的Word格式文档,以方便收藏与打印。