Apache Spark https://jq.qq.com/?_wv=1027&k=5MhFTVi 是一个围绕速度, 易用性和复杂分析构建的大数据处理框架, 最初在 2009 年由加州大学伯克利分校的 AMPLab 开发, 并于 2010 年成为 Apache 的开源项目之一, 与 Hadoop 和 Storm 等其他大数据和 MapReduce 技术相比, Spark 有如下优势:
Spark https://jq.qq.com/?_wv=1027&k=5MhFTVi 提供了一个全面, 统一的框架用于管理各种有着不同性质 (文本数据, 图表数据等) 的数据集和数据源 (批量数据或实时的流数据) 的大数据处理的需求
官方资料介绍 Spark 可以将 Hadoop 集群中的应用在内存中的运行速度提升 100 倍, 甚至能够将应用在磁盘上的运行速度提升 10 倍
目标:
架构及生态
spark 与 hadoop
运行流程及特点
常用术语
standalone 模式
yarn 集群
RDD 运行流程
架构及生态:
通常当需要处理的数据量超过了单机尺度 (比如我们的计算机有 4GB 的内存, 而我们需要处理 100GB 以上的数据) 这时我们可以选择 spark 集群进行计算, 有时我们可能需要处理的数据量并不大, 但是计算很复杂, 需要大量的时间, 这时我们也可以选择利用 spark 集群强大的计算资源, 并行化地计算, 其架构示意图如下:
Spark Core: 包含 Spark 的基本功能; 尤其是定义 RDD 的 API, 操作以及这两者上的动作. 其他 Spark 的库都是构建在 RDD 和 Spark Core 之上的
Spark SQL: 提供通过 Apache Hive 的 SQL 变体 Hive 查询语言 (HiveQL) 与 Spark 进行交互的 API. 每个数据库表被当做一个 RDD,Spark SQL 查询被转换为 Spark 操作.
Spark Streaming: 对实时数据流进行处理和控制. Spark Streaming 允许程序能够像普通 RDD 一样处理实时数据
MLlib: 一个常用机器学习算法库, 算法被实现为对 RDD 的 Spark 操作. 这个库包含可扩展的学习算法, 比如分类, 回归等需要对大量数据集进行迭代的操作.
GraphX: 控制图, 并行图操作和计算的一组算法和工具的集合. GraphX 扩展了 RDD API, 包含控制图, 创建子图, 访问路径上所有顶点的操作
Spark 架构的组成图如下:
Cluster Manager: 在 standalone 模式中即为 Master 主节点, 控制整个集群, 监控 worker. 在 YARN 模式中为资源管理器
Worker 节点: 从节点, 负责控制计算节点, 启动 Executor 或者 Driver.
Driver: 运行 Application 的 main()函数
Executor: 执行器, 是为某个 Application 运行在 worker node 上的一个进程
Spark 与 hadoop:
Hadoop 有两个核心模块, 分布式存储模块 HDFS 和分布式计算模块 Mapreduce
spark 本身并没有提供分布式文件系统, 因此 spark 的分析大多依赖于 Hadoop 的分布式文件系统 HDFS
Hadoop 的 Mapreduce 与 spark 都可以进行数据计算, 而相比于 Mapreduce,spark 的速度更快并且提供的功能更加丰富
关系图如下:
运行流程及特点:
spark 运行流程图如下:
构建 Spark Application 的运行环境, 启动 SparkContext
SparkContext 向资源管理器 (可以是 Standalone,Mesos,Yarn) 申请运行 Executor 资源, 并启动 StandaloneExecutorbackend,
Executor 向 SparkContext 申请 Task
SparkContext 将应用程序分发给 Executor
SparkContext 构建成 DAG 图, 将 DAG 图分解成 Stage, 将 Taskset 发送给 Task Scheduler, 最后由 Task Scheduler 将 Task 发送给 Executor 运行
Task 在 Executor 上运行, 运行完释放所有资源
Spark 运行特点:
每个 Application 获取专属的 executor 进程, 该进程在 Application 期间一直驻留, 并以多线程方式运行 Task. 这种 Application 隔离机制是有优势的, 无论是从调度角度看(每个 Driver 调度他自己的任务), 还是从运行角度看(来自不同 Application 的 Task 运行在不同 JVM 中), 当然这样意味着 Spark Application 不能跨应用程序共享数据, 除非将数据写入外部存储系统
Spark 与资源管理器无关, 只要能够获取 executor 进程, 并能保持相互通信就可以了
提交 SparkContext 的 Client 应该靠近 Worker 节点(运行 Executor 的节点), 最好是在同一个 Rack 里, 因为 Spark Application 运行过程中 SparkContext 和 Executor 之间有大量的信息交换
Task 采用了数据本地性和推测执行的优化机制
常用术语:
Application:Appliction 都是指用户编写的 Spark 应用程序, 其中包括一个 Driver 功能的代码和分布在集群中多个节点上运行的 Executor 代码
**Driver: Spark 中的 Driver 即运行上述 Application 的 main 函数并创建 SparkContext, 创建 SparkContext 的目的是为了准备 Spark 应用程序的运行环境, 在 Spark 中有 SparkContext 负责与 ClusterManager 通信, 进行资源申请, 任务的分配和监控等, 当 Executor 部分运行完毕后, Driver 同时负责将 SparkContext 关闭, 通常用 SparkContext 代表 Driver
Executor: 某个 Application 运行在 worker 节点上的一个进程, 该进程负责运行某些 Task, 并且负责将数据存到内存或磁盘上, 每个 Application 都有各自独立的一批 Executor, 在 Spark on Yarn 模式下, 其进程名称为 CoarseGrainedExecutor Backend. 一个 CoarseGrainedExecutor Backend 有且仅有一个 Executor 对象, 负责将 Task 包装成 taskRunner, 并从线程池中抽取一个空闲线程运行 Task, 这个每一个 oarseGrainedExecutor Backend 能并行运行 Task 的数量取决与分配给它的 CPU 个数
Cluter Manager: 指的是在集群上获取资源的外部服务. 目前有三种类型
Standalon : spark 原生的资源管理, 由 Master 负责资源的分配
Apache Mesos: 与 hadoop MR 兼容性良好的一种资源调度框架
Hadoop Yarn: 主要是指 Yarn 中的 ResourceManager
Worker: 集群中任何可以运行 Application 代码的节点, 在 Standalone 模式中指的是通过 slave 文件配置的 Worker 节点, 在 Spark on Yarn 模式下就是 NoteManager 节点
Task: 被送到某个 Executor 上的工作单元, 但 hadoopMR 中的 MapTask 和 ReduceTask 概念一样, 是运行 Application 的基本单位, 多个 Task 组成一个 Stage, 而 Task 的调度和管理等是由 TaskScheduler 负责
Job: 包含多个 Task 组成的并行计算, 往往由 Spark Action 触发生成, 一个 Application 中往往会产生多个 Job
Stage: 每个 Job 会被拆分成多组 Task, 作为一个 TaskSet, 其名称为 Stage,Stage 的划分和调度是有 DAGScheduler 来负责的, Stage 有非最终的 Stage(Shuffle Map Stage)和最终的 Stage(Result Stage)两种, Stage 的边界就是发生 shuffle 的地方
DAGScheduler: 根据 Job 构建基于 Stage 的 DAG(Directed Acyclic Graph 有向无环图), 并提交 Stage 给 TASkScheduler. 其划分 Stage 的依据是 RDD 之间的依赖的关系找出开销最小的调度方法, 如下图
image
TASKSedulter: 将 TaskSET 提交给 worker 运行, 每个 Executor 运行什么 Task 就是在此处分配的. TaskScheduler 维护所有 TaskSet, 当 Executor 向 Driver 发生心跳时, TaskScheduler 会根据资源剩余情况分配相应的 Task. 另外 TaskScheduler 还维护着所有 Task 的运行标签, 重试失败的 Task. 下图展示了 TaskScheduler 的作用
在不同运行模式中任务调度器具体为:
Spark on Standalone 模式为 TaskScheduler
YARN-Client 模式为 YarnClientClusterScheduler
YARN-Cluster 模式为 YarnClusterScheduler
将这些术语串起来的运行层次图如下:
Job = 多个 stage,Stage = 多个同种 task, Task 分为 ShuffleMapTask 和 ResultTask,Dependency 分为 ShuffleDependency 和 NarrowDependency
Spark 运行模式:
Spark 的运行模式多种多样, 灵活多变, 部署在单机上时, 既可以用本地模式运行, 也可以用伪分布模式运行, 而当以分布式集群的方式部署时, 也有众多的运行模式可供选择, 这取决于集群的实际情况, 底层的资源调度即可以依赖外部资源调度框架, 也可以使用 Spark 内建的 Standalone 模式.
对于外部资源调度框架的支持, 目前的实现包括相对稳定的 Mesos 模式, 以及 hadoop YARN 模式
本地模式: 常用于本地开发测试, 本地还分别 local 和 local cluster
standalone: 独立集群运行模式
Standalone 模式使用 Spark 自带的资源调度框架
采用 Master/Slaves 的典型架构, 选用 ZooKeeper 来实现 Master 的 HA
框架结构图如下:
该模式主要的节点有 Client 节点, Master 节点和 Worker 节点. 其中 Driver 既可以运行在 Master 节点上中, 也可以运行在本地 Client 端. 当用 spark-shell 交互式工具提交 Spark 的 Job 时, Driver 在 Master 节点上运行; 当使用 spark-submit 工具提交 Job 或者在 Eclips,IDEA 等开发平台上使用 "new SparkConf.setManager(" spark://master:7077 ")" 方式运行 Spark 任务时, Driver 是运行在本地 Client 端上的
运行过程如下图:
SparkContext 连接到 Master, 向 Master 注册并申请资源(CPU Core 和 Memory)
Master 根据 SparkContext 的资源申请要求和 Worker 心跳周期内报告的信息决定在哪个 Worker 上分配资源, 然后在该 Worker 上获取资源, 然后启动 StandaloneExecutorBackend;
StandaloneExecutorBackend 向 SparkContext 注册;
SparkContext 将 Applicaiton 代码发送给 StandaloneExecutorBackend; 并且 SparkContext 解析 Applicaiton 代码, 构建 DAG 图, 并提交给 DAG Scheduler 分解成 Stage(当碰到 Action 操作时, 就会催生 Job; 每个 Job 中含有 1 个或多个 Stage,Stage 一般在获取外部数据和 shuffle 之前产生), 然后以 Stage(或者称为 TaskSet)提交给 Task Scheduler,Task Scheduler 负责将 Task 分配到相应的 Worker, 最后提交给 StandaloneExecutorBackend 执行;
StandaloneExecutorBackend 会建立 Executor 线程池, 开始执行 Task, 并向 SparkContext 报告, 直至 Task 完成
所有 Task 完成后, SparkContext 向 Master 注销, 释放资源
yarn:
Spark on YARN 模式根据 Driver 在集群中的位置分为两种模式: 一种是 YARN-Client 模式, 另一种是 YARN-Cluster(或称为 YARN-Standalone 模式)
Yarn-Client 模式中, Driver 在客户端本地运行, 这种模式可以使得 Spark Application 和客户端进行交互, 因为 Driver 在客户端, 所以可以通过 webUI 访问 Driver 的状态, 默认是 http://hadoop1:4040/ 访问, 而 YARN 通过 http:// hadoop1:8088 访问
YARN-client 的工作流程步骤为:
Spark Yarn Client 向 YARN 的 ResourceManager 申请启动 Application Master. 同时在 SparkContent 初始化中将创建 DAGScheduler 和 TASKScheduler 等, 由于我们选择的是 Yarn-Client 模式, 程序会选择 YarnClientClusterScheduler 和 YarnClientSchedulerBackend
ResourceManager 收到请求后, 在集群中选择一个 NodeManager, 为该应用程序分配第一个 Container, 要求它在这个 Container 中启动应用程序的 ApplicationMaster, 与 YARN-Cluster 区别的是在该 ApplicationMaster 不运行 SparkContext, 只与 SparkContext 进行联系进行资源的分派
Client 中的 SparkContext 初始化完毕后, 与 ApplicationMaster 建立通讯, 向 ResourceManager 注册, 根据任务信息向 ResourceManager 申请资源(Container)
一旦 ApplicationMaster 申请到资源 (也就是 Container) 后, 便与对应的 NodeManager 通信, 要求它在获得的 Container 中启动 CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend 启动后会向 Client 中的 SparkContext 注册并申请 Task
client 中的 SparkContext 分配 Task 给 CoarseGrainedExecutorBackend 执行, CoarseGrainedExecutorBackend 运行 Task 并向 Driver 汇报运行的状态和进度, 以让 Client 随时掌握各个任务的运行状态, 从而可以在任务失败时重新启动任务
应用程序运行完成后, Client 的 SparkContext 向 ResourceManager 申请注销并关闭自己
Spark Cluster 模式:
在 YARN-Cluster 模式中, 当用户向 YARN 中提交一个应用程序后, YARN 将分两个阶段运行该应用程序:
第一个阶段是把 Spark 的 Driver 作为一个 ApplicationMaster 在 YARN 集群中先启动;
第二个阶段是由 ApplicationMaster 创建应用程序, 然后为它向 ResourceManager 申请资源, 并启动 Executor 来运行 Task, 同时监控它的整个运行过程, 直到运行完成
YARN-cluster 的工作流程分为以下几个步骤
Spark Yarn Client 向 YARN 中提交应用程序, 包括 ApplicationMaster 程序, 启动 ApplicationMaster 的命令, 需要在 Executor 中运行的程序等
ResourceManager 收到请求后, 在集群中选择一个 NodeManager, 为该应用程序分配第一个 Container, 要求它在这个 Container 中启动应用程序的 ApplicationMaster, 其中 ApplicationMaster 进行 SparkContext 等的初始化
ApplicationMaster 向 ResourceManager 注册, 这样用户可以直接通过 ResourceManage 查看应用程序的运行状态, 然后它将采用轮询的方式通过 RPC 协议为各个任务申请资源, 并监控它们的运行状态直到运行结束
一旦 ApplicationMaster 申请到资源 (也就是 Container) 后, 便与对应的 NodeManager 通信, 要求它在获得的 Container 中启动 CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend 启动后会向 ApplicationMaster 中的 SparkContext 注册并申请 Task. 这一点和 Standalone 模式一样, 只不过 SparkContext 在 Spark Application 中初始化时, 使用 CoarseGrainedSchedulerBackend 配合 YarnClusterScheduler 进行任务的调度, 其中 YarnClusterScheduler 只是对 TaskSchedulerImpl 的一个简单包装, 增加了对 Executor 的等待逻辑等
ApplicationMaster 中的 SparkContext 分配 Task 给 CoarseGrainedExecutorBackend 执行, CoarseGrainedExecutorBackend 运行 Task 并向 ApplicationMaster 汇报运行的状态和进度, 以让 ApplicationMaster 随时掌握各个任务的运行状态, 从而可以在任务失败时重新启动任务
应用程序运行完成后, ApplicationMaster 向 ResourceManager 申请注销并关闭自己
Spark Client 和 Spark Cluster 的区别:
理解 YARN-Client 和 YARN-Cluster 深层次的区别之前先清楚一个概念: Application Master. 在 YARN 中, 每个 Application 实例都有一个 ApplicationMaster 进程, 它是 Application 启动的第一个容器. 它负责和 ResourceManager 打交道并请求资源, 获取资源之后告诉 NodeManager 为其启动 Container. 从深层次的含义讲 YARN-Cluster 和 YARN-Client 模式的区别其实就是 ApplicationMaster 进程的区别
YARN-Cluster 模式下, Driver 运行在 AM(Application Master)中, 它负责向 YARN 申请资源, 并监督作业的运行状况. 当用户提交了作业之后, 就可以关掉 Client, 作业会继续在 YARN 上运行, 因而 YARN-Cluster 模式不适合运行交互类型的作业
YARN-Client 模式下, Application Master 仅仅向 YARN 请求 Executor,Client 会和请求的 Container 通信来调度他们工作, 也就是说 Client 不能离开
思考: 我们在使用 Spark 提交 job 时使用的哪种模式?
RDD 运行流程:
RDD 在 Spark 中运行大概分为以下三步:
创建 RDD 对象
DAGScheduler 模块介入运算, 计算 RDD 之间的依赖关系, RDD 之间的依赖关系就形成了 DAG
每一个 Job 被分为多个 Stage. 划分 Stage 的一个主要依据是当前计算因子的输入是否是确定的, 如果是则将其分在同一个 Stage, 避免多个 Stage 之间的消息传递开销
示例图如下:
以下面一个按 A-Z 首字母分类, 查找相同首字母下不同姓名总个数的例子来看一下 RDD 是如何运行起来的
创建 RDD 上面的例子除去最后一个 collect 是个动作, 不会创建 RDD 之外, 前面四个转换都会创建出新的 RDD . 因此第一步就是创建好所有 RDD( 内部的五项信息 )?
创建执行计划 Spark 会尽可能地管道化, 并基于是否要重新组织数据来划分 阶段 (stage) , 例如本例中的 groupBy() 转换就会将整个执行计划划分成两阶段执行. 最终会产生一个 DAG(directed acyclic graph , 有向无环图 ) 作为逻辑执行计划
调度任务 将各阶段划分成不同的 任务 (task) , 每个任务都是数据和计算的合体. 在进行下一阶段前, 当前阶段的所有任务都要执行完成. 因为下一阶段的第一个转换一定是重新组织数据的, 所以必须等当前阶段所有结果数据都计算出来了才能继续.
结语
感谢您的观看, 如有不足之处, 欢迎批评指正.
获取资料 https://jq.qq.com/?_wv=1027&k=5MhFTVi
来源: http://www.jianshu.com/p/912ff4f8b7e8