提出需求
实时统计业务系统 (web,APP 之类) 的访问人数, 即所谓 UV, 或者 DAU 指标.
这个需求怕是流计算最最最常见的需求了.
计算 UV 的关键点就在于去重, 即同一个人访问两次是只计一个 UV 的. 在离线计算中统计 UV 比较容易想到的方法就是用 group 或 distinct 机制来去重. 但是在实时计算场景, 还用 group 就不太科学了, 一个是全量数据的 group 是比较费时的, 第二个是全量数据的 group 是很费内存和 CPU 的. 特别是当用户量巨大的时候, 还要做到秒级更新就更难了.
总结起来, 需求就是: 海量用户场景 UV 实时计算.
分享之前我还是要推荐下我自己创建的大数据学习交流 Qun531629188
无论是大牛还是想转行想学习的大学生
小编我都挺欢迎, 今天的已经资讯上传到群文件, 不定期分享干货,
接受挑战
不难发现, 问题的主要难点就是去重.
Spark Streaming 目前没有给出内置方案(这个其实可以有), 但是海量数据去重问题早就有解决办法了. 所以 Spark Streaming 程序完全可以利用其他系统的现有方案解决去重问题, 比如 Redis.
Redis 的海量去重计数方案
Bitmap 方案
所谓的 Bitmap 就是用一个 bit 位来标记某个元素对应的 Value, 比如 ID 为 2 的用户, 就用第 2 个 bit 位来表示, 然后用该位的值来表示该用户是否访问过. 如果要计算 UV, 那就只要数一下有多少个 1 就行啦.
假设我们有 40 亿用户, 使用 Bitmap 需要 2^32 个 bit 位, 算下来也就 500M 左右.
你可能没想到的是, Redis 中最常用的数据结构 string, 就可以实现 bitmap 算法.
Redis 提供了如下命令
// 插入
setbit key offset value
- // 获取
- getbit key offset
- // 计数
- BITCOUNT key [start] [end]
这里 offset 最大值就是 2^32. 比如 ID 为 2 的用户, 可以 setbit uv 2 1, 来记录. 最后要计算 UV, 就直接 BITCOUNT uv. 这步计数非常快, 复杂度是 O(1).
HyperLogLog 方案
若要计算很多页面的 UV, 用 bitmap 还是比较费空间的, N 个页面就得有 N 个 500M. 这时候 HyperLogLog 结构就是一个比较好的选择.
Redis 在 2.8.9 版本添加了 HyperLogLog 结构. Redis HyperLogLog 是用来做基数统计的算法, HyperLogLog 的优点是, 在输入元素的数量或者体积非常非常大时, 计算基数所需的空间总是固定 的, 并且是很小的. 在 Redis 里面, 每个 HyperLogLog 键只需要花费 12 KB 内存, 就可以计算接近 2^64 个不同元素的基 数. 这和计算基数时, 元素越多耗费内存就越多的集合形成鲜明对比. 但是, 因为 HyperLogLog 只会根据输入元素来计算基数, 而不会储存输入元素本身, 所以 HyperLogLog 不能像集合那样, 返回输入的各个元素.
也就是说 HyperLogLog 是一种基数统计算法, 计算结果是近似值, 12 KB 内存就可以计算 2^64 个不同元素的基数.
Redis 命令如下:
- redis 127.0.0.1:6379> PFADD runoobkey "redis"
- 1) (integer) 1
- redis 127.0.0.1:6379> PFADD runoobkey "mongodb"
- 1) (integer) 1
- redis 127.0.0.1:6379> PFADD runoobkey "mysql"
- 1) (integer) 1
- redis 127.0.0.1:6379> PFCOUNT runoobkey
- (integer) 3
代码实现
下面给出 HyperLogLog 方案的参考实现:
- stream.foreachRDD { rdd =>
- // 统计人数
- rdd.foreachPartition { partition =>
- // 从分区所属 executor 的 redis 线程池获取一个连接.
- val redis = RedisUtil.getRedis
- partition.foreach { case (date, userId) =>
- // 统计当前 userId
- redis.pfadd(s"uv:$date", userId)
- }
- redis.close()
- }
- }
关于 Redis 的连接, 如果是用 java 或 scala 可以使用 JedisPool, 注意处理序列化即可.
来源: http://www.bubuko.com/infodetail-2605966.html