一, 前期准备
前期的环境准备, 在 Linux 系统下要有 Hadoop 系统, spark 伪分布式或者分布式, 具体的教程可以查阅我的这两篇博客:
Hadoop2.0 伪分布式平台环境搭建
Spark2.4.0 伪分布式环境搭建
然后在 spark 伪分布式的环境下必须出现如下八个节点才算 spark 环境搭建好.
然后再在本地 Windows 系统下有一个简单的词频统计程序.
- import org.apache.spark.SparkConf
- import org.apache.spark.SparkContext
- import org.apache.spark.rdd.RDD
- object ScalaSparkDemo {
- def main(args: Array[String]) {
- /**
- * 第一步: 创建 Spark 的配置对象 SparkConf, 设置 Spark 程序的运行时的配置信息,
- * 例如说通过 setMaster 来设置程序要连接的 Spark 集群的 Master 的 URL,
- * 如果设置为 local, 则代表 Spark 程序在本地运行, 特别适合于机器配置条件非常差
- * (例如只有 1G 的内存)的初学者
- */
- val conf = new SparkConf() // 创建 SparkConf 对象, 由于全局只有一个 SparkConf 所以不需要工厂方法
- conf.setAppName("wow,my first spark app") // 设置应用程序的名称, 在程序的监控界面可以看得到名称
- //conf.setMaster("local") // 此时程序在本地运行, 不需要安装 Spark 集群
- /**
- * 第二步: 创建 SparkContext 对象
- * SparkContext 是 Spark 程序所有功能的唯一入口, 无论是采用 Scala,Java,Python,R 等都必须要有一个
- * SparkContext
- * SparkContext 核心作用: 初始化 Spark 应用程序运行所需要的核心组件, 包括 DAGScheduler,TaskScheduler,SchedulerBacked,
- * 同时还会负责 Spark 程序往 Master 注册程序等
- * SparkContext 是整个 Spark 应用程序中最为至关重要的一个对象
- */
- val sc = new SparkContext(conf) // 创建 SpackContext 对象, 通过传入 SparkConf 实例来定制 Spark 运行的具体参数的配置信息
- /**
- * 第三步: 根据具体的数据来源 (HDFS,HBase,Local,FileSystem,DB,S3) 通过 SparkContext 来创建 RDD
- * RDD 的创建基本有三种方式,(1)根据外部的数据来源 (例如 HDFS)(2) 根据 Scala 集合 (3) 由其它的 RDD 操作
- * 数据会被 RDD 划分为成为一系列的 Partitions, 分配到每个 Partition 的数据属于一个 Task 的处理范畴
- */
- // 读取本地文件并设置为一个 Partition
- // val lines = sc.textFile("words.txt", 1) // 第一个参数为为本地文件路径, 第二个参数 minPartitions 为最小并行度, 这里设为 1
- sc.setLogLevel("WARN")
- val lines = sc.parallelize(List("pandas","i like pandas"))
- // 类型推断 , 也可以写下面方式
- // val lines : RDD[String] =sc.textFile("words.txt", 1)
- /**
- * 第四步: 对初始的 RDD 进行 Transformation 级别的处理, 例如 map,filter 等高阶函数
- * 编程. 来进行具体的数据计算
- * 第 4.1 步: 将每一行的字符串拆分成单个的单词
- */
- // 对每一行的字符串进行单词拆分并把所有行的结果通过 flat 合并成一个大的集合
- val words = lines.flatMap { line => line.split(" ") }
- /**
- * 第 4.2 步在单词拆分的基础上, 对每个单词实例计数为 1, 也就是 Word=>(Word,1)tuple
- */
- val pairs = words.map { Word => (Word, 1) }
- /**
- * 第 4.3 步在每个单词实例计数为 1 的基础之上统计每个单词在文中出现的总次数
- */
- // 对相同的 key 进行 value 的累加(包括 local 和 Reduce 级别的同时 Reduce)
- val wordCounts = pairs.reduceByKey(_ + _)
- // 打印结果
- wordCounts.foreach(wordNumberPair => println(wordNumberPair._1 + ":" + wordNumberPair._2))
- // 释放资源
- sc.stop()
- }
- }
二, 导出 jar 包
这里注意词频统计程序的包名为 test, 类名为 ScalaSparkDemo.
注意这里勾选要打包所依赖的一些文件. 当然可以选择把整个工程打包. 还要注意这里打包后的文件名为 test.jar.
然后上传到 Ubuntu 中, 使用这个命令 bin/spark-submit --class test.ScalaSparkDemo --master local /home/xiaow/test.jar 即可运行./home/xiaow/test.jar: 指明此 jar 包在主节点上的位置. 关于打包到集群的详细命令, 可以查阅我的这一篇博客: Spark 学习之在集群上运行 Spark
如此, 搞定收工!!!
来源: http://www.bubuko.com/infodetail-3049217.html