缓存锁
我们常常将缓存作为分布式锁的解决方案, 但是却不能单纯的判断某个 key 是否存在 来作为锁的获得依据, 因为无论是 exists 和 get 命名都不是线程安全的, 都无法保证只有一个线程可以获得锁, 存在线程争抢, 可能会有多个线程同时拿到锁的情况(经典的 Redis "读后写" 的问题).
incr 缓存锁
- @Component
- public class LockClient {
- private StringRedisTemplate stringRedisTemplate;
- private ValueOperations<String, String> valueOperations;
- @Autowired
- public void setStringRedisTemplate(StringRedisTemplate stringRedisTemplate) {
- this.stringRedisTemplate = stringRedisTemplate;
- this.valueOperations = stringRedisTemplate.opsForValue();
- }
- public void lockIncr() {
- Long lockIncr = valueOperations.increment("lockIncr", 1);
- // 说明拿到了锁
- if (lockIncr == 1) {
- // 业务操作
- }
- }
- }
incr: 递增指定键对应的数值, 如果不存在 key 对应的值, 那么会先将 key 的值设置为 0, 然后执行 incr 操作, 返回递增的值.
这种锁的实现原理主要是利用 incr 命令的原子性, 同一时间只会有一个线程操作这个命令.
这种锁的实现方式, 不在乎结果数据. 保证只有唯一线程能够执行到业务代码.
setnx 缓存锁
上面的锁实现方式, 我们对资源做了隔离, 保证只有唯一线程可以拿到资源并执行操作. 但是如果资源并不是唯一线程执行的呢? 存在多个线程争抢的情况下呢?
- public void lockSetnx() {
- String lock = "lockSetnx";
- long millis = System.currentTimeMillis();
- long timeout = millis + 3000L + 1;
- try {
- while (true) {
- boolean setnx = valueOperations.setIfAbsent(lock, timeout + "");
- if (setnx == true) {
- break;
- }
- String oldTimeout = valueOperations.get(lock);
- // 这一步是为了解决客户端异常宕机, 锁没有被正常释放的时候.
- // 当 p1,p2 同时执行到这里, 发现锁的时间过期了. p1,p2 同时执行 getSet 命令.
- // 假设 p1 先执行成功了, 那么 p1 得到的值就是原来锁的过期时间(可以符合下面的判断式), 表示争抢锁成功.
- // 假设 p2 后执行成功了, 那么 p2 得到的值就是 p1 set 进去的值(不会符合下面的表达式), 表示争抢锁失败.
- String oldValue = valueOperations.getAndSet(lock, timeout + "");
- if (millis> Long.valueOf(oldTimeout) && millis> Long.valueOf(oldValue)) {
- break;
- }
- // 休眠 100 毫秒, 再去争抢锁
- Thread.sleep(100);
- }
- // 执行业务代码
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- if (millis < timeout) {
- stringRedisTemplate.delete(lock);
- }
- }
- }
setnx: 只有第一个线程会执行成功, 返回 true, 其余线程执行失败, 返回 false.
getSet: 返回 key 中的旧值, 并把新的值 set 进去.
细细看来, 好像似乎 setnx 命令就能够实现分布式锁了, 为什么还要 getSet 命名呢? getSet 命令是为了解决客户端异常宕机, 锁没有被正常释放的情况下, 结合过期时间来保证线程安全. 可以看看官网 http://redis.io/commands/setnx 的介绍, 有详细解释这个问题.
zookeeper 锁
zookeeper, 天生的分布式协调工具, 生来就是为了解决各种分布式的难题, 比如分布式锁, 分布式计数器, 分布式队列等等.
zookeeper 分布式锁, 如果自己实现的话, 大抵的实现方式如下:
公平锁:
在 zookeeper 的指定节点 (locks) 下创建临时顺序节点 node_n ;
获取 locks 下面的所有子节点 children.
对子节点按节点自增序号从小到大排序.
判断本节点是不是第一个子节点, 如果是, 则获取到锁. 如果不是, 则监听比该节点小的那个节点的删除事件.
若监听事件生效, 则回到第二步重新判断, 直到获取到锁.
不公平锁
在 zookeeper 的某个节点 (lock) 上创建临时节点 znode.
创建成功, 就表示获取到了这个锁; 其他客户端来创建锁会失败, 只能注册对这个锁的监听.
其他客户端监听到这个锁被释放(znode 节点被删除), 就会尝试加锁(创建节点), 继续执行第二步.
幸运的是, zookeeper recipes 客户端为我们提供了多种分布式锁实现:
- InterProcessMutex(可重入排他锁)
- InterProcessSemaphoreMutex(不可重入排他锁)
- InterProcessReadWriteLock(分布式读写锁)
- InterProcessSemaphore(共享信号量 -- 设置最大并行数量)
zookeeper recipes 锁的简单使用:
- public InterProcessMutex interProcessMutex(String lockPath) {
- CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeper, new ExponentialBackoffRetry(1000, 3));
- // 启用命名空间, 做微服务间隔离
- client.usingNamespace(namespace);
- client.start();
- return new InterProcessMutex(client, lockPath);
- }
- public void lockUse() {
- InterProcessMutex interProcessMutex = interProcessMutex("/lockpath");
- try {
- // 获取锁
- if (interProcessMutex.acquire(100, TimeUnit.MILLISECONDS)) {
- // 执行业务代码
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- // 释放锁
- try {
- interProcessMutex.release();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
推荐一篇 zookeeper 介绍很全面的文章: https://www.cnblogs.com/shamo89/p/9800925.html
比较
缓存分布式锁, 必须采用轮询的方式去尝试加锁, 对性能浪费很大; zookeeper 分布式锁, 可以通过监听的方式等待通知或超时, 当有锁释放, 通知使用者即可.
如果缓存获取锁的那个客户端宕机了, 锁不会被释放, 只能通过其它方式解决(上面的 getSet 判断); 而 zookeeper 的话, 因为创建的是临时 znode, 只要客户端挂了, znode 就没了, 此时就自动释放锁.
来源: https://www.cnblogs.com/jmcui/p/11186224.html