Spark 有几种资源调度设施. 每个 Spark Application(SparkContext 实例)独立地运行在一组 executor 进程内. cluster manager 为应用间的调度提供设施. 在每个 Spark 应用内, 如果将多个 job(多个 spark action)提交给不同的线程, 那么他们会并行运行.
1 Application 间的资源调度
集群上, 每个 Spark application 获得独立的一组 executor JVM, 这组 executor JVM 只为那个 application 运行 task 和存储数据. 如果多个用户要共享集群, 有不同的策略管理资源分配, 这取决于使用的 cluster manager.
资源的静态分区 (static partitioning) 可被所有的 cluster manager 获得, 这样每个 application 在他的生命周期内都可获得他能使用的最多资源. standalone,YARN,coarse-grained Mesos mode 这三种模式使用的就是这种方式.
1.1 控制资源使用
集群类型下, 如下配置资源分配:
Standalone mode:application 提交到 standalone mode 集群, 将会以 FIFO 的顺序运行, 每个 application 会尽可能地使用所有可用节点, 配置 spark.cores.max 来限制 application 使用节点的数目, 或者设置 spark.deploy.defaultCores. 除了可以设置 application 可用内核数, 还可以设置 spark.executor.memory 来控制内存的使用.
Mesos: 为了使用静态分区 (static partitioning) 在 Mesos 集群上, spark.mesos.coarse=true, 可以通过设置 spark.cores.max 来限制每个 application 的资源共享, 通过设置 spark.executor.memory 来控制 executor 内存的使用.
YARN: 通过设置 --num-executors 选项, spark YARN 客户端可控制集群上有多少 executor 被分配 (对应的配置属性为 spark.executor.instances),--executor-memory(对应的配置属性 spark.executor.memory) 和 --executor-cores(对应的配置属性 spark.executor.cores)控制了分配给每个 executor 的资源.
应用之间无法共享内存.
1.2 动态资源分配
Spark 提供了依据应用的工作量动态调整资源的机制. 这意味着你的 application 不在使用的资源会返还给集群, 当需要的时候再申请分配资源, 这种特性对于多应用共享集群特别有用.
这个特性默认失效, 但在所有 coarse-grained cluster manager 上都可用, 如: standalone mode, YARN mode, 和 Mesos coarse-grained mode.
使用这个特性有两个要求. 首先用于必须设置 spark.dynamicAllocation.enabled=true, 其次要设置 external shuffle service 在集群上的每个 worker node 并设置 spark.shuffle.service.enabled=true. 设置 external shuffle service 目的是 executor 可被移除但是不删除他们生成的 shuffle 文件.
设置这个变量的方式为:
在 standalone 模式: 设置 spark.shuffle.service.enabled=true
Mesos coarse-grained 模式: 在所有从节点运行 $SPARK_HOME/sbin/start-mesos-shuffle-service.sh 设置 spark.shuffle.service.enabled=true
YARN: 详见运行 spark 与 YARN
1.3 资源分配策略
当 Spark 不再使用 executor 时就出让它, 需要的时候再获取它. 因为没有一个确定的方式预测将要被移除的 executor 是否在不久的将来会被使用, 或者一个将要被添加的新 executor 实际上是否是空闲的, 所以我们需要一系列试探来确定是移除 executor(可能会移除多个)还是请求 executor(可能会请求多个).
请求策略
开启 Spark application 动态分配资源特性, 当 pending task 等待被调度时, Spark application 会请求额外的 executor. 这就意味着, 当前的这些 executor 无法同时满足所有的 task, 这些 task 已经被提交, 但是还没有执行完.
Spark 轮流请求 executor. 当 task 等待的时间大于 spark.dynamicAllocation.schedulerBacklogTimeout 时, 真正的请求 (申请 executor 的请求) 被触发, 之后, 如果未完成 task 队列存在, 那么每隔 spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 秒请求被触发一次. 每一轮请求的 executor 数量以指数级增长. 例如, 第一轮请求一个 executor, 第二轮请求 2 个, 第三, 四轮分别请求 4,8 个.
按指数形式增长的动机有两个, 首先, 起初应用应该慎重地请求 executor, 以防只需几个 executor 就能满足需求, 这和 TCP 慢启动类似. 其次, 当应用确实需要更多的 executor 时, 应用应该能够及时地增加资源的使用.
移除策略
当 executor 闲置超过 spark.dynamicAllocation.executorIdleTimeout 秒时, 就将他移除. 注意, 大多数情况下, executor 的移除条件和请求条件是互斥的, 这样如果仍然有待调度的 task 的情况下 executor 是不会被移除的.
executor 优雅地退役
非动态分配资源情况下, 一个 Spark executor 或者是由于失败而退出, 或者是因相关 application 退出而退出. 这两种情况下, 不在需要与 executor 相关联的状态并且这些状态可以被安全地丢弃. 动态分配资源的情况下, 当 executor 被明确移除时, application 仍然在运行. 如果 application 要想使用这些由 executor 存储和写下的状态, 就必须重新计算状态. 这样就需要一种优雅的退役机制, 即在 executor 退役前保留他的状态.
这个机制对于 shuffles 特别重要. shuffle 期间, executor 自己的 map 输出写入本地磁盘. 当其他的 executor 要获取这些文件的时候, 这个 executor 充当了文件服务器的角色. 对于那些落后的 executor, 他们的 task 执行时间比同辈要长, 在 shuffle 完成之前, 动态资源分配可能移除了一个 executor, 这种情形下, 那个 executor 写入本地的文件 (即 executor 的状态) 不必重新计算.
保留 shuffle 文件的办法就是使用外部的 shuffle 服务, 这是在 Spark 1.2 中引入的. 这个外部的 shuffle 服务指的是长时间运行的进程, 它运行与集群的每个节点上, 独立于 application 和 executor. 如果这个服务可用, executor 就从这个服务获 shuffle file, 而不是彼此之间获取 shuffle file. 这意味着 executor 生成的任何 shuffle 文件都可能被服务包含, 即使在 executor 生命周期之外也是如此.
executor 除了写 shuffle 文件到本地硬盘, 还缓存数据到硬盘或内存中. 但是, 当 executor 被移除后, 缓存到内存中的数据将不可用. 为了解决这一问题, 默认地缓存数据到内存的 executor 永远不会被删除. 可以通过 spark.dynamicAllocation.cachedExecutorIdleTimeout 配置这一行为,
2 Application 内的资源调度
概述
给定的 application 内部(SparkContext 实例), 如果多个并行的 job 被提交到不同的线程上, 那么这些 job 可以同时执行. 这里的 job 指的是 Spark action 及 Spark action 触发的计算 task.Spark scheduler 是线程安全的, 支持 spark application 服务于多个请求.
默认地 Spark scheduler 以 FIFO 的顺序执行 job, 每个 job 被切分为一到多个 stage(例如, map 和 reduce), 当第一个 job 的 stage 的 task 启动后, 这个 job 优先获得所有可用资源, 然后才是第二, 三个 job....... 如果队头的 job 不必使用整个集群, 之后的 job 就能立即启动. 如果队头的 job 较大, 那么之后的 job 启动延迟会比较明显.
从 Spark 0.8 开始, 也可以通过配置实现队列间的公平调度. Job 间的 task 资源分配采用单循环的方式. 所有 job 都会获得大致相同的集群资源. 这就意味着, 当有长 job 存在时, 提交的短 job 可以立即获得资源启动运行而不必等到长 job 执行完毕. 可以设置 spark.scheduler.mode 为 FAIR
- val conf = new SparkConf().setMaster(...).setAppName(...)
- conf.set("spark.scheduler.mode", "FAIR")
- val sc = new SparkContext(conf)
公平调度池(可能多个)
公平调度器也支持在池中对 job 分组并给每个池配置不同的选项. 这有助于为更重要的 job 设置高优先级池, 例如把每个用户的 job 分到一组, 并且给这些用户相等的资源不论有多少并行 task, 而不是给每个 job 相等的资源.
不需要任何干预, 新 job 会进入默认池, 但是可以使用 spark.scheduler.pool 设置 job 池.
sc.setLocalProperty("spark.scheduler.pool", "pool1")
设置完后, 这个线程 (通过调用 RDD.save, count, collect) 提交的所有 job 都会使用这个资源池的名称. 设置是针对每一个线程的, 这样更容易实现一个线程运行一个用户的多个 job. 如果想清除与一个线程相关的池, 调用: sc.setLocalProperty("spark.scheduler.pool", null)
池默认行为
默认地每个池都能获得相等的资源(在默认池中每个 job 都能获得相等的资源), 但在每个池内部, job 以 FIFO 的顺序运行. 例如如果为每一个用户创建一个池, 这就意味着每一个用户将获得相等的资源, 并且每个用户的查询都会按顺序运行而不会出现后来的查询抢占了前面查询的资源
配置池属性
可以通过修改配置文件改变池属性. 每个池都支持三种属性:
schedulingMode: 可以是 FIFO 或 FAIR, 控制池中的 job 排队等候或公平地分享集群资源.
weight: 控制资源分配的比例. 默认所有池分配资源比重都是 1. 如果指定一个池的比重为 2, 那么他获得的资源是其他池的 2 倍. 如果将一个池的比重设的很高, 比如 1000, 那么不论他是否有活跃的 job, 他总是第一个开始执行 task.
minShare: 除了设置总体的占比之外, 还可以对每个池设定一个最小资源分配 (例如 CPU 核数). 在根据比重重新分配资源之前, 公平调度器总是试图满足所有活跃池的最小资源需求. minShare 属性能以另一种方式确保一个池快速地获得一定数量的资源(10 个核) 而不必给他更高的优先级. 默认地 minShare=0.
调用 SparkConf.set, 可以通过 xml 文件配置池属性:
conf.set("spark.scheduler.allocation.file", "/path/to/file")
每个池一个, 在 xml 文件中没有配置的池使用默认配置(调度模式 FIFO, weight 1, minShare 0), 例如:
FAIR
1
2
FIFO
2
3
来源: http://bigdata.51cto.com/art/201904/594653.htm