Spark 有两种内存管理模式, 静态内存管理 (Static MemoryManager) 和动态 (统一) 内存管理(Unified MemoryManager). 动态内存管理从 Spark1.6 开始引入, 在 SparkEnv.scala 中的源码可以看到, Spark 目前默认采用动态内存管理模式, 若将 spark.memory.useLegacyMode 设置为 true, 则会改为采用静态内存管理.
一, Spark 内存管理模式
Spark 有两种内存管理模式, 静态内存管理 (Static MemoryManager) 和动态 (统一) 内存管理(Unified MemoryManager). 动态内存管理从 Spark1.6 开始引入, 在 SparkEnv.scala 中的源码可以看到, Spark 目前默认采用动态内存管理模式, 若将 spark.memory.useLegacyMode 设置为 true, 则会改为采用静态内存管理.
- // SparkEnv.scala
- val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
- val memoryManager: MemoryManager =
- if (useLegacyMemoryManager) {
- new StaticMemoryManager(conf, numUsableCores)
- } else {
- UnifiedMemoryManager(conf, numUsableCores)
- }
二, Spark 动态内存管理空间分配
相比于 Static MemoryManager 模式, Unified MemoryManager 模型打破了存储内存和运行内存的界限, 使每一个内存区能够动态伸缩, 降低 OOM 的概率. 由上图可知, executor JVM 内存主要由以下几个区域组成:
(1)Reserved Memory(预留内存): 这部分内存预留给系统使用, 默认为 300MB, 可通过 spark.testing.reservedMemory 进行设置.
- // UnifiedMemoryManager.scala
- private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024
另外, JVM 内存的最小值也与 reserved Memory 有关, 即 minSystemMemory = reserved Memory*1.5, 即默认情况下 JVM 内存最小值为 300MB*1.5=450MB.
- // UnifiedMemoryManager.scala
- val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
(2)Spark Memeoy: 分为 execution Memory 和 storage Memory. 去除掉 reserved Memory, 剩下 usableMemory 的一部分用于 execution 和 storage 这两类堆内存, 默认是 0.6, 可通过 spark.memory.fraction 进行设置. 例如: JVM 内存是 1G, 那么用于 execution 和 storage 的默认内存为(1024-300)*0.6=434MB.
- // UnifiedMemoryManager.scala
- val usableMemory = systemMemory - reservedMemory
- val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)
- (usableMemory * memoryFraction).toLong
他们的边界由 spark.memory.storageFraction 设定, 默认为 0.5. 即默认状态下 storage Memory 和 execution Memory 为 1:1.
- // UnifiedMemoryManager.scala
- onHeapStorageRegionSize =
- (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong,
- numCores = numCores)
(3)user Memory: 剩余内存, 用户根据需要使用, 默认占 usableMemory 的(1-0.6)=0.4.
三, 内存控制详解
首先我们先来了解一下 Spark 内存管理实现类之前的关系.
1.MemoryManager 主要功能是:(1)记录用了多少 StorageMemory 和 ExecutionMemory;(2)申请 Storage,Execution 和 Unroll Memory;(3)释放 Stroage 和 Execution Memory.
Execution 内存用来执行 shuffle,joins,sorts 和 aggegations 操作, Storage 内存用于缓存和广播数据, 每一个 JVM 中都存在着一个 MemoryManager. 构造 MemoryManager 需要指定 onHeapStorageMemory 和 onHeapExecutionMemory 参数.
- // MemoryManager.scala
- private[spark] abstract class MemoryManager(
- conf: SparkConf,
- numCores: Int,
- onHeapStorageMemory: Long,
- onHeapExecutionMemory: Long) extends Logging {
创建 StorageMemoryPool 和 ExecutionMemoryPool 对象, 用来创建堆内或堆外的 Storage 和 Execution 内存池, 管理 Storage 和 Execution 的内存分配.
- // MemoryManager.scala
- @GuardedBy("this")
- protected val onHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.ON_HEAP)
- @GuardedBy("this")
- protected val offHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.OFF_HEAP)
- @GuardedBy("this")
- protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.ON_HEAP)
- @GuardedBy("this")
- protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP)
默认情况下, 不使用堆外内存, 可通过 saprk.memory.offHeap.enabled 设置, 默认堆外内存为 0, 可使用 spark.memory.offHeap.size 参数设置.
- // All the code you will ever need
- final val tungstenMemoryMode: MemoryMode = {
- if (conf.getBoolean("spark.memory.offHeap.enabled", false)) {
- require(conf.getSizeAsBytes("spark.memory.offHeap.size", 0) > 0,
- "spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true")
- require(Platform.unaligned(),
- "No support for unaligned Unsafe. Set spark.memory.offHeap.enabled to false.")
- MemoryMode.OFF_HEAP
- } else {
- MemoryMode.ON_HEAP
- }
- }
- // MemoryManager.scala
- protected[this] val maxOffHeapMemory = conf.getSizeAsBytes("spark.memory.offHeap.size", 0)
释放 numBytes 字节的 Execution 内存方法
- // MemoryManager.scala
- def releaseExecutionMemory(
- numBytes: Long,
- taskAttemptId: Long,
- memoryMode: MemoryMode): Unit = synchronized {
- memoryMode match {
- case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
- case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
- }
- }
释放指定 task 的所有 Execution 内存并将该 task 标记为 inactive.
- // MemoryManager.scala
- private[memory] def releaseAllExecutionMemoryForTask(taskAttemptId: Long): Long = synchronized {
- onHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId) +
- offHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId)
- }
释放 numBytes 字节的 Stoarge 内存方法
- // MemoryManager.scala
- def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized {
- memoryMode match {
- case MemoryMode.ON_HEAP => onHeapStorageMemoryPool.releaseMemory(numBytes)
- case MemoryMode.OFF_HEAP => offHeapStorageMemoryPool.releaseMemory(numBytes)
- }
- }
释放所有 Storage 内存方法
- // MemoryManager.scala
- final def releaseAllStorageMemory(): Unit = synchronized {
- onHeapStorageMemoryPool.releaseAllMemory()
- offHeapStorageMemoryPool.releaseAllMemory()
- }
2. 接下来我们了解一下, UnifiedMemoryManager 是如何对内存进行控制的? 动态内存是如何实现的呢?
UnifiedMemoryManage 继承了 MemoryManager
- // UnifiedMemoryManage.scala
- private[spark] class UnifiedMemoryManager private[memory] (
- conf: SparkConf,
- val maxHeapMemory: Long,
- onHeapStorageRegionSize: Long,
- numCores: Int)
- extends MemoryManager(
- conf,
- numCores,
- onHeapStorageRegionSize,
- maxHeapMemory - onHeapStorageRegionSize) {
重写了 maxOnHeapStorageMemory 方法, 最大 Storage 内存 = 最大内存 - 最大 Execution 内存.
- // UnifiedMemoryManage.scala
- override def maxOnHeapStorageMemory: Long = synchronized {
- maxHeapMemory - onHeapExecutionMemoryPool.memoryUsed
- }
核心方法 acquireStorageMemory: 申请 Storage 内存.
- // UnifiedMemoryManage.scala
- override def acquireStorageMemory(
- blockId: BlockId,
- numBytes: Long,
- memoryMode: MemoryMode): Boolean = synchronized {
- assertInvariants()
- assert(numBytes >= 0)
- val (executionPool, storagePool, maxMemory) = memoryMode match {
- // 根据不同的内存模式去创建 StorageMemoryPool 和 ExecutionMemoryPool
- case MemoryMode.ON_HEAP => (
- onHeapExecutionMemoryPool,
- onHeapStorageMemoryPool,
- maxOnHeapStorageMemory)
- case MemoryMode.OFF_HEAP => (
- offHeapExecutionMemoryPool,
- offHeapStorageMemoryPool,
- maxOffHeapMemory)
- }
- if (numBytes > maxMemory) {
- // 若申请内存大于最大内存, 则申请失败
- logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our" +
- s"memory limit ($maxMemory bytes)")
- return false
- }
- if (numBytes > storagePool.memoryFree) {
- // 如果 Storage 内存池没有足够的内存, 则向 Execution 内存池借用
- val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, numBytes)// 当 Execution 内存有空闲时, Storage 才能借到内存
- executionPool.decrementPoolSize(memoryBorrowedFromExecution)// 缩小 Execution 内存
- storagePool.incrementPoolSize(memoryBorrowedFromExecution)// 增加 Storage 内存
- }
- storagePool.acquireMemory(blockId, numBytes)
- }
核心方法 acquireExecutionMemory: 申请 Execution 内存.
- // UnifiedMemoryManage.scala
- override private[memory] def acquireExecutionMemory(
- numBytes: Long,
- taskAttemptId: Long,
- memoryMode: MemoryMode): Long = synchronized {// 使用了 synchronized 关键字, 调用 acquireExecutionMemory 方法可能会阻塞, 直到 Execution 内存池有足够的内存.
- ...
- executionPool.acquireMemory(
- numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize)
- }
方法最后调用了 ExecutionMemoryPool 的 acquireMemory 方法, 该方法的参数需要两个函数: maybeGrowExecutionPool()和 computeMaxExecutionPoolSize().
每个 Task 能够使用的内存被限制在 pooSize / (2 * numActiveTask) ~ maxPoolSize / numActiveTasks. 其中 maxPoolSize 代表了 execution pool 的最大内存, poolSize 表示当前这个 pool 的大小.
- // ExecutionMemoryPool.scala
- val maxPoolSize = computeMaxPoolSize()
- val maxMemoryPerTask = maxPoolSize / numActiveTasks
- val minMemoryPerTask = poolSize / (2 * numActiveTasks)
maybeGrowExecutionPool()方法实现了如何动态增加 Execution 内存区的大小. 在每次申请 execution 内存的同时, execution 内存池会进行多次尝试, 每次尝试都可能会回收一些存储内存.
- // UnifiedMemoryManage.scala
- def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
- if (extraMemoryNeeded > 0) {// 如果申请的内存大于 0
- // 计算 execution 可借到的 storage 内存, 是 storage 剩余内存和可借出内存的最大值
- val memoryReclaimableFromStorage = math.max(
- storagePool.memoryFree,
- storagePool.poolSize - storageRegionSize)
- if (memoryReclaimableFromStorage > 0) {// 如果可以申请到内存
- val spaceToReclaim = storagePool.freeSpaceToShrinkPool(
- math.min(extraMemoryNeeded, memoryReclaimableFromStorage))// 实际需要的内存, 取实际需要的内存和 storage 内存区域全部可用内存大小的最小值
- storagePool.decrementPoolSize(spaceToReclaim)//storage 内存区域减少
- executionPool.incrementPoolSize(spaceToReclaim)//execution 内存区域增加
- }
- }
- }
来源: http://stor.51cto.com/art/201806/575484.htm