TaskSchedulerImpl
上一篇讲到 DAGScheduler 根据 shuffle 依赖对作业的整个计算链划分成多个 stage 之后, 就开始提交最后一个 ResultStage, 而由于 stage 之间的依赖关系, 实际上最终是循着计算链从上到下依次提交 stage 的. 每提交一个 stage, 就会将这个 stage 分成多个 Task, 并且会计算每个 Task 的偏向位置, 将 RDD 和 ShuffleDependency,TaskMetrics 等对象序列化用于远程传输, 最后把一个 stage 的所有 Task 包装成一个任务集, 提交给 TaskSchedulerImpl 运行. 本节就来分析一下这个 TaskSchedulerImpl. 首先把 TaskSchedulerImpl 的说明翻译一下:
TaskSchedulerImpl 的主要作用是调度 Task, 内部通过调度后端进行实际任务的传输. 不同的集群类型对应不同的具体的调度后端的实现, 例如本地模式的调度后端实现是 LocalSchedulerBackend, 而任务调度器的实现只有一种就是 TaskSchedulerImpl.TaskSchedulerImpl 主要处理一些通用的逻辑, 例如在多个作业之间决定调度顺序, 执行推测执行的逻辑等等.
TaskSchedulerImpl 在使用之前应该先调用 initialize() 和 start()方法, 然后再提交任务集.
这里插一句, 这两个方法分别在上面地方调用呢? 都是在 SparkContext 初始化的时候调用的, 具体可以看 SparkContext 初始化代码.
关于线程安全的一些提示: 由于会存在多线程同时提交任务的情况, 所以面向外部的 public 的方法必须加锁以保持内部状态量, 簿记量的一致性.
此外, 一些调度后端 (SchedulerBackend) 的方法会先获取自身的锁, 然后获取 TaskSchedulerImpl 对象的锁, 所有应该避免在持有 TaskSchedulerImpl 对象的锁的情况下再尝试获取调度后端的锁, 这样会造成死锁. 实际上这句话的意思就是因为有些操作需要同时持有调度后端的锁和 TaskSchedulerImpl 锁, 对于这种需要同时持有多把锁的情况, 应该保持获取锁的顺序是一致的, 这样就能避免出现死锁的情况.
举个例子: 有些操作要同时获取 A 锁和 B 锁, 如果方法 m1 的获取顺序是先获取 A 锁然后获取 B 锁, 而 m2 是先 B 后 A, 如果同时又两个线程 t1,t2 分别执行方法 m1 和 m2, 有可能在某个时刻形成这样的情况: t1 获取了 A 在等待 B, 而 t2 获取了 B 在等待 A, 这样互相等待而又不释放锁就形成死锁.
好了, 接下来我们接着任务提交的逻辑继续分析.
- submitTasks
- override def submitTasks(taskSet: TaskSet) {
- val tasks = taskSet.tasks
- logInfo("Adding task set" + taskSet.id + "with" + tasks.length + "tasks")
- // 加锁, 更新一些簿记量和状态量
- this.synchronized {
- // 创建一个 TaskSetManager, 对 Task 集的进一步封装
- val manager = createTaskSetManager(taskSet, maxTaskFailures)
- val stage = taskSet.stageId
- // 更新 stageId 和 TaskSetManager 的映射关系, 由于失败重试机制, 一个 stage 可能会被多次尝试
- val stageTaskSets =
- taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
- // 将本次尝试的 TaskSetManager 添加到映射中
- stageTaskSets(taskSet.stageAttemptId) = manager
- // 检测是否有还在运行的 stage 尝试, 如果有就是重复提交了, 需要抛出一个异常
- val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
- ts.taskSet != taskSet && !ts.isZombie
- }
- if (conflictingTaskSet) {
- throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
- s"${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
- }
- // 将新创建的 TaskSetManager 添加到调度池中, 调度池决定了如果存在多个 TaskSet 在排队应该如何进行排序,
- // 可以通过 taskSet.properties 设置队列名称, taskSet.properties 是通过 SparkContext 的一个 ThreadLocal 变量设置的
- schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
- // 启动一个定时任务打印提示信息
- if (!isLocal && !hasReceivedTask) {
- starvationTimer.scheduleAtFixedRate(new TimerTask() {
- override def run() {
- if (!hasLaunchedTask) {
- logWarning("Initial job has not accepted any resources;" +
- "check your cluster UI to ensure that workers are registered" +
- "and have sufficient resources")
- } else {
- this.cancel()
- }
- }
- }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
- }
- hasReceivedTask = true
- }
- // 到这里, 锁已经释放
- // 通知调度后端有任务可以运行了,
- backend.reviveOffers()
- }
获取锁, 更新一些簿记量
将新的任务集封装为 TaskSetManager 添加到调度池中
调用 SchedulerBackEnd, 给任务分配可用资源
- CoarseGrainedSchedulerBackend.reviveOffers
- override def reviveOffers() {
- driverEndpoint.send(ReviveOffers)
- }
这个方法通过 rpc 模块给 DriverEndPoint 发送一个消息, 本地进程内调用 rpc 方法, 主要是为了代码模块的统一. 在 DriverEndpoint.receive 方法中, 我们可以看到, 在接收到 ReviveOffers 消息后, 就会调用 makeOffers 方法,
- DriverEndpoint.makeOffers
- private def makeOffers() {
- // Make sure no executor is killed while some task is launching on it
- // 获取锁, 同步
- val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
- // Filter out executors under killing
- // 过滤掉正在被杀死的 executor
- val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
- // 把所有可用的 executor 封装成资源对象
- val workOffers = activeExecutors.map {
- case (id, executorData) =>
- new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
- }.toIndexedSeq
- // 把这些可用的资源交给 TaskSchedulerImpl 进行调度
- // TaskSchedulerImpl 会综合考虑任务本地性, 黑名单, 调度池的调度顺序等因素, 返回 TaskDescription 集合
- // TaskDescription 对象是对一个 Task 的完整描述,
- // 包括序列化的任务数据, 任务在哪个 executor 上运行, 依赖文件和 jar 包等信息
- scheduler.resourceOffers(workOffers)
- }
- if (!taskDescs.isEmpty) {
- launchTasks(taskDescs)
- }
- }
这个方法主要是将当前所有可用的资源 (executor) 封装成资源对象 (WorkerOffer) 交给 TaskSchedulerImpl,TaskSchedulerImpl 会综合考虑任务本地性, 黑名单, 调度池的调度顺序等因素, 返回 TaskDescription 集合, TaskDescription 对象是对一个 Task 的完整描述, 包括序列化的任务数据, 任务在哪个 executor 上运行, 依赖文件和 jar 包等信息.
从这里也可以看出, 调度后端的职责其实相对比较少主要是对 executor 的管理, 以及调用 rpc 远端服务的引用发送任务数据, 大部分的调度工作还是由 TaskSchedulerImpl 来完成.
接下来我们分析一下 Task 调度最重要的一个方法, TaskSchedulerImpl.resourceOffers
- TaskSchedulerImpl.resourceOffers
- // 这个方法由调度后端调用, 调度后端会将可用的 executor 资源告诉 TaskSchedulerImpl,
- // TaskSchedulerImpl 根据 TaskSet 优先级(调度池), 黑名单, 本地性等因素给出要实际运行的任务.
- // 我们使用 round-robin 的方式将任务分配到各个 executor 上, 以使得计算资源的 使用更均衡.
- def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
- // Mark each slave as alive and remember its hostname
- // Also track if new executor is added
- // 标记是否有新的可用 executor 加入
- var newExecAvail = false
- // 这个循环主要目的是两个:
- // 1. 更新一些簿记量, 如物理节点和 executor 的相互映射关系, 机架和 host 的映射关系, host 和 executor 上运行的任务信息等等
- // 2. 检查是否有新的可用 executor 加入
- for (o <- offers) {
- if (!hostToExecutors.contains(o.host)) {
- hostToExecutors(o.host) = new HashSet[String]()
- }
- if (!executorIdToRunningTaskIds.contains(o.executorId)) {
- hostToExecutors(o.host) += o.executorId
- executorAdded(o.executorId, o.host)
- executorIdToHost(o.executorId) = o.host
- executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
- newExecAvail = true
- }
- for (Rack <- getRackForHost(o.host)) {
- hostsByRack.getOrElseUpdate(Rack, new HashSet[String]()) += o.host
- }
- }
- // Before making any offers, remove any nodes from the blacklist whose blacklist has expired. Do
- // this here to avoid a separate thread and added synchronization overhead, and also because
- // updating the blacklist is only relevant when task offers are being made.
- // 触发黑名单的超时检查, 被加入黑明单的节点或 executor 是由一定超时时间的,
- // 在超时时间内不能像他们提交任务, 而过了超时时间, 这些资源将被重新投入使用
- blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())
- // 根据最新的黑名单过滤掉在黑名单中的计算资源, 包括 host 和 executor
- val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>
- offers.filter { offer =>
- !blacklistTracker.isNodeBlacklisted(offer.host) &&
- !blacklistTracker.isExecutorBlacklisted(offer.executorId)
- }
- }.getOrElse(offers)
- // 对资源进行混洗, 使得分配更加均匀, 使用 scala 库的 Random 进行混洗
- val shuffledOffers = shuffleOffers(filteredOffers)
- // Build a list of tasks to assign to each worker.
- // 每个 executor 能分配多少个任务, cores / CPUS_PER_TASK
- val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
- // 每个 executor 提供的 CPU 核数
- val availableCpus = shuffledOffers.map(o => o.cores).toArray
- // 通过调度池对所有的任务集按优先级进行排序, 获取排序后的任务集
- val sortedTaskSets = rootPool.getSortedTaskSetQueue
- // 如果有新的 executor 加入, 需要通知每个 TaskSetManager
- for (taskSet <- sortedTaskSets) {
- logDebug("parentName: %s, name: %s, runningTasks: %s".format(
- taskSet.parent.name, taskSet.name, taskSet.runningTasks))
- if (newExecAvail) {
- taskSet.executorAdded()
- }
- }
- // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
- // of locality levels so that it gets a chance to launch local tasks on all of them.
- // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
- // 对于每一个任务集, 为其分配资源
- for (taskSet <- sortedTaskSets) {
- var launchedAnyTask = false
- var launchedTaskAtCurrentMaxLocality = false
- // 本地性从低到高的顺序
- for (currentMaxLocality <- taskSet.myLocalityLevels) {
- // 每个本地性级别会进行多轮分配,
- // 每一轮依次轮询每个 executor, 每个 executor 分配一个任务,
- // 这样一轮下来每个 executor 都会分配到一个任务, 显然大多数情况下, executor 的资源是不会被占满的
- // 没关系, 我们会接着进行第二轮分配, 知道没有资源或者在当前的本地性级别下任务被分配完了, 就跳出循环
- do {
- launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
- taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
- launchedAnyTask |= launchedTaskAtCurrentMaxLocality
- } while (launchedTaskAtCurrentMaxLocality)
- }
- if (!launchedAnyTask) {
- taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
- }
- }
- if (tasks.size> 0) {
- hasLaunchedTask = true
- }
- return tasks
- }
更新一些簿记量, 如物理节点和 executor 的相互映射关系, 机架和 host 的映射关系, host 和 executor 上运行的任务信息等等; 检查是否有新的可用 executor 加入
触发黑名单的超时检查, 被加入黑明单的节点或 executor 是由一定超时时间的, 在超时时间内不能像他们提交任务, 而过了超时时间, 这些资源将被重新投入使用; 根据最新的黑名单过滤掉在黑名单中的计算资源, 包括 host 和 executor
通过调度池对所有的任务集按优先级进行排序, 获取排序后的任务集
对于每一个任务集, 按照对 executor 进行 round-robin 的方式分配任务, 会进行多轮分配, 每一轮依次轮询所有的 executor, 为每一个 executor 分配一个符合本地性要求的任务
- TaskSchedulerImpl.resourceOfferSingleTaskSet
- private def resourceOfferSingleTaskSet(
- taskSet: TaskSetManager,
- maxLocality: TaskLocality,
- shuffledOffers: Seq[WorkerOffer],
- availableCpus: Array[Int],
- tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
- var launchedTask = false
- // nodes and executors that are blacklisted for the entire application have already been
- // filtered out by this point
- // 轮询每一个 executor, 分别为其分配一个 Task
- for (i <- 0 until shuffledOffers.size) {
- val execId = shuffledOffers(i).executorId
- val host = shuffledOffers(i).host
- // 检查这个 executor 上的 CPU 资源是否够用
- if (availableCpus(i)>= CPUS_PER_TASK) {
- try {
- // 根据最大允许的本地性级别取出能够在这个 executor 上执行的任务,
- for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
- // 如果能够找出一个可以在这个 executor 上运行的符合本地性要求的任务,
- // 将这个任务加入传进来的集合中
- tasks(i) += task
- val tid = task.taskId
- taskIdToTaskSetManager(tid) = taskSet
- taskIdToExecutorId(tid) = execId
- executorIdToRunningTaskIds(execId).add(tid)
- availableCpus(i) -= CPUS_PER_TASK
- assert(availableCpus(i)>= 0)
- launchedTask = true
- }
- } catch {
- case e: TaskNotSerializableException =>
- logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
- // Do not offer resources for this task, but don't throw an error to allow other
- // task sets to be submitted.
- return launchedTask
- }
- }
- }
- return launchedTask
- }
这个方法就是对所有可用的 executor 进行一轮 round-robin 方式的分配, 一轮分配中, 每个 executor 最多只能得到一个任务, 这样做是为了尽量将任务 "打散", 均匀第 "撒到" 所有 executor 上.
- TaskSetManager.resourceOffer
- @throws[TaskNotSerializableException]
- def resourceOffer(
- execId: String,
- host: String,
- maxLocality: TaskLocality.TaskLocality)
- : Option[TaskDescription] =
- {
- // 首先检查黑名单
- val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist =>
- blacklist.isNodeBlacklistedForTaskSet(host) ||
- blacklist.isExecutorBlacklistedForTaskSet(execId)
- }
- if (!isZombie && !offerBlacklisted) {
- val curTime = clock.getTimeMillis()
- var allowedLocality = maxLocality
- // 根据本地性等待时间重新计算本地性级别
- if (maxLocality != TaskLocality.NO_PREF) {
- allowedLocality = getAllowedLocalityLevel(curTime)
- if (allowedLocality> maxLocality) {
- // We're not allowed to search for farther-away tasks
- allowedLocality = maxLocality
- }
- }
- // 找出一个在指定的本地性级别下, 能够在这个 executor 上运行的任务
- dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
- // Found a task; do some bookkeeping and return a task description
- val task = tasks(index)
- // 分配一个 taskId
- val taskId = sched.newTaskId()
- // Do various bookkeeping
- // 更新一些簿记量
- copiesRunning(index) += 1
- // task 的尝试次数
- val attemptNum = taskAttempts(index).size
- val info = new TaskInfo(taskId, index, attemptNum, curTime,
- execId, host, taskLocality, speculative)
- taskInfos(taskId) = info
- // 记录每次尝试的任务信息
- taskAttempts(index) = info :: taskAttempts(index)
- // Update our locality level for delay scheduling
- // NO_PREF will not affect the variables related to delay scheduling
- // 更新本地性信息和事件信息用于计算本地性等待时间
- // 而对于没有本地性偏好的任务则不会影响这些簿记量
- if (maxLocality != TaskLocality.NO_PREF) {
- currentLocalityIndex = getLocalityIndex(taskLocality)
- lastLaunchTime = curTime
- }
- // Serialize and return the task
- // 对 task 进行序列化
- val serializedTask: ByteBuffer = try {
- ser.serialize(task)
- } catch {
- // If the task cannot be serialized, then there's no point to re-attempt the task,
- // as it will always fail. So just abort the whole task-set.
- case NonFatal(e) =>
- val msg = s"Failed to serialize task $taskId, not attempting to retry it."
- logError(msg, e)
- abort(s"$msg Exception during serialization: $e")
- throw new TaskNotSerializableException(e)
- }
- // 如果序列化后的体积超过指定阈值, 那么会打印一条警告信息
- if (serializedTask.limit()> TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
- !emittedTaskSizeWarning) {
- emittedTaskSizeWarning = true
- logWarning(s"Stage ${task.stageId} contains a task of very large size" +
- s"(${serializedTask.limit() / 1024} KB). The maximum recommended task size is" +
- s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
- }
- // 更新调度池中的运行任务统计的簿记量
- addRunningTask(taskId)
- // We used to log the time it takes to serialize the task, but task size is already
- // a good proxy to task serialization time.
- // val timeTaken = clock.getTime() - startTime
- val taskName = s"task ${info.id} in stage ${taskSet.id}"
- logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}," +
- s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit()} bytes)")
- // 通过 dagScheduler 给事件总线头第一个任务开始的事件
- sched.dagScheduler.taskStarted(task, info)
- // 封装成一个 TaskDescription 对象, 并返回给上层调用
- new TaskDescription(
- taskId,
- attemptNum,
- execId,
- taskName,
- index,
- addedFiles,
- addedJars,
- task.localProperties,
- serializedTask)
- }
- } else {
- None
- }
- }
这个方法的作用是对给定的 executor 和本地性级别, 分配一个符合要求的任务给这个 executor. 最终任务被封装成 TaskDescription 对象.
小结
对任务分配做一个小结. 在给定的计算资源上分配合适的任务, 这个工作主要是由 TaskScheduler 和 TaskSetManager 两个类协同完成的. 而任务本地性的维护与分配时的检查工作是在 TaskSetManager 中完成的.
接下来, 我们分析一下获取到可以实际运行的任务后, 调度后端是怎么把这些任务发送到制定的 executor 上执行的.
DriverEndpoint.makeOffers
首先还得接着回到 DriverEndpoint.makeOffers 方法, makeOffers 方法中通过调用 TaskSchedulerImpl.resourceOffers 方法切入 TaskSchedulerImpl, 然后就是 TaskSchedulerImpl 在做任务分配的工作, 最终 TaskSchedulerImpl 将分配好的任务以 TaskDescription 的封装形式返回给 DriverEndpoint(DriverEndpoint 是调度后端的一个内部类), 然后紧接着调用 DriverEndpoint.launchTasks 方法将这些任务传给相应的 executor 执行.
- DriverEndpoint.launchTasks
- private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
- for (task <- tasks.flatten) {
- val serializedTask = TaskDescription.encode(task)
- if (serializedTask.limit()>= maxRpcMessageSize) {
- scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
- try {
- var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed:" +
- "spark.rpc.message.maxSize (%d bytes). Consider increasing" +
- "spark.rpc.message.maxSize or using broadcast variables for large values."
- msg = msg.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)
- // 注意, 这里只要有一个 task 体积超过阈值, 就将该 task 所属的 taskSet 取消掉
- // 为什么这么做呢? 因为其实一个 taskset 中的所有 task 的体积都是一样的, 只是 partition 序号不同而已,
- // 所以一个 task 体积超过阈值, taskset 中的其他 task 也必然超过阈值,
- // 所以没有必要再尝试其他的 task, 直接把 taskset 取消掉更高效
- taskSetMgr.abort(msg)
- } catch {
- case e: Exception => logError("Exception in error callback", e)
- }
- }
- }
- else {
- val executorData = executorDataMap(task.executorId)
- // 维护 CPU 资源信息
- executorData.freeCores -= scheduler.CPUS_PER_TASK
- logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname:" +
- s"${executorData.executorHost}.")
- // 通过 rpc 发送任务到指定的 executor 上
- executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
- }
- }
- }
这个方法跟简单, 主要就是对 TaskDescription 进行序列化, 然后检查体积是否超过阈值, 如果没超过阈值就调用 rpc 服务引用, 将任务发送到指定的 executor 上.
总结
好了, 经过漫长的调用, 终于我们的任务要离开 driver, 驶向 executor 了, 回顾一下任务在 driver 中从诞生到最终发送的过程, 主要有一下几个步骤:
DAGScheduler 对作业计算链按照 shuffle 依赖划分多个 stage, 提交一个 stage 根据个 stage 的一些信息创建多个 Task, 包括 ShuffleMapTask 和 ResultTask, 并封装成一个任务集(TaskSet), 把这个任务集交给 TaskScheduler
TaskSchedulerImpl 将接收到的任务集加入调度池中, 然后通知调度后端 SchedulerBackend
CoarseGrainedSchedulerBackend 收到新任务提交的通知后, 检查下现在可用 executor 有哪些, 并把这些可用的 executor 交给 TaskSchedulerImpl
TaskSchedulerImpl 根据获取到的计算资源, 根据任务本地性级别的要求以及考虑到黑名单因素, 按照 round-robin 的方式对可用的 executor 进行轮询分配任务, 经过多个本地性级别分配, 多轮分配后最终得出任务与 executor 之间的分配关系, 并封装成 TaskDescription 形式返回给 SchedulerBackend
SchedulerBackend 拿到这些分配关系后, 就知道哪些任务该发往哪个 executor 了, 通过调用 rpc 接口将任务通过网络发送即可.
来源: https://www.cnblogs.com/zhuge134/p/10965266.html