前言
主流的分布式锁一般有三种实现方式:
数据库乐观锁
基于 Redis 的分布式锁
基于 ZooKeeper 的分布式锁
之前我在博客上写过关于 MySQL 和 Redis 实现分布式锁的具体方案:
https://www.cnblogs.com/wang-meng/p/10226618.html
里面主要是从实现原理出发.
这次 [分布式锁] 系列文章主要是深入 Redis 客户端 reddision 源码和 zk 这两种分布式锁的实现原理.
可靠性
首先, 为了确保分布式锁可用, 我们至少要确保锁的实现同时满足以下四个条件:
互斥性. 在任意时刻, 只有一个客户端能持有锁.
不会发生死锁. 即使有一个客户端在持有锁的期间崩溃而没有主动解锁, 也能保证后续其他客户端能加锁.
具有容错性. 只要大部分的 Redis 节点正常运行, 客户端就可以加锁和解锁.
解铃还须系铃人. 加锁和解锁必须是同一个客户端, 客户端自己不能把别人加的锁给解了.
Redisson 加锁原理
redisson 是一个非常强大的开源的 Redis 客户端框架, 官方地址:
https://redisson.org/
使用起来很简单, 配置好 maven 和连接信息, 这里直接看代码实现:
- RLock lock = redisson.getLock("anyLock");
- lock.lock();
- lock.unlock();
redisson 具体的执行加锁逻辑都是通过 lua 脚本来完成的, lua 脚本能够保证原子性.
先看下 RLock 初始化的代码:
- public class Redisson implements RedissonClient {
- @Override
- public RLock getLock(String name) {
- return new RedissonLock(connectionManager.getCommandExecutor(), name);
- }
- }
- public class RedissonLock extends RedissonExpirable implements RLock {
- public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
- super(commandExecutor, name);
- this.commandExecutor = commandExecutor;
- this.id = commandExecutor.getConnectionManager().getId();
- this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
- this.entryName = id + ":" + name;
- }
首先看下 RedissonLock 的 id 返回的是一个 UUID 对象, 每个机器都对应一个自己的 id 属性, id 值就类似于:"8743c9c0-0795-4907-87fd-6c719a6b4586"
接着往后看 lock() 的代码实现:
- public class RedissonLock extends RedissonExpirable implements RLock {
- @Override
- public void lock() {
- try {
- lockInterruptibly();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- @Override
- public void lockInterruptibly() throws InterruptedException {
- lockInterruptibly(-1, null);
- }
- @Override
- public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
- // 获取当前线程 id
- long threadId = Thread.currentThread().getId();
- Long ttl = tryAcquire(leaseTime, unit, threadId);
- // lock acquired
- if (ttl == null) {
- return;
- }
- RFuture<RedissonLockEntry> future = subscribe(threadId);
- commandExecutor.syncSubscription(future);
- try {
- while (true) {
- ttl = tryAcquire(leaseTime, unit, threadId);
- // lock acquired
- if (ttl == null) {
- break;
- }
- // waiting for message
- if (ttl >= 0) {
- getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
- } else {
- getEntry(threadId).getLatch().acquire();
- }
- }
- } finally {
- unsubscribe(future, threadId);
- }
- }
- <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
- internalLockLeaseTime = unit.toMillis(leaseTime);
- return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
- "if (redis.call('exists', KEYS[1]) == 0) then" +
- "redis.call('hset', KEYS[1], ARGV[2], 1);" +
- "redis.call('pexpire', KEYS[1], ARGV[1]);" +
- "return nil;" +
- "end;" +
- "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then" +
- "redis.call('hincrby', KEYS[1], ARGV[2], 1);" +
- "redis.call('pexpire', KEYS[1], ARGV[1]);" +
- "return nil;" +
- "end;" +
- "return redis.call('pttl', KEYS[1]);",
- Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
- }
- }
这里省略了一些中间代码, 这里主要看 tryAcquire() 方法, 这里传递的过期时间为 - 1, 然后就是当前的线程 id, 接着就是核心的 lua 脚本执行流程, 我们来一步步看看是如何执行的:
- "if (redis.call('exists', KEYS[1]) == 0) then" +
- "redis.call('hset', KEYS[1], ARGV[2], 1);" +
- "redis.call('pexpire', KEYS[1], ARGV[1]);" +
- "return nil;" +
- "end;" +
KEYS[1] 参数是:"anyLock"
ARGV[2] 是:"id +":"+ threadId"
首先用的 exists 判断 Redis 中是否存在当前 key, 如果不存在就等于 0, 然后执行 hset 指令, 将 "anyLock id:threadId 1" 存储到 Redis 中, 最终 Redis 存储的数据类似于:
- {
- "8743c9c0-0795-4907-87fd-6c719a6b4586:1":1
- }
偷偷说一句, 最后面的一个 1 是为了后面可重入做的计数统计, 后面会有讲解到.
接着往下看, 然后使用 pexpire 设置过期时间, 默认使用 internalLockLeaseTime 为 30s. 最后返回为 null, 即时加锁成功.
Redisson 可重入原理
我们看下锁 key 存在的情况下, 同一个机器同一个线程如何加锁的?
- "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then" +
- "redis.call('hincrby', KEYS[1], ARGV[2], 1);" +
- "redis.call('pexpire', KEYS[1], ARGV[1]);" +
- "return nil;" +
- "end;" +
- "return redis.call('pttl', KEYS[1]);",
ARGV[2] 是:"id +":"+ threadId"
如果同一个机器同一个线程再次来请求, 这里就会是 1, 然后执行 hincrby, hset 设置的 value+1 变成了 2, 然后继续设置过期时间.
同理, 一个线程重入后, 解锁时 value - 1
Redisson watchDog 原理
如果一个场景: 现在有 A,B 在执行业务, A 加了分布式锁, 但是生产环境是各种变化的, 如果万一 A 锁超时了, 但是 A 的业务还在跑. 而这时由于 A 锁超时释放, B 拿到锁, B 执行业务逻辑. 这样分布式锁就失去了意义?
所以 Redisson 引入了 watch dog 的概念, 当 A 获取到锁执行后, 如果锁没过期, 有个后台线程会自动延长锁的过期时间, 防止因为业务没有执行完而锁过期的情况.
我们接着来看看具体实现:
- private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
- if (leaseTime != -1) {
- return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
- }
- RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
- ttlRemainingFuture.addListener(new FutureListener<Long>() {
- @Override
- public void operationComplete(Future<Long> future) throws Exception {
- if (!future.isSuccess()) {
- return;
- }
- Long ttlRemaining = future.getNow();
- // lock acquired
- if (ttlRemaining == null) {
- scheduleExpirationRenewal(threadId);
- }
- }
- });
- return ttlRemainingFuture;
- }
当我们 tryLockInnerAsync 执行完之后, 会添加一个监听器, 看看监听器中的具体实现:
- protected RFuture<Boolean> renewExpirationAsync(long threadId) {
- return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
- "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then" +
- "redis.call('pexpire', KEYS[1], ARGV[1]);" +
- "return 1;" +
- "end;" +
- "return 0;",
- Collections.<Object>singletonList(getName()),
- internalLockLeaseTime, getLockName(threadId));
- }
这里面调度任务每隔 10s 钟执行一次, lua 脚本中是续约过期时间, 使得当前线程持有的锁不会因为过期时间到了而失效
01_redisson watchdog_.PNG
Redisson 互斥性原理
还是看上面执行加锁的 lua 脚本, 最后会执行到:
1"return redis.call('pttl', KEYS[1]);",
返回锁还有多久时间过期, 我们继续接着看代码:
- @Override
- public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
- long threadId = Thread.currentThread().getId();
- Long ttl = tryAcquire(leaseTime, unit, threadId);
- // 返回 ttl 说明加锁成功, 不为空则是加锁失败
- if (ttl == null) {
- return;
- }
- RFuture<RedissonLockEntry> future = subscribe(threadId);
- commandExecutor.syncSubscription(future);
- try {
- // 死循环去尝试获取锁
- while (true) {
- // 再次尝试加锁
- ttl = tryAcquire(leaseTime, unit, threadId);
- // 如果 ttl=null 说明抢占锁成功
- if (ttl == null) {
- break;
- }
- // ttl 大于 0, 抢占锁失败, 这个里面涉及到 Semaphore, 后续会讲解
- if (ttl >= 0) {
- getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
- } else {
- getEntry(threadId).getLatch().acquire();
- }
- }
- } finally {
- unsubscribe(future, threadId);
- }
- }
Redisson 锁释放原理
直接看 lua 代码:
- protected RFuture<Boolean> unlockInnerAsync(long threadId) {
- return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
- // 判断锁 key 值是否存在
- "if (redis.call('exists', KEYS[1]) == 0) then" +
- "redis.call('publish', KEYS[2], ARGV[1]);" +
- "return 1;" +
- "end;" +
- // 判断当前机器, 当前线程 id 对应的 key 是否存在
- "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then" +
- "return nil;" +
- "end;" +
- // 计数器数量 - 1 可重入锁
- "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);" +
- // 如果计数器大于 0, 说明还在持有锁
- "if (counter > 0) then" +
- "redis.call('pexpire', KEYS[1], ARGV[2]);" +
- "return 0;" +
- "else" +
- // 使用 del 指令删除 key
- "redis.call('del', KEYS[1]);" +
- "redis.call('publish', KEYS[2], ARGV[1]);" +
- "return 1;"+
- "end;" +
- "return nil;",
- Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
- }
总结
一图总结:
01_redission 可重入锁实现原理. jpg
来源: https://www.cnblogs.com/wang-meng/p/12525029.html