Spark Shuffle 和 Spill 的区别

本文翻译自 Chendi Xue’s Blog原文链接 

spark shuffle 做了什么事情?

shuffle是指 map 任务和 task 任务之间的流程。在后文我们提到 shuffling 都是指对数据做shuffle 的处理。

为什么数据需要 shuffle?

我们首先来举个例子说明。假如我们需要统计美国各个州中每一个社区的GDP。那么我们最终的结果应该类似于 (Manhattan -> xxx billion; Beverly Hills ->xxx billion) 这样子的形式。

由于我们需要计算的社区非常的多,那么我们必然需要借助 terasort 这样的算法来帮助我们排名。整个任务的流程如下:

  1. map 任务从 hdfs 上拉取数据, 并且计算这些数据都是属于哪一个城市。比如某个社区属于 NewYork, 那么就把数据放到 NewYork 的桶里。
  2. 当所有的 map 任务完成后,也就意味着所有的社区都进入了对于的城市 bucket 中。所有的桶内的数据都像图中左边展示的那样。不同的颜色代表不同的城市。
  3. 这些 buckets 就是需要 shuffle 的数据!
  4. 之后,redue 任务就开始工作了,每一个 reduce 任务负责处理一个城市的数据。它会读取每一个 map任务写入的对应城市的数据。并且在同时还会对拉取的数据进行排序。
  5. 一个所有的 bucket 数据处理完成(如图中右边所示),我们也就得到了对应每个城市的根据GDP排名的数据。

img

因此,在 spark UI 中,让一个任务需要 shuffling 的时候,它总是会被切分成两个 stage 。 包括一个 map stage 和 一个 reduce stage. 如下图所示。

怎样估计有多少数据需要被 shuffle 呢?

shuffle 是 spark 的在相同或不同的 executor 节点交换内部 map 和 reduce 任务数据的过程。 map 任务负责写任务(shuffle write) ,而 reduce 任务则会拉取数据(shuffle read) . 因此 shuffle 数据的大小依赖于我们结果的需求。

假如我们现在的任务是 rank,就就是需要把没有排序的 (社区,GDP) 这样的数据,变成输出为 根据GDP排好序的结果。那么需要 shuffle 的数据就是这部分数据的压缩或者序列化的结果。

假如我们需要的结果是一个城市的总GDP,并且输入还是没有排序的社区GDP数据,那么 shuffle 的数据就是一个记录了每个社区GDP总和的list。

我们也可以通过 spark UI 来追踪 shuffle 数据的数量。

如果你想做一个预估,可以这样进行简单的计算: 比如我们的输入数据是 hdfs 上面 256m 的 block, 并且这样的数据总共有 100 G. 那么我们基本上可以预估需要 100GB/256MB=400 个map任务。 并且每一个 map 任务都包含了 256M 的数据。 这些数据之后会被通过序列化放入不同的城市的 bucket 中。因此我们也可以看到 shuffle write 的数据也接近 256M 的大小,这个值由于序列化的原因会稍微大一点。

之后当我们开始 reduce 时,reduce task 就会从所有 map task 中读取对应城市的记录。那么每一个 reduce task 的 shuffle read 数据应该是一个城市所有记录的总和。这个数据根据不同城市的社区数量的不同而不同。

spark spill all 又做了什么呢?

Spilling 是spark 从硬盘上读写数据的情况之一。还有一个情况是 spark 内存不够用的时候。

  1. 当要进行 shuffle 的时候, 我们并不是把每一条数据都写入到磁盘中,一般来说我们会首先把数据写得这个城市的bucket对应在内存中的位置,如果内存设置了上限阈值,超出的部分数据就会被写入到磁盘中。
  2. 除了做 shuffle 的工作,spark还有一种操作叫做 External Sorter, 他会给每一个城市的桶做一次 TimeSort,因此这个过程需要较大的内存支持,当内存不充足的时候,它会把数据写入到磁盘中并重新进行新一轮的插入排序。最后对在磁盘和内存中的所有数据进行 merge sort。

怎样估计有多少数据会被 spill 呢?

这取决于你的 JVM 有多少的可用内存。 spark的初始阈值是 5M 来尝试把在内存中进行 insert sort 的数据 spill 到磁盘中。

Spark Shuffle DataFlow 细节

在经过上文的解释之后,让我们来看一下具体的流程图。

无论是 shuffle write 还是 external spill,目前 spark 都是通过 DiskBlockObkectWriter 来控制数据的 kyro 序列化缓存和 达到阈值后的写入磁盘。

当从文件中读数据时,shuffle read 以不同的方式处理 相同节点的数据读取和 其他网络节点的数据读取。 相同节点的 数据读取会以 FileSegmentManagedBuffer 的形式来拉取数据, 而远程节点则会以NettyManagedBuffer 的形式来拉取数据。

对于排序数据的读取,spark 会先 return 一个 排序过的 RDD 的迭代器, 然后读取操作会定义在interator.hasNext() 函数中,以惰性的方式来读取数据。

0%