我们都知道 spark 之所以比 mapreduce 计算的快, 是因为他是基于内存的, 不用每次计算完都写磁盘, 再读取出来进行下一次计算, spark 直接把内存作为数据的临时储存介质. 所以 mapreduce 就没有强调内存管理, 而 spark 需要管理内存.
内存不过是计算机分级存储系统中的靠近 CPU 的一个存储介质.
spark 运行起来内存里都存的啥?
如何管理里面所存的东西?
spark 用 java 和 scala 这样的 jvm 语言写的, 没有像 c 语言那样显式申请释放内存, 如何进行内存的管理的?
我们应该如何设置 spark 关于内存的参数?
我们一起来解决这些问题
一, 内存模型
远古大神曾告诉我们这个神秘公式: 程序 = 算法 + 数据.
1.1 什么是内存模型
内存模型就是告诉我们怎么划分内存, 怎么合理利用我们的内存.
首先我们要存什么, 根据大神的公式, 我们这样来分析:
数据: 就是我们代码操作的数据, 比如人的数据 (年龄, 职位等) 或者输入的某个值. 这些可在运行时将要计算的部分数据加载到内存.
算法: 就是操作数据的逻辑, 表现形式就是代码或者编译后的指令. 当然它要运行起来, 会依赖一部分内存, 来储存程序计数器(代码执行到那一句了), 函数调用栈等运行时需要的数据. 总而言之就是执行数据操作逻辑所必要的内存.
这下我们就可以把我们需要储存的东西分为数据区和执行区.
二, spark 内存模型
2.1 spark 为啥快
我们都知道 spark 之所以比 mapreduce 计算的快, 是因为他是基于内存的, 不用每次计算完都写磁盘, 再读取出来进行下一次计算, spark 直接把内存作为数据的临时储存介质. 所以 mapreduce 就没有强调内存管理, 而 spark 需要管理内存.
2.2 spark 管理的内存
系统区: spark 运行自身的代码需要一定的空间.
用户区: 我们自己写的一些 udf 之类的代码也需要一定的空间来运行.
存储区: spark 的任务就是操作数据, spark 为了快可能把数据存内存, 而这些数据也需要占用空间.
执行区: spark 操作数据的单元是 partition,spark 在执行一些 shuffle,join,sort,aggregation 之类的操作, 需要把 partition 加载到内存进行运算, 这也会运用到部分内存.
2.3 spark 内存模型
spark 内存
上图就是 spark 内存划分的图了
我们从下到上一层一层的解释:
第 1 层: 整个 excutor 所用到的内存
第 2 层: 分为 jvm 中的内存和 jvm 外的内存, 这里的 jvm 内存在 yarn 的时候就是指申请的 container 的内存
第 3 层: 对于 spark 来内存分为 jvm 堆内的和 memoryoverhead,off-heap
jvm 堆内的下一层再说
memoryOverhead 对应的参数就是 spark.yarn.executor.memoryOverhead 这块内存是用于虚拟机的开销, 内部的字符串, 还有一些本地开销 (比如 python 需要用到的内存) 等. 其实就是额外的内存, spark 并不会对这块内存进行管理.
off-heap 这里特指的 spark.memory.offHeap.size 这个参数指定的内存(广义上是指所有堆外的). 这部分内存的申请和释放是直接进行的不通过 jvm 管控所以没有 GC, 被 spark 分为 storage 和 excution 两部分和第 5 层讲的一同被 spark 统一进行管理.
第 4 层: jvm 堆内的内存分为三个部分
reservedMemory 预留内存 300M, 用于保障 spark 正常运行
other memory 用于 spark 内部的一些元数据, 用户的数据结构, 防止出现对内存估计不足导致 oom 时的内存缓冲, 占用空间比较大的记录做缓冲
memory faction spark 主要控制的内存, 由参数 spark.memory.fraction 控制.
第 5 层: 分成 storage 和 execution 由参数 spark.memory.storageFraction 控制它两的大小, 但是
execution 用于 spark 的计算: shuffle,sort,aggregation 等这些计算时会用到的内存, 如果计算是内存不足会向 storage 部分借, 如果还是不够就会 spill 到磁盘.
storage 主要用于 rdd 的缓存, 如果 execution 来借内存, 可能会牺牲自己丢弃缓存来借给 execution,storage 也可以向 execution 借内存, 但 execution 不会牺牲自己.
三, 源码层面
3.1 整体架构
内存管理
内存申请和释放(绿色):
看上图绿色那块, 就是内存的申请和释放模块. MemoryAllocator 接口负责内存申请, 有两个子类实现分别负责堆内内存和 off-heap 内存.
内存池(粉色):
MemoryPool 内存池有两个子类分别管理着执行内存和储存内存. 可以看到两种内存池的申请方法的参数有很明显的区别, 执行内存主要是面向 task 的, 而储存内存主要是面向 block 的也就是用于 rdd 缓存呀啥的.
统一内存管理:
MemoryManager 负责记录内存的消耗, 管理这 4 个内存池, 子类 UnifiedMemoryManager 负责把这执行内存和储存内存统一起来管理, 实现相互借用之类的功能.
MemoryManager 的使用场景
一个是 BlockManager 用于管理储存, 还有一部分是运行 Task 是的内存使用, 主要有 executor 的使用, shuffle 时 spill 呀外部排序呀, 这样的场景.
3.2 如何实现内存申请释放.
spark 是用 scala 和 java 实现的, 印象中没有管理内存申请释放的 API,spark 是如何利用这些 jvm 语言管理内存的呢.
我们来看看源码片段
- //HeapMemoryAllocator.scala
- private final Map>> bufferPoolsBySize = new HashMap<>();
- ......
- public MemoryBlock allocate(long size) throws OutOfMemoryError {
...... 上面是些内存的判断 ......
- long[] array = new long[numWords];
- // 上面这就很关键了
- MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
- if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
- memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
- }
- return memory;
- }
HeapMemoryAllocator 可以看到上面的源码片段, 实际的内存申请是这个代码: new long[numWords]; 就是 new 了个数组来占着内存, 用 MemoryBlock 包装了一下. bufferPoolsBySize 这个是为了防止内存频繁申请和释放做的 buffer.
接下来看看 off-heap 是怎么申请内存的.
- //UnsafeMemoryAllocator
- public MemoryBlock allocate(long size) throws OutOfMemoryError {
- long address = Platform.allocateMemory(size);
- MemoryBlock memory = new MemoryBlock(null, address, size);
- if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
- memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
- }
- return memory;
- }
offheap 的就和 C 语言一样的了可以直接使用 API 来申请. 这部分内存就需要自己进行管理了, 没有 jvm 的控制, 没有内存回收机制.
当然这也不意味了你能无限制的使用内存, 在 yarn 的情况下, yarn 是监测子进程的内存占用来看你是否超了内存, 如果超了直接 kill 掉.
四, 总结
我们能回答开头提出的几个问题了吗? 还是又有了更多的问题呢.
来源: http://stor.51cto.com/art/201910/604029.htm