当我们在单机情况下, 遇到并发问题, 可以使用 juc 包下的 lock 锁, 或者 synchronized 关键字来加锁. 但是这俩都是 JVM 级别的锁, 如果跨了 JVM 这两个锁就不能控制并发问题了, 也就是说在分布式集群环境中, 需要寻求其他方法来解决并发问题. 前面也说到可以使用 Redis 的 setnx 操作, 如果不存在则 set, 如果存在则不 set. 也就是说每个服务实例都对同一个 key 进行操作. 谁能 set 成功就认为获取到了锁. 可以执行下面的操作. 执行完之后释放锁. 如下按照上述逻辑来简单实现一个分布式锁:
- package com.nijunyang.Redis.lock;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.data.Redis.core.RedisTemplate;
- import org.springframework.data.Redis.core.ValueOperations;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.Web.bind.annotation.RestController;
- /**
- * Description:
- * Created by nijunyang on 2020/3/17 23:53
- */
- @RestController
- public class LockController {
- @Autowired
- ValueOperations<String, Object> valueOperations;
- @Autowired
- RedisTemplate<String, Object> redisTemplate;
- String lock = "lock";
- String quantityKey = "quantity";
- @GetMapping("/deduct-stock")
- public String deductStock() {
- try {
- boolean getLock = valueOperations.setIfAbsent(lock, 1);
- if (!getLock) {
- return "没有获取到锁";
- }
- // 使用当做数据库, 只是模拟扣减库存场景, 因此不使用原子操作
- Integer quantity = (Integer) valueOperations.get(quantityKey);
- if (quantity> 0) {
- --quantity;
- valueOperations.set(quantityKey, quantity);
- System.out.println("扣减库存成功, 剩余库存:" + quantity);
- } else {
- System.out.println("扣减库存成功, 剩余库存:" + quantity);
- }
- return "true";
- } finally {
- redisTemplate.delete(lock);
- }
- }
- }
如果不出意外这个锁是可以用的, 但是如果拿到锁之后, 在执行业务的过程中, 服务挂了, 就会导致锁没有释放, 其他服务永远无法拿到锁, 因此我们可以优化一下, 加锁的同时给锁设置一个过期时间, 这样来保证, 拿到锁在执行业务的时候挂了, 到了过期时间之后, 其他服务一样可以继续获取锁.
- package com.nijunyang.Redis.lock;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.data.Redis.core.RedisTemplate;
- import org.springframework.data.Redis.core.ValueOperations;
- import org.springframework.Web.bind.annotation.GetMapping;
- import org.springframework.Web.bind.annotation.RestController;
- import java.util.concurrent.TimeUnit;
- /**
- * Description:
- * Created by nijunyang on 2020/3/17 23:53
- */
- @RestController
- public class LockController {
- @Autowired
- ValueOperations<String, Object> valueOperations;
- @Autowired
- RedisTemplate<String, Object> redisTemplate;
- String lock = "lock";
- String quantityKey = "quantity";
- @GetMapping("/deduct-stock")
- public String deductStock() {
- try {
- // 设置值, 并且设置超时时间
- boolean getLock = valueOperations.setIfAbsent(lock, 1, 10, TimeUnit.SECONDS);
- if (!getLock) {
- return "没有获取到锁";
- }
- // 使用当做数据库, 只是模拟扣减库存场景, 因此不使用原子操作
- Integer quantity = (Integer) valueOperations.get(quantityKey);
- if (quantity> 0) {
- --quantity;
- valueOperations.set(quantityKey, quantity);
- System.out.println("扣减库存成功, 剩余库存:" + quantity);
- } else {
- System.out.println("扣减库存成功, 剩余库存:" + quantity);
- }
- return "true";
- } finally {
- redisTemplate.delete(lock);
- }
- }
- }
但是问题又来了, 这个超时时间设置多大合适呢, 如果网络延迟或者出现了 sql 的慢查询等, 导致业务还没执行完, 锁就过期了, 这个时候别的服务又拿到了锁, 现在并发问题问题又来了...A1 服务拿到锁, 设置过期时间 10s, 但是业务逻辑需要 15s 才能执行完, 10s 过后锁自动释放, 这时候 A2 服务拿到锁执行业务, 5s 之后 A1 执行完业务删除锁, 但是这个时候 A1 释放的是 A2 加的锁, A2 这个时候才执行 5s, 等到 A2 执行完去释放的又是别的服务拿到的锁, 如此恶心循环....
我们可以将锁的 value 设置成一个客户端的唯一值, 比如生成一个 UUID, 删除的时候判断一下这个值是否是自己生成, 这样就可以避免把其他服务加的锁删掉.
- package com.nijunyang.Redis.lock;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.data.Redis.core.RedisTemplate;
- import org.springframework.data.Redis.core.ValueOperations;
- import org.springframework.Web.bind.annotation.GetMapping;
- import org.springframework.Web.bind.annotation.RestController;
- import java.util.UUID;
- import java.util.concurrent.TimeUnit;
- /**
- * Description:
- * Created by nijunyang on 2020/3/17 23:53
- */
- @RestController
- public class LockController {
- @Autowired
- ValueOperations<String, Object> valueOperations;
- @Autowired
- RedisTemplate<String, Object> redisTemplate;
- String lock = "lock";
- String quantityKey = "quantity";
- @GetMapping("/deduct-stock")
- public String deductStock() {
- String uuid = UUID.randomUUID().toString();
- try {
- // 设置值, 并且设置超时时间
- boolean getLock = valueOperations.setIfAbsent(lock, uuid, 10, TimeUnit.SECONDS);
- // boolean getLock = valueOperations.setIfAbsent(lock, 1);
- if (!getLock) {
- return "没有获取到锁";
- }
- // 使用当做数据库, 只是模拟扣减库存场景, 因此不使用原子操作
- Integer quantity = (Integer) valueOperations.get(quantityKey);
- if (quantity> 0) {
- --quantity;
- valueOperations.set(quantityKey, quantity);
- System.out.println("扣减库存成功, 剩余库存:" + quantity);
- } else {
- System.out.println("扣减库存成功, 剩余库存:" + quantity);
- }
- return "true";
- } finally {
- // 删除之前判断是否是自己加的锁
- if (uuid.equals(valueOperations.get(lock))) {
- redisTemplate.delete(lock);
- }
- }
- }
- }
这样只是保证自己的锁不被别人删掉, 但是这个判断再删除的操作也不是原子操作, 同时超时的问题还是没有解决. 怎么办呢, 我们给锁续命, 可以在加锁的同时再起一个定时任务, 去检查锁是否释放, 如果没有释放就增加超时时间, 然后再去定时检查, 直到锁被删除了. 比如锁超时时间 10s, 那么定时任务在 8s 后去检查, 锁是否被释放, 如果没有释放则重新设置超时时间. 继续监视锁是否释放.
如果我们自己按照这个逻辑去实现, 有可能还会有很多 bug.Redisson 已经帮我们很好的实现了分布式锁. 配置好之后, 使用就像使用 java 的 lock 一样. 原理就和上述差不多.
加依赖:
- <dependency>
- <groupId>org.redisson</groupId>
- <artifactId>redisson</artifactId>
- <version>3.6.5</version>
- </dependency>
写配置:
- @Bean
- public Redisson redisson() {
- Config config = new Config();
- config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("xxx");
- /**
- * 哨兵
- */
- //config.useSentinelServers().addSentinelAddress("");
- /**
- * 集群
- */
- //config.useClusterServers().addNodeAddress("redis://111.229.53.45:6379");
- return (Redisson) Redisson.create(config);
- }
使用:
- package com.nijunyang.Redis.lock;
- import org.redisson.Redisson;
- import org.redisson.API.RLock;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.data.Redis.core.RedisTemplate;
- import org.springframework.data.Redis.core.ValueOperations;
- import org.springframework.Web.bind.annotation.GetMapping;
- import org.springframework.Web.bind.annotation.RestController;
- import java.util.UUID;
- import java.util.concurrent.TimeUnit;
- /**
- * Description:
- * Created by nijunyang on 2020/3/17 23:53
- */
- @RestController
- public class LockController {
- @Autowired
- ValueOperations<String, Object> valueOperations;
- @Autowired
- RedisTemplate<String, Object> redisTemplate;
- @Autowired
- Redisson redisson;
- String lockKey = "lockKey";
- String quantityKey = "quantity";
- @GetMapping("/deduct-stock2")
- public String deductStock2() {
- RLock redissonLock = redisson.getLock(lockKey);
- try {
- redissonLock.lock();
- // 使用当做数据库, 只是模拟扣减库存场景, 因此不使用原子操作
- Integer quantity = (Integer) valueOperations.get(quantityKey);
- if (quantity> 0) {
- --quantity;
- valueOperations.set(quantityKey, quantity);
- System.out.println("扣减库存成功, 剩余库存:" + quantity);
- return "true";
- } else {
- System.out.println("扣减库存成功, 剩余库存:" + quantity);
- return "false";
- }
- } finally {
- redissonLock.unlock();
- }
- }
- }
和 JUC 包里面 Lock 锁的使用一模一样, 有木有?
Redisson 锁源码逻辑简要分析, 直接在代码中加的注释说明, 里面大量使用 lua 脚本来封装 Redis 操作的原子性, 上面提到的判断再删除的操作, 也可以写成 lua 脚本执行, 保证原子性. 同时 lua 脚本中如果出错了, 数据还会回滚.
虽然看起来已经很完善了, 但是还有一点点问题如果哨兵模式, 或者集群模式, 锁加载 master 上面, 还未同步到 slave 的时候, master 挂了, 这个重新选举, 新的 master 上面是没有加锁的. 不过这种几率已经很小很小了, 如果是在要求强一致性, 那么就只有选择 zookeeper 来实现, 因为 zookeeper 是强一致性的, 它是多数节点数据都同步好了才返回. Master 挂了, 选举也是在数据一致的节点中, 因此重新选上来 leader 肯定是有锁的. 当然 ZK 的性能肯定就没有 Redis 的高了, 怎么选择还是看自己业务是否允许.
Redisson 也提供了一个 RedissonRedLock, 传入多个锁对象, 加锁的时候, 多个锁都加上才认为加锁成功. 但是这样需要连接多个 Redis. 这样肯定是有性能问题的, 还有网络问题等等.
来源: https://www.cnblogs.com/nijunyang/p/12527341.html