并发场景中大部分处理的是先更新 DB, 再 (删缓, 更新) 缓存的处理方式, 但是在实际场景中有可能 DB 更新成功了, 但是缓存设置失败了, 就造成了缓存与 DB 数据不一致的问题, 下面就以实际情况说下怎么解决此类问题.
名词 Cache: 本文内指 Redis,ReadRequest: 请求从 Cache,Db 中拿去数据, WriteRequest: 数据写入 DB 并删除缓存
若要保证数据库与缓存一直, 我们需要采用先删缓存, 在更新 DB 的情况, 这时候有的同学可能会问, 如果缓存删除成功了, 而 DB 更新失败了怎么办, 其实仔细考虑一下, DB 虽然失败了, 那真正是不会产生数据影响的, 而当下次一次请求进来的时候, 我们重新把 DB 中未更新的数据重新塞入缓存, 从结果上来看是没有影响的. 我们把请求分为 ReadRequest ,WriteRequest, 大部分同学都知道我们在使用 Cache 时 首先都会去 Cache 内查一下, 如果 Cache 中没有拿到数据我们在从数据库中去获取数据, 这个时候在高并发的场景的踩过坑的同学都知道恰巧在这时候有更新请求把缓存删除了, 这时候大量请求进来, Cache 内没有此项数据, 请求就会直接落在 DB 上, 就很容易造成缓存雪崩, 数据库很可能瞬时就挂掉了, 所以处理方案就是我们需要对查询写入的缓存进行排队处理, 而正确从 cache 内获取的姿势:
1, 每次查询数据的时候我们吧请求数据放入队列, 由队列消费者去检查一下 cache 是否存在, 不存在则进行插入, 存在就跳过
2, 当前 readRequest 就自循环, 我们不断尝试从 cache 内去获取数据, 拿到数据或超时当前线程立即退出
3, 如果拿到数据了就返回结果, 没有拿到数据我们就从 DB 去查
而 WriteRequest 的处理相对就简单多了我们直接删除缓存后, 更新 DB 即可, 下面上代码说明:
消息队列这里我们基于 jdk 并发包内的 BlockingQueue 进行实现, 使用 MQ(Rabbit,Kafka 等)的话思想差不多, 只是需要交互一次 mq 的服务端. 首先项目启动时我们在程序后台开辟监听线程, 从数据共享缓冲区 (ArrayBlockingQueue) 内监听消息
- public class BlockQueueThreadPool {
- /**
- * 核心线程数
- */
- private Integer corePoolSize = 10;
- /**
- * 线程池最大线程数
- */
- private Integer maximumPoolSize = 20;
- /**
- * 线程最大存活时间
- */
- private Long keepAliveTime = 60L;
- private ExecutorService threadPool = new ThreadPoolExecutor(this.corePoolSize, this.maximumPoolSize,
- this.keepAliveTime, TimeUnit.SECONDS,
- new ArrayBlockingQueue(this.corePoolSize));
- public BlockQueueThreadPool() {
- RequestQueue requestQueue = RequestQueue.getInstance();
- BlockingQueue<RequestAction> queue = new ArrayBlockingQueue<>(this.corePoolSize);
- requestQueue.add(queue);
- this.threadPool.submit(new JobThread(queue));
- }
- }
PS:ArrayBlockingQueue 中很好的利用了 Condition 中的等待和通知功能, 这里我们就能实现对共享通道队列的事件监听了.
- public class JobThread implements Callable<Boolean> {
- private BlockingQueue<RequestAction> queue;
- public JobThread(BlockingQueue<RequestAction> queue) {
- this.queue = queue;
- }
- @Override
- public Boolean call() throws Exception {
- try {
- while (true) {
- // ArrayBlockingQueue take 方法 获取队列排在首位的对象, 如果队列为空或者队列满了, 则会被阻塞住
- RequestAction request = this.queue.take();
- RequestQueue requestQueue = RequestQueue.getInstance();
- Map<String, Boolean> tagMap = requestQueue.getTagMap();
- if (request instanceof ReadRequest) {
- Boolean tag = tagMap.get(request.getIdentity());
- if (null == tag) {
- tagMap.put(request.getIdentity(), Boolean.FALSE);
- }
- if (tag != null && tag) {
- tagMap.put(request.getIdentity(), Boolean.FALSE);
- }
- if (tag != null && !tag) {
- return Boolean.TRUE;
- }
- } else if (request instanceof WriteRequest) {
- // 如果是更新数据库的操作
- tagMap.put(request.getIdentity(), Boolean.TRUE);
- }
- // 执行请求处理
- log.info("缓存队列执行 +++++++++++++++++,{}", request.getIdentity());
- request.process();
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- return Boolean.TRUE;
- }
- }
接下来就要定义我们的 WriteRequest,ReadRequest 了
- @Slf4j
- public class ReadRequest<TResult> extends BaseRequest {
- public ReadRequest(String cacheKey, GetDataSourceInterface action) {
- super(cacheKey, action);
- }
- @Override
- public void process() {
- TResult result = (TResult) action.exec();
- if (Objects.isNull(result)) {
- // 防止缓存击穿
- Redis.set(cacheKey, "", 10000);
- } else {
- Redis.set(cacheKey, result, 10000);
- }
- }
- }
- public class WriteRequest<TResult> extends BaseRequest {
- public WriteRequest(String cacheKey, GetDataSourceInterface action) {
- super(cacheKey, action);
- }
- @Override
- public void process() {
- Redis.del(cacheKey);
- action.exec();
- }
- }
这里我们需要坐下判断, 在数据库内查询数据为空后把 "" 写入了缓存, 这样子是避免有人恶意请求不存在的数据时造成缓存击穿. 接下来就是我们针对各项业务场景中需要获取与更新缓存的路由端了
- @UtilityClass
- public class RouteUtils {
- public static void route(RequestAction requestAction) {
- try {
- BlockingQueue<RequestAction> queue = RequestQueue.getInstance().getQueue(0);
- queue.put(requestAction);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- public class RequestQueue {
- private RequestQueue() {
- }
- private List<BlockingQueue<RequestAction>> queues = new ArrayList<>();
- private Map<String, Boolean> tagMap = new ConcurrentHashMap<>(1);
- private static class Singleton {
- private static RequestQueue queue;
- static {
- queue = new RequestQueue();
- }
- private static RequestQueue getInstance() {
- return queue;
- }
- }
- public static RequestQueue getInstance() {
- return Singleton.getInstance();
- }
- public void add(BlockingQueue<RequestAction> queue) {
- this.queues.add(queue);
- }
- public BlockingQueue<RequestAction> getQueue(int index) {
- return this.queues.get(index);
- }
- public int size() {
- return this.queues.size();
- }
- public Map<String, Boolean> getTagMap() {
- return this.tagMap;
- }
- }
这里有一个小的知识点, 很多时候我们在保证线程安全的时候多数会使用 DSL 双锁模型, 但是我始终觉得这类代码不够美观, 所以我们可以利用 JVM 的类加载原则, 使用静态类包裹初始化类, 这样子也一定能保证单例模型, 并且代码也更美观了. 接下来就可以看下 Service 的代码
- @Service
- public class StudentService {
- public Student getStudent(String name) {
- ReadRequest<Student> readRequest = new ReadRequest<>(name, () -> Student.builder().name(name).age(3).build());
- return CacheProcessor.builder().build().getData(readRequest);
- }
- public void update(Student student) {
- WriteRequest<Student> writeRequest = new WriteRequest<>(student.getName(), () -> student);
- CacheProcessor.builder().build().setData(writeRequest);
- }
- }
Service 内直接调用了 Cachce 的处理者, 我们通过处理者来获取缓存与更新缓存
- @Builder
- public class CacheProcessor {
- public <TResult> TResult getData(ReadRequest readRequest) {
- try {
- RouteUtils.route(readRequest);
- long startTime = System.currentTimeMillis();
- long waitTime = 0L;
- while (true) {
- if (waitTime> 3000) {
- break;
- }
- TResult result = (TResult) readRequest.Redis.get(readRequest.getIdentity());
- if (!Objects.isNull(result)) {
- return result;
- } else {
- Thread.sleep(20);
- waitTime = System.currentTimeMillis() - startTime;
- }
- }
- return (TResult) readRequest.get();
- } catch (Exception e) {
- return null;
- }
- }
- public void setData(WriteRequest writeRequest){
- RouteUtils.route(writeRequest);
- }
- }
这里我们就先把请求数据发送到数据共享渠道, 消费者端与当前的 ReadRequest 线程同步执行, 拿到数据后 ReadRequest 就立马退出, 超时后我们就从数据库中获取数据. 这里面我使用了 java8 @FunctionalInterface 标记接口, 对各个业务中需要用到缓存的地方统一进行封装方便调用, 以上的代码就已经基本说明并发中 Db 和 Cache 双休一致性的解决思路, 聪明的小伙伴肯定能看出其实还有很多优化的地方, 比如说我们栗子中是单线程吞吐量不高, 采用多线程与多消费者端的时候我们还需要保证商品的更新和读取请求需要落在同一个消费者端等等问题. 或者在使用外部 MQ 时, 我们除了要考虑以上同一商品的读写保证落在一个消费节点上, 还需要考虑队列内有插入缓存请求的时候需要跳过的处理等等, 更多情况还需要根据实际情况大家自己去发现咯
参考: 中华石杉的教程
来源: https://www.cnblogs.com/doNetTom/p/11797024.html