使用Goldengate同步异构数据库之Kafka中间件

收到业务部门需求,要求将Oracle数据库某表同步至Mysql数据库中,异构环境我们用kafka来实现,下面是具体的一些配置;

由于业务需要,现申请使用架构组数据同步服务同步以下数据到管家MySQL数据库

代理商用户数据:
a. 数据源:SSP库 AAA.system_user
b. 数据目标:MySQL DLS库 DLS_SYSTEM_USER
c. 同步逻辑: 无
d. 同步数据及对应关系:参见附件
e. 是否涉及敏感信息:否

准备工作;由于目标库Mysql库该表已经存在,我们将该表备份并且获取建表语句;
--获取建表语句
mysql> show create table dls_system_user;

--导出单个数据表结构和数据
mysqldump -uroot -p dls DLS_SYSTEM_USER > DLS_SYSTEM_USER_180622.sql

--重命名表
ALTER TABLE DLS_SYSTEM_USERRENAME DLS_SYSTEM_USER_BAK0622;

--新建空表
CREATE TABLE dls_system_user (
ID varchar(100) NOT NULL,
ACCOUNT_EXPIRED int(1) NOT NULL DEFAULT ‘0‘,
ACCOUNT_LOCKED int(1) NOT NULL DEFAULT ‘0‘,
ENABLED int(1) NOT NULL DEFAULT ‘0‘,
ORG_NO varchar(255) NOT NULL DEFAULT ‘‘,
USER_CODE varchar(100) NOT NULL DEFAULT ‘‘,
REMARK_NAME varchar(255) NOT NULL DEFAULT ‘‘,
IS_CREATE_PERSON varchar(255) NOT NULL DEFAULT ‘‘,
STATUS int(10) NOT NULL DEFAULT ‘0‘,
PRIMARY KEY (ID),
KEY IDX_DLS_SYSTEM_USER_USER_CODE (USER_CODE)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

Oracle源端GoldenGate配置:
1、为要同步的表添加附加日志
dblogin USERID [email protected], password ggs
add trandata AAA.system_user

2、 添加抽取进程
add extract ext_kafb, tranlog, begin now
add EXTTRAIL ./dirdat/a2, extract ext_kafb,MEGABYTES 200

edit params EXT_KAFB

extract EXT_KAFB
USERID [email protected], password ggs
LOGALLSUPCOLS
exttrail ./dirdat/a2,FORMAT RELEASE 11.2
table AAA.system_user;

3、添加投递进程:
add extract pmp_kafb, exttrailsource ./dirdat/a2
add rmttrail ./dirdat/b2,EXTRACT pmp_kafb,MEGABYTES 200

eidt params pmp_kafb

EXTRACT pmp_kafb
USERID [email protected], password ggs
PASSTHRU
RMTHOST 172.16.xxx.5, MGRPORT 9178 --kafka服务器地址
RMTTRAIL ./dirdat/b2,format release 11.2
table AAA.system_user;

----初始化文件存放在 /ggs/ggs12/dirprm/

4.添加初始化进程
ADD EXTRACT ek_20, sourceistable ---源端添加

edit params ek_20

EXTRACT ek_20
USERID [email protected], password ggs
RMTHOST 172.16.154.5, MGRPORT 9178
RMTFILE ./dirdat/lb,maxfiles 999, megabytes 500
table AAA.system_user;

5.生成def文件:
GGSCI> edit param defgen_n9

USERID [email protected], password ggs
defsfile /goldengate/ggskafka/dirdef/defgen_n9.def,format release 11.2
table AAA.system_user;

在OGG_HOME下执行如下命令生成def文件
defgen paramfile /goldengate/ggskafka/dirprm/defgen_n9.prm

将生成的def文件传到kafka服务器$OGG_HOME/dirdef下

---目标端mysql 数据库地址172.16.xxx.148,需要新建kafka用户

grant select,insert,update,delete,create,drop on DLS.* to ‘kafka‘@‘%‘ identified by ‘jiubugaosuni‘;

--kafka服务器GoldenGate操作
1、添加初始化进程:---dirprm
GGSCI> ADD replicat rn_3,specialrun

EDIT PARAMS rn_3

SPECIALRUN
end runtime
setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
targetdb libfile libggjava.so set property=./dirprm/kafkat_n3.props
SOURCEDEFS ./dirdef/defgen_n9.def
EXTFILE ./dirdat/lb
reportcount every 1 minutes, rate
grouptransops 10000
MAP AAA.system_user, TARGET DLS.DLS_SYSTEM_USER;

2、添加复制进程:
GGSCI>add replicat RN_KF3,exttrail ./dirdat/lb
GGSCI>edit params RN_KF3

REPLICAT RN_KF3
setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
HANDLECOLLISIONS
targetdb libfile libggjava.so set property=./dirprm/kafkat_n3.props
SOURCEDEFS ./dirdef/defgen_n9.def
reportcount every 1 minutes, rate
grouptransops 10000
MAP AAA.system_user, TARGET DLS.DLS_SYSTEM_USER;

3、参数配置:
cd /home/app/ogg/ggs12/dirprm

custom_kafka_producer.properties 文件内容如下:

[[email protected] dirprm]$ more custom_kafka_producer.properties
bootstrap.servers=172.16.xxx.5:9092,172.16.xxx.7:9092
acks=1
reconnect.backoff.ms=1000

value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

100KB per partition

batch.size=16384
linger.ms=0

---vi添加对应文件 kafkat_n3.props
kafka.props文件内容如下:
gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
#The following resolves the topic name using the short table name
gg.handler.kafkahandler.topicMappingTemplate= DLS.DLS_MERCHANT_STATUS
#gg.handler.kafkahandler.format=avro_op
gg.handler.kafkahandler.format =json --指定文件类型
gg.handler.kafkahandler.format.insertOpKey=I
gg.handler.kafkahandler.format.updateOpKey=U
gg.handler.kafkahandler.format.deleteOpKey=D
gg.handler.kafkahandler.format.truncateOpKey=T
gg.handler.kafkahandler.format.prettyPrint=false
gg.handler.kafkahandler.format.jsonDelimiter=CDATA[]
gg.handler.kafkahandler.format.includePrimaryKeys=true
gg.handler.kafkahandler.SchemaTopicName= DLS.DLS_MERCHANT_STATUS --指定topic名称
gg.handler.kafkahandler.BlockingSend =false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode=op
goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec
#Sample gg.classpath for Apache Kafka
gg.classpath=dirprm/:/opt/cloudera/parcels/KAFKA/lib/kafka/libs/ --patch路径
#Sample gg.classpath for HDP
#gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/

javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar

至此我们配置算是基本完成,现在我们来开启进程,初始化数据;
1、启动源端抓取进程
GGSCI> start EXT_KAFB
2、启动源端投递进程
GGSCI> start pmp_kafb
3、启动源端初始化进程
GGSCI> start ek_20
4、启动目标端初始化进程
GGSCI> start rn_3
在$OGG_HOME下执行如下命令:

./replicat paramfile ./dirprm/rn_3.prm reportfile ./dirrpt/rn_3.rpt -p INITIALDATALOAD
5、启动目标端恢复进程
GGSCI> start RN_KF3

原文地址:http://blog.51cto.com/yangjunfeng/2132872

时间: 2024-10-03 15:14:30

使用Goldengate同步异构数据库之Kafka中间件的相关文章

12C数据库Goldengate同步异构数据库Kafka中间件之二

前两天测试环境的需求将上线生产环境,需求还是a. 数据源:SSP库 ssp.m_system_user,Oracle DB 12.1.0.2.0,Ogg Version 12.2.0.1.1 OGGCORE_12.2.0.1.0_PLATFORMS_151211.1401_FBOb. 数据目标:MySQL DLS库 DLS_SYSTEM_USERc.kafka集群:10.1.1.247 ,Ogg Version 12.3.0.1.0 OGGCORE_OGGADP.12.3.0.1.0GA_PLA

SymmetricDS 异构数据库同步软件部署案例

SymmetricDS是一个开源的同步软件,该软件是基于java环境编写的,在运行的时候需要安装JDK.SymmetricDS可以同步文件和数据库,本文的重点是数据库方面的同步. SymmetricDS支持多种数据库的同步,支持的数据库如下: Oracle, MySQL, MariaDB, PostgreSQL, MS SQL Server (including Azure), IBM DB2, H2, HSQLDB, Derby, Firebird, Interbase, Informix,

分布式数据库集群中间件

我是一个分布式数据库集群中间件的开发人员,已经一年多一点的开发时间了,今天总结点我所知道的一些事情,给有新近来到这个领域的研发人员一点借鉴. 生活不易,赚钱不易,离开仅仅是为多赚点钱. ----学渣 我仅仅是说我所开发过的系统: 后面文章会有具体的分析.这里仅仅做简单的梳理.也就是你要研发分布式数据库集群中间件.须要向着哪些方面去思考. 首先我们从名字去分析我们正在从事的工作内容: 1. 分布式 须要考虑它的方向(后面具体说明) a)  分布式中的概念 b)  分布式的模型 c)  分布式系统特

使用DataGridView进行增删改查,并同步到数据库

DataGridView控件具有极高的可配置性和可扩展性.它提供有大量的属性.方法和事件,能够用来对该控件的外观和行为进行自己定义.以下通过一个小样例来展示DataGridView进行增删改查,并同步到数据库的功能. 窗口展示: 用户需求: 1.当窗口显示时,将数据库中用户表中的数据显示出来. 2.选中一行,运行删除操作,同一时候在数据库中对应数据被删除. 3.双击某个数据,进行编辑,或者在空白行加入新的数据,然后点击更新,数据库随之更新. 代码展示: Public Class Form1'代码

Zebra 教程:数据库访问层中间件

Zebra是点评内部使用的数据库访问层中间件,它具有以下的功能点: 配置集中管理,动态刷新支持读写分离.分库分表丰富的监控信息在CAT上展现其中的三个组件的功能分别是(腾云科技ty300.com): zebra-api : 最主要的访问层中间件(勤快学qkxue.net)zebra-ds-monitor-client:基于CAT的监控(可选)zebra-dao:基于MyBatis的异步化的DAO组件(可选)编译1. git clone https://github.com/dianping/ze

Code First03---CodeFirst根据配置同步到数据库的三种方式

上一节我们说到使用Fluent API对实体的配置,但是有一个问题了,在业务中我们可以用到的实体很多,那是不是每个都需要这样去配置,这样就造成我们重写的OnModelCreating方法很庞大了.所以我们需要更好的组织Fluent API的配置. 我们知道modelBuilder的Entity<T>泛型方法的返回值是EntityTypeConfiguration<T>泛型类. 所以我们可以定义一个继承自EntityTypeConfiguration<T>泛型类的类来定义

Oracle Gateway11g R2访问异构数据库(MSSQL)配置文档

目录 1 前提条件 2 下载透明网关 3 解压安装透明网关 4 配置tnsnames 5 配置监听器 6 配置Gateway 7 重启Oracle服务 8 配置测试 9 创建DBLink 10 数据库测试接连 11 附件 注:MSSQL不需要配置ODBC数据源. 1.前提条件 1.     准备工作 软件名称 操作系统 IP地址 端口 用户 密码 版本 状态 Oracle数据库 Windows localhost 1521 scott scott win32 Oracle11g R2 已安装 O

OracleGateway11gR2访问异构数据库(MSSQL)配置文档(转)

1.前提条件 1. 准备工作 软件名称 操作系统 IP地址 端口 用户 密码 版本 状态 Oracle数据库 Windows localhost 1521 scott scott win32 Oracle11g R2 已安装 Oracle 透明网关 Windows localhost       win32_11gR2_gateways 未安装 MSSQL数据库 Windows 10.22.1.125 1433 sa 123 win64 MS SQL 2008 已安装 注:本例访问的是MSSQL

异构数据库之间完全可以用SQL语句导数据

告诉你一个最快的方法,用SQLServer连接DBF 在SQLServer中执行 SELECT * into bmk FROM OpenDataSource( 'Microsoft.Jet.OLEDB.4.0',  'Data Source="e:\share";User ID=Admin;Password=;Extended properties=dBase 5.0')...bmk 这样就可以把e:\share中的bmk.dbf表导入到Sqlserver中, 速度是最快的 把压箱底的