一. 前言
在之前的文章中介绍过分布式锁的特点和利用 Redis 实现简单的分布式锁. 但是分布式锁的实现还有很多其他方式, 但是万变不离其宗, 始终遵循一个特点: 同一时刻只能有一个操作获取. 这篇文章主要介绍如何基于 zookeeper 实现分布式锁.
zookeeper 能够作为分布式锁实现的基础
算法流程
实现
关于分布式锁的相关特性, 这里不再赘述, 请参考分布式锁 https://www.cnblogs.com/lxyit/p/9829132.html .
二. zookeeper 能够作为分布式锁实现的基础
这里回顾下分布式锁的特点:
每次只能一个占用锁;
可以重复进入锁;
只有占用者才可以解锁;
获取锁和释放锁都需要原子
不能产生死锁
尽量满足性能
zookeeper 中有一种临时顺序节点, 它具有以下特征:
时效性, 当会话结束, 节点将自动被删除
顺序性, 当多个应用向其注册顺序节点时, 每个顺序号将只能被一个应用获取
利用以上的特点可以满足分布式锁实现的基本要求:
因为顺序性, 可以让最小顺序号的应用获取到锁, 从而满足分布式锁的每次只能一个占用锁, 因为只有它一个获取到, 所以可以实现重复进入, 只要设置标识即可. 锁的释放, 即删除应用在 zookeeper 上注册的节点, 因为每个节点只被自己注册拥有, 所以只有自己才能删除, 这样就满足只有占用者才可以解锁
zookeeper 的序号分配是原子的, 分配后即不会再改变, 让最小序号者获取锁, 所以获取锁是原子的
因为注册的是临时节点, 在会话期间内有效, 所以不会产生死锁
zookeeper 注册节点的性能能满足几千, 而且支持集群, 能够满足大部分情况下的性能
三. 算法流程
1. 获取锁
需要获取分布式锁的应用都向 zookeeper 的 / lock/{resouce} 目录下注册 sequence - 前缀的节点, 序号最小者获取到操作资源的权限:
Note:
这里的 resource 需要依据竞争的具体资源确定, 如竞争账户则可以使用账户号作为 resource.
从图中可以看出, clientA 的顺序号最小, 由它获取到锁, 操作资源.
算法步骤:
client 判断 / lock 目录是否存在, 如果不存在则向其注册 / lock 的持久节点
client 判断 / lock 目录下是否存在竞争的资源 resouce 目录, 如果不存在则向其注册 / lock/resource 的持久节点
client 向 / lock/resource 目录下注册 / lock/resource/sequence - 前缀的临时顺序节点, 并得到顺序号
client 获取 / lock/resource 目录下的所有临时顺序子节点
client 判断临时子节点序号中是否存在比自身的序号小的节点. 如果不存在, 则获取到锁; 如果存在, 则对象该临时节点做 watch 监控
如果收到监控的临时节点被删除的通知, 则再重复 4,5 步骤, 直到获取到锁
流程图:
2. 释放锁
因为最小的节点只被获取到锁的 client 持有, 所以该锁不可能被其他 client 释放. 同时释放锁只需要将临时顺序节点删除, 也是原子性操作.
三. 实现
- /**
- * 基于 Zookeeper 实现分布式锁
- *
- * @author huaijin
- */
- public class DistributedLockBaseZookeeper implements DistributedLock {
- private static final Logger log = LoggerFactory.getLogger(DistributedLockBaseZookeeper.class);
- /**
- * 利用空串作为各个节点存储的数据
- */
- private static final String EMPTY_DATA = "";
- /**
- * 分布式锁的根目录
- */
- private static final String LOCK_ROOT = "/lock";
- /**
- * zookeeper 目录分隔符
- */
- private static final String PATH_SEPARATOR = "/";
- /**
- * 临时顺序节点前缀
- */
- private static final String LOCK_NODE_PREFIX = "sequence-";
- /**
- * 利用 Lock 和 Condition 实现等待通知
- */
- private Lock waitNotifierLock = new ReentrantLock();
- private Condition waitNotifier = waitNotifierLock.newCondition();
- /**
- * 操作 zookeeper 的 client
- */
- private ZkClient zkClient;
- /**
- * 分布式资源的路径
- */
- private String resourcePath;
- /**
- * 锁节点完整前缀
- */
- private String lockNodePrefix;
- /**
- * 当前注册的临时顺序节点路径
- */
- private String currentLockNodePath;
- public DistributedLockBaseZookeeper(String resource, ZkClient zkClient) {
- Objects.requireNonNull(zkClient, "zkClient must not be null!");
- if (resource == null || resource.isEmpty()) {
- throw new IllegalArgumentException("resource must not be null!");
- }
- this.zkClient = zkClient;
- this.resourcePath = LOCK_ROOT + PATH_SEPARATOR + resource;
- this.lockNodePrefix = resourcePath + PATH_SEPARATOR + LOCK_NODE_PREFIX;
- // 创建分布式锁根目录
- if (!this.zkClient.exists(LOCK_ROOT)) {
- try {
- this.zkClient.create(LOCK_ROOT, EMPTY_DATA, CreateMode.PERSISTENT);
- } catch (ZkNodeExistsException e) {
- // ignore, logging
- log.warn("The root path for lock already exists.");
- }
- }
- // 创建资源目录
- if (!this.zkClient.exists(resourcePath)) {
- try {
- this.zkClient.create(resourcePath, EMPTY_DATA, CreateMode.PERSISTENT);
- } catch (ZkNodeExistsException e) {
- // ignore, logging
- log.warn("The resource path for [" + resourcePath + "] already exists.");
- }
- }
- }
- @Override
- public void lock() throws DistributedLockException {
- if (!acquireLock()) {
- // 如果获取锁不成功, 则等待
- waitNotifierLock.lock();
- try {
- waitNotifier.await();
- } catch (Exception e) {
- throw new DistributedLockException("Interrupt when waiting notification.");
- } finally {
- waitNotifierLock.unlock();
- }
- }
- }
- @Override
- public void unlock() {
- // 删除自身节点, 释放锁
- zkClient.delete(currentLockNodePath);
- }
- private boolean acquireLock() throws DistributedLockException {
- // 如果当前未注册临时顺序节点, 则注册
- if (this.currentLockNodePath == null) {
- this.currentLockNodePath = zkClient.create(lockNodePrefix, EMPTY_DATA, CreateMode.EPHEMERAL_SEQUENTIAL);
- }
- // 获取顺序号
- long lockNodeSeq = fetchSeqFromNodePath(currentLockNodePath);
- // 获取所有子节点
- List<String> childNodePaths = zkClient.getChildren(resourcePath);
- if (childNodePaths == null || childNodePaths.isEmpty()) {
- throw new DistributedLockException("Not exists child nodes.");
- }
- // 从所有子节点中获取最小子节点的顺序号
- long minSeq = 1000000L;
- int minIndex = -1;
- for (int i = 0; i <childNodePaths.size(); i++) {
- long nodeSeq = fetchSeqFromNodePath(resourcePath + childNodePaths.get(i));
- if (nodeSeq < minSeq) {
- minSeq = nodeSeq;
- minIndex = i;
- }
- }
- // 比较自身顺序号与最小序号
- if (lockNodeSeq> minSeq) {
- // 如果存在更小序号, 则监控最小序号的子节点
- String minLockNodePath = childNodePaths.get(minIndex);
- zkClient.subscribeDataChanges(resourcePath + PATH_SEPARATOR + minLockNodePath,
- new ListenerForLockRelease());
- return false;
- }
- // 成功获取锁, 返回
- return true;
- }
- private long fetchSeqFromNodePath(String nodePath) {
- String seq = nodePath.substring(lockNodePrefix.length());
- return Long.valueOf(seq);
- }
- private class ListenerForLockRelease implements IZkDataListener {
- @Override
- public void handleDataChange(String dataPath, Object data) throws Exception {
- }
- @Override
- public void handleDataDeleted(String dataPath) throws Exception {
- // 如果成功获取锁, 则通知, 让主线程返回
- if (acquireLock()) {
- waitNotifierLock.lock();
- try {
- waitNotifier.signal();
- } finally {
- waitNotifierLock.unlock();
- }
- }
- }
- }
- }
来源: http://www.bubuko.com/infodetail-2956214.html