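Overview
Databus Relays have two main responsibilities:
- Reading changed rows from the Databus source database and serializing them into an event stream held in memory;
- Accepting client requests and returning the stream of data change events to the clients.
Technical architecture
- Event Producer: reads change events from the database, converts them to Avro records, and stores them in memory;
- Circular Buffer: a relay has one or more circular buffers that hold change events ordered by increasing system change number (SCN);
- SCN Writer/Reader: writes SCNs to disk and reads them back;
- RESTful interface: exposes a RESTful endpoint through which data change events are delivered to clients.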
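The Circular Buffer's contract (events kept in SCN order, clients reading everything after the last SCN they saw) can be illustrated with a small sketch. This is not Databus code. The class `SCNEventBuffer` and its methods are hypothetical. The sketch assumes a fixed capacity in which the oldest events are overwritten, and it does not model the SCN Writer/Reader's persistence to disk.

```python
from collections import deque
from dataclasses import dataclass
from typing import Deque, List


@dataclass(frozen=True)
class ChangeEvent:
    scn: int       # system change number the change belongs to
    source: str    # source name, e.g. "POSP_BOSS.PERSON"
    payload: dict  # the serialized row (Avro in Databus, a plain dict here)


class SCNEventBuffer:
    """Hypothetical fixed-capacity circular buffer of change events ordered by SCN."""

    def __init__(self, capacity: int) -> None:
        # A deque with maxlen discards its oldest entry when a new one is appended
        self._events: Deque[ChangeEvent] = deque(maxlen=capacity)
        self._evicted_scn = -1  # SCN of the most recently overwritten event

    def append(self, event: ChangeEvent) -> None:
        # Events must arrive in non-decreasing SCN order
        if self._events and event.scn < self._events[-1].scn:
            raise ValueError("SCN must not decrease")
        if len(self._events) == self._events.maxlen:
            # The oldest event is about to be overwritten; remember its SCN
            self._evicted_scn = self._events[0].scn
        self._events.append(event)

    def min_scn(self) -> int:
        """Smallest SCN still held in memory, or -1 when empty."""
        return self._events[0].scn if self._events else -1

    def read_since(self, last_scn: int) -> List[ChangeEvent]:
        """Return every buffered event whose SCN is greater than the client's checkpoint."""
        if last_scn < self._evicted_scn:
            # Events the client has not seen yet were already overwritten
            raise LookupError(f"checkpoint {last_scn} is older than the buffer")
        return [e for e in self._events if e.scn > last_scn]
```

A client whose checkpoint has already been overwritten gets a `LookupError`. The sketch only reports that gap and does not try to recover from it.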
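Data capture
Two kinds of data sources are currently supported: Oracle and MySQL.
Oracle data capture
Oracle changes are captured by adding a trigger to the source table. On every insert and update, the trigger records an SCN-style change number in the row's TXN column, which serves as the query criterion. The relay then periodically queries for the changed rows. Delete and select operations are not affected by the trigger.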
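The trigger-plus-polling approach can be sketched with Python's built-in sqlite3 module standing in for Oracle. This is an illustration only. The table mirrors posp_boss.person from the configuration steps, and txn values are written by hand instead of by databus.sync_core.getTxn. The function `poll_changes` is hypothetical, and the real relay's queries are not shown in this article. Each poll fetches the rows whose txn is greater than the last one seen, in txn order, and returns the new checkpoint.

```python
import sqlite3
from typing import List, Tuple


def poll_changes(conn: sqlite3.Connection, last_txn: int) -> Tuple[List[tuple], int]:
    """Return rows changed after last_txn (ordered by txn) and the new checkpoint."""
    rows = conn.execute(
        "SELECT txn, id, first_name, last_name FROM person "
        "WHERE txn > ? ORDER BY txn",
        (last_txn,),
    ).fetchall()
    # The highest txn returned becomes the checkpoint for the next poll
    new_checkpoint = rows[-1][0] if rows else last_txn
    return rows, new_checkpoint


def demo() -> List[List[tuple]]:
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE person (id INTEGER PRIMARY KEY, first_name TEXT, "
        "last_name TEXT, txn INTEGER)"
    )
    # In Oracle the trigger fills txn; here the values are written directly
    conn.executemany(
        "INSERT INTO person VALUES (?, ?, ?, ?)",
        [(1, "Ann", "Lee", 1), (2, "Bob", "Wu", 2)],
    )
    batches = []
    rows, checkpoint = poll_changes(conn, last_txn=0)
    batches.append(rows)
    # An update receives a new, larger txn, so the next poll picks it up
    conn.execute("UPDATE person SET last_name = 'Li', txn = 3 WHERE id = 1")
    rows, checkpoint = poll_changes(conn, checkpoint)
    batches.append(rows)
    # Nothing changed since the previous poll
    rows, checkpoint = poll_changes(conn, checkpoint)
    batches.append(rows)
    conn.close()
    return batches
```

Because the checkpoint only moves forward, each changed row is returned once per change. A deleted row is never returned, which matches the trigger firing only on inserts and updates.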
Oracle database configuration
A first-time Databus deployment consists of the following steps (for later deployments, start from step 3):
- Create the databus tablespaces and user, and grant privileges
createUser.sh
-- Create the databus tablespaces and the databus user, and grant privileges to databus (see createUser.sql). A datafile must be specified; if the tablespace names change, the tablespace file must be updated as well
create tablespace TBS_DATABUS datafile '${DBDIR}/tbs_databus_01.dbf'
  size 50M reuse autoextend on next 50M maxsize unlimited
  extent management local uniform size 2M;
create tablespace TBS_DATABUS_IDX datafile '${DBDIR}/tbs_databus_idx_01.dbf'
  size 50M reuse autoextend on next 50M maxsize unlimited
  extent management local uniform size 2M;
create user DATABUS identified by DATABUS
  default tablespace TBS_DATABUS
  temporary tablespace temp1;
grant create session, create table, create view, create sequence,
  create procedure, create trigger, create type, create job to DATABUS;
grant query rewrite to DATABUS;
grant execute on dbms_alert to DATABUS;
grant execute on sys.dbms_lock to DATABUS;
grant select on sys.v_$database to DATABUS;
grant execute on sys.dbms_aq to DATABUS;
grant execute on sys.dbms_aqadm to DATABUS;
grant execute on sys.dbms_aqin to DATABUS;
grant execute on sys.dbms_aq_bqview to DATABUS;
alter user DATABUS quota unlimited on TBS_DATABUS;
alter user DATABUS quota unlimited on TBS_DATABUS_IDX;
- [Script text garbled in the source] The surviving fragments show a shell script that uses perl one-liners to split the connection string ${DB} into user, password and SID. It then runs sqlplus on each *.tab file to create the tables and set up alerts, and on each *.view file to create the views. View names must start with sy$.
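The perl one-liners in the setup script split the connection string `${DB}` (in the form user/password@SID) using three regular expressions. Everything before the first `/` is the user, the text up to the following `@` is the password, and the rest is the SID. A Python equivalent is sketched below. `split_db` and the sample string are hypothetical, and the sketch assumes the same user/password@SID layout.

```python
import re
from typing import NamedTuple


class OracleLogin(NamedTuple):
    user: str
    password: str
    sid: str


# Same three captures as the perl one-liners: user before "/", password up to "@", SID after it
_DB_PATTERN = re.compile(r"^([^/]*)/([^@]*)@(.*)$")


def split_db(db: str) -> OracleLogin:
    """Split a sqlplus-style 'user/password@sid' string into its parts."""
    match = _DB_PATTERN.match(db)
    if match is None:
        raise ValueError(f"expected user/password@sid, got {db!r}")
    return OracleLogin(*match.groups())
```

For example, `split_db("databus/databus@afc1")` yields user `databus`, password `databus` and SID `afc1`.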
- Modify the table structure by adding a TXN NUMBER column (as posp_boss)
person.sql
-- Create the table with the TXN column
-- (for an existing table: alter table posp_boss.person add (txn number);)
create table posp_boss.person (
  id         number primary key,
  first_name varchar(120) not null,
  last_name  varchar(120) not null,
  birth_date date,
  deleted    varchar(5) default 'false' not null,
  txn        number
);
- Grant privileges on the source table to databus (as posp_boss)
grant.sql
grant insert, update, select on posp_boss.PERSON to databus;
- Grant posp_boss the execute privilege on the databus.sync_core package (as databus)
grant.sql
grant execute on databus.sync_core to posp_boss;
- Create an index on TXN (as posp_boss)
index.sql
-- Create the index (as posp_boss)
create index posp_boss.PERSON_txn on POSP_BOSS.PERSON(txn) tablespace index_data;
- Create a view over the table. The view must include the TXN column and must map ID to KEY
book_vw.sql
-- Map the primary key ID to KEY
CREATE OR REPLACE FORCE VIEW sy$person AS
SELECT
  txn,
  id key,
  first_name,
  last_name,
  birth_date,
  deleted
FROM posp_boss.person;
- Add a row to the sy$sources table. The value must be less than or equal to 125
insert.sql
-- sourceName is case-sensitive and must match POSP_BOSS.PERSON in sync_core.getTxn('POSP_BOSS.PERSON') in the trigger below
insert into databus.sy$sources values ('POSP_BOSS.PERSON', 1);
- Create the trigger
trigger.sql
-- The source name must match the value inserted into sy$sources
CREATE TRIGGER PERSON_TRG
before insert or update on POSP_BOSS.PERSON
referencing old as old new as new
for each row
begin
  if (updating and :new.txn < 0) then
    :new.txn := -:new.txn;
  else
    :new.txn := databus.sync_core.getTxn('POSP_BOSS.PERSON');
  end if;
end;
/
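The branching in PERSON_TRG can be traced with a small Python model. It assumes `databus.sync_core.getTxn` behaves like an increasing counter; the hypothetical `make_get_txn` stands in for it, since the real package is not shown here. The model also mirrors PL/SQL's NULL handling: a NULL `:new.txn` fails the `< 0` test and falls through to the else branch.

```python
import itertools
from typing import Callable, Optional


def make_get_txn(start: int = 1) -> Callable[[str], int]:
    """Hypothetical stand-in for databus.sync_core.getTxn: returns increasing numbers."""
    counter = itertools.count(start)
    return lambda source_name: next(counter)


def person_trg(updating: bool, new_txn: Optional[int],
               get_txn: Callable[[str], int]) -> int:
    """Model of PERSON_TRG: compute the value the trigger assigns to :new.txn."""
    # An update that sets a negative txn keeps its absolute value
    if updating and new_txn is not None and new_txn < 0:
        return -new_txn
    # Every other insert or update takes a fresh txn for the source
    return get_txn("POSP_BOSS.PERSON")
```

In this model, ordinary inserts and updates always receive a fresh number. Only an update that writes a negative txn keeps its absolute value and consumes no new number.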
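This completes the database-side configuration for Oracle data capture.
MySQL data capture
MySQL data capture is comparatively simple:
- Create a replication (slave) account, because binlog parsing is built on master-slave replication;
- Enable the MySQL binlog and set its base name (mysql-bin in this example), which is needed later. In older MySQL versions the binlog is disabled by default (MySQL 8.0 enables it), and MySQL must be restarted after it is turned on;
- Set the binlog format to ROW (binlog_format = ROW). The default was STATEMENT before MySQL 5.7.7. Only ROW mode records the affected rows, and by default Databus only captures row-change events.
my.cnf
[mysqld]
server-id = 1
log_bin = mysql-bin
expire_logs_days = 10
max_binlog_size = 100M
binlog_format = ROW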
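Before restarting MySQL, the options above can be sanity-checked with a short script. This is a hypothetical helper, not part of Databus or MySQL. It reads `key = value` lines, normalizes `-` to `_` (MySQL accepts both spellings in option names), and reports which of the settings this article relies on are missing. It only inspects the file. On a running server, `SHOW VARIABLES LIKE 'binlog_format'` shows the active value.

```python
from typing import Dict, List


def parse_cnf(text: str) -> Dict[str, str]:
    """Collect key = value options, ignoring [section] headers, comments and blank lines."""
    options: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", ";", "[")):
            continue
        key, sep, value = line.partition("=")
        # MySQL treats '-' and '_' in option names as equivalent
        options[key.strip().replace("-", "_")] = value.strip() if sep else ""
    return options


def binlog_problems(text: str) -> List[str]:
    """List the binlog settings from this article that are missing or wrong."""
    opts = parse_cnf(text)
    problems = []
    if not opts.get("log_bin"):
        problems.append("log_bin is not set")
    if opts.get("binlog_format", "").upper() != "ROW":
        problems.append("binlog_format is not ROW")
    if not opts.get("server_id"):
        problems.append("server_id is not set")
    return problems
```

The `log_bin` check deliberately requires a base name, because that name reappears in the relay's uri.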
- Configure the data source. The source id must match the value in sy$sources
sources.json
{
  "name": "boss",
  "id": 1,
  "uri": "mysql://repl/123456@localhost:3306/1/mysql-bin",
  "slowSourceQueryThreshold": 2000,
  "sources": [
    {
      "id": 1,
      "name": "com.linkedin.events.example.person.Person",
      "uri": "lijiang.person",
      "partitionFunction": "constant:1"
    }
  ]
}
The uri format is mysql://user/password@host:port/serverID/binlogFileName. The uri of each entry in sources must also include the database name, in the form db.table.
- For MySQL data capture, many data types are converted to string during Avro serialization.
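The uri format described above can be checked mechanically. The sketch below is hypothetical: Databus parses this string itself, and `parse_mysql_uri` and `check_table_uri` are illustrative names. It splits the relay uri into its six parts and verifies that a source uri has the db.table form.

```python
import re
from dataclasses import dataclass


@dataclass(frozen=True)
class MysqlSourceUri:
    user: str
    password: str
    host: str
    port: int
    server_id: int
    binlog_prefix: str


# mysql://user/password@host:port/serverID/binlogFileName
_URI = re.compile(
    r"^mysql://(?P<user>[^/]+)/(?P<password>[^@]*)@(?P<host>[^:/]+):(?P<port>\d+)"
    r"/(?P<server_id>\d+)/(?P<binlog>[^/]+)$"
)


def parse_mysql_uri(uri: str) -> MysqlSourceUri:
    """Split a relay uri into user, password, host, port, server id and binlog name."""
    m = _URI.match(uri)
    if m is None:
        raise ValueError(f"not a mysql://user/password@host:port/serverID/binlog uri: {uri!r}")
    return MysqlSourceUri(
        user=m["user"],
        password=m["password"],
        host=m["host"],
        port=int(m["port"]),
        server_id=int(m["server_id"]),
        binlog_prefix=m["binlog"],
    )


def check_table_uri(table_uri: str) -> None:
    """Each entry in "sources" must name its table as db.table."""
    db, dot, table = table_uri.partition(".")
    if not (db and dot and table):
        raise ValueError(f"expected db.table, got {table_uri!r}")
```

Applied to the example configuration, the binlog name parsed from the uri should equal the log_bin base name set in my.cnf.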
Deploying normal_relay
- Configure the relay sources. The source id must match the value in sy$sources. The Oracle and MySQL configurations are different.
source.json
# oracle
{
  "name": "person",
  "id": 1,
  "uri": "jdbc:oracle:thin:lijiang/lijiang@192.168.16.239:51521:afc1",
  "slowSourceQueryThreshold": 2000,
  "sources": [
    {
      "id": 1,
      "name": "com.linkedin.events.example.person.Person",
      "uri": "lijiang.person",
      "partitionFunction": "constant:1"
    }
  ]
}
# mysql
{
  "name": "boss",
  "id": 1,
  "uri": "mysql://repl/123456@localhost:3306/1/mysql-bin",
  "slowSourceQueryThreshold": 2000,
  "sources": [
    {
      "id": 1,
      "name": "com.linkedin.events.example.person.Person",
      "uri": "lijiang.person",
      "partitionFunction": "constant:1"
    }
  ]
}
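The id of each relay source must line up with a sy$sources value, and that value must not exceed 125. A pre-deployment check can catch mismatches. The sketch below is hypothetical. It takes one of the JSON configurations above on its own, without the `# oracle`/`# mysql` label lines, which are not valid JSON. It also takes the sy$sources rows as a name-to-value dict, which you would fetch from databus.sy$sources yourself.

```python
import json
from typing import Dict, List

MAX_SOURCE_ID = 125  # upper bound on sy$sources values stated in this article


def check_source_ids(config_json: str, sy_sources: Dict[str, int]) -> List[str]:
    """Compare the relay's source ids with sy$sources rows given as {name: value}."""
    config = json.loads(config_json)
    registered = set(sy_sources.values())
    problems = []
    for value in sy_sources.values():
        if value > MAX_SOURCE_ID:
            problems.append(f"sy$sources value {value} exceeds {MAX_SOURCE_ID}")
    for source in config.get("sources", []):
        if source["id"] not in registered:
            problems.append(
                f"source {source['name']} has id {source['id']} not in sy$sources"
            )
    return problems
```

An empty list means every configured source id is registered and within range.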
- Add the Avro schema file to the schemas_registry folder. For details on Avro, see Apache Avro.
book.avsc
{
  "name": "Person_V1",
  "doc": "Auto-generated Avro schema for sy$person. Generated at Dec 04, 2012 05:07:05 PM PST",
  "type": "record",
  "meta": "dbFieldName=sy$person;pk=key;",
  "namespace": "com.linkedin.events.example.person",
  "fields": [
    { "name": "txn", "type": ["long", "null"], "meta": "dbFieldName=TXN;dbFieldPosition=0;" },
    { "name": "key", "type": ["long", "null"], "meta": "dbFieldName=KEY;dbFieldPosition=1;" },
    { "name": "firstName", "type": ["string", "null"], "meta": "dbFieldName=FIRST_NAME;dbFieldPosition=2;" },
    { "name": "lastName", "type": ["string", "null"], "meta": "dbFieldName=LAST_NAME;dbFieldPosition=3;" },
    { "name": "birthDate", "type": ["long", "null"], "meta": "dbFieldName=BIRTH_DATE;dbFieldPosition=4;" },
    { "name": "deleted", "type": ["string", "null"], "meta": "dbFieldName=DELETED;dbFieldPosition=5;" }
  ]
}
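The meta strings tie each Avro field to a column of the sy$person view. dbFieldName names the column, and dbFieldPosition gives its position. The hypothetical helper below reads a schema such as book.avsc and lists that mapping, which helps spot a schema that has drifted from the view, for example a missing TXN or KEY column. It assumes only the `key=value;` meta format shown above, nothing else about how Databus handles schemas.

```python
import json
from typing import Dict, List, Tuple


def parse_meta(meta: str) -> Dict[str, str]:
    """Split 'dbFieldName=TXN;dbFieldPosition=0;' into a dict of its key=value pairs."""
    result: Dict[str, str] = {}
    for item in meta.split(";"):
        if "=" in item:
            key, value = item.split("=", 1)
            result[key] = value
    return result


def column_mapping(schema_json: str) -> List[Tuple[int, str, str]]:
    """Return (dbFieldPosition, database column, Avro field) tuples sorted by position."""
    schema = json.loads(schema_json)
    mapping = []
    for field in schema["fields"]:
        meta = parse_meta(field["meta"])
        mapping.append((int(meta["dbFieldPosition"]), meta["dbFieldName"], field["name"]))
    mapping.sort()
    positions = [pos for pos, _, _ in mapping]
    # Positions should run 0, 1, 2, ... with no gaps or duplicates
    if positions != list(range(len(positions))):
        raise ValueError(f"dbFieldPosition values are not 0..n-1: {positions}")
    return mapping
```

For book.avsc, the first two entries should be TXN and KEY, matching the view's first two columns.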
- Start the relay
startup.sh
./bin/startup.sh relay
The relay and the database are now fully configured and deployed.
Publisher: 全栈程序员栈长. Please credit the source when reposting: https://javaforall.cn/181396.html Original link: https://javaforall.cn