前言
Spark 自从 2014 年 1.2 版本发布以来, 已成为大数据计算的通用组件. 网上介绍 Spark 的资源也非常多, 但是不利于用户快速入门, 所以本文主要通从用户的角度来介绍 Spark, 让用户能快速的认识 Spark, 知道 Spark 是什么, 能做什么, 怎么去做.
Spark 是什么
摘用官网的定义:
Spark 是一个快速的, 通用的分布式计算系统.
提供了高级 API, 如: Java,Scala,Python 和 R.
同时也支持高级工具, 如: Spark SQL 处理结构化数据, MLib 处理机器学习, GraphX 用于图计算, Spark Streming 用于流数据处理.
也就是说 Spark 提供了灵活的, 丰富接口的大数据处理能力. 下图是 Spark 的模块图:
用户使用的 SQL,Streaming,MLib,GraphX 接口最终都会转换成 Spark Core 分布式运行.
目前用户用的比较多的是 SQL 和 Streaming, 这里先主要介绍下这两个.
Spark SQL
Spark SQL 是 Spark 提供的 SQL 接口, 用户使用 Spark SQL 可以像使用传统数据库一样使用 SQL. 例如: 创建表, 删除表, 查询表, join 表等. 连接到 Spark SQL 后可以做如下操作
此命令运行完毕后, Spark 系统会在 hdfs 上创建一个名称为 test_parquet 的目录, 例如 / user/hive/warehouse/test_parquet/.
然后往 Spark 表中插入数据.
插入数据的步骤运行后, Spark 会在 hdfs 目录:/user/hive/warehouse/test_parquet / 中创建一些列后缀为. parquet 的文件, 如下:
插入完成后开始查询数据.
查询数据的过程是 Spark 并行从 hdfs 系统上拉取每个 Parquet, 然后在 Spark 中并行计算.
这里只是简单列举了 Spark 创建 Parquet 表的过程, Spark 也可以支持读取其他格式的表, 例如对接数据库等.
Spark SQL 除了对用户提供了 SQL 命令的接口, 也提供了 API 接口. Datasets(DataFrames), 例如使用 API 创建 Parquet 如下:
Spark Streaming
Spark Streaming 是流式处理系统, 可以处理流式数据. 下面用个例子说明 Streaming 的过程.
Spark Streaming 可以对接 Kafka. 假如 kafka 产生的数据格式为:
现在业务需要每分钟从 Kafka 读取一批数据, 对数进行信息补齐, 因为 kafka 拿到的数据只有 id 信息, 用户想补齐 name 信息.
假如具有 id,name 信息的表存储在 Phoenix 中.
这样就可以通过 Spark Streaming 来完成这些业务诉求. 在 Spark 的业务处理逻辑中拿到 kafka 的数据后, 使用 id 关联 Phoenix 表拿到 name 信息, 然再写入到其他数据库.
例如此业务的 Spark Streaming 的业务逻辑代码如下:
先看下 Spark 在当前常用的 BigData 业务架构中的位置.
下图是常用的 BigData 大数据组件 Spark+HBase+Cassandra+ES(Solor), 这些组件组合可覆盖 BigData 95% 以上的业务场景.
图中数据 BigData 分 4 个层次, 由上到下分别为:
业务系统层: 一般是直接面向用户的业务系统.
计算层: Spark 的分布式计算.
数据库层: HBase+Cassandra 数据库提供实时查询的能力.
存储层: HDFS 或者 OSS.
这里主要介绍下计算层 Spark.
Spark 计算层会把数据从数据库, 列式存储 (数仓) 中拉去到 Spark 中进行分布式计算. 我们把 Spark 打开看下是如何分布式计算的. 先看下 Spark 运行时候的部署结构.
由上图可以看到 Spark 部署时分布式的, 有一个 Driver, 有 N 个 Executor. 业务系统对接 Driver,Driver 把计算逻辑发送到每个 Executor 运行, Executor 运行结果再返回.
所以当 Spark 拉取数据库, 数仓数据时会并行拉取到每个 Executor 做并行运算.
例如 Spark SQL 中查询表的例子, 以及 Spark Streming 的中处理批数据的例子, Spark 运算时是每个 Executor 并处理数据的, Executor 处理数据的逻辑是由用户编码控制的, 例如用户写的 SQL 语句, 调用 API 写的业务代码等.
那么 Spark 适合什么样的计算呢?
下图列出了 Spark 和 HBase 数据库各自适用的场景:
举个例子说明下上面每一项对应的场景, 如下图:
此图描述的是用户登录手机淘宝, 淘宝根据用户的 ID 信息在淘宝首页推荐商品这样的一个流程. 我们看下每个流程中哪些场景适合 Phoenix,Spark.
1, 获取用户的推荐列表.
用户登录后, 手机淘宝要根据用户的 ID 从 "用户推荐商品列表 user_reco_list" 这个表中获取信息. SQL 语句可能是这样的:
这个 SQL 的特点如下:
简单: 只有 select *, 没有 join,group by.
有关键字过滤: user_id = 'user0001'.
返回的结果集少(大概返回几十行).
user_reco_list 表数据量很大(百亿级别)
并发量很大, 可能同时会有上万个用户同时登录.
低时延: 用户一登录要立刻显示推荐.
类似这种特点的业务查询就适合使用在线数据 Phoenix.
2, 统计用户的浏览记录.
"用户推荐商品列表 user_reco_list" 中数据是怎么来的呢? 是从用户的浏览记录, 购买记录, 加入购物车记录等信息统计而来的. 后台任务每天凌晨从用户的记录中进行大量的统计分析, 然后把结果写入 "用户推荐商品列表 user_reco_list".SQL 语句可能是这的:
这个 SQL 的特点如下:
统计分析: 有 sum,group by.
查询时间范围大: times 的时间范围要半年, 即扫描的数据量大.
返回的结果集大, 可能返回百万级.
user_scan_list 表数据量很大, 百亿级别, 千亿级等.
并发量小, 每天凌晨计算一次.
高时延: 计算结果可能要分钟级别, 甚至小时级别.
类似这种特点的业务查询就适合使用离线数仓: Spark 列存(Parquet).
通过上面的例子大概可以认识到哪些场景适合 Spark, 哪些适合 Phoenix.Spark 和 Phoenix 相互配合, 解决大数据的问题.
Spark 如何建数仓
那 Spark 如何建数仓呢? 本质就是把数据导入到 Spark, 使用 Spark 的列式存储文件格式 (例如 parquet) 存储数据, 使用 Spark 完成批处理, 离线分析的业务.
例如在 Spark 创建一个以天为分区的明细表:
上面只是个简单的实例, 下面举例几个实际的业务场景.
先看下面的一个典型的业务场景.
上图是一个典型的复杂分析及查询系统. 数据流程由图可见:
数据由 App, 传感器, 商业系统等客户的业务系统产生发送到 Kafka 系统.
Spark Streming 对接 kafka 周期读取数据入库到在线数据库 HBase/Phoenix, 用户的运营系统实时查询在线数据库.
HBase/Phoenix 数据库周期同步到 Spark 数仓做离线计算. 离线计算的结果写回 HBase/Phoenix 或者其他业务数据库.
上面是一个常用的方案. Spark 创建数仓也有客户对数仓进行分层, 例如下图:
来源: http://www.jianshu.com/p/a33eb81de94b