文章目录
- 03-PDI(Kettle)导入与导出CDC
- 数据的全量、增量、差异备份
- 基于时间戳的源数据CDC
- 实验原理
- 实验步骤
- 基于触发器的CDC
- 实验原理
- 实验步骤
- 基于快照的CDC案例
- 实验原理
- 实验步骤
03-PDI(Kettle)导入与导出CDC
数据的全量、增量、差异备份
参考博客: https://blog.csdn.net/qq_38097573/article/details/103593150
在很远很远的地方,有一个帐房先生。 他每天要记很多很多的账单。 老先生一生谨慎,为了保证账本的安全, 便找来三个徒弟帮忙来对账本做备份, 这样即使账本丢失了, 也可以用备份的账本继续使用。 三个徒弟各有所长,分别采用了不同的做法: 大徒弟▼
性格宅心仁厚,成熟稳重。 他采用的方式是每天都把师父的账单重新抄录一份。这样做的好处就是每天都是一份完整的账本,每一个备份的账本都可以直接使用,坏处则是每天要花费很多时间去进行记录,并且需要很多纸、墨水以及存账本的柜子。 二徒弟▼
性格聪明伶俐,人小鬼大。 她觉得大师姐的方法太累太耗时,不如每天只抄录账本上新增的信息,这样她每天总是第一个抄完,不仅节省时间,而且为店里节省了大量的纸张、墨水等成本。不过她最讨厌师父来检查账本了,因此她需要将每次纪录的数据拼凑在一起才能组成一个完成的账本。 三徒弟▼
性格心思缜密,粗中有细 她借鉴了两位师姐的办法,进行折中创新,具体是隔一段时间就把账本的所有信息重新抄一份,而在这段时间内则只纪录每天相比于“总账”变化的信息,比如周一抄一份总账,周二买菜一项花了100元,那么周三就把“买菜花了100元”纪录下来;周三买肉花了50元,周四就把周三的“买肉花了50元”和周二的“买菜一项花了100元”都记录下来。如果师父要查账的话,她把新纪录的数据和“总账”拼在一起,就是完整的账本了。这种方式在记账速度、纸墨使用量、账单查看等方面都是一种较为折中的方式。哦,对了,一个比较明显的不足就是解释起来需要打更多的字!
三个徒弟的三种方法各有千秋,后来为了便于记忆,账房先生给这三种方法起了名字,分别叫:全量备份、增量备份和差异备份。
基于时间戳的源数据CDC
实验原理
从时间戳识别出变化的数据并只导入这部分数据。根据cdc_time_log表中的上次执行时间,以及输入的当前执行时间,增量导出student_cdc表中的数据。输出的数据存储在XX/student_cdc.xls文件中。其中,cdc_time_log表的主要作用是记录上次执行的时间,拉取当前执行时间与上次执行时间之间的数据即为增量数据, 拉取成功后,需要将cdc_time_log表中的上次执行时间更新为当前执行时间。这样就可以继续进行CDC操作。
实验步骤
- 创建student_cdc表。
在mysql命令行执行student_cdc.sql脚本
student_cdc.sql内容
代码语言:javascript复制SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for student_cdc
-- ----------------------------
DROP TABLE IF EXISTS `student_cdc`;
CREATE TABLE `student_cdc` (
`学号` int(11) NOT NULL,
`姓名` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`性别` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`班级` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`年龄` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`成绩` int(15) NULL DEFAULT NULL,
`身高` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`手机` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`插入时间` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`更新时间` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
PRIMARY KEY (`学号`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of student_cdc
-- ----------------------------
INSERT INTO `student_cdc` VALUES (1, '张一', '男', '1701', '16', 78, '170', '18946554571', '2018-08-06', '2018-08-06');
INSERT INTO `student_cdc` VALUES (2, '李二', '男', '1701', '17', 80, '175', '18946554572', '2018-08-06', '2018-08-06');
INSERT INTO `student_cdc` VALUES (3, '谢逊', '男', '1702', '18', 95, '169', '18946554573', '2018-08-06', '2018-08-06');
INSERT INTO `student_cdc` VALUES (4, '赵玲', '女', '1702', '19', 86, '180', '18956257895', '2018-08-06', '2018-08-06');
INSERT INTO `student_cdc` VALUES (5, '张明', '男', '1704', '20', 85, '185', '18946554575', '2018-08-07', '2018-08-07');
INSERT INTO `student_cdc` VALUES (6, '张三', '女', '1704', '18', 92, '169', '18946554576', '2018-08-06', '2018-08-07');
SET FOREIGN_KEY_CHECKS = 1;
在mysql命令行执行cdc_time_log.sql脚本。
cdc_time_log.sql内容
代码语言:javascript复制SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for cdc_time_log
-- ----------------------------
DROP TABLE IF EXISTS `cdc_time_log`;
CREATE TABLE `cdc_time_log` (
`id` int(11) NOT NULL,
`上次执行时间` varchar(45) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = latin1 COLLATE = latin1_swedish_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of cdc_time_log
-- ----------------------------
INSERT INTO `cdc_time_log` VALUES (1, '2018-08-04');
SET FOREIGN_KEY_CHECKS = 1;
2.新建并设计转换。
(1)设计转换如下
(2) 命名参数设置:鼠标在转换的空白地方右键,选择“转换设置”。
在命名参数标签卡“命名参数”中配置命名参数的名字(cur_time)和默认值(“2018-08-04”)。
(3)“table imput”的配置:
命名为“CDC日志表输入”
建立数据库连接,在数据库连接的option中设置
代码语言:javascript复制characterEncoding为utf-8
SQL查询语句为:
代码语言:javascript复制SELECT 上次执行时间 as last1,'${cur_time}' as cur1,上次执行时间 as last2,'${cur_time}' as cur2 FROM cdc_time_log
(4)“table input2”设置 步骤命名为“学生表输入” Sql语句为:
代码语言:javascript复制SELECT 学号, 姓名, 性别, 班级, 年龄, 成绩, 身高, 手机, 插入时间, 更新时间, '${cur_time}' as 导入时间 FROM student_cdc WHERE (插入时间>=? and 插入时间<=?) OR (更新时间>? and 更新时间<=?)
(5)“Microsoft Excel 输出”步骤设置,文件输出路径设置为"e:/output/student_cdc" 。
(6)“插入/更新”步骤设置
由于insert/update的上一部为excel,这里选择get update fields后,会出现多余字段,可通过edit mapping配置对应的映射关系。其中table field 为当前表中的字段,stream field为上一个步骤流在的字段。
3、运行
(1)在弹出的对话框中,设置命名参数cur_time的值为“2018-08-06”,点击“启动”按钮,将在路径/home/ubuntu中输出名为student_cdc.xls的文件,cdc_time_log表中的内容将自动改为输入的参数值。
excel输出内容为:
(2)第二次运行,将参数修改为“2018-08-07”。
excel输出内容为:
cdc_time_log内容更新为: 2018-08-07
基于触发器的CDC
实验原理
类似时间戳和主键序列的CDC操作,区别在于这里采用触发器生成增量条件。
实验步骤
- 创建所需表。
sql语句内容如下: 注意:这段代码是总的SQL语句,不需要执行,下面会对这段语句分步骤解释,读者执行分步骤中的语句即可
代码语言:javascript复制create table studentsync like studentinfo;
-- desc studentsync;
ALTER TABLE studentsync MODIFY createtimestamp datetime;
ALTER TABLE studentsync MODIFY modifytimestamp datetime;
ALTER TABLE studentsync ADD synchronizationtime datetime;
-- 创建操作日志表
DROP TABLE IF EXISTS cdc_opt_log;
create table cdc_opt_log
(
SID int PRIMARY KEY,
optype char(1),
opflag char(6)
);
-- 创建 INSERT 触发器
DROP TRIGGER IF EXISTS tri_insert_student;
DELIMITER //
CREATE TRIGGER tri_insert_student AFTER INSERT ON studentinfo FOR EACH ROW
begin
IF (SELECT 1 FROM cdc_opt_log WHERE SID=new.ID) THEN
UPDATE cdc_opt_log SET optype='I',opflag='未处理' WHERE SID=new.ID;
ELSE
INSERT INTO cdc_opt_log(SID, optype, opflag) VALUES (new.ID, 'I', '未处理');
END IF;
end
//
DELIMITER ;
-- 创建 UPDATE 触发器
DROP TRIGGER IF EXISTS tri_update_student;
DELIMITER //
CREATE TRIGGER tri_update_student AFTER UPDATE ON studentinfo FOR EACH ROW
begin
IF (SELECT 1 FROM cdc_opt_log WHERE SID=new.ID) THEN
UPDATE cdc_opt_log SET optype='U',opflag='未处理' WHERE SID=new.ID;
ELSE
INSERT INTO cdc_opt_log(SID, optype, opflag) VALUES (new.ID, 'U', '未处理');
END IF;
end
//
DELIMITER ;
-- 创建 DELETE 触发器
DROP TRIGGER IF EXISTS tri_delete_student;
DELIMITER //
CREATE TRIGGER tri_delete_student AFTER DELETE ON studentinfo FOR EACH ROW
begin
IF (SELECT 1 FROM cdc_opt_log WHERE SID=old.ID) THEN
UPDATE cdc_opt_log SET optype='D',opflag='未处理' WHERE SID=old.ID;
ELSE
INSERT INTO cdc_opt_log(SID, optype, opflag) VALUES (old.ID, 'D', '未处理');
END IF;
end
//
DELIMITER ;
-- 触发插入
-- INSERT INTO studentinfo (ID,Name,Gender,Class,Age,Score,Height,PhoneNumber) VALUES (10,'Shan Zhang','M','1704',18,91,171,'19946554576');
-- 触发删除
-- DELETE FROM studentinfo WHERE Name='Shan Zhang';
-- 触发更新
-- UPDATE studentinfo SET Score=92 WHERE ID=1;
-- 清空操作日志表
-- TRUNCATE cdc_opt_log;
- 创建INSERT触发器
创建INSERT触发器tri_insert_student
代码语言:javascript复制DROP TRIGGER IF EXISTS tri_insert_student;
DELIMITER //
CREATE TRIGGER tri_insert_student AFTER INSERT ON studentinfo FOR EACH ROW
begin
IF (SELECT 1 FROM cdc_opt_log WHERE SID=new.ID) THEN
UPDATE cdc_opt_log SET optype='I',opflag='未处理' WHERE SID=new.ID;
ELSE
INSERT INTO cdc_opt_log(SID, optype, opflag) VALUES (new.ID, 'I', '未处理');
END IF;
end
//
DELIMITER ;
studentinfo表中插入一条记录后,触发器会执行,向cdc_opt_log中更新或插入一条记录。 本段语句解释如下:
代码语言:javascript复制DROP TRIGGER IF EXISTS tri_insert_student;
表示删除触发器
DELIMITER // 表示修改定界符为 // ,避免MySQL遇到 分号;立刻执行
CREATE TRIGGER tri_insert_student AFTER INSERT ON studentinfo FOR EACH ROW 表示创建的触发器为tri_insert_student , 后面的AFTER表示插入后执行,可选BEFORE, 后面的INSERT表示插入时触发器执行, ON studentinfo 表示触发器定义在某表中, FOR EACH ROW表示每行都会触发。
代码语言:javascript复制begin
IF (SELECT 1 FROM cdc_opt_log WHERE SID=new.ID) THEN
UPDATE cdc_opt_log SET optype='I',opflag='未处理' WHERE SID=new.ID;
ELSE
INSERT INTO cdc_opt_log(SID, optype, opflag) VALUES (new.ID, 'I', '未处理');
END IF;
end
表示触发器的语句,其中new 为支持的关键字,表示修改后的行,old表示修改前的行。
DELIMITER ;表示修改定界符回默认的;。
- 创建UPDATE触发器 创建INSERT触发器tri_update_student
DROP TRIGGER IF EXISTS tri_update_student;
DELIMITER //
CREATE TRIGGER tri_update_student AFTER UPDATE ON studentinfo FOR EACH ROW
begin
IF (SELECT 1 FROM cdc_opt_log WHERE SID=new.ID) THEN
UPDATE cdc_opt_log SET optype='U',opflag='未处理' WHERE SID=new.ID;
ELSE
INSERT INTO cdc_opt_log(SID, optype, opflag) VALUES (new.ID, 'U', '未处理');
END IF;
end
//
DELIMITER ;
这段语句与插入触发器基本类似,不再重复描述
- 创建DELETE触发器
创建INSERT触发器tri_delete_student
代码语言:javascript复制DROP TRIGGER IF EXISTS tri_delete_student;
DELIMITER //
CREATE TRIGGER tri_delete_student AFTER DELETE ON studentinfo FOR EACH ROW
begin
IF (SELECT 1 FROM cdc_opt_log WHERE SID=old.ID) THEN
UPDATE cdc_opt_log SET optype='D',opflag='未处理' WHERE SID=old.ID;
ELSE
INSERT INTO cdc_opt_log(SID, optype, opflag) VALUES (old.ID, 'D', '未处理');
END IF;
end
//
DELIMITER ;
- 转换设计
- 具体步骤
(1)Table Input :GetCDCOptlog
代码语言:javascript复制 SELECT
optype,
'已完成' as res,
SID
FROM cdc_opt_log
WHERE opflag='未处理'
(2)Switch Case :IsDeleteOperation
(3)Table Input:GetStudentCDC sql语句:
代码语言:javascript复制 SELECT
ID
, Name
, Gender
, Class
, Age
, Score
, Height
, PhoneNumber
, createtimestamp
, modifytimestamp
, ? as optype
, ? as res
, CURRENT_TIMESTAMP as loadtimestamp
FROM studentinfo
WHERE ID=?
(4)Insert Update:InsertOrUpdateStudentCDCtoSyncTable 这一步骤会将新插入的数据更新到studentsync。
(5)Insert Update:UpdateCDCOptLog 这一步骤会将cdc_opt_log表在的optflag字段修改为res值(已完成)
(6)Delete : DeleteStudentFromSyncTable
这一步骤会删除studentsync中的指定ID的记录
(7)Insert / Update : UpdateCDCOptLog2
至此:转换编写完成,如有需要,请留言,我会把ktr文件发到邮箱中
基于快照的CDC案例
实验原理
如果没有时间戳,不允许使用触发器,就要使用快照表。可以通过比较源表和快照表来获得数据变化。 基于快照的CDC可以检测到插入、更新和删除的数据,这是相对于基于时间戳的CDC方案的有点。其缺点是需要大量存储空间来保存快照。
实验步骤
- 复制表中全部数据的几种方法 复制studentinfo表中数据到新表studentinfobak1 可选用的方法, 参考: https://blog.csdn.net/weixin_39732609/article/details/113555925
1、复制表结构及数据到新表
CREATE TABLE 新表SELECT * FROM 旧表
这种方法会将oldtable中所有的内容都拷贝过来,当然我们可以用delete from newtable;来删除。
不过这种方法的一个最不好的地方就是新表中没有了旧表的primary key、Extra(auto_increment)等属性。需要自己用 alter 添加,而且容易搞错。
2、只复制表结构到新表
CREATE TABLE 新表 SELECT * FROM 旧表 WHERE 1=2
或CREATE TABLE 新表 LIKE 旧表
3、复制旧表的数据到新表(假设两个表结构一样)
INSERT INTO 新表SELECT * FROM 旧表
4、复制旧表的数据到新表(假设两个表结构不一样)
INSERT INTO 新表(字段1,字段2,.......) SELECT 字段1,字段2,...... FROM 旧表
5、可以将表1结构复制到表2
SELECT * INTO 表2 FROM 表1 WHERE 1=2
6、可以将表1内容全部复制到表2
SELECT * INTO 表2 FROM 表1
7、 show create table 旧表;
这样会将旧表的创建命令列出。我们只需要将该命令拷贝出来,更改table的名字,就可以建立一个完全一样的表
8、mysqldump
用mysqldump将表dump出来,改名字后再导回去或者直接在命令行中运行
9、复制旧数据库到新数据库(复制全部表结构并且复制全部表数据)
#mysql -u root -ppassword
>CREATE DATABASE new_db;
#mysqldump old_db -u root -ppassword--skip-extended-insert --add-drop-table | mysql new_db -u root -ppassword
10、表不在同一数据库中(如,db1 table1, db2 table2)
sql: insert into db1.table1 select * from db2.table2 (完全复制)
insert into db1.table1 select distinct * from db2.table2(不复制重复纪录)
insert into tdb1.able1 select top 5 * from db2.table2 (前五条纪录)
- 创建studentinfo表
-- -----------------------------------------------
-- 创建kettledb数据库
-- -----------------------------------------------
CREATE DATABASE IF NOT EXISTS kettledb;
USE kettledb;
-- -----------------------------------------------
-- 创建studentinfo表
-- 添加两个时间戳字段:
-- ID : 学生记录主键,设置未自增序列 (AUTO_INCREMENT)
-- 新增学生记录会ID会自动增加1
-- createtimestamp : 记录创建时间
-- 当记录被创建是自动设置当时时间
-- moditytimestamp : 记录修改时间
-- 当记录被更新是自动设置当时时间
-- -----------------------------------------------
DROP TABLE IF EXISTS studentinfo;
CREATE TABLE studentinfo (
ID int UNSIGNED PRIMARY KEY AUTO_INCREMENT,
Name varchar(20),
Gender char(2),
Class char(10),
Age tinyint UNSIGNED DEFAULT NULL,
Score tinyint UNSIGNED DEFAULT NULL,
Height tinyint UNSIGNED DEFAULT NULL,
PhoneNumber char(11),
createtimestamp datetime DEFAULT CURRENT_TIMESTAMP,
modifytimestamp datetime ON UPDATE CURRENT_TIMESTAMP
);
-- -----------------------------------------------
-- 插入几条测试数据
-- createtimestamp和modifytimestamp会自动添加
-- -----------------------------------------------
INSERT INTO studentinfo (ID, Name, Gender, Class, Age, Score, Height, PhoneNumber) VALUES (1, 'Yi Zhang', 'M', '1701', 16, 78, 170, '19946554571');
INSERT INTO studentinfo (ID, Name, Gender, Class, Age, Score, Height, PhoneNumber) VALUES (2, 'Li, Er', 'M', '1701', 17, 80, 175, '19946554572');
INSERT INTO studentinfo (ID, Name, Gender, Class, Age, Score, Height, PhoneNumber) VALUES (3, 'Xun Xie', 'M', '1702', 18, 95, 168, '19946554573');
INSERT INTO studentinfo (ID, Name, Gender, Class, Age, Score, Height, PhoneNumber) VALUES (4, 'Ling Zhao', 'F', '1702', 19, 86, 180, '19946554574');
INSERT INTO studentinfo (ID, Name, Gender, Class, Age, Score, Height, PhoneNumber) VALUES (5, 'Ming Zhang', 'M', '1704', 20, 85, 185, '19946554575');
INSERT INTO studentinfo (ID, Name, Gender, Class, Age, Score, Height, PhoneNumber) VALUES (6, 'San Zhang', 'F', '1704', 18, 95, 169, '19946554576');
- 复制kettledb数据库中的studentinfo数据到新的表studentinfobak1
use kettledb;
show create table studentinfo;
返回studentinfo建表语句为:
代码语言:javascript复制CREATE TABLE `studentinfo` (
`ID` int(10) unsigned NOT NULL AUTO_INCREMENT,
`Name` varchar(20) DEFAULT NULL,
`Gender` char(2) DEFAULT NULL,
`Class` char(10) DEFAULT NULL,
`Age` tinyint(3) unsigned DEFAULT NULL,
`Score` tinyint(3) unsigned DEFAULT NULL,
`Height` tinyint(3) unsigned DEFAULT NULL,
`PhoneNumber` char(11) DEFAULT NULL,
`createtimestamp` datetime DEFAULT CURRENT_TIMESTAMP,
`modifytimestamp` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`ID`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8
根据studentinfo建表语句创建studentinfobak1表,注意这里修改了表名为studentinfobak1。
代码语言:javascript复制CREATE TABLE `studentinfobak1` (
`ID` int(10) unsigned NOT NULL AUTO_INCREMENT,
`Name` varchar(20) DEFAULT NULL,
`Gender` char(2) DEFAULT NULL,
`Class` char(10) DEFAULT NULL,
`Age` tinyint(3) unsigned DEFAULT NULL,
`Score` tinyint(3) unsigned DEFAULT NULL,
`Height` tinyint(3) unsigned DEFAULT NULL,
`PhoneNumber` char(11) DEFAULT NULL,
`createtimestamp` datetime DEFAULT CURRENT_TIMESTAMP,
`modifytimestamp` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`ID`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8
- 快照备份 将studentinfo中的数据备份到studentinfobak1 中
INSERT INTO studentinfobak1 SELECT * FROM studentinfo
- 修改studentinfo表中数据 执行的sql:
-- 触发插入
INSERT INTO studentinfo (ID,Name,Gender,Class,Age,Score,Height,PhoneNumber) VALUES (10,'Shan Zhang','M','1704',18,91,171,'19946554576');
-- 触发删除
DELETE FROM studentinfo WHERE Name='San Zhang';
-- 触发更新
UPDATE studentinfo SET Score=92 WHERE ID=1;
-- 清空操作日志表
-- TRUNCATE cdc_opt_log;
- 复制studentinfo表中数据到新表studentinfobak2 创建studentinfobak2表
CREATE TABLE `studentinfobak2` (
`ID` int(10) unsigned NOT NULL AUTO_INCREMENT,
`Name` varchar(20) DEFAULT NULL,
`Gender` char(2) DEFAULT NULL,
`Class` char(10) DEFAULT NULL,
`Age` tinyint(3) unsigned DEFAULT NULL,
`Score` tinyint(3) unsigned DEFAULT NULL,
`Height` tinyint(3) unsigned DEFAULT NULL,
`PhoneNumber` char(11) DEFAULT NULL,
`createtimestamp` datetime DEFAULT CURRENT_TIMESTAMP,
`modifytimestamp` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`ID`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8
- 快照备份 将studentinfo中的数据备份到新的备份表studentinfobak2 中:
INSERT INTO studentinfobak2 SELECT * FROM studentinfo
- kettle步骤
(1)设计转换 table input 为第一次的备份studentinfobak1中的数据 table input 2 为第二次的备份studentinfobak2 中的数据 通过比较两份数据中的差(Merge rows diff),即可获得增量数据, 再通过synchronize after merge中进行合并操作。synchronize after merge空间常与Merge rows diff联合使用,用于合并后同步信息 =根据某个字段值的条件插入,删除,更新数据库表
(2)table input1
(3)table input2
(4)merge rows (diff) 这里会将Table input作为参照表,Table input 2 作为比较表,根据ID进行匹配,ID相同的进行比较,如果ID仅存在与Table input 中,而在Table input 2 不存在,表示该数据被删除了。如果ID不存在与Table input 中,而在Table input 2 存在,表示该数据为新增。如果ID同时存在与Table input和Table input 2中,比较其ID ,Name,Gender…等字段,如果这些字段在两个表输入不相同,表示该数据进行过修改。
有提示建议先排序,可以在merge rows前执行下sort row操作,读者可以再添加排序步骤
(5)synchronize after merge设置 General选项通用设置: Target Schema为比较两份快照后端增量输出表,保存这两份快照中的增量数据。 Table field表示目标标准的ID字段,Stream field1 表示上一步merge rows diff输出的字段。 最下面的update fields表示要更新的字段。 第一次执行,可以选择SQL—>Execute生成目标表。
Advanced选项设置高级
至此:转换编写完成,如有需要,请留言,我会把ktr文件发到您的邮箱