Apache Hudi 使用文件聚类功能 (Clustering) 解决小文件过多的问题

2022-11-11 19:16:38 浏览数 (1)

全网最全大数据面试提升手册!

Hudi测试:批处理后文件据类再接流

本文详细阐述了在 “批处理后,流处理之前” 进行文件 Clustering 操作的方法。该方法可以将众多小文件合并成数量极少的大文件,从而防止过多小文件的产生。

在批处理结束后进行 Clustering 主要涉及如下几个步骤,它们主要都是通过 spark-submit 命令完成的:

  • 制定 Clustering 计划,找到
批处理数据结束

首先用 bulk_insert 方式运行批处理任务。注意下面的操作都是在批处理任务完成后,接流之前进行。

查看表相关的 hdfs,可以发现由于使用了 bulk_insert 的方式写入数据,导致文件数量非常多,而每个文件的 Size 非常小。我们希望将每个分区的1000多个小文件聚合成几个大文件,以免造成不必要的查询和系统维护开销。

代码语言:javascript复制
[hadoop@p0-tklfrna-tklrna-device02 hudi_clustering]$ hdfs dfs -count /flk_hudi/chdrpf_hudi_test03/*
           7            7           32637997 /flk_hudi/chdrpf_hudi_test03/.hoodie
           1         1067          571117942 /flk_hudi/chdrpf_hudi_test03/1
           1         1071          716513820 /flk_hudi/chdrpf_hudi_test03/2
           1         1072          644997032 /flk_hudi/chdrpf_hudi_test03/3
           1         1072          507397985 /flk_hudi/chdrpf_hudi_test03/4
           1         1069          730774472 /flk_hudi/chdrpf_hudi_test03/5
           1         1067          586561261 /flk_hudi/chdrpf_hudi_test03/6
           1         1063          557377359 /flk_hudi/chdrpf_hudi_test03/7
           1         1070          483416155 /flk_hudi/chdrpf_hudi_test03/8
           1         1071          587965407 /flk_hudi/chdrpf_hudi_test03/A
           1         1071          570651877 /flk_hudi/chdrpf_hudi_test03/B
           1         1068          796163049 /flk_hudi/chdrpf_hudi_test03/C
           1         1064          732633320 /flk_hudi/chdrpf_hudi_test03/D
           1         1067          524777141 /flk_hudi/chdrpf_hudi_test03/E
           1         1070          550302848 /flk_hudi/chdrpf_hudi_test03/F
           1         1076          540059544 /flk_hudi/chdrpf_hudi_test03/G
           1         1071          590094172 /flk_hudi/chdrpf_hudi_test03/H
           1         1076          505755100 /flk_hudi/chdrpf_hudi_test03/I
           1         1068          606771875 /flk_hudi/chdrpf_hudi_test03/J
           1         1068          495261290 /flk_hudi/chdrpf_hudi_test03/K
           1         1067          516964732 /flk_hudi/chdrpf_hudi_test03/L
           1         1060          482056347 /flk_hudi/chdrpf_hudi_test03/M
           1         1054          607625266 /flk_hudi/chdrpf_hudi_test03/N
           1         1077          551989638 /flk_hudi/chdrpf_hudi_test03/O
           1         1076          590537140 /flk_hudi/chdrpf_hudi_test03/P
           1         1069          536362956 /flk_hudi/chdrpf_hudi_test03/Q
           1         1072          559723804 /flk_hudi/chdrpf_hudi_test03/R
           1         1067          546042696 /flk_hudi/chdrpf_hudi_test03/S
           1         1059          528438508 /flk_hudi/chdrpf_hudi_test03/T
           1         1063          518288413 /flk_hudi/chdrpf_hudi_test03/U
           1         1070          543146873 /flk_hudi/chdrpf_hudi_test03/V
           1         1066          532588113 /flk_hudi/chdrpf_hudi_test03/W
           1         1069          494606809 /flk_hudi/chdrpf_hudi_test03/X
           1         1079          527128056 /flk_hudi/chdrpf_hudi_test03/Y
           1         1068          477378497 /flk_hudi/chdrpf_hudi_test03/Z
           1         1075          471848267 /flk_hudi/chdrpf_hudi_test03/a

查看当前 hdfs 路径下的文件个数。可以发现由于 bulk_insert 导致小文件非常之多,这会显著影响查询的性能 (一次查询可能要做几千个 IO 操作)。

代码语言:javascript复制
[hadoop@p0-tklfrna-tklrna-device02 hudi_clustering]$ hdfs dfs -count /flk_hudi/chdrpf_hudi_test03/
          43        37452        22269590565 /flk_hudi/chdrpf_hudi_test03
Clustering
配置清理策略

使用最简配置方法如下

代码语言:javascript复制
[hadoop@p0-tklfrna-tklrna-device02 hudi_clustering]$ cat /home/hadoop/hudi_clustering/clusteringjob.properties
hoodie.clustering.inline.max.commits=2
hoodie.clustering.plan.strategy.max.num.groups=40

添加高级配置项

代码语言:javascript复制
[hadoop@p0-tklfrna-tklrna-device02 ~]$ cat /home/hadoop/hudi_clustering/clusteringjob.properties
hoodie.clustering.inline=true
hoodie.clustering.inline.max.commits=2
hoodie.clustering.plan.strategy.max.num.groups=40
hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824
hoodie.clustering.plan.strategy.max.bytes.per.group=2147483648
hoodie.clustering.plan.strategy.small.file.limit=629145600
Schedule

指定 Clustering 计划。计划制定完毕后 Hudi 对应 hdfs 的 Timeline 中会出现相应时间戳,以供执行计划。

代码语言:javascript复制
spark-submit 
--master yarn 
--class org.apache.hudi.utilities.HoodieClusteringJob 
hdfs://nameservice1/utility_jars/hudi-utilities-bundle_2.12-0.10.0.jar 
--schedule 
--base-path hdfs://nameservice1/flk_hudi/chdrpf_hudi_test03 
--table-name chdrpf_hudi_test03 
--props file:///home/hadoop/hudi_clustering/clusteringjob.properties 
--spark-memory 16g 
> /home/hadoop/hudi_clustering/clusteringjob.log 2>&1

查看 Hdfs 中的 Hudi 的 Timeline 获取时间戳。文件后缀为 replacecommit.requested 的时间戳即为我们需要的时间戳。复制我们需要的 20220826105913373,以便下一步粘贴。

代码语言:javascript复制
[hadoop@p0-tklfrna-tklrna-device02 ~]$ hdfs dfs -ls /flk_hudi/chdrpf_hudi_test03/.hoodie/
Found 407 items
drwxr-xr-x   - hadoop supergroup          0 2022-08-26 10:10 /flk_hudi/chdrpf_hudi_test03/.hoodie/.aux
drwxr-xr-x   - hadoop supergroup          0 2022-08-26 14:53 /flk_hudi/chdrpf_hudi_test03/.hoodie/.temp
-rw-r--r--   3 hadoop supergroup   18596070 2022-08-26 10:14 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826101036547.commit
-rw-r--r--   3 hadoop supergroup          0 2022-08-26 10:10 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826101036547.commit.requested
-rw-r--r--   3 hadoop supergroup          0 2022-08-26 10:10 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826101036547.inflight
-rw-r--r--   3 hadoop supergroup   14041389 2022-08-26 10:16 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826101404432.commit
-rw-r--r--   3 hadoop supergroup          0 2022-08-26 10:14 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826101404432.commit.requested
-rw-r--r--   3 hadoop supergroup          0 2022-08-26 10:14 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826101404432.inflight
...
-rw-r--r--   3 hadoop supergroup    5685565 2022-08-26 10:59 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826105913373.replacecommit.requested
...
Execute

Clustering 执行需要使用刚才的时间戳配置 --instant-time 20220826105913373 于命令中即可执行。

代码语言:javascript复制
spark-submit 
--master yarn 
--class org.apache.hudi.utilities.HoodieClusteringJob 
hdfs://nameservice1/utility_jars/hudi-utilities-bundle_2.12-0.10.0.jar 
--instant-time 20220826105913373 
--base-path hdfs://nameservice1/flk_hudi/chdrpf_hudi_test03 
--table-name chdrpf_hudi_test03 
--props file:///home/hadoop/hudi_clustering/clusteringjob.properties 
--spark-memory 16g 
> /home/hadoop/hudi_clustering/clusteringjob_execution.log 2>&1
文件聚类完毕后
代码语言:javascript复制
[hadoop@p0-tklfrna-tklrna-device02 hudi_clustering]$ hdfs dfs -count /flk_hudi/chdrpf_hudi_test03/*
           7           10           39759457 /flk_hudi/chdrpf_hudi_test03/.hoodie
           1         1068          644693330 /flk_hudi/chdrpf_hudi_test03/1
           1         1072          912384991 /flk_hudi/chdrpf_hudi_test03/2
           1         1073          783040567 /flk_hudi/chdrpf_hudi_test03/3
           1         1073          535431665 /flk_hudi/chdrpf_hudi_test03/4
           1         1070          938545286 /flk_hudi/chdrpf_hudi_test03/5
           1         1068          676230669 /flk_hudi/chdrpf_hudi_test03/6
           1         1064          625387487 /flk_hudi/chdrpf_hudi_test03/7
           1         1071          494572949 /flk_hudi/chdrpf_hudi_test03/8
           1         1072          675599389 /flk_hudi/chdrpf_hudi_test03/A
           1         1072          643710911 /flk_hudi/chdrpf_hudi_test03/B
           1         1069         1056860522 /flk_hudi/chdrpf_hudi_test03/C
           1         1065          940690081 /flk_hudi/chdrpf_hudi_test03/D
           1         1068          563929957 /flk_hudi/chdrpf_hudi_test03/E
           1         1071          606406555 /flk_hudi/chdrpf_hudi_test03/F
           1         1077          589463777 /flk_hudi/chdrpf_hudi_test03/G
           1         1072          682564783 /flk_hudi/chdrpf_hudi_test03/H
           1         1077          529816271 /flk_hudi/chdrpf_hudi_test03/I
           1         1069          712917512 /flk_hudi/chdrpf_hudi_test03/J
           1         1069          514668751 /flk_hudi/chdrpf_hudi_test03/K
           1         1068          550874973 /flk_hudi/chdrpf_hudi_test03/L
           1         1061          495250431 /flk_hudi/chdrpf_hudi_test03/M
           1         1055          716887761 /flk_hudi/chdrpf_hudi_test03/N
           1         1078          612144859 /flk_hudi/chdrpf_hudi_test03/O
           1         1077          679350316 /flk_hudi/chdrpf_hudi_test03/P
           1         1070          586176818 /flk_hudi/chdrpf_hudi_test03/Q
           1         1073          625760986 /flk_hudi/chdrpf_hudi_test03/R
           1         1068          603042997 /flk_hudi/chdrpf_hudi_test03/S
           1         1060          576062292 /flk_hudi/chdrpf_hudi_test03/T
           1         1064          555764103 /flk_hudi/chdrpf_hudi_test03/U
           1         1071          598050377 /flk_hudi/chdrpf_hudi_test03/V
           1         1066          532588113 /flk_hudi/chdrpf_hudi_test03/W
           1         1069          494606809 /flk_hudi/chdrpf_hudi_test03/X
           1         1079          527128056 /flk_hudi/chdrpf_hudi_test03/Y
           1         1068          477378497 /flk_hudi/chdrpf_hudi_test03/Z
           1         1075          471848267 /flk_hudi/chdrpf_hudi_test03/a
运行清理

在进行完 Clustering 操作后,很多小文件都被合并进大文件了。由于 Hudi 不会主动删除过期和不必要的文件,因此需要利用手动清理策略来对过期文件进行清理删除。

清理策略的配置文件

代码语言:javascript复制
[hadoop@p0-tklfrna-tklrna-device02 hudi_clustering]$ cat /home/hadoop/hudi_clustering/hudi_cleaning.properties 
# hudi_cleaning.properties

# When enabled, the cleaner table service is invoked immediately after each commit, to delete older file slices
hoodie.clean.automatic=true

# Only applies when hoodie.clean.automatic is turned on. 
# When turned on runs cleaner async with writing, which can speed up overall write performance.
hoodie.clean.async=true

# # This policy has the effect of keeping N number of file versions irrespective of time. 
# # This policy is useful when it is known how many MAX versions of the file does one want to keep at any given time.
# # hoodie.cleaner.policy=KEEP_LATEST_COMMITS
hoodie.cleaner.policy=KEEP_LATEST_COMMITS

# # Number of commits to retain, without cleaning.
# # This will be retained for num_of_commits * time_between_commits (scheduled).
# # hoodie.cleaner.commits.retained=3
# When KEEP_LATEST_FILE_VERSIONS cleaning policy is used, 
# the minimum number of file slices to retain in each file group, during cleaning.
hoodie.cleaner.commits.retained=1

# When set to true, cleaner also deletes the bootstrap base file when it's skeleton base file is cleaned.
hoodie.cleaner.delete.bootstrap.base.file=false
# Only if the log file size is greater than the threshold in bytes, the file group will be compacted.

hoodie.commits.archival.batch=60

hoodie.archive.merge.small.file.limit.bytes=104857600
# When set to true, compaction service is triggered after each write. 
# While being simpler operationally, this adds extra latency on the write path.
hoodie.compact.inline=false

hoodie.parquet.small.file.limit=124857600

hoodie.cleaner.parallelism=800

hoodie.cleaner.incremental.mode=true

# Archiving service moves older entries from timeline into an archived log after each write, 
# to keep the metadata overhead constant, even as the table size grows
hoodie.keep.max.commits=3
hoodie.keep.min.commits=2

利用命令执行清理策略

代码语言:javascript复制
spark-submit 
--class org.apache.hudi.utilities.HoodieCleaner 
hdfs://nameservice1/utility_jars/hudi-utilities-bundle_2.12-0.10.0.jar 
--props file:///home/hadoop/hudi_clustering/hudi_cleaning.properties 
--target-base-path hdfs://nameservice1/flk_hudi/chdrpf_hudi_test03 
> /home/hadoop/hudi_clustering/clusteringjob_cleaning.log 2>&1
接流处理任务

此时,可以将流处理任务接至该 Hudi 表中。文件清理的效果会在 Hudi 接流后显现。

清理后文件个数
代码语言:javascript复制
[hadoop@p0-tklfrna-tklrna-device02 hudi_clustering]$ hdfs dfs -count /flk_hudi/chdrpf_hudi_test03/*          39         2818           61047630 /flk_hudi/chdrpf_hudi_test03/.hoodie
           1            5          295730057 /flk_hudi/chdrpf_hudi_test03/1
           1            5          581449403 /flk_hudi/chdrpf_hudi_test03/2
           1            5          541564433 /flk_hudi/chdrpf_hudi_test03/3
           1            5          113526185 /flk_hudi/chdrpf_hudi_test03/4
           1            5          819123981 /flk_hudi/chdrpf_hudi_test03/5
           1            5          361258893 /flk_hudi/chdrpf_hudi_test03/6
           1            4          205559110 /flk_hudi/chdrpf_hudi_test03/7
           1            4           33721101 /flk_hudi/chdrpf_hudi_test03/8
           1            5          352884732 /flk_hudi/chdrpf_hudi_test03/A
           1            5          294248033 /flk_hudi/chdrpf_hudi_test03/B
           1            5          771533591 /flk_hudi/chdrpf_hudi_test03/C
           1            5          614827884 /flk_hudi/chdrpf_hudi_test03/D
           1            5          157676833 /flk_hudi/chdrpf_hudi_test03/E
           1            5          226004511 /flk_hudi/chdrpf_hudi_test03/F
           1            5          198656601 /flk_hudi/chdrpf_hudi_test03/G
           1            5          372307018 /flk_hudi/chdrpf_hudi_test03/H
           1            5           97041611 /flk_hudi/chdrpf_hudi_test03/I
           1            5          427390894 /flk_hudi/chdrpf_hudi_test03/J
           1            5           78296341 /flk_hudi/chdrpf_hudi_test03/K
           1            5          136428423 /flk_hudi/chdrpf_hudi_test03/L
           1            5           53218521 /flk_hudi/chdrpf_hudi_test03/M
           1            5          439899957 /flk_hudi/chdrpf_hudi_test03/N
           1            5          242278011 /flk_hudi/chdrpf_hudi_test03/O
           1            5          357549763 /flk_hudi/chdrpf_hudi_test03/P
           1            5          200702230 /flk_hudi/chdrpf_hudi_test03/Q
           1            5          265952714 /flk_hudi/chdrpf_hudi_test03/R
           1            5          229783530 /flk_hudi/chdrpf_hudi_test03/S
           1            5          191817537 /flk_hudi/chdrpf_hudi_test03/T
           1            5          151138760 /flk_hudi/chdrpf_hudi_test03/U
           1            5          221236895 /flk_hudi/chdrpf_hudi_test03/V
           1         4112         2060894265 /flk_hudi/chdrpf_hudi_test03/W
           1         4117         1910706738 /flk_hudi/chdrpf_hudi_test03/X
           1         4169         2042792364 /flk_hudi/chdrpf_hudi_test03/Y
           1         2221          995253322 /flk_hudi/chdrpf_hudi_test03/Z
           1         1075          472877437 /flk_hudi/chdrpf_hudi_test03/a

可以看到每个分区内的小文件已经被聚合成大文件,并随着流数据的进入,文件数量的增长速度也在合理范围内。

Ps: 我们把后几个分区作为对照组没有进行文件聚合。可以通过在 Clustering 的配置文件中调大 hoodie.clustering.plan.strategy.max.num.groups=30 的值来增加 SparkJob 的 parallelism 从而把所有分区涵盖进行,进行文件聚合。

代码语言:javascript复制
[hadoop@p0-tklfrna-tklrna-device02 hudi_clustering]$ hdfs dfs -count /flk_hudi/chdrpf_hudi_test03/
          76        19050        17396389394 /flk_hudi/chdrpf_hudi_test03
Timeline 观察

20220826105913373.replacecommit表示进行完毕聚类操作的时刻

20220826114108591.clean表示进行完毕清理操作的时刻

20220826114317026.commit表示进行完毕新数据写入操作的时刻

代码语言:javascript复制
[hadoop@p0-tklfrna-tklrna-device02 hudi_clustering]$ hdfs dfs -ls /flk_hudi/chdrpf_hudi_test03/.hoodie
Found 30 items
drwxr-xr-x   - hadoop supergroup          0 2022-08-26 10:10 /flk_hudi/chdrpf_hudi_test03/.hoodie/.aux
drwxr-xr-x   - hadoop supergroup          0 2022-08-26 11:46 /flk_hudi/chdrpf_hudi_test03/.hoodie/.temp
-rw-r--r--   3 hadoop supergroup   18596070 2022-08-26 10:14 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826101036547.commit
-rw-r--r--   3 hadoop supergroup          0 2022-08-26 10:10 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826101036547.commit.requested
-rw-r--r--   3 hadoop supergroup          0 2022-08-26 10:10 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826101036547.inflight
-rw-r--r--   3 hadoop supergroup   14041389 2022-08-26 10:16 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826101404432.commit
-rw-r--r--   3 hadoop supergroup          0 2022-08-26 10:14 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826101404432.commit.requested
-rw-r--r--   3 hadoop supergroup          0 2022-08-26 10:14 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826101404432.inflight
-rw-r--r--   3 hadoop supergroup    1435895 2022-08-26 11:09 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826105913373.replacecommit
-rw-r--r--   3 hadoop supergroup          0 2022-08-26 11:03 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826105913373.replacecommit.inflight
-rw-r--r--   3 hadoop supergroup    5685565 2022-08-26 10:59 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826105913373.replacecommit.requested
-rw-r--r--   3 hadoop supergroup    1009885 2022-08-26 11:37 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826113342082.commit
-rw-r--r--   3 hadoop supergroup          0 2022-08-26 11:33 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826113342082.commit.requested
-rw-r--r--   3 hadoop supergroup          0 2022-08-26 11:33 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826113342082.inflight
-rw-r--r--   3 hadoop supergroup    3811303 2022-08-26 11:40 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826113740364.commit
-rw-r--r--   3 hadoop supergroup          0 2022-08-26 11:37 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826113740364.commit.requested
-rw-r--r--   3 hadoop supergroup          0 2022-08-26 11:37 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826113740364.inflight
-rw-r--r--   3 hadoop supergroup    2940587 2022-08-26 11:43 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826114026452.commit
-rw-r--r--   3 hadoop supergroup          0 2022-08-26 11:40 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826114026452.commit.requested
-rw-r--r--   3 hadoop supergroup          0 2022-08-26 11:40 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826114026452.inflight
-rw-r--r--   3 hadoop supergroup    5005100 2022-08-26 11:41 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826114108591.clean
-rw-r--r--   3 hadoop supergroup    4260649 2022-08-26 11:41 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826114108591.clean.inflight
-rw-r--r--   3 hadoop supergroup    4260649 2022-08-26 11:41 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826114108591.clean.requested
-rw-r--r--   3 hadoop supergroup    2867542 2022-08-26 11:46 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826114317026.commit
-rw-r--r--   3 hadoop supergroup          0 2022-08-26 11:43 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826114317026.commit.requested

如果这个文章对你有帮助,不要忘记 「在看」 「点赞」 「收藏」 三连啊喂!

2022年全网首发|大数据专家级技能模型与学习指南(胜天半子篇)

互联网最坏的时代可能真的来了

我在B站读大学,大数据专业

我们在学习Flink的时候,到底在学习什么?

193篇文章暴揍Flink,这个合集你需要关注一下

Flink生产环境TOP难题与优化,阿里巴巴藏经阁YYDS

Flink CDC我吃定了耶稣也留不住他!| Flink CDC线上问题小盘点

我们在学习Spark的时候,到底在学习什么?

在所有Spark模块中,我愿称SparkSQL为最强!

硬刚Hive | 4万字基础调优面试小总结

数据治理方法论和实践小百科全书

标签体系下的用户画像建设小指南

4万字长文 | ClickHouse基础&实践&调优全视角解析

【面试&个人成长】2021年过半,社招和校招的经验之谈

大数据方向另一个十年开启 |《硬刚系列》第一版完结

我写过的关于成长/面试/职场进阶的文章

当我们在学习Hive的时候在学习什么?「硬刚Hive续集」

0 人点赞