今天是 spark 专题的第四篇文章, 我们一起来看下 Pair RDD.
定义
在之前的文章当中, 我们已经熟悉了 RDD 的相关概念, 也了解了 RDD 基本的转化操作和行动操作. 今天我们来看一下 RDD 当中非常常见的 PairRDD, 也叫做键值对 RDD, 可以理解成 KVRDD.
KV 很好理解, 就是 key 和 value 的组合, 比如 Python 当中的 dict 或者是 C++ 以及 Java 当中的 map 中的基本元素都是键值对. 相比于之前基本的 RDD,pariRDD 可以支持更多的操作, 相对来说更加灵活, 可以完成更加复杂的功能. 比如我们可以根据 key 进行聚合, 或者是计算交集等.
所以本身 pairRDD 只不过是数据类型是 KV 结构的 RDD 而已, 并没有太多的内涵, 大家不需要担心.
Pair RDD 转化操作
Pair RDD 也是 RDD, 所以之前介绍的 RDD 的转化操作 Pair RDD 自然也可以使用. 它们两者有些像是类继承的关系, RDD 是父类, Pair RDD 是实现了一些新特性的子类. 子类可以调用父类当中所有的方法, 但是父类却不能调用子类中的方法.
调用的时候需要注意, 由于我们的 Pair RDD 中的数据格式是 KV 的二元组, 所以我们传入的函数必须是针对二元组数据的, 不然的话可能运算的结果会有问题. 下面我们来列举一些最常用的转化操作.
为了方便演示, 我们用一个固定的 RDD 来运行各种转化操作, 来直观了解一下这些转化操作究竟起什么样的作用.
ex1 = sc.parallelize([[1, 2], [3, 4], [3, 5]])
keys,values 和 sortByKey
这三个转化操作应该是最常用也是最简单的, 简单到我们通过字面意思就可以猜出它们的意思.
我们先来看 keys 和 values:
我们的 RDD 当中二元组当中的第一个元素会被当做 key, 第二个元素当做 value, 需要注意的是, 它并不是一个 map 或者是 dict, 所以 key 和 value 都是可以重复的.
sortByKey 也很直观, 我们从字面意思就看得出来是对 RDD 当中的数据根据 key 值进行排序, 同样, 我们也来看下结果:
mapValues 和 flatMapValues
mapValues 不能直接使用, 而必须要传入一个函数作为参数. 它的意思是对所有的 value 执行这个函数, 比如我们想把所有的 value 全部转变成字符串, 我们可以这么操作:
flatMapValues 的操作和我们的认知有些相反, 我们都知道 flatMap 操作是可以将一个嵌套的数组打散, 但是我们怎么对一个 value 打散嵌套呢? 毕竟我们的 value 不一定就是一个数组, 这就要说到我们传入的函数了, 这个 flatMap 的操作其实是针对函数返回的结果的, 也就是说函数会返回一个迭代器, 然后打散的内容其实是这个迭代器当中的值.
我这么表述可能有些枯燥, 我们来看一个例子就明白了:
不知道这个结果有没有出乎大家的意料, 它的整个流程是这样的, 我们调用 flatMapValues 运算之后返回一个迭代器, 迭代器的内容是 range(x, x+3). 其实是每一个 key 对应一个这样的迭代器, 之后再将迭代器当中的内容打散, 和 key 构成新的 pair.
groupByKey,reduceByKey 和 foldByKey
这两个功能也比较接近, 我们先说第一个, 如果学过 SQL 的同学对于 group by 操作的含义应该非常熟悉. 如果没有了解过也没有关系, group by 可以简单理解成归并或者是分桶. 也就是说将 key 值相同的 value 归并到一起, 得到的结果是 key-list 的 Pair RDD, 也就是我们把 key 值相同的 value 放在了一个 list 当中.
我们也来看下例子:
我们调用完 groupby 之后得到的结果是一个对象, 所以需要调用一下 mapValues 将它转成 list 才可以使用, 否则的话是不能使用 collect 获取的.
reduceByKey 和 groupByKey 类似, 只不过 groupByKey 只是归并到一起, 然而 reduceByKey 是传入 reduce 函数, 执行 reduce 之后的结果. 我们来看一个例子:
在这个例子当中我们执行了累加, 把 key 值相同的 value 加在了一起.
foldByKey 和 fold 的用法差别并不大, 唯一不同的是我们加上了根据 key 值聚合的逻辑. 如果我们把分区的初始值设置成 0 的话, 那么它用起来和 reduceByKey 几乎没有区别:
我们只需要清楚 foldByKey 当中的初始值针对的是分区即可.
combineByKey
这个也是一个很核心并且不太容易理解的转化操作, 我们先来看它的参数, 它一共接受 5 个参数. 我们一个一个来说, 首先是第一个参数, 是 createCombiner.
它的作用是初始化, 将 value 根据我们的需要做初始化, 比如将 string 类型的转化成 int, 或者是其他的操作. 我们用记号可以写成是 V => C, 这里的 V 就是 value,C 是我们初始化之后的新值.
它会和 value 一起被当成新的 pair 传入第二个函数, 所以第二个函数的接受参数是 (C, V) 的二元组. 我们要做的是定义这个二元组的合并, 所以第二个函数可以写成(C, V) => C. 源码里的注释和网上的教程都是这么写的, 但我觉得由于出现了两个 C, 可能会让人难以理解, 我觉得可以写成(C, V) => D, 比较好.
最后一个函数是将 D 进行合并, 所以它可以写成是(D, D) => D.
到这里我们看似好像明白了它的原理, 但是又好像有很多问号, 总觉得哪里有些不太对劲. 我想了很久, 才找到了问题的根源, 出在哪里呢, 在于合并. 有没有发现第二个函数和第三个函数都是用来合并的, 为什么我们要合并两次, 它们之间的区别是什么? 如果这个问题没搞明白, 那么对于它的使用一定是错误的, 我个人觉得这个问题才是这个转化操作的核心, 没讲清楚这个问题的博客都是不够清楚的.
其实这两次合并的逻辑大同小异, 但是合并的范围不一样, 第一次合并是针对分区的, 第二次合并是针对 key 的. 因为在 spark 当中数据可能不止存放在一个分区内, 所以我们要合并两次, 第一次先将分区内部的数据整合在一起, 第二次再跨分区合并. 由于不同分区的数据可能相隔很远, 所以会导致网络传输的时间过长, 所以我们希望传输的数据尽量小, 这才有了 groupby 两次的原因.
我们再来看一个例子:
在这个例子当中我们计算了每个单词出现的平均个数, 我们一点一点来看. 首先, 我们第一个函数将 value 转化成了 (1, value) 的元组, 元组的第 0 号元素表示出现该单词的文档数, 第 1 号元素表示文档内出现的次数. 所以第二个函数, 也就是在分组内聚合的函数, 我们对于出现的文档数只需要加一即可, 对于出现的次数要进行累加. 因为这一次聚合的对象都是 (1, value) 类型的元素, 也就是没有聚合之前的结果.
在第三个函数当中, 我们对于出现的总数也进行累加, 是因为这一个函数处理的结果是各个分区已经聚合一次的结果了. 比如 apple 在一个分区内出现在了两个文档内, 一共出现了 20 次, 在一个分区出现在了三个文档中, 一共出现了 30 次, 那么显然我们一共出现在了 5 个文档中, 一共出现了 50 次.
由于我们要计算平均, 所以我们要用出现的总次数除以出现的文档数. 最后经过 map 之后由于我们得到的还是一个二元组, 我们不能直接 collect, 需要用 collectAsMap.
我们把上面这个例子用图来展示, 会很容易理解:
连接操作
在 spark 当中, 除了基础的转化操作之外, spark 还提供了额外的连接操作给 pair RDD. 通过连接, 我们可以很方便地像是操作集合一样操作 RDD. 操作的方法也非常简单, 和 SQL 当中操作数据表的形式很像, 就是 join 操作. join 操作又可以分为 join(inner join),left join 和 right join.
如果你熟悉 SQL 的话, 想必这三者的区别应该非常清楚, 它和 SQL 当中的 join 是一样的. 如果不熟悉也没有关系, 解释起来并不复杂. 在 join 的时候我们往往是用一张表去 join 另外一张表, 就好像两个数相减, 我们用一个数减去另外一个数一样. 比如 A.join(B), 我们把 A 叫做左表, B 叫做右表. 所谓的 join, 就是把两张表当中某一个字段或者是某些字段值相同的行连接在一起.
比如一张表是学生表, 一张表是出勤表. 我们两张表用学生的 id 一关联, 就得到了学生的出勤记录. 但是既然是集合关联, 就会出现数据关联不上的情况. 比如某个学生没有出勤, 或者是出勤表里记错了学生 id. 对于数据关联不上的情况, 我们的处理方式有四种. 第一种是全都丢弃, 关联不上的数据就不要了. 第二种是全部保留, 关联不上的字段就记为 NULL. 第三种是左表关联不上的保留, 右表丢弃. 第四种是右表保留, 左表丢弃.
下图展示了这四种 join, 非常形象.
我们看几个实际的例子来体会一下.
首先创建数据集:
- ex1 = sc.parallelize([['frank', 30], ['bob', 9], ['silly', 3]])
- ex2 = sc.parallelize([['frank', 80], ['bob', 12], ['marry', 22], ['frank', 21], ['bob', 22]])
接着, 我们分别运行这四种 join, 观察一下 join 之后的结果.
从结果当中我们可以看到, 如果两个数据集当中都存在多条 key 值相同的数据, spark 会将它们两两相乘匹配在一起.
行动操作
最后, 我们看下 pair RDD 的行动操作. pair RDD 同样是 rdd, 所以普通 rdd 适用的行动操作, 同样适用于 pair rdd. 但是除此之外, spark 还为它开发了独有的行动操作.
countByKey
countByKey 这个操作顾名思义就是根据 Key 值计算每个 Key 值出现的条数, 它等价于 count groupby 的 SQL 语句. 我们来看个具体的例子:
collectAsMap
这个也很好理解, 其实就是讲最后的结果以 map 的形式输出:
从返回的结果可以看到, 输出的是一个 dict 类型. 也就是 Python 当中的 "map".
lookup
这个单词看起来比较少见, 其实它代表的是根据 key 值查找对应的 value 的意思. 也就是常用的 get 函数, 我们传入一个 key 值, 会自动返回 key 值对应的所有的 value. 如果有多个 value, 则会返回 list.
总结
到这里, 所有的 pair RDD 相关的操作就算是介绍完了. pair rdd 在我们日常的使用当中出现的频率非常高, 利用它可以非常方便地实现一些比较复杂的操作.
另外, 今天的这篇文章内容不少, 想要完全吃透, 需要一点功夫. 这不是看一篇文章就可以实现的, 但是也没有关系, 我们初学的时候只需要对这些 API 和使用方法有一个大概的印象即可, 具体的使用细节可以等用到的时候再去查阅相关的资料.
今天的文章就是这些, 如果觉得有所收获, 请顺手点个关注或者转发吧, 你们的举手之劳对我来说很重要.
来源: https://www.cnblogs.com/techflow/p/12781837.html