通过 ZooKeeper 的有序节点, 节点路径不回重复, 还有节点删除会触发 Wathcer 事件的这些特性, 我们可以实现分布式锁.
一, 思路
zookeeper 中创建一个根节点 Locks, 用于后续各个客户端的锁操作.
当要获取锁的时候, 在 Locks 节点下创建 "Lock_序号" 的零时有序节点 (临时节点为了客户端突发断开连接, 则此节点消失).
如果没有得到锁, 就监控排在自己前面的序号节点, 等待它的释放.
当前面的锁被释放后, 触发 Process 方法, 然后继续获取当前子节点, 判断当前节点是不是第一个, 是 返回锁, 否 获取锁失败.
二, 实现
在实现是要了解一个类 AutoResetEvent.AutoResetEvent 常常被用来在两个线程之间进行信号发送. 它有两个重要的方法:
Set() : 发送信号到等待线程以继续其工作.
bool WaitOne(): 等待另一个线程发信号, 只有收到信号, 线程才继续往下执行 , 会一直等待下去, 返回值表示是否收到信号.
bool WaitOne(int millisecondsTimeout): 等待指定时间, 如果没有收到信号继续执行, 返回值表示是否收到信号.
下面为具体实现方法:
- public class ZooKeeperLock
- {
- private MyWatcher myWatcher;
- private string lockNode;
- private org.apache.zookeeper.ZooKeeper zooKeeper;
- public ZooKeeperLock()
- {
- myWatcher = new MyWatcher();
- }
- /// <summary>
- /// 获取锁
- /// </summary>
- /// <param name="millisecondsTimeout"> 等待时间 </param>
- /// <returns></returns>
- public async Task<bool> TryLock(int millisecondsTimeout = 0)
- {
- try
- {
- zooKeeper = new org.apache.zookeeper.ZooKeeper("127.0.0.1", 50000, new MyWatcher());
- // 创建锁节点
- if (await zooKeeper.existsAsync("/Locks") == null)
- await zooKeeper.createAsync("/Locks", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- // 新建一个临时锁节点
- lockNode = await zooKeeper.createAsync("/Locks/Lock_", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
- // 获取锁下所有节点
- var lockNodes = await zooKeeper.getChildrenAsync("/Locks");
- lockNodes.Children.Sort();
- // 判断如果创建的节点就是最小节点 返回锁
- if (lockNode.Split("/").Last() == lockNodes.Children[0])
- return true;
- else
- {
- // 当前节点的位置
- var location = lockNodes.Children.FindIndex(n => n == lockNode.Split("/").Last());
- // 获取当前节点 前面一个节点的路径
- var frontNodePath = lockNodes.Children[location - 1];
- // 在前面一个节点上加上 Watcher , 当前面那个节点删除时, 会触发 Process 方法
- await zooKeeper.getDataAsync("/Locks/" + frontNodePath, myWatcher);
- // 如果时间为 0 一直等待下去
- if (millisecondsTimeout == 0)
- myWatcher.AutoResetEvent.WaitOne();
- else // 如果时间不为 0 等待指定时间后, 返回结果
- {
- var result = myWatcher.AutoResetEvent.WaitOne(millisecondsTimeout);
- if (result)// 如果返回 True, 说明在指定时间内, 前面的节点释放了锁 (但是可能是中间节点主机宕机 导致, 所以不一定触发了 Process 方法就是得到了锁. 需要重新判断是不是第一个节点)
- {
- // 获取锁下所有节点
- lockNodes = await zooKeeper.getChildrenAsync("/Locks");
- // 判断如果创建的节点就是最小节点 返回锁
- if (lockNode.Split("/").Last() == lockNodes.Children[0])
- return true;
- else
- return false;
- }
- else
- return false;
- }
- }
- }
- catch (KeeperException e)
- {
- await UnLock();
- throw e;
- }
- return false;
- }
- /// <summary>
- /// 释放锁
- /// </summary>
- /// <returns></returns>
- public async Task UnLock()
- {
- try
- {
- myWatcher.AutoResetEvent.Dispose();await zooKeeper.deleteAsync(lockNode);
- }
- catch (KeeperException e)
- {
- throw e;
- }
- }
- }
Process 方法实现:
- public class MyWatcher : Watcher
- {
- public AutoResetEvent AutoResetEvent;
- public MyWatcher()
- {
- this.AutoResetEvent = new AutoResetEvent(false);
- }
- public override Task process(WatchedEvent @event)
- {
- if (@event.get_Type() == EventType.NodeDeleted)
- {
- AutoResetEvent.Set();
- }
- return null;
- }
- }
本文源代码在: 分布式实现代码
如果你认为文章写的不错, 就点个 [推荐] 吧
来源: http://www.bubuko.com/infodetail-2971638.html