上篇 spark 源码分析之十五 -- Spark 内存管理剖析 讲解了 Spark 的内存管理机制, 主要是 MemoryManager 的内容. 跟 Spark 的内存管理机制最密切相关的就是内存存储, 本篇文章主要介绍 Spark 内存存储.
总述
跟内存存储的相关类的关系如下:
MemoryStore 是负责内存存储的类, 其依赖于 BlockManager,SerializerManager,BlockInfoManager,MemoryManager.
BlockManager 是 BlockEvictionHandler 的实现类, 负责实现 dropFromMemory 方法, 必要时从内存中把 block 丢掉, 可能会转储到磁盘上.
SerializerManager 是负责持久化的一个类, 可以参考文章 spark 源码分析之十三 -- SerializerManager 剖析做深入了解.
BlockInfoManager 是一个实现了对 block 读写时的一个锁机制, 具体可以看下文.
MemoryManager 是一个内存管理器, 从 Spark 1.6 以后, 其存储内存池大小和执行内存池大小是可以动态扩展的. 即存储内存和执行内存必要时可以从对方内存池借用空闲内存来满足自己的使用需求. 可以参考文章 spark 源码分析之十五 -- Spark 内存管理剖析 做深入了解.
BlockInfo 保存了跟 block 相关的信息.
BlockId 的 name 不同的类型有不同的格式, 代表不同的 block 类型.
StorageLevel 表示 block 的存储级别, 它本身是支持序列化的.
当存储一个集合为序列化字节数组时, 失败的结果由 PartiallySerializedBlock 返回.
当存储一个集合为 Java 对象数组时, 失败的结果由 PartiallyUnrolledIterator 返回.
RedirectableOutputStream 是对另一个 outputstream 的包装 outputstream, 负责直接将数据中转到另一个 outputstream 中.
ValueHolder 是一个内存中转站, 其有一个 getBuilder 方法可以获取到 MemoryEntryBuilder 对象, 该对象会负责将中转站的数据转换为对应的可以保存到 MemStore 中的 MemoryEntry.
我们逐个来分析其源码:
BlockInfo
它记录了 block 的相关信息.
level: StorageLevel 类型, 代表 block 的存储级别
classTag:block 的对应类, 用于选择序列化类
tellMaster:block 的变化是否告知 master. 大部分情况下都是需要告知的, 除了广播的 block.
size: block 的大小 (in byte)
readerCount:block 读的次数
writerTask: 当前持有该 block 写锁的 taskAttemptId, 其中 BlockInfo.NON_TASK_WRITER 表示非 task 任务 持有锁, 比如 driver 线程, BlockInfo.NO_WRITER 表示没有任何代码持有写锁.
- BlockId
- A Block can be uniquely identified by its filename, but each type of Block has a different set of keys which produce its unique name. If your BlockId should be serializable, be sure to add it to the BlockId.apply() method.
其子类, 在上图中已经标明.
BlockInfoManager
文档介绍如下:
Component of the BlockManager which tracks metadata for blocks and manages block locking. The locking interface exposed by this class is readers-writer lock. Every lock acquisition is automatically associated with a running task and locks are automatically released upon task completion or failure. This class is thread-safe.
它有三个成员变量, 如下:
infos 保存了 Block-id 和 block 信息的对应关系.
writeLocksByTask 保存了每一个任务和任务持有写锁的 block-id
readLockByTasks 保存了每一个任务和任务持有读锁的 block-id, 因为读锁是可重入的, 所以 ConcurrentHashMultiset 是支持多个重复值的.
方法如下:
1. 注册 task
2. 获取当前 task
3. 获取读锁
思路: 如果 block 存在, 并且没有 task 在写, 则直接读即可, 否则进入锁等待区等待.
4. 获取写锁
思路: 如果 block 存在, 且没有 task 在读, 也没有 task 在写, 则在写锁 map 上记录 task, 表示已获取写锁, 否则进入等待区等待
5. 断言有 task 持有写锁写 block
6. 写锁降级
思路: 首先把和 block 绑定的 task 取出并和当前 task 比较, 若是同一个 task, 则调用 unlock 方法
7. 释放锁:
思路: 若当前任务持有写锁, 则直接释放, 否则读取次数减 1, 并且从读锁记录中删除一条读锁记录. 最后唤醒在锁等待区等待的 task.
8. 获取为写一个新的 block 获取写锁
9. 释放掉指定 task 的所有锁
思路: 先获取该 task 的读写锁记录, 然后移除写锁记录集中的每一条记录, 移除读锁记录集中的每一条读锁记录.
10. 移除并释放写锁
读写锁记录清零, 解除 block-id 和 block 信息的绑定.
还有一些查询方法, 不再做详细说明.
简单总结一下:
读锁支持可重入, 即可以重复获取读锁. 可以获取读锁的条件是: 没有 task 在写该 block, 对有没有 task 在读 block 没有要求.
写锁当且仅当一个 task 获取, 可以获取写锁的条件是: 没有 task 在读 block, 没有 task 在写 block.
注意, 这种设计可以用在一个 block 的读的次数远大于写的次数的情况下. 我们可以来做个假设: 假设一个 block 写的次数远超过读的次数, 同时多个 task 写同一个 block 的操作就变成了串行的, 写的效率, 因为只有一个 BlockInfoManager 对象, 即一个锁, 即所有在锁等待区等待的 writer 们都在竞争一个锁. 对于读的次数远超过写的次数情况下, reader 们可以肆无忌惮地读取数据数据, 基本处于无锁情况下, 几乎没有了锁切换带来的开销, 并且可以允许不同 task 同时读取同一个 block 的数据, 读的吞吐量也提高了.
总之, BlockInfoManager 自己实现了 block 的一套读写锁机制, 这种读写锁的设计思路是非常经典和值得学习的.
RedirectableOutputStream
文档说明:
A wrapper which allows an open [[OutputStream]] to be redirected to a different sink.
即这个类可以将 outputstream 重定向到另一个 outputstream.
源码也很简单:
os 成员变量就是重定向的目标 outputstream
MemoryEntry
memoryEntry 本质上就是内存中一个 block, 指向了存储在内存中的真实数据.
如上图, 它有两个子类:
其中, DeserializedMemoryEntry 是用来保存反序列化之后的 java 对象数组的, value 是一个数据, 保存着真实的反序列化数据, size 表示, classTag 记录着数组中被擦除的数据的 Class 类型, 这种数据只能保存在堆内内存中.
SerializedMemoryEntry 是用来保存序列化之后的 ByteBuffer 数组的, buffer 中记录的是真实的 Array[ByteBuffer] 数据. memoryMode 表示数据存储的内存区域, 堆外内存还是堆内内存, classTag 记录着序列化前被擦除的数据的 Class 类型, size 表示字节数据大小.
MemoryEntryBuilder
build 方法将内存数据构建到 MemoryEntry 中
ValuesHolder
本质上来说, 就是一个内存中转站. 数据被临时写入到这个中转站, 然后调用其 getBuilder 方法获取 MemoryEntryBuilder 对象, 这个对象用于构建 MemoryEntry 对象.
storeValues 用于写入数据, estimateSize 用于评估 holder 中内存的大小. 调用 getBuilder 之后会返回 MemoryEntryBuilder 对象, 后续可以拿这个 builder 创建 MemoryEntry
调用 getBuilder 之后, 会关闭流, 禁止数据写入.
它有两个子类: 用于中转 Java 对象的 DeserializedValuesHolder 和用于中转字节数据的 SerializedValuesHolder.
其实现类具体如下:
1. DeserializedValuesHolder
2. SerializedValuesHolder
接下来, 我们看一下 Spark 内存存储中的重头戏 -- MemoryStore
MemoryStore
文档说明:
Stores blocks in memory, either as Arrays of deserialized Java objects or as serialized ByteBuffers.
类内部结构如下:
对成员变量的说明:
entries 本质上就是在内存中保存 blockId 和 block 内容的一个 map, 它的 accessOrder 为 true, 即最近访问的会被移动到链表尾部.
onHeapUnrollMemoryMap 记录了 taskAttemptId 和需要摊开一个 block 需要的堆内内存大小的关系
offHeapUnrollMemoryMap 记录了 taskAttemptId 和需要摊开一个 block 需要的堆外内存大小的关系
unrollMemoryThreshold 表示在摊开一个 block 之前给 request 分配的初始内存, 可以通过 spark.storage.unrollMemoryThreshold 来调整, 默认是 1MB
下面, 开门见山, 直接剖析比较重要的方法:
1. putBytes: 这个方法只被 BlockManager 调用, 其中_bytes 回调用于生成直接被缓存的 ChunkedByteBuffer:
思路: 先从 MemoryManager 中申请内存, 如果申请成功, 则调用回调方法 _bytes 获取 ChunkedByteBuffer 数据, 然后封装成 SerializedMemoryEntry 对象 , 最后将封装好的 SerializedMemoryEntry 对象缓存到 entries 中.
2. 把迭代器中值保存为内存中的 Java 对象
思路: 转换为 DeserializedValueHolder 对象, 进而调用 putIterator 方法, ValueHolder 就是一个抽象, 使得 putIterator 既可以缓存序列化的字节数据又可以缓存 Java 对象数组.
3. 把迭代器中值保存为内存中的序列化字节数据
思路: 转换为 SerializedValueHolder 对象, 进而调用 putIterator 方法.
MAX_ROUND_ARRARY_LENGTH 和 unrollMemoryThreshold 的定义如下:
- public static int MAX_ROUNDED_ARRAY_LENGTH = Integer.MAX_VALUE - 15;
- private val unrollMemoryThreshold: Long = conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024)
unrollMemoryThreshold 默认是 1MB, 可以通过 spark.storage.unrollMemoryThreshold 参数调整大小.
4. putIterator 方法由参数 ValueHolder, 使得缓存字节数据和 Java 对象可以放到一个方法来. 方法 2 跟 3 都调用了 putIterator 方法, 如下:
思路:
第一步: 定义摊开内存初始化大小, 摊开内存增长率, 摊开内存检查频率等变量.
第二步: 向 MemoryManager 请求申请摊开初始内存, 若成功, 则记录这笔摊开内存.
第三步: 然后进入 223~240 行的 while 循环, 在这个循环里:
循环条件: 如果还有值需要摊开并且上次内存申请是成功的, 则继续进行该次循环
不断想 ValueHolder 中 add 数据. 如果摊开的元素个数不是 UNROLL_MEMORY_CHECK_PERIOD 的整数倍, 则摊开个数加 1; 否则, 查看 ValueHolder 中的内存是否大于了已分配内存, 若大于, 则请求 MemoryManager 分配内存, 并将分配的内存累加到已分配内存中.
第四步:
若上一次向 MemoryManager 申请内存成功, 则从 ValueHolder 中获取 builder, 并且计算准确内存开销. 查看准确内存是否大于了已分配内存, 若大于, 则请求 MemoryManager 分配内存, 并将分配的内存累加到已分配内存中.
否则, 否则打印内存使用情况, 返回为摊开该 block 申请的内存
第五步:
若上一次向 MemoryManager 申请内存成功, 首先调用 MemoryEntryBuilder 的 build 方法构建出可以直接存入内存的 MemoryEntry, 并向 MemoryManager 请求释放摊开内存, 申请存储内存, 并确保存储内存申请成功. 最后将数据存入内存的 entries 中.
否则打印内存使用情况, 返回为摊开该 block 申请的内存
其实之前不是很理解 unroll 这个词在这里的含义, 一直译作摊开, 它其实指的就是集合的数据转储到中转站这个操作, 摊开内存指这个操作需要的内存.
下面来看一下这个方法里面依赖的常量和方法:
4. 1 unrollMemoryThreshold 在上一个方法已做说明. UNROLL_MEMORY_CHECK_PERIOD 和 UNROLL_MEMORY_GROWTH_FACTOR 常量定义如下:
即, UNROLL_MEMORY_CHECK_PERIOD 默认是 16,UNROLL_MEMORY_GROWTH_FACTOR 默认是 1.5
4.2 reserveUnrollMemoryForThisTask 方法源码如下, 思路大致上是先从 MemoryManager 申请摊开内存, 若成功, 则根据 memoryMode 在堆内或堆外记录摊开内存的 map 上记录新分配的内存.
4.3 releaseUnrollMemoryForThisTask 方法如下, 实现思路: 先根据 memoryMode 获取到对应记录堆内或堆外内存的使用情况的 map, 然后在该 task 的摊开内存上减去这笔内存开销, 如果减完之后, task 使用内存为 0, 则直接从 map 中移除对该 task 的内存记录.
4.4 日志打印 block 摊开内存和当前内存使用情况
5. 获取缓存的值:
思路: 直接根据 blockId 从 entries 中取出 MemoryEntry 数据, 然后根据 MemoryEntry 类型取出数据即可.
6. 移除 Block 或清除缓存, 比较简单, 不做过多说明:
7. 尝试驱逐 block 来释放指定大小的内存空间来存储给定的 block, 代码如下:
该方法有三个参数: 要分配内存的 blockId,block 大小, 内存类型 (堆内还是堆外).
第 469~485 行: dropBlock 方法思路: 先从 MemoryEntry 中获取 data, 再调用 BlockManager 从内存中驱逐出该 block, 如果该 block 的 StorageLevel 允许落地到磁盘, 则先落到磁盘, 再从内存中删除之, 最后更新该 block 的 StorageLevel, 最后检查新的 StorageLevel, 若该 block 还在内存或磁盘中, 则释放锁, 否则, 直接从 BlockInfoManager 中删除之.
第 443 行: 找到 block 对应的 rdd.
第 451~467 行: 先给 entries 上锁, 然后遍历 entries 集合, 检查 block 是否可以从内存中驱逐, 若可以则把它加入到 selectedBlocks 集合中, 并把该 block 大小累加到 freedMemory 中.
461 行的 lockForWriting 方法, 不堵塞, 即如果第一次拿不到写锁, 则一直不停地轮询, 直到可以拿到写锁为止. 那么问题来了, 为什么要先获取写锁呢? 因为写锁具有排他性并且不具备可重入性, 一旦拿到写锁, 其他锁就不能再访问该 block 了.
487 行~ 528 行: 若计划要释放的内存小于存储新 block 需要的内存大小, 则直接释放写锁, 不从内存中驱逐之前选择的 block, 直接返回.
若计划要释放的内存不小于存储新 block 需要的内存大小, 则遍历之前选择的每一个 block, 获取 entry, 并调用 dropMemory 方法, 返回释放的内存大小. finally 代码块是防止在 dropMemory 过程中, 该线程被中断, 其余 block 写锁不能被释放的情况.
其依赖的方法如下:
存储内存失败之后, 会返回 PartiallySerializedBlock 或者 PartiallyUnrolledIterator.
PartiallyUnrolledIterator 是一个 Iterator, 可以用来遍历 block 数据, 同时负责释放摊开内存.
PartiallySerializedBlock 它可以将失败的 block 转化成 PartiallyUnrolledIterator 用来遍历, 可以直接丢弃失败的 block, 也可以把数据转储到给定的可以落地的 outputstream 中, 同时释放摊开内存.
总结:
本篇文章主要讲解了 Spark 的内存存储相关的内容, 重点讲解了 BlockInfoManager 实现的锁机制, 跟 ValuesHolder 中转站相关的 MemoryEntry,EmmoryEntryBuilder 等相关内容以及内存存储中的重头戏 -- MemStore 相关的 Block 存储, Block 释放, 为新 Block 驱逐内存等等功能.
来源: https://www.cnblogs.com/johnny666888/p/11210233.html