1. 实时插入 MySQL 时遇到的问题, 使用的 updateStaeBykey 有状态的算子 必须设置 checkpoint 如果报错直接删掉 checkpoint
在创建的时候自己保存偏移量即可 再次启动时读取正确偏移量就行了 管他 checkpoint 无关的事了
实时插入时有个问题是怎么进行 MySQL 的数据覆盖 掉一批次的值:
1. 使用局部更新的 sql :
insert into area_user_amt (date,country,provence,amt) values('${datekey}','${countrykey}','${provencekey}','${amt}') ON DUPLICATE KEY UPDATE `amt`= '${amt}'
2. 使用 replace 相当于先删除在插入
replace into stream_offset(topic,partitions,groupid,brokerlist,offset)values (?,?,?,?,?)
2. 使用 Redis 不使用叠加状态的 updateStaeBykey , 进行完 reduceBykey(list1,list2)=>(list.zip(list2)).map(_.1+_.2) reduceBykey 的两个参 (累计值, 当前值) 一直做 zip 操作, 做完后
(10,1).zip(20,2)=》((10,20),(1,2))在做 map 对里面每一个进行相加就是累加值 (只是当前批次的)
使用 Redis 的 hincrby 值增加的方法实现 累加求和
- .foreachPartition(iter=>{
- // 在各分区获取 Redis 连接
- val jedis=JedisUtil.getJedisClient()
- iter.foreach(tp=>{
- //B2019040114 , 成功量 , 总量
- jedis.hincrBy("P-"+tp._1._1.substring(0,8),tp._1._2,tp._2(0).toLong)
- // 设置 key 的有效时间
- jedis.expire(tp._1._1,60*60*24*7) }) jedis.close()
- })
SparkStreaming 使用 checkpoint 存在的问题
SparkStreaming 在处理 kafka 中的数据时, 存在一个 kafka offset 的管理问题:
官方的解决方案是 checkpoint:
checkpoint 是对 sparkstreaming 运行过程中的元数据和 每次 rdds 的数据状态保存到一个持久化系统中, 当然这里面也包含了 offset, 一般是 HDFS,S3, 如果程序挂了, 或者集群挂了, 下次启动仍然能够从 checkpoint 中恢复, 从而做到生产环境的 7*24 高可用. 如果 checkpoint 存储做 hdfs 中, 会带来小文件的问题.
但是 checkpoint 的最大的弊端在于, 一旦你的流式程序代码或配置改变了, 或者更新迭代新功能了, 这个时候, 你先停旧的 sparkstreaming 程序, 然后新的程序打包编译后执行运行, 会出现两种情况:
(1)启动报错, 反序列化异常
(2)启动正常, 但是运行的代码仍然是上一次的程序的代码.
为什么会出现上面的两种情况?
这是因为 checkpoint 第一次持久化的时候会把整个相关的 jar 给序列化成一个二进制文件, 每次重启都会从里面恢复, 但是当你新的 程序打包之后序列化加载的仍然是旧的序列化文件, 这就会导致报错或者依旧执行旧代码. 有的同学可能会说, 既然如此, 直接把上次的 checkpoint 删除了, 不就能启动了吗? 确实是能启动, 但是一旦你删除了旧的 checkpoint, 新启动的程序, 只能从 kafka 的 smallest 或者 largest 的偏移量消费, 默认是从最新的, 如果是最新的, 而不是上一次程序停止的那个偏移量 就会导致有数据丢失, 如果是老的, 那么就会导致数据重复. 不管怎么样搞, 都有问题.
来源: http://www.bubuko.com/infodetail-2986641.html