bireme数据源同步工具--debezium+kafka+bireme

1、介绍

Bireme 是一个 Greenplum / HashData 数据仓库的增量同步工具。目前支持 MySQL、PostgreSQL 和 MongoDB 数据源
官方介绍文档:https://github.com/HashDataInc/bireme/blob/master/README_zh-cn.md

1、数据流


Bireme 采用 DELETE + COPY 的方式,将数据源的修改记录同步到 Greenplum / HashData ,相较于INSERT + UPDATE + DELETE的方式,COPY 方式速度更快,性能更优

2、数据源

2.1、Maxwell + Kafka 是 bireme 目前支持的一种数据源类型,架构如下图:

Maxwell 是一个 MySQL binlog 的读取工具,它可以实时读取 MySQL 的 binlog,并生成 JSON 格式的消息,作为生产者发送给 Kafka
?
2.2、Debezium + Kafka 是 bireme 支持的另外一种数据源类型,架构如下图:

Debezium 是一个CDC工具,可以将数据库的增删改转换为事件流,并把这些修改发送给 Kafka

3、工作原理

Bireme 从数据源读取数据 (Record),将其转化为内部格式 (Row) 并缓存,当缓存数据达到一定量,将这些数据合并为一个任务 (Task),每个任务包含两个集合,delete 集合与insert 集合,最后把这些数据更新到目标数据库。
每个数据源可以有多个 pipeline,对于 maxwell,每个 Kafka partition 对应一个 pipeline;对于 debezium,每个 Kafka topic 对应一个 pipeline

4、本文搭建实例图形


?

?

2、配置相关数据源、目标数据源和java环境

1、mysql数据源

1、数据库,create database syncdb1;
2、用户权限,需要拥有SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT,此处使用root权限
3、同步的表(切换到syncdb1数据库),create table tb1(a int, b char(10), primary key(a));

2、pgsql目的数据库

1、用户,create user syncdb with password ‘syncdb‘;
2、数据库,create database syncdb with owner ‘syncdb‘;
3、同步的表(使用syncdb用户切换到syncdb数据库),create table tb1(a int, b char(10), primary key(a));

3、java环境的安装

1、下载二进制安装包:jdk-8u101-linux-x64.tar.gz
2、解压二进制包并做软链接:tar xf jdk-8u101-linux-x64.tar.gz && ln -s /data/jdk1.8.0_101 /usr/java
3、配置路径和java环境变量:vim /etc/profile.d/java.sh
export JAVA_HOME=/usr/java
export JRE_HOME=$JAVA_HOME/jre
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/jre/lib
4、source生效:source  /etc/profile.d/java.sh

?
?
?

3、kafka的安装和启动配置

1、下载地址:https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/
2、kafka官方文档:http://kafka.apache.org/
3、解压缩:tar xf kafka_2.11-2.0.0.tgz && cd kafka_2.11-2.0.0
4、ZooKeeper

启动,bin/zookeeper-server-start.sh config/zookeeper.properties
关闭,bin/zookeeper-server-stop.sh config/zookeeper.properties

5、Kafka server

启动,bin/kafka-server-start.sh config/server.properties
启动,bin/kafka-server-stop.sh config/server.properties

6、Topic(不是本实验必须的,作为学习使用)

创建,bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic world
查询,bin/kafka-topics.sh --list --zookeeper localhost:2181
删除,bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic world

7、Producer(不是本实验必须的,作为学习使用)

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>hello
>jiaming
>

8、Consumer(不是本实验必须的,作为学习使用)

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
hello
jiaming

?
?
?

4、debezium的安装和启动配置

下载debezium的mysql连接器
1、下载地址:https://debezium.io/docs/install/
2、debezium官方文档:https://debezium.io/docs/tutorial/
3、解压缩:tar xf debezium-connector-mysql-0.8.1.Final-plugin.tar.gz
4、解压出来的jar包全部拷贝到kafka libs目录下,cp debezium-connector-mysql/.jar kafka2.11-2.0.0/libs/
5、添加配置文件(用于连接mysql数据源,对应参数可参考官方介绍:https://debezium.io/docs/connectors/mysql/#example-configuration
cd kafka_2.11-2.0.0 && vim mysql.properties
note:debezium的database.server.name一定要和bireme的data_source保持一致

name=inventory-connector
connector.class=io.debezium.connector.mysql.MySqlConnector
database.hostname=118.190.209.102
database.port=5700
database.user=root
database.password=123456
database.server.id=129129
database.server.name=debezium1  # debezium的database.server.name一定要和bireme的data_source保持一致
database.whitelist=syncdb1  # 同步的数据库列表
database.history.kafka.bootstrap.servers=localhost:9092
database.history.kafka.topic=dbhistory.debezium1
include.schema.changes=true

6、以独立模式启动kafka connect,此时debezium会对数据库中的每一个表创建一个topic,消费相应的topic,即可获取binlog解析信息

cd kafka_2.11-2.0.0
bin/connect-standalone.sh config/connect-standalone.properties mysql.properties

7、查看topic列表

cd kafka_2.11-2.0.0
bin/kafka-topics.sh --list --zookeeper localhost:2181

debezium1.syncdb1.tb1,每个数据源同步表会生成一个topic
debezium1,记录ddl操作
dbhistory.debezium1,记录对应ddl操作和position位点信息

?
?

5、bireme的安装和启动配置

1、下载地址:https://github.com/HashDataInc/bireme/releases
2、bireme官方文档:https://github.com/HashDataInc/bireme/blob/master/README_zh-cn.md
3、解压缩:tar xf bireme-1.0.0.tar.gz && cd bireme-1.0.0
4、修改配置文件,vim etc/config.properties
note:debezium的database.server.name一定要和bireme的data_source保持一致

# target database where the data will sync into.
target.url = jdbc:postgresql://118.190.209.102:5432/syncdb
target.user = syncdb
target.passwd = syncdb

# data source name list, separated by comma.
data_source = debezium1  # debezium的database.server.name一定要和bireme的data_source保持一致

# data source "debezium1"
debezium1.type = debezium
# kafka server which debezium write into.
debezium1.kafka.server = 127.0.0.1:9092
# kafka groupid used for consumer.
debezium1.kafka.groupid = bireme
debezium1.kafka.namespace = debezium1

# set the IP address for bireme state server.
state.server.addr = 0.0.0.0
# set the port for bireme state server.
state.server.port = 8080

5、修改配置文件,vim etc/debezium1.properties(表映射配置
note:debezium1.properties的debezium1一定要和bireme的data_source保持一致

# source table full name = target table full name
syncdb1.tb1 = public.tb1

6、启动bireme,bin/bireme start
7、监控,http://192.168.1.129:8080/pretty (state.server.addr:state.server.port)
?
?
?

6、测试

1、mysql数据源

insert into tb1 select 1,‘a‘;
insert into tb1 select 2,‘b‘;

2、pgsql目标数据库

syncdb=# select * from tb1;
 a |     b
---+------------
 1 | a
 2 | b
(2 rows)

?
?
?

7、优势和存在问题

1、优势

1、可以实现多个库表的汇总功能,syncdb1.tb1/syncdb2.tb1 可以汇总到pgsql的一张表tb1中
2、中间使用kafka消息队列,对于大数据量性能方面提升较好
3、不存在数据源库***问题,位点信息存放在kafka中的topic中
4、第一次启动debezium,会生成一个数据源数据库的snapshot,然后之后基于binlog的解析,这样避免了第一次同步数据源数据库到目标数据库的一份全量数据

2、存在问题
待测试补充

原文地址:http://blog.51cto.com/11257187/2153817

时间: 2024-10-08 04:30:29

bireme数据源同步工具--debezium+kafka+bireme的相关文章

阿里云开源离线同步工具DataX3.0介绍

阿里云开源离线同步工具DataX3.0介绍 一. DataX3.0概览 ? DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL.Oracle等).HDFS.Hive.ODPS.HBase.FTP等各种异构数据源之间稳定高效的数据同步功能. 设计理念 为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源.当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数

OGG 从Oracle备库同步数据至kafka

OGG 从Oracle备库同步数据至kafka Table of Contents 1. 目的 2. 环境及规划 3. 安装配置JDK 3.1. 安装jdk 3.2. 配置环境变量 4. 安装Dataguard 4.1. 安装备库软件 4.2. 配置dataguard 4.2.1. 主库 4.2.2. 备库 4.3. 完成操作 4.4. 启动实时复制 5. zookeeper集群 5.1. 上传并解压 5.2. 配置 5.3. 创建myid文件 5.4. 配置环境变量 5.5. 启动和查看服务

【java并发】线程同步工具CyclicBarrier的使用

上一节中总结了Semaphore同步工具的使用,Semaphore主要提供了一个记数信号量,允许最大线程数运行.CyclicBarrier是另一个同步工具,这一节主要来总结一下CyclicBarrier的使用.先看一下官方的对CyclicBarrier的介绍: 一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point).在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用.因为该 barrier

Linux的rsync远程数据同步工具

Rsync(remote synchronize) 是一个远程数据同步工具,可以使用"Rsync算法"同步本地和远程主机之间的文件. rsync的好处是只同步两个文件不同的部分,相同的部分不在传递.类似于增量备份, 这使的在服务器传递备份文件或者同步文件,比起scp工具要省好多时间. OS:ubuntu server 10.04 server:192.168.64.128 client:192.168.64.145 server 1.ubuntu  server 10.04默认已安装r

12306订票助手文件版本同步工具

using System; using System.Collections.Generic; using System.Text; using System.Linq; namespace TicketPackageSyncTool { class Program { /// <summary> /// 当前的根路径 /// </summary> static string _root; static string _chromePath; static string _outp

深入分析同步工具类之CountDownLatch

概览: CountDownLatch又称闭锁,其作用是让一个或者多个线程挂起,直到其他的线程执行完后恢复挂起的线程,使其继续执行.内部维护着一个静态内部类Sync,该类继承AbstractQueuedSynchronizer(这个类之前分析过了,参见    深入分析同步工具类之AbstractQueuedSynchronizer),Sync实例维护着state属性,调用await()方法,使当前线程挂起,当一个线程执行完后,调用countDown()方法,state-1,直到state变为0,被

线程:Exchanger同步工具

可以在对中对元素进行配对和交换的线程的同步点,类似于交易,A拿着钱到达指定地点,B拿着物品到达指定地点,相互交换,然后各自忙各自的事去了. 1 package ch03; 2 3 import java.util.concurrent.Exchanger; 4 5 public class ExchangerTest { 6 7 public static void main(String[] args) { 8 final Exchanger<String> changer = new Ex

线程:CountDownLatch同步工具

一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待. 类似计数器,当计数器的值为0时,继续往下执行. 1 package ch03; 2 3 import java.util.Random; 4 import java.util.concurrent.CountDownLatch; 5 import java.util.concurrent.ExecutorService; 6 import java.util.concurrent.Executors; 7 8

文件夹自动同步工具

这是我之前开发的文件夹自动同步工具,主要实现开发机和服务器之间的文件夹同步. 项目地址: https://github.com/mike-zhang/autoSync 问题描述 在windows下修改代码,到服务器上去编译,但每次都要通过winscp之类的工具拖拽上去(当然你也可以通过scp命令行的方式). 每次修改的文件很少,而且可能位于不同的目录,每次都重复覆盖文件的操作感觉比较麻烦,所以开发了这个自动文件夹自动同步工具. 当然这个工具也可以用于两台linux服务器之间的文件夹同步. 工具介