The DAGScheduler's main job is to build the DAG out of stages and to decide the preferred location for each task. In particular, it:
- tracks which RDD and stage outputs have been materialized;
- acts as the stage-oriented scheduling layer: it computes a DAG of stages for each job and submits TaskSets to the TaskScheduler for execution;
- resubmits stages whose shuffle output has been lost.
Within each stage, the tasks are independent of one another: they all run the same compute function and share the same shuffle dependencies. When cutting the DAG into stages, the boundary is wherever a shuffle occurs.
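As a concrete illustration (my own minimal example, not from the original analysis), the job below is cut into two stages because reduceByKey introduces a shuffle:

```scala
// Stage 0: parallelize + map, up to the shuffle write.
// Stage 1: reduceByKey's shuffle read + count.
val words = sc.parallelize(Seq("a", "b", "a", "c"), numSlices = 2)
val counts = words.map(w => (w, 1)).reduceByKey(_ + _)
counts.count()  // the action triggers sc.runJob -> DAGScheduler
```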
The following code shows how SparkContext instantiates the DAGScheduler:
```scala
@volatile private[spark] var dagScheduler: DAGScheduler = _
try {
  dagScheduler = new DAGScheduler(this)
} catch {
  case e: Exception => {
    try {
      stop()
    } finally {
      throw new SparkException("Error while constructing DAGScheduler", e)
    }
  }
}
```
The following code shows the DAGScheduler constructor definition: the scheduler is created bound to a TaskScheduler, and the auxiliary constructors delegate to the primary constructor, filling in the remaining parameters from fields of sc:
```scala
private[spark] class DAGScheduler(
    private[scheduler] val sc: SparkContext,
    private[scheduler] val taskScheduler: TaskScheduler,
    listenerBus: LiveListenerBus,
    mapOutputTracker: MapOutputTrackerMaster,
    blockManagerMaster: BlockManagerMaster,
    env: SparkEnv,
    clock: Clock = new SystemClock())
  extends Logging {

  def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
    this(
      sc,
      taskScheduler,
      sc.listenerBus,
      sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
      sc.env.blockManager.master,
      sc.env)
  }

  def this(sc: SparkContext) = this(sc, sc.taskScheduler)
```
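For readers less familiar with Scala, here is a minimal, non-Spark sketch of the auxiliary-constructor pattern used above: an auxiliary constructor must delegate to another constructor, which is how the one-argument and two-argument DAGScheduler constructors fill in the remaining fields.

```scala
// Illustrative only: auxiliary constructors delegating to the
// primary constructor, supplying defaults for missing arguments.
class Connection(val host: String, val port: Int, val timeoutMs: Int) {
  def this(host: String, port: Int) = this(host, port, 30000)
  def this(host: String) = this(host, 8080)
}
```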
Most Action operations end up submitting a job. In the 1.0 source, the rough call chain of a job submission is:

sc.runJob() -> dagScheduler.runJob -> dagScheduler.submitJob -> dagSchedulerEventProcessActor.JobSubmitted -> dagScheduler.handleJobSubmitted -> dagScheduler.submitStage -> dagScheduler.submitMissingTasks -> taskScheduler.submitTasks

In more detail, the function calls at job-submission time are as follows (a concrete trigger is sketched right after this list):
- sc.runJob -> dagScheduler.runJob -> submitJob
- DAGScheduler::submitJob creates a JobSubmitted event and sends it to the inner class eventProcessActor (in the 1.4 source, submitJob instead uses the DAGSchedulerEventProcessLoop class to handle the event)
- On receiving JobSubmitted, eventProcessActor calls the processEvent handler
- The job is converted into stages: the finalStage is generated and submitted for execution; the key step is the call to submitStage
- submitStage computes the dependencies between stages; a dependency is either a wide (shuffle) dependency or a narrow dependency
- If the current stage turns out to have no dependencies at all, or all of its dependencies are already ready, its tasks are submitted
- Tasks are submitted through the submitMissingTasks function
- Which worker a task actually runs on is managed by the TaskScheduler; that is, submitMissingTasks calls TaskScheduler::submitTasks
- TaskSchedulerImpl creates the appropriate backend for Spark's current run mode; when running in local mode it creates a LocalBackend
- LocalBackend receives the ReviveOffers event passed in by TaskSchedulerImpl
- reviveOffers -> executor.launchTask -> TaskRunner.run
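As a minimal illustration (my own example, not from the Spark source), any action kicks off this chain:

```scala
val rdd = sc.parallelize(1 to 100, numSlices = 4)
// count() is an action: internally it calls sc.runJob, which walks the
// chain above until TaskScheduler.submitTasks hands tasks to a backend.
val n = rdd.count()
```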
At the end, DAGScheduler.runJob hands the results back through the resultHandler callback. Here, DAGScheduler's runJob function calls its submitJob function to submit the job:
```scala
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  val start = System.nanoTime
  val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
  waiter.awaitResult() match {
    case JobSucceeded => {
      logInfo("Job %d finished: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    }
    case JobFailed(exception: Exception) =>
      logInfo("Job %d failed: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      throw exception
  }
}
```
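To see where resultHandler typically comes from: in the 1.x source, SparkContext's convenience runJob overloads allocate an array and pass a handler that stores each partition's result at its index, roughly along these lines (a condensed sketch, not a verbatim copy of the source):

```scala
// Sketch: the convenience overload collects per-partition results into
// an array via the (index, result) => ... resultHandler, then delegates
// to the overload that reaches dagScheduler.runJob shown above.
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    allowLocal: Boolean): Array[U] = {
  val results = new Array[U](partitions.size)
  runJob[T, U](rdd, func, partitions, allowLocal,
    (index, res) => results(index) = res)
  results
}
```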
In the Spark 1.4.0 source, DAGScheduler's submitJob function no longer uses the DAGSchedulerEventProcessActor for event handling and message passing; instead it posts the JobSubmitted event through eventProcessLoop, an instance of the DAGSchedulerEventProcessLoop class.
Here is the submitJob function:
```scala
/**
 * Submit a job to the job scheduler and get a JobWaiter object back. The JobWaiter object
 * can be used to block until the job finishes executing or can be used to cancel the job.
 */
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // Check to make sure we are not launching a task on a partition that does not exist.
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
      "Total number of partitions: " + maxPartitions)
  }

  val jobId = nextJobId.getAndIncrement()
  if (partitions.size == 0) {
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }

  assert(partitions.size > 0)
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties))
  waiter
}
```
After the eventProcessLoop object posts the JobSubmitted event, the eventThread instance inside it processes events: it keeps taking events off the event queue and calls the onReceive function on each one. When the JobSubmitted event is matched, onReceive calls DAGScheduler's handleJobSubmitted function, passing in the jobId, the rdd, and the other parameters to process the job.
handleJobSubmitted is the key step in job processing: it is responsible for analyzing the RDD's dependencies, generating the finalStage, and producing an ActiveJob from that finalStage.
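To make the mechanism concrete, here is a minimal, self-contained sketch of the event-loop pattern that DAGSchedulerEventProcessLoop follows; the class and member names below are simplified stand-ins, not Spark's actual API:

```scala
import java.util.concurrent.LinkedBlockingDeque

// Simplified event loop: a daemon thread drains a blocking queue and
// dispatches each event to onReceive, as described above.
abstract class SimpleEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  @volatile private var stopped = false

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped) {
          onReceive(eventQueue.take()) // blocks until an event is posted
        }
      } catch {
        case _: InterruptedException => // stop() interrupts the blocked take()
      }
    }
  }

  def start(): Unit = eventThread.start()
  def post(event: E): Unit = eventQueue.put(event)
  def stop(): Unit = { stopped = true; eventThread.interrupt() }

  protected def onReceive(event: E): Unit
}
```

In the real source, DAGSchedulerEventProcessLoop extends Spark's generic EventLoop utility and pattern-matches on the event inside onReceive, calling dagScheduler.handleJobSubmitted(...) for the JobSubmitted case.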
The handleJobSubmitted source, with some comments added, is shown below:
```scala
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    allowLocal: Boolean,
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: Stage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)
  } catch {
    // Error handling: tell the listener the job failed, then return.
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }
  if (finalStage != null) {
    val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
    clearCacheLocs()
    logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
      job.jobId, callSite.shortForm, partitions.length, allowLocal))
    logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))
    val shouldRunLocally =
      localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
    val jobSubmissionTime = clock.getTimeMillis()
    if (shouldRunLocally) {
      // Compute very short actions with no parent stages, such as first()
      // or take(), locally.
      listenerBus.post(
        SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties))
      runLocally(job)
    } else {
      // Operations such as collect() take this path: update the relevant
      // bookkeeping maps, notify the listener bus, then submit the job.
      jobIdToActiveJob(jobId) = job
      activeJobs += job
      finalStage.resultOfJob = Some(job)
      val stageIds = jobIdToStageIds(jobId).toArray
      val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
      listenerBus.post(
        SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
      // Submit the final stage (this recursively submits missing parents first).
      submitStage(finalStage)
    }
  }
  // Submit any stages that were waiting on this one.
  submitWaitingStages()
}
```
This article covered DAGScheduler from its instantiation in SparkContext, through the runJob call made when an Action is executed, to the message dispatching behind job submission and the handleJobSubmitted function that processes the job.
Since handleJobSubmitted involves dependency analysis and the stage-related source code, I plan to cover those topics, with further source analysis, in the next article.
Source: http://www.bubuko.com/infodetail-2157443.html