在看 sparkContext 之前, 先回顾一下 Scala 的语法. Scala 构造函数分主构造和辅构造函数, 辅构造函数是关键字 def+this 定义的, 而类中不在方法体也不在辅构造函数中的代码就是主构造函数, 实例化对象的时候主构造函数都会被执行, 例:
- class person(name String,age Int){
- println("主构造函数被调用")
- def this(name String,age Int){ // 辅构造函数
- this () // 必须先调用主构造函数
- this.name = name
- this.age = age
- }
- def introduce(){
- println("name :" + name + "-age :" + age)
- }
- }
- val jack = new person("jack",2)
- jack.introduce()
运行结果:
主构造函数被调用
name :jack-age :2
切入正题, 看 sparkContext 的主构造函数比较重要的一些代码:
- try{
- ...
- // Create the Spark execution environment (cache, map output tracker, etc)
- _env = createSparkEnv(_conf, isLocal, listenerBus)
- SparkEnv.set(_env)
- ...
- // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
- // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
- _heartbeatReceiver = env.rpcEnv.setupEndpoint(
- HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
- // Create and start the scheduler
- val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
- _schedulerBackend = sched
- _taskScheduler = ts
- _dagScheduler = new DAGScheduler(this)
- _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
- // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
- // constructor
- _taskScheduler.start()
- }
首先:
- _env = createSparkEnv(_conf, isLocal, listenerBus)
- SparkEnv.set(_env)
- _heartbeatReceiver = env.rpcEnv.setupEndpoint(
- HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
这里是在 sparkContext 中创建 rpcEnv, 并通过 setupEndpoint 向 rpcEnv 注册一个心跳的 Endpoint.
接着:
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
调的 sparkContext 自己的方法, 创建 taskScheduler, 返回的是一个 (SchedulerBackend, TaskScheduler) 元组
- private def createTaskScheduler(
- sc: SparkContext,
- master: String,
- deployMode: String): (SchedulerBackend, TaskScheduler) = {
- import SparkMasterRegex._
- // When running locally, don't try to re-execute tasks on failure.
- val MAX_LOCAL_TASK_FAILURES = 1
- master match {
- //...
- //standalone 的提交模式
- case SPARK_REGEX(sparkUrl) =>
- val scheduler = new TaskSchedulerImpl(sc)
- val masterUrls = sparkUrl.split(",").map("spark://" + _)
- val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
- // 调用初始化方法
- scheduler.initialize(backend)
- (backend, scheduler)
- }
- //...
- }
方法内部根据 master 参数判断不同的提交模式, 创建不同的 (SchedulerBackend, TaskScheduler) , 拿 standalon 模式举例, 根据入参创建 TaskSchedulerImpl 和 StandalonSchedulerBackend, 再调用 TaskSchedulerImpl 的初始化方法, 最后返回一个元组.
scheduler.initialize(backend), 其实就是根据不同的 schedulingMode 创建不同的 schedulableBuilder, 它就是对 Scheduleable tree 的封装, 负责对 taskSet 的调度.
- def initialize(backend: SchedulerBackend) {
- this.backend = backend
- schedulableBuilder = {
- schedulingMode match {
- case SchedulingMode.FIFO =>
- new FIFOSchedulableBuilder(rootPool)
- case SchedulingMode.FAIR =>
- new FairSchedulableBuilder(rootPool, conf)
- case _ =>
- throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY:" +
- s"$schedulingMode")
- }
- }
- schedulableBuilder.buildPools()
- }
接着下面两行代码:
_dagScheduler = new DAGScheduler(this)
创建 DAG 有向无环图, 实现类面向 stage 的调度机制的高层次调度层, 他会为每个 stage 计算 DAG(有向无环图), 追踪 RDD 和 stage 的输出是否被物化 (写入磁盘或内存), 并且寻找一个最少消耗的调度机制来运行 job. 它会将 stage 作为 taskSets 提交到底层的 TaskSchedulerImpl 上来在集群运行. 除了处理 stage 的 DAG, 它还负责决定运行每个 task 的最佳位置, 基于当前的缓存状态, 将最佳位置提交给底层的 TaskSchedulerImpl, 此外, 他会处理由于每个 shuffle 输出文件导致的失败, 在这种情况下旧的 stage 可能会被重新提交. 一个 stage 内部的失败, 如果不是由于 shuffle 文件丢失导致的失败, 会被 taskScheduler 处理, 它会多次重试每个 task, 还不行才会取消整个 stage.
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
在上面创建好了 TaskScheduler 和 SchedulerBackend 后, 告诉 HeartbeatReceiver(心跳) 的监听端.
最后:
_taskScheduler.start()
在 TaskSchedulerImpl 的 start() 方法中调的是 SchedulerBackend 的 start() 方法, 所以 start() 方法运行的是这段:
- override def start() {
- super.start()
- // SPARK-21159. The scheduler backend should only try to connect to the launcher when in client
- // mode. In cluster mode, the code that submits the application to the Master needs to connect
- // to the launcher instead.
- if (sc.deployMode == "client") {
- launcherBackend.connect()
- }
- // 参数设置
- val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
- webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
- client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
- client.start()
- launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
- waitForRegistration()
- launcherBackend.setState(SparkAppHandle.State.RUNNING)
- }
这里创建了两个对象: AppliactionDescription 和 AppClient,AppliactionDescription 顾名思义就是对 Application 的描述类, 比如它需要的资源等; AppClient 负责负责为 application 与 spark 集群通信. SchedulerBackend 的 start() 最终调用了 AppClient 的 start(), 代码如下:
- def start() {
- // Just launch an rpcEndpoint; it will call back into the listener.
- endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
- }
启动一个 rpcEndPoint 并回调给监听器, RPC 原理可看这篇 https://www.cnblogs.com/superhedantou/p/7570692.html
最后画个大概流程图
来源: https://www.cnblogs.com/cnblogs-syui/p/10948471.html