本文以 WordCount 为例, 画图说明 spark 程序的执行过程
WordCount 就是统计一段数据中每个单词出现的次数,
例如 hello spark hello you 这段文本中 hello 出现 2 次, spark 出现 1 次, you 出现 1 次.
先上完整代码:
- object WordCount {
- def main(args: Array[String]) {
- val conf = new SparkConf().setAppName("WordCount");
- val sc = new SparkContext(conf)
- val lines = sc.textFile("hdfs://xxx:9000/spark.txt", 3);
- val words = lines.flatMap { line => line.split("\s+") }
- val pairs = words.map { Word => (Word, 1) }
- val wordCounts = pairs.reduceByKey { _ + _ }
- wordCounts.foreach(wordCount => println(wordCount._1 + "appeared" + wordCount._2 + "times."))
- }
- }
上面几行代码就把 hdfs 上的 spark.txt 中每个单词出现的个数计算完成.
Spark 集群的执行单位是 Application, 任何提交的任务都会产生一个 Application. 一个 Application 只会关联上一个 Spark 上下文, 也就是 SparkContext. 构建 SparkContext 时可以传入 Spark 相关配置, 也就是 SparkConf, 它可以用来指定 Application 的名称, 任务需要的 CPU 核数 / 内存大小, 调优需要的配置等等. 以下两行创建了 SparkContext:
- val conf = new SparkConf().setAppName("WordCount");
- val sc = new SparkContext(conf)
创建完 SparkContext 之后, spark.txt 的文件数如何被 spark 处理的呢, 让我们一起看一下:
首先我们假设 spark.txt 在 hdfs 上对应着 3 个文件, 文件内容都一样, sc.textFile("hdfs://xxx:9000/spark.txt", 3)也执行了最小分区数为 3.
然后 wordcount 执行过程如下:

说明:
绿, 红, 黄色箭头的地方发生了 `Shuffer, 把整个任务分成了 2 个 Stage(2 个蓝色虚线框)
红色虚线框代表一个 Partition 窄依赖 (每个分区只被子 RDD 的一个分区所使用) 的运行过程, 多个 partition 是并行执行的
reduceByKey 会先把每个 Partition 中的数据预聚合(groupByKey 不会)
Stage 中的数据都是在内存中, 不像 MapReduce 会频繁写磁盘, 速度很快.
补充: 其实
textFile,flatMap,map,reduceByKey
等 transformation 操作都是 lazy 的, 程序执行到这里不会立即执行, 只有再触发 action 操作的时候才会执行, 此例中为 wordCounts.foreach 这个 action 操作.
来源: https://www.cnblogs.com/wangtcc/p/da-huaSpark-3yi-tu-shen-ru-li-jieWordCount-cheng-x.html