- Working on a Per-Partition Basis
- (基于分区的操作)
以每个分区为基础处理数据使我们可以避免为每个数据项重做配置工作如打开数据库连接或者创建随机数生成器这样的操作, 我们希望避免为每个元素重做配置工作 Spark 有分区版本的 map 和 foreach, 通过让 RDD 的每个分区只运行一次代码, 可帮助降低这些操作的成本
回到我们的呼号例子中, 有一个无线电台呼号的在线数据库, 我们可以查询联系日志的公共列表通过使用基于分区的操作, 我们可以分享数据库的连接池来避免为多个连接配置连接参数, 并且可以复用 JSON 解析器如 Example6-10 到 6-12 所示, 我们使用 mapPartitions()函数, 这个函数会给我们一个输入 RDD 每个分区中元素的迭代器, 并且期望我们的返回是一个结果的迭代器
- Example 6-10. Shared connection pool in Python
- def processCallSigns(signs):
- """Lookup call signs using a connection pool"""
- # Create a connection pool
- http = urllib3.PoolManager()
- # the URL associated with each call sign record
- urls = map(lambda x: "http://73s.com/qsos/%s.json" % x, signs)
- # create the requests (non-blocking)
- requests = map(lambda x: (x, http.request('GET', x)), urls)
- # fetch the results
- result = map(lambda x: (x[0], json.loads(x[1].data)), requests)
- # remove any empty results and return
- return filter(lambda x: x[1] is not None, result)
- def fetchCallSigns(input):
- """Fetch call signs"""
- return input.mapPartitions(lambda callSigns : processCallSigns(callSigns))
- contactsContactList = fetchCallSigns(validSigns)
- Example 6-11. Shared connection pool and JSON parser in Scala
- val contactsContactLists = validSigns.distinct().mapPartitions{
- signs =>
- val mapper = createMapper()
- val client = new HttpClient()
- client.start()
- // create http request
- signs.map {sign =>
- createExchangeForSign(sign)
- // fetch responses
- }.map{ case (sign, exchange) =>
- (sign, readExchangeCallLog(mapper, exchange))
- }.filter(x => x._2 != null) // Remove empty CallLogs
- }
- Example 6-12. Shared connection pool and JSON parser in Java
- // Use mapPartitions to reuse setup work.
- JavaPairRDD<String, CallLog[]> contactsContactLists =
- validCallSigns.mapPartitionsToPair(
- new PairFlatMapFunction<Iterator<String>, String, CallLog[]>() {
- public Iterable<Tuple2<String, CallLog[]>> call(Iterator<String> input) {
- // List for our results.
- ArrayList<Tuple2<String, CallLog[]>> callsignLogs = new ArrayList<>();
- ArrayList<Tuple2<String, ContentExchange>> requests = new ArrayList<>();
- ObjectMapper mapper = createMapper();
- HttpClient client = new HttpClient();
- try {
- client.start();
- while (input.hasNext()) {
- requests.add(createRequestForSign(input.next(), client));
- }
- for (Tuple2<String, ContentExchange> signExchange : requests) {
- callsignLogs.add(fetchResultFromRequest(mapper, signExchange));
- }
- } catch (Exception e) {
- }
- return callsignLogs;
- }});
- System.out.println(StringUtils.join(contactsContactLists.collect(), ","));
当使用基于分区的操作时, Spark 会给函数一个分区中元素的迭代器我们的返回结果也是一个迭代器除了 mapPartitions()之外, Spark 还有很多其他分区操作, 如表 6-1 中所示:
函数名 | 所调用内容 | 返回类型 | RDD[T] 函数签名 |
---|---|---|---|
mapPartitions() | 分区元素的迭代器 | 返回元素的迭代器 | f: (Iterator[T]) ->Iterator[U] |
mapPartitionsWithIndex() | 分区数量和分区元素的迭代器 | 返回元素的迭代器 | f: (Int, Iterator[T]) -> Iterator[U] |
foreachPartition() | 元素迭代器 | 无 | f: (Iterator[T]) -> Unit |
除了可以避免重复的配置工作, 我们有时还可以使用 mapPartitions()来避免创建对象的开销有时我们需要创建一个对象来聚合不同类型的结果回想一下第三章我们计算平均数的时候, 当时把数值 RDD 转换成了元组 RDD, 这样便于跟踪在归约步骤中元素处理的数量从而计算平均值除了这种对每个元素都处理的方式, 我们可以为每个分区创建一次元组, 示例如下:
- Example 6-13. Average without mapPartitions() in Python
- def combineCtrs(c1, c2):
- return (c1[0] + c2[0], c1[1] + c2[1])
- def basicAvg(nums):
- """Compute the average"""
- nums.map(lambda num: (num, 1)).reduce(combineCtrs)
- Example 6-14. Average with mapPartitions() in Python
- def partitionCtr(nums):
- """Compute sumCounter for partition"""
- sumCount = [0, 0]
- for num in nums:
- sumCount[0] += num
- sumCount[1] += 1
- return [sumCount]
- def fastAvg(nums):
- """Compute the avg"""
- sumCount = nums.mapPartitions(partitionCtr).reduce(combineCtrs)
- return sumCount[0] / float(sumCount[1])
- Piping to External Programs(连接外部程序)
Spark 提供三种开箱即用的语言支持, 你似乎在编写 Spark 程序时不会遇到语言上的障碍但是, 如果 Scala,Java 和 Python 都不是你想要的, 那也没关系, SPark 提供了一个使用其他语言 (如 R 脚本) 传输数据给程序的机制,
Spark 在 RDD 上提供了一个 pipe()方法这个方法可以让我们使用其他语言编写 job, 但前提是这种语言支持 Unix 标准流的读入和写出使用 pipe(), 可以编写一个 RDD 转换, 它从标准输入读取每个 RDD 元素作为 String, 接着按照需要操作该 String, 然后将结果作为 String 写入标准输出提供的接口和编程模型是受到了一些限制, 但是有时后你需要做的就是像写在 map()或 filter()中使用的本地代码函数一样
你可能会想把 RDD 的内容传输给外部程序或者脚本, 因为你可能已经构建并测试了一个复杂的软件, 而且你想在 Spark 中重用他有很多数据科学家有 R 语言写的代码, 我们可以通过 pipe()函数与 R 程序进行交互
在 Example6-15 中, 我们使用 R 语言的库来计算所有联系人的距离 RDD 中的每个元素通过换行符作为分割写出, 并且程序输出的每一行都是结果 RDD 中的字符串元素为了使 R 程序转换输入数据简便一些, 我们把数据转换成
mylat, mylon, theirlat, theirlon
格式我们以逗号作为分隔符
- Example 6-15. R distance program
- #!/usr/bin/env Rscript
- library("Imap")
- f <- file("stdin")
- open(f)
- while(length(line <- readLines(f,n=1))> 0) {
- # process line
- contents <- Map(as.numeric, strsplit(line, ","))
- mydist <- gdist(contents[[1]][1], contents[[1]][2],
- contents[[1]][3], contents[[1]][4],
- units="m", a=6378137.0, b=6356752.3142, verbose = FALSE)
- write(mydist, stdout())
- }
如果把上述代码写入一个名为
./src/R/finddistance.R
的可执行文件, 那么使用方法大体如下:
- $ ./src/R/finddistance.R
- 37.75889318222431,-122.42683635321838,37.7614213,-122.4240097
- 349.2602
- coffee
- NA
- ctrl-d
目前为止很不错, 我们学会了从 stdin 读取的每一行转换成 stdout 输出的方法了现在我们需要让每个工作节点都能使用 finddistance.R 并且能让 shell 脚本转换我们的 RDD 完成这两点其实在 Spark 中很简单, 示例如下:
- Example 6-16. Driver program using pipe() to call finddistance.R in Python
- # Compute the distance of each call using an external R program
- distScript = "./src/R/finddistance.R"
- distScriptName = "finddistance.R"
- sc.addFile(distScript)
- def hasDistInfo(call):
- """Verify that a call has the fields required to compute the distance"""
- #验证计算距离的必要字段
- requiredFields = ["mylat", "mylong", "contactlat", "contactlong"]
- return all(map(lambda f: call[f], requiredFields))
- def formatCall(call):
- """Format a call so that it can be parsed by our R program"""
- #格式话数据便于 R 程序解析
- return "{0},{1},{2},{3}".format(
- call["mylat"], call["mylong"],
- call["contactlat"], call["contactlong"])
- pipeInputs = contactsContactList.values().flatMap(
- lambda calls: map(formatCall, filter(hasDistInfo, calls)))
- distances = pipeInputs.pipe(SparkFiles.get(distScriptName))
- print distances.collect()
- Example 6-17. Driver program using pipe() to call finddistance.R in Scala
- // Compute the distance of each call using an external R program
- // adds our script to a list of files for each node to download with this job
- // 通过外部的 R 程序计算每个呼叫的距离
- // 把脚本添加到这个 job 中每个节点都要下载的文件列表
- val distScript = "./src/R/finddistance.R"
- val distScriptName = "finddistance.R"
- sc.addFile(distScript)
- val distances = contactsContactLists.values.flatMap(x => x.map(y =>
- s"$y.contactlay,$y.contactlong,$y.mylat,$y.mylong")).pipe(Seq(
- SparkFiles.get(distScriptName)))
- println(distances.collect().toList)
- Example 6-18. Driver program using pipe() to call finddistance.R in Java
- // Compute the distance of each call using an external R program
- // adds our script to a list of files for each node to download with this job
- String distScript = "./src/R/finddistance.R";
- String distScriptName = "finddistance.R";
- sc.addFile(distScript);
- JavaRDD<String> pipeInputs = contactsContactLists.values()
- .map(new VerifyCallLogs()).flatMap(
- new FlatMapFunction<CallLog[], String>() {
- public Iterable<String> call(CallLog[] calls) {
- ArrayList<String> latLons = new ArrayList<String>();
- for (CallLog call: calls) {
- latLons.add(call.mylat + "," + call.mylong +
- "," + call.contactlat + "," + call.contactlong);
- }
- return latLons;
- }
- });
- JavaRDD<String> distances = pipeInputs.pipe(SparkFiles.get(distScriptName));
- System.out.println(StringUtils.join(distances.collect(), ","));
使用
SparkContext.addFile(path)
方法, 我们可以构建一个在单个 job 中每个工作节点都要下载的文件列表这些文件可以来自驱动程序的本地文件系统(如例子中那样), 或者来自 HDFS 或其他 Hadoop 支持的文件系统, 也可以是一个 HTTP,HTTPS 或 FTP 的 URI 当一个 action 在一个 job 中运行, 每个节点都会开始下载这些文件这些文件可以在工作节点的
SparkFiles.getRootDirectory
路径或者
SparkFiles.get(filename)
中找到当然, 这只是确定 pipe()方法能够在每个工作节点上找到脚本文件的方法之一你可以使用其他的远程复制工具来将脚本文件放在每个节点可知位置上
使用
SparkContext.addFile(path)
添加的所有文件都存放在同一个目录中, 因此使用唯一名称非常重要
一旦确定了脚本是可以使用的, RDD 上的 pipe()方法可以很简单地把 RDD 中的元素传输给脚本也许更智能的 findDistance 版本会接受 SEPARATOR 作为命令行参数在那种情况下, 以下任何一个都可以完成这项工作, 尽管第一个是首选:
- rdd.pipe(Seq(SparkFiles.get("finddistance.R"), ","))
- rdd.pipe(SparkFiles.get("finddistance.R") + ",")
第一种方式, 我们把命令的调用当做具有位置的元素序列来传递(命令本身在零偏移的位置上); 第二种方式, 我们把它作为一个单独的命令字符串传递, 然后 Spark 会分解成位置参数
我们也可以在需要的情况下明确设置 pipe()的环境变量直接把环境变量的 map 作为 pipe()的第二个参数, Spark 会去设置这些在值
你现在应该至少理解了如何使用 pipe()来利用外部命令处理 RDD 的元素, 还有如何将命令脚本分发到集群中工作节点能找到的位置
Numeric RDD Operations(数值 RDD 的操作)
Spark 提供了几种包含数值数据 RDD 的描述性统计操作除此之外, 更复杂的统计和机器学习方法之外, 我们将在后面的第 11 章中介绍这些方法
Spark 的数值操作是通过流式算法实现的, 它允许一次构建我们模型的一个元素描述性统计对数据一次性计算出结果并且调用 stats()会返回一个 StatsCounter 对象表 6-2 介绍了 StatsCounter 对象的方法
Table 6-2. Summary statistics available fromStatsCounter
方法 | 作用 |
---|---|
count() | 元素数量 |
mean() | 元素平均值 |
sum() | 总值 |
max() | 最大值 |
min() | 最小值 |
variance() | 方差 |
sampleVariance() | 样本方差 |
stdev() | 标准差 |
sampleStdev() | 样本标准差 |
如果您只想计算其中一个统计数据, 那么也可以直接在 RDD 上调用相应的方法, 例如 rdd.mean()或 rdd.sum()
在例 6-19 到 6-21 中, 我们将使用汇总统计来从我们的数据中删除一些异常值因为我们可能会遍历相同的 RDD 两次(一次是汇总统计, 第二次是删除异常值), 所以将 RDD 缓存起来回到我们的呼叫日志的例子上, 我们可以从呼叫日志上删除距离太远的联系地点
- Example 6-19. Removing outliers in Python
- # Convert our RDD of strings to numeric data so we can compute stats and
- # remove the outliers.
- distanceNumerics = distances.map(lambda string: float(string))
- stats = distanceNumerics.stats()
- stddev = std.stdev()
- mean = stats.mean()
- reasonableDistances = distanceNumerics.filter(
- lambda x: math.fabs(x - mean) <3 * stddev)
- print reasonableDistances.collect()
- Example 6-20. Removing outliers in Scala
- // Now we can go ahead and remove outliers since those may have misreported locations
- // first we need to take our RDD of strings and turn it into doubles.
- val distanceDouble = distance.map(string => string.toDouble)
- val stats = distanceDoubles.stats()
- val stddev = stats.stdev
- val mean = stats.mean
- val reasonableDistances = distanceDoubles.filter(x => math.abs(x-mean) <3 * stddev)
- println(reasonableDistance.collect().toList)
- Example 6-21. Removing outliers in Java
- // First we need to convert our RDD of String to a DoubleRDD so we can
- // access the stats function
- JavaDoubleRDD distanceDoubles = distances.mapToDouble(new DoubleFunction<String>() {
- public double call(String value) {
- return Double.parseDouble(value);
- }});
- final StatCounter stats = distanceDoubles.stats();
- final Double stddev = stats.stdev();
- final Double mean = stats.mean();
- JavaDoubleRDD reasonableDistances =
- distanceDoubles.filter(new Function<Double, Boolean>() {
- public Boolean call(Double x) {
- return (Math.abs(x-mean) < 3 * stddev);}});
- System.out.println(StringUtils.join(reasonableDistance.collect(), ","));
配合累加器和广播变量, 分区处理, 外部程序接口和汇总统计, 我们最终完成了示例程序
完整的代码在 src/python/ChapterSixExample.py,src/main/scala/com/oreilly/learningsparkexamples/scala/ChapterSixExample.scala, 和 src/main/java/com/oreilly/learningsparkexamples/java/ChapterSixExample.java 这几个文件之中
Conclusion(总结)
这一章节, 我们介绍了一些 Spark 的高级编程特性, 你可以使用这些特性来提高你程序的效率或可读性后续章节将介绍部署和调整 Spark 应用程序, 以及用于 SQL, 流媒体以及机器学习的内置库我们也将开始看到更复杂和更完整的示例应用程序, 它们利用了迄今为止描述的大部分功能, 并且也会对你自己使用 Spark 的方式有所指导或启发
来源: http://www.bubuko.com/infodetail-2530484.html