服务通常需要考虑速度和容量限制, 增强系统的鲁棒性.
背景
笔者曾负责过某公司内公众号服务开发. 公众号接口服务接收到用户的推送请求后会构造公众号消息并写入消息队列, 路由服务异步接收到消息后进行消息存储后, 再交由推送服务向用户推送消息. 基本流程如下图所示:
消息存储过程:
路由服务发起消息存储请求, 并将消息缓存到本地;
存储服务成功存储消息后异步发送成功通知;
路由服务接收到成功通知后从本地缓存获取消息内容后进行后续推送处理;
问题
若存储服务异常, 系统会出现什么问题?
路由服务使用 local cache 临时存储消息. 当存储服务异常时, 若不加限制, 路由服务极有可能导致内存溢出, 路由服务不可用;
路由服务发起消息存储请求为异步过程, 很有可能会一直消费 MQ 里的消息, 导致存储服务承受更大的服务压力. 同时会存在消息可能丢失的风险;
方案
基于信号量实现限制容量的本地缓存. 容量大小为信号量个数, 当路由服务发起消息存储请求时, 信号量减 1. 当路由服务接收到存储成功通知后, 信号量加 1.
存储服务正常时, 容量限制机制不会起作用, 服务性能不会受到影响;
存储服务异常时, 本地缓存的容量会越来越小. 最后再无可用的信号量时, 服务会阻塞等待. 此时不再对消息队列进行消费. 既避免了服务 OOM 的状况, 也降低了服务继续恶化的可能;
实现
基于信号量实现的限容数据结构 BlockingHashMap
- public class BlockingHashMap<K, V> {
- private static final int DEFAULT_MAX_AVAILABLE = 1000;
- private final ConcurrentHashMap<K, V> inmap = new ConcurrentHashMap<>(DEFAULT_MAX_AVAILABLE);
- private Semaphore sem;
- public BlockingHashMap() {
- this(DEFAULT_MAX_AVAILABLE);
- }
- public BlockingHashMap(int permits) {
- sem = new Semaphore(permits);
- }
- public V put(K key, V value) {
- boolean wasAdded = false;
- try {
- sem.acquire();
- V v = inmap.putIfAbsent(key, value);
- if (v != null) {
- return v;
- }
- wasAdded = true;
- } catch (Exception e) {
- } finally {
- if (!wasAdded) {
- // 若添加失败, 需要释放信号量
- sem.release();
- }
- }
- return value;
- }
- public V remove(K key) {
- V value = inmap.remove(key);
- if (value != null) {
- // 只有当成功移除元素时才释放信号量
- sem.release();
- }
- return value;
- }
- }
总结
基于信号量实现固定容量的本地缓存, 简单有效.
来源: https://juejin.im/post/5c1e5222e51d4554055559a2