sparksql中有一些容易混淆的概念,大家在面试时也会经常被问到join和shuffle相关的问题:
- 说说join的几种实现
- 说说shuffle的实现
- join操作一定发生shuffle吗?
- spark shuffle 2.0以上已经不用hash shuffle了,那join的时候还用hash join 么?
- ...
想要弄清楚这些,就得搞清楚sparksql中join的具体实现有哪些?shuffle又有哪些?他们之间的关系又是什么?
sparksql中的join
BaseJoinExec是sparksql中join实现的基类。
sparksql中,join有六种实现方式:
- SortMergeJoinExec
- ShuffledHashJoinExec
- BroadcastHashJoinExec
- BroadcastNestedLoopJoinExec
- CartesianProductExec
具体特点及分发类型如下图:
sparksql中的shuffle
Exchange
如果某个sql会发生shuffle,那么它的执行计划中一定会出现Exchange(实际是ShuffleExchangeExec)节点。
Exchange的两个实现类:
BroadcastExchangeExec(广播)、ShuffleExchangeExec(shuffle)
BroadcastExchangeExec:对应sql中的广播相关的join。
ShuffleExchangeExec:对应sql中shuffle hash join、sort merge join或者聚合类的操作,比如group by,grouping sets。
Shuffle机制
所谓shuffle就是把不同节点上的数据按相同key值拉取到一个节点上。
shuffle发生在map 和reduce之间(也可以说是两个stage之间),分为shuffleWrite 和shuffleRead两个过程。
shuffle 过程:
前一个stage进行shuffle write 把数据存在blockManage;
下一个stage 进行shuffle read 拉取上个stage 的数据。
几个重要的类:
shuffleManager
ShuffleManager是Spark系统中可插拔的Shuffle系统接口,ShuffleManager会在Driver或Executor的SparkEnv被创建时一并创建,可以通过spark.shuffle.manage配置指定具体的实现类。目前唯一实现类org.apache.spark.shuffle.sort.SortShuffleManager
ShuffleWriter
ShuffleWriter是Spark提供的ShuffleMapTask写入数据的主要类,有三种writer,分别是BypassMergeSortShuffleWriter,UnsafeShuffleWriter和SortShuffleWriter。
ShuffleManager通过getWriter方法获取合适的ShuffleWriter,然后通过write方法写入数据到存储系统中。
ShuffleReader
ShuffleReader是Shuffle read任务从上游ShuffleMapTask的结果MapStatus获取文件信息,读取数据产生迭代器,是后续Task使用的源数据的生产者,目前唯一实现是BlockStoreShuffleReader,实现了read方法。
总结
- join操作一定发生shuffle吗?
不一定。只有 SortMergeJoinExec 和 ShuffledHashJoinExec 这两种类型的join实现会发生shuffle;如果再拓展一下的话,可以说一下这几种join的选择策略(相关源码咱们课上见)。
- spark shuffle 2.0以上已经不用hash shuffle了,那join的时候还用hash join 么?
shuffle是一种数据分发的方式,它的实现代表的是两个stage之间的数据按照什么方式移动;而join是发生在某个stage中,hash join是指把小表构建成hash表,和基表进关联的操作。
hash shuffle被弃用了,hash join在ShuffledHashJoinExec 和 BroadcastHashJoinExec这两种join的实现中还在使用。