前言
单机环境下我们可以通过 JAVA 的 Synchronized 和 Lock 来实现进程内部的锁, 但是随着分布式应用和集群环境的出现, 系统资源的竞争从单进程多线程的竞争变成了多进程的竞争, 这时候就需要分布式锁来保证.
实现分布式锁现在主流的方式大致有以下三种
1. 基于数据库的索引和行锁
2. 基于 Redis 的单线程原子操作: setNX
3. 基于 Zookeeper 的临时有序节点
这篇文章我们用 Redis 来实现, 会基于现有的各种锁实现来分析, 最后分享 Redission 的锁源码分析来看下分布式锁的开源实现
复制代码
设计实现
加锁
一, 通过 setNx 和 getSet 来实现
这是现在网上大部分版本的实现方式, 笔者之前项目里面用到分布式锁也是通过这样的方式实现
- public boolean lock(Jedis jedis, String lockName, Integer expire) {
- // 返回是否设置成功
- //setNx 加锁
- long now = System.currentTimeMillis();
- boolean result = jedis.setnx(lockName, String.valueOf(now + expire * 1000)) == 1;
- if (!result) {
- // 防止死锁的容错
- String timestamp = jedis.get(lockName);
- if (timestamp != null && Long.parseLong(timestamp) <now) {
- // 不通过 del 方法来删除锁. 而是通过同步的 getSet
- String oldValue = jedis.getSet(lockName, String.valueOf(now + expire));
- if (oldValue != null && oldValue.equals(timestamp)) {
- result = true;
- jedis.expire(lockName, expire);
- }
- }
- }
- if (result) {
- jedis.expire(lockName, expire);
- }
- return result;
- }
复制代码
代码分析:
通过 setNx 命令老保证操作的原子性, 获取到锁, 并且把过期时间设置到 value 里面
通过 expire 方法设置过期时间, 如果设置过期时间失败的话, 再通过 value 的时间戳来和当前时间戳比较, 防止出现死锁
通过 getSet 命令在发现锁过期未被释放的情况下, 避免删除了在这个过程中有可能被其余的线程获取到了锁
存在问题
防止死锁的解决方案是通过系统当前时间决定的, 不过线上服务器系统时间一般来说都是一致的, 这个不算是严重的问题
锁过期的时候可能会有多个线程执行 getSet 命令, 在竞争的情况下, 会修改 value 的时间戳, 理论上来说会有误差
锁无法具备客户端标识, 在解锁的时候可能被其余的客户端删除同一个 key
虽然有小问题, 不过大体上来说这种分布式锁的实现方案基本上是符合要求的, 能够做到锁的互斥和避免死锁
二, 通过 Redis 高版本的原子命令
jedis 的 set 命令可以自带复杂参数, 通过这些参数可以实现原子的分布式锁命令
jedis.set(lockName, "","NX","PX", expireTime);
复制代码
代码分析
redis 的 set 命令可以携带复杂参数, 第一个是锁的 key, 第二个是 value, 可以存放获取锁的客户端 ID, 通过这个校验是否当前客户端获取到了锁, 第三个参数取值 NX/XX, 第四个参数 EX|PX, 第五个就是时间
NX: 如果不存在就设置这个 key XX: 如果存在就设置这个 key
EX: 单位为秒, PX: 单位为毫秒
这个命令实质上就是把我们之前的 setNx 和 expire 命令合并成一个原子操作命令, 不需要我们考虑 set 失败或者 expire 失败的情况
解锁
一, 通过 Redis 的 del 命令
- public boolean unlock(Jedis jedis, String lockName) {
- jedis.del(lockName);
- return true;
- }
复制代码
代码分析
通过 redis 的 del 命令可以直接删除锁, 可能会出现误删其他线程已经存在的锁的情况
二, Redis 的 del 检查
- public static void unlock2(Jedis jedis, String lockKey, String requestId) {
- // 判断加锁与解锁是不是同一个客户端
- if (requestId.equals(jedis.get(lockKey))) {
- // 若在此时, 这把锁突然不是这个客户端的, 则会误解锁
- jedis.del(lockKey);
- }
- }
复制代码
代码分析
新增了 requestId 客户端 ID 的判断, 但由于不是原子操作, 在多个进程下面的并发竞争情况下, 无法保证安全
三, Redis 的 LUA 脚本
- public static boolean unlock3(Jedis jedis, String lockKey, String requestId) {
- String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
- Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(""));
- if (1L == (long) result) {
- return true;
- }
- return false;
- }
复制代码
代码分析
通过 Lua 脚本来保证操作的原子性, 其实就是把之前的先判断再删除合并成一个原子性的脚本命令, 逻辑就是, 先通过 get 判断 value 是不是相等, 若相等就删除, 否则就直接 return
Redission 的分布式锁
Redission 是 redis 官网推荐的一个 redis 客户端, 除了基于 redis 的基础的 CURD 命令以外, 重要的是就是 Redission 提供了方便好用的分布式锁 API
复制代码
一, 基本用法
- RedissonClient redissonClient = RedissonTool.getInstance();
- RLock distribute_lock = redissonClient.getLock("distribute_lock");
- try {
- boolean result = distribute_lock.tryLock(3, 10, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- if (distribute_lock.isLocked()) {
- distribute_lock.unlock();
- }
- }
复制代码
代码流程
通过 redissonClient 获取 RLock 实例
tryLock 获取尝试获取锁, 第一个是等待时间, 第二个是锁的超时时间, 第三个是时间单位
执行完业务逻辑后, 最终释放锁
二, 具体实现
我们通过 tryLock 来分析 redission 分布式的实现, lock 方法跟 tryLock 差不多, 只不过没有最长等待时间的设置, 会自旋循环等待锁的释放, 直到获取锁为止
- long time = unit.toMillis(waitTime);
- long current = System.currentTimeMillis();
- // 获取当前线程 ID, 用于实现可重入锁
- final long threadId = Thread.currentThread().getId();
- // 尝试获取锁
- Long ttl = tryAcquire(leaseTime, unit, threadId);
- // lock acquired
- if (ttl == null) {
- return true;
- }
- time -= (System.currentTimeMillis() - current);
- if (time <= 0) {
- // 等待时间结束, 返回获取失败
- acquireFailed(threadId);
- return false;
- }
- current = System.currentTimeMillis();
- // 订阅锁的队列, 等待锁被其余线程释放后通知
- final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
- if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
- if (!subscribeFuture.cancel(false)) {
- subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
- @Override
- public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
- if (subscribeFuture.isSuccess()) {
- unsubscribe(subscribeFuture, threadId);
- }
- }
- });
- }
- acquireFailed(threadId);
- return false;
- }
- try {
- time -= (System.currentTimeMillis() - current);
- if (time <= 0) {
- acquireFailed(threadId);
- return false;
- }
- while (true) {
- long currentTime = System.currentTimeMillis();
- ttl = tryAcquire(leaseTime, unit, threadId);
- // lock acquired
- if (ttl == null) {
- return true;
- }
- time -= (System.currentTimeMillis() - currentTime);
- if (time <= 0) {
- acquireFailed(threadId);
- return false;
- }
- // waiting for message, 等待订阅的队列消息
- currentTime = System.currentTimeMillis();
- if (ttl>= 0 && ttl <time) {
- getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
- } else {
- getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
- }
- time -= (System.currentTimeMillis() - currentTime);
- if (time <= 0) {
- acquireFailed(threadId);
- return false;
- }
- }
- } finally {
- unsubscribe(subscribeFuture, threadId);
- }
复制代码
代码分析
首先 tryAcquire 尝试获取锁, 若返回 ttl 为 null, 说明获取到锁了
判断等待时间是否过期, 如果过期, 直接返回获取锁失败
通过 Redis 的 Channel 订阅监听队列, subscribe 内部通过信号量 semaphore, 再通过 await 方法阻塞, 内部其实是用 CountDownLatch 来实现阻塞, 获取 subscribe 异步执行的结果, 来保证订阅成功, 再判断是否到了等待时间
再次尝试申请锁和等待时间的判断, 循环阻塞在这里等待锁释放的消息 RedissonLockEntry 也维护了一个 semaphore 的信号量
无论是否释放锁, 最终都要取消订阅这个队列消息
redission 内部的 getEntryName 是客户端实例 ID + 锁名称来保证多个实例下的锁可重入
tryAcquire 获取锁
redisssion 获取锁的核心代码, 内部其实是异步调用, 但是用 get 方法阻塞了
- private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
- return get(tryAcquireAsync(leaseTime, unit, threadId));
- }
复制代码
- private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
- if (leaseTime != -1) {
- return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
- }
- RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
- ttlRemainingFuture.addListener(new FutureListener<Long>() {
- @Override
- public void operationComplete(Future<Long> future) throws Exception {
- if (!future.isSuccess()) {
- return;
- }
- Long ttlRemaining = future.getNow();
- // lock acquired
- if (ttlRemaining == null) {
- scheduleExpirationRenewal(threadId);
- }
- }
- });
- return ttlRemainingFuture;
- }
复制代码
tryLockInnerAsync 方法内部是基于 Lua 脚本来获取锁的
先判断 KEYS[1](锁名称) 对应的 key 是否存在, 不存在获取到锁, hset 设置 key 的 value,pexpire 设置过期时间, 返回 null 表示获取到锁
存在的话, 锁被占, hexists 判断是否是当前线程的锁, 若是的话, hincrby 增加重入次数, 重新设置过期时间, 不是当前线程的锁, 返回当前锁的过期时间
- <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
- internalLockLeaseTime = unit.toMillis(leaseTime);
- return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
- "if (redis.call('exists', KEYS[1]) == 0) then" +
- "redis.call('hset', KEYS[1], ARGV[2], 1);" +
- "redis.call('pexpire', KEYS[1], ARGV[1]);" +
- "return nil;" +
- "end;" +
- "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then" +
- "redis.call('hincrby', KEYS[1], ARGV[2], 1);" +
- "redis.call('pexpire', KEYS[1], ARGV[1]);" +
- "return nil;" +
- "end;" +
- "return redis.call('pttl', KEYS[1]);",
- Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
- }
复制代码
Redission 避免死锁的解决方案:
Redission 为了避免锁未被释放, 采用了一个特殊的解决方案, 若未设置过期时间的话, redission 默认的过期时间是 30s, 同时未避免锁在业务未处理完成之前被提前释放, Redisson 在获取到锁且默认过期时间的时候, 会在当前客户端内部启动一个定时任务, 每隔 internalLockLeaseTime/3 的时间去刷新 key 的过期时间, 这样既避免了锁提前释放, 同时如果客户端宕机的话, 这个锁最多存活 30s 的时间就会自动释放 (刷新过期时间的定时任务进程也宕机)
- // lock acquired, 获取到锁的时候设置定期更新时间的任务
- if (ttlRemaining) {
- scheduleExpirationRenewal(threadId);
- }
- //expirationRenewalMap 的并发安全 MAP 记录设置过的缓存, 避免并发情况下重复设置任务, internalLockLeaseTime / 3 的时间后重新设置过期时间
- private void scheduleExpirationRenewal(final long threadId) {
- if (expirationRenewalMap.containsKey(getEntryName())) {
- return;
- }
- Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
- @Override
- public void run(Timeout timeout) throws Exception {
- RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
- "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then" +
- "redis.call('pexpire', KEYS[1], ARGV[1]);" +
- "return 1;" +
- "end;" +
- "return 0;",
- Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
- future.addListener(new FutureListener<Boolean>() {
- @Override
- public void operationComplete(Future<Boolean> future) throws Exception {
- expirationRenewalMap.remove(getEntryName());
- if (!future.isSuccess()) {
- log.error("Can't update lock "+ getName() +" expiration", future.cause());
- return;
- }
- if (future.getNow()) {
- // reschedule itself
- scheduleExpirationRenewal(threadId);
- }
- }
- });
- }
- }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
- if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
- task.cancel();
- }
- }
复制代码
unlock 解锁
- protected RFuture<Boolean> unlockInnerAsync(long threadId) {
- return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
- "if (redis.call('exists', KEYS[1]) == 0) then" +
- "redis.call('publish', KEYS[2], ARGV[1]);" +
- "return 1;" +
- "end;" +
- "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then" +
- "return nil;" +
- "end;" +
- "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);" +
- "if (counter> 0) then" +
- "redis.call('pexpire', KEYS[1], ARGV[2]);" +
- "return 0;" +
- "else" +
- "redis.call('del', KEYS[1]);" +
- "redis.call('publish', KEYS[2], ARGV[1]);" +
- "return 1;"+
- "end;" +
- "return nil;",
- Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
- }
复制代码
Redission 的 unlock 解锁也是基于 Lua 脚本实现的, 内部逻辑是先判断锁是否存在, 不存在说明已经被释放了, 发布锁释放消息后返回, 锁存在再判断当前线程是否锁拥有者, 不是的话, 无权释放返回, 解锁的话, 会减去重入的次数, 重新更新过期时间, 若重入数捡完, 删除当前 key, 发布锁释放消息
写在后面
主要基于 Redis 来设计和实现分布式锁, 通过常用的设计思路引申到 Redission 的实现, 无论是设计思路还是代码健壮性 Redission 的设计都是优秀的, 值得学习, 下一步会讲解关于 Zookeeper 的分布式锁实现和相关开源源码分析.
来源: https://juejin.im/post/5b7bcd7ce51d4538826f4684