基于Apache Hudi的多库多表实时入湖最佳实践

2022-12-09 21:09:31 浏览数 (2)

1. 前言

CDC(Change Data Capture)从广义上讲所有能够捕获变更数据的技术都可以称为CDC,但本篇文章中对CDC的定义限定为以非侵入的方式实时捕获数据库的变更数据。例如:通过解析MySQL数据库的Binlog日志捕获变更数据,而不是通过SQL Query源表捕获变更数据。Hudi 作为最热的数据湖技术框架之一, 用于构建具有增量数据处理管道的流式数据湖。其核心的能力包括对象存储上数据行级别的快速更新和删除,增量查询(Incremental queries,Time Travel),小文件管理和查询优化(Clustering,Compactions,Built-in metadata),ACID和并发写支持。Hudi不是一个Server,它本身不存储数据,也不是计算引擎,不提供计算能力。其数据存储在S3(也支持其它对象存储和HDFS),Hudi来决定数据以什么格式存储在S3(Parquet,Avro,…), 什么方式组织数据能让实时摄入的同时支持更新,删除,ACID等特性。Hudi通过Spark,Flink计算引擎提供数据写入, 计算能力,同时也提供与OLAP引擎集成的能力,使OLAP引擎能够查询Hudi表。从使用上看Hudi就是一个JAR包,启动Spark, Flink作业的时候带上这个JAR包即可。Amazon EMR 上的Spark,Flink,Presto ,Trino原生集成Hudi, 且EMR的Runtime在Spark,Presto引擎上相比开源有2倍以上的性能提升。在多库多表的场景下(比如:百级别库表),当我们需要将数据库(mysql,postgres,sqlserver,oracle,mongodb等)中的数据通过CDC的方式以分钟级别(1minute )延迟写入Hudi,并以增量查询的方式构建数仓层次,对数据进行实时高效的查询分析时。我们要解决三个问题,第一,如何使用统一的代码完成百级别库表CDC数据并行写入Hudi,降低开发维护成本。第二,源端Schema变更如何同步到Hudi表。第三,使用Hudi增量查询构建数仓层次比如ODS->DWD->DWS(各层均是Hudi表),DWS层的增量聚合如何实现。本篇文章推荐的方案是: 使用Flink CDC DataStream API(非SQL)先将CDC数据写入Kafka,而不是直接通过Flink SQL写入到Hudi表,主要原因如下,第一,在多库表且Schema不同的场景下,使用SQL的方式会在源端建立多个CDC同步线程,对源端造成压力,影响同步性能。第二,没有MSK做CDC数据上下游的解耦和数据缓冲层,下游的多端消费和数据回溯比较困难。CDC数据写入到MSK后,推荐使用Spark Structured Streaming DataFrame API或者Flink StatementSet 封装多库表的写入逻辑,但如果需要源端Schema变更自动同步到Hudi表,使用Spark Structured Streaming DataFrame API实现更为简单,使用Flink则需要基于HoodieFlinkStreamer做额外的开发。Hudi增量ETL在DWS层需要数据聚合的场景的下,可以通过Flink Streaming Read将Hudi作为一个无界流,通过Flink计算引擎完成数据实时聚合计算写入到Hudi表。

2. 架构设计与解析

2.1 CDC数据实时写入MSK

图中标号1,2是将数据库中的数据通过CDC方式实时发送到MSK(Amazon托管的Kafka服务)。flink-cdc-connectors[1]是当前比较流行的CDC开源工具。它内嵌debezium[2]引擎,支持多种数据源,对于MySQL支持Batch阶段(全量同步阶段)并行,无锁,Checkpoint(可以从失败位置恢复,无需重新读取,对大表友好)。支持Flink SQL API和DataStream API,这里需要注意的是如果使用SQL API对于库中的每张表都会单独创建一个链接,独立的线程去执行binlog dump。如果需要同步的表比较多,会对源端产生较大的压力。在需要整库同步表非常多的场景下,应该使用DataStream API写代码的方式只建一个binlog dump同步所有需要的库表。另一种场景是如果只同步分库分表的数据,比如user表做了分库,分表,其表Schema都是一样的,Flink CDC的SQL API支持正则匹配多个库表,这时使用SQL API同步依然只会建立一个binlog dump线程。需要说明的是通过Flink CDC可以直接将数据Sink到Hudi, 中间无需MSK,但考虑到上下游的解耦,数据的回溯,多业务端消费,多表管理维护,依然建议CDC数据先到MSK,下游再从MSK接数据写入Hudi。

2.2 CDC工具对比

图中标号3,除了flink-cdc-connectors之外,DMS(Amazon Database Migration Services)是Amazon 托管的数据迁移服务,提供多种数据源(mysql,oracle,sqlserver,postgres,mongodb,documentdb等)的CDC支持,支持可视化的CDC任务配置,运行,管理,监控。因此可以选择DMS作为CDC的解析工具,DMS支持将MSK或者自建Kafka作为数据投递的目标,所以CDC实时同步到MSK通过DMS可以快速可视化配置管理。当然除了DMS之外还有很多开源的CDC工具,也可以完成CDC的同步工作,但需要在EC2上搭建相关服务。下图列出了CDC工具的对比项,供大家参考

2.3 Spark Structured Streaming多库表并行写Hudi及Schema变更

图中标号4,CDC数据到了MSK之后,可以通过Spark/Flink计算引擎消费数据写入到Hudi表,我们把这一层我们称之为ODS层。无论Spark还是Flink都可以做到数据ODS层的数据落地,使用哪一个我们需要综合考量,这里阐述一些相对重要的点。首先对于Spark引擎,我们一定是使用Spark Structured Streaming 消费MSK写入Hudi,由于可以使用DataFrame API写Hudi, 因此在Spark中可以方便的实现消费CDC Topic并根据其每条数据中的元信息字段(数据库名称,表名称等)在单作业内分流写入不同的Hudi表,封装多表并行写入逻辑,一个Job即可实现整库多表同步的逻辑。样例代码截图如下,完整代码点击Github[3]获取

我们知道CDC数据中是带着I(insert)、U(update)、D(delete)信息的, 不同的CDC工具数据格式不同,但要表达的含义是一致的。使用Spark写入Hudi我们主要关注U、D信息,数据带着U信息表示该条数据是一个更新操作,对于Hudi而言只要设定源表的主键为Hudi的recordKey,同时根据需求场景设定precombineKey即可。这里对precombineKey做一个说明,它表示的是当数据需要更新时(recordKey相同), 默认选择两条数据中precombineKey的大保留在Hudi中。其实Hudi有非常灵活的Payload机制,通过参数hoodie.datasource.write.payload.class可以选择不同的Payload实现,比如Partial Update(部分字段更新)的Payload实现OverwriteNonDefaultsWithLatestAvroPayload,也可以自定义Payload实现类,它核心要做的就是如何根据precombineKey指定的字段更新数据。所以对于CDC数据Sink Hudi而言,我们需要保证上游的消息顺序,只要我们表中有能判断哪条数据是最新的数据的字段即可,那这个字段在MySQL中往往我们设计成数据更新时间modify_time timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP 。如果没有类似字段,建议定义设计规范加上这个字段,否则就必须保证数据有序(这会给架构设计和性能带来更多的阻力),不然数据在Hudi中Updata的结果可能就是错的。对于带着D信息的数据,它表示这条数据在源端被删除,Hudi是提供删除能力的,其中一种方式是当一条数据中包含_hoodie_is_deleted字段,且值为true是,Hudi会自动删除此条数据,这在Spark Structured Streaming 代码中很容易实现,只需在map操作实现添加一个字段且当数据中包含D信息设定字段值为true即可。

2.4 Flink StatementSet多库表CDC并行写Hudi

对于使用Flink引擎消费MSK中的CDC数据落地到ODS层Hudi表,如果想要在一个JOB实现整库多张表的同步,Flink StatementSet来实现通过一个Kafka的CDC Source表,根据元信息选择库表Sink到Hudi中。但这里需要注意的是由于Flink和Hudi集成,是以SQL方式先创建表,再执行Insert语句写入到该表中的,如果需要同步的表有上百之多,封装一个自动化的逻辑能够减轻我们的工作,你会发现SQL方式写入Hudi虽然对于单表写入使用上很方便,不用编程只需要写SQL即可,但也带来了一些限制,由于写入Hudi时是通过SQL先建表,Schema在建表时已将定义,如果源端Schema变更,通过SQL方式是很难实现下游Hudi表Schema的自动变更的。虽然在Hudi的官网并未提供Flink DataStream API写入Hudi的例子,但Flink写入Hudi是可以通过HoodieFlinkStreamer以DataStream API的方式实现,在Hudi源码[4]中可以找到。因此如果想要更加灵活简单的实现多表的同步,以及Schema的自动变更,需要自行参照HoodieFlinkStreamer代码以DataStream API的方式写Hudi。对于I,U,D信息,Flink的debezium ,maxwell,canal format会直接将消息解析 为Flink的changelog流,换句话说就是Flink会将I,U,D操作直接解析成Flink内部的数据结构RowData,直接Sink到Hudi表即可,我们同样需要在SQL中设定recordKey,precombineKey,也可以设定Payload class的不同实现类。

2.5 Flink Streaming Read模式读Hudi实现ODS层聚合

图中标号5,数据通过Spark/Flink落地到ODS层后,我们可能需要构建DWD和DWS层对数据做进一步的加工处理,(DWD和DWS并非必须的,根据你的场景而定,你可以直接让OLAP引擎查询ODS层的Hudi表)我们希望能够使用到Hudi的增量查询能力,只查询变更的数据来做后续DWD和DWS的ETL,这样能够加速构建同时减少资源消耗。对于Spark引擎,在DWD层如果仅仅是对数据做map,fliter等相关类型操作,是可以使用增量查询的,但如果DWD层的构建有Join操作,是无法通过增量查询实现的,只能全表(或者分区)扫描。DWS层的构建如果聚合类型的操作没有去重,窗口类型的操作,只是SUM, AVG,MIN, MAX等类型的操作,可以通过增量查询之后和目标表做Merge实现,反之,只能全表(或者分区)扫描。对于Flink引擎来构建DWD和DWS, 由于Flink 支持Hudi表的streaming read, 在SQL设定read.streaming.enabled= true,changelog.enabled=true等相关流式读取的参数即可。设定后Flink把Hudi表当做了一个无界的changelog流表,无论怎样做ETL都是支持的,Flink会自身存储状态信息,整个ETL的链路是流式的。

2.6 OLAP引擎查询Hudi表

图中标号6, EMR Hive/Presto/Trino 都可以查询Hudi表,但需要注意的是不同引擎对于查询的支持是不同的,参见官网[5],这些引擎对于Hudi表只能查询,不能写入。关于Schema的自动变更,首先Hudi自身是支持Schema Evolution[6],我们想要做到源端Schema变更自动同步到Hudi表,通过上文的描述,可以知道如果使用Spark引擎,可以通过DataFrame API操作数据,通过from_json动态生成DataFrame,因此可以较为方便的实现自动添加列。如果使用Flink引擎上文已经说明想要自动实现Schema的变更,通过HoodieFlinkStreamer以DataStream API的方式实现Hudi写入的同时融入Schema变更的逻辑。

3. EMR CDC整库同步Demo

接下的Demo操作中会选择RDS MySQL作为数据源,Flink CDC DataStream API 同步库中的所有表到Kafka,使用Spark引擎消费Kafka中binlog数据实现多表写入ODS层Hudi,使用Flink引擎以streaming read的模式做DWD和DWS层的Hudi表构建。

3.1 环境信息

代码语言:javascript复制
EMR 6.6.0 
Hudi 0.10.0 
Spark 3.2.0 
Flink 1.14.2  
Presto 0.267
MySQL 5.7.34

3.2 创建源表

在MySQL中创建test_db库及user,product,user_order三张表,插入样例数据,后续CDC先加载表中已有的数据,之后源添加新数据并修改表结构添加新字段,验证Schema变更自动同步到Hudi表。

代码语言:javascript复制
-- create databases
create database if not exists test_db default character set utf8mb4 collate utf8mb4_general_ci;
use test_db;

-- create  user table
drop table if exists user;
create table if not exists user
(
    id           int auto_increment primary key,
    name         varchar(155)                        null,
    device_model varchar(155)                        null,
    email        varchar(50)                         null,
    phone        varchar(50)                         null,
    create_time  timestamp default CURRENT_TIMESTAMP not null,
    modify_time  timestamp default CURRENT_TIMESTAMP null on update CURRENT_TIMESTAMP
)charset = utf8mb4;

-- insert data
insert into user(name,device_model,email,phone) values
('customer-01','dm-01','abc01@email.com','188776xxxxx'),
('customer-02','dm-02','abc02@email.com','166776xxxxx');

-- create product table
drop table if exists product;
create table if not exists product
(
    pid          int not null primary key,
    pname        varchar(155)                        null,
    pprice       decimal(10,2)                           ,
    create_time  timestamp default CURRENT_TIMESTAMP not null,
    modify_time  timestamp default CURRENT_TIMESTAMP null on update CURRENT_TIMESTAMP
)charset = utf8mb4;

-- insert data
insert into product(pid,pname,pprice) values
('1','prodcut-001',125.12),
('2','prodcut-002',225.31);

-- create order table
drop table if exists user_order;
create table if not exists user_order
(
    id           int auto_increment primary key,
    oid          varchar(155)                        not null,
    uid          int                                         ,
    pid          int                                         ,
    onum         int                                         ,
    create_time  timestamp default CURRENT_TIMESTAMP not null,
    modify_time  timestamp default CURRENT_TIMESTAMP null on update CURRENT_TIMESTAMP
)charset = utf8mb4;

-- insert data
insert into user_order(oid,uid,pid,onum) values 
('o10001',1,1,100),
('o10002',1,2,30),
('o10001',2,1,22),
('o10002',2,2,16);

-- select data
select * from user;
select * from product;
select * from user_order;

3.3 Flink CDC发送数据到Kafka

使用DataStream API编写CDC同步程序。样例代码Github[7]

代码语言:javascript复制
# 创建topic
kafka-topics.sh --create --zookeeper ${zk}  --replication-factor 2 --partitions 8  --topic cdc_topic
# 下载代码,编译打包
mvn clean package  -Dscope.type=provided  -DskipTests
# 也可以使用已经打好的包,进入EMR主节点,执行命令
wget https://dxs9dnjebzm6y.cloudfront.net/tmp/emr-flink-cdc-1.0-SNAPSHOT.jar
# disalbe check-leaked-classloader
sudo sed -i -e '$aclassloader.check-leaked-classloader: false' /etc/flink/conf/flink-conf.yaml
# 启动flink cdc 发送数据到Kafka
sudo flink run -m yarn-cluster 
-yjm 1024 -ytm 2048 -d 
-ys 4 -p 8 
-c  com.aws.analytics.MySQLCDC  
/home/hadoop/emr-flink-cdc-1.0-SNAPSHOT.jar 
-b xxxxx.amazonaws.com:9092 
-t cdc_topic_001 
-c s3://xxxxx/flink/checkpoint/ 
-l 30 -h xxxxx.rds.amazonaws.com:3306 -u admin 
-P admin123456 
-d test_db -T test_db.* 
-p 4 
-e 5400-5408
# 相关的参数说明如下
MySQLCDC 1.0
Usage: MySQLCDC [options]

  -c, --checkpointDir <value>
                           checkpoint dir
  -l, --checkpointInterval <value>
                           checkpoint interval: default 60 seconds
  -b, --brokerList <value>
                           kafka broker list,sep comma
  -t, --sinkTopic <value>  kafka topic
  -h, --host <value>       mysql hostname, eg. localhost:3306
  -u, --username <value>   mysql username
  -P, --pwd <value>        mysql password
  -d, --dbList <value>     cdc database list: db1,db2,..,dbn
  -T, --tbList <value>     cdc table list: db1.*,db2.*,db3.tb*...,dbn.*
  -p, --parallel <value>   cdc source parallel
  -s, --position <value>   cdc start position: initial or latest,default: initial
  -e, --serverId <value>   cdc server id
  
# 消费Kafka topic 观察数据
./kafka_2.12-2.6.2/bin/kafka-console-consumer.sh --bootstrap-server $brok --topic cdc_topic_001 --from-beginning |jq .

3.4 Spark 消费CDC数据整库同步

代码语言:javascript复制
# 整库同步样例代码  https://github.com/yhyyz/emr-hudi-example/blob/main/src/main/scala/com/aws/analytics/Debezium2Hudi.scala

# 下载代码,编译打包
mvn clean package  -Dscope.type=provided  -DskipTests
# 也可以使用已经打好的包,进入EMR主节点,执行命令
wget https://dxs9dnjebzm6y.cloudfront.net/tmp/emr-hudi-example-1.0-SNAPSHOT-jar-with-dependencies.jar 

# 执行如下命令提交作业,命令中设定-s hms,hudi表同步到Glue Catalog
spark-submit  --master yarn 
--deploy-mode client 
--driver-memory 1g 
--executor-memory 1g 
--executor-cores 2 
--num-executors  2 
--conf "spark.dynamicAllocation.enabled=false" 
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" 
--conf "spark.sql.hive.convertMetastoreParquet=false" 
--jars  /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar 
--class com.aws.analytics.Debezium2Hudi /home/hadoop/emr-hudi-example-1.0-SNAPSHOT-jar-with-dependencies.jar 
-e prod -b xxxxx.amazonaws.com:9092 
-t cdc_topic_001 -p emr-cdc-group-02 -s true 
-o earliest 
-i 60 -y cow -p 10 
-c s3://xxxxx/spark-checkpoint/emr-hudi-cdc-005/ 
-g s3://xxxxx/emr-hudi-cdc-005/ 
-r jdbc:hive2://localhost:10000  
-n hadoop -w upsert  
-s hms 
--concurrent false 
-m "{"tableInfo":[{"database":"test_db","table":"user","recordKey":"id","precombineKey":"modify_time","partitionTimeColumn":"create_time","hudiPartitionField":"year_month"},
{"database":"test_db","table":"user_order","recordKey":"id","precombineKey":"modify_time","partitionTimeColumn":"create_time","hudiPartitionField":"year_month"},{"database":"test_db","table":"product","recordKey":"pid","precombineKey":"modify_time","partitionTimeColumn":"create_time","hudiPartitionField":"year_month"}]}"

# 相关参数说明如下:
Debezium2Hudi 1.0
Usage: spark ss Debezium2Hudi [options]

  -e, --env <value>        env: dev or prod
  -b, --brokerList <value>
                           kafka broker list,sep comma
  -t, --sourceTopic <value>
                           kafka topic
  -p, --consumeGroup <value>
                           kafka consumer group
  -s, --syncHive <value>   whether sync hive,default:false
  -o, --startPos <value>   kafka start pos latest or earliest,default latest
  -m, --tableInfoJson <value>
                           table info json str
  -i, --trigger <value>    default 300 second,streaming trigger interval
  -c, --checkpointDir <value>
                           hdfs dir which used to save checkpoint
  -g, --hudiEventBasePath <value>
                           hudi event table hdfs base path
  -y, --tableType <value>  hudi table type MOR or COW. default COW
  -t, --morCompact <value>
                           mor inline compact,default:true
  -m, --inlineMax <value>  inline max compact,default:20
  -r, --syncJDBCUrl <value>
                           hive server2 jdbc, eg. jdbc:hive2://localhost:10000
  -n, --syncJDBCUsername <value>
                           hive server2 jdbc username, default: hive
  -p, --partitionNum <value>
                           repartition num,default 16
  -w, --hudiWriteOperation <value>
                           hudi write operation,default insert
  -u, --concurrent <value>
                           write multiple hudi table concurrent,default false
  -s, --syncMode <value>   sync mode,default jdbc, glue catalog set dms
  -z, --syncMetastore <value>
                           hive metastore uri,default thrift://localhost:9083
                           
# 下图可以看到表已经同步到Glue Catalog ,数据已经写入到S3
代码语言:javascript复制
-- 向MySQL的user表中添加一列,并插入一条新数据, 查询hudi表,可以看到新列和数据已经自动同步到user表,注意以下SQL在MySQL端执行
alter table user add column age int
insert into user(name,device_model,email,phone,age) values
('customer-03','dm-03','abc03@email.com','199776xxxxx',18);

3.5 Flink Streaming Read 实时聚合

代码语言:javascript复制
# 注意最后一个参数,-t 是把/etc/hive/conf/hive-site.xml 加入到classpath,这样hudi执行表同步到Glue是就可以加入加载到这个配置,配置中的关键是 hive.metastore.client.factory.class = com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory,这样就可以加载用到Glue的Catalog实现. 如果EMR集群启动时就选择了Glue Metastore,该文件中/etc/hive/conf/hive-site.xml 已经配置了AWSGlueDataCatalogHiveClientFactory. 如果启动EMR没有选择Glue Metastore,还需要同步数据到Glue,需要手动加上。

# 注意替换为你的S3 Bucket
checkpoints=s3://xxxxx/flink/checkpoints/datagen/

flink-yarn-session -jm 1024 -tm 4096 -s 2  
-D state.backend=rocksdb 
-D state.checkpoint-storage=filesystem 
-D state.checkpoints.dir=${checkpoints} 
-D execution.checkpointing.interval=5000 
-D state.checkpoints.num-retained=5 
-D execution.checkpointing.mode=EXACTLY_ONCE 
-D execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION 
-D state.backend.incremental=true 
-D execution.checkpointing.max-concurrent-checkpoints=1 
-D rest.flamegraph.enabled=true 
-d 
-t /etc/hive/conf/hive-site.xml 

# 启动Flink sql client
/usr/lib/flink/bin/sql-client.sh embedded -j /usr/lib/hudi/hudi-flink-bundle.jar shell
-- user表,开启streaming read, changelog.enalbe=true
set sql-client.execution.result-mode=tableau;

CREATE TABLE `user`(
    id string,
    name STRING,
    device_model STRING,
    email STRING,
    phone STRING,
    age string,
    create_time STRING,
    modify_time STRING,
    year_month STRING
)
PARTITIONED BY (`year_month`)
WITH (
  'connector' = 'hudi',
  'path' = 's3://xxxxx/emr-hudi-cdc-005/test_db/user/',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'table.type' = 'COPY_ON_WRITE',
  'index.bootstrap.enabled' = 'true',
  'read.streaming.enabled' = 'true',
  'read.start-commit' = '20220607014223',
  'changelog.enabled' = 'false',
  'read.streaming.check-interval' = '1'
);

# 实时查询数据
select * from `user`;

# 在MySQL中修改user表中id=3的name为new-customer-03,注意以下SQL在MySQL端执行
update  user set name="new-customer-03" where id=3;

# 在Flink 端可以可以看到数据变更
代码语言:javascript复制
-- Flink聚合操作Sink到Hudi表

-- batch
CREATE TABLE  user_agg(
num BIGINT,
device_model STRING
)WITH(
  'connector' = 'hudi',
  'path' = 's3://xxxxx/emr-cdc-hudi/user_agg/',
  'table.type' = 'COPY_ON_WRITE',  
  'write.precombine.field' = 'device_model',
  'write.operation' = 'upsert',
  'hoodie.datasource.write.recordkey.field' = 'device_model',
  'hive_sync.database' = 'dws',
  'hive_sync.enable' = 'true',
  'hive_sync.table' = 'user_agg',
  'hive_sync.mode' = 'HMS',
  'hive_sync.use_jdbc' = 'false',
  'hive_sync.username' = 'hadoop'
);

4. 总结

本篇文章讲解了如何通过EMR实现CDC数据入湖及Schema的自动变更。通过Flink CDC DataStream API先将整库数据发送到MSK,这时CDC在源端只有一个binlog dump线程,降低对源端的压力。使用Spark Structured Streaming 动态解析数据写入到Hudi表来实现Shema的自动变更,实现单个Job管理多表Sink, 多表情况下降低开发维护成本,可以并行或者串行写多张Hudi表,元数据同步Glue Catalog。使用Flink Hudi的Streaming Read 模式实现实时数据ETL,满足DWD和DWS层的实时Join和聚合的需求。Amazon EMR环境中原生集成Hudi, 使用Amazon EMR轻松构建了整库同步的Demo。

引用链接

[1] flink-cdc-connectors: https://github.com/ververica/flink-cdc-connectors [2] debezium: https://debezium.io/documentation/ [3] Github: https://github.com/yhyyz/emr-hudi-example/blob/main/src/main/scala/com/aws/analytics/Canal2Hudi.scala [4] Hudi源码: https://github.com/apache/hudi/blob/master/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java [5] 参见官网: https://hudi.apache.org/docs/querying_data#merge-on-read-tables-1 [6] Schema Evolution: https://hudi.apache.org/docs/schema_evolution [7] 样例代码Github: https://github.com/yhyyz/emr-flink-cdc/blob/main/src/main/scala/com/aws/analytics/MySQLCDC.scala

0 人点赞