发消息脚本文件 send_message.sh 内容如下:
代码语言:javascript复制#!/bin/bash
source ~/.bashrc
cd /home/mysql/cdc/
m=`date '%-M'`
t=`date '%Y%m%d%H%M%S'`
e=`cat regular_etl.log | grep ERROR | wc -l`
w=`cat regular_etl.log | grep Warnings | grep -v "Warnings: 0" | wc -l`
if [ $e -gt 0 ] && [ $w -gt 0 ]; then
cp regular_etl.log regular_etl.log.$t
curl -k 'https://oapi.dingtalk.com/robot/send?access_token=23f4ccfe131007f3f875dfb6b5bc604f7c2bdc3a5228d6453ada1b64178c0c1'
-H 'Content-Type: application/json'
-d '{"msgtype": "text","text": {"content":"songod 表数据同步有错误和警告!"}}'
elif [ $e -gt 0 ]; then
cp regular_etl.log regular_etl.log.$t
curl -k 'https://oapi.dingtalk.com/robot/send?access_token=23f4ccfe131007f3f875dfb6b5bc604f7c2bdc3a5228d6453ada1b64178c0c1'
-H 'Content-Type: application/json'
-d '{"msgtype": "text","text": {"content":"songod 表数据同步有错误!"}}'
elif [ $w -gt 0 ]; then
cp regular_etl.log regular_etl.log.$t
curl -k 'https://oapi.dingtalk.com/robot/send?access_token=23f4ccfe131007f3f875dfb6b5bc604f7c2bdc3a5228d6453ada1b64178c0c1'
-H 'Content-Type: application/json'
-d '{"msgtype": "text","text": {"content":"songod 表数据同步有警告!"}}'
#else
# if [ $m -ge 55 -o $m -lt 5 ]; then
# curl -k 'https://oapi.dingtalk.com/robot/send?access_token=23f4ccfe131007f3f875dfb6b5bc604f7c2bdc3a5228d6453ada1b64178c0c1'
# -H 'Content-Type: application/json'
# -d '{"msgtype": "text","text": {"content":"songod 表数据同步完成!"}}'
# fi
fi
变化数据捕获(CDC)脚本文件 regular_etl.sh 内容如下:
代码语言:javascript复制#!/bin/bash
source ~/.bash_profile
cd /home/mysql/cdc/
# 定期增量装载数据量很小,但不能部分装载,所以采用并行事务方式
date;
# 0点5分抽取最近5小时15分钟的变化数据,其它调度时间抽取最近15分钟的变化数据
currentTime=`date "%H%M"`
if [[ $currentTime = "0005" ]];
then
dt=`date -d "5 hour ago 15 minute ago" "%Y-%m-%d %H:%M:%S"`
# dt=`date -d "5 day ago 15 minute ago" "%Y-%m-%d %H:%M:%S"`
else
dt=`date -d "15 minute ago" "%Y-%m-%d %H:%M:%S"`
# dt=`date -d "15 day ago" "%Y-%m-%d %H:%M:%S"`
fi
# 第一步:导出增量数据到文件
# D1
mysql -uwxy -p123456 -h 10.0.0.1 -P3306 -N -e "select * from d1.t1 where CreateTime >= '${dt}'" > /home/mysql/cdc/d1.t1.txt &
mysql -uwxy -p123456 -h 10.0.0.1 -P3306 -N -e "select * from d1.t2 where CreateTime >= '${dt}'" > /home/mysql/cdc/d1.t2.txt &
mysql -uwxy -p123456 -h 10.0.0.1 -P3306 -N -e "select * from d1.t3 where CreateTime >= '${dt}'" > /home/mysql/cdc/d1.t3.txt &
# D2
mysql -uwxy -p123456 -h 10.0.0.2 -P3306 -N -e "select * from d2.t1 where CreateTime >= '${dt}'" > /home/mysql/cdc/d2.t1.txt &
mysql -uwxy -p123456 -h 10.0.0.2 -P3306 -N -e "select * from d2.t2 where CreateTime >= '${dt}'" > /home/mysql/cdc/d2.t2.txt &
mysql -uwxy -p123456 -h 10.0.0.2 -P3306 -N -e "select * from d2.t3 where CreateTime >= '${dt}'" > /home/mysql/cdc/d2.t3.txt &
# D3
mysql -uwxy -p123456 -h 10.0.0.3 -P3306 -N -e "select * from d3.t1 where CreateTime >= '${dt}'" > /home/mysql/cdc/d3.t1.txt &
mysql -uwxy -p123456 -h 10.0.0.3 -P3306 -N -e "select * from d3.t2 where CreateTime >= '${dt}'" > /home/mysql/cdc/d3.t2.txt &
mysql -uwxy -p123456 -h 10.0.0.3 -P3306 -N -e "select * from d3.t3 where CreateTime >= '${dt}'" > /home/mysql/cdc/d3.t3.txt &
wait
# 处理 NULL 值
sed 's/NULL/\N/g' -i *.txt
# 第一步:将增量数据导入过渡区
mysql -uwxy -p123456 -h10.10.10.10 -P3306 --local-infile -Dcdc -e "
-- 清空过渡区
truncate table cdc.t1;
truncate table cdc.t2;
truncate table cdc.t3;
-- 开启事务
set autocommit=0;
start transaction;
-- 装载过渡区
load data local infile '/home/mysql/cdc/d1.t1.txt' into table t1;show warnings;
load data local infile '/home/mysql/cdc/d1.t2.txt' into table t2;show warnings;
load data local infile '/home/mysql/cdc/d1.t3.txt' into table t3;show warnings;
-- 此处可以按需添加逻辑,如装载目标表前做一些 update 修改
-- 装载目标表
insert into d1.t1 select * from cdc.t1 t2 on duplicate key update
Name = t2.Name,
NickName = t2.NickName,
SpaceID = t2.SpaceID,
Brief = t2.Brief,
Piclink1 = t2.Piclink1,
Piclink2 = t2.Piclink2,
Remark = t2.Remark,
State = t2.State,
CreateTime = t2.CreateTime,
PinYinInitial = t2.PinYinInitial,
IconNo = t2.IconNo,
AuthDes = t2.AuthDes,
SkinID = t2.SkinID,
NameQP = t2.NameQP,
MusicChannelPic = t2.MusicChannelPic,
PopularityChange = t2.PopularityChange,
TeamFlag = t2.TeamFlag,
AuthState = t2.AuthState,
nicknamePinyin = t2.nicknamePinyin,
VVID = t2.VVID,
baselove = t2.baselove,
zpCount = t2.zpCount,
musicStyle = t2.musicStyle,
audience = t2.audience,
authorizeFile = t2.authorizeFile;show warnings;
insert into d1.t2 select * from cdc.t2 t2 on duplicate key update
State = t2.State,
Priority = t2.Priority,
CreateTime = t2.CreateTime,
Priority2 = t2.Priority2;show warnings;
insert into d1.t3 select * from cdc.t3 t2 on duplicate key update
wikiState = t2.wikiState,
createTime = t2.createTime,
updateTime = t2.updateTime;show warnings;
commit;" -vvv &
mysql -uwxy -p123456 -h10.10.10.11 -P3306 --local-infile -Dcdc -e "
truncate table cdc.t1;
truncate table cdc.t2;
truncate table cdc.t3;
-- 开启事务
set autocommit=0;
start transaction;
load data local infile '/home/mysql/cdc/d2.t1.txt' into table t1;show warnings;
load data local infile '/home/mysql/cdc/d2.t2.txt' into table t2;show warnings;
load data local infile '/home/mysql/cdc/d2.t3.txt' into table t3;show warnings;
insert into d2.t1 select * from cdc.t1 t2 on duplicate key update
wikiId = t2.wikiId,
status = t2.status,
createtime = t2.createtime,
updatetime = t2.updatetime;show warnings;
insert into d2.t2 select * from cdc.t2 t2 on duplicate key update
baseInfo = t2.baseInfo,
brief = t2.brief,
otherInfo = t2.otherInfo,
createtime = t2.createtime,
updatetime = t2.updatetime;show warnings;
insert into d2.t3 select * from cdc.t3 t2 on duplicate key update
songid = t2.songid,
subId = t2.subId,
createTime = t2.createTime,
updateTime = t2.updateTime;show warnings;
commit;" -vvv &
mysql -uwxy -p123456 -h10.10.10.12 -P18251 --local-infile -Dcdc -e "
truncate table cdc.t1;
truncate table cdc.t2;
truncate table cdc.t3;
-- 开启事务
set autocommit=0;
start transaction;
load data local infile '/home/mysql/cdc/d3.t1.txt' into table t1;show warnings;
load data local infile '/home/mysql/cdc/d3.t2.txt' into table t2;show warnings;
load data local infile '/home/mysql/cdc/d3.t3.txt' into table t3;show warnings;
insert into d3.t1 select * from cdc.t1 t2 on duplicate key update
VideoSongName = t2.VideoSongName,
VideoSongCover = t2.VideoSongCover,
SingerName = t2.SingerName,
Original = t2.Original,
State = t2.State,
FavoriteCount = t2.FavoriteCount,
Duration = t2.Duration,
AudioFileUrl = t2.AudioFileUrl,
AudioFileUrlMd5 = t2.AudioFileUrlMd5,
KscLink = t2.KscLink,
MidFlag = t2.MidFlag,
TagBeginTimeMs = t2.TagBeginTimeMs,
TagEndTimeMs = t2.TagEndTimeMs,
CreateTime = t2.CreateTime,
UpdateTime = t2.UpdateTime,
operatorId = t2.operatorId,
taskId = t2.taskId,
taskRecode = t2.taskRecode,
fileSize = t2.fileSize,
convertMark = t2.convertMark,
originalAudioFileUrl = t2.originalAudioFileUrl;show warnings;
insert into d3.t2 select * from cdc.t2 t2 on duplicate key update
songSheetName = t2.songSheetName,
sheetOrder = t2.sheetOrder,
sheetDescribe = t2.sheetDescribe,
coverUrl = t2.coverUrl,
state = t2.state,
onlineTime = t2.onlineTime,
offlineTime = t2.offlineTime,
createTime = t2.createTime,
updateTime = t2.updateTime,
operatorId = t2.operatorId;show warnings;
insert into d3.t3 select * from cdc.t3 t2 on duplicate key update
songSheetId = t2.songSheetId,
songOrder = t2.songOrder,
state = t2.state,
sourceType = t2.sourceType,
sourceID = t2.sourceID,
createTime = t2.createTime,
updateTime = t2.updateTime,
operatorId = t2.operatorId;show warnings;
commit;" -vvv &
wait
date;
crontab 调度执行:
代码语言:javascript复制5,15,25,35,45,55 0-18 * * * /home/mysql/cdc/regular_etl.sh > /home/mysql/cdc/regular_etl.log 2>&1 ; /home/mysql/cdc/send_message.sh