使用ogg实现oracle到kafka的增量数据实时同步

2022-12-01 19:47:42 浏览数 (1)

Oracle Golden Gate软件是一种基于日志的结构化数据复制备份软件,它通过解析源数据库在线日志或归档日志获得数据的增量变化,再将这些变化应用到目标数据库,从而实现源数据库与目标数据库同步。

0、本篇中源端和目标端的一些配置信息:

-

版本

OGG版本

id地址

源端

Oracle11gR2

Oracle GoldenGate 11.2.1.0.1 for Oracle on Linux x86-64

Carlota3

目标端

kafka_2.12-2.5.0

Linux x86-64上的Oracle GoldenGate for Big Data 19.1.0.0.1

Carlota2

源端和目标端的文件不一样,目标端需要下载Oracle GoldenGate for Big Data,源端需要下载Oracle GoldenGate for Oracle!

PS:源端是安装好了Oracle的机器,目标端是安装好了Kafka的机器,二者环境变量之前都配置好了。

1、源端OGG安装

先建立ogg目录

代码语言:javascript复制
mkdir -p /opt/ogg

解压zip文件

代码语言:javascript复制
unzip ogg112101_fbo_ggs_Linux_x64_ora11g_64bit.zip

解压后得到一个tar包,再解压这个tar

代码语言:javascript复制
tar xf fbo_ggs_Linux_x64_ora11g_64bit.tar -C /opt/ogg

使oracle用户有ogg的权限,后面有些需要在oracle用户下执行才能成功

代码语言:javascript复制
chown -R oracle:oinstall /data/ogg 

配置OGG环境变量

代码语言:javascript复制
vim /etc/profile

export OGG_HOME=/opt/ogg export LD_LIBRARY_PATH=

ORACLE_HOME/lib:/usr/lib export PATH=

OGG_HOME:$PATH

代码语言:javascript复制
source /etc/profile

2、目标端OGG安装

先建立ogg目录

代码语言:javascript复制
mkdir -p /data/ogg

解压zip文件

代码语言:javascript复制
unzip OGG_BigData_Linux_x64_19.1.0.0.1.zip

解压后得到一个tar包,再解压这个tar

代码语言:javascript复制
tar xf OGG_BigData_Linux_x64_19.1.0.0.1.tar

使oracle用户有ogg的权限,后面有些需要在oracle用户下执行才能成功

代码语言:javascript复制
chown -R oracle:oinstall /data/ogg 

配置OGG环境变量

代码语言:javascript复制
vim /etc/profile

export OGG_HOME=/opt/ogg export LD_LIBRARY_PATH=

JAVA_HOME/jre/lib/amd64:

JAVA_HOME/jre/lib/amd64/server:

JAVA_HOME/jre/lib/amd64/libjsig.so:

JAVA_HOME/jre/lib/amd64/server/libjvm.so:

OGG_HOME/lib export PATH=

OGG_HOME:$PATH

代码语言:javascript复制
source /etc/profile
代码语言:javascript复制
ggsci
代码语言:javascript复制
create subdirs

3、源端Oracle归档模式设置

登陆Oracle用户

代码语言:javascript复制
su - oracle

登陆Oracle

代码语言:javascript复制
sqlplus / as sysdba

查看当前是否为归档模式(若为Disabled,则需手动打开)

代码语言:javascript复制
archive log list 

Database log mode No Archive Mode Automatic archival Disabled Archive destination USE_DB_RECOVERY_FILE_DEST Oldest online log sequence 12 Current log sequence 14

立即关闭数据库

代码语言:javascript复制
shutdown immediate 

启动实例并加载数据库,但不打开

代码语言:javascript复制
startup mount 

更改数据库为归档模式

代码语言:javascript复制
alter database archivelog; 

打开数据库

代码语言:javascript复制
alter database open;

启用自动归档

代码语言:javascript复制
alter system archive log start; 

再次查看当前是否为归档模式(看到为Enabled,则成功打开归档模式。)

代码语言:javascript复制
archive log list 

Database log mode Archive Mode Automatic archival Enabled Archive destination USE_DB_RECOVERY_FILE_DEST Oldest online log sequence 12 Next log sequence to archive 14 Current log sequence 14

查看辅助日志状态(若为NO,则需要通过命令修改)

代码语言:javascript复制
select force_logging, supplemental_log_data_min from v$database;

FORCE_ SUPPLEMENTAL_LOG


NO NO

代码语言:javascript复制
alter database force logging;
代码语言:javascript复制
alter database add supplemental log data;

再次查看辅助日志状态(为YES即可)

代码语言:javascript复制
select force_logging, supplemental_log_data_min from v$database;

FORCE_ SUPPLEMENTAL_LOG


YES YES

4、源端oracle创建复制用户

root用户建立相关文件夹,并赋予权限

代码语言:javascript复制
mkdir -p /data/oracle/oggdata/orcl
代码语言:javascript复制
chown -R oracle:oinstall /data/oracle/oggdata/orcl

执行下面sql

代码语言:javascript复制
SQL> create tablespace oggtbs datafile '/data/oracle/oggdata/orcl/oggtbs01.dbf' size 1000M autoextend on;

Tablespace created.

SQL>  create user ogg identified by ogg default tablespace oggtbs;

User created.

SQL> grant dba to ogg;

Grant succeeded.

Oracle创建测试表

代码语言:javascript复制
create user test_ogg  identified by test_ogg default tablespace users;
grant dba to test_ogg;
conn test_ogg/test_ogg;
create table test_ogg(id int ,name varchar(20),primary key(id));

5、OGG源端配置

代码语言:javascript复制
ggsci
代码语言:javascript复制
create subdirs
代码语言:javascript复制
dblogin userid ogg password ogg
代码语言:javascript复制
edit param ./globals

oggschema ogg

配置管理器mgr

代码语言:javascript复制
edit param mgr

PORT 7809 DYNAMICPORTLIST 7810-7909 AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3 * PURGEOLDEXTRACTS ./dirdat/,usecheckpoints, minkeepdays 3

添加复制表

代码语言:javascript复制
add trandata test_ogg.test_ogg

info trandata test_ogg.test_ogg

配置extract进程(ORACLE_SID与Orcale中的相同)

代码语言:javascript复制
edit param extkafka

extract extkafka dynamicresolution SETENV (ORACLE_SID = “orcl11g”) SETENV (NLS_LANG = “american_america.AL32UTF8”) userid ogg,password ogg exttrail /da ta/ogg/dirdat/to table test_ogg.test_ogg;

代码语言:javascript复制
add extract extkafka,tranlog,begin now

若报错

ERROR: Could not create checkpoint file /opt/ogg/dirchk/EXTKAFKA.cpe (error 2, No such file or directory).

执行下面的命令再重新添加即可。

代码语言:javascript复制
create subdirs
代码语言:javascript复制
add exttrail /data/ogg/dirdat/to,extract extkafka

配置pump进程

代码语言:javascript复制
edit param pukafka

extract pukafka passthru dynamicresolution userid ogg,password ogg rmthost Carlota2 mgrport 7809 rmttrail /data/ogg/dirdat/to table test_ogg.test_ogg;

代码语言:javascript复制
add extract pukafka,exttrailsource /data/ogg/dirdat/to
代码语言:javascript复制
add rmttrail /data/ogg/dirdat/to,extract pukafka

配置define文件(Oracle与MySQL,Hadoop集群(HDFS,Hive,kafka等)等之间数据传输可以定义为异构数据类型的传输,故需要定义表之间的关系映射,)

代码语言:javascript复制
edit param test_ogg

defsfile /data/ogg/dirdef/test_ogg.test_ogg userid ogg,password ogg table test_ogg.test_ogg;

返回终端执行

代码语言:javascript复制
./defgen paramfile dirprm/test_ogg.prm

将生成的/data/ogg/dirdef/test_ogg.test_ogg发送的目标端ogg目录下的dirdef里:

代码语言:javascript复制
scp -r /data/ogg/dirdef/test_ogg.test_ogg root@Carlota2:/opt/ogg/dirdef/ 

6、OGG目标端配置

开启kafka服务

代码语言:javascript复制
zkServer.sh start

kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties 
代码语言:javascript复制
ggsic

配置管理器mgr

代码语言:javascript复制
edit param mgr

PORT 7809 DYNAMICPORTLIST 7810-7909 AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3 * PURGEOLDEXTRACTS ./dirdat/,usecheckpoints, minkeepdays 3

配置checkpoint

代码语言:javascript复制
edit  param  ./GLOBALS

CHECKPOINTTABLE test_ogg.checkpoint

配置replicate进程

代码语言:javascript复制
edit param rekafka

REPLICAT rekafka sourcedefs /data/ogg/dirdef/test_ogg.test_ogg TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props REPORTCOUNT EVERY 1 MINUTES, RATE GROUPTRANSOPS 10000 MAP test_ogg.test_ogg, TARGET test_ogg.test_ogg;

配置kafka.props(去掉注释)

代码语言:javascript复制
cd /opt/ogg/dirprm/
vim kafka.props

gg.handlerlist=kafkahandler //handler类型 gg.handler.kafkahandler.type=kafka gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties //kafka相关配置 gg.handler.kafkahandler.topicMappingTemplate=test_ogg //kafka的topic名称,无需手动创建 gg.handler.kafkahandler.format=json //传输文件的格式,支持json,xml等 gg.handler.kafkahandler.mode=op //OGG for Big Data中传输模式,即op为一次SQL传输一次,tx为一次事务传输一次 gg.classpath=dirprm/:/usr/local/apps/kafka_2.12-2.5.0/libs/:/opt/ogg/:/opt/ogg/lib/

代码语言:javascript复制
vim custom_kafka_producer.properties

bootstrap.servers=192.168.44.129:9092 //kafkabroker的地址 acks=1 compression.type=gzip //压缩类型 reconnect.backoff.ms=1000 //重连延时 value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer batch.size=102400 linger.ms=10000

添加trail文件到replicate进程

代码语言:javascript复制
add replicat rekafka exttrail /data/ogg/dirdat/to,checkpointtable test_ogg.checkpoint

7、测试

在源端和目标端的OGG命令行下使用start [进程名]的形式启动所有进程。 启动顺序按照源mgr——目标mgr——源extract——源pump——目标replicate来完成。 全部需要在ogg目录下执行ggsci目录进入ogg命令行。 源端依次是

代码语言:javascript复制
start mgr
start extkafka
start pukafka

目标端

代码语言:javascript复制
start mgr
start rekafka

GGSCI (Carlota2) 1> info all Program Status Group Lag at Chkpt Time Since Chkpt MANAGER RUNNING REPLICAT RUNNING REKAFKA 00:00:00 00:00:08

GGSCI (Carlota3) 1> info all Program Status Group Lag at Chkpt Time Since Chkpt MANAGER RUNNING EXTRACT RUNNING EXTKAFKA 00:00:00 00:00:00 EXTRACT RUNNING PUKAFKA 00:00:00 00:00:10

现在源端执行sql语句

代码语言:javascript复制
conn test_ogg/test_ogg
insert into test_ogg values(1,'test');
commit;
update test_ogg set name='zhangsan' where id=1;
commit;
delete test_ogg where id=1;
commit;

查看源端trail文件状态

代码语言:javascript复制
ls -l /data/ogg/dirdat/to*

查看目标端trail文件状态

代码语言:javascript复制
ls -l /data/ogg/dirdat/to*

查看kafka是否自动建立对应的主题

代码语言:javascript复制
kafka-topics.sh --list --zookeeper localhost:2181

在列表中显示有test_ogg则表示没问题 通过消费者看是否有同步消息

代码语言:javascript复制
kafka-console-consumer.sh --bootstrap-server Carlota2:9092 --topic test_ogg --from-beginning

{“table”:“TEST_OGG.TEST_OGG”,“op_type”:“I”,“op_ts”:“2020-07-31 13:42:33.072327”,“current_ts”:“2020-07-31T13:42:38.928000”,“pos”:“00000000000000001066”,“after”:{“ID”:1,“NAME”:“test”}} {“table”:“TEST_OGG.TEST_OGG”,“op_type”:“U”,“op_ts”:“2020-07-31 13:42:46.005763”,“current_ts”:“2020-07-31T13:42:52.201000”,“pos”:“00000000000000001204”,“before”:{},“after”:{“ID”:1,“NAME”:“zhangsan”}} {“table”:“TEST_OGG.TEST_OGG”,“op_type”:“D”,“op_ts”:“2020-07-31 13:42:57.079268”,“current_ts”:“2020-07-31T13:43:02.231000”,“pos”:“00000000000000001347”,“before”:{“ID”:1}}

0 人点赞