上篇文章 spark 源码分析之十八 -- Spark 存储体系剖析 重点剖析了 Spark 的存储体系. 从本篇文章开始, 剖析 Spark 作业的调度和计算体系.
在说 DAG 之前, 先简单说一下 RDD.
对 RDD 的整体概括
文档说明如下:
RDD 全称 Resilient Distributed Dataset, 即分布式弹性数据集. 它是 Spark 的基本抽象, 代表不可变的可分区的可并行计算的数据集.
RDD 的特点:
1. 包含了一系列的分区
2. 在每一个 split 上执行函数计算
3. 依赖于其他的 RDD
4. 对于 key-value 对的有 partitioner
5. 每一个计算有优先计算位置
更多内容可以去看 Spark 的论文:
RDD 的操作
RDD 支持两种类型的操作:
transformation: 它从已存在的数据集中创建一个新的数据集. 它是懒执行的, 即生成 RDD 的所有操作都是懒执行的, 也就是说不会马上计算出结果, 它们只会记住它们依赖的基础数据集(文件, MQ 等等), 等到一个 action 需要结果返回到 driver 端的时候, 才会执行 transform 的计算. 这种设计使得 RDD 计算更加高效.
action: 它在数据集上运行计算之后给 driver 端返回一个值.
注意: reduce 是一个 action, 而 reduceByKey 则是一个 transform, 因为它返回的是一个分布式数据集, 并没有把数据返回给 driver 节点.
Action 函数
官方提供了 RDD 的 action 函数, 如下:
注意: 这只是常见的函数, 并没有列举所有的 action 函数.
Action 函数的特点
那么 action 函数有哪些特点呢?
根据上面介绍的, 即 action 会返回一个值给 driver 节点. 即它们的函数返回值是一个具体的非 RDD 类型的值或 Unit, 而不是 RDD 类型的值.
Transformation 函数
官方提供了 Transform 函数, 如下:
Transformation 函数的特点
上文提到, transformation 接收一个存在的数据集, 并将计算结果作为新的 RDD 返回. 也是就说, 它的返回结果是 RDD.
总结
其实, 理解了 action 和 transformation 的特点, 看函数的定义就知道是 action 还是 transformation.
RDD 的依赖关系
官方文档里, 聊完 RDD 的操作, 紧接着就聊了一下 shuffle, 我们按照这样的顺序来做一下说明.
Shuffle
官方给出的 shuffle 的解释如下:
注意: shuffle 是特定操作才会发生的事情, 这跟 action 和 transformation 划分没有关系.
官方给出了一些常见的例子.
Operations which can cause a shuffle include repartition operations like repartition and coalesce, ByKey operations (except for counting) like groupByKey and reduceByKey, and join operations like cogroup and join.
RDD 的四种依赖关系
那么 shuffle 跟什么有关系呢?
shuffle 跟依赖有关系. 在 spark 源码分析之一 -- RDD 的四种依赖关系 中, 说到 RDD 分为宽依赖和窄依赖, 其中窄依赖有三种, 一对一依赖, Range 依赖, Prune 依赖. 宽依赖只有一种, 那就是 shuffle 依赖.
即 RDD 跟父 RDD 的依赖关系是宽依赖, 那么就是父 RDD 在生成新的子 RDD 的过程中是存在 shuffle 过程的.
如图:
这张图也说明了一个结论, 并不是所有的 join 都是宽依赖.
依赖关系在源码中的体现
我们通常说的 RDD, 在 Spark 中具体表现为一个抽象类, 所有的 RDD 子类继承自该 RDD, 全称为 org.apache.spark.rdd.RDD, 如下:
它有两个参数, 一个参数是 SparkContext, 另一个是 deps, 即 Dependency 集合, Dependency 是所有依赖的公共父类, 即 deps 保存了父类的依赖关系.
其中, 窄依赖的父类是 NarrowDependency, 它的构造方法里是由父 RDD 这个参数的, 宽依赖 ShuffleDependency , 它的构造方法里也是有父 RDD 这个参数的.
RDD 依赖关系的不确定性
getDependencies 方法
获取抽象的方法是 getDependencies 方法, 如下:
这只是定义在 RDD 抽象父类中的默认方法, 不同的子类会有不同的实现.
它在如下类中又重新实现了这个方法, 如下:
是否是 shuffle 依赖, 跟分区的数量也有一定的关系, 具体可以看下面的几个 RDD 的依赖的实现:
CoGroupedRDD
SubtractedRDD
DAG 在 Spark 作业中的重要性
如下图, 一个 application 的执行过程被划分为四个阶段:
阶段一: 我们编写 driver 程序, 定义 RDD 的 action 和 transformation 操作. 这些依赖关系形成操作的 DAG.
阶段二: 根据形成的 DAG,DAGScheduler 将其划分为不同的 stage.
阶段三: 每一个 stage 中有一个 TaskSet,DAGScheduler 将 TaskSet 交给 TaskScheduler 去执行, TaskScheduler 将任务执行完毕之后结果返回给 DAGSCheduler.
阶段四: TaskScheduler 将任务分发到每一个 Worker 节点去执行, 并将结果返回给 TaskScheduler.
本篇文章的定位就是阶段一和阶段二. 后面会介绍阶段三和阶段四.
注: 图片不知出处.
DAG 的创建
我们先来分析一个 top N 案例.
一个真实的 TopN 案例
需求: 一个大文件里有很多的重复整数, 现在求出重复次数最多的前 10 个数.
代码如下(为了多几个 stage, 特意加了几个 repartition):
- scala> val sourceRdd = sc.textFile("/tmp/hive/hive/result",10).repartition(5)
- sourceRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at repartition at <console>:27
- scala> val allTopNs = sourceRdd.flatMap(line => line.split(" ")).map(Word => (Word, 1)).reduceByKey(_+_).repartition(10).sortByKey(ascending = true, 100).map(tup => (tup._2, tup._1)).mapPartitions(
- | iter => {
- | iter.toList.sortBy(tup => tup._1).takeRight(100).iterator
- |
- }
- | ).collect()
- // 结果略
- scala> val finalTopN = scala.collection.SortedMap.empty[Int, String].++(allTopNs)
- // 结果略
- scala> finalTopN.takeRight(10).foreach(tup => {
- println(tup._2 + "occurs times :" + tup._1)
- })
- 53 occurs times : 1070
- 147 occurs times : 1072
- 567 occurs times : 1073
- 931 occurs times : 1075
- 267 occurs times : 1077
- 768 occurs times : 1080
- 612 occurs times : 1081
- 877 occurs times : 1082
- 459 occurs times : 1084
- 514 occurs times : 1087
下面看一下生成的 DAG 和 Stage
任务概览
Description 描述的就是每一个 job 的最后一个方法.
stage 0 到 3 的 DAG 图:
stage 4 到 8 的 DAG 图:
每一个 stage 的 Description 描述的是 stage 的最后一个方法.
总结
可以看出, RDD 的依赖关系是有 driver 端对 RDD 的操作形成的.
一个 Stage 中 DAG 的是根据 RDD 的依赖来构建的.
我们来看一下源码.
Stage
定义
Stage 是一组并行任务, 它们都计算需要作为 Spark 作业的一部分运行的相同功能, 其中所有任务具有相同的 shuffle 依赖. 由调度程序运行的每个 DAG 任务在发生 shuffle 的边界处被分成多个阶段, 然后 DAGScheduler 以拓扑顺序运行这些阶段. 每个 Stage 都可以是一个 shuffle map 阶段, 在这种情况下, 其任务的结果是为其他阶段或结果阶段输入的, 在这种情况下, 其任务在 RDD 上运行函数直接计算 Spark action(例如 count(),save()等). 对于 shuffle map 阶段, 我们还跟踪每个输出分区所在的节点. 每个 stage 还有一个 firstJobId, 用于识别首次提交 stage 的作业. 使用 FIFO 调度时, 这允许首先计算先前作业的阶段, 或者在失败时更快地恢复. 最后, 由于故障恢复, 可以在多次尝试中重新执行单个 stage. 在这种情况下, Stage 对象将跟踪多个 StageInfo 对象以传递给 listener 或 web UI. 最近的一个将通过 latestInfo 访问.
构造方法
Stage 是一个抽象类, 构造方法如下:
参数介绍如下:
- id - Unique stage ID
- rdd - RDD that this stage runs on: for a shuffle map stage, it's the RDD we run map tasks on, while for a result stage, it's the target RDD that we ran an action on
- numTasks - Total number of tasks in stage; result stages in particular may not need to compute all partitions, e.g. for first(), lookup(), and take().
- parents - List of stages that this stage depends on (through shuffle dependencies).
- firstJobId - ID of the first job this stage was part of, for FIFO scheduling.
- callSite - Location in the user program associated with this stage: either where the target RDD was created, for a shuffle map stage, or where the action for a result stage was called
callSite 其实记录的就是 stage 用户代码的位置.
成员变量
成员方法
其实相对来说比较简单.
Stage 的子类
它有两个子类, 如下:
ResultStage
类说明:
- ResultStages apply a function on some partitions of an RDD to compute the result of an action.
- The ResultStage object captures the function to execute, func, which will be applied to each partition, and the set of partition IDs, partitions.
- Some stages may not run on all partitions of the RDD, for actions like first() and lookup().
ResultStage 在 RDD 的某些分区上应用函数来计算 action 操作的结果. 对于诸如 first()和 lookup()之类的操作, 某些 stage 可能无法在 RDD 的所有分区上运行.
简言之, ResultStage 是应用 action 操作在 action 上进而得出计算结果.
源码如下:
ShuffleMapStage
类说明
- ShuffleMapStages are intermediate stages in the execution DAG that produce data for a shuffle.
- They occur right before each shuffle operation, and might contain multiple pipelined operations before that (e.g. map and filter).
- When executed, they save map output files that can later be fetched by reduce tasks.
- The shuffleDep field describes the shuffle each stage is part of, and variables like outputLocs and numAvailableOutputs track how many map outputs are ready.
- ShuffleMapStages can also be submitted independently as jobs with DAGScheduler.submitMapStage.
- For such stages, the ActiveJobs that submitted them are tracked in mapStageJobs.
- Note that there can be multiple ActiveJobs trying to compute the same shuffle map stage.
ShuffleMapStage 是中间的 stage, 为 shuffle 生产数据. 它们在 shuffle 之前出现. 当执行完毕之后, 结果数据被保存, 以便 reduce 任务可以获取到.
构造方法
shuffleDep 记录了每一个 stage 所属的 shuffle.
Stage 的划分
在上面我们提到, 每一个 RDD 都有对父 RDD 的依赖关系, 这样的依赖关系形成了一个有向无环图. 即 DAG.
当一个用户在一个 RDD 上运行一个 action 时, 调度会检查 RDD 的血缘关系 (即依赖关系) 来创建一个 stage 中的 DAG 图来执行.
如下图:
在说 stage 划分之前先, 剖析一下跟 DAGScheduler 相关的类.
EventLoop
类说明
An event loop to receive events from the caller and process all events in the event thread. It will start an exclusive event thread to process all events.
Note: The event queue will grow indefinitely. So subclasses should make sure onReceive can handle events in time to avoid the potential OOM.
它定义了异步消息处理机制框架.
消息队列
其内部有一个阻塞双端队列, 用于存放消息:
post 到消息队列
外部线程调用 post 方法将事件 post 到堵塞队列中:
消费线程
有一个消息的消费线程:
onReceive 方法是一个抽象方法, 由子类来实现.
下面来看其实现类 -- DAGSchedulerEventProcessLoop.
其接收的是 DAGSchedulerEvent 类型的事件. DAGSchedulerEvent 是一个 sealed trait, 其实现如下:
它的每一个子类事件, 在 doOnReceive 方法中都有体现, 如下:
DAGScheduler
这个类的定义已经超过 2k 行了. 所以也不打算全部介绍, 本篇文章只介绍跟 stage 任务的生成相关的属性和方法.
类说明
- The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a minimal schedule to run the job. It then submits stages as TaskSets to an underlying TaskScheduler implementation that runs them on the cluster. A TaskSet contains fully independent tasks that can run right away based on the data that's already on the cluster (e.g. map output files from previous stages), though it may fail if this data becomes unavailable.
- Spark stages are created by breaking the RDD graph at shuffle boundaries. RDD operations with "narrow" dependencies, like map() and filter(), are pipelined together into one set of tasks in each stage, but operations with shuffle dependencies require multiple stages (one to write a set of map output files, and another to read those files after a barrier). In the end, every stage will have only shuffle dependencies on other stages, and may compute multiple operations inside it. The actual pipelining of these operations happens in the RDD.compute() functions of various RDDs
In addition to coming up with a DAG of stages, the DAGScheduler also determines the preferred locations to run each task on, based on the current cache status, and passes these to the low-level TaskScheduler. Furthermore, it handles failures due to shuffle output files being lost, in which case old stages may need to be resubmitted. Failures within a stage that are not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task a small number of times before cancelling the whole stage. When looking through this code, there are several key concepts:
- Jobs (represented by ActiveJob) are the top-level work items submitted to the scheduler. For example, when the user calls an action, like count(), a job will be submitted through submitJob. Each Job may require the execution of multiple stages to build intermediate data.
- Stages (Stage) are sets of tasks that compute intermediate results in jobs, where each task computes the same function on partitions of the same RDD. Stages are separated at shuffle boundaries, which introduce a barrier (where we must wait for the previous stage to finish to fetch outputs). There are two types of stages: ResultStage, for the final stage that executes an action, and ShuffleMapStage, which writes map output files for a shuffle. Stages are often shared across multiple jobs, if these jobs reuse the same RDDs.
- Tasks are individual units of work, each sent to one machine.
- Cache tracking: the DAGScheduler figures out which RDDs are cached to avoid recomputing them and likewise remembers which shuffle map stages have already produced output files to avoid redoing the map side of a shuffle.
- Preferred locations: the DAGScheduler also computes where to run each task in a stage based on the preferred locations of its underlying RDDs, or the location of cached or shuffle data.
- Cleanup: all data structures are cleared when the running jobs that depend on them finish, to prevent memory leaks in a long-running application.
- To recover from failures, the same stage might need to run multiple times, which are called "attempts". If the TaskScheduler reports that a task failed because a map output file from a previous stage was lost, the DAGScheduler resubmits that lost stage. This is detected through a CompletionEvent with FetchFailed, or an ExecutorLost event. The DAGScheduler will wait a small amount of time to see whether other nodes or tasks fail, then resubmit TaskSets for any lost stage(s) that compute the missing tasks. As part of this process, we might also have to create Stage objects for old (finished) stages where we previously cleaned up the Stage object. Since tasks from the old attempt of a stage could still be running, care must be taken to map any events received in the correct Stage object.
- Here's a checklist to use when making or reviewing changes to this class:
- All data structures should be cleared when the jobs involving them end to avoid indefinite accumulation of state in long-running programs.
- When adding a new data structure, update DAGSchedulerSuite.assertDataStructuresEmpty to include the new structure. This will help to catch memory leaks.
下面直接来看 stage 的划分
从源码看 Stage 的划分
从 action 函数到 DAGScheduler
以 collect 函数为例.
collect 函数定义如下:
其调用了 SparkContext 的 runJob 方法, 又调用了几次其重载方法最终调用的 runJob 方法如下:
其内部调用了 DAGScheduler 的 runJob 方法
DAGScheduler 对 stage 的划分
DAGScheduler 的 runJob 方法如下:
思路, 提交方法后返回一个 JobWaiter 对象, 等待任务执行完成, 然后根据任务执行状态去执行对应的成功或失败的方法.
submitJob 如下:
最终任务被封装进了 JobSubmitted 事件消息体中, 最终该事件消息被放入了 eventProcessLoop 对象中, eventProcessLoop 定义如下:
即事件被放入到了上面我们提到的 DAGSchedulerEventProcessLoop 异步消息处理模型中.
DAGSchedulerEventProcessLoop 的 doOnReceive 中, 发现了 JobSubmitted 事件对应的分支为:
即会执行 DAGScheduler 的 handleJobSubmitted 方法, 如下:
这个方法里面有两步:
创建 ResultStage
提交 Stage
本篇文章, 我们只分析第一步, 第二步在下篇文章分析.
createResultStage 方法如下:
getOrCreateParentStage 方法创建或获取该 RDD 的 Shuffle 依赖关系, 然后根据 shuffle 依赖进而划分 stage, 源码如下:
获取其所有父类的 shuffle 依赖, getShuffleDependency 方法如下, 类似于树的深度遍历.
getOrCreateShuffleMapStage 方法根据 shuffle 依赖创建 ShuffleMapStage, 如下, 思路, 先查看当前 stage 是否已经记录在 shuffleIdToMapStage 变量中, 若存在, 表示已经创建过了, 否则需要根据依赖的 RDD 去找其 RDD 的 shuffle 依赖, 然后再创建 shuffleMapStage.
shuffleIdToMapStage 定义如下:
这个 map 中只包含正在运行的 job 的 stage 信息.
其中 shuffle 依赖的唯一 id 是: shuffleId, 这个 id 是 SpackContext 生成的全局 shuffleId.
getMissingAncestorShuffleDependencies 方法如下, 思路: 深度遍历依赖关系, 把所有未运行的 shuffle 依赖都找到.
到此, 所有寻找 shuffle 依赖关系的的逻辑都已经剖析完毕, 下面看创建 MapShuffleStage 的方法,
思路: 生成 ShuffleMapStage, 并更新 stageIdToStage 变量, 更新 shuffleIdToMapStage 变量, 如果 MapOutputTrackerMaster 中没有注册过该 shuffle, 需要注册, 最后返回 ShuffleMapStage 对象.
updateJobIdStageIdMaps 方法如下, 思路该 ResultStage 依赖的所有 ShuffleMapStage 的 jobId 设定为指定的 jobId, 即跟 ResultStage 一致的 jobId:
至此, stage 的划分逻辑剖析完毕.
总结
本篇文章对照官方文档, 说明了 RDD 的主要操作, action 和 transformation, 进一步引出了 RDD 的依赖关系, 最后剖析了 DAGScheduler 根据 shuffle 依赖划分 stage 的逻辑.
注: 文章中图片来源于 Spark 论文, 论文地址:
来源: https://www.cnblogs.com/johnny666888/p/11233982.html