1.文档编写目的
Apache Hudi是一个Data Lakes的开源方案,是Hadoop Updates and Incrementals的简写,它是由Uber开发并开源的Data Lakes解决方案。Hudi 是一个丰富的平台,用于构建具有增量数据管道的流式数据湖,具有如下基本特性/能力:
- Hudi能够摄入(Ingest)和管理(Manage)基于HDFS之上的大型分析数据集,主要目的是高效的减少入库延时。
- Hudi基于Spark来对HDFS上的数据进行更新、插入、删除等。
- Hudi在HDFS数据集上提供如下流原语:插入更新(如何改变数据集);增量拉取(如何获取变更的数据)。
- Hudi可以对HDFS上的parquet格式数据进行插入/更新操作。
- Hudi通过自定义InputFormat与Hadoop生态系统(Spark、Hive、Parquet)集成。
- Hudi通过Savepoint来实现数据恢复。
- Hudi支持Spark 2.x版本,建议使用2.4.4 版本的Spark。
本篇文章Fayson主要介绍如何基于CDH6.3.2版本编译Hudi
2.编译环境准备
1.本次的编译环境主要是基于Intellij Idea工具进行编译,打开Idea开发工具,从git上将hudi的源码checkout下来。
点击“Get from VCS”,选择GitHub方式,填写Hudi的git地址:https://github.com/apache/hudi.git
点击Clone将Hudi的master代码拉至本地
选中hudi工程,右键切换分支版本至0.9.0
点击“Branches”,选择0.9.0版本并checkout
到此完成了Hudi源码的Checkout,接下来调整依赖包版本及简单的调整代码进行编译。
注意:Hudi是Java开发,在自己的开发环境中还需要调整后自己的Java环境变量。
3.源码编译及修改
本次编译主要是为了能够更好的适配CDH6.3.2集群,因此在编译的过程中需要将Maven依赖调整为CDH6.3.2版本。
1.修改pom.xml配置文件,将里面的依赖修改为如下
- 确认<repositories></repositories>部分是否有Cloudera的Maven源
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/public/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
代码语言:javascript复制<hadoop.version>3.0.0-cdh6.3.2</hadoop.version>
<hive.version>2.1.1-cdh6.3.2</hive.version>
<spark2.version>2.4.0-cdh6.3.2</spark2.version>
代码语言:javascript复制 <exclusion>
<groupId>org.glassfish</groupId>
<artifactId>javax.el</artifactId>
</exclusion>
2.修改hudi-spark模块的org.apache.hudi.DefaultSource类中的部分代码段
使用CDH6.3.2版本的Spark依赖包找不到org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
3.hudi-utilities模块代码修改
- org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector类中的getFileAttributesFromRecord(JSONObject record)方法
该方法未抛出JSONExcepiton,导致编译失败
- org.apache.hudi.utilities.sources.helpers.S3EventsMetaSelector类中processAndDeleteInvalidMessages方法添加JSONException异常抛出
4.hudi-integ-test模块代码修改,注释pom.xml文件中jackson-annotations依赖的scope)
5.hudi-spark-datasource/hudi-spark-common模块的
org.apache.hudi.DataSourceReadOptions,将如下截图部分代码注释(204-228行的if判断)
6.完成上修改后,通过idea执行编译操作
等待命令执行成功
至此完成了Hudi0.9.0版本的编译。
4.Hudi与Spark集成测试
1.在前面完成了Hudi源码的编译,在packaging目录下的hudi-spark-bundle模块可以找到编译好的hudi-spark-bundle_2.11-0.9.0的jar包
2.将编译好的jar包上传至CDH集群任意有Spark Gateway节点的服务器上
3.使用spark-shell命令集成hudi并测试基本功能
代码语言:javascript复制spark-shell
--jars hudi-spark-bundle_2.11-0.9.0.jar
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
4.在命令行执行如下代码,创建一个hudi的表并插入数据
代码语言:javascript复制import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
val tableName = "hudi_trips_cow1"
val basePath = "hdfs:///tmp/hudi_trips_cow1"
val dataGen = new DataGenerator
//写入数据
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option("hoodie.embed.timeline.server","false").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)
5.执行如下代码读取数据
代码语言:javascript复制val tripsSnapshotDF = spark.read.
format("hudi").
load(basePath)
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()
查看该表的总数据量
6.执行如下代码删除数据
代码语言:javascript复制// 取出两条要删除的数据
val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)
// 删除
val deletes = dataGen.generateDeletes(ds.collectAsList())
val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(OPERATION_OPT_KEY,"delete").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
option("hoodie.embed.timeline.server","false").
mode(Append).
save(basePath)
// 再次查询
val roAfterDeleteViewDF = spark.
read.
format("hudi").
load(basePath)
roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
// 应该返回两条数据
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").show()
7.查看HDFS上的hudi数据目录
代码语言:javascript复制hadoop fs -ls -R /tmp/hudi_trips_cow1
到完成了简单的Spark与Hudi的集成测试
5.总结
1.Hudi0.9.0版本与Spark的集成,至少需要Spark2.4.4及以上版本,在更高版本中引入的ORC的支持,因此使用CDH6.3.2版本依赖进行编译是去掉了ORC相关的代码段
2.在编译的过程中,hudi依赖的hive依赖中存在低版本的jetty依赖包,导致在执行写入时报如下异常:对于该异常的处理方式,需要在执行写入hudi数据的代码段中增加option("hoodie.embed.timeline.server","false").
代码语言:javascript复制java.lang.NoSuchMethodError: org.apache.hudi.org.apache.jetty.server.session.SessionHandler.setHttpOnly(Z)V
at io.javalin.core.util.JettyServerUtil.defaultSessionHandler(JettyServerUtil.kt:50)
at io.javalin.Javalin.<init>(Javalin.java:94)
3.在后续的文章中会使用Hudi与支持的Hive、Spark、MR等进行详细的测试。