- 将程序达成jar 包
- 在项目名称上右击点击export选择java 下的jar file,点击next,选择输出目录,输入文件名,点击next,点击next,然后点击完成。导出jar 包。
- 将jar放到系统某个目录中。执行
- . / spark - submit--class com.dt.spark.WordCount_Cluster--master spark: //worker1:7077 ./wordcount.jar
也可以将以上命令保存到. sh 文件中,直接执行 sh 文件即可。
由于 spark1.6 需要 scala 2.10.X 版本的。推荐 2.10.4,java 版本最好是 1.8。所以提前我们要需要安装好 java 和 scala 并在环境变量中配置好
安装完成以后启动 IDEA,并进行配置,默认即可,然后点击 ok 以后,设置 ui 风格,然后点击 next 会出现插件的选择页面,默认不需求修改,点击 next,选择安装 scala 语言,点击 install 按钮(非常重要,以为要开发 spark 程序所以必须安装),等安装完成以后点击 start 启动 IDEA。
点击 create new project ,然后填写 project name 为 "Wordcount",选择项目的保存地址 project location。
然后设置 project sdk 即 java 的安装目录。点击右侧的 new 按钮,选择 jdk,然后选择 java 的安装路径即可。
然后选择 scalasdk。点击右侧的 create ,默认出现时 2.10.x 版本的 scala,点击 ok 即可。然后点击 finish。
点击 file->project structure 来设置工程的 libraries。核心是添加 spark 的 jar 依赖。选择 Libraries ,点击右侧的加号,选择 java,选择 spark1.6.0 的 spark-1.6.0-bin-hadoop2.6\lib\spark-assembly-1.6.0-hadoop2.6.0.jar。点击 ok。稍等片刻后然后点击 ok(Libraries 作用于 WordCount),然后点击 apply,点击 ok。(这一步很重要,如果没有无法编写 spark 的代码)
在 src 上右击 new ->package 填入 package 的 name 为 com.dt.spark。
在包的名字上右击选择 new ->scala class 。在弹出框中填写 Name ,并制定 kind 为 object ,点击 ok。
- import org.apache.spark.SparkConf
- import org.apache.spark.rdd.RDD
- def main(args: Array[String]): Unit ={
- * 集群的master的URL,如果设置为local则在本地运行。
- val conf = new SparkConf()
- conf.setMaster("local")
- /**第2步,创建SparkContext对象,SparkContext是spark程序所有功能的唯一入口,其作用是初始化spark应用程序的
- * */
- * 数据被RDD划分为一系列的Partitions,分配到每个partition的数据属于一个Task的处理范畴
- val lines = sc.textFile("G://datarguru spark//tool//spark-1.4.0-bin-hadoop2.6//README.md", 1) //读取本地文件并设置一个partition
- /**第4步,对初始的RDD进行Transformation级别的处理,如map、filter高阶函数编程,进行具体计算
- val words = lines.flatMap{ line => line.split(" ")}//对每行字符串进行单词拆分,并把所有拆分结果通过flat合并成一个大的单词集合
- (word, 1)} //在单词拆分基础上对每个单词实例计数为1
- wordCounts.foreach(wordNumberPair => println(wordNumberPair._1 + ":" + wordNumberPair._2))
- }
- 在代码去右击选择点击run"wordCount"来运行程序。在生成环境下肯定是写自动化shell 脚本自动提交程序的。
- 注意:如果val sc = new SparkContext(conf)报错,并且没有运行结果,需要将scala的module改成scala
- 2.10版本的。具体操作:File->project structure -> Dependencies ->删除scala
- 2.11.x的module.-> 左上角的"+" -> scala ->选中scala2.10.4 -> apply
- import org.apache.spark.SparkConf
- import org.apache.spark.rdd.RDD
- def main(args: Array[String]): Unit ={
- * 集群的master的URL,如果设置为local则在本地运行。
- val conf = new SparkConf()
- //conf.setMaster("spark://master:7077")
- * 核心组件,包括DAGScheduler,TaskScheduler,SchedulerBackend
- val sc = new SparkContext(conf)
- /**第3步,根据数据源(HDFS,HBase,Local FS)通过SparkContext来创建RDD
- * */
- /**第4步,对初始的RDD进行Transformation级别的处理,如map、filter高阶函数编程,进行具体计算
- val words = lines.flatMap{ line => line.split(" ")}//对每行字符串进行单词拆分,并把所有拆分结果通过flat合并成一个大的单词集合
- (word, 1)} //在单词拆分基础上对每个单词实例计数为1
- pairs._2, pairs._1)).sortByKey(false).map(pair=>(pair._1, pair._2))//相同的key,value累加并且排名
- println(wordNumberPair._1 + ":" + wordNumberPair._2))
- }
在 spark 中执行 wordcount 方法。 将 jar 放到 linux 系统某个目录中。执行
- 将程序达成jar 包
- 点击file->project structure,在弹出的页面点击Artifacts,点击右侧的"+",选择jar –> from
- modules with dependencies,在弹出的页面中,设置好main class
- 然后点击ok,在弹出页面修改Name(系统生成的name不规范)、导出位置并删除scala和spark的jar(因为集群环境中已经存在)点击ok
- 。然后在菜单栏中点击build –> Artifacts ,在弹出按钮中,点击bulid,会自动开始打包。
- 注意事项:
- 为什么不能再ide开发环境中,直接发布spark程序到spark集群中?
- 1. 开发机器的内存和cores的限制,默认情况情况下,spark程序的dirver在提交spark程序的机器上,如果在idea中提交程序的话,那idea机器就必须非常强大。
- 2. Dirver要指挥workers的运行并频繁的发生同学,如果开发环境和spark集群不在同样一个网络下,就会出现任务丢失,运行缓慢等多种不必要的问题。
- 3. 这是不安全的。
- xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- 4.0.0</modelVersion>
- com.dt.spark</groupId>
- SparkApps</artifactId>
- 0.0.1-SNAPSHOT</version>
- jar</packaging>
- SparkApps</name>
- http://maven.apache.org</url>
- <properties>
- UTF-8</project.build.sourceEncoding>
- <dependencies>
- junit</groupId>
- junit</artifactId>
- 3.8.1</version>
- test</scope>
- org.apache.spark</groupId>
- spark-core_2.10</artifactId>
- 1.6.0</version>
- org.apache.spark</groupId>
- spark-sql_2.10</artifactId>
- 1.6.0</version>
- org.apache.spark</groupId>
- spark-hive_2.10</artifactId>
- 1.6.0</version>
- org.apache.spark</groupId>
- spark-streaming_2.10</artifactId>
- 1.6.0</version>
- org.apache.hadoop</groupId>
- hadoop-client</artifactId>
- 2.6.0</version>
- org.apache.spark</groupId>
- spark-streaming-kafka_2.10</artifactId>
- 1.6.0</version>
- org.apache.spark</groupId>
- spark-graphx_2.10</artifactId>
- 1.6.0</version>
- <build>
- src/main/java</sourceDirectory>
- src/main/test</testSourceDirectory>
- <plugins>
- maven-assembly-plugin</artifactId>
- jar-with-dependencies</descriptorRef>
- make-assembly</id>
- package</phase>
- single</goal>
- org.codehaus.mojo</groupId>
- exec-maven-plugin</artifactId>
- 1.3.1</version>
- exec</goal>
- java</executable>
- false</includeProjectDependencies>
- compile</classpathScope>
- com.dt.spark.SparkApps.WordCount</mainClass>
- org.apache.maven.plugins</groupId>
- maven-compiler-plugin</artifactId>
- 1.6</source>
- 1.6</target>
- </project>
- import java.util.Arrays;
- import scala.Function;
- public static void main(String[] args){
- //其底层就是scala的SparkContext
- String> lines = sc.textFile("G://datarguru spark//tool//spark-1.4.0-bin-hadoop2.6//README.md");
- String> words = lines.flatMap(new FlatMapFunction<String, String>(){
- public Iterable<String> call(String line)throws Exception{
- });
- JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>(){
- public Tuple2<String, Integer> call(String word)throws Exception{
- String, Integer>(word, 1);
- });
- JavaPairRDD<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>(){ //对相同的Key,进行Value的累计(包括Local和Reducer级别同时Reduce)
- public Integer call(Integer v1, Integer v2)throws Exception{
- });
- wordsCount.foreach(new VoidFunction<Tuple2<String, Integer>>(){
- public void call(Tuple2<String, Integer>pair)throws Exception{
- });
- }
- import java.util.Arrays;
- import scala.Function;
- public static void main(String[] args){
- String> lines = sc.textFile("/library/wordcount/input/Data");
- String> words = lines.flatMap(new FlatMapFunction<String, String>(){
- public Iterable<String> call(String line)throws Exception{
- });
- JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>(){
- public Tuple2<String, Integer> call(String word)throws Exception{
- String, Integer>(word, 1);
- });
- JavaPairRDD<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>(){
- public Integer call(Integer v1, Integer v2)throws Exception{
- });
- wordsCount.foreach(new VoidFunction<Tuple2<String, Integer>>(){
- public void call(Tuple2<String, Integer>pair)throws Exception{
- });
- }
四、彻底解析 wordcount 运行原理
即用 Spark 作单词计数统计,数据到底是怎么流动的, 参看一图:
- word,1)).reduceByKey(_+_).saveAsTextFile(outputPathwordcount)
(1)在 IntelliJ IDEA 中编写下面代码:
- import org.apache.spark.SparkConf
- object WordCount {
- valconf = new SparkConf()
- conf.setMaster("local")
- val lines = sc.textFile("D://tmp//helloSpark.txt", 1)
- line.split(" ") }
- (word,1) }
- wordCounts.foreach(wordNumberPair =>println(wordNumberPair._1 + " : " + wordNumberPair._2))
- }
- (2)在D盘下地tmp文件夹下新建helloSpark.txt文件,内容如下:
- Hello Hadoop
- Spark is awesome
- Flink : 1
- is : 1
- awesome : 1
- Scala : 1
- path: String,
- assertNotStopped()
- minPartitions).map(pair => pair._2.toString)
- 可以看出在进行了hadoopFile之后又进行了map操作。
- HadoopRDD从HDFS上读取分布式文件,并且以数据分片的方式存在于集群之中。
- * Return a new RDD by applying a function to all elements of this RDD.
- def map[U: ClassTag](f: T => U): RDD[U] = withScope {
- new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
- 读取到的一行数据(key,value的方式),对行的索引位置不感兴趣,只对其value事情兴趣。pair时有个匿名函数,是个tuple,取第二个元素。
- 此处又产生了MapPartitionsRDD。MapPartitionsRDD基于hadoopRDD产生的Parition去掉行的KEY。
- 注:可以看出一个操作可能产生一个RDD也可能产生多个RDD。如sc.textFile就产生了两个RDD:hadoopRDD和MapParititionsRDD。
- 下一步:
- line.split(" ") }
- * Return a new RDD by first applying a function to all elements of this
- */
- TraversableOnce[U]): RDD[U] = withScope {
- new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
- 可以看出flatMap又产生了一个MapPartitionsRDD,此时的各个Partition都是拆分后的单词。
- 下一步:
- (word,1) }
- reduceByKey是进行全局单词计数统计,对相同的key的value相加,包括local和reducer同时进行reduce。所以在map之后,本地又进行了一次统计,即local级别的reduce。
- shuffle前的Local Reduce操作,主要负责本地局部统计,并且把统计后的结果按照分区策略放到不同的File。
- 下一Stage就叫Reducer了,下一阶段假设有3个并行度的话,每个Partition进行Local Reduce后都会把数据分成三种类型。最简单的方式就是用HashCode对其取模。
- 至此都是stage1。
- Stage内部完全基于内存迭代,不需要每次操作都有读写磁盘,所以速度非常快。
- V): RDD[(K, V)] = self.withScope {
- v, func, func, partitioner)
- * Merge the values for each key using an associative and commutative reduce function. This will
- * to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
- def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
- }
- /**
- * also perform the merging locally on each mapper before sending results to a reducer, similarly
- * parallelism level.
- def reduceByKey(func: (V, V) => V): RDD[(K, V)] = sel
来源: http://www.bubuko.com/infodetail-1972269.html