学习 Spark,首先要熟悉 Scala,当然你说你会 Python 或者 Java 能不能玩 Spark?能!但是不推荐,首推 Scala,因为 Scala 非常便捷,而且 Scala 有非常好的交互式编程体验(当然了,在这里,Python 也不差).其次呢,我们要对 Hadoop 的 MapReduce 要是有一定的了解.不然,学习起来,是会稍微费点功夫.好,不扯这么多了,相关的故事啊,疑问啊可以评论留言询问或者百度查询,我们现在直接进入正题.
Spark 特征简述
* Spark 是什么官方描述:Spark is a fast and general engine for large-scale data processing
** Spark 是一个快速的,通用的,大数据规模的运算引擎.这是一个非常精准的描述.
** Spark 是基于 MapReducer 实现的通用的分布式计算框架,所以它继承了 MapReduce 的优点,同时还支持将 Job 运算任务产生的中间结果和最终结果保存在内存中.
* Spark 优势
** Spark 的中间数据放到内存中,对于迭代运算效率更高
** 运算速度奇快
** 更灵活的数据操作,比如:map, filter, flatMap, sample, groupByKey, reduceByKey, union, join, cogroup, mapValues, sort,partionBy 等等
* Spark 不适合做什么
** 不适合做增量变化的应用模型
* Spark 支持语言
Java,Scala,Python
* 适用场景讨论
** 适用于需要多次操作特定数据集的应用场合.需要反复操作的次数越多,所需读取的数据量越大,受益越大,数据量小但是计算密集度较大的场 * 合,受益就相对较小.
Spark 下载
一般情况下,我们使用 spark 之前,都需要下载源码,然后根据自己的集群环境(也就是 Hadoop 版本)进行编译,然后再安装使用.
Spark 下载:
http://spark.apache.org/downloads.html
打开页面后,做出如下选择,即可开始下载源码
在这里我们使用 1.6.1 的源码
Spark 编译
在此我们简单介绍两种方式:
** SBT 编译
这是一个类似 Maven 的仓库,基于 Scala
** Maven 编译
命令:
** make-distribution.sh 编译
修改源码根目录下的 make-distribution.sh 文件,修改内容如图:
依次为:配置 Spark 版本,Scala 版本,Hadoop 版本,是否支持 Hive,1 为支持
配置镜像:注意,如果编译的是原版,请添加此镜像,如果编译的是 CDH 版本的,请注意去掉此镜像.
配置域名解析服务器:
$ sudo vi /etc/resolv.conf,配置如下:
最后执行命令:
nameserver 8.8.8.8
nameserver 8.8.4.4
注意要支持 yarn 和 hive
世界充满爱之编译好的 Spark 传送门(分别包含包含 Apache 和 CDH 版本的):
链接:http://pan.baidu.com/s/1eRBJtjs 密码:t03u
Spark 运行模式
** Local
即本地模式
** Standalone
即 Spark 自带的集群模式,分为 Master 节点和 Worker 节点,顾名思义,一个管理者,多个干活的.:)
** Yarn
国内相当主流的一种运行部署模式,只是目前 Yarn 分配的 Container 是不能够动态伸缩的,后续可能会考虑支持.
** Mesos
Spark 在出生的时候就考虑支持该框架,很灵活,但国内使用似乎不多,感兴趣请自行研究之.
Spark 安装部署
将 Spark 解压出来,然后到 conf 目录下,自己将 template 文件拷贝出文后提到的文件进行配置即可,在之前的章节我们已经提到过很多次,此步骤想必应该非常熟练了,不再赘述了.
Local 模式:spark-env.sh 文件配置如下:
Spark 测试案例之 Local 模式
在案例开始前,请确保你的 HDFS 是可用的,并且 spark-shell 在 active 的 NameNode 节点上运行.此刻建议你已经熟知 Hadoop 中 MapReduce 的编写过程以及运行原理.
案例一:基于本地模式的 WordCount,words.txt 中的内容:
Step1,进入 spark 根目录使用 $ bin/spark-shell 命令启动 spark,如下图:Step2,读取 / input/words.txt 文件,尝试检查一下 words.txt 文件有多少行数据,操作如下:
scala> val rdd = sc.textFile("/input/words.txt")
scala> rdd.count
当然了,统计词频,这个步骤可以省略,在此只是想验证一下自己读取到的数据有没有问题
好,大家可以看到,有 3 行数据,每一行都有若干英文单词.那么这里面涉及到几个问题需要拿出来讨论一下:
1,什么是 rdd?
RDD is a fault-tolerant collection of elements that can be operated on in parallel,RDD 是弹性分布式数据集,全称 Resilient Distributed Datasets,具有分布式,高容错性等特点,在这里,刚开始接触的话,你可暂且理解为一个集合就可以了,一个数据集合.
2,什么是 sc?
sc 的全称是 SparkContext,即 Spark 的上下文对象,这个理解可以类比于在 Hadoop 阶段我们在 MapReduce 中接触到的 Context,不管是读取文件还是其他数据操作,都依赖于 SparkContext 的实例化.在这里,sc 即一个实例化好的 SparkContext 对象.
我们通过 sc.textFile 方法读取到 HDFS 系统中存放的 words.txt 文件信息,该方法返回一个 RDD 对象,之后我们通过 rdd 对象调用 count 方法,来查看读取到的文件中数据有多少行.
Step3,利用得到的 rdd 对象进行数据的拆分,即,每一个单词都拆分成一个 RDD 对象,比如类似这样的理解:RDD<String> rdd = new RDD("hadoop");那么使用 scala 在 spark 中如何做呢?请看:
scala> val wordRdd = rdd.flatMap(line => line.split(""))
然后我们使用 wordRdd 显示一下第一个单词看一看:
scala> wordRdd.first
Step4,将分割出来的每一个单词做 Map 映射
scala> val mapRdd = wordRdd.map(word => (word, 1))
这是 scala 的高阶函数,注意不理解请重新复习 Scala 语言.该语句的意思是:将 wordRdd 中存放的单词映射为一个 tuple 元组,元组中有两个元素,第一个元素为单词,第二个元素为当前单词本次的个数,固定为 1,这个 1 就像 Hadoop 阶段中 Map 的 LongWritable 一样,这个 word 就像 Text 一样.
Step5,这一步要做的就是讲 map 映射出来的数据集进行 reduce 运算
scala> val reduceRdd = mapRdd.reduceByKey((x, y) => x + y)
该行代码的意思是将某一个单词的好多个 1(当然如果进行 Combine 操作了,也许可能不是多个 1,如果你无法理解我这一句在说什么,请继续前进,然后重新复习 Hadoop 的 MapReduce 相关知识点)进行相加运算.
Step6,查看一下结果
scala> reduceRdd.collect
显示出来了,而且执行过程非常的迅速,你懂得.
当然了,以上的操作,完全可以使用一句话来实现,并且代码的体现形式可以非常骚气,如:
scala> sc.textFile("/input/words.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).collectStep7,当然了结果也可以输出到 HDFS 系统当中,比如:
scala> reduceRdd.saveAsTextFile("/output/spark/test01")
案例二:基于案例一,进行二次排序,即,将统计出的词频结果按照降序或者升序排列
Step1,得到案例一的统计好的词频结果,然后做一个 map 映射,将单词和单词出现的次数颠倒过来,也就是说,(hadoop, 1) 变成 (1, hadoop),这样做的原因是因为 OrderedRDDFunctions 类中有一个方法叫做:sortByKey,意思是按照 Key 的大小进行排序,默认参数是升序,如图:
sc.textFile("/input/words.txt")
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
.map(x => (x._2, x._1))
.sortByKey()
.map(x => (x._2, x._1))
.collect
为了使用该方法,我们这么做:
上一个案例,我们得到:
val reduce = sc.textFile("/input/words.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
然后:
val reverseRdd = reduce.map(x => (x._2, x._1))
然后我们看一眼这个 RDD 集合:
Step2,直接使用 sortByKey 进行默认排序
val sortRdd = reverseRdd.sortByKey()
Step3,排序结束你不得给人家再反转回来?所以:
sortRdd.map(x => (x._2, x._1)).collect,如图:
当然了,以上分解步骤一气呵成最爽快:
Step4,当然了,sortByKey 方法也可以实现倒序,如:
sc.textFile("/input/words.txt")
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
.map(x => (x._2, x._1))
.sortByKey()
.map(x => (x._2, x._1))
.collect
Step5,二次排序还可以使用 toptop 源码:
sc.textFile("/input/words.txt")
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
.map(x => (x._2, x._1))
.sortByKey(false)
.map(x => (x._2, x._1))
.collect
这是一个柯里化的函数,top 命令是查看前多少条数据,如图可见,在查看之时,元素也是排序好的
比如:
输出如图:
sc.textFile("/input/words.txt")
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
.map(x => (x._2, x._1))
.top(12)
Spark 运行模式之 Standalone
配置:spark-env.sh
Master 节点:SPARK_MASTER_IP=z01
Master 节点端口号:SPARK_MASTER_PORT=7077
Master webUI 端口号:SPARK_MASTER_WEBUI_PORT=8080
Worker 节点可用 CPU 核心数:SPARK_WORKER_CORES=2
Worker 可用内存:SPARK_WORKER_MEMORY=2g
Worker 端口号:SPARK_WORKER_PORT=7078
Worker WebUI 端口号:SPARK_WORKER_WEBUI_PORT=8081
允许在每台机器上开启几个 Worker 进程,默认为 1 个 SPARK_WORKER_INSTANCES=1
配置:slaves
即配置允许哪几台机器当做 Woker 节点
以上配置完成后,scp 到其他集群节点启动:Master$ sbin/start-master.sh
Worker$ sbin/start-slaves.sh
完成后通过 z01:8080 端口访问即可如图所示:
也可以 JPS 看一下进程:
在 Standalone 上运行 Spark
首先,查看一下 spark 的帮助文档来引导该怎么做:
$ bin/spark-shell --help
注意红框内的内容,那么接下来,我们应该知道怎么让 spark 运行在 standalone 上了:
$ bin/spark-shell --master spark://z01:7077
如图:注意红框内容
尖叫提示:如果直接不加参数的使用 spark-shell 方式启动,则还是在本地模式(Local)启动的.
Spark 测试案例之 Standalone 模式
案例一:跑一个一气呵成的 WordCount
scala> sc.textFile("/input/words.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).collect
WEBUI,http://192.168.122.200:4040/jobs/ 如图:
可以看到,有一个 Job 任务已经运行完毕了.
案例二:做一个每日的 PV 分析 Step1,首先,我们将网站的访问数据导入到 hive 当中,执行:
$ cat hql-file/track-log.hql
其中 track-log.hql 文件如下:
该部分内容可以参看 Hive 框架基础(一)
Step2,通过 Hive 查看 track_log 文件在哪
hive> desc formatted track_log;
如图:注意红框内容,对于我们来讲,有用的即:/user/hive/warehouse/track_log/2015082818
Step3,将日志数据读入到 RDD 中等待分析
scala> val rdd = sc.textFile("/user/hive/warehouse/track_log/2015082818")
Step4,清洗无效的数据,即空白行,以及 url 字段为空的,我们要过滤掉
1,先过滤空白行
2,再分割字段值
3,最后过滤 url 字段为空的
综合来写:
scala> val validRdd = rdd.filter(line => line.length > 0).map(_.split("\t")).filter(arr => arr(1).length > 0)
当然了,此时你可以 count 一下,看看过滤后剩下多少数据
scala> validRdd.count
Step4,将 URL 做 map 映射,比如做出这样的映射:(今日日期, 1)那么今日的日期在 tracktime 字段,属于分割后的数组的第 17 个索引处在 hive 中我们查看一下该日期的格式:hive> select tracktime from track_log limit 1;
如图:
那么截取出 2015-08-28 应该很容易,所以:
scala> val mapRdd = validRdd.map(arr => (arr(17).substring(0, 10), 1))
Step5,你懂得,再来一个 Reduce 即可
scala> val reduceRdd = mapRdd.reduceByKey(_ + _)
完事之后可以查看一下结果:
scala> reduceRdd.collect
如图:
当然了也可以一气呵成走你:
Step6,我们使用 Hive 来验证一下注意如果你的 Yarn 没有启动,需要将 Hive 设置成 Local 模式:
scala> sc.textFile("/user/hive/warehouse/track_log/2015082818")
.filter(line => line.length > 0)
.map(_.split("\t"))
.filter(arr => arr(1).length > 0)
.map(arr => (arr(17).substring(0, 10), 1))
.reduceByKey(_ + _)
.collect
hive> set hive.exec.mode.local.auto = true;
然后执行:
结果如图:
对比可知,两个结果是一样的.
案例三:PV 和 UV 分析 PV:即页面访问次数 UV:即不同用户访问页面次数 Step1,读取网站日志文件生成 RDD 对象
scala> val rdd = sc.textFile("/user/hive/warehouse/track_log")
Step2,过滤不必要的数据,并生成 map 映射,注意此时的操作与之前的案例略有不同,请注意观察,如图:
Step3,可选步骤,此处可以将数据 cache 到内存中,注意,cache 后,不会立刻缓存到内存中,需要执行一个 action,比如 count,take,collect 都可以
scala> val mapRdd = rdd.filter(_.length> 0).map(line => {
| val arr = line.split("\t")
| val date = arr(17).substring(0, 10)
| val guid = arr(5)
| val url = arr(1)
| (date, guid, url)
| }).filter(tuple => tuple._3.length > 0)
在此之后就可以在 4040 端口的页面是 storge 选项中看到缓存到内存中的数据信息,如图:Step4,统计 PV
scala> mapRdd.cache
scala> mapRdd.count
scala> val pvRdd = mapRdd.map(tuple => (tuple._1, 1)).reduceByKey(_ + _)
scala> pvRdd.first,如图:
Step5,UV 统计
scala> val uvRdd = mapRdd.map(tuple => (tuple._1 +"_"+ tuple._2, 1)).distinct.map(tuple => {
val arr = tuple._1.split("_")
(arr(0), 1)
}).reduceByKey(_ + _)
此时可以自行使用 uvRdd.first 查看结果,不再展示
Step6,合并 PV 和 UV 的结果进行显示 union 方式:
scala> val pv_uvRdd = pvRdd.union(uvRdd)
scala> pv_uvRdd.collect,如图:
join 方式:
scala> val pv_uvRdd = pvRdd.join(uvRdd)
scala> pv_uvRdd.first,如图:
验证:使用 Hive 或者 SparkSQL 验证结果一致性首先创建 SQL 语句:SparkSQL 方式:
scala> val sql = """上边的 SQL 代码""",如图:
然后执行:
scala> val result = sqlContext.sql(sql)
scala> result.show()
尖叫提示:如果你的 hive 使用了 thrift 的 metastore 方式,请把 hive 的 hive-site.xml 文件软连接到 spark 的 conf 目录下!!否则上述指令将会出现找不到 table 的错误.HIVE 方式:直接使用 Hive 客户端执行上面的 SQL 语句,如图:
Spark 任务历史服务
对于 Yarn 有 mr-historyserver
对于 Spark 有 SparkHistory
所以应该很容易明白这是一个任务日志的历史服务,比如你可以查看昨天半夜运行的任务情况.
开启这个服务也很简单:
可以参看:http://spark.apache.org/docs/1.6.1/monitoring.html
Step1,配置参数配置:spark-env.sh,日志默认是保存在本地的,此刻我们将日志保存到 HDFS 系统当中如图:配置:spark-defaults.conf,spark 启动时默认加载的配置文件Step2,在 HDFS 系统中创建目录 / user/z/spark-eventsStep3,将配置文件重新 scp 到其他节点之后,重启服务,然后开启历史服务
$ sbin/start-history-server.sh
JPS 看一眼:
然后在浏览器打开:http://z01:18080/
如图:
Step4,测试玩一玩?
$ bin/spark-shell --master spark://z01:7077
随便执行执行一个我们之前的案例任务,即可,运行几个任务,成功运行几个,再失败几个,如图:
注意红框内容,如果你当前的 spark-shell 没有退出,那么该任务就是属于正在运行的任务.请自行切换观察即可.
* 总结
对于 RDD 到 RDD 的 操作,我们称之为 Transformation 操作
例如:我们在案例中使用的过滤,或者 map,或者 reduce 等等
对 RDD 到其他类型的操作,我们称之为 Action
例如:我们在案例中使用的 top,或者 take,collect 等操作
另外 RDD 中的数据可以持久化到内存中来操作,使用:
rdd.cache 来操作,比较适用于频繁使用的.
这一节我们大概了解了 Spark 的操作,也应该更加深刻的熟悉了 Scala 的操作.下一节我们针对 Spark 进行更深入的探讨.
个人微博:http://weibo.com/seal13
QQ 大数据技术交流群(广告勿入):476966007
来源: http://www.bubuko.com/infodetail-2459060.html