RDD 的四种依赖关系
RDD 四种依赖关系, 分别是 ShuffleDependency,PrunDependency,RangeDependency 和 OneToOneDependency 四种依赖关系. 如下图所示: org.apache.spark.Dependency 有两个一级子类, 分别是 ShuffleDependency 和 NarrowDependency. 其中, NarrowDependency 是一个抽象类, 它有三个实现类, 分别是 OneToOneDependency,RangeDependency 和 PruneDependency.
RDD 的窄依赖
我们先来看窄 RDD 是如何确定依赖的父 RDD 的分区的呢? NarrowDependency 定义了一个抽象方法, 如下:
- /**
- * Get the parent partitions for a child partition.
- * @param partitionId a partition of the child RDD
- * @return the partitions of the parent RDD that the child partition depends upon
- */
- def getParents(partitionId: Int): Seq[Int]
其输入参数是子 RDD 的 分区 Id, 输出是子 RDD 分区依赖的父 RDD 的 partition 的 id 序列.
下面, 分别看三种子类的实现:
OneToOneDependency
首先, OneToOneDependency 的 getParent 实现如下:
override def getParents(partitionId: Int): List[Int] = List(partitionId)
就一行代码, 实现比较简单, 子 RDD 对应的 partition index 跟父 RDD 的 partition 的 index 一样. 相当于父 RDD 的 每一个 partition 复制到 子 RDD 的对应分区中, 分区的关系是一对一的. RDD 的关系也是一对一的.
RangeDependency
其次, RangeDependency 的 getParent 实现如下:
- /**
- * :: DeveloperApi ::
- * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
- * @param rdd the parent RDD
- * @param inStart the start of the range in the parent RDD
- * @param outStart the start of the range in the child RDD
- * @param length the length of the range
- */
- @DeveloperApi
- class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
- extends NarrowDependency[T](rdd) {
- override def getParents(partitionId: Int): List[Int] = {
- if (partitionId>= outStart && partitionId <outStart + length) {
- List(partitionId - outStart + inStart)
- } else {
- Nil
- }
- }
- }
首先解释三个变量: inStart: 父 RDD range 的起始位置; outStart: 子 RDD range 的起始位置; length:range 的长度.
获取 父 RDD 的 partition index 的规则是: 如果子 RDD 的 partition index 在父 RDD 的 range 内, 则返回的 父 RDD partition 是 子 RDD partition index - 父 RDD 分区 range 起始 + 子 RDD 分区 range 起始. 其中,(- 父 RDD 分区 range 起始 + 子 RDD 分区 range 起始) 即 子 RDD 的分区的 range 起始位置和 父 RDD 的分区的 range 的起始位置 的相对距离. 子 RDD 的 parttion index 加上这个相对距离就是 对应父的 RDD partition. 否则是无依赖的父 RDD 的 partition index. 父子 RDD 的分区关系是一对一的. RDD 的关系可能是一对一 (length 是 1 , 就是特殊的 OneToOneDependency), 也可能是多对一, 也可能是一对多.
PruneDependency
最后, PruneDependency 的 getParent 实现如下:
- /**
- * Represents a dependency between the PartitionPruningRDD and its parent. In this
- * case, the child RDD contains a subset of partitions of the parents'.
- */
- private[spark] class PruneDependency[T](rdd: RDD[T], partitionFilterFunc: Int => Boolean)
- extends NarrowDependency[T](rdd) {
- @transient
- val partitions: Array[Partition] = rdd.partitions
- .filter(s => partitionFilterFunc(s.index)).zipWithIndex
- .map { case(split, idx) => new PartitionPruningRDDPartition(idx, split) : Partition }
- override def getParents(partitionId: Int): List[Int] = {
- List(partitions(partitionId).asInstanceOf[PartitionPruningRDDPartition].parentSplit.index)
- }
- }
首先, 解释三个变量: rdd 是指向父 RDD 的实例引用; partitionFilterFunc 是一个回调函数, 作用是过滤出符合条件的父 RDD 的 partition 集合; PartitionPruningRDDPartition 类声明如下:
- private[spark] class PartitionPruningRDDPartition(idx: Int, val parentSplit: Partition)
- extends Partition {
- override val index = idx
- }
partitions 的生成过程如下: 先根据父 RDD 引用获取父 RDD 对应的 partition 集合, 然后根据过滤函数和 partition index , 过滤出想要的父 RDD 的 partition 集合并且从 0 开始编号, 最后, 根据父 RDD 的 partition 和 新编号实例化新的 PartitionPruningRDDPartition 实例, 并放入到 partitions 集合中, 相当于是先对 parent RDD 的分区做 Filter 剪枝操作.
在 getParent 方法中, 先根据子 RDD 的 partition index 获取 到对应的 parent RDD 的对应分区, 然后获取 Partition 的成员函数 index, 该 index 就是 父 RDD 的 partition 在父 RDD 的所有分区中的 index. 子 RDD partition 和 父 RDD partition 的关系是 一对一的, 父 RDD 和子 RDD 的关系是 多对一, 也可能是一对多, 也可能是一对一.
简言之, 在窄依赖中, 子 RDD 的 partition 和 父 RDD 的 partition 的关系是 一对一的.
RDD 的宽依赖
下面重点看 ShuffleDependency,ShuffleDependency 代表的是 一个 shuffle stage 的输出. 先来看其构造方法, 即其依赖的变量或实例:
- @DeveloperApi
- class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
- @transient private val _rdd: RDD[_ <: Product2[K, V]],
- val partitioner: Partitioner,
- val serializer: Serializer = SparkEnv.get.serializer,
- val keyOrdering: Option[Ordering[K]] = None,
- val aggregator: Option[Aggregator[K, V, C]] = None,
- val mapSideCombine: Boolean = false)
- extends Dependency[Product2[K, V]]
其中,_rdd 代指父 RDD 实例; partitioner 是用于给 shuffle 的输出分区的分区器; serializer, 主要用于序列化, 默认是 org.apache.spark.serializer.JavaSerializer, 可以通过 `spark.serializer` 参数指定; keyOrdering RDD shuffle 的 key 的顺序. aggregator,map 或 reduce 端用于 RDD shuffle 的 combine 聚合器; mapSideCombine 是否执行部分的聚合 (即 map 端的预聚合, 可以提高网络传输效率和 reduce 端的执行效率), 默认是 false. 因为并不是所有的都适合这样做. 比如求全局平均值, 均值, 平方差等, 但像全局最大值, 最小值等是适合用 mapSideCombine 的. 注意, 当 mapSideCombine 为 true 时, 必须设置 combine 聚合器, 因为 shuffle 前需要使用聚合器做 map-combine 操作.
partitioner 的 7 种实现
partitioner 定义了 RDD 里的 key-value 对 是如何按 key 来分区的. 映射每一个 key 到一个分区 id, 从 0 到 分区数 - 1; 注意, 分区器必须是确定性的, 即给定同一个 key, 必须返回同一个分区, 便于任务失败时, 追溯分区数据, 确保了每一个要参与计算的分区数据的一致性. 即 partition 确定了 shuffle 过程中 数据是要流向哪个具体的分区的.
org.apache.spark.Partition 的 7 个实现类如下:
我们先来看 Partitioner 的方法定义:
- abstract class Partitioner extends Serializable {
- def numPartitions: Int
- def getPartition(key: Any): Int
- }
其中, numPartitions 是返回 子 RDD 的 partition 数量; getPartition 会根据指定的 key 返回 子 RDD 的 partition index.
HashPartitioner 的 getPartition 的 实现如下, 思路是 key.hashcode() mod 子 RDD 的 partition 数量:
- def getPartition(key: Any): Int = key match {
- case null => 0
- case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
- }
RangePartitioner 的 getPartition 的实现如下:
- def getPartition(key: Any): Int = {
- val k = key.asInstanceOf[K]
- var partition = 0
- if (rangeBounds.length <= 128) { // 不大于 128 分区
- // If we have Less than 128 partitions naive search
- while (partition <rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
- partition += 1
- }
- } else { // 大于 128 个分区数量
- // Determine which binary search method to use only once.
- partition = binarySearch(rangeBounds, k) // 二分查找
- // binarySearch either returns the match location or -[insertion point]-1
- if (partition < 0) {
- partition = -partition-1
- }
- if (partition> rangeBounds.length) {
- partition = rangeBounds.length
- }
- }
- if (ascending) {
- partition
- } else {
- rangeBounds.length - partition
- }
- }
PythonPartitioner 的 getPartition 如下, 跟 hash 很相似:
- override def getPartition(key: Any): Int = key match {
- case null => 0
- // we don't trust the Python partition function to return valid partition ID's so
- // let's do a modulo numPartitions in any case
- case key: Long => Utils.nonNegativeMod(key.toInt, numPartitions)
- case _ => Utils.nonNegativeMod(key.hashCode(), numPartitions)
- }
PartitionIdPassthrough 的 getPartition 如下:
1 override def getPartition(key: Any): Int = key.asInstanceOf[Int]
GridPartitioner 的 getPartition 如下, 思想, 二元组定位到网格的 partition:
- override val numPartitions: Int = rowPartitions * colPartitions
- /**
- * Returns the index of the partition the input coordinate belongs to.
- *
- * @param key The partition id i (calculated through this method for coordinate (i, j) in
- * `simulateMultiply`, the coordinate (i, j) or a tuple (i, j, k), where k is
- * the inner index used in multiplication. k is ignored in computing partitions.
- * @return The index of the partition, which the coordinate belongs to.
- */
- override def getPartition(key: Any): Int = {
- key match {
- case i: Int => i
- case (i: Int, j: Int) =>
- getPartitionId(i, j)
- case (i: Int, j: Int, _: Int) =>
- getPartitionId(i, j)
- case _ =>
- throw new IllegalArgumentException(s"Unrecognized key: $key.")
- }
- }
- /** Partitions sub-matrices as blocks with neighboring sub-matrices. */
- private def getPartitionId(i: Int, j: Int): Int = {
- require(0 <= i && i < rows, s"Row index $i out of range [0, $rows).")
- require(0 <= j && j < cols, s"Column index $j out of range [0, $cols).")
- i / rowsPerPart + j / colsPerPart * rowPartitions
- }
包括匿名类, 还有好多种, 就不一一介绍了. 总而言之, 宽依赖是根据 partitioner 确定 分区内的数据具体到哪个分区.
至此, RDD 的窄依赖和宽依赖都介绍清楚了.
来源: http://www.bubuko.com/infodetail-3109631.html