在 hadoop、zookeeper、hbase、spark 集群环境搭建 中已经把环境搭建好了,工欲善其事必先利其器,现在器已经有了,接下来就要开搞了,先从 spark-shell 开始揭开 Spark 的神器面纱。
spark-shell 是 Spark 的命令行界面,我们可以在上面直接敲一些命令,就像 windows 的 cmd 一样,进入 Spark 安装目录,执行以下命令打开 spark-shell:
- bin / spark - shell--master spark: //hxf:7077 --executor-memory 1024m --driver-memory 1024m --total-executor-cores 4
executor-memory 是 slave 的内存,driver-memory 是 master 的内存,total-executor-cores 是所有的核数
终端显示如下图,可以看到 spark-shell 已经帮我们初始化了两个变量 sc、spark,sc 是 Spark context,spark 是 Spark session,没吃过猪肉见过猪跑,像这些包含 context 啊 session 啊不用想就很重要,同样 Spark 的执行就是靠这俩变量,目前先混个眼熟,日后再说
Spark 管理页面显示如下图:
OK,现在我们开始动手敲第一个例子,统计 Spark 目录下 README.md 这个文件中各个单词出现的次数:
首先给出完整的代码,方便大家有一个整体的思路:
- val textFile = sc.textFile("file:/data/install/spark-2.0.0-bin-hadoop2.7/README.md")
- val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey((a, b) => a + b)
- wordCounts.collect()
代码很简单,但是第一次见到可能不是很理解,下面进行讲解
首先读取 README.md:
- valtextFile = sc.textFile("README.md")
这条代码是读取原始数据转化为 Spark 自己的数据格式 RDD,一般读取原始数据有两种方式
1、测试用法:调用 SparkContext 的 parallelize 方法
- valrdd = sc.parallelize(Array(1 to 10))
这样就获取到了 1 到 10 的数组,多用于测试程序,正式开发不用这种
2、正式用法:所有 Hadoop 可以使用的数据源 Spark 都可以使用,当然我们最常用的还是 SparkContext 的 textFile 方法,如读取 Hdfs 上的文件:
- valrdd = sc.parallelize("hadoop://hxf:9000/test/test.log")
上面通过 textFile 得到的结果叫做 RDD,是 Spark 的基础数据类型。
RDD 是 Resillient Distributed Dataset 的简称,意思是弹性分布式数据集,这个名字不是太好理解,但是我们可以从字面上了解到 RDD 是分布式的、并且是数据集合,假设分布式系统下有多个文件,这些文件有很多行,RDD 指的是所有这些文件所有行的集合,而不是单独某一行。所以我们对 RDD 进行的一系列操作都是对整个集合进行的操作,并且 Spark 是将整个 RDD 放在内存中进行处理,而不是像 MapReduce 那样放在磁盘中,所以 Spark 的运算速度才会比 MapReduce 快。
接下来继续讲解代码:
- val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey((a, b) => a + b)
- wordCounts.collect()
最后的结果显示各个单词出现的次数,代码中的 flatMap、map、reduceByKey 是 RDD 的转化操作,collect 是 RDD 的行动操作,不理解没关系,后文详解。这一节先暂时讲到这里,欲听后事如何,请听下回分解。
来源: http://blog.csdn.net/trigl/article/details/70445949