天天看点

Spark Shuffle 原理Spark Shuffle 原理

Spark Shuffle 原理

文章目录

  • Spark Shuffle 原理
    • Shuffle 中的两个 stage
    • Shuffle 中的任务数
    • reduce 拉取数据的过程
    • HashShuffle
      • 未经优化的 HashShuffleManager
      • 优化的 HashShuffleManager
    • SortShuffle
      • SortShuffleManager
      • BypassSortShuffleManager

Shuffle 中的两个 stage

在 stage 划分时,最后一个 stage 为 ResultStage,其他的则都为 ShuffleMapStage;

ResultStage : 代表着由一个 action 算子的触发,并代表 Spark 作业的结束;

ShuffleMapStage : 代表着一个引起 Shuffle 的算子的执行,为下游的 stage 进行 shuffle write;

Shuffle 中的任务数

  • MapTask : 最开始的 MapTask 如果是读的 hdfs 上的数据,那么分区的数量由 split 的数量决定;
  • ReduceTask : reduce 端的分区默认取

    spark.default.parallelism

    的值; 如果没有设置,那么在没有发生重分区的情况下,reduce 端的分区数等于 map 端最有一个 RDD 的分区数量; 如果发生重分区,以重分区数量为准;

reduce 拉取数据的过程

map task 为下游 stage 进行 shuffle write,reduce task 进行 shuffle read; 那么 reduce task 就需要知道在哪些地方拉取 map task 产生的数据:

  1. map task 执行完毕后会将计算状态以及磁盘小文件位置等信息封装到

    MapStatus

    对象中,然后由本进程中的

    MapOutPutTrackerWorker

    对象将

    MapStatus

    对象发送给Driver进程的

    MapOutPutTrackerMaster

    对象;
  2. 在reduce task开始执行之前会先让本进程中的

    MapOutputTrackerWorker

    向Driver进程中的

    MapoutPutTrakcerMaster

    发动请求,请求磁盘小文件位置信息;
  3. 当所有的Map task执行完毕后,Driver进程中的

    MapOutPutTrackerMaster

    就掌握了所有的磁盘小文件的位置信息。此时

    MapOutPutTrackerMaster

    会告诉

    MapOutPutTrackerWorker

    磁盘小文件的位置信息;
  4. 完成之前的操作之后,由

    BlockTransforService

    去Executor所在的节点拉数据,默认会启动五个子线程。每次拉取的数据量不能超过48M(reduce task每次最多拉取48M数据,将拉来的数据存储到Executor内存的执行内存中)。

HashShuffle

未经优化的 HashShuffleManager

shuffle write 为下游的 stage 进行数据的准备和划分; 每个 task 将 key 按照 hash 进行数据的划分,将相同 key 的数据写入同一个磁盘文件,而每一个磁盘文件只属于下游的一个 task。

而在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。

使用未经优化的 HashShuffleManager 会产生大量的磁盘文件主要体现在: 下一个 stage 的 task 有多少个,当前 stage 的每个 task 就要创建多少份磁盘文件。

所以总共生成的文件数为:当前task的数量 * 下个 stage 的 task 数量;

未优化的HashShuffleManager工作原理如图所示:

Spark Shuffle 原理Spark Shuffle 原理

优化的 HashShuffleManager

为了优化 HashShuffleManager,可以通过设置

spark.shuffle.consolidateFiles

,默认为false,将其设置为 true 打开;

开启了之后,和未经优化的 HashShuffleManager 的主要区别为 : 下一个 stage 的 task 有多少个,当前 stage 的每个CPU Core 就要创建多少份磁盘文件;

每个Executor创建的磁盘文件的数量的计算公式为 : CPU core的数量 * 下一个stage的task数量

Spark Shuffle 原理Spark Shuffle 原理

SortShuffle

SortShuffleManager

SortShuffleManager 的工作模式为,先将数据写入内存数据结构; 如果是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存;如果是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。

每写入一条会判断是否达到阈值,达到阈值了之后会进行溢写磁盘;

在溢写磁盘之前会进行对 key 的排序,写入到内存缓冲区,然后按照每批次默认 10000 条的数量进行多次溢写磁盘; 每次溢写会产生一个临时文件;

而最后结束的时候,会将所有的临时文件进行 merge,并产生一个索引文件,标识了下游各个task的数据在文件中的start offset与end offset;

所以每个 executor 上,只会产生当前 executor 上的 task 数的磁盘文件数量

Spark Shuffle 原理Spark Shuffle 原理

BypassSortShuffleManager

bypass运行机制的触发条件如下:

  • shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值
  • 不是聚合类的shuffle算子

此时,每个 task 会为下游的每一个 task 产生一份临时文件,按照 key 进行 hash 将 相同 key 放入同一个文件; 在写磁盘之前,会写到内存的缓冲区,缓冲区满了之后

才会进行数据的溢写;

上面的步骤和 未经优化的 HashShuffleManager 相同,而 bypass 机制下,后续会进行临时文件的合并,并生成一份索引文件;

和 SortShuffleManager的不同是

  1. 写入磁盘的机制不同
    • bypass 是内存缓冲区满了才溢写
    • sort 是按照批次 10000 条进行溢写
  2. 不需要排序

每个 executor 上,只会产生当前 executor 上的 task 数的磁盘文件数量

Spark Shuffle 原理Spark Shuffle 原理