数据过滤在很多场景都会应用到, 特别是在大数据环境下. 在数据量很大的场景实现过滤或者全局去重, 需要存储的数据量和计算代价是非常庞大的. 很多小伙伴第一念头肯定会想到布隆过滤器, 有一定的精度损失, 但是存储性能和计算性能可以达到几何级别的提升. 很多第三方框架也实现了相应的功能, 比如 hbase 框架实现的布隆过滤器性能是非常的棒, Redis 也可以实现相应的功能. 这些需要借助于第三方框架, 需要维护第三方框架. 如果公司没有部署相应架构, 单独为使用布隆过滤器部署一套集群, 代价还是非常大的.
我们在做流式计算时需要实现数据小时级别去重和天级别数据去重, 初始功能版本使用的是基于 Redis 实现的布隆过滤器. 性能也非常的好, 三个节点的 Redis 集群 (三主三从, 主从交叉策略) 性能可以达到每秒十几万的处理性能. 在后期的使用中主要瓶颈就在 Redis 的吞吐量的性能上. 一直想在这块做一定的性能优化.
后来, 发现 spark 官方封装了基于 DataFrame 的布隆过滤器, 使用起来相当方便. 性能不再受制于第三方框架的吞吐量限制, 依赖于 spark 的并行资源. 可以减少架构设计的复杂度, 提高可维护性. 在流式计算应用中可以将布隆过滤器做成 driver 级别的全局变量, 在 batch 结束更新布隆过滤器. 如果考虑容错, 可以将布隆过滤器数据定期持久化到磁盘(hdfs/Redis).
直接上代码, 看一下使用方法
- val bf = df.stat.bloomFilter("dd",dataLen,0.01)
- val rightNum = rdd.map(x=>(x.toInt,bf.mightContainString(x)))
首先, 在生成布隆过滤器直接调用 bloomFilter(colName:String,expectedNumItems:Long,fpp:Double)就可以了, 第一个参数是使用的数据列, 第二个参数是数据量期望会有多少, 第三个参数是损失精度. 损失精度越低生成的布隆数组长度就会越长, 占用的空间就会越多, 计算过程就会越漫长.
在用有些场景布隆过滤器还需要合并, 官方也提供了相应的 API
mergeInPlace(BloomFilter var1):BloomFilter
判定数据是否存在, 官方一共提供了四个方法:
- mightContain(Object var1),
- mightContainString(String val1),
- mightContainLong(long var1),
- mightContainBinary(byte[] var1)
不同的方法适用于不同的类型, bloomFilter(calname:String...)这个方法中使用列的数据类型一定要和以上四个方法对应, 否则会出问题.
官方还很贴心的提供了序列化和反序列化工具: writeTo 和 readFrom, 可以很方便的将布隆过滤器序列化到磁盘和从磁盘加载布隆过滤器.
Spark 布隆过滤器(bloomFilter)
来源: http://www.bubuko.com/infodetail-3108886.html