0871-6.3.2-如何基于CDH6环境编译Hudi-0.9.0并使用

2022-03-21 13:11:40 浏览数 (2)

1.文档编写目的

Apache Hudi是一个Data Lakes的开源方案,是Hadoop Updates and Incrementals的简写,它是由Uber开发并开源的Data Lakes解决方案。Hudi 是一个丰富的平台,用于构建具有增量数据管道的流式数据湖,具有如下基本特性/能力:

  • Hudi能够摄入(Ingest)和管理(Manage)基于HDFS之上的大型分析数据集,主要目的是高效的减少入库延时。
  • Hudi基于Spark来对HDFS上的数据进行更新、插入、删除等。
  • Hudi在HDFS数据集上提供如下流原语:插入更新(如何改变数据集);增量拉取(如何获取变更的数据)。
  • Hudi可以对HDFS上的parquet格式数据进行插入/更新操作。
  • Hudi通过自定义InputFormat与Hadoop生态系统(Spark、Hive、Parquet)集成。
  • Hudi通过Savepoint来实现数据恢复。
  • Hudi支持Spark 2.x版本,建议使用2.4.4 版本的Spark。

本篇文章Fayson主要介绍如何基于CDH6.3.2版本编译Hudi

2.编译环境准备

1.本次的编译环境主要是基于Intellij Idea工具进行编译,打开Idea开发工具,从git上将hudi的源码checkout下来。

点击“Get from VCS”,选择GitHub方式,填写Hudi的git地址:https://github.com/apache/hudi.git

点击Clone将Hudi的master代码拉至本地

选中hudi工程,右键切换分支版本至0.9.0

点击“Branches”,选择0.9.0版本并checkout

到此完成了Hudi源码的Checkout,接下来调整依赖包版本及简单的调整代码进行编译。

注意:Hudi是Java开发,在自己的开发环境中还需要调整后自己的Java环境变量。

3.源码编译及修改

本次编译主要是为了能够更好的适配CDH6.3.2集群,因此在编译的过程中需要将Maven依赖调整为CDH6.3.2版本。

1.修改pom.xml配置文件,将里面的依赖修改为如下

  • 确认<repositories></repositories>部分是否有Cloudera的Maven源

代码语言:javascript复制
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/public/</url>
      <releases>
        <enabled>true</enabled>
      </releases>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </repository>
  • 改<properties></properties>部分的hadoop.version、hive.version以及spark2.version的版本
代码语言:javascript复制
<hadoop.version>3.0.0-cdh6.3.2</hadoop.version>
<hive.version>2.1.1-cdh6.3.2</hive.version>
<spark2.version>2.4.0-cdh6.3.2</spark2.version>
  • 修改hive-jdbc和hive-service两个依赖的配置,添加排除
代码语言:javascript复制
    <exclusion>
      <groupId>org.glassfish</groupId>
      <artifactId>javax.el</artifactId>
    </exclusion>

2.修改hudi-spark模块的org.apache.hudi.DefaultSource类中的部分代码段

使用CDH6.3.2版本的Spark依赖包找不到org.apache.spark.sql.execution.datasources.orc.OrcFileFormat

3.hudi-utilities模块代码修改

  • org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector类中的getFileAttributesFromRecord(JSONObject record)方法

该方法未抛出JSONExcepiton,导致编译失败

  • org.apache.hudi.utilities.sources.helpers.S3EventsMetaSelector类中processAndDeleteInvalidMessages方法添加JSONException异常抛出

  • org.apache.hudi.utilities.sources.helpers.TestCloudObjectsSelector类中testFileAttributesFromRecordShouldReturnsExpectOutput方法添加JSONException异常抛出
  • org.apache.hudi.utilities.sources.helpers.TestS3EventsMetaSelector类中testNextEventsFromQueueShouldReturnsEventsFromQueue方法添加JSONException异常抛出

4.hudi-integ-test模块代码修改,注释pom.xml文件中jackson-annotations依赖的scope)

5.hudi-spark-datasource/hudi-spark-common模块的

org.apache.hudi.DataSourceReadOptions,将如下截图部分代码注释(204-228行的if判断)

6.完成上修改后,通过idea执行编译操作

等待命令执行成功

至此完成了Hudi0.9.0版本的编译。

4.Hudi与Spark集成测试

1.在前面完成了Hudi源码的编译,在packaging目录下的hudi-spark-bundle模块可以找到编译好的hudi-spark-bundle_2.11-0.9.0的jar包

2.将编译好的jar包上传至CDH集群任意有Spark Gateway节点的服务器上

3.使用spark-shell命令集成hudi并测试基本功能

代码语言:javascript复制
spark-shell 
  --jars hudi-spark-bundle_2.11-0.9.0.jar 
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

4.在命令行执行如下代码,创建一个hudi的表并插入数据

代码语言:javascript复制
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

val tableName = "hudi_trips_cow1"
val basePath = "hdfs:///tmp/hudi_trips_cow1"
val dataGen = new DataGenerator
//写入数据
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option("hoodie.embed.timeline.server","false").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)

5.执行如下代码读取数据

代码语言:javascript复制
val tripsSnapshotDF = spark.read.
  format("hudi").
  load(basePath)
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()

查看该表的总数据量

6.执行如下代码删除数据

代码语言:javascript复制
// 取出两条要删除的数据
val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)

// 删除
val deletes = dataGen.generateDeletes(ds.collectAsList())
val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2))

df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(OPERATION_OPT_KEY,"delete").
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  option("hoodie.embed.timeline.server","false").
  mode(Append).
  save(basePath)

// 再次查询
val roAfterDeleteViewDF = spark.
  read.
  format("hudi").
  load(basePath)

roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
// 应该返回两条数据
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").show()

7.查看HDFS上的hudi数据目录

代码语言:javascript复制
hadoop fs -ls -R /tmp/hudi_trips_cow1

到完成了简单的Spark与Hudi的集成测试

5.总结

1.Hudi0.9.0版本与Spark的集成,至少需要Spark2.4.4及以上版本,在更高版本中引入的ORC的支持,因此使用CDH6.3.2版本依赖进行编译是去掉了ORC相关的代码段

2.在编译的过程中,hudi依赖的hive依赖中存在低版本的jetty依赖包,导致在执行写入时报如下异常:对于该异常的处理方式,需要在执行写入hudi数据的代码段中增加option("hoodie.embed.timeline.server","false").

代码语言:javascript复制
java.lang.NoSuchMethodError: org.apache.hudi.org.apache.jetty.server.session.SessionHandler.setHttpOnly(Z)V
  at io.javalin.core.util.JettyServerUtil.defaultSessionHandler(JettyServerUtil.kt:50)
  at io.javalin.Javalin.<init>(Javalin.java:94)

3.在后续的文章中会使用Hudi与支持的Hive、Spark、MR等进行详细的测试。

0 人点赞