[TOC]
研究Spark作业调度,是为了合理使用集群的资源。更具体一点,是看看是否提供了可以个性化配置的点,然后根据应用的具体情况制定配置或者使用方案。
本文参考官网作业调度文档。
spark的作业调度分为两个场景:跨应用的调度和应用内部的调度,下面分别介绍。
声明:文中配图是自己的理解,并不敢保证其准确性。
跨应用的调度是由底层的集群管理器负责的,有两种资源分配策略。
一种是静态资源分隔,即一个应用一开始就申请所有的资源,并在程序运行期间使用持有这些资源。
一种是动态资源分配,应用根据自己的负载情况动态请求或释放资源。这种策略默认是不开启的。
所有的集群管理器都支持静态资源分隔,只是具体的配置策略不同:
提交到Standalone mode集群的应用会以FIFO的顺序运行,每一个正在运行的应用都会尝试占用所有的可用资源。使用下面的配置项可以限制每个应用申请的资源:
spark.cores.max
应用可以申请的最大数量的CPU核的数量,如果没有设置,取
的值。
- spark.deploy.defaultCores
spark.executor.memory
分配给每个executor进程的内存资源。
为了使用静态资源隔离,需要设置
为true,这称为粗粒度的Mesos模式。
- spark.mesos.coarse
另外,
和
- spark.cores.max
在Mesos模式下同样有效。
- spark.executor.memory
--num-executors
在使用spark-submit提交作业时,可以使用
选项请求指定的executor个数。
- --num-executors
在程序内部,可以通过设置
属性达到同样的目的。
- spark.executor.instances
--executor-memory
在使用spark-submit提交作业时,可以使用
选项设置每个executor申请的内存。
- --executor-memory
在程序内部,可以通过设置
属性达到同样的目的。
- spark.executor.memory
--executor-cores
在使用spark-submit提交作业时,可以使用
选项设置每个executor申请的CPU核。
- --executor-cores
在程序内部,可以通过设置
属性达到同样的目的。
- spark.executor.cores
spark的运行模型是基于executor的,executor是资源的实际持有者。所以动态资源分配,是通过动态的申请executor和释放executor来实现的。
动态资源分配涉及到两个方面,如何在需要的时候动态申请资源,以及如何在空闲的时候动态释放资源。
动态请求策略:如果一个应用有tasks在等待,超过一定的时间(
秒)就会申请1个executor。此后每隔一定的时间(
- spark.dynamicAllocation.schedulerBacklogTimeout
秒)就检测应用是否有tasks在等待,有就继续申请executor。
- spark.dynamicAllocation.sustainedSchedulerBacklogTimeout
动态请求资源的数量是指数级的,第一次申请1个,第二次申请2个,接着是4, 8 ...这种考虑是为了在谨慎申请资源的同时,又可以在允许的时间范围内获得真正需要的资源量。
动态释放资源:是通过检查应用占据的executor是否超过了指定的时间(
秒)来决定的,超过了就释放。
- spark.dynamicAllocation.executorIdleTimeout
释放资源的条件和请求资源的条件是互斥的,即如果一个应用有tasks在排队,就不应该会有空闲的executor。
为了使用动态资源分配,需要做两件事:
值为true
- spark.dynamicAllocation.enabled
,并设置
- external shuffle service
为true
- spark.shuffle.service.enabled
的作用在后面会介绍,不同集群模式下启动
- external shuffle service
的方式不同:
- external shuffle service
,只需要设置
- external shuffle service
为true即可。
- spark.shuffle.service.enabled
来启动
- $SPARK_HOME/sbin/start-mesos-shuffle-service.sh
。
- external shuffle service
动态释放资源需要额外的支持,因为executor可能会产生中间结果并输出到本地,在需要的时候需要通过这个executor获取它的中间结果。冒然移除executor会丢失它计算的中间结果,导致在真正需要的时候又要重新计算。
比如在map阶段executor输出map结果,在shuffle阶段这些map结果又需要通过executor读出来传送到负责reduce的executor。
spark通过
来解决这个问题。
- external shuffle service
是指在每一个node都运行的一个长期进程,这个进程独立于应用和executor,负责提供executor的输出数据的获取服务。原来executor之间相互请求来获取对方的输出结果,变成了统一从external shuffle service获取结果。
- external shuffle service
即使executor已经被移除了,它所输出的数据依然可以通过
来获取。
- external shuffle service
另外,executor还可能会把中间结果缓存到内存,目前的策略是不移除此类的executor。未来可能采取将缓存持久化的方式,进而释放executor。
一个spark应用可以支持多个不同线程的job同时提交,这常见于spark应用提供网络服务的场景。
spark默认的调度策略是FIFO,如果队列头部的job比较大,占用了集群的所有资源,后面的小任务将迟迟得不到运行的机会。
另外,spark还支持配置FAIR调度,spark循环调度每个job的task。这样即使有大job在运行,刚提交的小job也可以及时获得资源,而不需要等到大job结束。
通过设置属性
来启用公平调度:
- spark.scheduler.mode
- val conf = new SparkConf().setMaster(...).setAppName(...)
- conf.set("spark.scheduler.mode", "FAIR")
- val sc = new SparkContext(conf)
spark支持公平调度池的概念,每个线程可以指定将jobs提交到哪个池子,最细粒度的场景下是每个线程对应一个池,也可以多个线程使用同一个池。
每个线程默认使用的池是default,也可以通过设置参数来明确指定池。
- // Assuming sc is your SparkContext variable
- sc.setLocalProperty("spark.scheduler.pool", "pool1")
如果想重置当前线程绑定的池子,调用
。
- sc.setLocalProperty("spark.scheduler.pool", null)
可以通过配置文件将资源按照一定的比重分配到池,配置文件的模板:
。
- conf/fairscheduler.xml.template
通过
指定配置文件。
- conf.set("spark.scheduler.allocation.file", "/path/to/file")
每个池可支持的参数有三个:
配置文件示例:
- <?xml version="1.0"?>
- <allocations>
- <pool name="production">
- <schedulingMode>FAIR</schedulingMode>
- <weight>1</weight>
- <minShare>2</minShare>
- </pool>
- <pool name="test">
- <schedulingMode>FIFO</schedulingMode>
- <weight>2</weight>
- <minShare>3</minShare>
- </pool>
- </allocations>
没有出现在配置文件中的池,所有参数取默认值(schedulingMode=FIFO,weight=1,minShare=0)。
executor到底指什么?和容器、JVM的关系是怎样的?
executor是负责一定职责的程序组件,可以在已有的JVM中运行(比如local mode),也可以在新的JVM中运行。使用YARN时,executor是在YARN容器中运行的。
spark的job - stage - task的划分是怎么样的?
spark的job可以划分为多个stage,这些stage构成了DAG。每一个stage又可以划分为多个tasks。stage的划分是根据shuffle map task来的,这一类的task相当于MapReduce中shuffle的map端,负责在本地RDD分区进行计算,并将结果输出到新的分区,供后续的使用。在划分stage时,shuffle map任务作为阶段的结束的边界。
Mesos的粗粒度和细粒度
Mesos可以启用CPU核的共享,即同一个节点executor在不使用核的情况下可以让给另一个executor来使用。
不启用CPU核共享称为粗粒度,启用则称为细粒度,相关的配置项为
,值为true表示粗粒度。
- spark.mesos.coarse
来源: http://www.cnblogs.com/ywjy/p/7792639.html