前言
上文 [从入门到放弃 - ZooKeeper] ZooKeeper 实战 - 分布式锁中, 我们通过利用 ZooKeeper 的临时节点特性, 实现了一个分布式锁.
但是是通过轮询的方式去判断不断尝试获取锁, 空转对于 CPU 还是有一定消耗的, 同时, 对于多个线程竞争锁激烈的时候, 很容易出现羊群效应.
为了解决上面两个问题. 本文来看一下如何实现一个升级版的分布式锁.
设计
我们依然实现 java.util.concurrent.locks.Lock 接口.
和上一文中实现方式不同的是, 我们使用 ZooKeeper 的 EPHEMERAL_SEQUENTIAL 临时顺序节点.
当首次获取锁时, 会创建一个临时节点, 如果这个临时节点末尾数字是当前父节点下同名节点中最小的, 则获取锁成功.
否则, 则监听上一个数字较大的节点, 直到上一个节点被释放, 则再次尝试获取锁成功. 这样可以避免多个线程同时获取一把锁造成的竞争.
同时使用了 ZooKeeper 提供的 watch 功能, 避免了轮询带来的 CPU 空转.
获取锁后使用一个 volatile int 类型的 state 进行计数, 来实现锁的可重入机制.
- DistributedFairLock
- public class DistributedFairLock implements Lock {
- private static Logger logger = LoggerFactory.getLogger(DistributedFairLock.class);
- //ZooKeeper 客户端, 进行 ZooKeeper 操作
- private ZooKeeper zooKeeper;
- // 根节点名称
- private String dir;
- // 加锁节点
- private String node;
- //ZooKeeper 鉴权信息
- private List<ACL> acls;
- // 要加锁节点
- private String fullPath;
- // 加锁标识, 为 0 时表示未获取到锁, 每获取一次锁则加一, 释放锁时减一. 减到 0 时断开连接, 删除临时节点.
- private volatile int state;
- // 当前锁创建的节点 id
- private String id;
- // 通过 CountDownLatch 阻塞, 直到监听上一节点被取消, 再进行后续操作
- private CountDownLatch countDownLatch;
- /**
- * Constructor.
- *
- * @param zooKeeper the zoo keeper
- * @param dir the dir
- * @param node the node
- * @param acls the acls
- */
- public DistributedFairLock(ZooKeeper zooKeeper, String dir, String node, List<ACL> acls) {
- this.zooKeeper = zooKeeper;
- this.dir = dir;
- this.node = node;
- this.acls = acls;
- this.fullPath = dir.concat("/").concat(this.node);
- init();
- }
- private void init() {
- try {
- Stat stat = zooKeeper.exists(dir, false);
- if (stat == null) {
- zooKeeper.create(dir, null, acls, CreateMode.PERSISTENT);
- }
- } catch (Exception e) {
- logger.error("[DistributedFairLock#init] error :" + e.toString(), e);
- }
- }
- }
- lock
- public void lock() {
- try {
- // 加锁
- synchronized (this) {
- // 如果当前未持有锁
- if (state <= 0) {
- // 创建节点
- if (id == null) {
- id = zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL_SEQUENTIAL);
- }
- // 获取当前路径下所有的节点
- List<String> nodes = zooKeeper.getChildren(dir, false);
- SortedSet<String> sortedSet = new TreeSet<>();
- for (String node : nodes) {
- sortedSet.add(dir.concat("/").concat(node));
- }
- // 获取所有 id 小于当前节点顺序的节点
- SortedSet<String> lessSet = ((TreeSet<String>) sortedSet).headSet(id);
- if (!lessSet.isEmpty()) {
- // 监听上一个节点, 就是通过这里避免多锁竞争和 CPU 空转, 实现公平锁的
- Stat stat = zooKeeper.exists(lessSet.last(), new LockWatcher());
- if (stat != null) {
- countDownLatch = new CountDownLatch(1);
- countDownLatch.await();
- }
- }
- }
- state++;
- }
- } catch (InterruptedException e) {
- logger.error("[DistributedFairLock#lock] error :" + e.toString(), e);
- Thread.currentThread().interrupt();
- } catch (KeeperException ke) {
- logger.error("[DistributedFairLock#lock] error :" + ke.toString(), ke);
- if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) {
- Thread.currentThread().interrupt();
- }
- }
- }
- tryLock
- public boolean tryLock() {
- try {
- synchronized (this) {
- if (state <= 0) {
- if (id == null) {
- id = zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL_SEQUENTIAL);
- }
- List<String> nodes = zooKeeper.getChildren(dir, false);
- SortedSet<String> sortedSet = new TreeSet<>();
- for (String node : nodes) {
- sortedSet.add(dir.concat("/").concat(node));
- }
- SortedSet<String> lessSet = ((TreeSet<String>) sortedSet).headSet(id);
- if (!lessSet.isEmpty()) {
- return false;
- }
- }
- state++;
- }
- } catch (InterruptedException e) {
- logger.error("[DistributedFairLock#tryLock] error :" + e.toString(), e);
- return false;
- } catch (KeeperException ke) {
- logger.error("[DistributedFairLock#tryLock] error :" + ke.toString(), ke);
- if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) {
- return false;
- }
- }
- return true;
- }
- @Override
- public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
- try {
- synchronized (this) {
- if (state <= 0) {
- if (id == null) {
- id = zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL_SEQUENTIAL);
- }
- List<String> nodes = zooKeeper.getChildren(dir, false);
- SortedSet<String> sortedSet = new TreeSet<>();
- for (String node : nodes) {
- sortedSet.add(dir.concat("/").concat(node));
- }
- SortedSet<String> lessSet = ((TreeSet<String>) sortedSet).headSet(id);
- if (!lessSet.isEmpty()) {
- Stat stat = zooKeeper.exists(lessSet.last(), new LockWatcher());
- if (stat != null) {
- countDownLatch = new CountDownLatch(1);
- countDownLatch.await(time, unit);
- }
- }
- }
- state++;
- }
- } catch (InterruptedException e) {
- logger.error("[DistributedFairLock#tryLock] error :" + e.toString(), e);
- return false;
- } catch (KeeperException ke) {
- logger.error("[DistributedFairLock#tryLock] error :" + ke.toString(), ke);
- if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) {
- return false;
- }
- }
- return true;
- }
- unlock
- public void unlock() {
- synchronized (this) {
- if (state> 0) {
- state--;
- }
- // 当不再持有锁时, 删除创建的临时节点
- if (state == 0 && zooKeeper != null) {
- try {
- zooKeeper.delete(id, -1);
- id = null;
- } catch (Exception e) {
- logger.error("[DistributedFairLock#unlock] error :" + e.toString(), e);
- }
- }
- }
- }
- LockWatcher
- private class LockWatcher implements Watcher {
- @Override
- public void process(WatchedEvent event) {
- synchronized (this) {
- if (countDownLatch != null) {
- countDownLatch.countDown();
- }
- }
- }
- }
总结
上面就是我们改良后, 通过临时顺序节点和 watch 机制实现的公平可重入分布式锁.
源代码可见: aloofJr
通过 watch 机制避免轮询带来的 CPU 空转.
通过顺序临时节点避免了羊群效应.
如果对以上方式有更好的优化方案, 欢迎一起讨论.
更多文章
见我的博客: https://nc2era.com
来源: https://yq.aliyun.com/articles/738391