- /
- def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
- // Mark each slave as alive and remember its hostname
- // Also track if new executor is added
- var newExecAvail = false
- // 检查每个 worker 是否已经存在, 把不存在的加入到 worker 调度池中
- for (o <- offers) {
- if (!hostToExecutors.contains(o.host)) {
- hostToExecutors(o.host) = new HashSet[String]()
- }
- if (!executorIdToRunningTaskIds.contains(o.executorId)) {
- hostToExecutors(o.host) += o.executorId
- executorAdded(o.executorId, o.host)
- executorIdToHost(o.executorId) = o.host
- executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
- newExecAvail = true
- }
- for (rack <- getRackForHost(o.host)) {
- hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
- }
- }
- // 打乱 worker 机器的顺序, 以免每次分配任务时都落在同一台机器上, 避免某一个 worker 计算压力太大
- val shuffledOffers = Random.shuffle(offers)
- // 对于每一个 worker, 创建一个与其核数大小相同的数组, 数组的大小决定了这台 worker 上可以并行执行 task 的数目.
- val tasks = shuffledOffers.map(o => new ArrayBufferTaskDescription)
- // 取出每台机器的 cpu 核数
- val availableCpus = shuffledOffers.map(o => o.cores).toArray
- // 从 task 任务调度池中, 按照我们的调度算法, 取出需要执行的任务
- val sortedTaskSets = rootPool.getSortedTaskSetQueue
- for (taskSet <- sortedTaskSets) {
- logDebug("parentName: %s, name: %s, runningTasks: %s".format(
- taskSet.parent.name, taskSet.name, taskSet.runningTasks))
- if (newExecAvail) {
- taskSet.executorAdded()
- }
- }
- // 下面的这个循环, 是用来根据 worker 的信息标定 task 的数据本地化程度的. 当我们在 yarn 资源管理器上, 把 --deploy-mode 配置
- // 为 client 时, 就能在打印出来的日志中看到每一台机器上运行的 task 的数据本地化程度. 同时这个循环还会为每个 task 选择对应的 worker 机器
- // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
- for (taskSet <- sortedTaskSets) {
- var launchedAnyTask = false
- var launchedTaskAtCurrentMaxLocality = false
- for (currentMaxLocality <- taskSet.myLocalityLevels) {
- do {
- launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
- taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
- launchedAnyTask |= launchedTaskAtCurrentMaxLocality
- } while (launchedTaskAtCurrentMaxLocality)
- }
- if (!launchedAnyTask) {
- taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
- }
- }
来源: http://blog.51cto.com/9269309/2091219