Spark Shuffle 原理
文章目录
- Spark Shuffle 原理
-
- Shuffle 中的两个 stage
- Shuffle 中的任务数
- reduce 拉取数据的过程
- HashShuffle
-
- 未经优化的 HashShuffleManager
- 优化的 HashShuffleManager
- SortShuffle
-
- SortShuffleManager
- BypassSortShuffleManager
Shuffle 中的两个 stage
在 stage 划分时,最后一个 stage 为 ResultStage,其他的则都为 ShuffleMapStage;
ResultStage : 代表着由一个 action 算子的触发,并代表 Spark 作业的结束;
ShuffleMapStage : 代表着一个引起 Shuffle 的算子的执行,为下游的 stage 进行 shuffle write;
Shuffle 中的任务数
- MapTask : 最开始的 MapTask 如果是读的 hdfs 上的数据,那么分区的数量由 split 的数量决定;
- ReduceTask : reduce 端的分区默认取
的值; 如果没有设置,那么在没有发生重分区的情况下,reduce 端的分区数等于 map 端最有一个 RDD 的分区数量; 如果发生重分区,以重分区数量为准;spark.default.parallelism
reduce 拉取数据的过程
map task 为下游 stage 进行 shuffle write,reduce task 进行 shuffle read; 那么 reduce task 就需要知道在哪些地方拉取 map task 产生的数据:
- map task 执行完毕后会将计算状态以及磁盘小文件位置等信息封装到
对象中,然后由本进程中的
MapStatus
对象将
MapOutPutTrackerWorker
对象发送给Driver进程的
MapStatus
对象;
MapOutPutTrackerMaster
- 在reduce task开始执行之前会先让本进程中的
向Driver进程中的
MapOutputTrackerWorker
发动请求,请求磁盘小文件位置信息;
MapoutPutTrakcerMaster
- 当所有的Map task执行完毕后,Driver进程中的
就掌握了所有的磁盘小文件的位置信息。此时
MapOutPutTrackerMaster
会告诉
MapOutPutTrackerMaster
磁盘小文件的位置信息;
MapOutPutTrackerWorker
- 完成之前的操作之后,由
去Executor所在的节点拉数据,默认会启动五个子线程。每次拉取的数据量不能超过48M(reduce task每次最多拉取48M数据,将拉来的数据存储到Executor内存的执行内存中)。
BlockTransforService
HashShuffle
未经优化的 HashShuffleManager
shuffle write 为下游的 stage 进行数据的准备和划分; 每个 task 将 key 按照 hash 进行数据的划分,将相同 key 的数据写入同一个磁盘文件,而每一个磁盘文件只属于下游的一个 task。
而在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。
使用未经优化的 HashShuffleManager 会产生大量的磁盘文件主要体现在: 下一个 stage 的 task 有多少个,当前 stage 的每个 task 就要创建多少份磁盘文件。
所以总共生成的文件数为:当前task的数量 * 下个 stage 的 task 数量;
未优化的HashShuffleManager工作原理如图所示:
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLicmbw5CMyYTMmVjN0IjZllzYkFzY4ITO4EzNkNmN1ADZkNDZy8CX0JXZ252bj91Ztl2Lc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
优化的 HashShuffleManager
为了优化 HashShuffleManager,可以通过设置
spark.shuffle.consolidateFiles
,默认为false,将其设置为 true 打开;
开启了之后,和未经优化的 HashShuffleManager 的主要区别为 : 下一个 stage 的 task 有多少个,当前 stage 的每个CPU Core 就要创建多少份磁盘文件;
每个Executor创建的磁盘文件的数量的计算公式为 : CPU core的数量 * 下一个stage的task数量
SortShuffle
SortShuffleManager
SortShuffleManager 的工作模式为,先将数据写入内存数据结构; 如果是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存;如果是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。
每写入一条会判断是否达到阈值,达到阈值了之后会进行溢写磁盘;
在溢写磁盘之前会进行对 key 的排序,写入到内存缓冲区,然后按照每批次默认 10000 条的数量进行多次溢写磁盘; 每次溢写会产生一个临时文件;
而最后结束的时候,会将所有的临时文件进行 merge,并产生一个索引文件,标识了下游各个task的数据在文件中的start offset与end offset;
所以每个 executor 上,只会产生当前 executor 上的 task 数的磁盘文件数量
BypassSortShuffleManager
bypass运行机制的触发条件如下:
- shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值
- 不是聚合类的shuffle算子
此时,每个 task 会为下游的每一个 task 产生一份临时文件,按照 key 进行 hash 将 相同 key 放入同一个文件; 在写磁盘之前,会写到内存的缓冲区,缓冲区满了之后
才会进行数据的溢写;
上面的步骤和 未经优化的 HashShuffleManager 相同,而 bypass 机制下,后续会进行临时文件的合并,并生成一份索引文件;
和 SortShuffleManager的不同是
- 写入磁盘的机制不同
- bypass 是内存缓冲区满了才溢写
- sort 是按照批次 10000 条进行溢写
- 不需要排序
每个 executor 上,只会产生当前 executor 上的 task 数的磁盘文件数量