云 hbase+spark 2019-05-23 22:57:36 浏览 926 评论 1
Redis
实时系统
数据处理
- spark
- aliyun
- stream
数据结构
jedis
存储
流处理
流数据
StructuredStreaming
摘要: 实时统计广告点击数: Spark StructuredStreaming + Redis Stream 业务场景介绍 某广告公司在网页上投递动态图片广告, 广告的展现形式是根据热点图片动态生成的. 为了收入的最大化, 需要统计每个广告的点击数来决定哪些广告可以投放的更长时间, 哪些需要及时更换.
实时统计广告点击数: Spark StructuredStreaming + Redis Stream
业务场景介绍
某广告公司在网页上投递动态图片广告, 广告的展现形式是根据热点图片动态生成的. 为了收入的最大化, 需要统计每个广告的点击数来决定哪些广告可以投放的更长时间, 哪些需要及时更换. 大部分的广告生命周期很短, 实时获取广告的点击数可以让我们快速确定哪些广告对业务是关键的. 所以我们理想的解决方案是有流处理数据的能力, 可以统计所有广告的点击量以及统计实时的点击量.
业务数据流
来看下我们业务数据链路.
广告点击数据通过手机或者电脑的网页传递到 "数据提取", 提取后的数据经过 "数据处理" 计算实时的点击数, 最后存储到数据库, 使用 "数据查询" 用于统计分析, 统计每个广告的点击总数.
根据我们的数据特点, 整个数据链路的数据输入输出如下:
输入
针对每个点击事件我们使用 asset id 以及 cost 两个字段来表示一个广告信息, 例如:
- asset [asset id] cost [actual cost]
- asset aksh1hf98qwdst9q7 cost 39
- asset aksh1hf98qwdst9q8 cost 19
输出
经过上图中步骤 2: 数据处理后, 我们把结果集存储到一个数据表中, 数据表可以用于上图步骤 3 使用 Sql 查询, 例如:
- select asset, count from clicks order by count desc
- asset count
- ----------------- -----
- aksh1hf98qwdst9q7 2392
- aksh1hf98qwdst9q8 2010
- aksh1hf98qwdst9q6 1938
解决方案
基于以上诉求选择 StructuredStreaming + Redis Stream 作为解决方案. 先介绍下方案中涉及到的组件.
Spark StructuredStreaming 是 Spark 在 2.0 后推出的基于 Spark SQL 上的一种实时处理流数据的框架. 处理时延可达毫秒级别.
Redis Stream 是在 Redis 5.0 后引入的一种新的数据结构, 可高速收集, 存储和分布式处理数据, 处理时延可达亚毫秒级别.
Spark-Redis 连接器提供了 Spark 对接 Redis 的桥梁. 通过 Spark-Redis 连接器, StructuredStreaming 可以使用 Redis Stream 作为数据源, 经过 Spark 处理后数据再写回 Redis.
数据处理流
现在让我们看下如何使用 StructuredStreaming + Redis Stream
通过上图可以看到点击数据首先存储到 Redis Stream, 然后通过 StructuredStreaming 消费数据, 处理聚合数据, 再把处理的结果入库到 Redis, 最后通过 Spark Sql 查询 Redis 进行统计分析. 下面分别看下每个步骤:
数据提取:
Redis Stream 是 Redis 内置的数据结构, 具备每秒百万级别的读写能力, 另外存储的数据可以根据时间自动排序. Spark-Redis 连接器支持使用 Redis Stream 作为数据源, 非常适用这个场景, 把 Redis Stream 数据对接到 Spark 引擎.
数据处理:
Spark 的 StructuredStreaming 非常适合此场景的数据处理部分, Spark-Redis 连接器可以获取 Redis Stream 的数据转换成 Spark 的 DataFrames. 在 StructuredStreaming 处理流数据的过程中, 可以对微批次数据或者整体数据进行查询. 数据的处理结果可以通过自定义的 "writer" 输出到不同的目的地, 本场景中我们直接把数据输出到 Redis 的 Hash 数据结构.
数据查询:
Spark-Redis 连接器可以把 Redis 的数据结构映射成 Spark 的 DataFrames, 然后我们把 DataFrames 创建成一个临时表, 表的字段映射 Redis 的 Hash 数据结构. 借助 Redis 的亚毫米级的延迟, 使用 Spark-SQL 进行实时的数据查询.
开发步骤
通过下面实例介绍下开发的步骤
Redis Stream 存储数据
Redis Streams 是一个 append-only 的数据结构. 部署 Redis Streams 后使用 Redis-cli 向 Redis 发送数据.
Redis-cli 使用方法可参考 Redis-cli 连接. 下面的命令是 Redis 向 Stream clicks 发送数据.
XADD clicks MAXLEN ~ 1000000 * asset aksh1hf98qw7tt9q7 cost 29
数据处理
在 StructuredStreaming 中把数据处理步骤分成 3 个子步骤.
从 Redis Stream 读取, 处理数据.
存储数据到 Redis.
运行 StructuredStreaming 程序.
从 Redis Stream 读取, 处理数据
在 Spark 中读取 Redis Stream 数据需要确定如何去连接 Redis, 以及 Redis Stream 的 schema 信息. 这里使用 Spark-Redis 连接器, 需要创建一个 SparkSession 并带上 Redis 的连接信息.
- val spark = SparkSession
- .builder()
- .appName("StructuredStreaming on Redis")
- .config("spark.redis.host", redisHost)
- .config("spark.redis.port", redisPort)
- .config("spark.redis.auth", redisPassword)
- .getOrCreate()
在 Spark 中构建 schema, 我们给流数据命名为 "clicks", 并且需要设置参数 "stream.kes" 的值为 "clicks". 由于 Redis Stream 中的数据包含两个字段:"asset" 和 "cost", 所以我们要创建 StructType 映射这两个字段.
- val clicks = spark
- .readStream
- .format("redis")
- .option("stream.keys", redisTableName)
- .schema(StructType(Array(
- StructField("asset", StringType),
- StructField("cost", LongType)
- )))
- .load()
在这里统计下每个 asset 的点击次数, 可以创建一个 DataFrames 根据 asset 汇聚数据.
val bypass = clicks.groupBy("asset").count()
最后一个步骤启动 StructuredStreaming.
- val query = bypass
- .writeStream
- .outputMode("update")
- .foreach(clickWriter)
- .start()
存储数据到 Redis
我们通过自定义的 ClickForeachWriter 向 Redis 写数据. ClickForeachWriter 继承自 FroeachWriter, 使用 Redis 的 Java 客户端 Jedis 连接到 Redis.
- class ClickForeachWriter(redisHost: String, redisPort: String, redisPassword: String) extends ForeachWriter[Row] {
- var jedis: Jedis = _
- def connect() = {
- val shardInfo: JedisShardInfo = new JedisShardInfo(redisHost, redisPort.toInt)
- shardInfo.setPassword(redisPassword)
- jedis = new Jedis(shardInfo)
- }
- override def open(partitionId: Long, version: Long): Boolean = {
- true
- }
- override def process(value: Row): Unit = {
- val asset = value.getString(0)
- val count = value.getLong(1)
- if (jedis == null) {
- connect()
- }
- jedis.hset("click:" + asset, "asset", asset)
- jedis.hset("click:" + asset, "count", count.toString)
- jedis.expire("click:" + asset, 300)
- }
- override def close(errorOrNull: Throwable): Unit = {}
- }
运行 StructuredStreaming 程序.
程序完成打包后, 可以通过 Spark 控制台提交任务, 运行 Spark StructuredStreaming 任务.
- --class com.aliyun.spark.Redis.StructuredStremingWithRedisStream
- --jars /spark_on_redis/ali-spark-Redis-2.3.1-SNAPSHOT_2.3.2-1.0-SNAPSHOT.jar,/spark_on_redis/commons-pool2-2.0.jar,/spark_on_redis/jedis-3.0.0-20181113.105826-9.jar
- --driver-memory 1G
- --driver-cores 1
- --executor-cores 1
- --executor-memory 2G
- --num-executors 1
- --name spark_on_polardb
- /spark_on_redis/structuredstreaming-0.0.1-SNAPSHOT.jar
- xxx1 6379 xxx2 clicks
参数说明:
xxx1: Redis 的内网连接地址 (host).
6379:Redis 的端口号 (port).
xxx2: Redis 的登陆密码.
clicks: Redis 的 Stream 名称
数据查询
数据查询使用 Spark-SQL 创建表读取 Redis Hash 数据库. 这里使用 Spark 控制台的 "交互式查询", 输入如下语句:
- CREATE TABLE IF NOT EXISTS clicks(asset STRING, count INT)
- USING org.apache.spark.sql.Redis
- OPTIONS (
- 'host' 'xxx1',
- 'port' '6379',
- 'auth' 'xxx2',
- 'table' 'click'
- )
参数说明:
xxx1: Redis 的内网连接地址 (host).
6379:Redis 的端口号 (port).
xxx2: Redis 的登陆密码.
click: Redis 的 Hash 表名称.
然后运行查询语句:
select * from clicks;
例如下图:
Spark-SQL 通过 Spark-Redis 连接器直接查询 Redis 数据, 统计了广告的点击数.
小结
本文主要介绍了 Spark 如何把 Redis 作为数据源以及 Spark 的 StructuredStreaming 与 Redis Stream 的结合. 更多 Spark 介绍请参考: X-Pack Spark 分析引擎; 更多 Redis 介绍请参考: 云数据库 Redis 版.
参考列表
本文的代码可参考 Spark 样例代码.
Spark 对接 Redis 可参考 Spark 对接 Redis 快速入门.
本文由用户为个人学习及研究之目的自行翻译发表, 如发现侵犯原作者的版权, 请与社区联系处理 yqgroup@service.aliyun.com
[云栖快讯] 阿里巴巴小程序繁星计划, 20 亿补贴第一弹云应用立即开通购买, 限量从速! 详情请点击
来源: https://yq.aliyun.com/articles/703467