今天抽空回顾了一下 Spark 相关的源码,本来想要了解一下 Block 的管理机制,但是看着看着就回到了 SparkContext 的创建与使用.正好之前没有正式的整理过这部分的内容,这次就顺带着回顾一下.
更多内容参考: 我的大数据之路
Spark 作为目前最流行的大数据计算框架,已经发展了几个年头了.版本也从我刚接触的 1.6 升级到了 2.2.1.由于目前工作使用的是 2.2.0,所以这次的分析也就从 2.2.0 版本入手了.
涉及的内容主要有:
Standalone 模式中的 Master 与 Worker
client,driver,excutor 的关系
下面就按照顺序依次介绍一下.
Master 与 Worker
在最开始编程的时候,很少会涉及分布式,因为数据量也不大.后来随着硬件的发展 cpu 的瓶颈,开始流行多线程编程,基于多线程来加快处理速度;再后来,衍生出了网格计算,CPU 与 GPU 的异构并行计算以及当时流行的 mapreduce 分布式计算.但是 mapreduce 由于存储以及计算流程的限制,spark 开始流行起来.Spark 凭借内存计算,强大的 DAG 回溯能力,快速的占领并行计算的风口.
那么并行计算肯定是需要分布式集群的,常见的集群管理方式,有 Master-Slave 模式,P2P 模式等等.
比如 Mysql 的主从复制,就是 Master-Slave 模式;Elasticsearch 的分片管理就是 P2P 模式.在 Spark 中有不同的部署方式,但是计算的模式都是 Master-Slave 模式,只不过 Slave 换了名字叫做 worker 而已.集群的部署模式如下所示:
流程就是用户以 client 的身份向 master 提交任务,master 去 worker 上面创建执行任务的载体(driver 和 excutor).
client,driver,excutor 的关系
Master 和 Worker 是服务器的部署角色,程序从执行上,则分成了 client,driver,excutor 三种角色.按照模式的不同,client 和 driver 可能是同一个.以 2.2.0 版本的 standalone 模式来说,他们三个是独立的角色.client 用于提交程序,初始化一些环境变量;driver 用于生成 task 并追踪管理 task 的运行;excutor 负责最终 task 的执行.
源码探索
总的流程可以总结为下面的一张图:
通过查看源码,来看一下
1 SparkContext 创建调度器
在创建 SparkContext 的时候会创建几个核心的模块:
DAGScheduler 面向 job 的调度器
TaskScheduler 不同的集群模式,有不同的实现方式,如 standalone 下的 taskschedulerImpl
SchedulerBackend 不同的集群模式下,有不同的实现方式,如 standalone 下的 StandaloneSchedulerBackend. 负责向 master 发起注册
在 createTaskSchduler 中,根据 master 的不同,选择不同的实现方式,主要是在 backend 的实现上有差异:
// 创建并启动调度器
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
...
// 启动调度器
_taskScheduler.start()
我们这里只看一下 standalone 模式的创建,就是创建了 TaskSchedulerImpl 和 StandaloneSchedulerBackend 的对象,另外初始化了调度器,根据配置选择调度模式,默认是 FIFO:
master match {
case "local" =>
...
case LOCAL_N_REGEX(threads) =>
...
case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
...
case SPARK_REGEX(sparkUrl) =>
// 创建调度器
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
// 创建backend
val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
// 把backend注入到schduler中
scheduler.initialize(backend)
(backend, scheduler)
case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
...
case masterUrl =>
...
}
2 TaskSchedulerImpl 执行 start 方法
def initialize(backend: SchedulerBackend) {
this.backend = backend
schedulableBuilder = {
schedulingMode match {
case SchedulingMode.FIFO =>
new FIFOSchedulableBuilder(rootPool)
case SchedulingMode.FAIR =>
new FairSchedulableBuilder(rootPool, conf)
case _ =>
throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
s"$schedulingMode")
}
}
schedulableBuilder.buildPools()
}
其实是执行了 backend 的 start() 方法
3 StandaloneSchedulerBackend 执行 start 方法
override def start() {
backend.start()
...
}
这部分代码比较多,可以简化的看:
封装 command 对象
封装 appDesc 对象
创建 StandaloneAppClient 对象
执行 start() 方法
其中 command 中包含的那个类,就是 excutor 的实现类.
4 发起注册
override def start() {
//初始化参数
...
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
...
val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
// 注意前面创建了一大堆的配置对象,主要就是那个class等信息
client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
client.start()
...
}
核心的代码在 StanaloneAppClient 中,并在 start() 方法中启动了一个 rpc 的服务——ClientEndpoint
registerWithMaster 采用了异步发送请求连接 master,只要有一个注册成功,其他的都会 cancel.这里有时间可以做个小 hello world 玩玩看.
override def onStart(): Unit = {
try {
registerWithMaster(1)//发起注册
} catch {
...
}
}
5 Master 接收到请求执行 schedule 方法
private def registerWithMaster(nthRetry: Int) {
registerMasterFutures.set(tryRegisterAllMasters())
registrationRetryTimer.set(registrationRetryThread.schedule(new Runnable {
override def run(): Unit = {
if (registered.get) {
registerMasterFutures.get.foreach(_.cancel(true))
registerMasterThreadPool.shutdownNow()
} else if (nthRetry >= REGISTRATION_RETRIES) {
markDead("All masters are unresponsive! Giving up.")
} else {
registerMasterFutures.get.foreach(_.cancel(true))
registerWithMaster(nthRetry + 1)
}
}
}, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
}
//发起注册
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
...
masterRef.send(RegisterApplication(appDescription, self))
...
}
Master 是一个常驻的进程,时刻监听别人发过来的消息.刚才 client 发送了一个 RegisterApplication 消息,忽略前面创建 app 的内容,直接执行了 schedule 方法:
6 Master 发送 launchDriver
case RegisterApplication(description, driver) =>
// TODO Prevent repeated registrations from some driver
if (state == RecoveryState.STANDBY) {
// ignore, don't send response
} else {
...
schedule()
}
发送 lanunchDriver 请求
7 Worker 创建 DriverRunner
private def schedule(): Unit = {
...
for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
...
while (numWorkersVisited < numWorkersAlive && !launched) {
...
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
launchDriver(worker, driver)
...
}
...
}
}
startExecutorsOnWorkers()
}
//向worker发送launchDriver请求
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
...
worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
...
}
8 Master 发送 launchExcutor
case LaunchDriver(driverId, driverDesc) =>
logInfo(s"Asked to launch driver $driverId")
val driver = new DriverRunner(
conf,
driverId,
workDir,
sparkHome,
driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
self,
workerUri,
securityMgr)
drivers(driverId) = driver
driver.start()
coresUsed += driverDesc.cores
memoryUsed += driverDesc.mem
第 6 步中最后有一个 startExecutorsOnWorkers 方法.
9 Worker 创建 ExcutorRunner
private def startExecutorsOnWorkers(): Unit = {
...
for (app <- waitingApps if app.coresLeft > 0) {
...
for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
allocateWorkerResourceToExecutors(
app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
}
}
}
private def allocateWorkerResourceToExecutors(
app: ApplicationInfo,
assignedCores: Int,
coresPerExecutor: Option[Int],
worker: WorkerInfo): Unit = {
...
for (i <- 1 to numExecutors) {
...
launchExecutor(worker, exec)
...
}
}
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
...
worker.endpoint.send(LaunchExecutor(masterUrl,
exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
...
}
至此,Driver 和 Excutor 就启动起来了.....
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
if (masterUrl != activeMasterUrl) {
...
} else {
try {
...
val manager = new ExecutorRunner(
appId,
execId,
appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
cores_,
memory_,
self,
workerId,
host,
webUi.boundPort,
publicAddress,
sparkHome,
executorDir,
workerUri,
conf,
appLocalDirs, ExecutorState.RUNNING)
...
} catch {
...
}
}
之后代码是怎么运行的,就且听下回分解把!
参考
SparkContext http://www.cnblogs.com/jcchoiling/p/6427406.html
spark worker 解密: http://www.cnblogs.com/jcchoiling/p/6433196.html
2.2.0 源码
《Spark 内核机制及性能调优》· 王家林
来源: https://www.cnblogs.com/xing901022/p/8260362.html