定期变化数据捕获(CDC)并用钉钉机器人发报警消息

2023-11-11 09:09:05 浏览数 (3)

发消息脚本文件 send_message.sh 内容如下:

代码语言:bash
#!/bin/bash
# send_message.sh - scan the latest ETL log and push an alert to a DingTalk
# robot when the log contains errors and/or non-zero warning counts.
#
# Reads:  /home/mysql/cdc/regular_etl.log
# Writes: a timestamped copy regular_etl.log.<YYYYmmddHHMMSS> each time an
#         alert is sent, so the evidence survives the next log overwrite.
source ~/.bashrc

# Stop early if the working directory is missing instead of reading or
# copying files relative to whatever directory the caller happened to be in.
cd /home/mysql/cdc/ || exit 1

log=regular_etl.log

# NOTE(review): the access token is a secret and should not live in the
# script; `curl -k` below also disables TLS certificate verification.
# Both are kept as in the original - confirm whether they can be tightened.
webhook='https://oapi.dingtalk.com/robot/send?access_token=23f4ccfe131007f3f875dfb6b5bc604f7c2bdc3a5228d6453ada1b64178c0c1'

# A date format string must be introduced by '+'; without it date treats the
# argument as a date to set/parse and fails.
t=$(date '+%Y%m%d%H%M%S')

# e: number of log lines containing ERROR.
# w: number of "Warnings" lines that report something other than "Warnings: 0".
e=$(grep -c ERROR "$log" 2>/dev/null)
w=$(grep Warnings "$log" 2>/dev/null | grep -vc "Warnings: 0")
# A missing log leaves the counters empty; treat that as zero so the numeric
# tests below do not fail with "integer expression expected".
e=${e:-0}
w=${w:-0}

# post_message TEXT
# Send TEXT to the DingTalk robot as a plain text message.
post_message() {
   curl -k "$webhook" \
      -H 'Content-Type: application/json' \
      -d "$(printf '{"msgtype": "text","text": {"content":"%s"}}' "$1")"
}

# Pick the alert text; an empty msg means the run was clean.
if [ "$e" -gt 0 ] && [ "$w" -gt 0 ]; then
   msg='songod 表数据同步有错误和警告!'
elif [ "$e" -gt 0 ]; then
   msg='songod 表数据同步有错误!'
elif [ "$w" -gt 0 ]; then
   msg='songod 表数据同步有警告!'
else
   msg=''
fi

if [ -n "$msg" ]; then
   # Archive the offending log before the next scheduled run overwrites it.
   cp "$log" "$log.$t"
   post_message "$msg"
# Optional "all good" heartbeat around the top of the hour; disabled in the
# original script and left disabled here.
#else
#   m=$(date '+%-M')
#   if [ "$m" -ge 55 ] || [ "$m" -lt 5 ]; then
#      post_message 'songod 表数据同步完成!'
#   fi
fi

变化数据捕获(CDC)脚本文件 regular_etl.sh 内容如下:

代码语言:bash
#!/bin/bash
# regular_etl.sh - periodic change data capture (CDC): export recently
# created rows from the source databases and upsert them into the targets.
source ~/.bash_profile

# Stop early if the working directory is missing; later steps rewrite every
# *.txt file in the current directory.
cd /home/mysql/cdc/ || exit 1

# The periodic incremental load is small but must never be applied partially,
# so each target is loaded inside its own transaction and the targets are
# processed in parallel.

# Log the start time of the run.
date

# Extraction window: the 00:05 run looks back 5 hours 15 minutes, every other
# scheduled run looks back 15 minutes.
# A date format string must be introduced by '+'; without it date fails and
# both currentTime and dt would be left empty.
currentTime=$(date "+%H%M")
if [[ $currentTime = "0005" ]]; then
   dt=$(date -d "5 hour ago 15 minute ago" "+%Y-%m-%d %H:%M:%S")
   # Wider window kept from the original for manual backfills:
   # dt=$(date -d "5 day ago 15 minute ago" "+%Y-%m-%d %H:%M:%S")
else
   dt=$(date -d "15 minute ago" "+%Y-%m-%d %H:%M:%S")
   # Wider window kept from the original for manual backfills:
   # dt=$(date -d "15 day ago" "+%Y-%m-%d %H:%M:%S")
fi

# Step 1: export the incremental rows from every source table to a file.
# NOTE(review): the password is passed on the command line, where it is
# visible in the process list; consider a client option file instead.

# export_table HOST DB TABLE
# Dump the rows of DB.TABLE on HOST whose CreateTime is >= $dt (set earlier
# in this script) into /home/mysql/cdc/DB.TABLE.txt, without a header row.
export_table() {
   mysql -uwxy -p123456 -h "$1" -P3306 -N -e "select * from $2.$3 where CreateTime >= '${dt}'" > "/home/mysql/cdc/$2.$3.txt"
}

# D1
export_table 10.0.0.1 d1 t1 &
export_table 10.0.0.1 d1 t2 &
export_table 10.0.0.1 d1 t3 &

# D2
export_table 10.0.0.2 d2 t1 &
export_table 10.0.0.2 d2 t2 &
export_table 10.0.0.2 d2 t3 &

# D3
export_table 10.0.0.3 d3 t1 &
export_table 10.0.0.3 d3 t2 &
export_table 10.0.0.3 d3 t3 &

# Block until all nine background exports have finished.
wait

# Convert NULL fields to \N, the marker LOAD DATA reads back as SQL NULL.
# - The replacement needs an escaped backslash (\\N); a single \N loses the
#   backslash and writes a bare N into the data.
# - Only whole tab-separated fields are rewritten, so the text "NULL" inside
#   a longer value is left untouched.
# - The :a/ta loop repeats the substitution because two adjacent NULL fields
#   share one tab separator and a single pass would skip the second one.
sed -E -i -e ':a' -e 's/(^|\t)NULL(\t|$)/\1\\N\2/g' -e 'ta' *.txt

# Step 2: load the exported increments into the staging area and merge them
# into the target tables.
#
# Target 1 (10.10.10.10:3306, default database cdc). The SQL passed via -e:
#   1. truncates the staging tables cdc.t1 / cdc.t2 / cdc.t3 (before the
#      transaction is opened),
#   2. disables autocommit and starts a transaction,
#   3. loads d1.t1.txt / d1.t2.txt / d1.t3.txt into the staging tables
#      (unqualified t1..t3 resolve against the default database cdc),
#   4. upserts each staging table into d1.t1 / d1.t2 / d1.t3 with
#      "insert ... on duplicate key update", then commits.
# In every upsert the staging table is aliased as t2, so "t2.<column>" means
# the incoming staging row, not the table cdc.t2.
# "show warnings" follows each statement and -vvv makes the client echo
# statements and results, so problems show up in the captured output.
# The trailing & runs this load in the background alongside the other targets.
# The "--" lines inside the quoted string are SQL comments and are left
# untouched because they are part of the text handed to the mysql client.
mysql -uwxy -p123456 -h10.10.10.10 -P3306 --local-infile -Dcdc -e "
-- 清空过渡区
truncate table cdc.t1;
truncate table cdc.t2;
truncate table cdc.t3;

-- 开启事务
set autocommit=0;
start transaction;

-- 装载过渡区
load data local infile '/home/mysql/cdc/d1.t1.txt' into table t1;show warnings;
load data local infile '/home/mysql/cdc/d1.t2.txt' into table t2;show warnings;
load data local infile '/home/mysql/cdc/d1.t3.txt' into table t3;show warnings;


-- 此处可以按需添加逻辑,如装载目标表前做一些 update 修改

-- 装载目标表
insert into d1.t1 select * from cdc.t1 t2 on duplicate key update
  Name = t2.Name,
  NickName = t2.NickName,
  SpaceID = t2.SpaceID,
  Brief = t2.Brief,
  Piclink1 = t2.Piclink1,
  Piclink2 = t2.Piclink2,
  Remark = t2.Remark,
  State = t2.State,
  CreateTime = t2.CreateTime,
  PinYinInitial = t2.PinYinInitial,
  IconNo = t2.IconNo,
  AuthDes = t2.AuthDes,
  SkinID = t2.SkinID,
  NameQP = t2.NameQP,
  MusicChannelPic = t2.MusicChannelPic,
  PopularityChange = t2.PopularityChange,
  TeamFlag = t2.TeamFlag,
  AuthState = t2.AuthState,
  nicknamePinyin = t2.nicknamePinyin,
  VVID = t2.VVID,
  baselove = t2.baselove,
  zpCount = t2.zpCount,
  musicStyle = t2.musicStyle,
  audience = t2.audience,
  authorizeFile = t2.authorizeFile;show warnings;

insert into d1.t2 select * from cdc.t2 t2 on duplicate key update
  State = t2.State,
  Priority = t2.Priority,
  CreateTime = t2.CreateTime,
  Priority2 = t2.Priority2;show warnings;
  
insert into d1.t3 select * from cdc.t3 t2 on duplicate key update
  wikiState = t2.wikiState,
  createTime = t2.createTime,
  updateTime = t2.updateTime;show warnings;

commit;" -vvv &

# Target 2 (10.10.10.11:3306, default database cdc): truncate the staging
# tables, then inside one transaction load d2.t1.txt / d2.t2.txt / d2.t3.txt
# into staging and upsert them into d2.t1 / d2.t2 / d2.t3 before committing.
# The staging table is aliased as t2 in every upsert, so "t2.<column>" is the
# incoming staging row. Runs in the background (&) in parallel with the other
# targets; -vvv and "show warnings" put statement results into the output.
mysql -uwxy -p123456 -h10.10.10.11 -P3306 --local-infile -Dcdc -e "
truncate table cdc.t1;
truncate table cdc.t2;
truncate table cdc.t3;

-- 开启事务
set autocommit=0;
start transaction;

load data local infile '/home/mysql/cdc/d2.t1.txt' into table t1;show warnings;
load data local infile '/home/mysql/cdc/d2.t2.txt' into table t2;show warnings;
load data local infile '/home/mysql/cdc/d2.t3.txt' into table t3;show warnings;

insert into d2.t1 select * from cdc.t1 t2 on duplicate key update
  wikiId = t2.wikiId,
  status = t2.status,
  createtime = t2.createtime,
  updatetime = t2.updatetime;show warnings;

insert into d2.t2 select * from cdc.t2 t2 on duplicate key update
  baseInfo = t2.baseInfo,
  brief = t2.brief,
  otherInfo = t2.otherInfo,
  createtime = t2.createtime,
  updatetime = t2.updatetime;show warnings;

insert into d2.t3 select * from cdc.t3 t2 on duplicate key update
  songid = t2.songid,
  subId = t2.subId,
  createTime = t2.createTime,
  updateTime = t2.updateTime;show warnings;

commit;" -vvv &

# Target 3 (10.10.10.12, default database cdc): truncate the staging tables,
# then inside one transaction load d3.t1.txt / d3.t2.txt / d3.t3.txt into
# staging and upsert them into d3.t1 / d3.t2 / d3.t3 before committing.
# The staging table is aliased as t2 in every upsert, so "t2.<column>" is the
# incoming staging row. Runs in the background (&) in parallel with the other
# targets; -vvv and "show warnings" put statement results into the output.
# NOTE(review): this target uses port 18251 while the other two use 3306 -
# confirm that is intentional.
mysql -uwxy -p123456 -h10.10.10.12 -P18251 --local-infile -Dcdc -e "
truncate table cdc.t1;
truncate table cdc.t2;
truncate table cdc.t3;

-- 开启事务
set autocommit=0;
start transaction;

load data local infile '/home/mysql/cdc/d3.t1.txt' into table t1;show warnings;
load data local infile '/home/mysql/cdc/d3.t2.txt' into table t2;show warnings;
load data local infile '/home/mysql/cdc/d3.t3.txt' into table t3;show warnings;

insert into d3.t1 select * from cdc.t1 t2 on duplicate key update
  VideoSongName = t2.VideoSongName,
  VideoSongCover = t2.VideoSongCover,
  SingerName = t2.SingerName,
  Original = t2.Original,
  State = t2.State,
  FavoriteCount = t2.FavoriteCount,
  Duration = t2.Duration,
  AudioFileUrl = t2.AudioFileUrl,
  AudioFileUrlMd5 = t2.AudioFileUrlMd5,
  KscLink = t2.KscLink,
  MidFlag = t2.MidFlag,
  TagBeginTimeMs = t2.TagBeginTimeMs,
  TagEndTimeMs = t2.TagEndTimeMs,
  CreateTime = t2.CreateTime,
  UpdateTime = t2.UpdateTime,
  operatorId = t2.operatorId,
  taskId = t2.taskId,
  taskRecode = t2.taskRecode,
  fileSize = t2.fileSize,
  convertMark = t2.convertMark,
  originalAudioFileUrl = t2.originalAudioFileUrl;show warnings;

insert into d3.t2 select * from cdc.t2 t2 on duplicate key update
  songSheetName = t2.songSheetName,
  sheetOrder = t2.sheetOrder,
  sheetDescribe = t2.sheetDescribe,
  coverUrl = t2.coverUrl,
  state = t2.state,
  onlineTime = t2.onlineTime,
  offlineTime = t2.offlineTime,
  createTime = t2.createTime,
  updateTime = t2.updateTime,
  operatorId = t2.operatorId;show warnings;

insert into d3.t3 select * from cdc.t3 t2 on duplicate key update
  songSheetId = t2.songSheetId,
  songOrder = t2.songOrder,
  state = t2.state,
  sourceType = t2.sourceType,
  sourceID = t2.sourceID,
  createTime = t2.createTime,
  updateTime = t2.updateTime,
  operatorId = t2.operatorId;show warnings;

commit;" -vvv &

# Block until every background load started above has finished.
wait

# Log the end time of the run.
date;

crontab 调度执行:

代码语言:crontab
# At minutes 5,15,25,35,45,55 of hours 0-18 every day: run regular_etl.sh,
# overwriting regular_etl.log with its stdout and stderr, then run
# send_message.sh. The two commands are joined with ';', so the alert script
# runs whatever the ETL exit status was.
5,15,25,35,45,55 0-18 * * * /home/mysql/cdc/regular_etl.sh > /home/mysql/cdc/regular_etl.log 2>&1 ; /home/mysql/cdc/send_message.sh

0 人点赞