概述
X-Pack Spark 分析引擎是基于 Spark 提供的复杂分析, 流式处理, 机器学习的能力. Spark 分析引擎可以对接阿里云的多种数据源, 例如: 云 HBase 数据库, MongoDB,Phoenix 等, 同时也支持对接阿里云日志服务 LogHub. 阿里云日志服务 (Log Service, 简称 LOG) 是针对实时日志数据的一站式服务, 提供日志类数据采集, 消费, 投递及查询分析功能, 全面提升海量日志处理和分析能力.
场景介绍
某一款销售平台的 App, 针对用户在 App 中打开首页, 搜索, 商品详细页以及最终下订单购买商品等操作, 操作所产生的事件均记录到阿里云日志系统中. 现需要对 App 的用户的行为数据做一些统计分析, 每天, 每周出详细的运营数据, 以及给用户提供在线查询账单等.
如何实现
通过阿里云的日志服务 + X-Pack Spark + 云 HBase 完成这些诉求. 先看下整理的数据流图:
由上图可见数据流程为: 用户通过 LogHub 对接 App 的日志数 ->Spark Streming 对接 LogHub 同步数到 HBase(Phoenix)->在线数据同步到 Spark 离线数仓 ->离线数仓批量计算输出运营数据等.
App 日中包含用户的使用 App 所产生的事件信息, 下面以一个简单的例子说明下每一个步骤的实现.
LogHub 对接 App 日志
假设 App 的日志产生在某机器的目录文件中, 通过 LogHub 可以对接机器的文件, 读取解析日志. 假设日志的字段信息如下:
- event_time: long #事件产生的时间戳
- user_id: string #用户 ID, 唯一值.
- device_id: String #设备 id,App 使用的设备.
- event_name: String #事件名称, 如: 首页, 搜索, 明细页, 购买
- prod_id: String #商品 ID.
- stay_times: int #停留时间.
上述信息在 App 的日志中使用逗号分隔符, 所以在 LogHub 配置指定采集模式时选择逗号分隔.
SparkStreaming 对接 App
SparkStreaming 对接 App 可以使用 X-Pack Connectors 中对接 LogHub 的插件(可参考: Spark 对接 LogHub 快速入门).SparkStreaming 对接 LogHub 可以设置每个 1 分钟同步一次数据到 Phoenix.
同步数据之前需要在 Phoenix 中创建一张表, 如下:
- CREATE TABLE IF NOT EXISTS user_event (
- event_time BIGINT NOT NULL,
- user_id VARCHAR NOT NULL,
- device_id VARCHAR,
- event_name VARCHAR,
- prod_id VARCHAR
- CONSTRAINT my_pk PRIMARY KEY (event_time, user_id)
- );
Phoenix 表 user_event 使用 user_event 和 user_id 作为组合主键, 主要是为了使用 user_id 进行运营明细查询, 时间信息方便按照时间范围同步数据到 Spark.
SparkStreaming 同步 LogHub 数据到 Phoenix 的代码主要逻辑如下:
- val loghubStream = LoghubUtils.createStream(
- ssc,
- loghubProject,
- logStore,
- loghubGroupName,
- endpoint,
- numReceiver,
- accessKeyId,
- accessKeySecret,
- StorageLevel.MEMORY_AND_DISK)
- loghubStream.foreachRDD { rdd =>
- rdd.foreachPartition { pt =>
- // 获取 Phoenix 的链接
- val phoenixConn = DriverManager.getConnection("jdbc:phoenix:" + zkAddress)
- val statment = phoenixConn.createStatement()
- var i = 0
- while (pt.hasNext) {
- val value = pt.next()
- // 获取的 LogHub 的数据是 JSON 格式的, 需要进行转换
- val valueFormatted = JSON.parseObject(new String(value))
- // 构造 phonenix 插入语句
- val insetSql = s"upsert into $phoenixTableName values(" +
- s"${valueFormatted.getLong("event_time")}," +
- s"'${valueFormatted.getString("user_id").trim}'," +
- s"'${valueFormatted.getString("device_id").trim}'," +
- s"'${valueFormatted.getString("event_name").trim}'," +
- s"'${valueFormatted.getString("prod_id").trim}')"
- statment.execute(insetSql)
- i = i + 1
- // 每隔 batchSize 行提交一次 commit 到 Phoenix.
- if (i % batchSize == 0) {
- phoenixConn.commit()
- println(s"====finish upsert $i rows====")
- }
- }
- phoenixConn.commit()
- println(s"==last==finish upsert $i rows====")
- phoenixConn.close()
- }
- }
SparkStreaming 同步数据到 Phoenix 后, 可以对 Phoenix 数据库进行用户明细查询. 例如:
- # 查询用户 user_id_1006 所有浏览明细.
- select * from user_event where user_id = 'user_id_1006';
同步到 Spark 离线数仓
Phoenix 在线数据库适合明细查询, 如果需要进行统计, 离线计算需要用到 Spark 数仓. Phoenix 同步数据到 Spark 数仓实质就是在 Spark 上创建表, 然后把数据同步一份到 Spark 表中.
归档数据到 Spark 请参考: 批量归档. 本文用 Sql 表示下同步的逻辑, 这里假设数据每天同步一次到 Spark .
Spark 中建表, 同步的方法如下:
- # 在 Spark 中创建 Parquet 格式表: user_event_parquet, 使用 dt 作为分区字段.
- create table user_event_parquet(
- event_time long,
- user_id string,
- device_id string,
- event_name string,
- prod_id string,
- dt string
- ) using parquet
- partitioned by(dt);
- # 在 Spark 中创建表 user_event_phoenix 映射 Phoenix 数据库的表.
- CREATE TABLE user_event_phoenix USING org.apache.phoenix.spark
- OPTIONS (
- 'zkUrl' 'hb-xx-master3-001.hbase.rds.aliyuncs.com:2181,hb-xx-master1-001.hbase.rds.aliyuncs.com:2181,hb-xx-master2-001.hbase.rds.aliyuncs.com:2181',
- 'table' 'user_event'
- );
- # 向 Parquet 表: user_event_parquet 插入一天: 2019-01-01 的数据
- insert into user_event_parquet select EVENT_TIME,USER_ID,DEVICE_ID,EVENT_NAME,PROD_ID,'2019-01-01' from user_event_phoenix where EVENT_TIME>=1546272000 and EVENT_TIME < 1546358400
离线数仓批量计算
数据同步到 Spark 可以对 Spark 数据做统计分析预算, 例如:
- # 统计每天的访问数
- select dt, count(*) from user_event_parquet group by dt
- # 统计前十的访问
- select dt, count(*) total from user_event_parquet group by dt order by total desc limit 10
- # 统计前 100 个用户的访问数
- select dt,user_id, count(*) total from user_event_parquet group by dt,user_id order by total desc limit 100
计算的结果可以回写到业务数据库, 供业务查询, 出报表等.
小结
本文简单介绍了 Spark 如何对接 LogHub 以及如何同步数据等常用的操作. 参考链接如下:
Spark 对接 LogHub 快速指导参考: Spark 对接 LogHub 快速入门.
本例的代码可以参考: SparkOnLogHubDemo.
X-Pack Spark 介绍参考: X-Pack Spark 基本介绍.
来源: https://yq.aliyun.com/articles/706379