03-PDI(Kettle)导入与导出CDC

2022-11-12 15:30:11 浏览数 (3)

文章目录

  • 03-PDI(Kettle)导入与导出CDC
    • 数据的全量、增量、差异备份
    • 基于时间戳的源数据CDC
      • 实验原理
      • 实验步骤
    • 基于触发器的CDC
      • 实验原理
      • 实验步骤
    • 基于快照的CDC案例
      • 实验原理
      • 实验步骤

03-PDI(Kettle)导入与导出CDC

数据的全量、增量、差异备份

参考博客: https://blog.csdn.net/qq_38097573/article/details/103593150

在很远很远的地方,有一个帐房先生。 他每天要记很多很多的账单。 老先生一生谨慎,为了保证账本的安全, 便找来三个徒弟帮忙来对账本做备份, 这样即使账本丢失了, 也可以用备份的账本继续使用。 三个徒弟各有所长,分别采用了不同的做法: 大徒弟▼

性格宅心仁厚,成熟稳重。 他采用的方式是每天都把师父的账单重新抄录一份。这样做的好处就是每天都是一份完整的账本,每一个备份的账本都可以直接使用,坏处则是每天要花费很多时间去进行记录,并且需要很多纸、墨水以及存账本的柜子。 二徒弟▼

性格聪明伶俐,人小鬼大。 她觉得大师姐的方法太累太耗时,不如每天只抄录账本上新增的信息,这样她每天总是第一个抄完,不仅节省时间,而且为店里节省了大量的纸张、墨水等成本。不过她最讨厌师父来检查账本了,因此她需要将每次纪录的数据拼凑在一起才能组成一个完成的账本。 三徒弟▼

性格心思缜密,粗中有细 她借鉴了两位师姐的办法,进行折中创新,具体是隔一段时间就把账本的所有信息重新抄一份,而在这段时间内则只纪录每天相比于“总账”变化的信息,比如周一抄一份总账,周二买菜一项花了100元,那么周三就把“买菜花了100元”纪录下来;周三买肉花了50元,周四就把周三的“买肉花了50元”和周二的“买菜一项花了100元”都记录下来。如果师父要查账的话,她把新纪录的数据和“总账”拼在一起,就是完整的账本了。这种方式在记账速度、纸墨使用量、账单查看等方面都是一种较为折中的方式。哦,对了,一个比较明显的不足就是解释起来需要打更多的字!

三个徒弟的三种方法各有千秋,后来为了便于记忆,账房先生给这三种方法起了名字,分别叫:全量备份、增量备份和差异备份。

基于时间戳的源数据CDC

实验原理

从时间戳识别出变化的数据并只导入这部分数据。根据cdc_time_log表中的上次执行时间,以及输入的当前执行时间,增量导出student_cdc表中的数据。输出的数据存储在XX/student_cdc.xls文件中。其中,cdc_time_log表的主要作用是记录上次执行的时间,拉取当前执行时间与上次执行时间之间的数据即为增量数据, 拉取成功后,需要将cdc_time_log表中的上次执行时间更新为当前执行时间。这样就可以继续进行CDC操作。

实验步骤

  1. 创建student_cdc表。

在mysql命令行执行student_cdc.sql脚本

student_cdc.sql内容

代码语言:javascript复制
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for student_cdc
-- ----------------------------
DROP TABLE IF EXISTS `student_cdc`;
CREATE TABLE `student_cdc`  (
  `学号` int(11) NOT NULL,
  `姓名` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `性别` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `班级` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `年龄` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `成绩` int(15) NULL DEFAULT NULL,
  `身高` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `手机` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `插入时间` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `更新时间` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`学号`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of student_cdc
-- ----------------------------
INSERT INTO `student_cdc` VALUES (1, '张一', '男', '1701', '16', 78, '170', '18946554571', '2018-08-06', '2018-08-06');
INSERT INTO `student_cdc` VALUES (2, '李二', '男', '1701', '17', 80, '175', '18946554572', '2018-08-06', '2018-08-06');
INSERT INTO `student_cdc` VALUES (3, '谢逊', '男', '1702', '18', 95, '169', '18946554573', '2018-08-06', '2018-08-06');
INSERT INTO `student_cdc` VALUES (4, '赵玲', '女', '1702', '19', 86, '180', '18956257895', '2018-08-06', '2018-08-06');
INSERT INTO `student_cdc` VALUES (5, '张明', '男', '1704', '20', 85, '185', '18946554575', '2018-08-07', '2018-08-07');
INSERT INTO `student_cdc` VALUES (6, '张三', '女', '1704', '18', 92, '169', '18946554576', '2018-08-06', '2018-08-07');

SET FOREIGN_KEY_CHECKS = 1;

在mysql命令行执行cdc_time_log.sql脚本。

cdc_time_log.sql内容

代码语言:javascript复制
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for cdc_time_log
-- ----------------------------
DROP TABLE IF EXISTS `cdc_time_log`;
CREATE TABLE `cdc_time_log`  (
  `id` int(11) NOT NULL,
  `上次执行时间` varchar(45) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = latin1 COLLATE = latin1_swedish_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of cdc_time_log
-- ----------------------------
INSERT INTO `cdc_time_log` VALUES (1, '2018-08-04');

SET FOREIGN_KEY_CHECKS = 1;

2.新建并设计转换。

(1)设计转换如下

(2) 命名参数设置:鼠标在转换的空白地方右键,选择“转换设置”。

在命名参数标签卡“命名参数”中配置命名参数的名字(cur_time)和默认值(“2018-08-04”)。

(3)“table imput”的配置:

命名为“CDC日志表输入”

建立数据库连接,在数据库连接的option中设置

代码语言:javascript复制
characterEncoding为utf-8

SQL查询语句为:

代码语言:javascript复制
SELECT 上次执行时间 as last1,'${cur_time}' as cur1,上次执行时间 as last2,'${cur_time}' as cur2 FROM cdc_time_log

(4)“table input2”设置 步骤命名为“学生表输入” Sql语句为:

代码语言:javascript复制
SELECT 学号, 姓名, 性别, 班级, 年龄, 成绩, 身高, 手机, 插入时间, 更新时间, '${cur_time}' as 导入时间 FROM student_cdc WHERE (插入时间>=? and 插入时间<=?) OR (更新时间>? and 更新时间<=?)

(5)“Microsoft Excel 输出”步骤设置,文件输出路径设置为"e:/output/student_cdc" 。

(6)“插入/更新”步骤设置

由于insert/update的上一部为excel,这里选择get update fields后,会出现多余字段,可通过edit mapping配置对应的映射关系。其中table field 为当前表中的字段,stream field为上一个步骤流在的字段。

3、运行

(1)在弹出的对话框中,设置命名参数cur_time的值为“2018-08-06”,点击“启动”按钮,将在路径/home/ubuntu中输出名为student_cdc.xls的文件,cdc_time_log表中的内容将自动改为输入的参数值。

excel输出内容为:

(2)第二次运行,将参数修改为“2018-08-07”。

excel输出内容为:

cdc_time_log内容更新为: 2018-08-07

基于触发器的CDC

实验原理

类似时间戳和主键序列的CDC操作,区别在于这里采用触发器生成增量条件。

实验步骤

  1. 创建所需表。

sql语句内容如下: 注意:这段代码是总的SQL语句,不需要执行,下面会对这段语句分步骤解释,读者执行分步骤中的语句即可

代码语言:javascript复制
create table studentsync like studentinfo;

-- desc studentsync;
ALTER TABLE studentsync MODIFY createtimestamp datetime;
ALTER TABLE studentsync MODIFY modifytimestamp datetime;
ALTER TABLE studentsync ADD synchronizationtime datetime;


-- 创建操作日志表
DROP TABLE IF EXISTS cdc_opt_log;
create table cdc_opt_log 
(
	SID int PRIMARY KEY,
	optype char(1),
	opflag char(6)
);

-- 创建 INSERT 触发器
DROP TRIGGER IF EXISTS tri_insert_student;
DELIMITER //
CREATE TRIGGER tri_insert_student AFTER INSERT ON studentinfo FOR EACH ROW 
begin
	IF (SELECT 1 FROM cdc_opt_log WHERE SID=new.ID) THEN 
		UPDATE cdc_opt_log SET optype='I',opflag='未处理' WHERE SID=new.ID;
	ELSE
		INSERT INTO cdc_opt_log(SID, optype, opflag) VALUES (new.ID, 'I', '未处理');
	END IF;
end
//
DELIMITER ;

-- 创建 UPDATE 触发器
DROP TRIGGER IF EXISTS tri_update_student;
DELIMITER //
CREATE TRIGGER tri_update_student AFTER UPDATE ON studentinfo FOR EACH ROW 
begin
	IF (SELECT 1 FROM cdc_opt_log WHERE SID=new.ID) THEN 
		UPDATE cdc_opt_log SET optype='U',opflag='未处理' WHERE SID=new.ID;
	ELSE
		INSERT INTO cdc_opt_log(SID, optype, opflag) VALUES (new.ID, 'U', '未处理');
	END IF;
end
//
DELIMITER ;

-- 创建 DELETE 触发器 
DROP TRIGGER IF EXISTS tri_delete_student;
DELIMITER //
CREATE TRIGGER tri_delete_student AFTER DELETE ON studentinfo FOR EACH ROW 
begin
	IF (SELECT 1 FROM cdc_opt_log WHERE SID=old.ID) THEN 
		UPDATE cdc_opt_log SET optype='D',opflag='未处理' WHERE SID=old.ID;
	ELSE
		INSERT INTO cdc_opt_log(SID, optype, opflag) VALUES (old.ID, 'D', '未处理');
	END IF;
end
//
DELIMITER ;


-- 触发插入
-- INSERT INTO studentinfo (ID,Name,Gender,Class,Age,Score,Height,PhoneNumber) VALUES (10,'Shan Zhang','M','1704',18,91,171,'19946554576');

-- 触发删除
-- DELETE FROM studentinfo WHERE Name='Shan Zhang';

-- 触发更新
-- UPDATE studentinfo SET Score=92 WHERE ID=1;

-- 清空操作日志表
-- TRUNCATE cdc_opt_log;
  1. 创建INSERT触发器

创建INSERT触发器tri_insert_student

代码语言:javascript复制
DROP TRIGGER IF EXISTS tri_insert_student;
DELIMITER //
CREATE TRIGGER tri_insert_student AFTER INSERT ON studentinfo FOR EACH ROW 
begin
	IF (SELECT 1 FROM cdc_opt_log WHERE SID=new.ID) THEN 
		UPDATE cdc_opt_log SET optype='I',opflag='未处理' WHERE SID=new.ID;
	ELSE
		INSERT INTO cdc_opt_log(SID, optype, opflag) VALUES (new.ID, 'I', '未处理');
	END IF;
end
//
DELIMITER ;

studentinfo表中插入一条记录后,触发器会执行,向cdc_opt_log中更新或插入一条记录。 本段语句解释如下:

代码语言:javascript复制
DROP TRIGGER IF EXISTS tri_insert_student;

表示删除触发器

DELIMITER // 表示修改定界符为 // ,避免MySQL遇到 分号;立刻执行

CREATE TRIGGER tri_insert_student AFTER INSERT ON studentinfo FOR EACH ROW 表示创建的触发器为tri_insert_student , 后面的AFTER表示插入后执行,可选BEFORE, 后面的INSERT表示插入时触发器执行, ON studentinfo 表示触发器定义在某表中, FOR EACH ROW表示每行都会触发。

代码语言:javascript复制
begin
	IF (SELECT 1 FROM cdc_opt_log WHERE SID=new.ID) THEN 
		UPDATE cdc_opt_log SET optype='I',opflag='未处理' WHERE SID=new.ID;
	ELSE
		INSERT INTO cdc_opt_log(SID, optype, opflag) VALUES (new.ID, 'I', '未处理');
	END IF;
end

表示触发器的语句,其中new 为支持的关键字,表示修改后的行,old表示修改前的行。

DELIMITER ;表示修改定界符回默认的;。

  1. 创建UPDATE触发器 创建INSERT触发器tri_update_student
代码语言:javascript复制
DROP TRIGGER IF EXISTS tri_update_student;
DELIMITER //
CREATE TRIGGER tri_update_student AFTER UPDATE ON studentinfo FOR EACH ROW 
begin
	IF (SELECT 1 FROM cdc_opt_log WHERE SID=new.ID) THEN 
		UPDATE cdc_opt_log SET optype='U',opflag='未处理' WHERE SID=new.ID;
	ELSE
		INSERT INTO cdc_opt_log(SID, optype, opflag) VALUES (new.ID, 'U', '未处理');
	END IF;
end
//
DELIMITER ;

这段语句与插入触发器基本类似,不再重复描述

  1. 创建DELETE触发器

创建INSERT触发器tri_delete_student

代码语言:javascript复制
DROP TRIGGER IF EXISTS tri_delete_student;
DELIMITER //
CREATE TRIGGER tri_delete_student AFTER DELETE ON studentinfo FOR EACH ROW 
begin
	IF (SELECT 1 FROM cdc_opt_log WHERE SID=old.ID) THEN 
		UPDATE cdc_opt_log SET optype='D',opflag='未处理' WHERE SID=old.ID;
	ELSE
		INSERT INTO cdc_opt_log(SID, optype, opflag) VALUES (old.ID, 'D', '未处理');
	END IF;
end
//
DELIMITER ;
  1. 转换设计
  1. 具体步骤

(1)Table Input :GetCDCOptlog

代码语言:javascript复制
   SELECT
    optype,
    '已完成' as res,
    SID
   FROM cdc_opt_log
   WHERE opflag='未处理'

(2)Switch Case :IsDeleteOperation

(3)Table Input:GetStudentCDC sql语句:

代码语言:javascript复制
   SELECT
     ID
   , Name
   , Gender
   , Class
   , Age
   , Score
   , Height
   , PhoneNumber
   , createtimestamp
   , modifytimestamp
   , ? as optype
   , ? as res
   , CURRENT_TIMESTAMP as loadtimestamp
   FROM studentinfo
   WHERE ID=?

(4)Insert Update:InsertOrUpdateStudentCDCtoSyncTable 这一步骤会将新插入的数据更新到studentsync。

(5)Insert Update:UpdateCDCOptLog 这一步骤会将cdc_opt_log表在的optflag字段修改为res值(已完成)

(6)Delete : DeleteStudentFromSyncTable

这一步骤会删除studentsync中的指定ID的记录

(7)Insert / Update : UpdateCDCOptLog2

至此:转换编写完成,如有需要,请留言,我会把ktr文件发到邮箱中

基于快照的CDC案例

实验原理

如果没有时间戳,不允许使用触发器,就要使用快照表。可以通过比较源表和快照表来获得数据变化。 基于快照的CDC可以检测到插入、更新和删除的数据,这是相对于基于时间戳的CDC方案的有点。其缺点是需要大量存储空间来保存快照。

实验步骤

  1. 复制表中全部数据的几种方法 复制studentinfo表中数据到新表studentinfobak1 可选用的方法, 参考: https://blog.csdn.net/weixin_39732609/article/details/113555925
代码语言:javascript复制
   1、复制表结构及数据到新表
    CREATE TABLE 新表SELECT * FROM 旧表
    这种方法会将oldtable中所有的内容都拷贝过来,当然我们可以用delete from newtable;来删除。
    不过这种方法的一个最不好的地方就是新表中没有了旧表的primary key、Extra(auto_increment)等属性。需要自己用 alter 添加,而且容易搞错。
    
    2、只复制表结构到新表
    CREATE TABLE 新表 SELECT * FROM 旧表 WHERE 1=2
    或CREATE TABLE 新表 LIKE 旧表
    
    3、复制旧表的数据到新表(假设两个表结构一样)
    INSERT INTO 新表SELECT * FROM 旧表
    
    4、复制旧表的数据到新表(假设两个表结构不一样)
    INSERT INTO 新表(字段1,字段2,.......) SELECT 字段1,字段2,...... FROM 旧表
    
    5、可以将表1结构复制到表2
    SELECT * INTO 表2 FROM 表1 WHERE 1=2
    
    6、可以将表1内容全部复制到表2
    SELECT * INTO 表2 FROM 表1
    
    7、 show create table 旧表;
    这样会将旧表的创建命令列出。我们只需要将该命令拷贝出来,更改table的名字,就可以建立一个完全一样的表
    
    8、mysqldump
    用mysqldump将表dump出来,改名字后再导回去或者直接在命令行中运行
    
    9、复制旧数据库到新数据库(复制全部表结构并且复制全部表数据)
    #mysql -u root -ppassword
    >CREATE DATABASE new_db;
    #mysqldump old_db -u root -ppassword--skip-extended-insert --add-drop-table | mysql new_db -u root -ppassword
    
    10、表不在同一数据库中(如,db1 table1, db2 table2)
    sql: insert into db1.table1 select * from db2.table2 (完全复制)
    insert into db1.table1 select distinct * from db2.table2(不复制重复纪录)
    insert into tdb1.able1 select top 5 * from db2.table2 (前五条纪录)
  1. 创建studentinfo表
代码语言:javascript复制
-- -----------------------------------------------
-- 创建kettledb数据库
-- -----------------------------------------------
CREATE DATABASE IF NOT EXISTS kettledb;
USE kettledb;

-- -----------------------------------------------
-- 创建studentinfo表
-- 添加两个时间戳字段:
--   ID : 学生记录主键,设置未自增序列 (AUTO_INCREMENT)
--        新增学生记录会ID会自动增加1
--   createtimestamp : 记录创建时间
--                     当记录被创建是自动设置当时时间
--   moditytimestamp : 记录修改时间
--                     当记录被更新是自动设置当时时间
-- -----------------------------------------------
DROP TABLE IF EXISTS studentinfo;
CREATE TABLE studentinfo  (
  ID int UNSIGNED PRIMARY KEY AUTO_INCREMENT,
  Name varchar(20),
  Gender char(2),
  Class char(10),
  Age tinyint UNSIGNED DEFAULT NULL,
  Score tinyint UNSIGNED DEFAULT NULL,
  Height tinyint UNSIGNED DEFAULT NULL,
  PhoneNumber char(11),
  createtimestamp datetime DEFAULT CURRENT_TIMESTAMP,
  modifytimestamp datetime ON UPDATE CURRENT_TIMESTAMP
);

-- -----------------------------------------------
-- 插入几条测试数据
-- createtimestamp和modifytimestamp会自动添加
-- -----------------------------------------------
INSERT INTO studentinfo (ID, Name, Gender, Class, Age, Score, Height, PhoneNumber) VALUES (1, 'Yi Zhang', 'M', '1701', 16, 78, 170, '19946554571');
INSERT INTO studentinfo (ID, Name, Gender, Class, Age, Score, Height, PhoneNumber) VALUES (2, 'Li, Er', 'M', '1701', 17, 80, 175, '19946554572');
INSERT INTO studentinfo (ID, Name, Gender, Class, Age, Score, Height, PhoneNumber) VALUES (3, 'Xun Xie', 'M', '1702', 18, 95, 168, '19946554573');
INSERT INTO studentinfo (ID, Name, Gender, Class, Age, Score, Height, PhoneNumber) VALUES (4, 'Ling Zhao', 'F', '1702', 19, 86, 180, '19946554574');
INSERT INTO studentinfo (ID, Name, Gender, Class, Age, Score, Height, PhoneNumber) VALUES (5, 'Ming Zhang', 'M', '1704', 20, 85, 185, '19946554575');
INSERT INTO studentinfo (ID, Name, Gender, Class, Age, Score, Height, PhoneNumber) VALUES (6, 'San Zhang', 'F', '1704', 18, 95, 169, '19946554576');
  1. 复制kettledb数据库中的studentinfo数据到新的表studentinfobak1
代码语言:javascript复制
use kettledb;
show create table studentinfo;

返回studentinfo建表语句为:

代码语言:javascript复制
CREATE TABLE `studentinfo` (
  `ID` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `Name` varchar(20) DEFAULT NULL,
  `Gender` char(2) DEFAULT NULL,
  `Class` char(10) DEFAULT NULL,
  `Age` tinyint(3) unsigned DEFAULT NULL,
  `Score` tinyint(3) unsigned DEFAULT NULL,
  `Height` tinyint(3) unsigned DEFAULT NULL,
  `PhoneNumber` char(11) DEFAULT NULL,
  `createtimestamp` datetime DEFAULT CURRENT_TIMESTAMP,
  `modifytimestamp` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`ID`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8

根据studentinfo建表语句创建studentinfobak1表,注意这里修改了表名为studentinfobak1。

代码语言:javascript复制
CREATE TABLE `studentinfobak1` (
  `ID` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `Name` varchar(20) DEFAULT NULL,
  `Gender` char(2) DEFAULT NULL,
  `Class` char(10) DEFAULT NULL,
  `Age` tinyint(3) unsigned DEFAULT NULL,
  `Score` tinyint(3) unsigned DEFAULT NULL,
  `Height` tinyint(3) unsigned DEFAULT NULL,
  `PhoneNumber` char(11) DEFAULT NULL,
  `createtimestamp` datetime DEFAULT CURRENT_TIMESTAMP,
  `modifytimestamp` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`ID`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8
  1. 快照备份 将studentinfo中的数据备份到studentinfobak1 中
代码语言:javascript复制
INSERT INTO studentinfobak1 SELECT * FROM studentinfo
  1. 修改studentinfo表中数据 执行的sql:
代码语言:javascript复制
-- 触发插入
INSERT INTO studentinfo (ID,Name,Gender,Class,Age,Score,Height,PhoneNumber) VALUES (10,'Shan Zhang','M','1704',18,91,171,'19946554576');

-- 触发删除
DELETE FROM studentinfo WHERE Name='San Zhang';

-- 触发更新
UPDATE studentinfo SET Score=92 WHERE ID=1;

-- 清空操作日志表
-- TRUNCATE cdc_opt_log;
  1. 复制studentinfo表中数据到新表studentinfobak2 创建studentinfobak2表
代码语言:javascript复制
CREATE TABLE `studentinfobak2` (
  `ID` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `Name` varchar(20) DEFAULT NULL,
  `Gender` char(2) DEFAULT NULL,
  `Class` char(10) DEFAULT NULL,
  `Age` tinyint(3) unsigned DEFAULT NULL,
  `Score` tinyint(3) unsigned DEFAULT NULL,
  `Height` tinyint(3) unsigned DEFAULT NULL,
  `PhoneNumber` char(11) DEFAULT NULL,
  `createtimestamp` datetime DEFAULT CURRENT_TIMESTAMP,
  `modifytimestamp` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`ID`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8
  1. 快照备份 将studentinfo中的数据备份到新的备份表studentinfobak2 中:
代码语言:javascript复制
INSERT INTO studentinfobak2 SELECT * FROM studentinfo
  1. kettle步骤

(1)设计转换 table input 为第一次的备份studentinfobak1中的数据 table input 2 为第二次的备份studentinfobak2 中的数据 通过比较两份数据中的差(Merge rows diff),即可获得增量数据, 再通过synchronize after merge中进行合并操作。synchronize after merge空间常与Merge rows diff联合使用,用于合并后同步信息 =根据某个字段值的条件插入,删除,更新数据库表

(2)table input1

(3)table input2

(4)merge rows (diff) 这里会将Table input作为参照表,Table input 2 作为比较表,根据ID进行匹配,ID相同的进行比较,如果ID仅存在与Table input 中,而在Table input 2 不存在,表示该数据被删除了。如果ID不存在与Table input 中,而在Table input 2 存在,表示该数据为新增。如果ID同时存在与Table input和Table input 2中,比较其ID ,Name,Gender…等字段,如果这些字段在两个表输入不相同,表示该数据进行过修改。

有提示建议先排序,可以在merge rows前执行下sort row操作,读者可以再添加排序步骤

(5)synchronize after merge设置 General选项通用设置: Target Schema为比较两份快照后端增量输出表,保存这两份快照中的增量数据。 Table field表示目标标准的ID字段,Stream field1 表示上一步merge rows diff输出的字段。 最下面的update fields表示要更新的字段。 第一次执行,可以选择SQL—>Execute生成目标表。

Advanced选项设置高级

至此:转换编写完成,如有需要,请留言,我会把ktr文件发到您的邮箱

0 人点赞