2019-04-20
关键字: Spark 的 agrregate 作用, Scala 的 aggregate 是什么
Spark 编程中的 aggregate 方法还是比较常用的. 本篇文章站在初学者的角度以大白话的形式来讲解一下 aggregate 方法.
aggregate 方法是一个聚合函数, 接受多个输入, 并按照一定的规则运算以后输出一个结果值.
aggregate 在哪
aggregate 方法是 Spark 编程模型 RDD 类( org.apache.spark.RDD ) 中定义的一个公有方法. 它的方法声明如下
- def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
- // ...
- }
aggregate 的参数是什么意思
然后我们一块一块来学习这个方法的声明. 其实这小节讲的, 都是 Scala 的语法知识.
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
首先看到的是 "泛型" 声明. 懂 Java 的同学直接把这个 "[U: ClassTag]" 理解成是一个泛型声明就好了. 如果您不是很熟悉 Java 语言, 那我们只需要知道这个 U 表示我们的 aggregate 方法只能接受某一种类型的输入值, 至于到底是哪种类型, 要看您在具体调用的时候给了什么类型.
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
然后我们来看看 aggregate 的参数列表. 明显这个 aggregate 方法是一个柯里化函数. 柯里化的知识不在本篇文章讨论的范围之内. 如果您还不了解柯里化的概念, 那在这里简单地理解为是通过多个圆括号来接受多个输入参数就可以了.
然后我们来看看第 1 部分, 即上面蓝色加粗的 "(zeroValue: U)" . 这个表示它接受一个任意类型的输入参数, 变量名为 zeroValue . 这个值就是初值, 至于这个初值的作用, 姑且不用理会, 等到下一小节通过实例来讲解会更明了, 在这里只需要记住它是一个 "只使用一次" 的值就好了.
第 2 部分, 我们还可以再把它拆分一下, 因为它里面其实有两个参数. 笔者认为 Scala 语法在定义多个参数时, 辨识度比较弱, 不睁大眼睛仔细看, 很难确定它到底有几个参数.
首先是第 1 个参数 "seqOp: (U, T) => U" 它是一个函数类型, 以一个输入为任意两个类型 U, T 而输出为 U 类型的函数作为参数. 这个函数会先被执行. 这个参数函数的作用是为每一个分片 ( slice ) 中的数据遍历应用一次函数. 换句话说就是假设我们的输入数据集 ( RDD ) 有 1 个分片, 则只有一个 seqOp 函数在运行, 假设有 3 个分片, 则有三个 seqOp 函数在运行. 可能有点难以理解, 不过没关系, 到后面结合实例就很容易理解了.
另一个参数 "combOp: (U, U) => U" 接受的也是一个函数类型, 以输入为任意类型的两个输入参数而输出为一个与输入同类型的值的函数作为参数. 这个函数会在上面那个函数执行以后再执行. 这个参数函数的输入数据来自于第一个参数函数的输出结果, 这个函数仅会执行 1 次, 它是用来最终聚合结果用的. 同样这里搞不懂没关系, 下一小节的实例部分保证让您明白.
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
最后是上面这个红色加粗的 ": U" 它是 aggregate 方法的返回值类型, 也是泛型表示.
对了, 最后还有一个 "withScope", 这个就不介绍了, 因为笔者也不知道它是干嘛的, 哈哈哈哈. 反正对我们理解这个方法也没什么影响.
aggregate 正确的使用姿势
我们直接在 spark-shell 中来演示实例了. 这里以两个小例子来演示, 一个是不带分片的 RDD , 另一个则是带 3 个分片的 RDD .
首先我们来创建一个 RDD
- scala> val rdd1 = sc.parallelize(1 to 5)
- rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24
- scala> rdd1 collect
- warning: there was one feature warning; re-run with -feature for details
- res24: Array[Int] = Array(1, 2, 3, 4, 5)
这个 RDD 仅有 1 个分片, 包含 5 个数据: 1, 2, 3, 4, 5 .
然后我们来应用一下 aggregate 方法.
哦, 不对, 在使用 aggregate 之前, 我们还是先定义两个要给 aggregate 当作输入参数的函数吧.
- scala> :paste
- // Entering paste mode (ctrl-D to finish)
- def pfun1(p1: Int, p2: Int): Int = {
- p1 * p2
- }
- // Exiting paste mode, now interpreting.
- pfun1: (p1: Int, p2: Int)Int
- scala>
首先来定义第 1 个函数, 即等下要被当成 seqOp 的形参使用的函数. 在上一小节我们知道 seqOp 函数是一个输入类型为 U, T 类型而输出为 U 类型的函数. 但是在这里, 因为我们的 RDD 只包含一个 Int 类型数据, 所以这里的 seqOp 的两个输入参数都是 Int 类型的, 这是没毛病的哦! 然后这个函数的返回类型也为 Int . 我们这个函数的作用就是将输入的参数 p1 , p2 求积以后返回.
- scala> :paste
- // Entering paste mode (ctrl-D to finish)
- def pfun2(p3: Int, p4: Int): Int = {
- p3 + p4
- }
- // Exiting paste mode, now interpreting.
- pfun2: (p3: Int, p4: Int)Int
- scala>
接着是第 2 个函数. 就不再解释什么了.
然后终于可以开始应用我们的 aggregate 方法了.
- scala> rdd1.aggregate(3)(pfun1, pfun2)
- res25: Int = 363
- scala>
输出结果是 363 ! 这个结果是怎么算出来的呢?
首先我们的 zeroValue 即初值是 3 . 然后通过上面小节的介绍, 我们知道首先会应用 pfun1 函数, 因为我们这个 RDD 只有 1 个分片, 所以整个运算过程只会有一次 pfun1 函数调用. 它的计算过程如下:
首先用初值 3 作为 pfun1 的参数 p1 , 然后再用 RDD 中的第 1 个值, 即 1 作为 pfun1 的参数 p2 . 由此我们可以得到第一个计算值为 3 * 1 = 3 . 接着这个结果 3 被当成 p1 参数传入, RDD 中的第 2 个值即 2 被当成 p2 传入, 由此得到第二个计算结果为 3 * 2 = 6 . 以此类推, 整个 pfun1 函数执行完成以后, 得到的结果是 3 * 1 * 2 * 3 * 4 * 5 = 360 . 这个 pfun1 的应用过程有点像是 "在 RDD 中滑动计算" .
在 aggregate 方法的第 1 个参数函数 pfun1 执行完毕以后, 我们得到了结果值 360 . 于是, 这个时候就要开始执行第 2 个参数函数 pfun2 了.
pfun2 的执行过程与 pfun1 是差不多的, 同样会将 zeroValue 作为第一次运算的参数传入, 在这里即是将 zeroValue 即 3 当成 p3 参数传入, 然后是将 pfun1 的结果 360 当成 p4 参数传入, 由此得到计算结果为 363 . 因为 pfun1 仅有一个结果值, 所以整个 aggregate 过程就计算完毕了, 最终的结果值就是 363 .
怎么样? 相信您已经完全明白 aggregate 方法的的作用与用法了吧. 下面再贴一个有多个分片的 RDD 的示例.
- scala> val rdd2 = sc.makeRDD(1 to 10, 3)
- rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at <console>:24
- scala> rdd2.getNumPartitions
- res26: Int = 3
- scala> rdd2.foreachPartition(myprint)
- 1 , 2 , 3 ,
- 4 , 5 , 6 ,
- 7 , 8 , 9 , 10 ,
这里定义了一个拥有 3 个分片的 RDD . 然后 aggregate 的两个函数参数仍然是使用上面定义的 pfun1 与 pfun2 .
- scala> rdd2.aggregate(2)(pfun1, pfun2)
- res29: Int = 10334
结果是 10334 . 怎么来的呢?
因为前面小节有提到 seqOp 函数, 即这里的 pfun1 函数会分别在 RDD 的每个分片中应用一次, 所以这里 pfun1 的计算过程为
- 2 * 1 * 2 * 3 = 12
- 2 * 4 * 5 * 6 = 240
- 2 * 7 * 8 * 9 * 10 = 10080
标橙的为 zeroValue .
在这里 pfun1 的输出结果有 3 个值. 然后就来应用 combOp 即这里的 pfun2
2 + 12 + 240 + 10080 = 10334
所以, 结果就是 10334 咯!
来源: https://www.cnblogs.com/chorm590/p/spark_201904201159.html