根据之前的一系列分析, 我们对 spark 作业从创建到调度分发, 到执行, 最后结果回传 driver 的过程有了一个大概的了解. 但是在分析源码的过程中也留下了大量的问题, 最主要的就是涉及到的 spark 中重要的几个基础模块, 我们对这些基础设施的内部细节并不是很了解, 之前走读源码时基本只是大概了解每个模块的作用以及对外的主要接口, 这些重要的模块包括 BlockMananger, MemoryMananger, ShuffleManager, MapOutputTracker, rpc 模块 NettyRPCEnv, 以及 BroadcastManager. 而对于调度系统涉及到的几个类包括 DAGSchedulerManager, TaskSchedulerManager, CoarseGrainedSchedulerBackend, CoarseGrainedExecutorBackend, Executor, TaskRunner, 我们之前已经做了较为详细的分析, 因此这几个模块暂告一段落.
本篇, 我们来看一下 spark 中最基础的一个的模块 -- 存储系统 BlockManager 的内部实现.
BlockManager 调用时机
首先, 我们来整理一下在一个作业的运行过程中都有哪些地方使用到了 BlockManager.
DAGScheduler.getCacheLocs. 这个方法的调用是在提交一个 stage 时, 需要获取分区的偏向位置时会调用该方法. 我们知道 rdd 是可以缓存的, 而 rdd 的缓存就是通过 blockManager 来管理的, 有一个专门的 RDDBlockId 用来表示一个 RDD 缓存块的唯一标识.
最终调用的方法是: blockManagerMaster.getLocations(blockIds)
广播变量. 在 DAGscheduler 中提交 stage 时需要把 rdd 和 ShuffleDependency(对于 ResultStage 则是一个函数) 对象序列化用于网络传输, 实际上序列化后的字节数组是通过 broadcastManager 组件进行网络传输的, 而 broadcastManager 实际又是通过 BlockMananger 来将要广播的数据存储成 block, 并在 executor 端发送 rpc 请求向 BlockManangerMaster 请求数据. 每个广播变量会对应一个 TorrentBroadcast 对象, TorrentBroadcast 对象内的 writeBlocks 和 readBlocks 是读写广播变量的方法,
最终调用的方法是: blockManager.putSingle 和 blockManager.putBytes
Shuffle 的 map 阶段输出. 如果我们没有启动外部 shuffle 服务及 ExternalShuffle, 那么就会用 spark 自己的 shuffle 机制, 在 map 阶段输出时通过 blockManager 对输出的文件进行管理. shuffle 这部分主要使用的是 DiskBlockManager 组件.
最终调用的是: DiskBlockManager 相关方法包括 createTempShuffleBlock,getDiskWriter,
DiskBlockObjectWriter 相关方法, 包括 write 方法和 commitAndGet 方法
任务运行结果序列化后传回 driver. 这里分为两种情况, 如果结果序列化后体积较小, 小于 maxDirectResultSize, 则直接通过 rpc 接口传回, 如果体积较大, 就需要先通过 blockManager 写入 executor 几点的内存和磁盘中, 然后在 driver 端进行拉取.
最终调用的是: blockManager.putBytes
此外, 我们还注意到, 以上几种情形中使用的 BlockId 都是不同的, 具体可以看一下 BlockId.scala 文件中关于各种 BlockId 的定义.
所以, 接下来, 我们的思路就很清晰了, 以上面提到的对 BlockManager 的方法调用为切入点进行分析.
BlockManagerMaster.getLocations
这个方法用于获取指定的 blockId 对应的块所在存储位置.
- def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {
- driverEndpoint.askSync[IndexedSeq[Seq[BlockManagerId]]](
- GetLocationsMultipleBlockIds(blockIds))
- }
这里向 driverEndpoint 发送了一个 GetLocations 消息, 注意这里的 driverEndpoint 并不是 DriverEndpoint 的端点引用, 在 SparkEnv 的构造过程我们可以看到, 这是一个 BlockManagerMasterEndpoint 端点的引用. 所以我们需要在 BlockManagerMasterEndpoint 中寻找对于该消息的处理. 注意, 由于这里调用了 ask 方法, 所以在服务端是由 receiveAndReply 方法来处理并响应的.
BlockManagerMasterEndpoint.receiveAndReply
我们截取了对 GetLocations 处理的部分代码
- case GetLocationsMultipleBlockIds(blockIds) =>
- context.reply(getLocationsMultipleBlockIds(blockIds))
调用的是 getLocations 方法:
- private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
- if (blockLocations.containsKey(blockId)) blockLocations.get(blockId).toSeq else Seq.empty
- }
这个方法很简单, 就是直接从缓存中查找 blockId 对应的位置, 位置信息用 BlockManagerId 封装. 那么缓存中的信息什么时候加进去呢? 当然是写入新的 block 并更新 block 位置信息的时候, 后面的会分析到.
BlockManager.putSingle
这个方法写入一个有单个对象组成的块,
- def putSingle[T: ClassTag](
- blockId: BlockId,
- value: T,
- level: StorageLevel,
- tellMaster: Boolean = true): Boolean = {
- putIterator(blockId, Iterator(value), level, tellMaster)
- }
可以看到, 把对象包装成了一个只有一个元素的迭代器, 然后调用 putIterator 方法, 最后调用 doPutIterator 方法
BlockManager.doPutIterator
上面的方法, 最终调用了 doPutIterator 方法.
- private def doPutIterator[T](
- blockId: BlockId,
- iterator: () => Iterator[T],
- level: StorageLevel,
- classTag: ClassTag[T],
- tellMaster: Boolean = true,
- keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator[T]] = {
- //
- doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info =>
- val startTimeMs = System.currentTimeMillis
- var iteratorFromFailedMemoryStorePut: Option[PartiallyUnrolledIterator[T]] = None
- // Size of the block in bytes
- var size = 0L
- // 如果存储等级中包含内存级别, 那么我们优先写入内存中
- if (level.useMemory) {
- // Put it in memory first, even if it also has useDisk set to true;
- // We will drop it to disk later if the memory store can't hold it.
- // 对于不进行序列化的情况, 只能存储内存中
- if (level.deserialized) {
- memoryStore.putIteratorAsValues(blockId, iterator(), classTag) match {
- case Right(s) =>
- size = s
- case Left(iter) =>
- // Not enough space to unroll this block; drop to disk if applicable
- // 内存空间不够时, 如果存储等级允许磁盘, 则存储到磁盘中
- if (level.useDisk) {
- logWarning(s"Persisting block $blockId to disk instead.")
- diskStore.put(blockId) { channel =>
- val out = Channels.newOutputStream(channel)
- // 注意对于存储到磁盘的情况一定是要序列化的
- serializerManager.dataSerializeStream(blockId, out, iter)(classTag)
- }
- size = diskStore.getSize(blockId)
- } else {
- iteratorFromFailedMemoryStorePut = Some(iter)
- }
- }
- } else { // !level.deserialized
- // 以序列化的形式进行存储
- memoryStore.putIteratorAsBytes(blockId, iterator(), classTag, level.memoryMode) match {
- case Right(s) =>
- size = s
- case Left(partiallySerializedValues) =>
- // Not enough space to unroll this block; drop to disk if applicable
- if (level.useDisk) {
- logWarning(s"Persisting block $blockId to disk instead.")
- diskStore.put(blockId) { channel =>
- val out = Channels.newOutputStream(channel)
- partiallySerializedValues.finishWritingToStream(out)
- }
- size = diskStore.getSize(blockId)
- } else {
- iteratorFromFailedMemoryStorePut = Some(partiallySerializedValues.valuesIterator)
- }
- }
- }
- } else if (level.useDisk) {// 对于存储级别不允许存入内存的情况, 我们只能选择存入磁盘
- diskStore.put(blockId) { channel =>
- val out = Channels.newOutputStream(channel)
- // 存储到磁盘是一定要序列化的
- serializerManager.dataSerializeStream(blockId, out, iterator())(classTag)
- }
- size = diskStore.getSize(blockId)
- }
- // 获取刚刚刚刚写入的块的状态信息
- val putBlockStatus = getCurrentBlockStatus(blockId, info)
- val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
- // 如果块存储成功, 那么进行接下来的动作
- if (blockWasSuccessfullyStored) {
- // Now that the block is in either the memory or disk store, tell the master about it.
- info.size = size
- // 向 driver 汇报块信息
- if (tellMaster && info.tellMaster) {
- reportBlockStatus(blockId, putBlockStatus)
- }
- // 更新任务度量系统中关于块信息的相关统计值
- addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
- logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
- // 如果副本数大于 1, 那么需要进行额外的复制
- if (level.replication> 1) {
- val remoteStartTime = System.currentTimeMillis
- val bytesToReplicate = doGetLocalBytes(blockId, info)
- // [SPARK-16550] Erase the typed classTag when using default serialization, since
- // NettyBlockRpcServer crashes when deserializing repl-defined classes.
- // TODO(ekl) remove this once the classloader issue on the remote end is fixed.
- val remoteClassTag = if (!serializerManager.canUseKryo(classTag)) {
- scala.reflect.classTag[Any]
- } else {
- classTag
- }
- try {
- replicate(blockId, bytesToReplicate, level, remoteClassTag)
- } finally {
- bytesToReplicate.dispose()
- }
- logDebug("Put block %s remotely took %s"
- .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
- }
- }
- assert(blockWasSuccessfullyStored == iteratorFromFailedMemoryStorePut.isEmpty)
- iteratorFromFailedMemoryStorePut
- }
- }
总结一下这段代码的主要逻辑:
如果存储等级允许存入内存, 那么优先存入内存中. 根据存储的数据是否需要序列化分别选择调用 memoryStore 的不同方法.
如果存储等级不允许内存, 那么只能存入磁盘中, 存入磁盘中的数据一定是经过序列化的, 这点要注意.
向 BlockManagerMaster 汇报刚写入的块的位置信息
更新任务度量系统中关于块信息的相关统计值
如果副本数大于 1, 那么需要进行额外的复制
从上面的步骤可以看到, 在完成数据写入后, 会通过 rpc 调用向 BlockManagerMaster 汇报块的信息, 这也解答了 blockManagerMaster.getLocations 方法从内存的 map 结构中查询块的位置信息的来源.
单纯就存储数据来说, 最重要的无疑是内存管理器 MemoryStore 和磁盘管理器 DiskStore.
对于 MemoryStore 和 DiskStore 调用的存储方法有:
- memoryStore.putIteratorAsValues
- memoryStore.putIteratorAsBytes
- diskStore.put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit
- diskStore.getSize(blockId)
- blockManager.putBytes
我们再来接着看另一个写入方法, putBytes, 即写入字节数组数据. 它的实际写入的逻辑在 doPutBytes 方法中, 我们看一下这个方法:
blockManager.doPutBytes
这个方法的主要步骤与 doPutIterator 方法差不多. 只不过 doPutIterator 方法插入的是 java 对象, 如果存储级别要求序列化或者存储到磁盘时, 需要将对象序列化.
- private def doPutBytes[T](
- blockId: BlockId,
- bytes: ChunkedByteBuffer,
- level: StorageLevel,
- classTag: ClassTag[T],
- tellMaster: Boolean = true,
- keepReadLock: Boolean = false): Boolean = {
- doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info =>
- val startTimeMs = System.currentTimeMillis
- // Since we're storing bytes, initiate the replication before storing them locally.
- // This is faster as data is already serialized and ready to send.
- // 启动副本复制
- val replicationFuture = if (level.replication> 1) {
- Future {
- // This is a blocking action and should run in futureExecutionContext which is a cached
- // thread pool. The ByteBufferBlockData wrapper is not disposed of to avoid releasing
- // buffers that are owned by the caller.
- replicate(blockId, new ByteBufferBlockData(bytes, false), level, classTag)
- }(futureExecutionContext)
- } else {
- null
- }
- val size = bytes.size
- // 如果缓存级别中包含内存, 优先写入内存中
- if (level.useMemory) {
- // Put it in memory first, even if it also has useDisk set to true;
- // We will drop it to disk later if the memory store can't hold it.
- // 是否以序列化形式存储
- val putSucceeded = if (level.deserialized) {
- val values =
- serializerManager.dataDeserializeStream(blockId, bytes.toInputStream())(classTag)
- memoryStore.putIteratorAsValues(blockId, values, classTag) match {
- case Right(_) => true
- case Left(iter) =>
- // If putting deserialized values in memory failed, we will put the bytes directly to
- // disk, so we don't need this iterator and can close it to free resources earlier.
- iter.close()
- false
- }
- } else {
- // 如果以序列化格式存储, 则不需要反序列化
- val memoryMode = level.memoryMode
- memoryStore.putBytes(blockId, size, memoryMode, () => {
- // 如果存在非直接内存, 那么需要将数据拷贝一份到直接内存中
- if (memoryMode == MemoryMode.OFF_HEAP &&
- bytes.chunks.exists(buffer => !buffer.isDirect)) {
- bytes.copy(Platform.allocateDirectBuffer)
- } else {
- bytes
- }
- })
- }
- // 如果插入内存失败, 并且允许写入磁盘的话, 就将数据写入磁盘
- // 插入内存失败一般是因为内存不够引起
- if (!putSucceeded && level.useDisk) {
- logWarning(s"Persisting block $blockId to disk instead.")
- diskStore.putBytes(blockId, bytes)
- }
- } else if (level.useDisk) {// 如果只允许存储到磁盘, 那就只能存到磁盘了
- // 存储到磁盘的数据一定是序列化的
- diskStore.putBytes(blockId, bytes)
- }
- // 刚刚插入的块的信息
- val putBlockStatus = getCurrentBlockStatus(blockId, info)
- val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
- if (blockWasSuccessfullyStored) {
- // Now that the block is in either the memory or disk store,
- // tell the master about it.
- info.size = size
- // 向 driver 端的 BlockManagerMaster 组件汇报块信息
- if (tellMaster && info.tellMaster) {
- reportBlockStatus(blockId, putBlockStatus)
- }
- // 更新任务度量值
- addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
- }
- logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
- if (level.replication> 1) {
- // Wait for asynchronous replication to finish
- // 等待之前启动的副本复制线程完成
- // 注意这里的超时被设成了无穷大
- try {
- ThreadUtils.awaitReady(replicationFuture, Duration.Inf)
- } catch {
- case NonFatal(t) =>
- throw new Exception("Error occurred while waiting for replication to finish", t)
- }
- }
- if (blockWasSuccessfullyStored) {
- None
- } else {
- Some(bytes)
- }
- }.isEmpty
- }
对于 MemoryStore 和 DiskStore 调用的方法有:
- memoryStore.putBytes
- diskStore.putBytes(blockId, bytes)
总结
综上, 我们把一个 spark 作业运行过程中需要调用到 BlockManager 的时机以及调用的 BlockManager 的一些写入数据的方法大致整理了一下. BlockManager 主要是通过内部的两个组件 MemoryStore 和 DiskStore 来管理数据向内存或磁盘写入的. 此外 DiskBlockManager 组件主要是用来管理 Block 和磁盘文件之间的对应关系, 分配文件路径, 管理本地文件系统路径等作用. 对于 MemoryStore 和 DiskStore 的调用主要有如下几个方法:
- memoryStore.putIteratorAsValues
- memoryStore.putIteratorAsBytes
- diskStore.put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit
- diskStore.getSize(blockId)
- memoryStore.putBytes
- diskStore.putBytes(blockId, bytes)
来源: https://www.cnblogs.com/zhuge134/p/10995585.html