【Canal】利用canal实现mysql实时增量备份并对接kafka

简介

canal 1.1.1版本之后, 默认支持将canal server接收到的binlog数据直接投递到MQ, 目前默认支持的MQ系统有:

kafka: https://github.com/apache/kafka

RocketMQ : https://github.com/apache/rocketmq

本文中默认已经安装了kafka环境,仅做对接的演示;(若没有安装则需要提前安装kafka)

演示环境如下:

bigdata111 bigdata112 bigdata113
centos7.0 centos7.0 centos7.0
jdk1.8 jdk1.8 jdk1.8
zookeeper3.4 zookeeper3.4 zookeeper3.4
mysql5.7
canal-server canal-server canal-server
canal-admin
kafka2.11 kafka2.11 kafka2.11

1.修改canal.properties文件(三台机器)

[[email protected] canal-server]# cd conf/
[[email protected] conf]# ll
总用量 20
-rwxr-xr-x. 1 root root  291 3月  16 04:43 canal_local.properties
-rwxr-xr-x. 1 root root 5182 3月  16 04:54 canal.properties
drwxrwxrwx. 2 root root   47 3月  16 05:02 example
-rwxr-xr-x. 1 root root 3119 3月  16 04:43 logback.xml
drwxrwxrwx. 2 root root   38 3月  16 04:43 metrics
drwxrwxrwx. 3 root root 4096 3月  16 04:43 spring
[[email protected] conf]# vi canal.properties
# 修改服务模式为kafka
canal.serverMode = kafka
# 修改kafka的对应服务器
canal.mq.servers = bigdata111:9092,bigdata112:9092,bigdata113:9092

2.修改instance.properties文件(三台机器)

[[email protected] conf]# cd example/
[[email protected] example]# ll
总用量 196
-rw-r--r--. 1 root root 196608 3月  16 04:43 h2.mv.db
-rwxr-xr-x. 1 root root   2037 3月  16 05:02 instance.properties
[[email protected] example]# vi instance.properties
# 将数据发送到指定的topic
canal.mq.topic=test

3.启动zookeeper和Canal(三台机器)

启动zookeeper、查看zk的状态

[[email protected] canal-server]# zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[[email protected] canal-server]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
Mode: follower

[[email protected] canal-server]# zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[[email protected] canal-server]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
Mode: follower

[[email protected] canal-server]# zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[[email protected] canal-server]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
Mode: leader

启动canal

[[email protected] canal-server]# bin/startup.sh
[[email protected] canal-server]# bin/startup.sh
[[email protected] canal-server]# bin/startup.sh

4.启动kafka服务(三台机器)

[[email protected] kafka-2.11]# kafka-server-start.sh /opt/module/kafka-2.11/config/server.properties &
[[email protected] kafka-2.11]# kafka-server-start.sh /opt/module/kafka-2.11/config/server.properties &
[[email protected] kafka-2.11]# kafka-server-start.sh /opt/module/kafka-2.11/config/server.properties &

5.查看所有进程(三台机器)

[[email protected] canal-server]# jps
5360 Kafka
4963 QuorumPeerMain
5699 Jps
5044 CanalLauncher

5.启动kafka消费者(bigdata113)

在bigdata113上启动消费者

kafka版本大于0.9以上,使用以下命令启动消费者:

[[email protected] canal-server]# kafka-console-consumer.sh --bootstrap-server bigdata113:9092 --from-beginning --topic test

kafka版本小于0.9版本,使用以下命令启动消费者:

[[email protected] canal-server]# kafka-console-consumer.sh --zookeeper bigdata113:2181 --from-beginning --topic test

如果在启动过程中报错:

zookeeper is not a recognized option

则是因为版本不同造成的,对应修改如上方式即可;

6.mysql操作数据

比如数据库中有一个stu表,我们向里面增加,更新,删除一个数据,看kafka消费者中的数据变化;

mysql> select * from stu limit 10;
+----+--------+------+---------------------+
| id | name   | sex  | stime               |
+----+--------+------+---------------------+
|  1 | 张三   | 男   | 2019-09-23 17:25:07 |
|  2 | 李四   | 女   | 2019-09-23 17:25:13 |
|  3 | 李楠   | 男   | 2019-09-23 17:25:21 |
|  4 | 张畅   | 女   | 2019-09-23 17:25:30 |
|  5 | 李想   | 男   | 2019-09-23 17:25:38 |
|  6 | 赵街   | 男   | 2019-09-23 17:25:50 |
|  7 | 林安   | 男   | 2019-09-23 17:26:00 |
|  8 | 秦王   | 男   | 2019-09-23 17:45:47 |
|  9 | 纣王   | 男   | 2019-09-23 17:45:47 |
| 10 | 张楠   | 男   | 2019-09-23 17:45:47 |
+----+--------+------+---------------------+
10 rows in set (0.00 sec)

mysql> insert into stu(id,name,sex)values(99332,‘test111‘,‘男‘);
Query OK, 1 row affected (0.03 sec)

mysql> update stu set name=‘test222‘ where id = 99332;
Query OK, 1 row affected (0.07 sec)
Rows matched: 1  Changed: 1  Warnings: 0

mysql> delete from stu where id =99332;
Query OK, 1 row affected (0.03 sec)

kafka消费者变化:

[[email protected] kafka-2.11]# kafka-console-consumer.sh --bootstrap-server bigdata113:9092 --from-beginning --topic test
{"data":[{"id":"9999","name":"张三22222","sex":"男","stime":"2020-03-16 06:24:53"}],"database":"student","es":1584311093000,"id":2,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","sex":"varchar(255)","stime":"timestamp"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"sex":12,"stime":93},"table":"stu","ts":1584313187762,"type":"INSERT"}
[2020-03-16 07:19:38,216] INFO [GroupMetadataManager brokerId=113] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
{"data":[{"id":"99332","name":"test111","sex":"男","stime":"2020-03-16 07:22:02"}],"database":"student","es":1584314522000,"id":3,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","sex":"varchar(255)","stime":"timestamp"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"sex":12,"stime":93},"table":"stu","ts":1584314522967,"type":"INSERT"}
{"data":[{"id":"99332","name":"test222","sex":"男","stime":"2020-03-16 07:23:55"}],"database":"student","es":1584314635000,"id":4,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","sex":"varchar(255)","stime":"timestamp"},"old":[{"name":"test111","stime":"2020-03-16 07:22:02"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"sex":12,"stime":93},"table":"stu","ts":1584314635674,"type":"UPDATE"}
{"data":[{"id":"99332","name":"test222","sex":"男","stime":"2020-03-16 07:23:55"}],"database":"student","es":1584314658000,"id":5,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","sex":"varchar(255)","stime":"timestamp"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"sex":12,"stime":93},"table":"stu","ts":1584314658748,"type":"DELETE"}

原文地址:https://www.cnblogs.com/ShadowFiend/p/12522976.html

时间: 2024-10-02 01:53:17

【Canal】利用canal实现mysql实时增量备份并对接kafka的相关文章

mysql实时增量备份

采用binlog日志的好处 掌控所有更改操作,必要时可用于恢复数据 数据库主从复制的必要条件 [[email protected]~]# vim /etc/my.cnf [mysqld] .. .. log-bin=mysql-bin //启用二进制日志,并指定前缀 .. ..[[email protected] ~]# service mysqld restart 确认binlog日志文件新启用binlog后,每次启动MySQl服务都会新生成一份日志文件: [[email protected]

mysql实时增量备份 binlog日志备份

启用binlog日志实现对数据的增量备份: 日志存储位置: /var/lib/mysql/ 日志名称:主机名-bin.000001 或mysqld-bin.000001 binlog日志概述:二进制日志,记录所有更改数据的操作:默认超过500M自动生成新的日志: 修改主配置文件启用binlog日志 vim /etc/my.cnf [mysqld] log-bin  (或指定日志名log-bin=x.000001或者指定目录和文件名log-bin=/logdir/X.000001) max-bin

05: 实时增量备份 、 XtraBackup 备份 、 总结和答疑 、 MySQL 主从同步

day05 增量备份一.启用binlog日志 实现 实时增量备份二.使用第3方软件提供的命令做增量备份 +++++++++++++++++++++++++++++++++一.启用binlog日志 实现 实时增量备份1.1 binlog日志介绍:二进制日志, 是MySQL数据库服务日志文件中的一种,记录执行的除查询之外的sql命令.默认没有启用. 查询的sql命令 : show desc select 1.2 启用binlog日志?vim /etc/my.cnf[mysqld]server_id=

NFS +inotify+rsync 实现数据的远程挂载与实时增量备份

NFS 网络文件系统 功能: 用户可以像访问自己的本地文件系统一样使用网络中的远端系统上的文件 原理: 用户进程-->RPC服务(portman)-->tcp/ip协议栈-->远端主机RPC服务 后 -->远端NFS服务-->远端主机本地磁盘数据-->NFS服务-->tcp/ip协议栈-->本地数据 优点: 易于实现前段服务器代理分流的时候处理sission以及数据查找时找不到数据等问题,且同时起到了就算任何一个web服务器宕机都不影响数据的访问 缺点: 单

利用Xtrabackup进行mysql数据库的备份

利用Xtrabackup来实现数据库的备份 Xtrabackup是有percona公司开发的一款开源备份工具,它与ibbackup这个备份工具不同的是.ibbackup是一个收费的备份工具,而且在其备份性能上,ibbackup不如Xtrabackup.ibbackup和Xtrabackup都对Innodb存储引擎支持在线物理完全备份和在线物理增量备份,对MyISAM存储引擎来说,只支持温备份而已.更对关于两者的特性比较,请参考http://www.percona.com/software/per

mysql二进制增量备份

最近一直在思考如何增量备份二进制日志,这样一来我们就可以用mysqldump +二进制全备或者利用xtrabackup+二进制备份 思想:利用mysql-bin.index,如果是第一次备份,就将mysql-bin.index中的所有文件都复制到备份目录下,然后保存备份后二进制日志的序号到一个文件中backbinlogpostion        增量备份时(非第一备份),就用backbinlogpostion存的序号与当前mysql_bin.index中的比较,如果序号小于mysql-bin.

Mysql的增量备份 及基于时间点与位置的恢复

增量备份的优点是没有重复数据,备份量不大,时间短.缺点也很明显,需要上次完全备份及完全备份之后所有的增量备份才能恢复,反推恢复,操作较为繁琐. Mysql没有提供增量备份的方法,但是可以通过二进制日志间接实现增量备份. 二进制日志对备份的意义如下:1)二进制日志保存了所有更新或者可能更新数据库的日志文件2)二进制日志在启动Mysql服务器后开始记录,并在文件达到max_binlog_size 所设置的大小或者接收到的flush logs命令后重新创建新的日志文件.3)只需要定时执行flush l

Python 生产环境Mysql数据库增量备份脚本

Mysql数据库常用的办法是通过mysqldump导出sql进行备份,但是不适合数据量很大的数据库,速度,锁表是两个严重的问题.前面写了一遍blog介绍xtrabackup的热备工具.下面的脚本是基于xtrabackup实现自动备份数据库的功能. 需求描述: 每天晚上23点,对数据库进行一次完整备份.第二天0-22点,每小时进行一次增量备份.每次备份前把上次的完整备份和23次增量备份移动到指定目录里,保留7天的数据. ps:不要问我,为什么是23点执行完整备份,0点不更好处理吗?bingo,这是

通过innobackupex实现对MySQL的增量备份与还原

备份 增量备份是基于完整备份的,所以我们需要先做一次完整备份: innobackupex --password=test /backup/ 备注:test是我的MySQL服务的root用户的密码,/backup/是我用于存放备份文件的目录. 执行完命令后,/backup/目录下会生成一个时间戳目录,在我这个例子中是/backup/2016-09-20_14-45-26/. 第一次增量备份: innobackupex --password=test --incremental /backup/ -