湖仓一体电商项目(二十四):合并Iceberg小文件

2022-10-27 04:25:17 浏览数 (2)

​合并Iceberg小文件

Iceberg表每次commit都会生成一个parquet数据文件,有可能一张Iceberg表对应的数据文件非常多,那么我们通过Java Api 方式对Iceberg表可以进行数据文件合并,数据文件合并之后,会生成新的Snapshot且原有Snap快照数据并不会被删除,如果要删除对应的数据文件需要通过“Expire Snapshots来实现”。

​编辑我们可以通过Java Api 删除历史快照Snap-*.avro,可以通过指定时间戳,当前时间戳之前的所有快照都会被删除,如果指定时间比最后一个快照时间还大,会保留最新快照数据。

在删除快照时,数据data目录中过期的数据parquet文件也会被删除(例如:快照回滚后不再需要的文件),到底哪些parquet文件数据被删除决定于最后的“snap-xx.avro”中对应的manifest list数据对应的parquet数据。随着不断删除snapshot,在Iceberg表不再有manifest文件对应的parquet文件也会被删除。

每次Commit生成对应的Snapshot之外,还会有一份元数据文件“vX-metadata.json”文件产生,我们可以在创建Iceberg表时执行对应的属性决定Iceberg表保留几个元数据文件,属性如下:

Property

Description

write.metadata.delete-after-commit.enabled

每次表提交后是否删除旧的元数据文件

write.metadata.previous-versions-max

要保留旧的元数据文件数量

例如,在Spark中创建表 test ,指定以上两个属性,建表语句如下:

代码语言:javascript复制
CREATE TABLE ${CataLog名称}.${库名}.${表名} (
  id bigint, 
  name string
) using iceberg
PARTITIONED BY (
  loc string
) TBLPROPERTIES (
    'write.metadata.delete-after-commit.enabled'= true,
	'write.metadata.previous-versions-max' = 3
)

此项目中我们可以定期执行如下代码来删除Iceberg中过多的快照文件和数据文件,代码如下:

代码语言:javascript复制
object CombinSnapAndRemoveOldSnap {
  def main(args: Array[String]): Unit = {

    val conf = new Configuration()
    val catalog = new HadoopCatalog(conf,"hdfs://mycluster/lakehousedata")

    /**
      * 1.准备Iceberg表
      */
    val table1: Table = catalog.loadTable(TableIdentifier.of("icebergdb","ODS_BROWSELOG"))
    val table2: Table = catalog.loadTable(TableIdentifier.of("icebergdb","ODS_MEMBER_ADDRESS"))
    val table3: Table = catalog.loadTable(TableIdentifier.of("icebergdb","ODS_MEMBER_INFO"))
    val table4: Table = catalog.loadTable(TableIdentifier.of("icebergdb","ODS_PRODUCT_CATEGORY"))
    val table5: Table = catalog.loadTable(TableIdentifier.of("icebergdb","ODS_PRODUCT_INFO"))
    val table6: Table = catalog.loadTable(TableIdentifier.of("icebergdb","ODS_USER_LOGIN"))
    val table7: Table = catalog.loadTable(TableIdentifier.of("icebergdb","DWD_BROWSELOG"))
    val table8: Table = catalog.loadTable(TableIdentifier.of("icebergdb","DWD_USER_LOGIN"))
    val table9: Table = catalog.loadTable(TableIdentifier.of("icebergdb","DWS_BROWSE_INFO"))
    val table10: Table = catalog.loadTable(TableIdentifier.of("icebergdb","DWS_USER_LOGIN"))


    /**
      * 2.合并小文件数据,Iceberg合并小文件时并不会删除被合并的文件,Compact是将小文件合并成大文件并创建新的Snapshot。
      * 如果要删除文件需要通过Expire Snapshots来实现,targetSizeInBytes 指定合并后的每个文件大小
      */
    Actions.forTable(table1).rewriteDataFiles().execute()    
    Actions.forTable(table2).rewriteDataFiles().execute()  
    Actions.forTable(table3).rewriteDataFiles().execute()  
    Actions.forTable(table4).rewriteDataFiles().execute()  
    Actions.forTable(table5).rewriteDataFiles().execute()      
    Actions.forTable(table6).rewriteDataFiles().execute()      
    Actions.forTable(table7).rewriteDataFiles().execute()            
    Actions.forTable(table8).rewriteDataFiles().execute()  
    Actions.forTable(table9).rewriteDataFiles().execute()  
    Actions.forTable(table10).rewriteDataFiles().execute()  

    /**
      * 3.删除历史快照,历史快照是通过ExpireSnapshot来实现的,设置需要删除多久的历史快照 snap-*.avro文件
      */
    table1.expireSnapshots().expireOlderThan(System.currentTimeMillis()).commit()
    table2.expireSnapshots().expireOlderThan(System.currentTimeMillis()).commit()
    table3.expireSnapshots().expireOlderThan(System.currentTimeMillis()).commit()
    table4.expireSnapshots().expireOlderThan(System.currentTimeMillis()).commit()
    table5.expireSnapshots().expireOlderThan(System.currentTimeMillis()).commit()
    table6.expireSnapshots().expireOlderThan(System.currentTimeMillis()).commit()
    table7.expireSnapshots().expireOlderThan(System.currentTimeMillis()).commit()
    table8.expireSnapshots().expireOlderThan(System.currentTimeMillis()).commit()
    table9.expireSnapshots().expireOlderThan(System.currentTimeMillis()).commit()
    table10.expireSnapshots().expireOlderThan(System.currentTimeMillis()).commit()

  }

}

0 人点赞