前言
前面已经学习了 Redission 可重入锁以及公平锁的原理, 接着看看 Redission 是如何来实现 RedLock 的.
RedLock 原理
RedLock 是基于 Redis 实现的分布式锁, 它能够保证以下特性:
互斥性: 在任何时候, 只能有一个客户端能够持有锁; 避免死锁:
当客户端拿到锁后, 即使发生了网络分区或者客户端宕机, 也不会发生死锁;(利用 key 的存活时间)
容错性: 只要多数节点的 Redis 实例正常运行, 就能够对外提供服务, 加锁或者释放锁;
RedLock 算法思想, 意思是不能只在一个 Redis 实例上创建锁, 应该是在多个 Redis 实例上创建锁, n / 2 + 1, 必须在大多数 Redis 节点上都成功创建锁, 才能算这个整体的 RedLock 加锁成功, 避免说仅仅在一个 Redis 实例上加锁而带来的问题.
这里附上一个前几天对 RedLock 解析比较透彻的文章:
https://mp.weixin.qq.com/s/gOYWLg3xYt4OhS46woN_Lg
Redisson 实现原理
Redisson 中有一个 MultiLock 的概念, 可以将多个锁合并为一个大锁, 对一个大锁进行统一的申请加锁以及释放锁
而 Redisson 中实现 RedLock 就是基于 MultiLock 去做的, 接下来就具体看看对应的实现吧
RedLock 使用案例
先看下官方的代码使用:
- (https://github.com/redisson/redisson/wiki/8.-distributed-locks-and-synchronizers#84-redlock)
- RLock lock1 = redisson1.getLock("lock1");
- RLock lock2 = redisson2.getLock("lock2");
- RLock lock3 = redisson3.getLock("lock3");
- RLock redLock = anyRedisson.getRedLock(lock1, lock2, lock3);
- // traditional lock method
- redLock.lock();
- // or acquire lock and automatically unlock it after 10 seconds
- redLock.lock(10, TimeUnit.SECONDS);
- // or wait for lock aquisition up to 100 seconds
- // and automatically unlock it after 10 seconds
- boolean res = redLock.tryLock(100, 10, TimeUnit.SECONDS);
- if (res) {
- try {
- ...
- } finally {
- redLock.unlock();
- }
- }
这里是分别对 3 个 Redis 实例加锁, 然后获取一个最后的加锁结果.
RedissonRedLock 实现原理
上面示例中使用 redLock.lock() 或者 tryLock() 最终都是执行 RedissonRedLock 中方法.
RedissonRedLock 继承自 RedissonMultiLock, 实现了其中的一些方法:
- public class RedissonRedLock extends RedissonMultiLock {
- public RedissonRedLock(RLock... locks) {
- super(locks);
- }
- /**
- * 锁可以失败的次数, 锁的数量 - 锁成功客户端最小的数量
- */
- @Override
- protected int failedLocksLimit() {
- return locks.size() - minLocksAmount(locks);
- }
- /**
- * 锁的数量 / 2 + 1, 例如有 3 个客户端加锁, 那么最少需要 2 个客户端加锁成功
- */
- protected int minLocksAmount(final List<RLock> locks) {
- return locks.size()/2 + 1;
- }
- /**
- * 计算多个客户端一起加锁的超时时间, 每个客户端的等待时间
- * remainTime 默认为 4.5s
- */
- @Override
- protected long calcLockWaitTime(long remainTime) {
- return Math.max(remainTime / locks.size(), 1);
- }
- @Override
- public void unlock() {
- unlockInner(locks);
- }
- }
看到 locks.size()/2 + 1 , 例如我们有 3 个客户端实例, 那么最少 2 个实例加锁成功才算分布式锁加锁成功.
接着我们看下 lock() 的具体实现
RedissonMultiLock 实现原理
- public class RedissonMultiLock implements Lock {
- final List<RLock> locks = new ArrayList<RLock>();
- public RedissonMultiLock(RLock... locks) {
- if (locks.length == 0) {
- throw new IllegalArgumentException("Lock objects are not defined");
- }
- this.locks.addAll(Arrays.asList(locks));
- }
- public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
- long newLeaseTime = -1;
- if (leaseTime != -1) {
- // 如果等待时间设置了, 那么将等待时间 * 2
- newLeaseTime = unit.toMillis(waitTime)*2;
- }
- // time 为当前时间戳
- long time = System.currentTimeMillis();
- long remainTime = -1;
- if (waitTime != -1) {
- remainTime = unit.toMillis(waitTime);
- }
- // 计算锁的等待时间, RedLock 中: 如果 remainTime=-1, 那么 lockWaitTime 为 1
- long lockWaitTime = calcLockWaitTime(remainTime);
- // RedLock 中 failedLocksLimit 即为 n/2 + 1
- int failedLocksLimit = failedLocksLimit();
- List<RLock> acquiredLocks = new ArrayList<RLock>(locks.size());
- // 循环每个 Redis 客户端, 去获取锁
- for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
- RLock lock = iterator.next();
- boolean lockAcquired;
- try {
- // 调用 tryLock 方法去获取锁, 如果获取锁成功, 则 lockAcquired=true
- if (waitTime == -1 && leaseTime == -1) {
- lockAcquired = lock.tryLock();
- } else {
- long awaitTime = Math.min(lockWaitTime, remainTime);
- lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
- }
- } catch (Exception e) {
- lockAcquired = false;
- }
- // 如果获取锁成功, 将锁加入到 list 集合中
- if (lockAcquired) {
- acquiredLocks.add(lock);
- } else {
- // 如果获取锁失败, 判断失败次数是否等于失败的限制次数
- // 比如, 3 个 Redis 客户端, 最多只能失败 1 次
- // 这里 locks.size = 3, 3-x=1, 说明只要成功了 2 次就可以直接 break 掉循环
- if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
- break;
- }
- // 如果最大失败次数等于 0
- if (failedLocksLimit == 0) {
- // 释放所有的锁, RedLock 加锁失败
- unlockInner(acquiredLocks);
- if (waitTime == -1 && leaseTime == -1) {
- return false;
- }
- failedLocksLimit = failedLocksLimit();
- acquiredLocks.clear();
- // 重置迭代器 重试再次获取锁
- while (iterator.hasPrevious()) {
- iterator.previous();
- }
- } else {
- // 失败的限制次数减一
- // 比如 3 个 Redis 实例, 最大的限制次数是 1, 如果遍历第一个 Redis 实例, 失败了, 那么 failedLocksLimit 会减成 0
- // 如果 failedLocksLimit 就会走上面的 if 逻辑, 释放所有的锁, 然后返回 false
- failedLocksLimit--;
- }
- }
- if (remainTime != -1) {
- remainTime -= (System.currentTimeMillis() - time);
- time = System.currentTimeMillis();
- if (remainTime <= 0) {
- unlockInner(acquiredLocks);
- return false;
- }
- }
- }
- if (leaseTime != -1) {
- List<RFuture<Boolean>> futures = new ArrayList<RFuture<Boolean>>(acquiredLocks.size());
- for (RLock rLock : acquiredLocks) {
- RFuture<Boolean> future = rLock.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
- futures.add(future);
- }
- for (RFuture<Boolean> rFuture : futures) {
- rFuture.syncUninterruptibly();
- }
- }
- return true;
- }
- }
核心代码都已经加了注释, 实现原理其实很简单, 基于 RedLock 思想, 遍历所有的 Redis 客户端, 然后依次加锁, 最后统计成功的次数来判断是否加锁成功.
申明
本文章首发自本人博客: https://www.cnblogs.com/wang-meng 和公众号: 壹枝花算不算浪漫, 如若转载请标明来源!
感兴趣的小伙伴可关注个人公众号: 壹枝花算不算浪漫
来源: https://www.cnblogs.com/wang-meng/p/12536660.html