OGG同步ORACLE数据到KAFKA

环境:
源端:oracle12.2 ogg for oracle 12.3
目标端:KAFKA ogg for bigdata 12.3
将oracle中的数据通过OGG同步到KAFKA

源端配置:
1、为要同步的表添加附加日志
dblogin USERID [email protected], PASSWORD ogg
add trandata scott.tab1
add trandata scott.tab2

2、 添加抽取进程
GGSCI>add extract EXT_KAF1,integrated tranlog, begin now
GGSCI>add EXTTRAIL ./dirdat/k1, extract EXT_KAF1,MEGABYTES 200

编辑抽取进程参数:
GGSCI> edit params EXT_KAF1

extract EXT_KAF1
userid c##ggadmin,PASSWORD ggadmin
LOGALLSUPCOLS
UPDATERECORDFORMAT COMPACT
exttrail ./dirdat/k1,FORMAT RELEASE 12.3
SOURCECATALOG orclpdb --(指定pdb)
table scott.tab1;
table scott.tab2;

注册进程
GGSCI> DBLOGIN USERID c##ggadmin,PASSWORD ggadmin
GGSCI> register extract EXT_KAF1 database container (orclpdb)

3、添加投递进程:
GGSCI>add extract PMP_KAF1, exttrailsource ./dirdat/k1
GGSCI>add rmttrail ./dirdat/f1,EXTRACT PMP_KAF1,MEGABYTES 200

编辑投递进程参数:
GGSCI>edit param PMP_KAF1

EXTRACT PMP_KAF1
USERID c##ggadmin,PASSWORD ggadmin
PASSTHRU
RMTHOST 10.1.1.247, MGRPORT 9178
RMTTRAIL ./dirdat/f1,format release 12.3
SOURCECATALOG orclpdb
TABLE scott.tab1;
table scott.tab2;

4、添加数据初始化进程(Oracle initial load) 可以多个表分开初始化也可以一起初始化,此处选择分开初始化
GGSCI> add extract ek_01, sourceistable

编辑参数:
GGSCI> EDIT PARAMS ek_01

EXTRACT ek_01
USERID c##ggadmin,PASSWORD ggadmin
RMTHOST 10.1.1.247, MGRPORT 9178
RMTFILE ./dirdat/ka,maxfiles 999, megabytes 500,format release 12.3
SOURCECATALOG orclpdb
table scott.tab1;

GGSCI> add extract ek_02, sourceistable

EDIT PARAMS ek_02
EXTRACT ek_02
USERID c##ggadmin,PASSWORD ggadmin
RMTHOST 10.1.1.247, MGRPORT 9178
RMTFILE ./dirdat/kb,maxfiles 999, megabytes 500,format release 12.3
SOURCECATALOG orclpdb
table scott.tab2;

5、生成def文件:
GGSCI> edit param defgen1

USERID c##ggadmin,PASSWORD ggadmin
defsfile /home/oracle/ogg/ggs12/dirdef/defgen1.def,format release 12.3
SOURCECATALOG orclpdb
table scott.tab1;
table scott.tab2;

在OGG_HOME下执行如下命令生成def文件
defgen paramfile dirprm/defgen1.prm

将生成的def文件传到目标端$OGG_HOME/dirdef下

目标端配置:
1、将$OGG_HOME/AdapterExamples/big-data/kafka下的所有文件copy到$OGG_HOME/dirprm下
cd $OGG_HOME/AdapterExamples/big-data/kafka
cp * $OGG_HOME/dirprm

2、将$ORACLE_HOME/AdapterExamples/trail下的文件tr000000000 copy到$OGG_HOME/dirdat下
cd $ORACLE_HOME/AdapterExamples/trail
cp tr000000000 $OGG_HOME/dirdat

3、添加初始化进程:(可以多表一起初始化也可以分开初始化,此处选择单独初始化)
GGSCI> ADD replicat rp_01, specialrun

GGSCI> EDIT PARAMS rp_01
SPECIALRUN
end runtime
setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
targetdb libfile libggjava.so set property=./dirprm/kafka1.props
SOURCEDEFS ./dirdef/defgen1.def
EXTFILE ./dirdat/ka
reportcount every 1 minutes, rate
grouptransops 10000
MAP orclpdb.scott.tab1, TARGET scott.tab1;

GGSCI> ADD replicat rp_02, specialrun
GGSCI> EDIT PARAMS rp_02

SPECIALRUN
end runtime
setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
targetdb libfile libggjava.so set property=./dirprm/kafka2.props
SOURCEDEFS ./dirdef/defgen1.def
EXTFILE ./dirdat/kb
reportcount every 1 minutes, rate
grouptransops 10000
MAP orclpdb.scott.tab2, TARGET scott.tab2;

4、添加恢复进程:
GGSCI>add replicat r_kaf1,exttrail ./dirdat/f1
GGSCI>edit params r_kaf1

REPLICAT r_kaf1
setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
HANDLECOLLISIONS
targetdb libfile libggjava.so set property=./dirprm/kafka1.props
SOURCEDEFS ./dirdef/defgen1.def
reportcount every 1 minutes, rate
grouptransops 10000
MAP orclpdb.scott.tab1, TARGET scott.tab1;

GGSCI> add replicat r_kaf2,exttrail ./dirdat/f2
GGSCI> edit params r_kaf2

REPLICAT r_kaf2
setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
HANDLECOLLISIONS
targetdb libfile libggjava.so set property=./dirprm/kafka2.props
SOURCEDEFS ./dirdef/defgen1.def
reportcount every 1 minutes, rate
grouptransops 10000
MAP orclpdb.scott.tab2, TARGET scott.tab2;

5、参数配置:
custom_kafka_producer.properties文件内容如下:

bootstrap.servers=10.1.1.246:9200,10.1.1.247:9200 --只需要改动这一行就行,指定kafka的地址和端口号
acks=1
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=16384
linger.ms=10000

kafka1.props文件内容如下:
gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
#The following resolves the topic name using the short table name
gg.handler.kafkahandler.topicMappingTemplate= topic1
#gg.handler.kafkahandler.format=avro_op
gg.handler.kafkahandler.format =json --这里做了改动,指定格式为json格式
gg.handler.kafkahandler.format.insertOpKey=I
gg.handler.kafkahandler.format.updateOpKey=U
gg.handler.kafkahandler.format.deleteOpKey=D
gg.handler.kafkahandler.format.truncateOpKey=T
gg.handler.kafkahandler.format.prettyPrint=false
gg.handler.kafkahandler.format.jsonDelimiter=CDATA[]
gg.handler.kafkahandler.format.includePrimaryKeys=true --包含主键
gg.handler.kafkahandler.SchemaTopicName= topic1 --此处指定为要同步到的目标topic名字
gg.handler.kafkahandler.BlockingSend =false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode=op
goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec
#Sample gg.classpath for Apache Kafka
gg.classpath=dirprm/:/opt/cloudera/parcels/KAFKA/lib/kafka/libs/ --指定classpath,这里很重要,必须有kafka安装文件的类库。
#Sample gg.classpath for HDP
#gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/

javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar

启动进程进程恢复:
1、启动源端抓取进程
GGSCI> start EXT_KAF1
2、启动源端投递进程
GGSCI> start PMP_KAF1
3、启动源端初始化进程
GGSCI> start ek_01
4、启动目标端初始化进程
在$OGG_HOME下执行如下命令:
./replicat paramfile ./dirprm/rp_01.prm reportfile ./dirrpt/rp_01.rpt -p INITIALDATALOAD
5、启动目标端恢复进程
GGSCI> start R_KAF1

遇到的错误:
1、ERROR OGG-15050 Error loading Java VM runtime library(2 no such file or directory)

原因:找不到类库(配置好环境变量之后,OGG的mgr进程没有重启,导致的)
解决:重启MGR进程

2、ERROR OG-15051 Java or JNI exception

原因:没有使用ogg12.3.1.1.1自带的kafka.props,而是copy了ogg12.2的kafka.props,导致出现异常。
解决:使用ogg12.3.1.1.1自带的kafka.props,并指定相关的属性,解决。

原文地址:http://blog.51cto.com/lyzbg/2088409

时间: 2024-10-18 06:22:51

OGG同步ORACLE数据到KAFKA的相关文章

使用maxwell实时同步mysql数据到kafka

一.软件环境: 操作系统:CentOS release 6.5 (Final) java版本: jdk1.8 zookeeper版本: zookeeper-3.4.11 kafka 版本: kafka_2.11-1.1.0.tgz maxwell版本:maxwell-1.16.0.tar.gz 注意 : 关闭所有机器的防火墙,同时注意启动可以相互telnet ip 端口 二.环境部署 1.安装jdk export JAVA_HOME=/usr/java/jdk1.8.0_181 export P

OGG 从Oracle备库同步数据至kafka

OGG 从Oracle备库同步数据至kafka Table of Contents 1. 目的 2. 环境及规划 3. 安装配置JDK 3.1. 安装jdk 3.2. 配置环境变量 4. 安装Dataguard 4.1. 安装备库软件 4.2. 配置dataguard 4.2.1. 主库 4.2.2. 备库 4.3. 完成操作 4.4. 启动实时复制 5. zookeeper集群 5.1. 上传并解压 5.2. 配置 5.3. 创建myid文件 5.4. 配置环境变量 5.5. 启动和查看服务

基于OGG的Oracle与Hadoop集群准实时同步介绍

Oracle里存储的结构化数据导出到Hadoop体系做离线计算是一种常见数据处置手段.近期有场景需要做Oracle到Hadoop体系的实时导入,这里以此案例做以介绍.Oracle作为商业化的数据库解决方案,自发性的获取数据库事务日志等比较困难,故选择官方提供的同步工具OGG(Oracle GoldenGate)来解决. 安装与基本配置 环境说明 软件配置 角色 数据存储服务及版本 OGG版本 IP 源服务器 OracleRelease11.2.0.1 Oracle GoldenGate 11.2

GoldenGate架构下oracle与oracle数据同步

GoldenGate架构下oracle与oracle数据迁移 主机updba上scott用户下数据同步到主机upgg上scott. 1: updba和upgg上scott数据一致.(可通过迁移实现) 2: 并在两个主机上分别安装GG软件.创建管理用户ggm授权. 3:配置manager管理进程 GGSCI>edit params mgr port 7809 GGSCI>info mgr GGSCI>start mgr GGSCI>info mgr 4: 配置DML同步 4.1 抓取

Oracle数据同步交换

一.为了解决数据同步汇聚,数据分发,数据转换,数据维护等需求,TreeSoft将复杂的网状的同步链路变成了星型数据链路.     TreeSoft作为中间传输载体负责连接各种数据源,为各种异构数据库之间架起同步的桥梁,     实现一对多,多对多,多对一等复杂场景的数据同步.           TreeSoft已被广泛应用,每日处理大量大数据的数据维护.数据同步.数据汇聚.数据转换业务.     支持MySQL, MariaDB, Oracle, PostgreSQL, SQL Server,

MongoDB -> kafka 高性能实时同步(采集)mongodb数据到kafka解决方案

写这篇博客的目的 让更多的人了解 阿里开源的MongoShake可以很好满足mongodb到kafka高性能高可用实时同步需求(项目地址:https://github.com/alibaba/MongoShake,下载地址:https://github.com/alibaba/MongoShake/releases).至此博客就结束了,你可以愉快地啃这个项目了.还是一起来看一下官方的描述: MongoShake is a universal data replication platform b

oracle dg 备库不同步主库数据

今天遇到一个数据库同步问题,主库被关闭,重启主库后,备库不能正常同步主库数据.只有当手动切换归档日志的时候,备库才能和主库一致. 这个问题的解决方法: 重启备库,重新应用归档日志. 操作步骤如下: //关闭备库监听器 lsnrctl stop //关闭备库 sqlplus / as sysdba alter database recover managed standby database cancel; shutdown immediate; //启动备库 startup nomount; a

Goldengate抽取ORACLE 数据 到 Hbase

1.        软件版本说明: Goldengate 12c  12.2.0.1 forOracle (源端 ) Goldengate 12c  12.2.0.1  for Bigdata ( 目标端) Oracle 11g 11.2.0.4.0 (注oracle 数据库必须是11.2.0.4之后的版本,之前的oracle 数据库版本不支持ogg 12c ) HBase 1.1.2 Java 1.8.0_91 2.        机器IP 说明 源端IP地址(192.168.45.176)安

使用Goldengate同步异构数据库之Kafka中间件

收到业务部门需求,要求将Oracle数据库某表同步至Mysql数据库中,异构环境我们用kafka来实现,下面是具体的一些配置: 由于业务需要,现申请使用架构组数据同步服务同步以下数据到管家MySQL数据库 代理商用户数据:a. 数据源:SSP库 AAA.system_userb. 数据目标:MySQL DLS库 DLS_SYSTEM_USERc. 同步逻辑: 无d. 同步数据及对应关系:参见附件e. 是否涉及敏感信息:否 准备工作:由于目标库Mysql库该表已经存在,我们将该表备份并且获取建表语