Shuffle 输出追踪者 --MapOutputTracker
这个组件作为 shuffle 的一个辅助组件, 在整个 shuffle 模块中具有很重要的作用. 我们在前面一系列的分析中, 或多或少都会提到这个组件, 比如在 DAGScheduler 提交一个 stage 时会将这个 stage 封装成一个任务集(TaskSet), 但是可能有的分区已经计算过了, 有了结果(stage 由于失败可能会多次提交, 其中有部分 task 可能已经计算完成), 这些分区就不需要再次计算, 而只需要计算那些失败的分区, 那么很显然需要有一个组件来维护 shuffle 过程中的任务失败成功的状态, 以及计算结果的位置信息.
此外, 在 shuffle 读取阶段, 我们知道一个 reduce 端的分区会依赖于多个 map 端的分区的输出数据, 那么我们在读取一个 reduce 分区对应的数据时, 就需要知道这个 reduce 分区依赖哪些 map 分区, 每个 block 的物理位置是什么, blockId 是什么, 这个 block 中属于这个 reduce 分区的数据量大小是多少, 这些信息的记录维护都是靠 MapOutputTracker 来实现的, 所以我们现在知道 MapOutputTracker 的重要性了.
MapOutputTracker.scala
MapOutputTracker 组件的主要功能类和辅助类全部在这个文件中, 我先大概说一下各个类的主要作用, 然后重点分析关键的类.
ShuffleStatus, 这个类是对一个 stage 的 shuffle 输出状态的封装, 它内部的一个主要的成员 mapStatuses 是一个数组, 这个数组的下标就是 map 的分区序号, 存放了每个 map 分区的输出情况, 关于 MapStatus 具体可以看 MapStatus.scala, 这里不打算展开.
MapOutputTrackerMessage, 用于 rpc 请求的消息类, 有两个实现类: GetMapOutputStatuses 用于获取某次 shuffle 的所有输出状态; StopMapOutputTracker 用于向 driver 端的发送停止 MapOutputTrackerMasterEndpoint 端点的请求.
MapOutputTrackerMasterEndpoint, 如果熟悉 spark 的 rpc 模块的话, 对这个类应该就很熟悉, 它就是一个 rpc 服务端, 通过向 RpcEnv 注册自己, 通过一个名称标识自己, 从而接收到特定一些消息, 也就是上面说的两种消息.
MapOutputTracker, 这个类是一个抽象类, 只是定义了一些操作接口, 它的一个最重要的作用可能就是内部维护了一个序列值 epoch, 这个值表示某一个一致的全局 map 输出状态, 一旦有 map 输出发生变更, 这个值就要加一, executor 端会同步最新的 epoch 以判断自己的 map 输出状态的缓存是否过期.
MapOutputTrackerMaster, 运行在 driver 端, 实现类 MapOutputTracker 的大部分功能, 是最核心的类
MapOutputTrackerWorker, 运行在 executor 端, 主要作用是封装了 rpc 调用的逻辑.
总的来看, 最核心的类是 MapOutputTrackerMaster, 其他的类都是围绕这个类的一些辅助类, 所以我们重点分析 MapOutputTrackerMaster, 其他的类我不打算深入展开, 相信读者自己也能够较为轻松地理解.
- MapOutputTrackerMaster
- findMissingPartitions
这个方法在上面已经提到了, 会在 DAGScheduler 封装任务集的时候查找一个 stage 需要计算的分区时会调用到.
- def findMissingPartitions(shuffleId: Int): Option[Seq[Int]] = {
- shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
- }
- ShuffleStatus.findMissingPartitions
- def findMissingPartitions(): Seq[Int] = synchronized {
- val missing = (0 until numPartitions).filter(id => mapStatuses(id) == null)
- assert(missing.size == numPartitions - _numAvailableOutputs,
- s"${missing.size} missing, expected ${numPartitions - _numAvailableOutputs}")
- missing
- }
这两段代码很简单, 不用多说, 就是从 map 结构中查找.
此外, 像 registerShuffle,registerMapOutput,unregisterMapOutput,unregisterShuffle,removeOutputsOnHost 等等, 我们可以看到这几个方法本身都是很简答的, 无非就是对内部 map 结构的插入, 更新和查找, 关键的是你要清楚这些方法的调用时机是什么? 弄清这一点, 会让我们对 MapOutputTracker 在整个 spark 框架中的作用和充当的角色有更深的理解. 方法的调用地点, 通过 Idea 这类 IDE 工具其实都可以很简单地定位到, 这里我不做过多展开, 仅仅简单地概括一下:
registerShuffle, DAGScheduler 在创建一个 ShuffleMapStage 时会顺便把这个 stage 对应的 shuffle 注册进来.
registerMapOutput, 在一个 shuffleMapTask 任务完成后, 会把 map 输出的信息注册进来.
removeOutputsOnHost, 将某个 host 上的相关 map 输出信息全部移除, 一般在主机丢失时调用此操作
removeOutputsOnExecutor, 同样地, 将某个 executor 上的相关 map 输出信息全部移除, 一般在 executor 丢失时调用此操作
getMapSizesByExecutorId
我们来看另一个比较重要的方法, 在 reduce 阶段读取数据时, 一个 task 首先需要知道它依赖于哪些 map 输出, 这时它回想 driver 端的 MapOutputTrackerMasterEndpoint 组件发送一个获取 map 输出的消息, 经过一系列方法调用最终会调用这个方法:
- def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
- : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
- logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition")
- shuffleStatuses.get(shuffleId) match {
- case Some (shuffleStatus) =>
- // 将所有的 mapStatus 数组转换成 (BlockManagerId, Seq[(BlockId, Long)]) 对象
- shuffleStatus.withMapStatuses { statuses =>
- MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses)
- }
- case None =>
- Seq.empty
- }
- }
我们看一下: MapOutputTracker.convertMapStatuses, 这个方法也很简单, 其实就是将每个 map 分区输出切分成 reduce 分区数量, 最后产生的 (BlockId, Long) 元组数量等于 map 分区数量 * reduce 分区数量.
- def convertMapStatuses(
- shuffleId: Int,
- startPartition: Int,
- endPartition: Int,
- statuses: Array[MapStatus]): Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
- assert (statuses != null)
- // 用于存放结果
- val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(BlockId, Long)]]
- // 最后产生的 (BlockId, Long) 元组数量等于 map 分区数量 * reduce 分区数量
- for ((status, mapId) <- statuses.zipWithIndex) {
- if (status == null) {
- val errorMessage = s"Missing an output location for shuffle $shuffleId"
- logError(errorMessage)
- throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage)
- } else {
- for (part <- startPartition until endPartition) {
- splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) +=
- ((ShuffleBlockId(shuffleId, mapId, part), status.getSizeForBlock(part)))
- }
- }
- }
- splitsByAddress.toSeq
- }
- getPreferredLocationsForShuffle
我们来看另外一个比较重要的方法. 我们知道 reduce 端的分区一般会依赖于多个 map 端分区输出, 但是对于每个 map 分区依赖的数据量是不同的, 举个极端的例子, 假设 reduce 端某个分区依赖于 10 个 map 端的输出分区, 但是其中一个分区依赖的数据有 10000 条, 而其他分区依赖的数据只有 1 条, 这种情况下, 显然我们应该吧这个 reduce 任务优先调度到那个依赖了 10000 条的 executor 上. 当然这个例子举得很简单, 可能也不是什么准确, 但是也足够说明这个方法的作用.
- def getPreferredLocationsForShuffle(dep: ShuffleDependency[_, _, _], partitionId: Int)
- : Seq[String] = {
- // 首先判断几个参数配置, 如果都符合条件, 那么再进行偏向位置的计算
- if (shuffleLocalityEnabled && dep.rdd.partitions.length <SHUFFLE_PREF_MAP_THRESHOLD &&
- dep.partitioner.numPartitions < SHUFFLE_PREF_REDUCE_THRESHOLD) {
- // 关键调用
- val blockManagerIds = getLocationsWithLargestOutputs(dep.shuffleId, partitionId,
- dep.partitioner.numPartitions, REDUCER_PREF_LOCS_FRACTION)
- if (blockManagerIds.nonEmpty) {
- blockManagerIds.get.map(_.host)
- } else {
- Nil
- }
- } else {
- Nil
- }
- }
可以看出来, 关键的方法是 getLocationsWithLargestOutputs, 接下来, 我们就来看一下这个方法:
注释已经说得很清楚, 这个方法的逻辑很简单, 比如一个 reduce 端分区要读取的总数据量是 100m, 某个 executor 上的所有 map 输出中与这个 reduce 分区相关的数据加起来有 20m, 即超过了总量的 0.2, 这时这个 executor 就能够成为偏向位置, 是不是很简单. 但是这里应该注意到一个问题, 这个方法是以 executor 为最小单位计算偏向位置, 而在前一个方法 getPreferredLocationsForShuffle 中, 获取到成为偏向位置的那些 BlockManagerId 后, 仅仅是取出了 host 作为偏向位置返回给上层调用者, 问题在于一个 host(即物理节点)上可能有多个 executor, 这就会造成返回的结果中会有重复的 host,; 另外, 既然返回 host 作为偏向位置, 那为什么不直接以 host 作为最小单位来计算偏向位置呢, 比如将一个 host 上所有与这个 reduce 分区相关的数据加起来, 如果超过 0.2 的占比就认为这个 host 能够作为偏向位置, 这样好像更合理, 也更容易产生偏向位置. 举个极端的例子, 一个 host 上运行了 5 个 executor, 每个 executor 与分区相关的数据占比 0.1, 另外有 5 个 host 上每个都只运行了一个 executor, 他们的数据占比均为 0.1, 这种情况下是不会产生偏向位置的, 但是实际上显然应该将那个拥有 5 个 executor 的 host 作为偏向位置.
- def getLocationsWithLargestOutputs(
- shuffleId: Int,
- reducerId: Int,
- numReducers: Int,
- fractionThreshold: Double)
- : Option[Array[BlockManagerId]] = {
- val shuffleStatus = shuffleStatuses.get(shuffleId).orNull
- // 对 shuffleStatus 非空检查
- if (shuffleStatus != null) {
- shuffleStatus.withMapStatuses { statuses =>
- // 对 mapStatus 数组的非空检查
- if (statuses.nonEmpty) {
- // HashMap to add up sizes of all blocks at the same location
- // 记录每个 executor 上的所有 map 输出的 block 中属于这个 reduce 端分区的数据量
- val locs = new HashMap[BlockManagerId, Long]
- var totalOutputSize = 0L
- var mapIdx = 0
- while (mapIdx <statuses.length) {
- val status = statuses(mapIdx)
- // status may be null here if we are called between registerShuffle, which creates an
- // array with null entries for each output, and registerMapOutputs, which populates it
- // with valid status entries. This is possible if one thread schedules a job which
- // depends on an RDD which is currently being computed by another thread.
- if (status != null) {
- val blockSize = status.getSizeForBlock(reducerId)
- if (blockSize> 0) {
- locs(status.location) = locs.getOrElse(status.location, 0L) + blockSize
- totalOutputSize += blockSize
- }
- }
- mapIdx = mapIdx + 1
- }
- // 最后, 判断一个 executor 能否成为偏向位置的条件是:
- // 这个 executor 上所有与这个 reduce 分区相关的数据大小与这个分区数据总量的比值是否大于一个阈值
- // 这个阈值默认是 0.2
- val topLocs = locs.filter { case (loc, size) =>
- size.toDouble / totalOutputSize>= fractionThreshold
- }
- // Return if we have any locations which satisfy the required threshold
- if (topLocs.nonEmpty) {
- return Some(topLocs.keys.toArray)
- }
- }
- }
- }
- None
- }
总结
国际惯例, 再晚也要总结一下. 我们简单总结一下 map 输出追踪器的作用:
维护所有 shuffle 的 map 输出状态信息, 位置信息等
查找某个 stage 还有哪些未计算的分区
获取 reduce 分区的偏向位置
获取 reduce 分区依赖哪些 map 输出, 他们的位置, 每个 map 输出中相关数据的大小
来源: https://www.cnblogs.com/zhuge134/p/11048963.html