原文引自:https://www.cnblogs.com/yxpblog/p/5269314.html
在开始之前,我先介绍一下,RDD 是什么?
RDD 是 Spark 中的抽象数据结构类型,任何数据在 Spark 中都被表示为 RDD。从编程的角度来看,RDD 可以简单看成是一个数组。和普通数组的区别是,RDD 中的数据是分区存储的,这样不同分区的数据就可以分布在不同的机器上,同时可以被并行处理。因此,Spark 应用程序所做的无非是把需要处理的数据转换为 RDD,然后对 RDD 进行一系列的变换和操作从而得到结果。
创建 RDD:
1
|
1,2, , , ], )
|
上面这种是数组创建,也可以从文件系统或者 HDFS 中的文件创建出来,后面会讲到。只要搞懂了 spark 的函数们,你就成功了一大半。
spark 的函数主要分两类,Transformations 和 Actions。Transformations 为一些数据转换类函数,actions 为一些行动类函数:
转换:转换的返回值是一个新的 RDD 集合,而不是单个值。调用一个变换方法,不会有任何求值计算,它只获取一个 RDD 作为参数,然后返回一个新的 RDD。
行动:行动操作计算并返回一个新的值。当在一个 RDD 对象上调用行动函数时,会在这一时刻计算全部的数据处理查询并返回结果值。
下面介绍 spark 常用的 Transformations, Actions 函数:
Transformations
map(func [, preservesPartitioning=False]) --- 返回一个新的分布式数据集,这个数据集中的每个元素都是经过 func 函数处理过的。
1
2
3
|
>>> data = [1,2,
, , ] >>> distData =
map(lambda x: x+1).collect()
|
filter(func) --- 返回一个新的数据集,这个数据集中的元素是通过 func 函数筛选后返回为 true 的元素(简单的说就是,对数据集中的每个元素进行筛选,如果符合条件则返回 true,不符合返回 false,最后将返回为 true 的元素组成新的数据集返回)。
1
2
|
>>> rdd =
filter(lambda x:x%2==0).collect() #结果:[2, 4]
|
flatMap(func [, preservesPartitioning=False]) --- 类似于 map(func), 但是不同的是 map 对每个元素处理完后返回与原数据集相同元素数量的数据集,而 flatMap 返回的元素数不一定和原数据集相同。each input item can be mapped to 0 or more output items (so funcshould return a Seq rather than a single item)
1
2
3
4
5
6
7
8
9
10
11
12
13
|
#### for flatMap()
>>> rdd = sc.parallelize([2,
, ]) >>> sorted(rdd.flatMap(lambda x: range(1,x)).collect())
>>> sorted(rdd.flatMap(lambda
#### for map()
>>> rdd = sc.parallelize([2,
, ]) >>> sorted(rdd.flatMap(lambda x: range(1,x)).collect())
>>> sorted(rdd.flatMap(lambda
|
mapPartitions(func [, preservesPartitioning=False]) ---mapPartitions 是 map 的一个变种。map 的输入函数是应用于 RDD 中每个元素,而 mapPartitions 的输入函数是应用于每个分区,也就是把每个分区中的内容作为整体来处理的。
1
2
3
4
|
>>> rdd = sc.parallelize([1,2,
, , ], ) >>> def f(iterator): yield sum(iterator)
#结果:[1,5,9]
|
mapPartitionsWithIndex(func [, preservesPartitioning=False]) ---Similar to mapPartitions, but takes two parameters. The first parameter is the index of the partition and the second is an iterator through all the items within this partition. The output is an iterator containing the list of items after applying whatever transformation the function encodes.
1
2
3
4
|
>>> rdd = sc.parallelize([1,2,
, , ], ) >>> def
yield splitIndex
|
reduceByKey(func [, numPartitions=None,partitionFunc=
1
2
3
4
5
|
>>> from operator import add
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted
>>>
|
aggregateByKey(zeroValue)(seqOp, combOp [, numPartitions=None]) ---
sortByKey([ascending=True, numPartitions=None,keyfunc=
1
2
3
4
5
6
|
>>> tmp = [('a', 1), ('b', 2), ('1',
), ('D', )] True, 1).collect()
True, 2, keyfunc=lambda
|
join(otherDataset [, numPartitions=None]) --- join 就是对元素为 kv 对的 RDD 中 key 相同的 value 收集到一起组成 (v1,v2),然后与原 RDD 中的 key 组合成一个新的 kv 对,返回。
1
2
3
4
|
>>> x = sc.parallelize([("a", 1), ("b",
)]) >>> y = sc.parallelize([("a", 2), ("a",
)]) >>> sorted
|
cartesian(otherDataset) --- 返回一个笛卡尔积的数据集,这个数据集是通过计算两个 RDDs 得到的。
1
2
3
4
|
>>> x = sc.parallelize([1,2,
]) >>> y = sc.parallelize([
, ])
|
reduce(func) --- reduce 将 RDD 中元素两两传递给输入函数,同时产生一个新的值,新产生的值与 RDD 中下一个元素再被传递给输入函数直到最后只有一个值为止。
1
2
3
|
>>> from operator import add
1,2, , , ]).reduce(add) # 结果:15
|
collect() --- 返回 RDD 中的数据,以 list 形式。
1
2
|
1,2, , , ]).collect()
|
count() --- 返回 RDD 中的元素个数。
1
2
|
1,2, , , ]).count #结果:5
|
first() --- 返回 RDD 中的第一个元素。
1
2
|
1,2, , , ]).first() #结果:1
|
take(n) --- 返回 RDD 中前 n 个元素。
1
2
|
1,2, , , ]).take(2) #结果:[1,2]
|
takeOrdered(n [, key=None]) --- 返回 RDD 中前 n 个元素,但是是升序 (默认) 排列后的前 n 个元素,或者是通过 key 函数指定后的 RDD(这个 key 我也没理解透,后面在做详解)
1
2
3
4
|
, , ,2, , ]).takeOrdered( ) #结果:[2,3,4]
, , ,2, , ]).takeOrdered( , key=lambda x:-x) #结果:[9,7,6]
|
saveAsTextFile(path [, compressionCodecClass=None]) --- 该函数将 RDD 保存到文件系统里面,并且将其转换为文本行的文件中的每个元素调用 tostring 方法。
parameters: path - 保存于文件系统的路径
compressionCodecClass - (None by default) string i.e. "org.apache.hadoop.io.compress.GzipCodec"
1
2
3
4
5
6
7
|
>>> tempFile =
=True)
>>> sc.parallelize(range(
>>> from fileinput import input
>>> from glob import glob
>>> ''.join(sorted(input
+ "/part-0000*"))))
|
Empty lines are tolerated when saving to text files:
1
2
3
4
5
|
>>> tempFile2 =
=True)
','foo','', 'bar', '
>>> ''.join(sorted(input
+ "/part-0000*"))))
|
Using compressionCodecClass:
1
2
3
4
5
6
7
8
|
>>> tempFile3 =
=True)
>>> codec =
'foo', 'bar'
>>> from fileinput import input, hook_compressed
>>> result = sorted(input
+ "/part*.gz"), openhook=hook_compressed)) >>> b'
utf- ') u'bar\nfoo\n'
|
countByKey() --- 返回一个字典(key,count),该函数操作数据集为 kv 形式的数据,用于统计 RDD 中拥有相同 key 的元素个数。
1
2
3
4
5
|
>>> defdict = sc.parallelize([("a",1), ("b",1), ("a", 1)]).countByKey()
>>> defdict
>>> defdict.items()
|
countByValue() --- 返回一个字典(value,count),该函数操作一个 list 数据集,用于统计 RDD 中拥有相同 value 的元素个数。
1
2
|
1,2, ,1,2, , ,2, ,2
|
foreach(func) --- 运行函数 func 来处理 RDD 中的每个元素,这个函数常被用来 updating an Accumulator 或者与外部存储系统的交互。
1
2
3
|
>>> def f(x): print(x)
1, 2, , , ]).foreach(f)
|
来源: http://www.bubuko.com/infodetail-2439776.html