前言
目前用户常用的两款大数据架构包括EMR(数据建模和建仓场景,支持hive、spark、presto等引擎)和DLC(数据湖分析场景,引擎支持spark、presto引擎),其中EMR场景存储为HDFS(支持本地盘和对象存储cos),数据格式支持Iceberg、orc、parquet、text等,均支持内外表;DLC场景存储为cos,内表数据格式为Iceberg,外表数据格式为orc和text。下文通过离线和实时两种模式描述如何通过Inlong实现mysql数据的同步到HDFS和DLC,同时实现下游用户可读。
采集方案能力组合
场景 | 类型 | 模式 | 建议场景 | 场景 方案推荐度 |
---|---|---|---|---|
EMR | 离线 | Append | 数据表 日志型 | 数据表:有保留数据天级变更状态诉求,推荐采用此方案 1、读取数据对采集源端产生压力; 2、终态数据需要业务根据主键合并; 3、增量模式采用分区的处理办法,分区可以保留源端数据变更的全状态记录;日志型:日志使用该方案较少 |
Overwrite | 数据表 | 数据表:无保留数据变更状态诉求,推荐采用此方案 1、读取数据对采集源端产生压力; 2、重写过程中hive会锁定,无法使用; 3、终态数据需要业务根据主键合并; 4、全量模式适用小表,增量模式适用大表; | ||
实时 | Append | 数据表 日志型 | 数据表:降低源端压力诉求或者binlog备份可采用此方案 1、Binlog的方式,读取数据对采集源端压力较小; 2、终态数据需要业务合并; 3、增量适用数据大小表;日志型:推荐方案 | |
DLC | 离线 | Append | 数据表 日志型 | 数据表:方案等同EMR-Append,但是DLC底层支持upsert语义,此方案并不建议 1、读取数据对采集源端产生压力; 2、终态数据需要业务合并; 3、增量模式采用分区的处理办法,分区可以保留源端数据变更的全状态记录日志型:日志使用该方案较少 |
Overwrite | 数据表 | 数据表:方案等同EMR-Overwrite,但是DLC底层支持upsert语义,此方案并不建议 1、读取数据对采集源端产生压力; 2、重写过程中hive会锁定,无法使用; 3、终态数据需要业务根据主键合并; 4、全量模式适用小表,增量模式适用大表 | ||
Upsert | 数据表 | 数据表:推荐采用此方案 1、读取数据对采集源端产生压力; 2、终态数据需要借助引擎的小文件合并能力 | ||
实时 | Append | 数据表 日志型 | 数据表:此方案适用binlog备份 1、 binlog的方式,读取数据对采集源端压力较小; 2、终态数据需要业务合并; 3、增量模式下能感知物理删除日志型:推荐方案 | |
Upsert | 数据表 | 数据表:推荐采用此方案 1、binlog的方式,读取数据对采集源端压力较小; 2、需要开启小文件合并; 3、数据维护成本较低 |
场景推荐方案
场景 | 用户诉求 | 选型 | 接入成本 |
---|---|---|---|
EMR | 离线同步,保留历史状态 | Append Hive分区表 Hive全量表 目标视图 | 中 |
离线同步,不保留历史状态 | Overwrite Hive非分区表 Hive全量表 目标视图 | 相对低 | |
离线同步-小表 | Overwrite模式 Hive目标表 | 低 | |
实时同步 | Append Hive分区表 Hive全量表 目标视图 | 中 | |
DLC | 离线同步 | Upsert Hive非分区目标表 | 低 |
实时同步 | Upsert Hive目标表 | 低 |
EMR场景-CDH相同
EMR场景现在主流存储格式为HDFS-ORC。数据采集到HDFS主要由离线和实时两类方案,离线引擎为DataX,实时引擎为Flink。系统架构图如下:
说明:目前离线支持Append和Overwrite模式,实时支持Append模式,下文展开各模式的数据处理方案。
离线类型
离线采集类型目前支持两种写入模式,Append适用于增量、Overwrite适用于小表全量和大表增量场景,因HDFS数据本身不具备更新能力,所以在增量场景下需要额外的Merge任务对数据进行加工处理。
Append模式
Append模式下可以写入hive非分区表或者分区表,两类表的数据都需要落地之后经过任务合并处理。但Mysql端可能存在大量的DML操作,非分区表在积累一定时间周期后读取最新数据成本会越来越高,所以建议写入hive分区表。
备注:数据唯一性问题
1、 单表场景:单表数据存在主键即可保证数据唯一性
2、 分库分表场景:数据表的主键非全局唯一,需要业务使用方识别;数据表的主键全局唯一,数据唯一性正常
分区表
配置方式
Mysql源表
代码语言:javascript复制CREATE DATABASE streamdemo;
USE streamdemo;
CREATE TABLE `productinfo` (
`product_id` int(11) NOT NULL COMMENT '商品ID',
`product_name` varchar(100) DEFAULT NULL COMMENT '商品名称',
`price` float DEFAULT NULL COMMENT '商品价格',
`create_time` datetime DEFAULT NULL COMMENT '上架时间',
`update_time` datetime DEFAULT NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
`description` varchar(500) DEFAULT NULL COMMENT '商品描述',
PRIMARY KEY (`product_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
Hive 分区表
代码语言:javascript复制CREATE TABLE `default.productinfo_offline`(
`product_id` int COMMENT '商品ID',
`product_name` string COMMENT '商品名称',
`price` float COMMENT '商品价格',
`create_time` timestamp COMMENT '上架时间',
`update_time` timestamp COMMENT '更新时间',
`description` string COMMENT '商品描述'
) PARTITIONED BY (`pt` string) stored as orc
任务配置
说明点:
1. 通常离线采集上一天数据,示例是根据update_time采集,需要在《筛选条件》处填写 update_time=${yyyyMMdd-1d},时间函数参考详见数据集成 时间参数说明-操作指南-文档中心-腾讯云
2. 这里的分区定义是根据productinfo的update_time生成,同时源字段是datetime类型,Hive分区字段pt是string类型,需要经过DATE_FORMAT(update_time,'%Y-%m-%d')做转换;
3. 离线同步采用的源端数据库函数,当前示例mysql数据源
源表配置 | 函数 | 目标表配置 |
---|---|---|
DATE_FORMAT(update_time,'%Y-%m-%d') | Mysql函数 | pt |
字段配置示例
数据合并流程
因Append模式写入的数据并不会对主键去重,所以完成一次采集后需要经过下游业务去重处理。逻辑如下:
操作流程描述:以下操作基于2023年1月14开始,后续时间自行替换
1. Inlong实时将1月14号及之前的全量数据(任务配置选择全量模式即可:第一次运行会全量同步,目标分区表根据update_time动态生成多个分区)写入${T}_增量_分区表。以下操作只在第一初始化操作:第一次的全量表${T}_全量_{20230113}数据通过insert into select * 的方式全取自第一同步的${T}_增量_分区表
2. Inlong的任务根据业务时间(示例场景为update_time)写到对应分区
3. Wedata配置定时任务生成一个${T}_全量_{20230113}数据表,处理逻辑:${T}_全量_{20230114}=${T}_全量_{20230113} ${T}_增量_{20230114}(根据业务属性做Merge)
4. Wedata完成Merge之后,基于${T}_全量_{20230113} =${T}视图(此处也可以不采用视图方案,${T}_全量只保留一份,每次采用Overwrite模式写入。此方案优势:不用清除历史${T}_全量_{时间}数据和维护视图;缺点:Overwrite模式可能存在风险,合并过程中存在锁)
5. 系统重复1-4步骤
数据合并逻辑
实现原理:当天分区最新的数据 上一天之前的全量数据(根据当天分区数据主键去除当天数据),合并代码如下
代码语言:javascript复制#合并逻辑 (基于 union row_number, 优点完全去重, 缺点资源浪费)
create table IF NOT EXISTS `T_全量_20230114` as
select
product_id,
product_name,
price,
create_time,
update_time,
description
from
(
select
*,
row_number() over(
partition by `product_id`
order by
`update_time` desc
) as rn
from
(
select
*
from
`T_全量_20230113`
union
all
select
*
from
`T_增量_分区表`
where pt = '20230114'
) a
) b
where
b.rn = 1
#创建视图
alter view `T` as select * from `T_全量_20230114`
关注点
1. 关注点1:什么时候做合并和创建视图
答复:离线分区写入任务可配置成Wedata调度任务,调度任务完成通过依赖触发下游的Merge任务
2. 关注点2:数据修复如何处理
答复:人工处理
3. 关注点3:DDL操作如何处理
答复:人工处理
4. 关注点4:下游读取view过程中,alter view是否会死锁
答复:alter view可以正常执行,下游读取的是view对应的源表,不影响
关注点5:${T}_全量_{时间}占用大量存储空间
答复:借助数据治理能力,设置该类表的生命周期为N天,定时删除历史数据
非分区表
配置基本等同于分区表配置,差异点主要在以下两方面
1. 任务配置阶段不需要对update_time做字段转换映射hive的分区字段
2. 任务Merge过程根据update_time读取数据,但是因为非分区表,当前扫描文件的量较大,同时随着时间的积累,文件会越来越多,导致性能会越来越差。
Overwrite模式
全量表场景
导入任务写入同一个表,每次导入都是讲全量的最新数据写入到目标表,下游可直接使用
配置方式
增量表场景
增量模式的数据处理逻辑类似Append模式下的分区表逻辑,区别主要在导入表是通过Overwrite模式,通过增量数据覆盖,无法追溯数据的历史状态。
配置方式
Mysql源表
代码语言:javascript复制CREATE DATABASE streamdemo;
USE streamdemo;
CREATE TABLE `productinfo` (
`product_id` int(11) NOT NULL COMMENT '商品ID',
`product_name` varchar(100) DEFAULT NULL COMMENT '商品名称',
`price` float DEFAULT NULL COMMENT '商品价格',
`create_time` datetime DEFAULT NULL COMMENT '上架时间',
`update_time` datetime DEFAULT NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
`description` varchar(500) DEFAULT NULL COMMENT '商品描述',
PRIMARY KEY (`product_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
Hive 分区表
代码语言:javascript复制CREATE TABLE `default.productinfo_offline`(
`product_id` int COMMENT '商品ID',
`product_name` string COMMENT '商品名称',
`price` float COMMENT '商品价格',
`create_time` timestamp COMMENT '上架时间',
`update_time` timestamp COMMENT '更新时间',
`description` string COMMENT '商品描述'
) stored as orc
任务配置
说明点:
1. 通常离线采集上一天数据,示例是根据update_time采集,需要在《筛选条件》处填写 update_time=${yyyyMMdd-1d},时间函数参考详见数据集成 时间参数说明-操作指南-文档中心-腾讯云
数据合并流程
因写入的数据并不会对主键去重,所以完成一次采集后需要经过下游业务去重处理。逻辑如下:
操作流程描述:以下操作基于2023年1月14开始,后续时间自行替换
1. Inlong将1月14号及之前的全量数据(任务配置选择全量模式即可:第一次运行会全量同步,目标分区表根据update_time动态生成多个分区)写入${T}_增量。以下操作只在第一初始化操作:第一次的全量表${T}_全量_{20230113}数据通过insert into select * 的方式全取自第一同步的${T}_增量
2. Inlong的任务根据业务时间(示例场景为update_time)写到对应分区
3. Wedata配置定时任务生成一个${T}_全量_{20230113}数据表,处理逻辑:${T}_全量_{20230114}=${T}_全量_{20230113} ${T}_增量(根据业务属性做Merge)
4. Wedata完成Merge之后,基于${T}_全量_{20230113} =${T}视图(此处也可以不采用视图方案,${T}_全量只保留一份,每次采用Overwrite模式写入。此方案优势:不用清除历史${T}_全量_{时间}数据和维护视图;缺点:Overwrite模式可能存在风险,合并过程中存在锁)
5. 系统重复1-4步骤
数据合并逻辑
实现原理:当天分区最新的数据 上一天之前的全量数据(根据当天分区数据主键去除当天数据),合并代码如下
代码语言:javascript复制#合并逻辑 (基于 union row_number, 优点完全去重, 缺点资源浪费)
create table IF NOT EXISTS `T_全量_20230114` as
select
product_id,
product_name,
price,
create_time,
update_time,
description
from
(
select
*,
row_number() over(
partition by `product_id`
order by
`update_time` desc
) as rn
from
(
select
*
from
`T_全量_20230113`
union
all
select
*
from
`T_增量`
) a
) b
where
b.rn = 1
#创建视图
alter view `T` as select * from `T_全量_20230114`
关注点
1. 关注点1:什么时候做合并和创建视图
答复:离线分区写入任务可配置成Wedata调度任务,调度任务完成通过依赖触发下游的Merge任务
2. 关注点2:数据修复如何处理
答复:人工处理
3. 关注点3:DDL操作如何处理
答复:人工处理
4. 关注点4:下游读取view过程中,alter view是否会死锁
答复:alter view可以正常执行,下游读取的是view对应的源表,不影响
关注点5:${T}_全量_{时间}占用大量存储空间
答复:借助数据治理能力,设置该类表的生命周期为N天,定时删除历史数据
实时类型
实时写入通过订阅Mysql-binlog,将binlog DDL操作标识写入HDFS,后续基于操作标识Merger产出最新的数据镜像,实时写入流程图
当前实时写入hive只支持append模式,hive目标表可为非分区表或者分区表,两类表的数据都需要落地之后经过任务合并处理。但mysql端可能存在大量的DML操作,非分区表在积累一定时间周期后读取最新数据成本会越来越高,所以在实时写入场景,建议写入hive分区表。
Append模式
分区表
配置方式
mysql源表
代码语言:javascript复制CREATE DATABASE streamdemo;
USE streamdemo;
CREATE TABLE `productinfo` (
`product_id` int(11) NOT NULL COMMENT '商品ID',
`product_name` varchar(100) DEFAULT NULL COMMENT '商品名称',
`price` float DEFAULT NULL COMMENT '商品价格',
`create_time` datetime DEFAULT NULL COMMENT '上架时间',
`update_time` datetime DEFAULT NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
`description` varchar(500) DEFAULT NULL COMMENT '商品描述',
PRIMARY KEY (`product_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
hive 分区表
代码语言:javascript复制CREATE TABLE `default.productinfo_uid`(
`uid` string COMMENT '唯一键',
`type_Wedata_di` string,
`ts_Wedata_di` timestamp,
`processing_time_Wedata_di` timestamp,
`product_id` int COMMENT '商品ID',
`product_name` string COMMENT '商品名称',
`price` float COMMENT '商品价格',
`create_time` timestamp COMMENT '上架时间',
`update_time` timestamp COMMENT '更新时间',
`description` string COMMENT '商品描述'
) PARTITIONED BY (`pt` string) stored as orc
特殊字段说明
该部分字段在实时任务同步过程中引擎自定生成,主要用来实现主键扩展,DDL操作标识,为下游区分有效数据
字段 | 内容 |
---|---|
database_Wedata_di | streamdemo |
table_Wedata_di | productinfo |
type_Wedata_di | INSERT、UPDATE、DELETE |
ts_Wedata_di | 2023-01-30 14:31:42.0 事件时间 |
processing_time_Wedata_di | 2023-01-30 14:31:43.019 处理时间 |
任务配置
说明点:
分区定义是根据productinfo的update_time生成,同时源字段是datetime类型,hive分区字段pt是string类型,需要经过to_date(cast(update_time as string),'yyyy-MM-dd')做转换;
目标表数据唯一性
1、 单表场景:单表数据存在主键即可保证数据唯一性
2、 分库分表场景:源表的主键非全局唯一,可通过database_Wedata_di ||'_'|| table_Wedata_di || '_' || CAST(product_id AS string)组成唯一键;数据表的主键全局唯一,直接使用product_id作为目标表唯一键
实时同步采用的Flink函数,支持函数列表:系统(内置)函数 | Apache Flink
源表配置 | 函数 | 目标表配置 |
---|---|---|
to_date(cast(update_time as string),'yyyy-MM-dd') | Flink函数 | pt |
database_Wedata_di ||'_'|| table_Wedata_di || '_' || CAST(product_id AS string) | Flink函数 | uid(可选) |
数据合并流程
操作流程描述:以下操作基于2023年1月14开始,后续时间自行替换
1. Inlong实时将1月14号及之前的全量数据(任务配置选择全量模式即可:第一次运行会全量同步,目标分区表根据update_time动态生成多个分区)写入${T}_增量_{20230114}。以下操作只在第一初始化操作:第一次的全量表${T}_全量_{20230113}数据通过insert into select * 的方式全取自第一同步的${T}_增量
2. Inlong的任务根据业务时间(示例场景为update_time)写到对应分区
3. Wedata配置定时任务生成一个${T}_全量_{20230113}数据表,处理逻辑:${T}_全量_{20230114}=${T}_全量_{20230113} ${T}_增量_{20230114}(根据业务属性做Merge)
4. Wedata完成Merge之后,基于${T}_全量_{20230113} =${T}视图(此处也可以不采用视图方案,${T}_全量只保留一份,每次采用Overwrite模式写入。此方案优势:不用清除历史${T}_全量_{时间}数据和维护视图;缺点:Overwrite模式可能存在风险,合并过程中存在锁)
5. 系统重复1-4步骤
数据合并逻辑
实现原理:当天分区最新的数据 上一天之前的全量数据(根据当天分区数据主键去除当天数据),合并代码如下
代码语言:javascript复制#合并逻辑 (基于 union row_number, 优点完全去重, 缺点资源浪费)
create table IF NOT EXISTS `T_全量_20230114` as
select
uid,
type_Wedata_di
ts_Wedata_di,
product_id,
product_name,
price,
create_time,
update_time,
description
from
(
select
*,
row_number() over(
partition by `uid`
order by
`ts_Wedata_di` desc
) as rn
from
(
select
*
from
`T_全量_20230113`
union
all
select
*
from
`T_增量_分区表`
where pt='20230114'
) a
) b
where
b.rn = 1
and b.`type_Wedata_di` <> 'DELETE';
#创建视图
alter view `T` as select product_id,product_name,price,create_time,update_time,description from `T_全量_20230114`
关注点
1. 关注点1:什么时候做合并和创建视图
答复:
1. 方案1:目前整条数据链路计划延迟控制在15分钟内,所以Merge任务可以00:15分后开始执行,完成之后执行创建视图
2. 方案2:编写shell脚本检测${T}_增量当天新分区的生成情况,如果已经生成则通过Wedata-event触发Merge任务
2. 关注点2:数据修复如何处理
答复:人工处理
3. 关注点3:DDL操作如何处理
答复:人工处理
4. 关注点4:下游读取View过程中,Alter View是否会死锁
答复:Alter View可以正常执行,下游读取的是View对应的源表,不影响
关注点5:${T}_全量_{时间}占用大量存储空间
答复:借助数据治理能力,设置该类表的生命周期为N天,定时删除历史数据
非分区表
配置基本等同于分区表配置,差异点主要在以下两方面
1. 任务配置阶段不需要对update_time做字段转换映射hive的分区字段
2. 任务Merge过程不是根据分区读取数据,而是根据update_time读取数据。当前模式文件扫描性能会越来越差
DLC场景
DLC场景现在主流存储格式为Iceberg(以下建表均为内表)。数据采集到支持离线和实时两种类型,离线引擎为DataX,实时引擎为Flink。系统架构图如下:
离线类型
Append模式
等同于EMR场景离线类型的Append模式,这里不做描述。
Overwrite模式
等同于EMR场景离线类型的Overwrite模式,这里不做描述
Upsert模式
当前模式主要通过离线写入并更新的方式生成目标数据内容,下游用户可以无感查询最新数据。当前目标表支持分区和非分区表,在实时数据湖场景下,非分区表更加适用。
非分区表
配置方式
Mysql源表
代码语言:javascript复制CREATE DATABASE streamdemo;
USE streamdemo;
CREATE TABLE `productinfo` (
`product_id` int(11) NOT NULL COMMENT '商品ID',
`product_name` varchar(100) DEFAULT NULL COMMENT '商品名称',
`price` float DEFAULT NULL COMMENT '商品价格',
`create_time` datetime DEFAULT NULL COMMENT '上架时间',
`update_time` datetime DEFAULT NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
`description` varchar(500) DEFAULT NULL COMMENT '商品描述',
PRIMARY KEY (`product_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
DLC内表结构
代码语言:javascript复制CREATE TABLE IF NOT EXISTS `DataLakeCatalog`.`rikhuang_iceberg`.`productinfo_iceberg` (
`product_id` int,
`product_name` string,
`price` float,
`create_time` timestamp,
`update_time` timestamp,
`description` string
) TBLPROPERTIES (
'format-version' = '2',
'write.distribution-mode' = 'hash',
'write.Merge.mode' = 'Merge-on-read',
'write.parquet.bloom-filter-enabled.column.addr' = 'true',
'write.parquet.bloom-filter-enabled.column.id' = 'true',
'write.update.mode' = 'Merge-on-read',
'write.upsert.enabled' = 'true',
'write.metadata.delete-after-commit.enabled' = 'true',
'write.metadata.previous-versions-max' = '100',
'write.metadata.metrics.default' = 'full',
'dlc.ao.data.govern.inherit' = 'default',
'dlc.ao.Merge.data.enable' = 'enable',
'dlc.ao.Merge.data.engine' = {用户创建的spark引擎name},
'dlc.ao.Merge.data.min-input-files' = '5',
'dlc.ao.Merge.data.target-file-size-bytes' = '536870912',
'dlc.ao.Merge.data.interval-min' = '60',
'dlc.ao.expired.snapshots.enable' = 'enable',
'dlc.ao.expired.snapshots.engine' = {用户创建的spark引擎name},
'dlc.ao.expired.snapshots.retain-last' = '5',
'dlc.ao.expired.snapshots.before-days' = '2',
'dlc.ao.expired.snapshots.max-concurrent-deletes' = '25',
'dlc.ao.expired.snapshots.interval-min' = '60',
'dlc.ao.remove.orphan.enable' = 'enable',
'dlc.ao.remove.orphan.engine' = {用户创建的spark引擎name},
'dlc.ao.remove.orphan.before-days' = '3',
'dlc.ao.remove.orphan.max-concurrent-deletes' = '25',
'dlc.ao.remove.orphan.interval-min' = '120',
'dlc.ao.Merge.manifests.enable' = 'enable',
'dlc.ao.Merge.manifests.engine' = {用户创建的spark引擎name},
'dlc.ao.Merge.manifests.interval-min' = '120'
);
表描述:DLC表采用MOR的模式写入和读取,如果不做治理,实时读取性能会非常糟糕,比对数据详见附录。
表治理:DLC当前可以配置表治理规则,表治理工作可由DLC的内置调度机制完成,表治理规则如下:
任务配置
目标表数据唯一性
1、 单表场景:单表数据存在主键即可保证数据唯一性
2、 分库分表场景:源表的主键非全局唯一,当前场景会出现数据覆盖的问题;数据表的主键全局唯一,直接使用product_id作为目标表唯一键
关注点
1. 关注点1:数据修复如何处理
答复:人工处理
2. 关注点2:DDL操作如何处理
答复:人工处理
3. 关注点3:DLC表的治理规则
答复:可在DLC数据管理,创建原生表阶段可视化配置
分区表
配置基本等同于非分区表配置,差异点主要在以下两方面
1. 任务配置阶段需要对update_time做字段转换映射DLC的分区字段
2. 目标表的唯一键除上述常规配置之外需要加入分区字段(不然会报错)
实时类型
Upsert模式
当前模式主要通过实时写入并更新的方式生成目标数据内容,下游用户可以无感查询最新数据。当前目标表支持分区和非分区表,在实时数据湖场景下,非分区表更加适用。
非分区表
配置方式
Mysql源表
代码语言:javascript复制CREATE DATABASE streamdemo;
USE streamdemo;
CREATE TABLE `productinfo` (
`product_id` int(11) NOT NULL COMMENT '商品ID',
`product_name` varchar(100) DEFAULT NULL COMMENT '商品名称',
`price` float DEFAULT NULL COMMENT '商品价格',
`create_time` datetime DEFAULT NULL COMMENT '上架时间',
`update_time` datetime DEFAULT NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
`description` varchar(500) DEFAULT NULL COMMENT '商品描述',
PRIMARY KEY (`product_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
DLC内表结构
代码语言:javascript复制CREATE TABLE IF NOT EXISTS `DataLakeCatalog`.`rikhuang_iceberg`.`productinfo_iceberg` (
`uid` string,
`product_id` int,
`product_name` string,
`price` float,
`create_time` timestamp,
`update_time` timestamp,
`description` string
) TBLPROPERTIES (
'format-version' = '2',
'write.distribution-mode' = 'hash',
'write.Merge.mode' = 'Merge-on-read',
'write.parquet.bloom-filter-enabled.column.addr' = 'true',
'write.parquet.bloom-filter-enabled.column.id' = 'true',
'write.update.mode' = 'Merge-on-read',
'write.upsert.enabled' = 'true',
'write.metadata.delete-after-commit.enabled' = 'true',
'write.metadata.previous-versions-max' = '100',
'write.metadata.metrics.default' = 'full',
'dlc.ao.data.govern.inherit' = 'default',
'dlc.ao.Merge.data.enable' = 'enable',
'dlc.ao.Merge.data.engine' = {用户创建的spark引擎name},
'dlc.ao.Merge.data.min-input-files' = '5',
'dlc.ao.Merge.data.target-file-size-bytes' = '536870912',
'dlc.ao.Merge.data.interval-min' = '60',
'dlc.ao.expired.snapshots.enable' = 'enable',
'dlc.ao.expired.snapshots.engine' = {用户创建的spark引擎name},
'dlc.ao.expired.snapshots.retain-last' = '5',
'dlc.ao.expired.snapshots.before-days' = '2',
'dlc.ao.expired.snapshots.max-concurrent-deletes' = '25',
'dlc.ao.expired.snapshots.interval-min' = '60',
'dlc.ao.remove.orphan.enable' = 'enable',
'dlc.ao.remove.orphan.engine' = {用户创建的spark引擎name},
'dlc.ao.remove.orphan.before-days' = '3',
'dlc.ao.remove.orphan.max-concurrent-deletes' = '25',
'dlc.ao.remove.orphan.interval-min' = '120',
'dlc.ao.Merge.manifests.enable' = 'enable',
'dlc.ao.Merge.manifests.engine' = {用户创建的spark引擎name},
'dlc.ao.Merge.manifests.interval-min' = '120'
);
表描述:DLC表采用MOR的模式写入和读取,如果不做治理,实时读取性能会非常糟糕,比对数据详见附录。
表治理:DLC当前可以配置表治理规则,表治理工作可由DLC的内置调度机制完成,表治理规则如下:
特殊字段说明
该部分字段在实时任务同步过程中引擎自定生成,主要用来实现主键扩展,DDL操作标识,为下游区分有效数据。当前DLC表模式下,如果源端非分库分表或者分库分表模式下主键不冲突,如下特殊字段可以不用采集。
目标表数据唯一性
1、 单表场景:单表数据存在主键即可保证数据唯一性
2、 分库分表场景:源表的主键非全局唯一,可通过database_Wedata_di ||'_'|| table_Wedata_di || '_' || CAST(product_id AS string)组成唯一键;数据表的主键全局唯一,直接使用product_id作为目标表唯一键
字段 | 内容 |
---|---|
database_Wedata_di | streamdemo |
table_Wedata_di | productinfo |
type_Wedata_di | INSERT、UPDATE、DELETE |
ts_Wedata_di | 2023-01-30 14:31:42.0 事件时间 |
processing_time_Wedata_di | 2023-01-30 14:31:43.019 处理时间 |
任务配置
说明点:
1. hive表中的主键uid字段由database_Wedata_di ||'_'|| table_Wedata_di || '_' || CAST(product_id AS string)组成
2. 实时同步采用的Flink函数,支持函数列表:系统(内置)函数 | Apache Flink
3. uid字段必须配置再唯一键中
源表配置 | 函数 | 目标表配置 |
---|---|---|
database_Wedata_di ||'_'|| table_Wedata_di || '_' || CAST(product_id AS string) | Flink函数 | uid |
关注点
1. 关注点1:数据修复如何处理
答复:人工处理
2. 关注点2:DDL操作如何处理
答复:人工处理
3. 关注点3:DLC表的治理规则
答复:可在DLC数据管理,创建原生表阶段可视化配置
分区表
配置基本等同于非分区表配置,差异点主要在以下两方面
3. 任务配置阶段需要对update_time做字段转换映射DLC的分区字段
4. 目标表的唯一键除上述常规配置之外需要加入分区字段(不然会报错)
Append模式
当前模式主要写入数据内容 DDL标识符,数据并不会自动更新。配置方式和原理等同于EMR场景-实时类型同步。这里不做描述。
附录
字典表
比对语法 | 执行SQL |
---|---|
count(1) | select count(id) from {table}; |
max(id) | select max(id),min(id) from {table} |
id asc | select * from {table} order by id ; |
id desc | select * from {table} order by id desc; |
k-max(id) | select * from {table} where id =max(id) ; |
k-min(id) | select * from {table} where id =min(id) ; |
all max | select max(id),max(name),max(addr),max(remark) from {table} ; |
SparkSQL 16CU
sparkSQL(16CUs) | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|
表类型 | COS(orc)外部表 | 合并表(更新少) | 合并表(更新多) | 非合并表(更新少) | 非合并表(更新多) | |||||
写入量 | 1000万 | 1000万 | 2000万 | 1000万 | 2000万 | |||||
是否合并 | / | 是 | 是 | 否 | 否 | |||||
数据量 | 1000万 | 787万 | 930万 | 1000万 | 2000万 | |||||
容量 | 1.33GB | 1.57GB | 1.71GB | 3.42GB | ||||||
比对语法 | 耗时 | 扫描量 | 耗时 | 扫描量 | 耗时 | 扫描量 | 耗时 | 扫描量 | 耗时 | 扫描量 |
count(id) | 6s | 34MB | 7s | 35MB | 6s | 40MB | 37s | 172MB | 108s | 566MB |
max(id) | 6s | 34MB | 8s | 35MB | 7s | 40MB | 36s | 172MB | 108s | 566MB |
id asc | 40s | 3.3GB | 36s | 2.7GB | 43s | 3.1GB | 104s | 3.6GB | 257s | 7.7GB |
id desc | 38s | 3.3GB | 34s | 2.7GB | 41s | 3.1GB | 102s | 3.6GB | 255s | 7.7GB |
k-max(id) | 6s | 5.1MB | 5s | 35MB | 5s | 127MB | 8s | 51MB | 10s | 50MB |
k-min(id) | 7s | 16MB | 7s | 127MB | 7s | 48MB | 12s | 51MB | 12s | 104MB |
all max | 12s | 1.7GB | 12s | 1.3GB | 15s | 1.6GB | 47s | 1.8GB | 128s | 3.8GB |