键值对 RDD 是 Spark 中许多操作所需要的常见数据类型. 除了在基础 RDD 类中定义的操作之外, Spark 为包含键值对类型的 RDD 提供了一些专有的操作在 PairRDDFunctions 专门进行了定义. 这些 RDD 被称为 pairRDD
有很多中方式创建 pairRDD, 一般如果从一个普通的 RDD 转为 pairRDD 时, 可以调用 map()函数来实现, 传递的函数需要返回键值对
val pairs = lines.map(x => (x.split(" ")(0), x))
3.1 键值对 RDD 的转化操作
3.1.1 转化操作列表
针对一个 pairRDD 的转化操作
pairRDD 的转化操作 (以键值对集合{(1,2),(3,4),(3,6)} 为例)
针对两个 pairRDD 的转化操作(rdd = {(1,2),(3,4),(3,6)} other = {(3,9)})
3.1.2 聚合操作
当数据集以键值对形式组织的时候, 聚合具有相同键的元素进行一些统计是很常见的操作. 之前写过基础 RDD 上的 fold(), combine(), reduce()等行动操作, pairRDD 上则有相应的针对键的转化操作. Spark 有一组类似的操作, 可以组合具有相同键的值. 这些操作返回 RDD, 因此它们是转化操作而不是行动操作
reduceByKey()与 reduce()相当类似; 它们都接收一个函数, 并使用该函数对值进行合并. reduceByKey()会为数据集中的每个键进行并行的归约操作, 每个归约操作都会将键相同的值合并起来. 因为数据集中可能有大量的键, 所以 reduceByKey()没有被实现为向用户程序返回一个值的行动操作, 实际上, 它会返回一个由各键和对应键归约出来的结果值组成的新的 RDD
foldByKey()则与 fold()相当类似; 它们都使用一个与 RDD 和合并函数中的数据类型相同的零值作为初始值. 与 fold()一样, foldByKey()操作所使用的合并函数对零值与另一个元素进行合并, 结果仍为该元素
求均值操作: 版本一
input.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)).map{ case (key, value) => (key, value._1 / value._2.toFloat) }
combineByKey()是最为常用的基于键进行聚合的函数. 大多数基于键聚合的函数都是用它实现的. 和 aggregate()一样, combineByKey()可以让用户返回与输入数据的类型不同的返回值
要理解 combineByKey(), 要先理解它在处理数据时是如何处理每个元素的. 由于 combineByKey()会遍历分区中的所有元素, 因此每个元素的键要么还没有遇到过, 要么就和之前的某个元素的键相同
如果这是一个新的元素, combineByKey()会使用一个叫作 createCombiner()的函数来创建那个键对应的累加器的初始值. 需要注意的是, 这一过程会在每个分区中第一次出现各个键时发生, 而不是在整个 RDD 中第一次出现一个键时发生
如果这是一个在处理当前分区之前已经遇到的键, 它会使用 mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并
由于每个分区都是独立处理的, 因此对于同一个键可以有多个累加器. 如果有两个或者更多的分区都有对应同一个键的累加器, 就需要使用用户提供的 mergeCombiners()方法将各个分区的结果进行合并
求均值: 版本二
3.1.3 数据分组
如果数据已经以预期的方式提取了键, groupByKey()就会使用 RDD 中的键来对数据进行分组. 对于一个由类型 K 的键和类型 V 的值组成的 RDD, 所得到的结果 RDD 类型会是[K, Iterable[V]]
groupBy()可以用于未成对的数据上, 也可以根据除键相同以外的条件进行分组. 它可以接收一个函数, 对源 RDD 中的每个元素使用该函数, 将返回结果作为键再进行分组
多个 RDD 分组, 可以使用 cogroup 函数, cogroup()函数对多个共享同一个键的 RDD 进行分组. 对两个键的类型均为 K 值的类型分别为 V 和 W 的 RDD 进行 cogroup()时, 得到的结果 RDD 类型为 [(k, (Iterable[V], Iterable[W]))]. 如果其中的一个 RDD 对于另一个中存在的某个键没有对应的记录, 那么对应的迭代器则为空. cogroup() 提供了为多个 RDD 进行数据分组的方法
3.1.4 连接
连接主要用于多个 PairRDD 的操作, 连接方式多种多样: 右外连接, 左外连接, 交叉连接以及内连接
普通的 join 操作符表示内连接, 只有在两个 pairRDD 中都存在的键才叫输出. 当一个输入对应的某个键有多个值时, 生成的 pairRDD 会包括来自两个输入 RDD 的每一组相对应的纪录
leftOuterJoin()产生的 pairRDD 中, 源 RDD 的每一个键都有对应的纪录. 每个键相应的值是由一个源 RDD 中的值与一个包含第二个 RDD 的值的 Option(在 Java 中为 Optional)对象组成的二元组
rightOuterJoin()完全一样, 只不过预期结果中的键必须出现在第二个 RDD 中, 而二元组中的可缺失的部分则来自于源 RDD 而非第二个 RDD
3.1.5 数据排序
sortByKey()函数接收一个叫作 ascending 的参数, 表示我们是否想要让结果按升序排序(默认值为 true)
3.2 键值对 RDD 的行动操作
pairRDD 的行动操作 (以键值对集合{(1,2),(3,4),(3,6)} 为例)
3.3 键值对 RDD 的数据分区
Spark 目前支持 Hash 分区和 Range 分区, 用户也可以自定义分区, Hash 分区为当前的默认分区, Spark 中分区器直接决定了 RDD 中分区的个数, RDD 中每条数据经过 Shuffle 过程属于哪个分区和 Reduce 的个数
注意:
(1) 只有 Key-Value 类型的 RDD 才有分区的, 非 Key-Value 类型的 RDD 分区的值是 None
(2) 每个 RDD 的分区 ID 范围: 0~numPartitions-1, 决定这个值是属于哪个分区的
3.3.1 获取 RDD 的分区方式
可以通过使用 RDD 的 partitioner 属性来获取 RDD 的分区方式. 它会返回一个 scala.Option 对象, 通过 get 方法获取其中的值
3.3.2 Hash 分区方式
HashPartitioner 分区的原理: 对于给定的 key, 计算其 hashCode, 并除于分区的个数取余, 如果余数小于 0, 则用余数 + 分区的个数, 最后返回的值就是这个 key 所属的分区 ID
3.3.3 Range 分区方式
HashPartitioner 分区弊端: 可能导致, 每个分区中数据量的不均匀, 极端情况下会导致某些分区拥有 RDD 的全部数据
RangePartitioner 分区优势: 尽量保证每个分区中数据量的均匀, 而且分区与分区之间是有序的. 一个分区中的元素肯定都是比另一个分区内的元素小或者大
但是分区内的元素是不能保证顺序的. 简单来说就是将一定范围内的数映射到某一个分区内
RangePartitioner 作用: 将一定范围内的数映射到某一个分区内, 在实现中, 分界的算法尤为重要. 用到了水塘抽样算法
3.3.4 自定义分区方式
要实现自定义的分区器, 需要继承 org.apache.spark.Partitioner 类并实现下面三个方法
numPartitions: Int: 返回创建出来的分区数
getPartition(key: Any): Int: 返回给定键的分区编号(0 到 numPartitions-1)
equals(): Java 判断相等性的标准方法. 这个方法的实现非常重要, Spark 需要用这个方法来检查你的分区器对象是否和其他分区器实例相同, 这样 Spark 才可以判断两个 RDD 的分区方式是否相同
假设我们需要将相同后缀的数据写入相同的文件, 我们通过将相同后缀的数据分区到相同的分区并保存输出来实现
使用自定义的 Partitioner 是很容易的: 只要把它传给 partitionBy()方法即可. Spark 中有许多依赖于数据混洗的方法, 比如 join()和 groupByKey(), 它们也可以接收一个可选的 Partitioner 对象来控制输出数据的分区方式
3.3.5 分区 Shuffle 优化
在分布式程序中, 通信的代价是很大的, 因此控制数据分布式以获得最少的网络传输可以极大地提升整体性能.
Spark 中所有的键值对 RDD 都可以进行分区. 系统会根据一个针对键的函数对元素进行分组. 主要有哈希分区和范围分区, 当然用户也可以自定义分区函数.
通过分区可以有效提升程序性能. 如下例子:
有这样一个应用, 它在内存中保存着一张很大的用户信息表 - 也就是一个由 (UserID, UserInfo) 对组成的 RDD, 其中 UserInfo 包含一个该用户所订阅的主题的列表. 该应用会周期性的将这张表与一个小文件进行组合, 这个小文件中存着过去五分钟内发生的事件 - 其实就是一个由 (UserID, LinkInfo) 对组成的表, 存放着过去五分钟内某网站各用户的访问情况. 例如我们可能需要对用户访问其未订阅主题的页面的情况进行统计
解决方案一:
这段代码可以正常运行, 但是不够高效. 这是因为在每次调用 processNewLogs()时都会用到 join()操作, 而我们对数据集是如何区分的却一无所知. 默认情况下
来源: http://www.bubuko.com/infodetail-3117041.html