Spark 是一个快速的集群化的实时计算系统。支持 Java, Scala, Python 和 R 语言的高级 API。
一 Spark 生态:
1111.png
支持 Spark Sql 用于 sql 和结构化数据查询处理;支持 MLlib 用于机器学习;支持 GraphX 用于图形处理;支持 Spark Streaming 和 Structured Sql(spark2.1.1 版本发布) 用于实时计算。(其中,我们使用的 Spark 功能主要是 Spark Sql 和 Structured Sql。其中 Spark sql 用于查询模块,可以联合多个数据源进行查询。Structured Sql 用于流式数据处理。)
- 部署方式有:
- 1、本地运行模式:new SparkConf().setAppName("sparkName")
- .setMaster(config.getString("local[*]")))
- 2、Stanalone模式:
- 1)由master/slaves服务组成的
- 2)各个节点上的资源被抽象成粗粒度的slot,有多少slot就能同时运行多少task。
- 3)部署时通过spark-env.sh和slave配置文件进行配置,使用start-all.sh可以一键启动。
- 3、EC2模式:
- 部署于云端。
- 4、Spark on Mesos模式:
- 支持粗粒度模式和细粒度模式。
- 1)粗粒度模式:应用程序的各个任务正式运行之前,需要将运行环境中的资源全部申请好,且运行过 程中要一直占用这些资源,即使不用,最后程序运行结束后,回收这些资源。比如你提交应用程序时,指定使用5个executor运行你的应用程序,每个executor占用5GB内存和5个CPU,每个executor内部设置了5个slot,则Mesos需要先为executor分配资源并启动它们,之后开始调度任务。另外,在程序运行过程中,mesos的master和slave并不知道executor内部各个task的运行情况,executor直接将任务状态通过内部的通信机制汇报给Driver,从一定程度上可以认为,每个应用程序利用mesos搭建了一个虚拟集群自己使用。
- 2)细粒度模式:鉴于粗粒度模式会造成大量资源浪费,Spark On Mesos还提供了另外一种调度模式:细粒度模式,这种模式类似于现在的云计算,思想是按需分配。与粗粒度模式一样,应用程序启动时,先会启动executor,但每个executor占用资源仅仅是自己运行所需的资源,不需要考虑将来要运行的任务,之后,mesos会为每个executor动态分配资源,每分配一些,便可以运行一个新任务,单个Task运行完之后可以马上释放对应的资源。每个Task会汇报状态给Mesos slave和Mesos Master,便于更加细粒度管理和容错,这种调度模式类似于MapReduce调度模式,每个Task完全独立,优点是便于资源控制和隔离,但缺点也很明显,短作业运行延迟大。
- 5、Spark on yarn模式:
- 支持粗粒度模式,只要用yarn的resource manage进行调度管理。(目前选择的是该模式)
- (细粒度模式尚未实现 https://issues.apache.org/jira/browse/YARN-1197)
集成性:
Spark 可以很好的集成 HDFS,HBase,Elatatic Search,kudu 等存储系统, mysql 等关系性数据库和 json csv 等静态文件处理。
二、Spark 基本架构:
image.png
- )Cluster Manager:在standalone模式中即为Master主节点,控制整个集群,监控worker。在YARN模式中为资源管理器
- )Worker节点:从节点,负责控制计算节点,启动Executor或者Driver。
- )Driver: 运行Application 的main()函数
- )Executor:执行器,是为某个Application运行在worker node上的一个进程
三、运行流程
1)创建 Spark context
2)Spark context 向 Cluster manager 申请运行 Executor 资源,并启动 StandaloneExecutorbackend
3)Executor 向 SparkContext 申请 Task
4)SparkContext 将应用程序分发给 Executor
5)SparkContext 构建成 DAG 图,将 DAG 图分解成 Stage、将 Taskset 发送给 Task Scheduler,最后由 Task Scheduler 将 Task 发送给 Executor 运行
6)Task 在 Executor 上运行,运行完释放所有资源
四、Cluster 模式和 client 模式
yarn-cluster 模式下,driver 运行在 AM(Application Master) 中,它负责向 YARN 申请资源,并监督作业的运行状况。当用户提交了作业之后,就可以关掉 Client,作业会继续在 YARN 上运行。yarn-cluster 模式不适合运行交互类型的作业。
image2.png
Yarn-client 模式下,Application Master 仅仅向 YARN 请求 executor,client 会和请求的 container 通信来调度他们工作。
image3.png
五、Spark sql
Spark sql 应用于查询模块。
以 CSV 文件为例,前端查询为 select * from CSV.test
1)通过 CSV.test 查询数据库获取对应的 csv 文件存储路径 path。
2)spark 读取 path 对应的 hdfs 文件生成 dataset
3)dataset.createTempView() 生成临时表 testTable
4)spark 执行 sql,select * from testTable 并返回结果
六、Structured Streaming
Spark2.0 中提出一个概念,continuous applications(连续应用程序)。
Spark Streaming 等流式处理引擎致力于流式数据的运算,比如通过 map 运行一个方法来改变流中的每一条记录,通过 reduce 可以基于时间做数据聚合。但是,事实上很少有只在流式数据上做运算的需求,相对的,流式处理往往是一个大型应用的一部分。continuous applications 提出后,实时运算作为一部分,不同系统间的交互等也可以由 Structured Streaming 来处理。如下图,左侧为 Spark Streaming 类的流式引擎,交互是由使用者来处理;右侧为 Strctured Streaming 类的连续应用,交互由应用来处理。( https://databricks.com/blog/2016/07/28/continuous-applications-evolving-streaming-in-apache-spark-2-0.html )
image4.png
Structured Streaming 是一个建立在 Spark sql 引擎上的可扩展高容错的流式处理引擎。它使得可以像对静态数据进行批量处理一样来处理流式数据。Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.
Structured Streaming 抽象了一个 DataSet 中无边界的表。structured streaming 将流数据看作是一张没有边界的表,流数据不断的向表尾增加数据
image5.png
在每一个周期(默认 1s),新的内容将会增加到表尾,查询的结果将会更新到结果表中。一旦结果表被更新,就需要将改变后的表内容输出到外部的 sink 中。
如 Kafka—etl—es 的过程,spark 每秒钟从 source—kafka 读取一批数据,写入无边界表中,通过 dataset 的 spark sql 操作进行 ETL 转换,更新 result 表,随着 result 表更新,变化的 result 行将被写入外部 sink—es。
source 类型:File source,Kafka source Socket source
sink 类型:File sink,Foreach sink,Console sink,Memory sink,其中 es sink 是由 Elastatic search 扩展的。
输出模式:
Complete mode: 不删除任何数据,在 Result Table 中保留所有数据,每次触发操作输出所有窗口数据;
Append mode: 当确定不会更新窗口时,将会输出该窗口的数据并删除,保证每个窗口的数据只会输出一次;
Updated mode: 删除不再更新的时间窗口,每次触发聚合操作时,输出更新的窗口。
聚合:输出模式必须是 Append 或 Updated。sink 为 es 时只能是 Append。
Event time:时间发生时间,来源于 source 数据中的时间列。
Watermark:数据过期时间。
来源: http://www.jianshu.com/p/f5afc04652f3