Overview
Databus Relays are mainly responsible for two tasks:
- reading changed rows from the Databus source database and serializing them into an event stream held in memory;
- accepting client requests and returning the change event stream to those clients.
Technical Architecture
- Event Producer: reads change events from the database, converts them to Avro records, and stores them in memory;
- Circular Buffer: the relay keeps one or more ring buffers that hold change events ordered by a monotonically increasing system change number (SCN);
- SCN Writer/Reader: reads and writes the SCN to disk;
- RESTful interface: exposes a REST interface through which change events are delivered to clients.
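The ring-buffer behavior described above can be sketched as follows. This is an illustrative model only, not Databus's actual implementation; the class and method names are invented for the example:

```python
from collections import deque

class RelayRingBuffer:
    """Toy model of a relay ring buffer: change events are held in
    increasing-SCN order, and the oldest events are evicted once
    capacity is reached."""

    def __init__(self, capacity):
        self.events = deque(maxlen=capacity)  # oldest entries drop off automatically
        self.last_scn = -1

    def append(self, scn, payload):
        # Events must arrive in strictly increasing SCN order.
        if scn <= self.last_scn:
            raise ValueError("SCN must be monotonically increasing")
        self.events.append((scn, payload))
        self.last_scn = scn

    def stream_since(self, scn):
        # A client asks for everything after the last SCN it consumed.
        return [e for e in self.events if e[0] > scn]

buf = RelayRingBuffer(capacity=3)
for scn, change in [(100, "insert"), (101, "update"), (102, "update"), (103, "delete")]:
    buf.append(scn, change)
print(buf.stream_since(101))  # SCN 100 has already been evicted
```

A consumer that falls so far behind that its last SCN has been evicted can no longer be served from the buffer; in Databus that case is handled by the bootstrap service.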
Data Capture
Oracle and MySQL data sources are currently supported.
Oracle Data Capture
Oracle data is captured by adding a trigger to the source table that records an SCN on every insert and update; the relay then polls periodically, using the SCN as the query predicate, to fetch the changed rows. Deletes and queries are not affected.
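One polling cycle of this scheme can be sketched as follows. SQLite stands in for Oracle purely to keep the sketch runnable; the table and column names follow the examples later in this article, and the query shape (not Databus's exact SQL) is the point:

```python
import sqlite3

# Stand-in for the Oracle source table: the txn column is the one the
# trigger maintains (see the trigger section below).
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE person (id INTEGER PRIMARY KEY, first_name TEXT, txn INTEGER)")
db.executemany("INSERT INTO person VALUES (?, ?, ?)",
               [(1, "alice", 10), (2, "bob", 11), (3, "carol", 12)])

def poll_changes(conn, last_txn):
    """One relay polling cycle: fetch rows whose txn exceeds the highest
    txn already pulled, in increasing txn order."""
    cur = conn.execute(
        "SELECT txn, id, first_name FROM person WHERE txn > ? ORDER BY txn",
        (last_txn,))
    return cur.fetchall()

print(poll_changes(db, 10))  # rows changed since txn 10
```

Because a deleted row leaves nothing behind to poll, plain deletes are invisible to this query, which is why the text above says deletes are unaffected (soft deletes via the deleted column are used instead).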
Oracle Database Configuration
For a first-time Databus deployment, the main steps are as follows (for subsequent deployments, start from step 3):
- Create the databus tablespaces and user, and grant privileges
createUser.sh
-- Create the databus tablespaces and the databus user, and grant the user its
-- privileges (see createUser.sql). Note that a datafile must be specified; if a
-- tablespace is renamed, update the tablespace name here as well.
create tablespace TBS_DATABUS datafile '${DBDIR}/tbs_databus_01.dbf' size 50M reuse autoextend on next 50M maxsize unlimited extent management local uniform size 2M;
create tablespace TBS_DATABUS_IDX datafile '${DBDIR}/tbs_databus_idx_01.dbf' size 50M reuse autoextend on next 50M maxsize unlimited extent management local uniform size 2M;
create user DATABUS identified by DATABUS default tablespace TBS_DATABUS temporary tablespace temp1;
grant create session, create table, create view, create sequence, create procedure, create trigger, create type, create job to DATABUS;
grant query rewrite to DATABUS;
grant execute on dbms_alert to DATABUS;
grant execute on sys.dbms_lock to DATABUS;
grant select on sys.v_$database to DATABUS;
grant execute on sys.dbms_aq to DATABUS;
grant execute on sys.dbms_aqadm to DATABUS;
grant execute on sys.dbms_aqin to DATABUS;
grant execute on sys.dbms_aq_bqview to DATABUS;
alter user DATABUS quota unlimited on TBS_DATABUS;
alter user DATABUS quota unlimited on TBS_DATABUS_IDX;
- Alter the table structure to add a TXN NUMBER column (run as posp_boss)
person.sql
-- Create the table, including the TXN column
create table posp_boss.person (
  id number primary key,
  first_name varchar(120) not null,
  last_name varchar(120) not null,
  birth_date date,
  deleted varchar(5) default 'false' not null,
  txn number
);
- Grant privileges on the source table to databus (run as posp_boss)
grant.sql
grant insert, update, select on posp_boss.PERSON to databus;
- Grant posp_boss execute privilege on the databus.sync_core package (run as databus)
grant.sql
grant execute on databus.sync_core to posp_boss;
- Create the index (run as posp_boss)
index.sql
-- Create the index (run as posp_boss)
create index posp_boss.PERSON_txn on POSP_BOSS.PERSON (txn) tablespace index_data;
- Create a view over the table. The TXN column must be included, and the ID column must be aliased to KEY
book_vw.sql
-- Alias the primary key ID to KEY
CREATE OR REPLACE FORCE VIEW sy$person AS
SELECT txn, id key, first_name, last_name, birth_date, deleted
FROM posp_boss.person;
- Add a row to the sy$sources table. Note that the value must be less than or equal to 125
insert.sql
-- Note that sourceName is case-sensitive: it must match the 'POSP_BOSS.PERSON'
-- passed to sync_core.getTxn('POSP_BOSS.PERSON') in the trigger below
insert into databus.sy$sources values ('POSP_BOSS.PERSON', 1);
- Create the trigger
trigger.sql
-- Note: must be consistent with the value inserted into sy$sources
CREATE TRIGGER PERSON_TRG
before insert or update on POSP_BOSS.PERSON
referencing old as old new as new
for each row
begin
  if (updating and :new.txn < 0) then
    :new.txn := -:new.txn;
  else
    :new.txn := databus.sync_core.getTxn('POSP_BOSS.PERSON');
  end if;
end;
At this point, the database-side configuration for Oracle data capture is complete.
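The trigger's TXN assignment can be sketched in Python to make the two branches explicit. This is an illustrative model: get_txn stands in for databus.sync_core.getTxn, which hands out the next transaction number from a sequence:

```python
_next_txn = 100  # stand-in for the sequence behind databus.sync_core.getTxn

def get_txn(source_name):
    """Return the next transaction number for the named source."""
    global _next_txn
    _next_txn += 1
    return _next_txn

def before_insert_or_update(new_txn, updating):
    """Mirror of the PERSON_TRG trigger body: an update arriving with a
    negative TXN is flipped back to positive (letting internal writers
    skip a fresh TXN); every other insert/update gets a new TXN."""
    if updating and new_txn is not None and new_txn < 0:
        return -new_txn
    return get_txn("POSP_BOSS.PERSON")

print(before_insert_or_update(None, updating=False))  # insert path: prints 101
print(before_insert_or_update(-42, updating=True))    # negative-TXN update: prints 42
```

The negative-TXN branch means a writer can deliberately set txn to a negative value on update to keep that write out of the normal change-capture numbering.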
MySQL Data Capture
MySQL data capture is comparatively simple:
- Create a replication (slave) account, since binlog parsing is built on top of the master-slave replication protocol
- Enable the MySQL binlog and set the log base name (needed later; the default is mysql-bin). Note that the binlog is disabled by default, and enabling it requires a MySQL restart
- Set the binlog format to ROW (the default is STATEMENT): binlog_format = ROW. Only ROW mode records the affected rows, and by default Databus only consumes row-change events
my.cnf
server-id = 1
log_bin = mysql-bin
expire_logs_days = 10
max_binlog_size = 100M
binlog_format = ROW
- Configure the data source. Note that the id of each entry under sources must match the value in sy$sources
sources.json
{
  "name": "boss",
  "id": 1,
  "uri": "mysql://repl/123456@localhost:3306/1/mysql-bin",
  "slowSourceQueryThreshold": 2000,
  "sources": [
    {
      "id": 1,
      "name": "com.linkedin.events.example.person.Person",
      "uri": "lijiang.person",
      "partitionFunction": "constant:1"
    }
  ]
}
The uri format is mysql://user/password@host:port/serverID/binlog-base-name. Also note that the uri of each entry under sources must include the database name, in db.table format.
- With MySQL capture, many data types are converted to string during Avro serialization
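The mysql:// uri format described above can be checked with a small parser. A sketch, where the regex and field names are this example's own rather than anything from Databus:

```python
import re

# Parse mysql://user/password@host:port/serverID/binlog-base-name
URI_RE = re.compile(
    r"^mysql://(?P<user>[^/]+)/(?P<password>[^@]+)@"
    r"(?P<host>[^:]+):(?P<port>\d+)/(?P<server_id>\d+)/(?P<binlog>.+)$")

def parse_mysql_uri(uri):
    """Split a Databus-style mysql uri into its named parts."""
    m = URI_RE.match(uri)
    if not m:
        raise ValueError("not a valid mysql uri: " + uri)
    parts = m.groupdict()
    parts["port"] = int(parts["port"])
    parts["server_id"] = int(parts["server_id"])
    return parts

print(parse_mysql_uri("mysql://repl/123456@localhost:3306/1/mysql-bin"))
```

Validating the uri up front like this makes a mistyped server id or binlog name fail fast instead of surfacing later as an opaque replication error.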
Deploying normal_replay
- Configure the relay sources; the id of each entry under sources must match the value in sy$sources. Note that the Oracle and MySQL configurations differ.
source.json
# oracle
{
  "name": "person",
  "id": 1,
  "uri": "jdbc:oracle:thin:lijiang/lijiang@192.168.16.239:51521:afc1",
  "slowSourceQueryThreshold": 2000,
  "sources": [
    {
      "id": 1,
      "name": "com.linkedin.events.example.person.Person",
      "uri": "lijiang.person",
      "partitionFunction": "constant:1"
    }
  ]
}
# mysql
{
  "name": "boss",
  "id": 1,
  "uri": "mysql://repl/123456@localhost:3306/1/mysql-bin",
  "slowSourceQueryThreshold": 2000,
  "sources": [
    {
      "id": 1,
      "name": "com.linkedin.events.example.person.Person",
      "uri": "lijiang.person",
      "partitionFunction": "constant:1"
    }
  ]
}
- Add the Avro schema file to the schemas_registry folder; for a detailed introduction to Avro, see Apache Avro
book.avsc
{
  "name": "Person_V1",
  "doc": "Auto-generated Avro schema for sy$person. Generated at Dec 04, 2012 05:07:05 PM PST",
  "type": "record",
  "meta": "dbFieldName=sy$person;pk=key;",
  "namespace": "com.linkedin.events.example.person",
  "fields": [
    {"name": "txn", "type": ["long", "null"], "meta": "dbFieldName=TXN;dbFieldPosition=0;"},
    {"name": "key", "type": ["long", "null"], "meta": "dbFieldName=KEY;dbFieldPosition=1;"},
    {"name": "firstName", "type": ["string", "null"], "meta": "dbFieldName=FIRST_NAME;dbFieldPosition=2;"},
    {"name": "lastName", "type": ["string", "null"], "meta": "dbFieldName=LAST_NAME;dbFieldPosition=3;"},
    {"name": "birthDate", "type": ["long", "null"], "meta": "dbFieldName=BIRTH_DATE;dbFieldPosition=4;"},
    {"name": "deleted", "type": ["string", "null"], "meta": "dbFieldName=DELETED;dbFieldPosition=5;"}
  ]
}
- Start the relay
startup.sh
./bin/startup.sh relay
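The meta attributes in the Avro schema above map each Avro field back to a database column. A stdlib-only sketch that extracts that mapping (the schema string here is abbreviated from the example above):

```python
import json

schema = json.loads("""
{"name": "Person_V1", "type": "record",
 "namespace": "com.linkedin.events.example.person",
 "fields": [
   {"name": "txn", "type": ["long", "null"],
    "meta": "dbFieldName=TXN;dbFieldPosition=0;"},
   {"name": "key", "type": ["long", "null"],
    "meta": "dbFieldName=KEY;dbFieldPosition=1;"},
   {"name": "firstName", "type": ["string", "null"],
    "meta": "dbFieldName=FIRST_NAME;dbFieldPosition=2;"}
 ]}
""")

def db_mapping(schema):
    """Return {avro field name: (db column, position)} parsed from the
    semicolon-delimited key=value pairs in each field's meta string."""
    out = {}
    for field in schema["fields"]:
        meta = dict(kv.split("=") for kv in field["meta"].rstrip(";").split(";"))
        out[field["name"]] = (meta["dbFieldName"], int(meta["dbFieldPosition"]))
    return out

print(db_mapping(schema))
```

Note how the mapping reflects the view definition: the Avro key field maps to the KEY column, which is the view's alias for the table's ID primary key.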
At this point, the relay and the database are fully configured and deployed!


