Databus Relays

2022-11-03 17:28:12

Overview

Databus Relays are responsible for two main tasks:

  • reading changed rows from the Databus source database and serializing them into an event stream held in memory;
  • accepting client requests and returning the data-change event stream to those clients.

Architecture

  • Event Producer: reads change events from the database, converts them to Avro, and stores them in memory;
  • Circular Buffer: the relay keeps one or more ring buffers that hold change events ordered by ascending System Change Number (SCN);
  • SCN Writer/Reader: reads and writes the SCN to disk;
  • RESTful interface: exposes a REST endpoint through which data-change events are served to clients.
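The Circular Buffer above can be pictured as a fixed-capacity ring holding events in ascending SCN order, with the oldest events evicted when the buffer is full. A minimal Python sketch of that idea (names and structure are illustrative only; the real Databus buffer is an off-heap byte buffer with index structures):

```python
from collections import deque

class RelayRingBuffer:
    """Illustrative fixed-capacity buffer of change events in ascending SCN order."""

    def __init__(self, capacity):
        # deque with maxlen drops the oldest event when capacity is exceeded
        self.events = deque(maxlen=capacity)

    def append(self, scn, payload):
        # events must arrive in ascending SCN order
        if self.events and scn <= self.events[-1][0]:
            raise ValueError("SCN must be monotonically increasing")
        self.events.append((scn, payload))

    def read_since(self, scn):
        """Return all buffered events with SCN greater than `scn`."""
        return [e for e in self.events if e[0] > scn]

buf = RelayRingBuffer(capacity=3)
for i, name in enumerate(["a", "b", "c", "d"], start=1):
    buf.append(i, name)

# capacity is 3, so SCN 1 has been evicted; a client at SCN 2 sees 3 and 4
print(buf.read_since(2))  # [(3, 'c'), (4, 'd')]
```

A client that falls so far behind that its SCN has been evicted from the ring can no longer be served by the relay; this is why Databus pairs relays with a bootstrap service.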

Data Capture

Oracle and MySQL sources are currently supported.

Oracle Data Capture

Oracle data is captured by adding a trigger to each source table. On insert and update the trigger records an SCN, which the relay then uses as the predicate for its periodic change queries. Deletes and reads are unaffected.
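The polling loop implied above can be sketched as follows: the relay remembers the highest TXN/SCN it has seen and repeatedly selects rows whose TXN is greater. This is an illustrative in-memory simulation of that query, not actual relay code:

```python
# In-memory stand-in for the sy$person view: each row carries the txn
# column written by the trigger. Illustrative simulation of the relay poll.
rows = [
    {"txn": 101, "key": 1, "first_name": "Ada"},
    {"txn": 102, "key": 2, "first_name": "Alan"},
    {"txn": 105, "key": 1, "first_name": "Ada Lovelace"},  # later update of key 1
]

def poll_changes(table, last_txn):
    """Rows changed since last_txn in ascending txn order, plus the new
    high-water mark -- the in-memory analogue of
    SELECT * FROM sy$person WHERE txn > :last_txn ORDER BY txn."""
    changed = sorted((r for r in table if r["txn"] > last_txn),
                     key=lambda r: r["txn"])
    new_last = changed[-1]["txn"] if changed else last_txn
    return changed, new_last

changed, last = poll_changes(rows, 102)
print([r["key"] for r in changed], last)  # [1] 105
```

Because only the latest TXN per row survives in the table, a client that polls infrequently sees one consolidated change per row rather than every intermediate update.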

Oracle Database Configuration

For a first-time Databus deployment, follow all of the steps below (for subsequent deployments, start from step 3):

  1. Create the Databus tablespaces and user, and grant privileges: createUser.sh (see createUser.sql). Note that a datafile must be specified, and if the tablespace name is changed, the tablespace references must be updated to match:

       create tablespace TBS_DATABUS datafile '${DBDIR}/tbs_databus_01.dbf' size 50M reuse
         autoextend on next 50M maxsize unlimited extent management local uniform size 2M;
       create tablespace TBS_DATABUS_IDX datafile '${DBDIR}/tbs_databus_idx_01.dbf' size 50M reuse
         autoextend on next 50M maxsize unlimited extent management local uniform size 2M;

       create user DATABUS identified by DATABUS
         default tablespace TBS_DATABUS temporary tablespace temp1;

       grant create session, create table, create view, create sequence,
         create procedure, create trigger, create type, create job to DATABUS;
       grant query rewrite to DATABUS;
       grant execute on dbms_alert to DATABUS;
       grant execute on sys.dbms_lock to DATABUS;
       grant select on sys.v_$database to DATABUS;
       grant execute on sys.dbms_aq to DATABUS;
       grant execute on sys.dbms_aqadm to DATABUS;
       grant execute on sys.dbms_aqin to DATABUS;
       grant execute on sys.dbms_aq_bqview to DATABUS;

       alter user DATABUS quota unlimited on TBS_DATABUS;
       alter user DATABUS quota unlimited on TBS_DATABUS_IDX;
  2. Run the schema setup script against the source database. The script parses the user/password@SID connection string, creates the Databus tables, sets up change alerts on them, and creates the corresponding sy$ views (view names must start with sy$).
  3. Alter the source table to add a TXN NUMBER column (as posp_boss): person.sql -- create the table with the TXN column:

       create table posp_boss.person (
         id number primary key,
         first_name varchar(120) not null,
         last_name varchar(120) not null,
         birth_date date,
         deleted varchar(5) default 'false' not null,
         txn number
       );
  4. Grant privileges on the source table to the databus user (as posp_boss): grant.sql

       grant insert, update, select on posp_boss.PERSON to databus;
  5. Grant posp_boss execute privileges on the databus.sync_core package (as databus): grant.sql

       grant execute on databus.sync_core to posp_boss;
  6. Create an index on the TXN column (as posp_boss): index.sql

       create index posp_boss.PERSON_txn on POSP_BOSS.PERSON(txn) tablespace index_data;
  7. Create a view over the table. The TXN column must be included, and the ID column must be aliased as KEY: book_vw.sql -- map primary key ID to KEY:

       CREATE OR REPLACE FORCE VIEW sy$person AS
         SELECT txn, id key, first_name, last_name, birth_date, deleted
         FROM posp_boss.person;
  8. Add a row to the sy$sources table; the value column must be less than or equal to 125: insert.sql -- sourceName is case sensitive and must match the argument passed to sync_core.getTxn('POSP_BOSS.PERSON') in the trigger below:

       insert into databus.sy$sources values ('POSP_BOSS.PERSON', 1);
  9. Create the trigger: trigger.sql -- the source name must match the row inserted into sy$sources:

       CREATE TRIGGER PERSON_TRG
       before insert or update on POSP_BOSS.PERSON
       referencing old as old new as new
       for each row
       begin
         if (updating and :new.txn < 0) then
           :new.txn := -:new.txn;
         else
           :new.txn := databus.sync_core.getTxn('POSP_BOSS.PERSON');
         end if;
       end;

This completes the source-database configuration for Oracle data capture.
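The trigger's TXN logic is compact enough to mirror in a few lines: on update, a pre-negated TXN is flipped back to positive (presumably so a controlled update can preserve a row's existing TXN by writing its negation, rather than being re-stamped), while any other insert or update gets the next TXN from sync_core. A hypothetical Python rendering of that control flow (next_txn stands in for databus.sync_core.getTxn):

```python
import itertools

# Hypothetical stand-in for databus.sync_core.getTxn:
# a monotonically increasing per-source counter.
_counter = itertools.count(1)

def next_txn(source_name):
    return next(_counter)

def before_insert_or_update(new_row, updating):
    """Illustrative mirror of the PERSON_TRG trigger body."""
    if updating and new_row.get("txn", 0) < 0:
        # a negative TXN written by the updater is flipped back to
        # positive, preserving the old value instead of re-stamping
        new_row["txn"] = -new_row["txn"]
    else:
        new_row["txn"] = next_txn("POSP_BOSS.PERSON")
    return new_row

row = before_insert_or_update({"id": 1}, updating=False)
print(row["txn"])  # 1  (freshly stamped)
row = before_insert_or_update({"id": 1, "txn": -1}, updating=True)
print(row["txn"])  # 1  (preserved, not re-stamped)
```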

MySQL Data Capture

Capturing data from MySQL is comparatively simple:

  • Create a replication (slave) account, because binlog analysis is built on the master-slave replication protocol.
  • Enable the MySQL binlog and set the log name, which is needed later (the default is mysql-bin). Note that the binlog is disabled by default, and enabling it requires a MySQL restart.
  • Set the binlog format to ROW (the default is STATEMENT). Only ROW mode records the affected rows, and by default Databus only consumes row-level events. my.cnf:

        server-id        = 1
        log_bin          = mysql-bin
        expire_logs_days = 10
        max_binlog_size  = 100M
        binlog_format    = ROW

  • Configure the data source. The id of each entry under sources must match the value column in sy$sources. sources.json:

        {
          "name" : "boss",
          "id" : 1,
          "uri" : "mysql://repl/123456@localhost:3306/1/mysql-bin",
          "slowSourceQueryThreshold" : 2000,
          "sources" : [
            {
              "id" : 1,
              "name" : "com.linkedin.events.example.person.Person",
              "uri" : "lijiang.person",
              "partitionFunction" : "constant:1"
            }
          ]
        }

    The uri format is mysql://user/password@host:port/serverID/binlog-name. Also note that the uri of each entry under sources must include the database name, in the form db.table.
  • When capturing from MySQL, many data types are converted to string during Avro serialization.
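The mysql:// uri format described above (mysql://user/password@host:port/serverID/binlog-name) is easy to get subtly wrong; a small parser can validate it before the relay is started. This is a hypothetical helper written for this article, not part of Databus:

```python
import re

# Hypothetical parser for the relay's MySQL uri format:
#   mysql://user/password@host:port/serverID/binlog-prefix
URI_RE = re.compile(
    r"^mysql://(?P<user>[^/]+)/(?P<password>[^@]+)@"
    r"(?P<host>[^:]+):(?P<port>\d+)/(?P<server_id>\d+)/(?P<binlog>.+)$"
)

def parse_mysql_uri(uri):
    m = URI_RE.match(uri)
    if not m:
        raise ValueError("not a valid mysql relay uri: %s" % uri)
    d = m.groupdict()
    d["port"] = int(d["port"])
    d["server_id"] = int(d["server_id"])
    return d

cfg = parse_mysql_uri("mysql://repl/123456@localhost:3306/1/mysql-bin")
print(cfg["host"], cfg["server_id"], cfg["binlog"])  # localhost 1 mysql-bin
```

The server_id component must not collide with the server-id of the master or of any real replica, since the relay presents itself to MySQL as just another replication client.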

Deploying the Relay

  1. Configure the relay sources; the id under sources must match the value in sy$sources. Note that the Oracle and MySQL configurations differ. source.json:

       # oracle
       {
         "name" : "person",
         "id" : 1,
         "uri" : "jdbc:oracle:thin:lijiang/lijiang@192.168.16.239:51521:afc1",
         "slowSourceQueryThreshold" : 2000,
         "sources" : [
           {
             "id" : 1,
             "name" : "com.linkedin.events.example.person.Person",
             "uri" : "lijiang.person",
             "partitionFunction" : "constant:1"
           }
         ]
       }

       # mysql
       {
         "name" : "boss",
         "id" : 1,
         "uri" : "mysql://repl/123456@localhost:3306/1/mysql-bin",
         "slowSourceQueryThreshold" : 2000,
         "sources" : [
           {
             "id" : 1,
             "name" : "com.linkedin.events.example.person.Person",
             "uri" : "lijiang.person",
             "partitionFunction" : "constant:1"
           }
         ]
       }

  2. Add the Avro schema file to the schemas_registry directory (see the Apache Avro documentation for details on Avro). book.avsc:

       {
         "name" : "Person_V1",
         "doc" : "Auto-generated Avro schema for sy$person. Generated at Dec 04, 2012 05:07:05 PM PST",
         "type" : "record",
         "meta" : "dbFieldName=sy$person;pk=key;",
         "namespace" : "com.linkedin.events.example.person",
         "fields" : [
           { "name" : "txn",       "type" : [ "long", "null" ],   "meta" : "dbFieldName=TXN;dbFieldPosition=0;" },
           { "name" : "key",       "type" : [ "long", "null" ],   "meta" : "dbFieldName=KEY;dbFieldPosition=1;" },
           { "name" : "firstName", "type" : [ "string", "null" ], "meta" : "dbFieldName=FIRST_NAME;dbFieldPosition=2;" },
           { "name" : "lastName",  "type" : [ "string", "null" ], "meta" : "dbFieldName=LAST_NAME;dbFieldPosition=3;" },
           { "name" : "birthDate", "type" : [ "long", "null" ],   "meta" : "dbFieldName=BIRTH_DATE;dbFieldPosition=4;" },
           { "name" : "deleted",   "type" : [ "string", "null" ], "meta" : "dbFieldName=DELETED;dbFieldPosition=5;" }
         ]
       }

  3. Start the relay: startup.sh

       ./bin/startup.sh relay
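Because the meta annotations in the Avro schema drive the column mapping, a quick sanity check before dropping a .avsc file into schemas_registry can catch common mistakes, e.g. a pk that names a nonexistent field, or gaps in the dbFieldPosition sequence. A hypothetical validator (written for this article, not part of Databus):

```python
import json
import re

def check_databus_avsc(schema_text):
    """Hypothetical sanity check for a Databus-style Avro schema:
    the pk declared in the record's `meta` must exist as a field,
    and dbFieldPosition values must be 0..n-1 with no gaps."""
    schema = json.loads(schema_text)
    pk = re.search(r"pk=([^;]+);", schema["meta"]).group(1)
    names = [f["name"] for f in schema["fields"]]
    assert pk in names, "pk %r is not a field" % pk
    positions = sorted(
        int(re.search(r"dbFieldPosition=(\d+);", f["meta"]).group(1))
        for f in schema["fields"]
    )
    assert positions == list(range(len(names))), "dbFieldPosition gaps"
    return pk

# trimmed two-field version of the schema above, for illustration
avsc = '''{
  "name": "Person_V1", "type": "record",
  "meta": "dbFieldName=sy$person;pk=key;",
  "namespace": "com.linkedin.events.example.person",
  "fields": [
    {"name": "txn", "type": ["long", "null"], "meta": "dbFieldName=TXN;dbFieldPosition=0;"},
    {"name": "key", "type": ["long", "null"], "meta": "dbFieldName=KEY;dbFieldPosition=1;"}
  ]
}'''
print(check_databus_avsc(avsc))  # key
```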

With that, the relay and the database are fully configured and deployed.
