war result onf blog 超过 lsit fde mark ensure
一个 job 的生命历程
dagScheduler.runJob //(1)
-->submitJob(eventProcessLoop.post(JobSubmitted, ***) //(2)
-->eventProcessLoop //(3)
-->onReceive(event: DAGSchedulerEvent) //(4)
-->doOnReceive(event: DAGSchedulerEvent) //(5)
-->
case JobSubmitted //(6)
-->dagScheduler.handleJobSubmitted //(7)
-->finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) //(8)
-->job = new ActiveJob(jobId, finalStage, callSite, listener, properties) //(9)
-->jobIdToActiveJob(jobId) = job //(10)
-->activeJobs += job //(11)
-->finalStage.setActiveJob(job) //(12)
-->stageIds = jobIdToStageIds(jobId).toArray //(13)
-->stageInfos = stageIds.flatMap(id = >stageIdToStage.get(id).map(_.latestInfo)) //(14)
-->listenerBus.post(SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties)) //(15)
-->submitStage(finalStage) //(16)
-->getMissingParentStages(stage).sortBy(_.id) //(17)
-->finalStage = getOrCreateShuffleMapStage(dependency, jobId) //(18)
-->createShuffleMapStage(dep, firstJobId) //(19)
-->stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep)-->job = new ActiveJob(jobId, finalStage, callSite, listener, properties) //(20)
-->submitStage(finalStage) //(21)//划分和提交stage算法精髓
-->submitMissingTasks(stage, jobId.get) //(22)
-->submitWaitingChildStages(stage) //(23)
-->markMapStageJobAsFinished(job, mapOutputTracker.getStatistics(dependency)) //(24)
(1)所有的 action 算子都会触发一个 job 的调度,经过多次不同的 runjob 重载后停在这里调度 submitJob
(2)调用 eventProcessLoop 方法,并发送 JobSubmitted 消息给 DAGSchedulerEventProcessLoop(DAGScheduler 的循环响应函数体)
(3)eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
(4)onReceive 函数是接受 DAGSchedulerEventProcessLoop DAG 调度程序的事件接受函数
(5)doOnReceive 实际是步骤 4 的事件处理函数
(6)根据步骤 2 的发送事件,触发 JobSubmitted 这个事件响应
(7)dagScheduler 的核心入口
(8)使用触发的 job 的最后一个 RDD 创建一个 finalstage,并且放入内存缓存中 stageIdToStage
(9)使用 finalStage 创建一个 job.这个 job 最后一个 stage 就是 final stage
(10)(11)(12)(13)(14)(15)把 job 加入各种内存缓存中,其实就是各个数据结构
(16)提交 fianlStage.总是从最后开始往前推测.
(17)获取当前 stage 的父 stage.stage 的划分算法,主要在这里.waitingForVisit = new Stack[RDD[_]].栈结构,从最后的 stage 往前的 stage 放进栈中,实现先进后出.符合程序调用顺序.
(18)获取最后一个 stage,finalstage
(19)生成一个 ShuffleMapStage
(20)利用 finalestage 生成一个 job
(21)划分和提交 stage 算法精髓,划分好 stage 之后全部放在 waiting stage 数据结构中
(22)提交所有在 waiting stage 中的 stage,从 stage0...finalstage
(23)检查等待的阶段,现在有资格重新提交.提交依赖于给定父级阶段的阶段.当父阶段完成时调用成功
(24)所有的 stage 划分完并提交结束
------------------------------------------------------------------------------
stage 划分算法非常重要,精通 spark,必须对 stage 划分算法很清晰,知道自己编写的 spark 程序被划分为几个 job,每个 job 被划分为几个 stage,
每个 stage 包含了哪些代码,只有知道每个 stage 包括哪些代码后.在线上,如果发现某个 stage 执行特别慢,或者某个 stage 一直报错,才能针对
特定的 stage 包含的代码排查问题,或性能调优.
stage 划分算法总结:
1. 从 finalstage 倒推(通过 栈 数据结构实现)
2. 通过宽依赖,进行 stage 的划分
3. 通过递归,优先提交父 stage
------------------------------------------------------------------------------
/**
* 获取某个stage的父stage
* 对于一个stage,如果它的最后一个RDD的所有依赖都是窄依赖,将不会创建新的stage
* 如果其RDD会依赖某个RDD,用宽依赖的RDD创建一个新的stage,并立即返回这个stage
* @type {[type]}
*/
private def getMissingParentStages(stage: Stage) : List[Stage] = {
val missing = new HashSet[Stage] val visited = new HashSet[RDD[_]] val waitingForVisit = new Stack[RDD[_]] def visit(rdd: RDD[_]) {
if (!visited(rdd)) {
visited += rdd val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil) if (rddHasUncachedPartitions) {
//遍历RDD的依赖,对于每种具有shuffle的操作,如reduceByKey,groupByKey,countByKey,底层对应了3个RDD:
//Map
for (dep < -rdd.dependencies) {
dep match {
//如果是宽依赖
case shufDep:
ShuffleDependency[_, _, _] = >
//使用宽依赖的RDD创建一个 ShuffleMapStage,并且将isShuffleMap 设置为true,
//默认最后一个stage不是shuffle不是ShuffleMapStage,但是finalstage之前所有的stage都是ShuffleMapStage
val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId) if (!mapStage.isAvailable) {
missing += mapStage
}
//如果是窄依赖
case narrowDep:
NarrowDependency[_] = >
//将依赖的RDD放入栈中
waitingForVisit.push(narrowDep.rdd)
}
}
}
}
}
//
waitingForVisit.push(stage.rdd) while (waitingForVisit.nonEmpty) {
//
visit(waitingForVisit.pop())
}
missing.toList
}-------------------------------------------------------------------------------------------------------------------------------taskScheduler
-->taskSchedulerImpl (standalone 模式)
-->SparkDeploySchedulerBackend (负责创建 AppClient, 向 master 注册 Application)
在 TaskSchedulerImpl 中, 对一个单独的 taskset 的任务进行调度. 这个类负责追踪每一个 taskset, 如果 task 失败的话
会负责重试 spark, 直到超过重试次数, 并且会通知延迟调度, 为这个 taskSet 处理本地化机制. 它的主要接口是
resourceOffer, 在这个接口中, taskset 会希望在一个节点上运行一个任务, 并且接受任务的状态改变消息,
来知道它负责的 task 的状态改变了.
override def submitTasks(taskSet: TaskSet) {
val tasks = taskSet.tasks //获取ttaskSet的task列表
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks") this.synchronized {
//每个taskSet都会创建一个manager,用于管理每个taskSet,并设定最大失败次数 maxTaskFailures
val manager = createTaskSetManager(taskSet, maxTaskFailures) val stage = taskSet.stageId
//尝试连接task,如果task失败,会负责重试spark,直到超过重试次数,并且会通知延迟调度
val stageTaskSets = taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
//通过 manager 获得活着的taskSet
stageTaskSets(taskSet.stageAttemptId) = manager val conflictingTaskSet = stageTaskSets.exists {
case(_, ts) = >ts.taskSet != taskSet && !ts.isZombie
}
if (conflictingTaskSet) {
throw new IllegalStateException(s "more than one active taskSet for stage $stage:" + s " ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(", ")}")
}
//利用已选择的调度器schedulableBuilder,把一个taskSet的manager加入调度管理池中
/*
def initialize(backend: SchedulerBackend) {
this.backend = backend
schedulableBuilder = {
schedulingMode match {
case SchedulingMode.FIFO =>
new FIFOSchedulableBuilder(rootPool)
case SchedulingMode.FAIR =>
new FairSchedulableBuilder(rootPool, conf)
case _ =>
throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
s"$schedulingMode")
}
}
schedulableBuilder.buildPools()
}*/
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) if (!isLocal && !hasReceivedTask) {
starvationTimer.scheduleAtFixedRate(new TimerTask() {
override def run() {
if (!hasLaunchedTask) {
logWarning("Initial job has not accepted any resources; " + "check your cluster UI to ensure that workers are registered " + "and have sufficient resources")
} else {
this.cancel()
}
}
},
STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
}
hasReceivedTask = true
}
/**
* 创建 taskScheduler 的时候,就是为 taskSchedulerImpl 创建一个 SparkDeploySchedulerBackend .
* 它负责创建AppClient,向master注册Application
*/
backend.reviveOffers()
}
一个 Spark job 的生命历程
来源: http://www.bubuko.com/infodetail-2469036.html