在Spark越来越受到主流市场青睐的大背景下,Hive作为Hadoop生态当中的数仓组件工具,在于Spark生态的配合当中,开始有了Hive on Spark的思路,那么具体是怎么实现的呢?今天的大数据开发分享,我们来讲讲Hive on Spark设计原则及架构。
总的来说,Hive on Spark的设计思路,是重用Hive逻辑层面的功能,从生成物理计划开始,提供一整套针对Spark的实现,比如SparkCompiler、SparkTask等,最终实现Hive查询作为Spark的任务来执行。
Hive on Spark设计原则
①尽可能少改动Hive的代码,从而不影响Hive目前对MapReduce和Tez的支持。同时,Hive on Spark保证对现有的MapReduce和Tez模式在功能和性能方面不会有任何影响。
②对于选择Spark的用户,应使其能够自动的获取Hive现有的和未来新增的功能。
③尽可能降低维护成本,保持对Spark依赖的松耦合。
Hive on Spark架构设计
①新的计算引擎
Hive的用户可以通过hive.execution.engine来设置计算引擎,该参数可选的值为mr和tez。为了实现Hive on Spark,我们将spark作为该参数的第三个选项。要开启Hive on Spark模式,用户仅需将这个参数设置为spark即可。
②以Hive的表作为RDD
Spark以分布式可靠数据集(Resilient Distributed Dataset,RDD)作为其数据抽象,因此我们需要将Hive的表转化为RDD以便Spark处理。本质上,Hive的表和Spark的HadoopRDD都是HDFS上的一组文件,通过InputFormat和RecordReader读取其中的数据,因此这个转化是自然而然的。
③使用Hive原语
这里主要是指使用Hive的操作符对数据进行处理。通过将Hive的操作符包装为Function,然后应用到RDD上。这样只需要依赖较少的几种RDD的转换,而主要的计算逻辑仍由Hive提供。
由于使用了Hive的原语,因此我们需要显式地调用一些Transformation来实现Shuffle的功能。下表中列举了Hive on Spark使用的所有转换。
④物理执行计划
通过SparkCompiler将Operator Tree转换为Task Tree,其中需要提交给Spark执行的任务即为SparkTask。不同于MapReduce中Map Reduce的两阶段执行模式,Spark采用DAG执行模式,因此一个SparkTask包含了一个表示RDD转换的DAG,我们将这个DAG包装为SparkWork。
执行SparkTask时,就根据SparkWork所表示的DAG计算出最终的RDD,然后通过RDD的foreachAsync来触发运算。使用foreachAsync是因为我们使用了Hive原语,因此不需要RDD返回结果;此外foreachAsync异步提交任务便于我们对任务进行监控。
⑤SparkContext生命周期
SparkContext是用户与Spark集群进行交互的接口,Hive on Spark应该为每个用户的会话创建一个SparkContext。但是Spark目前的使用方式假设SparkContext的生命周期是Spark应用级别的,而且目前在同一个JVM中不能创建多个SparkContext(请参考SPARK-2243)。这明显无法满足HiveServer2的应用场景,因为多个客户端需要通过同一个HiveServer2来提供服务。鉴于此,我们需要在单独的JVM中启动SparkContext,并通过RPC与远程的SparkContext进行通信。
⑥任务监控与统计信息收集
Spark提供了SparkListener接口来监听任务执行期间的各种事件,因此我们可以实现一个Listener来监控任务执行进度以及收集任务级别的统计信息(目前任务级别的统计由SparkListener采集,任务进度则由Spark提供的专门的API来监控)。另外Hive还提供了Operator级别的统计数据信息,比如读取的行数等。在MapReduce模式下,这些信息通过Hadoop Counter收集。我们可以使用Spark提供的Accumulator来实现该功能。
⑦测试
除了一般的单元测试以外,Hive还提供了Qfile Test,即运行一些事先定义的查询,并根据结果判断测试是否通过。Hive on Spark的Qfile Test应该尽可能接近真实的Spark部署环境。
关于大数据开发,Hive on Spark设计原则及架构,以上就为大家做了大致的介绍了。Hive on Spark作为业界使用较多的一种手段,某种程度上来说,还是值得去细细钻研一下的。