这里有新鲜出炉的 Redis 官方指南, 程序狗速度看过来!
Redis Key-Value 数据库
Redis 是一个开源的使用 ANSI C 语言编写, 支持网络, 可基于内存亦可持久化的日志型, Key-Value 数据库, 并提供多种语言的 API.
本文将详细介绍 redisson 实现分布式锁原理. 具有很好的参考价值, 下面跟着小编一起来看下吧
Redisson 分布式锁
之前的基于注解的锁有一种锁是基本 redis 的分布式锁, 锁的实现我是基于 redisson 组件提供的 RLock, 这篇来看看 redisson 是如何实现锁的.
不同版本实现锁的机制并不相同
引用的 redisson 最近发布的版本 3.2.3, 不同的版本可能实现锁的机制并不相同, 早期版本好像是采用简单的 setnx,getset 等常规命令来配置完成, 而后期由于 redis 支持了脚本 Lua 变更了实现原理.
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.2.3</version>
</dependency>
setnx 需要配合 getset 以及事务来完成, 这样才能比较好的避免死锁问题, 而新版本由于支持 lua 脚本, 可以避免使用事务以及操作多个 redis 命令, 语义表达更加清晰一些.
RLock 接口的特点继承标准接口 Lock
拥有标准锁接口的所有特性, 比如 lock,unlock,trylock 等等.
扩展标准接口 Lock
扩展了很多方法, 常用的主要有: 强制锁释放, 带有效期的锁, 还有一组异步的方法. 其中前面两个方法主要是解决标准 lock 可能造成的死锁问题. 比如某个线程获取到锁之后, 线程所在机器死机, 此时获取了锁的线程无法正常释放锁导致其余的等待锁的线程一直等待下去.
可重入机制
各版本实现有差异, 可重入主要考虑的是性能, 同一线程在未释放锁时如果再次申请锁资源不需要走申请流程, 只需要将已经获取的锁继续返回并且记录上已经重入的次数即可, 与 jdk 里面的 ReentrantLock 功能类似. 重入次数靠 hincrby 命令来配合使用, 详细的参数下面的代码.
怎么判断是同一线程? redisson 的方案是, RedissonLock 实例的一个 guid 再加当前线程的 id, 通过 getLockName 返回
public class RedissonLock extends RedissonExpirable implements RLock {
final UUID id;
protected RedissonLock(CommandExecutor commandExecutor, String name, UUID id) {
super(commandExecutor, name);
this.internalLockLeaseTime = TimeUnit.SECONDS.toMillis(30L);
this.commandExecutor = commandExecutor;
this.id = id;
}
String getLockName(long threadId) {
return this.id + ":" + threadId;
}
RLock 获取锁的两种场景
这里拿 tryLock 的源码来看: tryAcquire 方法是申请锁并返回锁有效期还剩余的时间, 如果为空说明锁未被其它线程申请直接获取并返回, 如果获取到时间, 则进入等待竞争逻辑.
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
final long threadId = Thread.currentThread().getId();
Long ttl = this.tryAcquire(leaseTime, unit);
if (ttl == null) {
// 直接获取到锁
return true;
} else {
// 有竞争的后续看
}
}
无竞争, 直接获取锁
先看下首先获取锁并释放锁背后的 redis 都在做什么, 可以利用 redis 的 monitor 来在后台监控 redis 的执行情况. 当我们在方法了增加 @RequestLockable 之后, 其实就是调用 lock 以及 unlock, 下面是 redis 命令:
加锁
由于高版本的 redis 支持 lua 脚本, 所以 redisson 也对其进行了支持, 采用了脚本模式, 不熟悉 lua 脚本的可以去查找下. 执行 lua 命令的逻辑如下:
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);
return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call(\'exists\', KEYS[1]) == 0) then redis.call(\'hset\', KEYS[1], ARGV[2], 1); redis.call(\'pexpire\', KEYS[1], ARGV[1]); return nil; end; if (redis.call(\'hexists\', KEYS[1], ARGV[2]) == 1) then redis.call(\'hincrby\', KEYS[1], ARGV[2], 1); redis.call(\'pexpire\', KEYS[1], ARGV[1]); return nil; end; return redis.call(\'pttl\', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{Long.valueOf(this.internalLockLeaseTime), this.getLockName(threadId)});
}
加锁的流程:
判断 lock 键是否存在, 不存在直接调用 hset 存储当前线程信息并且设置过期时间, 返回 nil, 告诉客户端直接获取到锁.
判断 lock 键是否存在, 存在则将重入次数加 1, 并重新设置过期时间, 返回 nil, 告诉客户端直接获取到锁.
被其它线程已经锁定, 返回锁有效期的剩余时间, 告诉客户端需要等待.
"EVAL"
"if (redis.call('exists', KEYS[1]) == 0) then
redis.call('hset', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil; end;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil; end;
return redis.call('pttl', KEYS[1]);""1" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0"
"1000" "346e1eb8-5bfd-4d49-9870-042df402f248:21"
上面的 lua 脚本会转换成真正的 redis 命令, 下面的是经过 lua 脚本运算之后实际执行的 redis 命令.
1486642677.053488 [0 lua] "exists" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0"
1486642677.053515 [0 lua] "hset" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0"
"346e1eb8-5bfd-4d49-9870-042df402f248:21" "1"
1486642677.053540 [0 lua] "pexpire" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0" "1000"
解锁
解锁的流程看起来复杂些:
如果 lock 键不存在, 发消息说锁已经可用
如果锁不是被当前线程锁定, 则返回 nil
由于支持可重入, 在解锁时将重入次数需要减 1
如果计算后的重入次数 > 0, 则重新设置过期时间
如果计算后的重入次数 <=0, 则发消息说锁已经可用
"EVAL"
"if (redis.call('exists', KEYS[1]) == 0) then
redis.call('publish', KEYS[2], ARGV[1]);
return 1; end;
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;end;
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0;
else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end;
return nil;""2" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0"
"redisson_lock__channel:{lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0}"
"0" "1000"
"346e1eb8-5bfd-4d49-9870-042df402f248:21"
无竞争情况下解锁 redis 命令:
主要是发送一个解锁的消息, 以此唤醒等待队列中的线程重新竞争锁.
1486642678.493691 [0 lua] "exists" "lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0"
1486642678.493712 [0 lua] "publish" "redisson_lock__channel:{lock.com.csp.product.api.service.ProductAppService.searchProductByPage#0}" "0"
有竞争, 等待
有竞争的情况在 redis 端的 lua 脚本是相同的, 只是不同的条件执行不同的 redis 命令, 复杂的在 redisson 的源码上. 当通过 tryAcquire 发现锁被其它线程申请时, 需要进入等待竞争逻辑中.
this.await 返回 false, 说明等待时间已经超出获取锁最大等待时间, 取消订阅并返回获取锁失败
this.await 返回 true, 进入循环尝试获取锁.
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
final long threadId = Thread.currentThread().getId();
Long ttl = this.tryAcquire(leaseTime, unit);
if (ttl == null) {
return true;
} else {
// 重点是这段
time -= System.currentTimeMillis() - current;
if (time <= 0L) {
return false;
} else {
current = System.currentTimeMillis();
final RFuture subscribeFuture = this.subscribe(threadId);
if (!this.await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.addListener(new FutureListener() {
public void operationComplete(Future < RedissonLockEntry > future) throws Exception {
if (subscribeFuture.isSuccess()) {
RedissonLock.this.unsubscribe(subscribeFuture, threadId);
}
}
});
}
return false;
} else {
boolean var16;
try {
time -= System.currentTimeMillis() - current;
if (time <= 0L) {
boolean currentTime1 = false;
return currentTime1;
}
do {
long currentTime = System.currentTimeMillis();
ttl = this.tryAcquire(leaseTime, unit);
if (ttl == null) {
var16 = true;
return var16;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0L) {
var16 = false;
return var16;
}
currentTime = System.currentTimeMillis();
if (ttl.longValue() >= 0L && ttl.longValue() < time) {
this.getEntry(threadId).getLatch().tryAcquire(ttl.longValue(), TimeUnit.MILLISECONDS);
} else {
this.getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
} while ( time > 0L );
var16 = false;
} finally {
this.unsubscribe(subscribeFuture, threadId);
}
return var16;
}
}
}
}
循环尝试一般有如下几种方法:
while 循环, 一次接着一次的尝试, 这个方法的缺点是会造成大量无效的锁申请.
Thread.sleep, 在上面的 while 方案中增加睡眠时间以降低锁申请次数, 缺点是这个睡眠的时间设置比较难控制.
基于信息量, 当锁被其它资源占用时, 当前线程订阅锁的释放事件, 一旦锁释放会发消息通知待等待的锁进行竞争, 有效的解决了无效的锁申请情况. 核心逻辑是 this.getEntry(threadId).getLatch().tryAcquire,this.getEntry(threadId).getLatch() 返回的是一个信号量, 有兴趣可以再研究研究.
redisson 依赖
由于 redisson 不光是针对锁, 提供了很多客户端操作 redis 的方法, 所以会依赖一些其它的框架, 比如 netty, 如果只是简单的使用锁也可以自己去实现.
以上就是本文的全部内容, 希望本文的内容对大家的学习或者工作能带来一定的帮助, 同时也希望多多支持 PHPERZ!
来源: http://www.phperz.com/article/18/0128/358530.html