spark 经典之单词统计
准备数据
既然要统计单词我们就需要一个包含一定数量的文本, 我们这里选择了英文原著《GoneWithTheWind》(《飘》)的文本来做一个数据统计, 看看文章中各个单词出现频次如何. 为了便于大家下载文本. 可以到 GitHub https://github.com/Sizhao/Spark-learning 上下载文本以及对应的代码. 我将文本放在项目的目录下.
首先我们要读取该文件, 就要用到 SparkContext 中的 textFile 的方法, 我们尝试先读取第一行.
scala 实现
- import org.apache.spark.{SparkConf, SparkContext}
- object WordCount {
- def main(args: Array[String]): Unit = {
- val conf = new SparkConf().setMaster("local").setAppName("WordCount")
- val sc = new SparkContext(conf)
- println(sc.textFile("./GoneWithTheWind").first())
- }
- }
java 实现
- import org.apache.spark.SparkConf;
- import org.apache.spark.API.java.JavaSparkContext;
- public class WordCountJava {
- public static void main(String[] args){
- SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCountJava");
- JavaSparkContext sc = new JavaSparkContext(conf);
- System.out.println(sc.textFile("./GoneWithTheWind").first());
- }
- }
python 实现
- from pyspark import SparkConf,SparkContext
- conf = SparkConf().setMaster("local").setAppName("HelloWorld")
- sc = SparkContext(conf=conf)
- print(sc.textFile("./GoneWithTheWind").first())
得到输出
Chapter 1
以 scala 为例, 其余两种语言也差不多. 第一步我们创建了一个 SparkConf
val conf = new SparkConf().setMaster("local").setAppName("WordCount")
这里我们设置 Master 为 local, 该程序名称为 WordCount, 当然程序名称可以任意取, 和类名不同也无妨. 但是这个 Master 则不能乱写, 当我们在集群上运行, 用 spark-submit 的时候, 则要注意. 我们现在只讨论本地的写法, 因此, 这里只写 local.
接着一句我们创建了一个 SparkContext, 这是 spark 的核心, 我们将 conf 配置传入初始化
val sc = new SparkContext(conf)
最后我们将文本路径告诉 SparkContext, 然后输出第一行内容
println(sc.textFile("./GoneWithTheWind").first())
开始统计
接着我们就可以开始统计文本的单词数了, 因为单词是以空格划分, 所以我们可以把空格作为单词的标记.
scala 实现
- import org.apache.spark.{SparkConf, SparkContext}
- object WordCount {
- def main(args: Array[String]): Unit = {
- val conf = new SparkConf().setMaster("local").setAppName("WordCount")
- val sc = new SparkContext(conf)
- // 设置数据路径
- val text = sc.textFile("./GoneWithTheWind")
- // 将文本数据按行处理, 每行按空格拆成一个数组
- // flatMap 会将各个数组中元素合成一个大的集合
- val textSplit = text.flatMap(line =>line.split(" "))
- // 处理合并后的集合中的元素, 每个元素的值为 1, 返回一个元组(key,value)
- // 其中 key 为单词, value 这里是 1, 即该单词出现一次
- val textSplitFlag = textSplit.map(Word => (Word,1))
- //reduceByKey 会将 textSplitFlag 中的 key 相同的放在一起处理
- // 传入的 (x,y) 中, x 是上一次统计后的 value,y 是本次单词中的 value, 即每一次是 x+1
- val countWord = textSplitFlag.reduceByKey((x,y)=>x+y)
- // 将计算后的结果存在项目目录下的 result 目录中
- countWord.saveAsTextFile("./result")
- }
- }
java 实现
- import org.apache.spark.SparkConf;
- import org.apache.spark.API.java.JavaPairRDD;
- import org.apache.spark.API.java.JavaRDD;
- import org.apache.spark.API.java.JavaSparkContext;
- import org.apache.spark.API.java.function.FlatMapFunction;
- import org.apache.spark.API.java.function.Function2;
- import org.apache.spark.API.java.function.PairFunction;
- import scala.Tuple2;
- import java.util.Arrays;
- import java.util.Iterator;
- public class WordCountJava {
- public static void main(String[] args){
- SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCountJava");
- JavaSparkContext sc = new JavaSparkContext(conf);
- // 设置数据的路径
- JavaRDD<String> textRDD = sc.textFile("./GoneWithTheWind");
- // 将文本数据按行处理, 每行按空格拆成一个数组, flatMap 会将各个数组中元素合成一个大的集合
- // 这里需要注意的是 FlatMapFunction 中 < String, String>, 第一个表示输入, 第二个表示输出
- // 与 Hadoop 中的 map-reduce 非常相似
- JavaRDD<String> splitRDD = textRDD.flatMap(new FlatMapFunction<String, String>() {
- @Override
- public Iterator<String> call(String s) throws Exception {
- return Arrays.asList(s.split(" ")).iterator();
- }
- });
- // 处理合并后的集合中的元素, 每个元素的值为 1, 返回一个 Tuple2,Tuple2 表示两个元素的元组
- // 值得注意的是上面是 JavaRDD, 这里是 JavaPairRDD, 在返回的是元组时需要注意这个区别
- //PairFunction 中 < String, String, Integer>, 第一个 String 是输入值类型
- // 第二第三个, String, Integer 是返回值类型
- // 这里返回的是一个 Word 和一个数值 1, 表示这个单词出现一次
- JavaPairRDD<String, Integer> splitFlagRDD = splitRDD.mapToPair(new PairFunction<String, String, Integer>() {
- @Override
- public Tuple2<String, Integer> call(String s) throws Exception {
- return new Tuple2<>(s,1);
- }
- });
- //reduceByKey 会将 splitFlagRDD 中的 key 相同的放在一起处理
- // 传入的 (x,y) 中, x 是上一次统计后的 value,y 是本次单词中的 value, 即每一次是 x+1
- JavaPairRDD<String, Integer> countRDD = splitFlagRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer integer, Integer integer2) throws Exception {
- return integer+integer2;
- }
- });
- // 将计算后的结果存在项目目录下的 result 目录中
- countRDD.saveAsTextFile("./resultJava");
- }
- }
python 实现
- from pyspark import SparkConf,SparkContext
- conf = SparkConf().setMaster("local").setAppName("HelloWorld")
- sc = SparkContext(conf=conf)
- # 设置数据的路径
- textData = sc.textFile("./GoneWithTheWind")
- # 将文本数据按行处理, 每行按空格拆成一个数组, flatMap 会将各个数组中元素合成一个大的集合
- splitData = textData.flatMap(lambda line:line.split(" "))
- # 处理合并后的集合中的元素, 每个元素的值为 1, 返回一个元组(key,value)
- # 其中 key 为单词, value 这里是 1, 即该单词出现一次
- flagData = splitData.map(lambda Word:(Word,1))
- # reduceByKey 会将 textSplitFlag 中的 key 相同的放在一起处理
- # 传入的 (x,y) 中, x 是上一次统计后的 value,y 是本次单词中的 value, 即每一次是 x+1
- countData = flagData.reduceByKey(lambda x,y:x+y)
- # 输出文件
- countData.saveAsTextFile("./result")
运行后在住目录下得到一个名为 result 的目录, 该目录如下图, SUCCESS 表示生成文件成功, 文件内容存储在 part-00000 中
我们可以查看文件的部分内容:
- ('Chapter', 1)
- ('1', 1)
- ('SCARLETT', 1)
- ('O'HARA', 1)
- ('was', 74)
- ('not', 33)
- ('beautiful,', 1)
- ('but', 32)
- ('men', 4)
- ('seldom', 3)
- ('realized', 2)
- ('it', 37)
- ('when', 19)
- ('caught', 1)
- ('by', 20)
- ('her', 65)
- ('charmas', 1)
- ('the', 336)
- ('Tarleton', 7)
- ('twins', 16)
- ('were.', 1)
- ('In', 1)
- ('face', 6)
- ('were', 49)
- ...
- ...
- ...
- ...
这样就完成了一个 spark 的真正 HelloWorld 程序 -- 单词计数. 对比三个语言版本的程序, 发现一个事实那就是, 用 scala 和 python 写的代码非常简洁而且易懂, 而 Java 实现的则相对复杂, 难懂. 当然这个易懂和难懂是相对而言的. 如果你只会 Java 无论如何你都应该从中能看懂 java 的程序, 而简洁的 scala 和 python 对你来说根本看不懂. 这也无妨, 语言只是工具, 重点看你怎么用. 况且, 我们使用 java8 的特性也可以写出简洁的代码.
java8 实现
- import org.apache.spark.SparkConf;
- import org.apache.spark.API.java.JavaPairRDD;
- import org.apache.spark.API.java.JavaRDD;
- import org.apache.spark.API.java.JavaSparkContext;
- import org.apache.spark.API.java.function.FlatMapFunction;
- import org.apache.spark.API.java.function.Function2;
- import org.apache.spark.API.java.function.PairFunction;
- import scala.Tuple2;
- import java.util.Arrays;
- import java.util.Iterator;
- public class WordCountJava {
- public static void main(String[] args){
- SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCountJava");
- JavaSparkContext sc = new JavaSparkContext(conf);
- countJava8(sc);
- }
- public static void countJava8(JavaSparkContext sc){
- sc.textFile("./GoneWithTheWind")
- .flatMap(s->Arrays.asList(s.split(" ")).iterator())
- .mapToPair(s->new Tuple2<>(s,1))
- .reduceByKey((x,y)->x+y)
- .saveAsTextFile("./resultJava8");
- }
- }
spark 的优越性在这个小小的程序中已经有所体现, 计算一本书的每个单词出现的次数, spark 在单机上运行(读取文件, 生成临时文件, 将结果写到硬盘), 加载 - 运行 - 结束只花费了 2 秒时间.
对程序进行优化
程序是否还能再简单高效呢? 当然是可以的, 我们可以用 countByValue 这个函数, 这个函数正是常用的计数的方法.
scala 实现
- import org.apache.spark.{SparkConf, SparkContext}
- object WordCount {
- def main(args: Array[String]): Unit = {
- val conf = new SparkConf().setMaster("local").setAppName("WordCount")
- val sc = new SparkContext(conf)
- // 设置数据路径
- val text = sc.textFile("./GoneWithTheWind")
- // 将文本数据按行处理, 每行按空格拆成一个数组
- // flatMap 会将各个数组中元素合成一个大的集合
- val textSplit = text.flatMap(line =>line.split(" "))
- println(textSplit.countByValue())
- }
- }
运行得到结果
Map(Heknew -> 1, "Ashley -> 1,"Let's -> 1, anarresting -> 1, of. -> 1, pasture -> 1, war's -> 1, wall. -> 1, looks -> 2, ain't -> 7,.......
java 实现
- public class WordCountJava {
- public static void main(String[] args){
- SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCountJava");
- JavaSparkContext sc = new JavaSparkContext(conf);
- countJava(sc);
- }
- public static void countJava(JavaSparkContext sc){
- // 设置数据的路径
- JavaRDD<String> textRDD = sc.textFile("./GoneWithTheWind");
- // 将文本数据按行处理, 每行按空格拆成一个数组, flatMap 会将各个数组中元素合成一个大的集合
- // 这里需要注意的是 FlatMapFunction 中 < String, String>, 第一个表示输入, 第二个表示输出
- // 与 Hadoop 中的 map-reduce 非常相似
- JavaRDD<String> splitRDD = textRDD.flatMap(new FlatMapFunction<String, String>() {
- @Override
- public Iterator<String> call(String s) throws Exception {
- return Arrays.asList(s.split(" ")).iterator();
- }
- });
- System.out.println(splitRDD.countByValue());
- }
- }
运行得到结果
{Heknew=1, "Ashley=1,"Let's=1, anarresting=1, of.=1, pasture=1, war's=1, wall.=1, looks=2, ain't=7, Clayton=1, approval.=1, ideas=1,
python 实现
- from pyspark import SparkConf,SparkContext
- conf = SparkConf().setMaster("local").setAppName("HelloWorld")
- sc = SparkContext(conf=conf)
- # 设置数据的路径
- textData = sc.textFile("./GoneWithTheWind")
- # 将文本数据按行处理, 每行按空格拆成一个数组, flatMap 会将各个数组中元素合成一个大的集合
- splitData = textData.flatMap(lambda line:line.split(" "))
- print(splitData.countByValue())
运行得到结果:
defaultdict(<class 'int'>, {'Chapter': 1, '1': 1, 'SCARLETT': 1, 'O'HARA': 1,'was': 74,'not': 33,'beautiful,': 1,'but': 32,'men': 4,
spark 的优越性在这个小小的程序中已经有所体现, 计算一本书的每个单词出现的次数, spark 在单机上运行(读取文件, 生成临时文件, 将结果写到硬盘), 加载 - 运行 - 结束只花费了 2 秒时间. 如果想要获取源代码以及数据内容, 可以前往我的 https://github.com/Sizhao/Spark-learning 下载.
来源: https://juejin.im/post/5c768f5b6fb9a049b348a811