Synchronizing Oracle Data to Kafka with OGG

2022-08-16 14:33:40

Environment:
Source: Oracle 12.2, OGG for Oracle 12.3
Target: Kafka, OGG for Big Data 12.3
Goal: synchronize data from Oracle to Kafka through OGG.

Source-side configuration:
1. Add supplemental logging for the tables to be synchronized
GGSCI> dblogin USERID ogg@orclpdb, PASSWORD ogg
GGSCI> add trandata scott.tab1
GGSCI> add trandata scott.tab2

2. Add the Extract (capture) process
GGSCI> add extract EXT_KAF1, integrated tranlog, begin now
GGSCI> add EXTTRAIL ./dirdat/k1, extract EXT_KAF1, MEGABYTES 200

Edit the Extract parameters:
GGSCI> edit params EXT_KAF1

extract EXT_KAF1
userid c##ggadmin,PASSWORD ggadmin
LOGALLSUPCOLS
UPDATERECORDFORMAT COMPACT
exttrail ./dirdat/k1,FORMAT RELEASE 12.3
-- specifies the PDB
SOURCECATALOG orclpdb
table scott.tab1;
table scott.tab2;

Register the Extract with the database:
GGSCI> DBLOGIN USERID c##ggadmin,PASSWORD ggadmin
GGSCI> register extract EXT_KAF1 database container (orclpdb)

3. Add the data pump process:
GGSCI> add extract PMP_KAF1, exttrailsource ./dirdat/k1
GGSCI> add rmttrail ./dirdat/f1, EXTRACT PMP_KAF1, MEGABYTES 200

Edit the pump parameters:
GGSCI> edit param PMP_KAF1

EXTRACT PMP_KAF1
USERID c##ggadmin,PASSWORD ggadmin
PASSTHRU
RMTHOST 10.1.1.247, MGRPORT 9178
RMTTRAIL ./dirdat/f1,format release 12.3
SOURCECATALOG orclpdb
TABLE scott.tab1;
table scott.tab2;

4. Add the initial-load Extract processes (Oracle initial load)
The tables can be loaded together or separately; here each table gets its own initial-load process.
GGSCI> add extract ek_01, sourceistable

Edit the parameters:
GGSCI> EDIT PARAMS ek_01

EXTRACT ek_01
USERID c##ggadmin,PASSWORD ggadmin
RMTHOST 10.1.1.247, MGRPORT 9178
RMTFILE ./dirdat/ka,maxfiles 999, megabytes 500,format release 12.3
SOURCECATALOG orclpdb
table scott.tab1;

GGSCI> add extract ek_02, sourceistable

GGSCI> EDIT PARAMS ek_02

EXTRACT ek_02
USERID c##ggadmin,PASSWORD ggadmin
RMTHOST 10.1.1.247, MGRPORT 9178
RMTFILE ./dirdat/kb,maxfiles 999, megabytes 500,format release 12.3
SOURCECATALOG orclpdb
table scott.tab2;
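The parameter files for ek_01 and ek_02 differ only in the group name, the remote file and the table, so they can be produced from one template when more tables are added. The following is a minimal Python sketch of that idea, not part of OGG itself; the helper name `render_initial_load_params` is hypothetical, and the defaults simply repeat the credentials, host and port used above.

```python
def render_initial_load_params(group, rmtfile, table,
                               host="10.1.1.247", mgrport=9178,
                               user="c##ggadmin", password="ggadmin",
                               pdb="orclpdb"):
    """Return the text of an initial-load Extract parameter file.

    Hypothetical helper: it only formats text and does not talk to OGG.
    """
    lines = [
        f"EXTRACT {group}",
        f"USERID {user},PASSWORD {password}",
        f"RMTHOST {host}, MGRPORT {mgrport}",
        f"RMTFILE {rmtfile},maxfiles 999, megabytes 500,format release 12.3",
        f"SOURCECATALOG {pdb}",
        f"table {table};",
    ]
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    # One (group, remote file, table) triple per table to be loaded.
    jobs = [
        ("ek_01", "./dirdat/ka", "scott.tab1"),
        ("ek_02", "./dirdat/kb", "scott.tab2"),
    ]
    for group, rmtfile, table in jobs:
        print(f"--- dirprm/{group}.prm ---")
        print(render_initial_load_params(group, rmtfile, table), end="")
```

The rendered text can be saved as dirprm/ek_01.prm and dirprm/ek_02.prm; the processes still have to be added in GGSCI with `add extract ..., sourceistable` as shown above.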

5. Generate the definitions (def) file:
GGSCI> edit param defgen1

USERID c##ggadmin,PASSWORD ggadmin
defsfile /home/oracle/ogg/ggs12/dirdef/defgen1.def,format release 12.3
SOURCECATALOG orclpdb
table scott.tab1;
table scott.tab2;

Run the following command from OGG_HOME to generate the def file:
defgen paramfile dirprm/defgen1.prm

Copy the generated def file to $OGG_HOME/dirdef on the target.

Target-side configuration:
1. Copy all files under $OGG_HOME/AdapterExamples/big-data/kafka to $OGG_HOME/dirprm
cd $OGG_HOME/AdapterExamples/big-data/kafka
cp * $OGG_HOME/dirprm

2. Copy the file tr000000000 under $OGG_HOME/AdapterExamples/trail to $OGG_HOME/dirdat
cd $OGG_HOME/AdapterExamples/trail
cp tr000000000 $OGG_HOME/dirdat

3. Add the initial-load Replicat processes (the tables can be loaded together or separately; here each table is loaded on its own):
GGSCI> ADD replicat rp_01, specialrun

GGSCI> EDIT PARAMS rp_01

SPECIALRUN
end runtime
setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
targetdb libfile libggjava.so set property=./dirprm/kafka1.props
SOURCEDEFS ./dirdef/defgen1.def
EXTFILE ./dirdat/ka
reportcount every 1 minutes, rate
grouptransops 10000
MAP orclpdb.scott.tab1, TARGET scott.tab1;

GGSCI> ADD replicat rp_02, specialrun
GGSCI> EDIT PARAMS rp_02

SPECIALRUN
end runtime
setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
targetdb libfile libggjava.so set property=./dirprm/kafka2.props
SOURCEDEFS ./dirdef/defgen1.def
EXTFILE ./dirdat/kb
reportcount every 1 minutes, rate
grouptransops 10000
MAP orclpdb.scott.tab2, TARGET scott.tab2;

4. Add the Replicat (apply) processes:
GGSCI> add replicat r_kaf1, exttrail ./dirdat/f1
GGSCI> edit params r_kaf1

REPLICAT r_kaf1
setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
HANDLECOLLISIONS
targetdb libfile libggjava.so set property=./dirprm/kafka1.props
SOURCEDEFS ./dirdef/defgen1.def
reportcount every 1 minutes, rate
grouptransops 10000
MAP orclpdb.scott.tab1, TARGET scott.tab1;

GGSCI> add replicat r_kaf2, exttrail ./dirdat/f2
GGSCI> edit params r_kaf2

REPLICAT r_kaf2
setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
HANDLECOLLISIONS
targetdb libfile libggjava.so set property=./dirprm/kafka2.props
SOURCEDEFS ./dirdef/defgen1.def
reportcount every 1 minutes, rate
grouptransops 10000
MAP orclpdb.scott.tab2, TARGET scott.tab2;

Note that the pump PMP_KAF1 configured on the source writes only ./dirdat/f1. For r_kaf2 to receive data, a trail ./dirdat/f2 must also be delivered to the target (that pump is not shown here), or r_kaf2 must read ./dirdat/f1 instead.

5. Parameter configuration:
The contents of custom_kafka_producer.properties are as follows:

# Only this line needs to be changed: set the Kafka broker addresses and ports
bootstrap.servers=10.1.1.246:9200,10.1.1.247:9200
acks=1
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=16384
linger.ms=10000
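One pitfall with this file: Java properties files treat only whole lines starting with `#` (or `!`) as comments, so a note typed after a value on the same line becomes part of that value. The following Python sketch checks that every `bootstrap.servers` entry is a plain host:port pair. It is an illustration only; the function names are hypothetical, and the parser handles just simple `key=value` lines, not the full properties syntax.

```python
import re


def parse_properties(text):
    """Parse simple key=value lines; lines starting with '#' or '!' are comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


# A host name or IPv4 address, a colon, then a port number.
HOST_PORT = re.compile(r"^[A-Za-z0-9.\-]+:\d{1,5}$")


def invalid_bootstrap_servers(props):
    """Return the bootstrap.servers entries that are not plain host:port."""
    servers = props.get("bootstrap.servers", "")
    entries = [s.strip() for s in servers.split(",") if s.strip()]
    return [s for s in entries if not HOST_PORT.match(s)]
```

For example, `invalid_bootstrap_servers(parse_properties(open("dirprm/custom_kafka_producer.properties").read()))` returns an empty list when every entry is well formed.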

The contents of kafka1.props are as follows:

gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
#The following resolves the topic name using the short table name
gg.handler.kafkahandler.topicMappingTemplate=topic1
#gg.handler.kafkahandler.format=avro_op
# Changed here: the format is set to JSON
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.format.insertOpKey=I
gg.handler.kafkahandler.format.updateOpKey=U
gg.handler.kafkahandler.format.deleteOpKey=D
gg.handler.kafkahandler.format.truncateOpKey=T
gg.handler.kafkahandler.format.prettyPrint=false
gg.handler.kafkahandler.format.jsonDelimiter=CDATA[]
# Include the primary keys
gg.handler.kafkahandler.format.includePrimaryKeys=true
# Set this to the name of the target topic
gg.handler.kafkahandler.SchemaTopicName=topic1
gg.handler.kafkahandler.BlockingSend=false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode=op
goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec
#Sample gg.classpath for Apache Kafka
# The classpath is important: it must include the libraries from the Kafka installation.
gg.classpath=dirprm/:/opt/cloudera/parcels/KAFKA/lib/kafka/libs/*
#Sample gg.classpath for HDP
#gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/*
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar
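With `format=json` and `mode=op`, each Kafka record carries one database operation as a JSON document. The sketch below shows how a downstream consumer might apply such records to an in-memory copy of a table. The sample messages in the usage note are made up, and the field names (`op_type`, `primary_keys`, `before`, `after`) are an assumption about the JSON formatter's output that should be checked against real messages read from topic1. The operation codes I, U, D and T match the `insertOpKey`, `updateOpKey`, `deleteOpKey` and `truncateOpKey` settings above.

```python
import json


def apply_operation(rows, message):
    """Apply one JSON operation record to rows, a dict keyed by primary-key tuple.

    Assumes the record has op_type, primary_keys and before/after images.
    """
    op = json.loads(message)
    keys = op["primary_keys"]
    before = op.get("before") or {}
    after = op.get("after") or {}

    def key_of(image):
        return tuple(image[k] for k in keys)

    kind = op["op_type"]
    if kind == "I":
        rows[key_of(after)] = dict(after)
    elif kind == "U":
        # Prefer the before image for the lookup so key changes are handled.
        has_old_key = all(k in before for k in keys)
        old_key = key_of(before) if has_old_key else key_of(after)
        row = rows.pop(old_key, {})
        row.update(after)  # the after image may hold only the changed columns
        rows[key_of(row)] = row
    elif kind == "D":
        rows.pop(key_of(before), None)
    elif kind == "T":
        rows.clear()
    else:
        raise ValueError(f"unexpected op_type: {kind!r}")
    return rows
```

A short usage example with hypothetical messages: calling `apply_operation(rows, '{"op_type":"I","primary_keys":["ID"],"after":{"ID":1,"NAME":"a"}}')` on an empty dict stores the row under the key `(1,)`, and a later record with `"op_type":"D"` and a `before` image for the same ID removes it again.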

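Starting the processes:
1. Start the source-side Extract (capture) process
GGSCI> start EXT_KAF1
2. Start the source-side pump process
GGSCI> start PMP_KAF1
3. Start the source-side initial-load process
GGSCI> start ek_01
4. Start the target-side initial-load process. Run the following command from $OGG_HOME:
./replicat paramfile ./dirprm/rp_01.prm reportfile ./dirrpt/rp_01.rpt -p INITIALDATALOAD
5. Start the target-side Replicat process
GGSCI> start R_KAF1

The second table follows the same steps with ek_02, rp_02 and r_kaf2.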

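Errors encountered:
1. ERROR OGG-15050 Error loading Java VM runtime library (2 no such file or directory)

Cause: the library could not be found. The environment variables had been configured, but the OGG MGR process had not been restarted afterwards, so it was still running with the old environment.
Fix: restart the MGR process.

2. ERROR OGG-15051 Java or JNI exception

Cause: the kafka.props shipped with OGG 12.3.1.1.1 was not used; a kafka.props copied from OGG 12.2 was used instead, which caused the exception.
Fix: use the kafka.props shipped with OGG 12.3.1.1.1 and set the relevant properties in it.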
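For the first error (OGG-15050), it can help to confirm that the JVM shared library is actually reachable before restarting MGR. The following Python sketch lists the LD_LIBRARY_PATH entries that contain libjvm.so. It assumes a Linux host where the library is located through LD_LIBRARY_PATH; the function name is hypothetical. Run it from the same shell session that will start MGR, since that is the environment the restarted process uses.

```python
import os


def dirs_with_libjvm(ld_library_path):
    """Return the LD_LIBRARY_PATH entries that contain a file named libjvm.so."""
    hits = []
    for entry in ld_library_path.split(os.pathsep):
        if entry and os.path.isfile(os.path.join(entry, "libjvm.so")):
            hits.append(entry)
    return hits


if __name__ == "__main__":
    found = dirs_with_libjvm(os.environ.get("LD_LIBRARY_PATH", ""))
    if found:
        print("libjvm.so found in:", ", ".join(found))
    else:
        print("libjvm.so is not on LD_LIBRARY_PATH")
```

If nothing is found, add the directory that holds libjvm.so in the Java installation to LD_LIBRARY_PATH and then restart MGR.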
