[TOC]
引入
前面进行过 wordcount 的单词统计例子, 关键是, 如何对统计的单词按照单词个数来进行排序?
如下:
- scala> val retRDD = sc.textFile("hdfs://ns1/hello").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)
- scala> val retSortRDD = retRDD.map(pair => (pair._2, pair._1)).sortByKey(false).map(pair => (pair._2, pair._1))
- scala> retSortRDD.collect().foreach(println)
- ...
- (hello,3)
- (me,1)
- (you,1)
- (he,1)
下面的测试都需要引入 maven 的依赖
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- <version>2.10.5</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.10</artifactId>
- <version>1.6.2</version>
- </dependency>
Spark 二次排序
测试数据与说明
需要进行二次排序的数据格式如下:
- field_1' 'field_2(使用空格分割)
- 20 21
- 50 51
- 50 52
- 50 53
- 50 54
- 60 51
- 60 53
- 60 52
- 60 56
- 60 57
- 70 58
- 60 61
- 70 54
思路下面的代码注释会有详细的说明, 这里要指出的是, 在下面的排序过程中, 分别使用 Java 和 Scala 进行排序的操作, 并且:
Java 版本
方式 1: 使元素具备比较性 --->需要使用 SecondarySort 对象
方式 2: 提供比较器 --->需要使用 SecondarySort 对象
不管使用哪一种方式, 都需要使用一个新的变量对象 SecondarySort
Scala 版本
方式 1: 使元素具备比较性, 其实就是 Java 版本方式 1 的 scala 实现 --->需要使用 SecondarySort 对象
方式 2: 使用 sortBy 的第一种方式, 基于原始的数据进行排序 --->不需要使用 SecondarySort 对象
方式 3: 使用 sortBy 的第二种方式, 将原始数据进行转换 --->需要使用 SecondarySort 对象
所以这个二次排序的例子包含 Java 和 Scala 总共 5 个版本的实现, 非常有价值!
公共对象
其实就是 SecondarySort 对象, 如下:
- package cn.xpleaf.bigdata.spark.java.core.domain;
- import scala.Serializable;
- public class SecondarySort implements Comparable<SecondarySort>, Serializable {
- private int first;
- private int second;
- public SecondarySort(int first, int second) {
- this.first = first;
- this.second = second;
- }
- public int getFirst() {
- return first;
- }
- public void setFirst(int first) {
- this.first = first;
- }
- public int getSecond() {
- return second;
- }
- public void setSecond(int second) {
- this.second = second;
- }
- @Override
- public int compareTo(SecondarySort that) {
- int ret = this.getFirst() - that.getFirst();
- if(ret == 0) {
- ret = that.getSecond() - this.getSecond();
- }
- return ret;
- }
- @Override
- public String toString() {
- return this.first + " " + this.second;
- }
- }
Java 版本
测试代码如下:
- package cn.xpleaf.bigdata.spark.java.core.p3;
- import cn.xpleaf.bigdata.spark.java.core.domain.SecondarySort;
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaPairRDD;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.PairFunction;
- import org.apache.spark.api.java.function.VoidFunction;
- import scala.Serializable;
- import scala.Tuple2;
- import java.util.Comparator;
/**
* Java 版本的二次排序
* field_1' 'field_2(使用空格分割)
* 20 21
50 51
50 52
50 53
50 54
60 51
60 53
60 52
60 56
60 57
70 58
60 61
70 54
需求: 首先按照第一列升序排序, 如果第一列相等, 按照第二列降序排序
分析: 要排序的话, 使用 sortByKey, 也可以使用 sortBy
如果用 sortByKey 的话, 只能按照 key 来排序, 现在的是用第一列做 key? 还是第二列?
根据需求, 只能使用复合 key(既包含第一列, 也包含第二列), 因为要进行比较, 所以该复合 key 必须具备比较性, 要么该操作提供一个比较器
问题是查看该操作的时候, 并没有给我们提供比较器, 没得选只能让元素具备比较性
使用自定义的对象 可以使用 comprable 接口
*/
- public class _01SparkSecondarySortOps {
- public static void main(String[] args) {
- SparkConf conf = new SparkConf().setMaster("local[2]").setAppName(_01SparkSecondarySortOps.class.getSimpleName());
- JavaSparkContext jsc = new JavaSparkContext(conf);
- JavaRDD<String> linesRDD = jsc.textFile("D:/data/spark/secondsort.csv");
- JavaPairRDD<SecondarySort, String> ssRDD = linesRDD.mapToPair(new PairFunction<String, SecondarySort, String>() {
- @Override
- public Tuple2<SecondarySort, String> call(String line) throws Exception {
- String[] fields = line.split(" ");
- int first = Integer.valueOf(fields[0].trim());
- int second = Integer.valueOf(fields[1].trim());
- SecondarySort ss = new SecondarySort(first, second);
- return new Tuple2<SecondarySort, String>(ss, "");
- }
- });
- /*// 第一种方式: 使元素具备比较性
- JavaPairRDD<SecondarySort, String> sbkRDD = ssRDD.sortByKey(true, 1); // 设置 partition 为 1, 这样数据才整体有序, 否则只是 partition 中有序
- */
- /**
- * 第二种方式, 提供比较器
- * 与前面方式相反, 这次是: 第一列降序, 第二列升序
- */
- JavaPairRDD<SecondarySort, String> sbkRDD = ssRDD.sortByKey(new MyComparator<SecondarySort>() {
- @Override
- public int compare(SecondarySort o1, SecondarySort o2) {
- int ret = o2.getFirst() - o1.getFirst();
- if(ret == 0) {
- ret = o1.getSecond() - o2.getSecond();
- }
- return ret;
- }
- }, true, 1);
- sbkRDD.foreach(new VoidFunction<Tuple2<SecondarySort, String>>() {
- @Override
- public void call(Tuple2<SecondarySort, String> tuple2) throws Exception {
- System.out.println(tuple2._1);
- }
- });
- jsc.close();
- }
- }
- /**
- * 做一个中间的过渡接口
- * 比较需要实现序列化接口, 否则也会报异常
- * 是用到了适配器 Adapter 模式
- * 适配器模式 (Adapter Pattern) 是作为两个不兼容的接口之间的桥梁, 这里就是非常好的体现了.
- */
- interface MyComparator<T> extends Comparator<T>, Serializable{}
输出结果如下:
- 740 58
- 730 54
- 530 54
- 203 21
- 74 58
- 73 57
- 71 55
- 71 56
- 70 54
- 70 55
- 70 56
- 70 57
- 70 58
- 70 58
- 63 61
- 60 51
- 60 52
- 60 53
- 60 56
- 60 56
- 60 57
- 60 57
- 60 61
- 50 51
- 50 52
- 50 53
- 50 53
- 50 54
- 50 62
- 50 512
- 50 522
- 40 511
- 31 42
- 20 21
- 20 53
- 20 522
- 12 211
- 7 8
- 7 82
- 5 6
- 3 4
- 1 2
Scala 版本
测试代码如下:
package cn.xpleaf.bigdata.spark.scala.core.p3
- import cn.xpleaf.bigdata.spark.java.core.domain.SecondarySort
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext}
- import scala.reflect.ClassTag
- object _05SparkSecondarySortOps {
- def main(args: Array[String]): Unit = {
- val conf = new SparkConf().setMaster("local[2]").setAppName(_05SparkSecondarySortOps.getClass.getSimpleName)
- val sc = new SparkContext(conf)
val linesRDD = sc.textFile("D:/data/spark/secondsort.csv")
- /*
- val ssRDD:RDD[(SecondarySort, String)] = linesRDD.map(line => {
- val fields = line.split(" ")
- val first = Integer.valueOf(fields(0).trim())
- val second = Integer.valueOf(fields(1).trim())
- val ss = new SecondarySort(first, second)
- (ss, "")
- })
- // 第一种方式, 使用元素具备比较性
- val sbkRDD:RDD[(SecondarySort, String)] = ssRDD.sortByKey(true, 1)
- sbkRDD.foreach{case (ss:SecondarySort, str:String) => { // 使用模式匹配的方式
- println(ss)
- }}
- */
- /*// 使用 sortBy 的第一种方式, 基于原始的数据
- val retRDD = linesRDD.sortBy(line => line, numPartitions = 1)(new Ordering[String] {
- override def compare(x: String, y: String): Int = {
- val xFields = x.split(" ")
- val yFields = y.split(" ")
- var ret = xFields(0).toInt - yFields(0).toInt
- if(ret == 0) {
- ret = yFields(1).toInt - xFields(1).toInt
- }
- ret
- }
- }, ClassTag.Object.asInstanceOf[ClassTag[String]])
- */
- // 使用 sortBy 的第二种方式, 将原始数据做转换 --->sortBy()第一个参数的作用, 就是做数据的转换
- val retRDD:RDD[String] = linesRDD.sortBy(line => {
- // f: (T) => K
- // 这里 T 的类型为 String,K 是 SecondarySort 类型
- val fields = line.split(" ")
- val first = Integer.valueOf(fields(0).trim())
- val second = Integer.valueOf(fields(1).trim())
- val ss = new SecondarySort(first, second)
- ss
- }, true, 1)(new Ordering[SecondarySort] {
- override def compare(x: SecondarySort, y: SecondarySort): Int = {
- var ret = x.getFirst - y.getFirst
- if(ret == 0) {
- ret = y.getSecond - x.getSecond
- }
- ret
- }
- }, ClassTag.Object.asInstanceOf[ClassTag[SecondarySort]])
- retRDD.foreach(println)
- sc.stop()
- }
- }
输出结果如下:
- 1 2
- 3 4
- 5 6
- 7 82
- 7 8
- 12 211
- 20 522
- 20 53
- 20 21
- 31 42
- 40 511
- 50 522
- 50 512
- 50 62
- 50 54
- 50 53
- 50 53
- 50 52
- 50 51
- 60 61
- 60 57
- 60 57
- 60 56
- 60 56
- 60 53
- 60 52
- 60 51
- 63 61
- 70 58
- 70 58
- 70 57
- 70 56
- 70 55
- 70 54
- 71 56
- 71 55
- 73 57
- 74 58
- 203 21
- 530 54
- 730 54
- 740 58
TopN 问题
需求与说明
需求与数据说明如下:
* TopN 问题的说明:
* TopN 问题显然是可以使用 action 算子 take 来完成, 但是因为 take 需要将所有数据都拉取到 Driver 上才能完成操作,
* 所以 Driver 的内存压力非常大, 不建议使用 take.
*
* 这里要进行 TopN 问题的分析, 数据及需求如下:
* chinese ls 91
* english ww 56
* chinese zs 90
* chinese zl 76
* english zq 88
* chinese wb 95
* chinese sj 74
* english ts 87
* english ys 67
* english mz 77
* chinese yj 98
* english gk 96
*
* 需求: 排出每个科目的前三名
下面分别使用性能很低的 groupByKey 和性能很好的 combineByKey 来进行操作, 详细的说明已经在代码中给出, 注意其思想非常重要, 尤其是使用 combineByKey 来解决 groupByKey 出现的性能问题, 有兴趣的话, 可以好好阅读一下代码, 以及其所体现的思想, 因为这都跟 Spark 本身的理论紧密相关.
使用 groupByKey 解决
测试代码如下:
package cn.xpleaf.bigdata.spark.scala.core.p3
- import org.apache.log4j.{Level, Logger}
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext}
- import scala.collection.mutable
/**
* TopN 问题的说明:
* TopN 问题显然是可以使用 action 算子 take 来完成, 但是因为 take 需要将所有数据都拉取到 Driver 上才能完成操作,
* 所以 Driver 的内存压力非常大, 不建议使用 take.
*
* 这里要进行 TopN 问题的分析, 数据及需求如下:
* chinese ls 91
* english ww 56
* chinese zs 90
* chinese zl 76
* english zq 88
* chinese wb 95
* chinese sj 74
* english ts 87
* english ys 67
* english mz 77
* chinese yj 98
* english gk 96
*
* 需求: 排出每个科目的前三名
*
* 思路: 先进行 map 操作转换为 (subject, name + score) 的元组
* 再根据 subject 这个 key 进行 groupByKey, 这样就可以得到 gbkRDD
* 之后再对其进行 map 操作, 在 map 操作中使用 treeSet 得到前三名(既能控制大小, 又能进行排序)
*
* 问题:
* 上面的方案在生产过程中慎用
* 因为, 执行 groupByKey, 会将 key 相同的数据都拉取到同一个 partition 中, 再执行操作,
* 拉取的过程是 shuffle, 是分布式性能杀手! 再一个, 如果 key 对应的数据过多, 很有可能造成数据倾斜, 或者 OOM,
* 那么就需要尽量的避免这种操作方式.
* 那如何做到? 可以参考 MR 中 TopN 问题的思想, MR 中, 是在每个 map task 中对数据进行筛选, 虽然最后还是需要 shuffle 到一个节点上, 但是数据量会大大减少.
* Spark 中参考其中的思想, 就是可以在每个 partition 中对数据进行筛选, 然后再对各个分区筛选出来的数据进行合并, 再做一次排序, 从而得到最终排序的结果.
* 显然, 这样就可以解决前面说的数据到同一个 partition 中导致数据量过大的问题! 因为分区筛选的工作已经可以大大减少数据量.
* 那么在 Spark 中有什么算子可以做到这一点呢? 那就是 combineByKey 或者 aggregateByKey, 其具体的用法可以参考我前面的博客文章, 这里我使用 combineByKey 来操作.
*/
- object _06SparkTopNOps {
- def main(args: Array[String]): Unit = {
- val conf = new SparkConf().setMaster("local[2]").setAppName(_06SparkTopNOps.getClass.getSimpleName())
- val sc = new SparkContext(conf)
- Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
- Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
- Logger.getLogger("org.spark_project").setLevel(Level.OFF)
- // 1. 转换为 linesRDD
- val linesRDD:RDD[String] = sc.textFile("D:/data/spark/topn.txt")
- // 2. 转换为 pairsRDD
- val pairsRDD:RDD[(String, String)] = linesRDD.map(line => {
- val fields = line.split(" ")
- val subject = fields(0).trim()
- val name = fields(1).trim()
- val score = fields(2).trim()
- (subject, name + "" + score) // ("chinese","zs 90")
- })
- // 3. 转换为 gbkRDD
- val gbkRDD:RDD[(String, Iterable[String])] = pairsRDD.groupByKey()
- println("==========TopN 前 ==========")
- gbkRDD.foreach(println)
- // (english,CompactBuffer(ww 56, zq 88, ts 87, ys 67, mz 77, gk 96))
- // (chinese,CompactBuffer(ls 91, zs 90, zl 76, wb 95, sj 74, yj 98))
- // 4. 转换为 retRDD
- val retRDD:RDD[(String, Iterable[String])] = gbkRDD.map(tuple => {
- var ts = new mutable.TreeSet[String]()(new MyOrdering())
- val subject = tuple._1 // chinese
- val nameScores = tuple._2 // ("ls 91", "ww 56", "zs 90", ...)
- for(nameScore <- nameScores) { // 遍历每一份成绩 "ls 91"
- // 添加到 treeSet 中
- ts.add(nameScore)
- if(ts.size> 3) { // 如果大小大于 3, 则弹出最后一份成绩
- ts = ts.dropRight(1)
- }
- }
- (subject, ts)
- })
- println("==========TopN 后 ==========")
- retRDD.foreach(println)
- sc.stop()
- }
- }
- // gbkRDD.map 中用于排序的 treeSet 的排序比较规则, 根据需求, 应该为降序
- class MyOrdering extends Ordering[String] {
- override def compare(x: String, y: String): Int = {
- // x 或者 y 的格式为:"zs 90"
- val xFields = x.split(" ")
- val yFields = y.split(" ")
- val xScore = xFields(1).toInt
- val yScore = yFields(1).toInt
- val ret = yScore - xScore
- ret
- }
- }
输出结果如下:
==========TopN 前 ==========
(chinese,CompactBuffer(ls 91, zs 90, zl 76, wb 95, sj 74, yj 98))
(english,CompactBuffer(ww 56, zq 88, ts 87, ys 67, mz 77, gk 96))
==========TopN 后 ==========
- (chinese,TreeSet(yj 98, wb 95, ls 91))
- (english,TreeSet(gk 96, zq 88, ts 87))
使用 combineByKey 解决
测试代码如下:
package cn.xpleaf.bigdata.spark.scala.core.p3
- import org.apache.log4j.{Level, Logger}
- import org.apache.spark.{SparkConf, SparkContext}
- import org.apache.spark.rdd.RDD
- import scala.collection.mutable
/**
* 使用 combineByKey 算子来优化前面的 TopN 问题
* 关于 combineByKey 算子的使用, 可以参考我的博客文章, 上面有非常详细的例子
* 一定要掌握, 因为非常重要
*/
- object _07SparkTopNOps {
- def main(args: Array[String]): Unit = {
- val conf = new SparkConf().setMaster("local[2]").setAppName(_07SparkTopNOps.getClass().getSimpleName())
- val sc = new SparkContext(conf)
- Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
- Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
- Logger.getLogger("org.spark_project").setLevel(Level.OFF)
- // 1. 转换为 linesRDD
- val linesRDD:RDD[String] = sc.textFile("D:/data/spark/topn.txt")
- // 2. 转换为 pairsRDD
- val pairsRDD:RDD[(String, String)] = linesRDD.map(line => {
- val fields = line.split(" ")
- val subject = fields(0).trim()
- val name = fields(1).trim()
- val score = fields(2).trim()
- (subject, name + "" + score) // ("chinese","zs 90")
- })
- println("==========TopN 前 ==========")
- pairsRDD.foreach(println)
- // (chinese,sj 74)
- // (chinese,ls 91)
- // (english,ts 87)
- // (english,ww 56)
- // (english,ys 67)
- // (chinese,zs 90)
- // (english,mz 77)
- // (chinese,zl 76)
- // (chinese,yj 98)
- // (english,zq 88)
- // (english,gk 96)
- // (chinese,wb 95)
- // 3. 转换为 cbkRDD
val cbkRDD:RDD[(String, mutable.TreeSet[String])] = pairsRDD.combineByKey(createCombiner, mergeValue, mergeCombiners)
- println("==========TopN 后 ==========")
- cbkRDD.foreach(println)
- // (chinese,TreeSet(yj 98, wb 95, ls 91))
- // (english,TreeSet(gk 96, zq 88, ts 87))
- }
- // 创建一个容器, 这里返回一个 treeSet, 作为每个分区中相同 key 的 value 的容器
- def createCombiner(nameScore: String):mutable.TreeSet[String] = {
- // nameScore 格式为:"zs 90"
- // 指定排序规则 MyOrdering, 为降序排序
- val ts = new mutable.TreeSet[String]()(new MyOrdering())
- ts.add(nameScore)
- ts
- }
- // 合并分区中 key 相同的 value, 同时使用 treeSet 来进行排序
- def mergeValue(ts:mutable.TreeSet[String], nameScore:String):mutable.TreeSet[String] = {
- ts.add(nameScore)
- if(ts.size> 3) { // 如果超过 3 个, 删除一个再返回
- ts.dropRight(1) // scala 中的集合进行操作后, 本身不变, 但是会返回一个新的集合
- }
- ts
- }
- // 合并不同分区中 key 相同的 value 集合, 同时使用 treeSet 来进行排序
- def mergeCombiners(ts1:mutable.TreeSet[String], ts2:mutable.TreeSet[String]):mutable.TreeSet[String] = {
- var newTS = new mutable.TreeSet[String]()(new MyOrdering())
- // 将分区 1 中集合的 value 添加到新的 treeSet 中, 同时进行排序和控制大小
- for(nameScore <- ts1) {
- newTS.add(nameScore)
- if(newTS.size> 3) { // 如果数量大于 3, 则删除一个后再赋值给本身
- newTS = newTS.dropRight(1)
- }
- }
- // 将分区 2 中集合的 value 添加到新的 treeSet 中, 同时进行排序和控制大小
- for(nameScore <- ts2) {
- newTS.add(nameScore)
- if(newTS.size> 3) { // 如果数量大于 3, 则删除一个后再赋值给本身
- newTS = newTS.dropRight(1)
- }
- }
- newTS
- }
- }
输出结果如下:
==========TopN 前 ==========
- (chinese,ls 91)
- (chinese,sj 74)
- (english,ww 56)
- (english,ts 87)
- (chinese,zs 90)
- (english,ys 67)
- (chinese,zl 76)
- (english,mz 77)
- (english,zq 88)
- (chinese,yj 98)
- (chinese,wb 95)
- (english,gk 96)
==========TopN 后 ==========
- (english,TreeSet(gk 96, zq 88, ts 87))
- (chinese,TreeSet(yj 98, wb 95, ls 91))
来源: http://blog.51cto.com/xpleaf/2108763