RDD 是分布式内存的一个抽象概念, 是一种高度受限的共享内存模型, 即 RDD 是只读的记录分区的集合, 能横跨集群所有节点并行计算, 是一种基于工作集的应用抽象.
RDD 底层存储原理: 其数据分布存储于多台机器上, 事实上, 每个 RDD 的数据都以 Block 的形式存储于多台机器上, 每个 Executor 会启动一个 BlockManagerSlave, 并管理一部分 Block; 而 Block 的元数据由 Driver 节点上的 BlockManagerMaster 保存, BlockManagerSlave 生成 Block 后向 BlockManagerMaster 注册该 Block,BlockManagerMaster 管理 RDD 与 Block 的关系, 当 RDD 不再需要存储的时候, 将向 BlockManagerSlave 发送指令删除相应的 Block.
BlockManager 管理 RDD 的物理分区, 每个 Block 就是节点上对应的一个数据块, 可以存储在内存或者磁盘上. 而 RDD 中的 Partition 是一个逻辑数据块, 对应相应的物理块 Block. 本质上, 一个 RDD 在代码中相当于数据的一个元数据结构, 存储着数据分区及其逻辑结构映射关系, 存储着 RDD 之前的依赖转换关系.
BlockManager 在每个节点上运行管理 Block(Driver 和 Executors), 它提供一个接口检索本地和远程的存储变量, 如 memory,disk,off-heap. 使用 BlockManager 前必须先初始化. BlockManager.scala 的部分源码如下所示:
- private[spark] class BlockManager(
- executorId: String,
- rpcEnv: RpcEnv,
- val master: BlockManagerMaster,
- serializerManager: SerializerManager,
- val conf: SparkConf,
- memoryManager: MemoryManager,
- mapOutputTracker: MapOutputTracker,
- shuffleManager: ShuffleManager,
- val blockTransferService: BlockTransferService,
- securityManager: SecurityManager,
- numUsableCores: Int)
- extends BlockDataManager with BlockEvictionHandler with Logging {
BlockManagerMaster 会持有整个 Application 的 Block 的位置, Block 所占用的存储空间等元数据信息, 在 Spark 的 Driver 的 DAGScheduler 中, 就是通过这些信息来确认数据运行的本地性的. Spark 支持重分区, 数据通过 Spark 默认的或者用户自定义的分区器决定数据块分布在哪些节点. RDD 的物理分区是由 Block-Manager 管理的, 每个 Block 就是节点上对应的一个数据块, 可以存储在内存或者磁盘. 而 RDD 中的 partition 是一个逻辑数据块, 对应相应的物理块 Block. 本质上, 一个 RDD 在代码中相当于数据的一个元数据结构(一个 RDD 就是一组分区), 存储着数据分区及 Block,Node 等的映射关系, 以及其他元数据信息, 存储着 RDD 之前的依赖转换关系. 分区是一个逻辑概念, Transformation 前后的新旧分区在物理上可能是同一块内存存储.
Spark 通过读取外部数据创建 RDD, 或通过其他 RDD 执行确定的转换 Transformation 操作 (如 map,union 和 groubByKey) 而创建, 从而构成了线性依赖关系, 或者说血统关系 (Lineage), 在数据分片丢失时可以从依赖关系中恢复自己独立的数据分片, 对其他数据分片或计算机没有影响, 基本没有检查点开销, 使得实现容错的开销很低, 失效时只需要重新计算 RDD 分区, 就可以在不同节点上并行执行, 而不需要回滚(Roll Back) 整个程序. 落后任务 (即运行很慢的节点) 是通过任务备份, 重新调用执行进行处理的.
因为 RDD 本身支持基于工作集的运用, 所以可以使 Spark 的 RDD 持久化 (persist) 到内存中, 在并行计算中高效重用. 多个查询时, 我们就可以显性地将工作集中的数据缓存到内存中, 为后续查询提供复用, 这极大地提升了查询的速度. 在 Spark 中, 一个 RDD 就是一个分布式对象集合, 每个 RDD 可分为多个片(Partitions), 而分片可以在集群环境的不同节点上计算.
RDD 作为泛型的抽象的数据结构, 支持两种计算操作算子: Transformation(变换)与 Action(行动). 且 RDD 的写操作是粗粒度的, 读操作既可以是粗粒度的, 也可以是细粒度的. RDD.scala 的源码如下:
- /**
- * Internally, each RDD is characterized by five main properties:
- * 每个 RDD 都有 5 个主要特性
- * - A list of partitions 分区列表
- * - A function for computing each split 每个分区都有一个计算函数
- * - A list of dependencies on other RDDs 依赖于其他 RDD 的列表
- * - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) 数据类型 (key-value) 的 RDD 分区器
- * - Optionally, a list of preferred locations to compute each split on (e.g. block locations for 每个分区都有一个分区位置列表
- */
- abstract class RDD[T: ClassTag](
- @transient private var _sc: SparkContext,
- @transient private var deps: Seq[Dependency[_]]
- ) extends Serializable with Logging {
其中, SparkContext 是 Spark 功能的主要入口点, 一个 SparkContext 代表一个集群连接, 可以用其在集群中创建 RDD, 累加变量, 广播变量等, 在每一个可用的 JVM 中只有一个 SparkContext, 在创建一个新的 SparkContext 之前, 必须先停止该 JVM 中可用的 SparkContext, 这种限制可能最终会被修改. SparkContext 被实例化时需要一个 SparkConf 对象去描述应用的配置信息, 在这个配置对象中设置的信息, 会覆盖系统默认的配置.
RDD 五大特性:
(1)分区列表 (a list of partitions).Spark RDD 是被分区的, 每一个分区都会被一个计算任务(Task) 处理, 分区数决定并行计算数量, RDD 的并行度默认从父 RDD 传给子 RDD. 默认情况下, 一个 HDFS 上的数据分片就是一个 Partition,RDD 分片数决定了并行计算的力度, 可以在创建 RDD 时指定 RDD 分片个数, 如果不指定分区数量, 当 RDD 从集合创建时, 则默认分区数量为该程序所分配到的资源的 CPU 核数(每个 Core 可以承载 2~4 个 Partition), 如果是从 HDFS 文件创建, 默认为文件的 Block 数.
(2)每一个分区都有一个计算函数(a function for computing each split). 每个分区都会有计算函数, Spark 的 RDD 的计算函数是以分片为基本单位的, 每个 RDD 都会实现 compute 函数, 对具体的分片进行计算, RDD 中的分片是并行的, 所以是分布式并行计算. 有一点非常重要, 就是由于 RDD 有前后依赖关系, 遇到宽依赖关系, 例如, 遇到 reduceBykey 等宽依赖操作的算子, Spark 将根据宽依赖划分 Stage,Stage 内部通过 Pipeline 操作, 通过 Block Manager 获取相关的数据, 因为具体的 split 要从外界读数据, 也要把具体的计算结果写入外界, 所以用了一个管理器, 具体的 split 都会映射成 BlockManager 的 Block, 而具体 split 会被函数处理, 函数处理的具体形式是以任务的形式进行的.
(3)依赖于其他 RDD 的列表(a list of dependencies on other RDDs).RDD 的依赖关系, 由于 RDD 每次转换都会生成新的 RDD, 所以 RDD 会形成类似流水线的前后依赖关系, 当然, 宽依赖就不类似于流水线了, 宽依赖后面的 RDD 具体的数据分片会依赖前面所有的 RDD 的所有的数据分片, 这时数据分片就不进行内存中的 Pipeline, 这时一般是跨机器的. 因为有前后的依赖关系, 所以当有分区数据丢失的时候, Spark 会通过依赖关系重新计算, 算出丢失的数据, 而不是对 RDD 所有的分区进行重新计算. RDD 之间的依赖有两种: 窄依赖(Narrow Dependency), 宽依赖(Wide Dependency).RDD 是 Spark 的核心数据结构, 通过 RDD 的依赖关系形成调度关系. 通过对 RDD 的操作形成整个 Spark 程序.
RDD 有 Narrow Dependency 和 Wide Dependency 两种不同类型的依赖, 其中的 Narrow Dependency 指的是每一个 parent RDD 的 Partition 最多被 child RDD 的一个 Partition 所使用, 而 Wide Dependency 指的是多个 child RDD 的 Partition 会依赖于同一个 parent RDD 的 Partition. 可以从两个方面来理解 RDD 之间的依赖关系: 一方面是该 RDD 的 parent RDD 是什么; 另一方面是依赖于 parent RDD 的哪些 Partitions; 根据依赖于 parent RDD 的 Partitions 的不同情况, Spark 将 Dependency 分为宽依赖和窄依赖两种. Spark 中宽依赖指的是生成的 RDD 的每一个 partition 都依赖于父 RDD 的所有 partition, 宽依赖典型的操作有 groupByKey,sortByKey 等, 宽依赖意味着 shuffle 操作, 这是 Spark 划分 Stage 边界的依据, Spark 中宽依赖支持两种 Shuffle Manager, 即 HashShuffleManager 和 SortShuffleManager, 前者是基于 Hash 的 Shuffle 机制, 后者是基于排序的 Shuffle 机制. Spark 2.2 现在的版本中已经没有 Hash Shuffle 的方式.
(4)key-value 数据类型的 RDD 分区器 (-Optionally,a Partitioner for key-value RDDS), 控制分区策略和分区数. 每个 key-value 形式的 RDD 都有 Partitioner 属性, 它决定了 RDD 如何分区. 当然, Partition 的个数还决定每个 Stage 的 Task 个数. RDD 的分片函数, 想控制 RDD 的分片函数的时候可以分区(Partitioner) 传入相关的参数, 如 HashPartitioner,RangePartitioner, 它本身针对 key-value 的形式, 如果不是 key-value 的形式, 它就不会有具体的 Partitioner.Partitioner 本身决定了下一步会产生多少并行的分片, 同时, 它本身也决定了当前并行(parallelize)Shuffle 输出的并行数据, 从而使 Spark 具有能够控制数据在不同节点上分区的特性, 用户可以自定义分区策略, 如 Hash 分区等. Spark 提供了 "partitionBy" 运算符, 能通过集群对 RDD 进行数据再分配来创建一个新的 RDD.
(5)每个分区都有一个优先位置列表(-Optionally,a list of preferred locations to compute each split on). 它会存储每个 Partition 的优先位置, 对于一个 HDFS 文件来说, 就是每个 Partition 块的位置. 观察运行 spark 集群的控制台会发现 Spark 的具体计算, 具体分片前, 它已经清楚地知道任务发生在什么节点上, 也就是说, 任务本身是计算层面的, 代码层面的, 代码发生运算之前已经知道它要运算的数据在什么地方, 有具体节点的信息. 这就符合大数据中数据不动代码动的特点. 数据不动代码动的最高境界是数据就在当前节点的内存中. 这时有可能是 memory 级别或 Alluxio 级别的, Spark 本身在进行任务调度时候, 会尽可能将任务分配到处理数据的数据块所在的具体位置. 据 Spark 的 RDD.Scala 源码函数 getPreferredLocations 可知, 每次计算都符合完美的数据本地性.
RDD 类源码文件中的 4 个方法和一个属性对应上述阐述的 RDD 的 5 大特性. RDD.scala 的源码如下:
- /**
- * :: DeveloperApi ::
- * Implemented by subclasses to compute a given partition. 通过子类实现给定分区的计算
- */
- @DeveloperApi
- def compute(split: Partition, context: TaskContext): Iterator[T]
- /**
- * Implemented by subclasses to return the set of partitions in this RDD. This method will only
- * be called once, so it is safe to implement a time-consuming computation in it.
- * 通过子类实现, 返回一个 RDD 分区列表, 这个方法只被调用一次, 它是安全的执行一次耗时计算
- *
- * 数组中的分区必须符合以下属性设置
- * The partitions in this array must satisfy the following property:
- * `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
- */
- protected def getPartitions: Array[Partition]
- /**
- * 返回对父 RDD 的依赖列表, 这个方法仅只被调用一次, 它是安全的执行一次耗时计算
- * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
- * be called once, so it is safe to implement a time-consuming computation in it.
- */
- protected def getDependencies: Seq[Dependency[_]] = deps
- /**
- * 可选的, 指定优先位置, 输入参数是 spilt 分片, 输出结果是一组优先的节点位置
- * Optionally overridden by subclasses to specify placement preferences.
- */
- protected def getPreferredLocations(split: Partition): Seq[String] = Nil
- /**
- * Optionally overridden by subclasses to specify how they are partitioned.
- * 可选的, 通过子类实现, 指定如何分区
- */
- @transient val partitioner: Option[Partitioner] = None
其中, TaskContext 是读取或改变执行任务的环境, 用 org.apache.spark.TaskContext.get()可返回当前可用的 TaskContext, 可以调用内部的函数访问正在运行任务的环境信息. Partitioner 是一个对象, 定义了如何在 key-Value 类型的 RDD 元素中用 Key 分区, 从 0 到 numPartitions-1 区间内映射每一个 Key 到 Partition ID.Partition 是一个 RDD 的分区标识符. Partition.scala 的源码如下.
- /**
- * An identifier for a partition in an RDD.
- */
- trait Partition extends Serializable {
- /**
- * Get the partition's index within its parent RDD
- */
- def index: Int
- // A better default implementation of HashCode
- override def hashCode(): Int = index
- override def equals(other: Any): Boolean = super.equals(other)
- }
来源: http://www.bubuko.com/infodetail-3279961.html