我们知道分布式锁的特性是排他, 避免死锁, 高可用. 分布式锁的实现可以通过数据库的乐观锁 (通过版本号) 或者悲观锁 (通过 for update),Redis 的 setnx() 命令, Zookeeper(在某个持久节点添加临时有序节点, 判断当前节点是否是序列中最小的节点, 如果不是则监听比当前节点还要小的节点. 如果是, 获取锁成功. 当被监听的节点释放了锁(也就是被删除), 会通知当前节点. 然后当前节点再尝试获取锁, 如此反复)
本篇文章, 主要讲如何用 Redis 的形式实现分布式锁. 后续文章会讲解热点 KEY 读取, 缓存穿透和缓存雪崩的场景和解决方案, 缓存更新策略等等知识点, 理论知识点较多.
Redis 配置
我的 redis 配置如下
- spring.redis.host=
- spring.redis.port=6379
- #reids 超时连接时间
- spring.redis.timeout=100000
- spring.redis.password=
- # 连接池最大连接数
- spring.redis.pool.max-active=10000
- # 连接池最大空闲数
- spring.redis.pool.max-idle=1000
- # 连接池最大等待时间
- spring.redis.pool.max-wait=10000
- @Component
- @Getter
- @Setter
- @ConfigurationProperties(prefix = "spring.redis")
- public class RedisConfig {
- @Value("${spring.redis.host}")
- private String host;
- @Value("${spring.redis.port}")
- private int port;
- @Value("${spring.redis.password}")
- private String password;
- @Value("${spring.redis.timeout}")
- private int timeout;
- @Value("${spring.redis.pool.max-active}")
- private int poolMaxActive;
- @Value("${spring.redis.pool.max-idle}")
- private int poolMaxIdle;
- @Value("${spring.redis.pool.max-wait}")
- private int poolMaxWait;
- }
- @Component
- public class RedisPoolFactory {
- @Autowired
- private RedisConfig redisConfig;
- @Bean
- public JedisPool jedisPoolFactory() {
- JedisPoolConfig poolConfig = new JedisPoolConfig();
- poolConfig.setMaxIdle(redisConfig.getPoolMaxIdle());
- poolConfig.setMaxTotal(redisConfig.getPoolMaxActive());
- poolConfig.setTestOnBorrow(true);
- poolConfig.setMaxWaitMillis(redisConfig.getPoolMaxWait());
- JedisPool jp = new JedisPool(poolConfig, redisConfig.getHost(), redisConfig.getPort(),
- redisConfig.getTimeout(), redisConfig.getPassword(), 0);
- return jp;
- }
- }
为了区分不同模块的 key, 我抽象出了一个 KeyPrefix 接口和 BasePrefix 类.
- public interface KeyPrefix {
- int expireSeconds();
- String getPrefix();
- }
- /**
- * @author cmazxiaoma
- * @version V1.0
- * @Description: TODO
- * @date 2018/5/10 12:35
- */
- public abstract class BasePrefix implements KeyPrefix {
- private int expireSeconds;
- private String prefix;
- public BasePrefix(int expireSeconds, String prefix) {
- this.expireSeconds = expireSeconds;
- this.prefix = prefix;
- }
- public BasePrefix(String prefix) {
- this(0, prefix);
- }
- @Override
- public int expireSeconds() {
- return expireSeconds;
- }
- @Override
- public String getPrefix() {
- String className = getClass().getSimpleName();
- return className + ":" + prefix;
- }
- }
分布式锁分析与编码
下面进入正文. 因为分布式系统之间是不同进程的, 单机版的锁无法满足要求. 所以我们可以借助中间件 Redis 的 setnx()命令实现分布式锁. setnx()命令只会对不存在的 key 设值, 返回 1 代表获取锁成功. 对存在的 key 设值, 会返回 0 代表获取锁失败. 这里的 value 是 System.currentTimeMillis() (获取锁的时间)+ 锁持有的时间. 我这里设置锁持有的时间是 200ms, 实际业务执行的时间远比这 200ms 要多的多, 持有锁的客户端应该检查锁是否过期, 保证锁在释放之前不会过期. 因为客户端故障的情况可能是很复杂的. 比如现在有 A,B 俩个客户端. A 客户端获取了锁, 执行业务中做了骚操作导致阻塞了很久, 时间应该远远超过 200ms, 当 A 客户端从阻塞状态下恢复继续执行业务代码时, A 客户端持有的锁由于过期已经被其他客户端占有. 这时候 A 客户端执行释放锁的操作, 那么有可能释放掉其他客户端的锁.
我这里设置的客户端等待锁的时间是 200ms. 这里通过轮询的方式去让客户端获取锁. 如果客户端在 200ms 之内没有锁的话, 直接返回 false. 实际场景要设置合适的客户端等待锁的时间, 避免消耗 CPU 资源.
如果获取锁的逻辑只有这三行代码的话, 会造成死循环, 明显不符合分布式锁的特性.
- if (jedis.setnx(realKey, value) == 1) {
- return true;
- }
所以, 我们要加上锁过期, 然后获取锁的策略. 通过 realKey 获取当前的 currentValue.currentValue 也就是获取锁的时间 + 锁持有的时间. 如果 currentValue 不等于 null 且 currentValue 小于当前时间, 说明锁已经过期. 这时候如果突然来了 C,D 两个客户端获取锁的请求, 不就让 C,D 两个客户端都获取锁了吗. 如果防止这种现象发生, 我们采用 getSet()命令来解决. getSet(key,value)的命令会返回 key 对应的 value, 然后再把 key 原来的值更新为 value. 也就是说 getSet()返回的是已过期的时间戳. 如果这个已过期的时间戳等于 currentValue, 说明获取锁成功.
假设客户端 A 一开始持有锁, 保存在 redis 中的 value(时间戳)等于 T1. 这时候客户端 A 的锁已经过期, 那么 C,D 客户端就可以开始争抢锁了. currentValue 是 T1,C 客户端的 value 是 T2,D 客户端的 value 是 T3. 首先 C 客户端进入到 String oldValue = jedis.getSet(realKey, value); 这行代码, 获得的 oldValue 是 T1, 同时也会把 realKey 对应的 value 更新为 T2. 再执行后续的代码, oldValue 等于 currentValue, 那么客户端 C 获取锁成功. 接着 D 客户端也执行到了 String oldValue = jedis.getSet(realKey, value); 这行代码, 获取的 oldValue 是 T2, 同时也会把 realKey 对应的 value 更新为 T3. 由于 oldValue 不等于 currentValue, 那么客户端 D 获取锁失败.
- public boolean lock(KeyPrefix prefix, String key, String value) {
- Jedis jedis = null;
- Long lockWaitTimeOut = 200L;
- Long deadTimeLine = System.currentTimeMillis() + lockWaitTimeOut;
- try {
- jedis = jedisPool.getResource();
- String realKey = prefix.getPrefix() + key;
- for (;;) {
- if (jedis.setnx(realKey, value) == 1) {
- return true;
- }
- String currentValue = jedis.get(realKey);
- // if lock is expired
- if (!StringUtils.isEmpty(currentValue) &&
- Long.valueOf(currentValue) < System.currentTimeMillis()) {
- // gets last lock time
- String oldValue = jedis.getSet(realKey, value);
- if (!StringUtils.isEmpty(oldValue) && oldValue.equals(currentValue)) {
- return true;
- }
- }
- lockWaitTimeOut = deadTimeLine - System.currentTimeMillis();
- if (lockWaitTimeOut <= 0L) {
- return false;
- }
- }
- } finally {
- returnToPool(jedis);
- }
- }
我们讲解了获取的逻辑, 接着讲讲释放锁的逻辑. 我们在这里加上! StringUtils.isEmpty(currentValue) && value.equals(currentValue)判断是为了防止释放了不属于当前客户端的锁. 还是举个例子, 如果没有这个逻辑, A 客户端调用 unlock()方法之前, 锁突然就过期了. 这时候 B 客户端发现锁过期了, 立马获取了锁. 然后 A 客户端接着调用 unlock()方法, 却释放了原本属于 B 客户端的锁.
- public void unlock(KeyPrefix prefix, String key, String value) {
- Jedis jedis = null;
- try {
- jedis = jedisPool.getResource();
- String realKey = prefix.getPrefix() + key;
- String currentValue = jedis.get(realKey);
- if (!StringUtils.isEmpty(currentValue)
- && value.equals(currentValue)) {
- jedis.del(realKey);
- }
- } catch (Exception ex) {
- log.info("unlock error");
- } finally {
- returnToPool(jedis);
- }
- }
编码 RedisController, 模拟商品秒杀操作. 测试分布式锁是否可行.(强调: 这里只是举一个例子, 更直观的判断分布式锁可行, 不适合实际场景!!!!! 实际上抢购, 是直接将库存放入到 redis, 是否结束标记放入到内存中, 通过内存标记和 redis 中的 decr()预减库存, 然后将秒杀消息入队到消息队列中, 最后消费消息并落地到 DB 中)
- /**
- * @author cmazxiaoma
- * @version V1.0
- * @Description: TODO
- * @date 2018/8/28 9:27
- */
- @RestController
- @RequestMapping("/redis")
- public class RedisController {
- private static LongAdder longAdder = new LongAdder();
- private static Long LOCK_EXPIRE_TIME = 200L;
- private static Long stock = 10000L;
- @Autowired
- private RedisService redisService;
- static {
- longAdder.add(10000L);
- }
- @GetMapping("/v1/seckill")
- public String seckillV1() {
- Long time = System.currentTimeMillis() + LOCK_EXPIRE_TIME;
- if (!redisService.lock(SeckillKeyPrefix.seckillKeyPrefix, "redis-seckill", String.valueOf(time))) {
- return "人太多了, 换个姿势操作一下";
- }
- if (longAdder.longValue() == 0L) {
- return "已抢光";
- }
- doSomeThing();
- if (longAdder.longValue() == 0L) {
- return "已抢光";
- }
- longAdder.decrement();
- redisService.unlock(SeckillKeyPrefix.seckillKeyPrefix, "redis-seckill", String.valueOf(time));
- Long stock = longAdder.longValue();
- Long bought = 10000L - stock;
- return "已抢" + bought + ", 还剩下" + stock;
- }
- @GetMapping("/detail")
- public String detail() {
- Long stock = longAdder.longValue();
- Long bought = 10000L - stock;
- return "已抢" + bought + ", 还剩下" + stock;
- }
- @GetMapping("/v2/seckill")
- public String seckillV2() {
- if (longAdder.longValue() == 0L) {
- return "已抢光";
- }
- doSomeThing();
- if (longAdder.longValue() == 0L) {
- return "已抢光";
- }
- longAdder.decrement();
- Long stock = longAdder.longValue();
- Long bought = 10000L - stock;
- return "已抢" + bought + ", 还剩下" + stock;
- }
- @GetMapping("/v3/seckill")
- public String seckillV3() {
- if (stock == 0) {
- return "已抢光";
- }
- doSomeThing();
- stock--;
- Long bought = 10000L - stock;
- return "已抢" + bought + ", 还剩下" + stock;
- }
- public void doSomeThing() {
- try {
- TimeUnit.MILLISECONDS.sleep(100);
- } catch (InterruptedException ex) {
- ex.printStackTrace();
- }
- }
- }
对 http://localhost:8081/redis/v1/seckill 进行压测, 我使用的压测工具是 ab 测试工具. 这里用 10000 个并发用户, 20000 个请求来进行压测.
ab -c 10000 -n 20000 http://localhost:8081/redis/v1/seckill
压测结果如下:
- E:\cmazxiaoma_download\httpd-2.4.34-o102o-x64-vc14\Apache24\bin>ab -c 10000 -n 2
- 0000 http://localhost:8081/redis/v1/seckill
- This is ApacheBench, Version 2.3 <$Revision: 1826891 $>
- Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
- Licensed to The Apache Software Foundation, http://www.apache.org/
- Benchmarking localhost (be patient)
- Completed 2000 requests
- Completed 4000 requests
- Completed 6000 requests
- Completed 8000 requests
- Completed 10000 requests
- Completed 12000 requests
- Completed 14000 requests
- Completed 16000 requests
- Completed 18000 requests
- Completed 20000 requests
- Finished 20000 requests
- Server Software:
- Server Hostname: localhost
- Server Port: 8081
- Document Path: /redis/v1/seckill
- Document Length: 22 bytes
- Concurrency Level: 10000
- Time taken for tests: 108.426 seconds
- Complete requests: 20000
- Failed requests: 19991
- (Connect: 0, Receive: 0, Length: 19991, Exceptions: 0)
- Total transferred: 3420218 bytes
- HTML transferred: 760218 bytes
- Requests per second: 184.46 [#/sec] (mean)
- Time per request: 54213.000 [ms] (mean)
- Time per request: 5.421 [ms] (mean, across all concurrent requests)
- Transfer rate: 30.80 [Kbytes/sec] received
- Connection Times (ms)
- min mean[+/-sd] median max
- Connect: 0 0 6.3 0 549
- Processing: 2393 36477 16329.1 45101 90269
- Waiting: 182 36435 16351.4 45046 90267
- Total: 2393 36477 16329.0 45101 90269
- Percentage of the requests served within a certain time (ms)
- 50% 45101
- 66% 47680
- 75% 49136
- 80% 50392
- 90% 53200
- 95% 53743
- 98% 54510
- 99% 56014
- 100% 90269 (longest request)
我们再来看看是否有超卖现象, 貌似还是正常.
回溯分析
我打开 RedisDesktopManager 查看 db0 的 key 信息时, 发现还有一个 key 没有删除掉. 说明我们写的 unlock()方法在 1w 并发用户, 2w 请求下还是存在问题.
仔细推敲自己之前写的代码发现 (还是拿上面的例子说事), 客户端 D 虽然获取锁失败, 但是之前进行了 String oldValue = jedis.getSet(realKey, value); 操作, 还是成功的更新了 realKey 对应的 value. 我们进行 unlock() 操作时, 释放客户端的锁是根据 value 来标识当前客户端的. 一开始客户端 C 的 value 是 T2, 由于客户端 D 的 getSet()操作, 覆盖掉了客户端 C 的 value, 让其更新成 T3. 由于 value.equals(currentValue)条件不成立, 所以不会执行到 jedis.del(realKey)
其实 lock()方法也经不起推敲: 1. 分布式各个系统时间不一致, 如果要这样做, 只能进行时间同步. 2. 当某个客户端锁过期时, 多个客户端开始争抢锁. 虽然最后只有一个客户端能成功锁, 但是获取锁失败的客户端能覆盖获取锁成功客户端的过期时间. 3. 当客户端的锁过期时间被覆盖, 会造成锁不具有标识性, 会造成客户端没有释放锁.
所以我们要重写 lock 与 unlock()的逻辑, 看到网上已经有很多的解决方案.(不过也有很多错误案例)
我们可以通过 redis 的 set(key,value,NX,EX,timeout)合并普通的 set()和 expire()操作, 使其具有原子性.
- /**
- * Set the string value as value of the key. The string can't be longer than 1073741824 bytes (1
- * GB).
- * @param key
- * @param value
- * @param nxxx NX|XX, NX -- Only set the key if it does not already exist. XX -- Only set the key
- * if it already exist.
- * @param expx EX|PX, expire time units: EX = seconds; PX = milliseconds
- * @param time expire time in the units of <code>expx</code>
- * @return Status code reply
- */
- public String set(final String key, final String value, final String nxxx, final String expx,
- final long time) {
- checkIsInMultiOrPipeline();
- client.set(key, value, nxxx, expx, time);
- return client.getStatusCodeReply();
- }
通过 set(key,value,NX,EX,timeout)方法, 我们就可以轻松实现分布式锁. 值得注意的是这里的 value 作为客户端锁的唯一标识, 不能重复.
- public boolean lock1(KeyPrefix prefix, String key, String value, Long lockExpireTimeOut,
- Long lockWaitTimeOut) {
- Jedis jedis = null;
- try {
- jedis = jedisPool.getResource();
- String realKey = prefix.getPrefix() + key;
- Long deadTimeLine = System.currentTimeMillis() + lockWaitTimeOut;
- for (;;) {
- String result = jedis.set(realKey, value, "NX", "PX", lockExpireTimeOut);
- if ("OK".equals(result)) {
- return true;
- }
- lockWaitTimeOut = deadTimeLine - System.currentTimeMillis();
- if (lockWaitTimeOut <= 0L) {
- return false;
- }
- }
- } catch (Exception ex) {
- log.info("lock error");
- } finally {
- returnToPool(jedis);
- }
- return false;
- }
我们可以使用 lua 脚本合并 get()和 del()操作, 使其具有原子性. 一切大功告成.
- public boolean unlock1(KeyPrefix prefix, String key, String value) {
- Jedis jedis = null;
- try {
- jedis = jedisPool.getResource();
- String realKey = prefix.getPrefix() + key;
- String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
- Object result = jedis.eval(luaScript, Collections.singletonList(realKey),
- Collections.singletonList(value));
- if ("1".equals(result)) {
- return true;
- }
- } catch (Exception ex) {
- log.info("unlock error");
- } finally {
- returnToPool(jedis);
- }
- return false;
- }
刚才看了评论, 看到了各位大佬提出的一系列问题. 我做出以下解释:
秒杀操作, 我在这里只是举一个例子, 更直观的判断分布式锁可行, 不适合实际场景!!!!! 实际上抢购, 是将商品库存放入到 redis, 将是否结束标记 Flag 放入到内存中, 通过内存标记和 redis 中的 decr()预减库存, 然后将秒杀消息入队到消息队列中, 最后消费消息并落地到 DB 中.
2. 请耐心读完本篇文章. 第一个案例代码是错误的, 我后续讲解了如何发现和分析错误案例代码的思路. 在此基础下, 推导出正确的代码.
3. 通过评论, 我看到有一篇文章作者的思路是这样的: 获取锁之后, 通过标志位和开启新线程的方式轮询去刷新当前客户端持有锁的时间, 以保证在释放锁之前锁不会过期, 然后锁释放后, 将标志位置为 false, 线程停止循环. 但是这样有一个问题: 假如执行了 lock()操作之后, 客户端由于一些原因阻塞了, 那么 unlock()方法一直得不到执行, 那么标志位一直为 true, 开启刷新过期时间的线程一直死循环, 会造成资源的严重浪费. 而且线程一直增加当前客户端持有锁的时间, 会造成其他客户端一直拿不到锁, 而且造成死锁.
喜欢的点点关注, 点点赞.
来源: http://blog.51cto.com/13941961/2171299