一, Spark 介绍
Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.
Spark 是一个快速且多功能的集群计算系统. 它为多种不同语言提供高级 API, 和支持一般执行图的优化引擎. 它也有丰富的高级工具集, Spark SQL 进行结构化数据的处理, MLib 处理机器学习, GraphX 进行图处理, 以及 Spark Streaming 流计算.
组成
它的主要组件有:
SparkCore
将分布式数据抽象为弹性分布式数据集(RDD), 实现了应用任务调度, RPC, 序列化和压缩, 并为运行在其上的上层组件提供 API.
SparkSQL
Spark Sql 是 Spark 来操作结构化数据的程序包, 可以让我使用 SQL 语句的方式来查询数据, Spark 支持 多种数据源, 包含 Hive 表, parquest 以及 JSON 等内容.
SparkStreaming
是 Spark 提供的实时数据进行流式计算的组件.
MLlib
提供常用机器学习算法的实现库.
GraphX
提供一个分布式图计算框架, 能高效进行图计算.
BlinkDB
用于在海量数据上进行交互式 SQL 的近似查询引擎.
Tachyon
以内存为中心高容错的的分布式文件系统.
返回一个包含数据集前 n 个元素的数组
二, WordCount 程序讲解
编写代码
scala 程序编写
- object WordCountDemo {
- def main(args: Array[String]): Unit = {
- // 创建 Spark 配置对象
- val conf = new SparkConf().setMaster("local").setAppName("MyApp")
- // 通过 conf 创建 sc
- val sc = new SparkContext(conf)
- // 读取文件
- val rdd1 = sc.textFile("/Users/README.md")
- // 计算
- val rdd2 = rdd1.flatMap(line => line.split(" ")).map(Word => (Word,1)).reduceByKey(_+_)
- // 打印
- rdd2.take(10).foreach(println)
- }
- }
java 程序编写
- public class WordCountJavaDemo {
- public static void main(String[] args) {
- SparkConf conf = new SparkConf();
- conf.setAppName("myapp").setMaster("local");
- JavaSparkContext sc = new JavaSparkContext(conf);
- JavaRDD<String> rdd1 = sc.textFile("/Users/README.md");
- JavaRDD<String> rdd2 = rdd1.flatMap(new FlatMapFunction<String, String>() {
- @Override
- public Iterator<String> call(String s) throws Exception {
- List<String> list = new ArrayList<>();
- String[] arr = s.split(" ");
- for (String ss : arr) {
- list.add(ss);
- }
- return list.iterator();
- }
- });
- JavaPairRDD<String, Integer> rdd3 = rdd2.mapToPair(new PairFunction<String, String, Integer>() {
- @Override
- public Tuple2<String, Integer> call(String s) throws Exception {
- return new Tuple2<String, Integer>(s, 1);
- }
- });
- JavaPairRDD<String, Integer> rdd4 = rdd3.reduceByKey(new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer v1, Integer v2) throws Exception {
- return v1 + v2;
- }
- });
- List<Tuple2<String, Integer>> list = rdd4.collect();
- for (Tuple2<String, Integer> t : list) {
- System.out.println(t._1() + " " + t._2());
- }
- }
- }
三, 原理介绍
RDD
由一系列 Partition 组成
RDD 之间有一系列依赖关系
RDD 每个算子实际上是作用在每个 Partition 上
RDD 会提供一系列最佳位置
分区器是作用在 KV 格式的 RDD 上
RDD 会在多个节点上存储, 就和 hdfs 的分布式道理是一样的. hdfs 文件被切分为多个 block 存储在各个节点上, 而 RDD 是被切分为多个 partition. 不同的 partition 可能在不同的节点上.
Spark 执行流程
1,Driver
分发 task, 在分发之前, 会调用 RDD 的方法, 获取 partition 的位置.
将 task 的计算结果, 拉回到 Driver 端
Driver 是一个 JVM 进程
2,Worker
宽依赖, 窄依赖
图中 stage2 的并行度是 4, 也就是有 4 个 task.
宽依赖
父 RDD 与子 RDD,partition 的关系是一对多, 就是宽依赖. 宽依赖于 shuffle 对应.
窄依赖
父 RDD 与子 RDD,partition 的关系是一对一或多对一, 就是窄依赖.
四, Spark 常用算子
Transformation 算子
特点: 懒执行
(1)map
map 的输入变换函数应用于 RDD 中所有元素
(2)flatMap
flatMap 与 map 区别在于 map 为 "映射", 而 flatMap"先映射, 后扁平化",map 对每一次 (func) 都产生一个元素, 返回一个对象, 而 flatMap 多一步就是将所有对象合并为一个对象.
(3)flatMapValues
每个元素的 Value 被输入函数映射为一系列的值, 然后这些值再与原 RDD 中的 Key 组成一系列新的 KV 对.
代码
- x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
- def f(x): return x
- x.flatMapValues(f).collect()
打印结果
- [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
- filter
过滤操作, 满足 filter 内 function 函数为 true 的 RDD 内所有元素组成一个新的数据集.
(4)groupByKey
主要作用是将相同的所有的键值对分组到一个集合序列当中, 其顺序是不确定的.
(5)reduceByKey
与 groupByKey 类似, 却有不同. 如(a,1), (a,2), (b,1), (b,2).groupByKey 产生中间结果为( (a,1), (a,2) ), ( (b,1), (b,2) ). 而 reduceByKey 为(a,3), (b,3).
reduceByKey 主要作用是聚合, groupByKey 主要作用是分组.
(6)take
Action 算子
特点: 立即触发执行
五, SparkSQL
介绍
Spark SQL is a Spark module for structured data processing. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL provide Spark with more information about the structure of both the data and the computation being performed. Internally, Spark SQL uses this extra information to perform extra optimizations. There are several ways to interact with Spark SQL including SQL and the Dataset API. When computing a result the same execution engine is used, independent of which API/language you are using to express the computation. This unification means that developers can easily switch back and forth between different APIs based on which provides the most natural way to express a given transformation.
SparkSQL 是 Spark 的一个用来处理结构化数据的模块. 使用类似 SQL 的方式访问 Hadoop, 实现 MR 计算.
Datasets 的概念
A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL's optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.). The Dataset API is available in Scala and Java. Python does not have the support for the Dataset API. But due to Python's dynamic nature, many of the benefits of the Dataset API are already available (i.e. you can access the field of a row by name naturally row.columnName). The case for R is similar.
Dataset 是分布式数据集合.
DataFrames 概念
A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs. The DataFrame API is available in Scala, Java, Python, and R. In Scala and Java, a DataFrame is represented by a Dataset of Rows. In the Scala API, DataFrame is simply a type alias of Dataset[Row]. While, in Java API, users need to use Datasetto represent a DataFrame.
基本使用
(1)创建 DataFrames
数据
- {
- "id":"1","name":"zhangsan","age":"12"
- }
- {
- "id":"2","name":"lisi","age":"12"
- }
- {
- "id":"3","name":"wangwu","age":"12"
- }
代码
- object SparkSqlDemo {
- def main(args: Array[String]): Unit = {
- // 创建 Spark 配置对象
- val conf = new SparkConf().setMaster("local[4]").setAppName("MyApp");
- val spark = SparkSession
- .builder()
- .appName("Spark SQL basic example")
- .config(conf)
- .getOrCreate()
- val df = spark.read.JSON("/Users/opensource/dev-problem/source/people_sample_json.json");
- df.show()
- }
- }
(2)查询
- val df = spark.read.JSON("/Users/fangzhijie/opensource/dev-problem/source/people_sample_json.json");
- df.createOrReplaceTempView("people")
- val sqlDF = spark.sql("SELECT * FROM people WHERE name ='zhangsan'")
- sqlDF.show()
六, SparkStreaming
介绍
Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Flume, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and Windows. Finally, processed data can be pushed out to filesystems, databases, and live dashboards. In fact, you can apply Spark's machine learning and graph processing algorithms on data streams.
基本使用
(1)简单使用
- object SparkStreamingDemo {
- def main(args: Array[String]): Unit = {
- val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
- // 创建 Spark 流上下文
- val ssc = new StreamingContext(conf, Seconds(1))
- // 创建 Socket 文本流
- val lines = ssc.socketTextStream("localhost", 9999)
- val words = lines.flatMap(_.split(" "))
- val pairs = words.map(Word => (Word, 1))
- val wordCounts = pairs.reduceByKey(_ + _)
- // Print the first ten elements of each RDD generated in this DStream to the console
- wordCounts.print()
- // 启动
- ssc.start()
- // 等待结束
- ssc.awaitTermination() // Wait for the computation to terminate
- }
- }
使用 shell 命令监听端口, 输入待计算内容
$ nc -lk 9999
原理
SparkStreaming 的编程抽象是离散化流(DStream), 它是一个 RDD 序列, 每个 RDD 代表数据流中一个时间片内的数据.
参考文档
Spark Quick Start
Spark32 个常用算子总结
SparkSQL Guide
SparkSQL 官方文档
《Spark 快速大数据分析》
SparkStream 官方文档
来源: https://www.cnblogs.com/fonxian/p/11887518.html