一、产生背景
- MR性能差,资源消耗大,如:Hive作业之间的数据不是直接流动的,而是借助HDFS作为共享数据存储系统,即一个作业将处理好的数据写入HDFS,下一个作业再从HDFS重新读取数据进行处理。很明显更高效的方式是,第一个作业直接将数据传递给下游作业。
- MR 默认了map和reduce阶段,map会对中间结果进行分区、排序,reduce会进行合并排序,这一过程并不适用于所有场景。
- 引擎级别的Runtime优化:MR执行计划在编译时已经确定,无法动态调整(?)。然而在执行ETL和Ad-hoc等任务时,根据实际处理的表大小,动态调整join策略、任务并行度将大大缩短任务执行时间。
二、原理
2.1 DAG
https://hortonworks.com/blog/expressing-data-processing-in-apache-tez/
- Vertex:定义了用户逻辑(如:map/reduce)与相关的资源与环境
- Edge:定义了上下游Vertex之间的连接方式。
- Edge相关属性:
- Data movement:定义了producer与consumer之间数据流动的方式。 One-To-One: 第i个producer产生的数据,发送给第i个consumer。这种上下游关系属于Spark的窄依赖。 Broadcast: producer产生的数据路由都下游所有consumer。这种上下游关系也属于Spark的窄依赖。 Scatter-Gather: producer将产生的数据分块,将第i块数据发送到第i个consumer。这种上下游关系属于Spark的宽依赖。
Sequential: Consumer task 需要producer task结束后启动,如:MR。
Concurrent: Consumer task 与producer task一起启动,如:流计算。
- Data source:定义了任务outp的生命周期与可靠性。 Persisted: 当任务退出后,该任务output依然存在,但经过一段时间后,可能会被删除,如:Mapper输出的中间结果。
Persisted-Reliable: 任务output总是存在,比如,MR中reducer的输出结 果,存在HDFS上。
Ephemeral: 任务输出只有当该task在运行的时候,才存在,如:流计算的 中间结果。
- 举例——MapReduce在Tez的编程模型
一个DAG图中只有两个Vertex,Map Vertex与Reduce Vertex。连接Map Vertex与Reduce Vertex的Edge有以下属性:
- Data movement:Scatter-Gather
- Scheduling:Sequential
- Data Source:Map Vertex的Data Source为Persisted-Reliable,reduce Vertex 的 Data Source为Persisted
- Tez Api实现WordCount
2.2 Runtime API——Input/Processor/Output
Task是Tez的最小执行单元,Vertex中task的数量与该vertex的并行度一致。以下是Input、Processor、Output均需要实现的接口:
代码语言:javascript复制List<Event> initialize(Tez*Context) -This is where I/P/O receive their corresponding context objects. They can, optionally, return a list of events.
handleEvents(List<Event> events) – Any events generated for the specific I/P/O will be passed in via this interface. Inputs receive DataMovementEvent(s) generated by corresponding Outputs on this interface – and will need to interpret them to retrieve data. At the moment, this can be ignored for Outputs and Processors.
List<Event> close() – Any cleanup or final commits will typically be implemented in the close method. This is generally a good place for Outputs to generate DataMovementEvent(s). More on these events later.
2.3 Runtime优化
任务运行时,程序知晓更多任务相关的信息,通过这些信息,我们可以动态修改修改执行计划,比如:修改mapper或reducer数量,决定何时启动reducer等。在Tez中,不同组件通过不同事件类型,进行通信。
- 动态修改reducer并行度:MapTask通过VertexManager类型的事件向ShuffleVertextManager发送信息,比如:所处理的partition大小等。ShuffleVertexManager通过所获得的信息,可以估算出所有Task的输出数据大小,最后来调整下游reduce Vertex的并行度,如下图:
2.4 从逻辑执行计划到物理执行计划
- 从逻辑DAG到最后物理执行计划示意图:
2.5 其他优化措施
- Tez Session: 与数据库session相似,在同一个Tez Session中,可串行执行多个Tez Dag。Tez Session避免了AM的多次启动与销毁,在有多个DAG图的Tez作业(HQL任务)中大大减小了任务执行时间。
这也是为什么在Tez-UI中,一个HQL任务,只有一个Application,却有多个DAG(MR中一个HQL任务,有多个Application)。
Tez相关参数:
- Container复用
- 问题:
- container的资源兼容?被先后调度到同一个container的多个task所需要的资源,必须与container的资源相互兼容。也就是说,container拥有的资源,如:jar包,Memory,CPU等,需要是task所需资源的“超集”。
- 怎么调度?进行container复用时,Tez对Task进行调度。Tez会依据:任务本地性、任务所需资源、pending任务的优先级等因素,进行任务调度。
- 优点:
- 减少作业执行过程中JVM的创建与销毁带来的开销
- 减小对RM的请求压力
- 运行在同一container上task之间的数据共享。比如,MapJoin中可以通过共享小表数据的方式,减少资源消耗。
- 相关参数:
三、优缺点
- 优点:
- 避免中间数据写回HDFS,减小任务执行时间
- vertex management模块使runtime动态修改执行计划变成可能
- input/processor/output编程模型,大大提高了任务模型的灵活性
- 提供container复用机制与Tez Session,减少资源消耗
- 缺点:
- 出现数据重复问题等数据质量问题
- Tez与Hive捆绑,在其他领域应用较少
- 社区不活跃
四、Hive On Tez:
- Hive On Tez初始并行度算法 https://cwiki.apache.org/confluence/display/TEZ/How initial task parallelism works
- Hive 调节mapper与reducer数量 http://www.openkb.info/2017/05/hive-on-tez-how-to-control-number-of.html
- Tez Configuration https://tez.apache.org/releases/0.9.0/tez-api-javadocs/configs/TezConfiguration.html
- Hive on Tez 配置参数 https://cwiki.apache.org/confluence/display/Hive/Configuration Properties#ConfigurationProperties-Tez
参考链接:
- https://hortonworks.com/blog/apache-tez-a-new-chapter-in-hadoop-data-processing/
- http://web.eecs.umich.edu/~mosharaf/Readings/Tez.pdf