1|0 前言
现如今很多系统都会基于分布式或微服务思想完成对系统的架构设计. 那么在这一个系统中, 就会存在若干个微服务, 而且服务间也会产生相互通信调用. 那么既然产生了服务调用, 就必然会存在服务调用延迟或失败的问题. 当出现这种问题, 服务端会进行重试等操作或客户端有可能会进行多次点击提交. 如果这样请求多次的话, 那最终处理的数据结果就一定要保证统一, 如支付场景. 此时就需要通过保证业务幂等性方案来完成.
image.PNG
2|0 什么是幂等
幂等本身是一个数学概念. 即 f(n) = 1^n , 无论 n 为多少, f(n) 的值永远为 1. 在编程开发中, 对于幂等的定义为: 无论对某一个资源操作了多少次, 其影响都应是相同的. 换句话说就是: 在接口重复调用的情况下, 对系统产生的影响是一样的, 但是返回值允许不同, 如查询.
幂等性不仅仅只是一次或多次操作对资源没有产生影响, 还包括第一次操作产生影响后, 以后多次操作不会再产生影响. 并且幂等关注的是是否对资源产生影响, 而不关注结果.
以 SQL 为例:
select * from table where id=1 . 此 SQL 无论执行多少次, 虽然结果有可能出现不同, 都不会对数据产生
改变, 具备幂等性.
insert into table(id,name) values(1,'heima') . 此 SQL 如果 id 或 name 有唯一性约束, 多次操作只允许插
入一条记录, 则具备幂等性. 如果不是, 则不具备幂等性, 多次操作会产生多条数据.
update table set score=100 where id = 1 . 此 SQL 无论执行多少次, 对数据产生的影响都是相同的. 具备
幂等性.
update table set score=50+score where id = 1 . 此 SQL 涉及到了计算, 每次操作对数据都会产生影响.
不具备幂等性.
delete from table where id = 1 . 此 SQL 多次操作, 产生的结果相同, 具备幂等性.
幂等性设计主要从两个维度进行考虑: 空间, 时间.
空间: 定义了幂等的范围, 如生成订单的话, 不允许出现重复下单.
时间: 定义幂等的有效期. 有些业务需要永久性保证幂等, 如下单, 支付等. 而部分业务只要保证一段时间
幂等即可.
同时对于幂等的使用一般都会伴随着出现锁的概念, 用于解决并发安全问题.
3|0 接口幂等
对于幂等的考虑, 主要解决两点前后端交互与服务间交互. 这两点有时都要考虑幂等性的实现. 从前端的思路解决
的话, 主要有三种: 前端防重, PRG 模式, Token 机制.
2.1) 前端防重
通过前端防重保证幂等是最简单的实现方式, 前端相关属性和 JS 代码即可完成设置. 可靠性并不好, 有经验的人员可以通过工具跳过页面仍能重复提交. 主要适用于表单重复提交或按钮重复点击.
2.2)PRG 模式
PRG 模式即 POST-REDIRECT-GET. 当用户进行表单提交时, 会重定向到另外一个提交成功页面, 而不是停留在原先的表单页面. 这样就避免了用户刷新导致重复提交. 同时防止了通过浏览器按钮前进 / 后退导致表单重复提交.
是一种比较常见的前端防重策略.
2.3)token 机制
2.3.1) 方案介绍
通过 token 机制来保证幂等是一种非常常见的解决方案, 同时也适合绝大部分场景. 该方案需要前后端进行一定程度的交互来完成.
image.PNG
1) 服务端提供获取 token 接口, 供客户端进行使用. 服务端生成 token 后, 如果当前为分布式架构, 将 token 存放于 Redis 中, 如果是单体架构, 可以保存在 jvm 缓存中.
2) 当客户端获取到 token 后, 会携带着 token 发起请求.
3) 服务端接收到客户端请求后, 首先会判断该 token 在 Redis 中是否存在. 如果存在, 则完成进行业务处理, 业务处理完成后, 再删除 token. 如果不存在, 代表当前请求是重复请求, 直接向客户端返回对应标识.
但是现在有一个问题, 当前是先执行业务再删除 token. 在高并发下, 很有可能出现第一次访问时 token 存在, 完成具体业务操作. 但在还没有删除 token 时, 客户端又携带 token 发起请求, 此时, 因为 token 还存在, 第二次请求也会验证通过, 执行具体业务操作.
对于这个问题的解决方案的思想就是并行变串行. 会造成一定性能损耗与吞吐量降低.
第一种方案: 对于业务代码执行和删除 token 整体加线程锁. 当后续线程再来访问时, 则阻塞排队.
第二种方案: 借助 Redis 单线程和 incr 是原子性的特点. 当第一次获取 token 时, 以 token 作为 key, 对其进行自增.
然后将 token 进行返回, 当客户端携带 token 访问执行业务代码时, 对于判断 token 是否存在不用删除, 而是对其继续 incr. 如果 incr 后的返回值为 2. 则是一个合法请求允许执行, 如果是其他值, 则代表是非法请求, 直接返回.
那如果先删除 token 再执行业务呢? 其实也会存在问题, 假设具体业务代码执行超时或失败, 没有向客户端返回明确结果, 那客户端就很有可能会进行重试, 但此时之前的 token 已经被删除了, 则会被认为是重复请求, 不再进行业务处理.
这种方案无需进行额外处理, 一个 token 只能代表一次请求. 一旦业务执行出现异常, 则让客户端重新获取令牌, 重新发起一次访问即可. 推荐使用先删除 token 方案
但是无论先删 token 还是后删 token, 都会有一个相同的问题. 每次业务请求都回产生一个额外的请求去获取 token. 但是, 业务失败或超时, 在生产环境下, 一万个里最多也就十个左右会失败, 那为了这十来个请求, 让其他九千九百多个请求都产生额外请求, 就有一些得不偿失了. 虽然 Redis 性能好, 但是这也是一种资源的浪费.
4|0 服务幂等
4|1 防重表
对于防止数据重复提交, 还有一种解决方案就是通过防重表实现. 防重表的实现思路也非常简单. 首先创建一张表
作为防重表, 同时在该表中建立一个或多个字段的唯一索引作为防重字段, 用于保证并发情况下, 数据只有一条.
在向业务表中插入数据之前先向防重表插入, 如果插入失败则表示是重复数据.
对于防重表的解决方案, 可能有人会说为什么不使用悲观锁. 悲观锁在使用的过程中也是会发生死锁的. 悲观锁是
通过锁表的方式实现的. 假设现在一个用户 A 访问表 A(锁住了表 A), 然后试图访问表 B; 另一个用户 B 访问表
B(锁住了表 B), 然后试图访问表 A. 这时对于用户 A 来说, 由于表 B 已经被用户 B 锁住了, 所以用户 A 必须等到用
户 B 释放表 B 才能访问. 同时对于用户 B 来说, 由于表 A 已经被用户 A 锁住了, 所以用户 B 必须等到用户 A 释放表 A 才
能访问. 此时死锁就已经产生了.
4|2Mysql 乐观锁保证幂等
MySQL 乐观锁是基于数据库完成分布式锁的一种实现, 实现的方式有两种: 基于版本号, 基于条件. 但是实现思
想都是基于 MySQL 的行锁思想来实现的.
image.PNG
通过版本号控制是一种非常常见的方式, 适合于大多数场景. 但现在库存扣减的场景来说, 通过版本号控制就是多
人并发访问购买时, 查询时显示可以购买, 但最终只有一个人能成功, 这也是不可以的. 其实最终只要商品库存不
发生超卖就可以. 那此时就可以通过条件来进行控制.
MySQL 乐观锁更适用于一些需要计数的表上, 而且在竞争不激烈, 出现并发冲突几率较小时, 推荐使用乐观锁. 虽
然通过 MySQL 乐观锁可以完成并发控制, 但锁的操作是直接作用于数据库上, 这样就会在一定程度上对数据库性能产生影响. 并且 MySQL 的连接数量是有限的, 如果出现大量锁操作占用连接时, 也会造成 MySQL 的性能瓶颈.
4|3zookeeper 分布式锁
实现思想
对于分布式锁的实现, zookeeper 天然携带的一些特性能够很完美的实现分布式锁. 其内部主要是利用 znode 节点
特性和 watch 机制完成.
在 zookeeper 中节点会分为四类, 分别是:
持久节点: 一旦创建, 则永久存在于 zookeeper 中, 除非手动删除.
持久有序节点: 一旦创建, 则永久存在于 zookeeper 中, 除非手动删除. 同时每个节点都会默认存在节点序号, 每个节点的序号都是有序递增的. 如 demo000001,demo000002.....demo00000N.
临时节点: 当节点创建后, 一旦服务器重启或宕机, 则被自动删除.
临时有序节点: 当节点创建后, 一旦服务器重启或宕机, 则被自动删除. 同时每个节点都会默认存在节点序号, 每个节点的序号都是有序递增的. 如 demo000001,demo000002.....demo00000N.
watch 监听机制
watch 监听机制主要用于监听节点状态变更, 用于后续事件触发, 假设当 B 节点监听 A 节点时, 一旦 A 节点发生修
改, 删除, 子节点列表发生变更等事件, B 节点则会收到 A 节点改变的通知, 接着完成其他额外事情.
image.PNG
实现原理
其实现思想是当某个线程要对方法加锁时, 首先会在 zookeeper 中创建一个与当前方法对应的父节点, 接着每个要
获取当前方法的锁的线程, 都会在父节点下创建一个临时有序节点, 因为节点序号是递增的, 所以后续要获取锁的
线程在 zookeeper 中的序号也是逐次递增的. 根据这个特性, 当前序号最小的节点一定是首先要获取锁的线程, 因
此可以规定序号最小的节点获得锁. 所以, 每个线程再要获取锁时, 可以判断自己的节点序号是否是最小的, 如果
是则获取到锁. 当释放锁时, 只需将自己的临时有序节点删除即可.
image.PNG
在并发下, 每个线程都会在对应方法节点下创建属于自己的临时节点, 且每个节点都是临时且有序的.
那么 zookeeper 又是如何有序的将锁分配给不同线程呢? 这里就应用到了 watch 监听机制. 每当添加一个新的临时
节点时, 其都会基于 watcher 机制监听着它本身的前一个节点等待前一个节点的通知, 当前一个节点删除时, 就轮
到它来持有锁了. 然后依次类推.
优缺点
1)zookeeper 是基于 cp 模式, 能够保证数据强一致性.
2) 基于 watch 机制实现锁释放的自动监听, 锁操作性能较好.
3) 频繁创建节点, 对于 zk 服务器压力较大, 吞吐量没有 Redis 强.
原理剖析
低效锁思想
在通过 zookeeper 实现分布式锁时, 有另外一种实现的写法, 这种也是非常常见的, 但是它的效率并不高, 此处可
以先对这种实现方式进行探讨.
image.PNG
此种实现方式, 只会存在一个锁节点. 当创建锁节点时, 如果锁节点不存在, 则创建成功, 代表当前线程获取到
锁, 如果创建锁节点失败, 代表已经有其他线程获取到锁, 则该线程会监听锁节点的释放. 当锁节点释放后, 则继
续尝试创建锁节点加锁.
这种方案的低效点就在于, 只有一个锁节点, 其他线程都会监听同一个锁节点, 一旦锁节点释放后, 其他线程都会
收到通知, 然后竞争获取锁节点. 这种大量的通知操作会严重降低 zookeeper 性能, 对于这种由于一个被 watch 的
znode 节点的变化, 而造成大量的通知操作, 叫做羊群效应.
高效锁思想
为了避免羊群效应的出现, 业界内普遍的解决方案就是, 让获取锁的线程产生排队, 后一个监听前一个, 依次排
序. 推荐使用这种方式实现分布式锁
image.PNG
按照上述流程会在根节点下为每一个等待获取锁的线程创建一个对应的临时有序节点, 序号最小的节点会持有锁,
并且后一个节点只监听其前面的一个节点, 从而可以让获取锁的过程有序且高效.
代码实现
- public abstract class AbstractLock {
- //zookeeper 服务器地址
- public static final String ZK_SERVER_ADDR="192.168.200.131:2181";
- //zookeeper 超时时间
- public static final int CONNECTION_TIME_OUT=30000;
- public static final int SESSION_TIME_OUT=30000;
- // 创建 zk 客户端
- protected ZkClient zkClient = new
- ZkClient(ZK_SERVER_ADDR,SESSION_TIME_OUT,CONNECTION_TIME_OUT);
- /**
- * 获取锁
- * @return
- */
- public abstract boolean tryLock();
- /**
- * 等待加锁
- */
- public abstract void waitLock();
- /**
- * 释放锁
- */
- public abstract void releaseLock();
- public void getLock() {
- String threadName = Thread.currentThread().getName();
- if (tryLock()) {
- System.out.println(threadName+": 获取锁成功");
- }else {
- System.out.println(threadName+": 获取锁失败, 等待中");
- // 等待锁
- waitLock();
- getLock();
- }
- }
- }
- public class HighLock extends AbstractLock{
- private static final String PARENT_NODE_PATH="/high_lock";
- // 当前节点路径
- private String currentNodePath;
- // 前一个节点的路径
- private String preNodePath;
- private CountDownLatch countDownLatch;
- @Override
- public boolean tryLock() {
- // 判断父节点是否存在
- if (!zkClient.exists(PARENT_NODE_PATH)){
- // 不存在
- zkClient.createPersistent(PARENT_NODE_PATH);
- }
- // 创建第一个临时有序子节点
- if (currentNodePath == null || "".equals(currentNodePath)){
- // 根节点下没有节点信息, 将当前节点作为第一个子节点, 类型: 临时有序
- currentNodePath = zkClient.createEphemeralSequential(PARENT_NODE_PATH+"/","lock");
- }
- // 不是第一个子节点, 获取父节点下所有子节点
- List<String> childrenNodeList = zkClient.getChildren(PARENT_NODE_PATH);
- // 子节点升序排序
- Collections.sort(childrenNodeList);
- // 判断是否加锁成功
- if (currentNodePath.equals(PARENT_NODE_PATH+"/"+childrenNodeList.get(0))){
- // 当前节点是序号最小的节点
- return true;
- }else {
- // 当前节点不是序号最小的节点, 获取其前面的节点名称, 并赋值
- int length = PARENT_NODE_PATH.length();
- int currentNodeNumber = Collections.binarySearch(childrenNodeList,
- currentNodePath.substring(length + 1));
- preNodePath = PARENT_NODE_PATH+"/"+childrenNodeList.get(currentNodeNumber-1);
- }
- return false;
- }
- @Override
- public void waitLock() {
- IZkDataListener zkDataListener = new IZkDataListener() {
- @Override
- public void handleDataChange(String dataPath, Object data) throws Exception {
- }
- @Override
- public void handleDataDeleted(String dataPath) throws Exception {
- if (countDownLatch != null){
- countDownLatch.countDown();
- }
- }
- };
- // 监听前一个节点的改变
- zkClient.subscribeDataChanges(preNodePath,zkDataListener);
- if (zkClient.exists(preNodePath)){
- countDownLatch = new CountDownLatch(1);
- try {
- countDownLatch.await();
- } catch (InterruptedException e) {
- }
- }
- zkClient.unsubscribeDataChanges(preNodePath,zkDataListener);
- }
- @Override
- public void releaseLock() {
- zkClient.delete(currentNodePath);
- zkClient.close();
- }
- }
4|4Redis 分布式锁
原理 & 实现
分布式锁的一个很重要的特性就是互斥性, 同一时间内多个调用方加锁竞争, 只能有一个调用方加锁成功. 而 Redis 是基于单线程模型的, 可以利用这个特性让调用方的请求排队, 对于并发请求, 只会有一个请求能获取到锁.
Redis 实现分布式锁也很简单, 基于客户端的几个 API 就可以完成, 主要涉及三个核心 API:
setNx(): 向 Redis 中存 key-value, 只有当 key 不存在时才会设置成功, 否则返回 0. 用于体现互斥性.
expire(): 设置 key 的过期时间, 用于避免死锁出现.
delete(): 删除 key, 用于释放锁.
1) 编写工具类实现加锁
通过 jedis.set 进行加锁, 如果返回值是 OK, 代表加锁成功
如果加锁失败, 则自旋不断尝试获取锁, 同时在一定时间内如果仍没有获取到锁, 则退出自旋, 不再尝试获取锁.
requestId: 用于标识当前每个线程自己持有的锁标记
- public class SingleRedisLock {
- JedisPool jedisPool = new JedisPool("192.168.200.128",6379);
- // 锁过期时间
- protected long internalLockLeaseTime = 30000;
- // 获取锁的超时时间
- private long timeout = 999999;
- /**
- * 加锁
- * @param lockKey 锁键
- * @param requestId 请求唯一标识
- * @return
- */
- SetParams setParams = SetParams.setParams().nx().px(internalLockLeaseTime);
- public boolean tryLock(String lockKey, String requestId){
- String threadName = Thread.currentThread().getName();
- Jedis jedis = this.jedisPool.getResource();
- Long start = System.currentTimeMillis();
- try{
- for (;;){
- String lockResult = jedis.set(lockKey, requestId, setParams);
- if ("OK".equals(lockResult)){
- System.out.println(threadName+": 获取锁成功");
- return true;
- }
- // 否则循环等待, 在 timeout 时间内仍未获取到锁, 则获取失败
- System.out.println(threadName+": 获取锁失败, 等待中");
- long l = System.currentTimeMillis() - start;
- if (l>=timeout) {
- return false;
- }
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }finally {
- jedis.close();
- }
- }
- }
解锁时, 要避免当前线程将别人的锁释放掉. 假设线程 A 加锁成功, 当过了一段时间线程 A 来解锁, 但线程 A 的锁已
经过期了, 在这个时间节点, 线程 B 也来加锁, 因为线程 A 的锁已经过期, 所以线程 B 时可以加锁成功的. 此时, 就
会出现问题, 线程 A 将线程 B 的锁给释放了.
对于这个问题, 就需要使用到加锁时的 requestId. 当解锁时要判断当前锁键的 value 与传入的 value 是否相同, 相
同的话, 则代表是同一个人, 可以解锁. 否则不能解锁.
但是对于这个操作, 有非常多的人, 会先查询做对比, 接着相同则删除. 虽然思路是对的, 但是忽略了一个问题,
原子性. 判断与删除分成两步执行, 则无法保证原子性, 一样会出现问题. 所以解锁时不仅要保证加锁和解锁是同
一个人还要保证解锁的原子性. 因此结合 lua 脚本完成查询 & 删除操作.
- /**
- * 解锁
- * @param lockKey 锁键
- * @param requestId 请求唯一标识
- * @return
- */
- public boolean releaseLock(String lockKey,String requestId){
- String threadName = Thread.currentThread().getName();
- System.out.println(threadName+": 释放锁");
- Jedis jedis = this.jedisPool.getResource();
- String lua =
- "if redis.call('get',KEYS[1]) == ARGV[1] then" +
- "return redis.call('del',KEYS[1])" +
- "else" +
- "return 0" +
- "end";
- try {
- Object result = jedis.eval(lua, Collections.singletonList(lockKey),
- Collections.singletonList(requestId));
- if("1".equals(result.toString())){
- return true;
- }
- return false;
- }finally {
- jedis.close();
- }
- }
测试类
- public class LoclTest {
- public static void main(String[] args) {
- // 模拟多个 5 个客户端
- for (int i=0;i<5;i++) {
- Thread thread = new Thread(new LockRunnable());
- thread.start();
- }
- }
- private static class LockRunnable implements Runnable {
- @Override
- public void run() {
- SingleRedisLock singleRedisLock = new SingleRedisLock();
- String requestId = UUID.randomUUID().toString();
- boolean lockResult = singleRedisLock.tryLock("lock", requestId);
- if (lockResult){
- try {
- TimeUnit.SECONDS.sleep(5);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- singleRedisLock.releaseLock("lock",requestId);
- }
- }
- }
此时可以发现, 多线程会竞争同一把锁, 且没有获取获取到锁的线程会自旋不断尝试去获取锁. 每当一个线程将锁
释放后, 则会有另外一个线程持有锁. 依次类推.
存在的问题
锁续期
当对业务进行加锁时, 锁的过期时间, 绝对不能想当然的设置一个值. 假设线程 A 在执行某个业务时加锁成功
并设置锁过期时间. 但该业务执行时间过长, 业务的执行时间超过了锁过期时间, 那么在业务还没执行完
时, 锁就自动释放了. 接着后续线程就可以获取到锁, 又来执行该业务. 就会造成线程 A 还没执行完, 后续线
程又来执行, 导致同一个业务逻辑被重复执行. 因此对于锁的超时时间, 需要结合着业务执行时间来判断,
让锁的过期时间大于业务执行时间.
上面的方案是一个基础解决方案, 但是仍然是有问题的.
业务执行时间的影响因素太多了, 无法确定一个准确值, 只能是一个估值. 无法百分百保证业务执行期间,
锁只能被一个线程占有.
如想保证的话, 可以在创建锁的同时创建一个守护线程, 同时定义一个定时任务每隔一段时间去为未释放的锁增加过期时间. 当业务执行完, 释放锁后, 再关闭守护线程. 这种实现思想可以用来解决锁续期.
服务单点 & 集群问题
在单点 Redis 虽然可以完成锁操作, 可一旦 Redis 服务节点挂掉了, 则无法提供锁操作.
在生产环境下, 为了保证 Redis 高可用, 会采用异步复制方法进行主从部署. 当主节点写入数据成功, 会异步的将
数据复制给从节点, 并且当主节点宕机, 从节点会被提升为主节点继续工作. 假设主节点写入数据成功, 在没有将
数据复制给从节点时, 主节点宕机. 则会造成提升为主节点的从节点中是没有锁信息的, 其他线程则又可以继续加锁, 导致互斥失效.
4|*5Redisson 分布式锁
redisson 是 Redis 官网推荐实现分布式锁的一个第三方类库. 其内部完成的功能非常强大, 对各种锁都有实现, 同
时对于使用者来说非常简单, 让使用者能够将更多的关注点放在业务逻辑上. 此处重点利用 Redisson 解决单机锁产生的两个问题.
单机 Redisson 实现
依赖
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-pool2</artifactId>
- </dependency>
- <!--Redis 分布式锁 -->
- <dependency>
- <groupId>org.redisson</groupId>
- <artifactId>redisson-spring-boot-starter</artifactId>
- <version>3.13.1</version>
- </dependency>
配置文件
- server:
- Redis:
- host: 192.168.200.150
- port: 6379
- database: 0
- jedis:
- pool:
- max-active: 500
- max-idle: 1000
- min-idle: 4
启动类
- @Value("${spring.redis.host}")
- private String host;
- @Value("${spring.redis.port}")
- private String port;
- @Bean
- public RedissonClient redissonClient(){
- RedissonClient redissonClient;
- Config config = new Config();
- String url = "redis://" + host + ":" + port;
- config.useSingleServer().setAddress(url);
- try {
- redissonClient = Redisson.create(config);
- return redissonClient;
- } catch (Exception e) {
- e.printStackTrace();
- return null;
- }
- }
锁工具
- @Component
- public class RedissonLock {
- @Autowired
- private RedissonClient redissonClient;
- /**
- * 加锁
- * @param lockKey
- * @return
- */
- public boolean addLock(String lockKey){
- try {
- if (redissonClient == null){
- System.out.println("redisson client is null");
- return false;
- }
- RLock lock = redissonClient.getLock(lockKey);
- // 设置锁超时时间为 5 秒, 到期自动释放
- lock.lock(5, TimeUnit.SECONDS);
- System.out.println(Thread.currentThread().getName()+": 获取到锁");
- // 加锁成功
- return true;
- } catch (Exception e) {
- e.printStackTrace();
- return false;
- }
- }
- public boolean releaseLock(String lockKey){
- try{
- if (redissonClient == null){
- System.out.println("redisson client is null");
- return false;
- }
- RLock lock = redissonClient.getLock(lockKey);
- lock.unlock();
- System.out.println(Thread.currentThread().getName()+": 释放锁");
- return true;
- }catch (Exception e){
- e.printStackTrace();
- return false;
- }
- }
- }
测试类
- @SpringBootTest
- @RunWith(SpringRunner.class)
- public class RedissonLockTest {
- @Autowired
- private RedissonLock redissonLock;
- @Test
- public void easyLock(){
- // 模拟多个 10 个客户端
- for (int i=0;i<10;i++) {
- Thread thread = new Thread(new LockRunnable());
- thread.start();
- }
- try {
- System.in.read();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- private class LockRunnable implements Runnable {
- @Override
- public void run() {
- redissonLock.addLock("demo");
- try {
- TimeUnit.SECONDS.sleep(3);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- redissonLock.releaseLock("demo");
- }
- }
- }
根据执行效果可知, 多线程并发获取所时, 当一个线程获取到锁, 其他线程则获取不到, 并且其内部会不断尝试获
取锁, 当持有锁的线程将锁释放后, 其他线程则会继续去竞争锁.
源码分析
lock() 源码分析
当获取到 RLock 对象后, 调用其内部的 lock() 执行加锁操作. 根据源码描述, 当线程获取锁时, 如果没有获取到
锁, 则会让其进入自旋, 直到获取到锁. 如果获取到锁, 则会一直保留到调用 unLock() 手动释放或根据传入的
leaseTime 时间自动释放.
当前传入两个参数值: 锁超时时间, 时间单位. 主要用于避免死锁的出现, 假设持有锁的 Redis 节点宕机, 到期后
锁可以自动释放.
image.PNG
lock() 方法中还会调用 lock() 的另外一个重载方法, 需要传入三个参数: 过期时间, 时间单位, 是否中断.
image.PNG
在三个参数的 lock() 重载方法中, 首先会获取当前线程 id, 接着调用 tryAcquire() 方法尝试获取锁, 如果返回值为
null, 代表获取到锁. 如果返回值不是 null, 则根据当前线程 id 创建异步任务并放入线程池中, 接着进入自旋, 在
自旋过程中, 尝试调用 tryAcquire() 获取锁, 如果获取到则退出自旋. 否则会不断的尝试获取锁.
image.PNG
在 lock() 方法中, 最核心的是 tryAcquire(). 其内部核心实现会调用 tryAcquireAsync(), 并传入过期时间, 时间单位和当前线程 id, 进行锁的获取. 如果 leaseTime 不为 - 1, 代表设置了有效时间, 接着调用 tryAcquireAsync() 去获取锁. 如果是 - 1 的话, 则默认把永不过期改为 30 秒过期, 并且创建异步任务, 如果没有获取到锁, 则什么都不做. 如果获取到了锁, 则调用 scheduleExpirationRenewal() 对当前线程 id 的锁进行延时.
image.PNG
最终的 tryLockInnerAsync() 则是获取锁的具体实现. 可以看到, 其内部是基于 lua 脚本语言完成锁获取的. 因为获取锁的过程涉及到了多步, 为了保证执行过程的原子性, 所以使用了 lua, 最核心的就是要理解这段 lua 脚本的执行过程.
对于这款 lua 脚本来说, KEYS[1] 代表需要加锁的 key,ARGV[1] 代表锁的超时时间, ARGV[2] 代表锁的唯一标识.
对于这段 lua 脚本, 简单来说:
1) 检查锁 key 是否被占用了, 如果没有则设置锁 key 和唯一标识, 初始值为 1, 并且设置锁 key 的过期时间.
2) 如果锁 key 存在, 并且 value 也匹配, 表示是当前线程持有的锁, 那么重入次数加 1, 并且设置失效时间.
3) 返回锁 key 的失效时间毫秒数.
unLock() 源码分析
在释放锁时, unlock() 内部会调用 unlockAsync() 对当前线程持有的锁进行释放. 其内部最终会执行 unlockInnerAsync() 方法完成锁释放并返回结果.
image.PNG
在 unlockInnerAsync() 中仍然是结合 lua 脚本完成释放锁操作.
相关参数:
KEYS[1]: 当前锁 key.
KEYS[2]:Redis 消息的 ChannelName, 每个锁对应唯一的一个 channelName.
ARGV[1]:Redis 消息体, 用于标记 Redis 的 key 已经解锁, 用于通知其他线程申请锁.
ARGV[2]: 锁超时时间.
ARGV[3]: 锁的唯一标识.
1) 判断锁 key 和锁的唯一标识是否匹配, 如果不匹配, 表示锁已经被占用, 那么直接返回.
2) 如果是当前线程持有锁, 则 value 值 - 1, 用于重入操作.
3) 如果 - 1 后的值大于 0, 则对锁设置过期时间.
4) 如果 - 1 后的值为 0, 则删除锁 key, 并发布消息, 该锁已被释放. 用于通知其他线程申请锁.
image.PNG
锁续期
对于锁续期问题, 在单点 Redis 实现分布式锁时已经介绍过了, 用于防止业务执行超时或宕机而引起的业务被重复
执行.
根据对 lock 方法的解析, 可以发现, 当设置完过期时间后, 当前锁的过期时间就已经被设定了, 不会发生改变, 锁
到期后则会被自动释放, 因此在业务执行中, 通过 lock() 方法加锁会造成隐患.
红锁
当在单点 Redis 中实现 Redis 锁时, 一旦 Redis 服务器宕机, 则无法进行锁操作. 因此会考虑将 Redis 配置为主从结
构, 但在主从结构中, 数据复制是异步实现的. 假设在主从结构中, master 会异步将数据复制到 slave 中, 一旦某
个线程持有了锁, 在还没有将数据复制到 slave 时, master 宕机. 则 slave 会被提升为 master, 但被提升为 slave 的
master 中并没有之前线程的锁信息, 那么其他线程则又可以重新加锁
redlock 算法
redlock 是一种基于多节点 Redis 实现分布式锁的算法, 可以有效解决 Redis 单点故障的问题. 官方建议搭建五台
Redis 服务器对 redlock 算法进行实现.
在 Redis 官网中, 对于 redlock 算法的实现思想也做了详细的介绍. 地址: https://redis.io/topics/distlock.
整个实现过程分为五步:
1) 记录获取锁前的当前时间
2) 使用相同的 key,value 获取所有 Redis 实例中的锁, 并且设置获取锁的时间要远远小于锁自动释放的时间. 假设
锁自动释放时间是 10 秒, 则获取时间应在 5-50 毫秒之间. 通过这种方式避免客户端长时间等待一个已经关闭的实
例, 如果一个实例不可用了, 则尝试获取下一个实例.
3) 客户端通过获取所有实例的锁后的时间减去第一步的时间, 得到的差值要小于锁自动释放时间, 避免拿到一个
已经过期的锁. 并且要有超过半数的 Redis 实例成功获取到锁, 才算最终获取锁成功. 如果不是超过半数, 有可能
出现多个客户端重复获取到锁, 导致锁失效.
4) 当已经获取到锁, 那么它的真正失效时间应该为: 过期时间 - 第三步的差值.
5) 如果客户端获取锁失败, 则在所有 Redis 实例中释放掉锁. 为了保证更高效的获取锁, 还可以设置重试策略, 在
一定时间后重新尝试获取锁, 但不能是无休止的, 要设置重试次数.
虽然通过 redlock 能够更加有效的防止 Redis 单点问题, 但是仍然是存在隐患的. 假设 Redis 没有开启持久化,
clientA 获取锁后, 所有 Redis 故障重启, 则会导致 clientA 锁记录消失, clientB 仍然能够获取到锁. 这种情况虽然发生几率极低, 但并不能保证肯定不会发生.
保证的方案就是开始 AOF 持久化, 但是要注意同步的策略, 使用每秒同步, 如果在一秒内重启, 仍然数据丢失. 使用 always 又会造成性能急剧下降.
官方推荐使用默认的 AOF 策略即每秒同步, 且在 Redis 停掉后, 要在 ttl 时间后再重启. 缺点就是 ttl 时间内 Redis 无法
对外提供服务.
实现
redisson 对于红锁的实现已经非常完善, 通过其内部提供的 API 既可以完成红锁的操作.
- @Configuration
- public class RedissonRedLockConfig {
- public RedissonRedLock initRedissonClient(String lockKey){
- Config config1 = new Config();
- config1.useSingleServer().setAddress("redis://192.168.200.150:7000").setDatabase(0);
- RedissonClient redissonClient1 = Redisson.create(config1);
- Config config2 = new Config();
- config2.useSingleServer().setAddress("redis://192.168.200.150:7001").setDatabase(0);
- RedissonClient redissonClient2 = Redisson.create(config2);
- Config config3 = new Config();
- config3.useSingleServer().setAddress("redis://192.168.200.150:7002").setDatabase(0);
- RedissonClient redissonClient3 = Redisson.create(config3);
- Config config4 = new Config();
- config4.useSingleServer().setAddress("redis://192.168.200.150:7003").setDatabase(0);
- RedissonClient redissonClient4 = Redisson.create(config4);
- Config config5 = new Config();
- config5.useSingleServer().setAddress("redis://192.168.200.150:7004").setDatabase(0);
- RedissonClient redissonClient5 = Redisson.create(config5);
- RLock rLock1 = redissonClient1.getLock(lockKey);
- RLock rLock2 = redissonClient2.getLock(lockKey);
- RLock rLock3 = redissonClient3.getLock(lockKey);
- RLock rLock4 = redissonClient4.getLock(lockKey);
- RLock rLock5 = redissonClient5.getLock(lockKey);
- RedissonRedLock redissonRedLock = new
- RedissonRedLock(rLock1,rLock2,rLock3,rLock4,rLock5);
- return redissonRedLock;
- }
- }
测试类
- @SpringBootTest
- @RunWith(SpringRunner.class)
- public class RedLockTest {
- @Autowired
- private RedissonRedLockConfig redissonRedLockConfig;
- @Test
- public void easyLock(){
- // 模拟多个 10 个客户端
- for (int i=0;i<10;i++) {
- Thread thread = new Thread(new RedLockTest.RedLockRunnable());
- thread.start();
- }
- try {
- System.in.read();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- private class RedLockRunnable implements Runnable {
- @Override
- public void run() {
- RedissonRedLock redissonRedLock = redissonRedLockConfig.initRedissonClient("demo");
- try {
- boolean lockResult = redissonRedLock.tryLock(100, 10, TimeUnit.SECONDS);
- if (lockResult){
- System.out.println("获取锁成功");
- TimeUnit.SECONDS.sleep(3);
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }finally {
- redissonRedLock.unlock();
- System.out.println("释放锁");
- }
- }
- }
- }
redissonRedLock 加锁源码分析
- public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException
- {
- long newLeaseTime = -1;
- if (leaseTime != -1) {
- newLeaseTime = unit.toMillis(waitTime)*2;
- }
- long time = System.currentTimeMillis();
- long remainTime = -1;
- if (waitTime != -1) {
- remainTime = unit.toMillis(waitTime);
- }
- long lockWaitTime = calcLockWaitTime(remainTime);
- /**
- * 1\. 允许加锁失败节点个数限制 (N-(N/2+1)), 当前假设五个节点, 则允许失败节点数为 2
- */
- int failedLocksLimit = failedLocksLimit();
- /**
- * 2\. 遍历所有节点执行 lua 加锁, 用于保证原子性
- */
- List<RLock> acquiredLocks = new ArrayList<>(locks.size());
- for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
- RLock lock = iterator.next();
- boolean lockAcquired;
- /**
- * 3. 对节点尝试加锁
- */
- try {
- if (waitTime == -1 && leaseTime == -1) {
- lockAcquired = lock.tryLock();
- } else {
- long awaitTime = Math.min(lockWaitTime, remainTime);
- lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
- }
- } catch (RedisResponseTimeoutException e) {
- // 如果抛出这类异常, 为了防止加锁成功, 但是响应失败, 需要解锁所有节点
- unlockInner(Arrays.asList(lock));
- lockAcquired = false;
- } catch (Exception e) {
- // 抛出异常表示获取锁失败
- lockAcquired = false;
- }
- if (lockAcquired) {
- /**
- *4\. 如果获取到锁则添加到已获取锁集合中
- */
- acquiredLocks.add(lock);
- } else {
- /**
- * 5\. 计算已经申请锁失败的节点是否已经到达 允许加锁失败节点个数限制 (N-(N/2+1))
- * 如果已经到达, 就认定最终申请锁失败, 则没有必要继续从后面的节点申请了
- * 因为 Redlock 算法要求至少 N/2+1 个节点都加锁成功, 才算最终的锁申请成功
- 4) 消息幂等
- */
- if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
- break;
- }
- if (failedLocksLimit == 0) {
- unlockInner(acquiredLocks);
- if (waitTime == -1 && leaseTime == -1) {
- return false;
- }
- failedLocksLimit = failedLocksLimit();
- acquiredLocks.clear();
- // reset iterator
- while (iterator.hasPrevious()) {
- iterator.previous();
- }
- } else {
- failedLocksLimit--;
- }
- }
- /**
- * 6. 计算从各个节点获取锁已经消耗的总时间, 如果已经等于最大等待时间, 则申请锁失败, 返回 false
- */
- if (remainTime != -1) {
- remainTime -= System.currentTimeMillis() - time;
- time = System.currentTimeMillis();
- if (remainTime <= 0) {
- unlockInner(acquiredLocks);
- return false;
- }
- }
- }
- if (leaseTime != -1) {
- List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
- for (RLock rLock : acquiredLocks) {
- RFuture<Boolean> future = ((RedissonLock)
- rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
- futures.add(future);
- }
- for (RFuture<Boolean> rFuture : futures) {
- rFuture.syncUninterruptibly();
- }
- }
- /**
- * 7. 如果逻辑正常执行完则认为最终申请锁成功, 返回 true
- */
- return true;
- }
来源: http://www.jianshu.com/p/2927542c6dd9