一, 基本功能演示
1.1 Maven 依赖信息
1.2 代码演示
方法说明
1.3 创建 Zookeeper 节点信息
二, Watcher
2.1 什么是 Watcher 接口
2.2 Watcher 代码
一, 基本功能演示
1.1 Maven 依赖信息
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <version>3.4.6</version>
- </dependency>
1.2 代码演示
- public class Test001 {
- private static final String CONNECTSTRING = "127.0.0.1:2181";
- private static final int SESSIONTIMEOUT = 5000;
- // 信号量, 阻塞程序执行, 用户等待 zookeeper 连接成功, 发送成功信号,
- private static final CountDownLatch countDownLatch = new CountDownLatch(1);
- public static void main(String[] args) throws Exception {
- ZooKeeper zooKeeper = null;
- try {
- //1.Zookeeper 创建了一连接
- zooKeeper = new ZooKeeper(CONNECTSTRING, SESSIONTIMEOUT, new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- // 监听节点是否发生变化
- // 获取事件状态
- KeeperState keeperState = event.getState();
- // 获取事件类型
- EventType eventType = event.getType();
- if (KeeperState.SyncConnected == keeperState) {
- if (EventType.None == eventType) {
- countDownLatch.countDown();// 调用该方法会减一, 如果为 0 的话
- System.out.println("zk 启动连接...");
- }
- }
- }
- });
- // 2.
- // 节点类型:
- // 1. CreateMode . EPHEMERAL 创建 - - 个临时节点
- // 2.CreateMode. EPHEMERAL _SEQUENTIAL 如果节点发生重复的情况下, 会自动 id 自增保证唯 - - 性
- // 3.CreateMode . PERSISTENT 持久类型永久保存在硬盘上
- // 4.CreateMode. PERSISTENT SEQUENTIAL 持久类型如果节点发生重复的情况下, 会自动 id 自增保证唯一性
- countDownLatch.await();// 如果计数器不为 0, 则一直等待
- String nodeResult = zooKeeper.create("/test", "zhangsan".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- System.out.println("节点名称:"+nodeResult);
- } catch (Exception e) {
- e.printStackTrace();
- }finally {
- if(zooKeeper!=null) {
- zooKeeper.close();
- }
- }
- }
- }
方法说明
创建节点(znode) 方法:
create(String path, byte[] data, List<ACL> acl, CreateMode createMode) throws KeeperException, InterruptedException
提供了两套创建节点的方法, 同步和异步创建节点方式.
同步方式:
* 参数 1, 节点路径名称 : InodeName (不允许递归创建节点, 也就是说在父节点不存在的情况下, 不允许创建子节点)
* 参数 2, 节点内容: 要求类型是字节数组(也就是说, 不支持序列化方式, 如果需要实现序列化, 可使用 java 相关序列化框架, 如 Hessian,Kryo 框架)
* 参数 3, 节点权限: 使用 Ids.OPEN_ACL_UNSAFE 开放权限即可.(这个参数一般在权展
没有太高要求的场景下, 没必要关注)
* 参数 4, 节点类型: 创建节点的类型: CreateMode, 提供四种节点类型
* PERSISTENT 持久化节点
* PERSISTENT_SEQUENTIAL 顺序自动编号持久化节点, 这种节点会根据当前已存在的节点数自动加 1
* EPHEMERAL 临时节点, 客户端 session 超时这类节点就会被自动删除
* EPHEMERAL_SEQUENTIAL 临时自动编号节点
1.3 创建 Zookeeper 节点信息
- //1. 创建持久节点, 并且允许任何服务器可以操作
- String result = zk.create("/haoworld", "Lasting".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- System.out.println("result:" + result);
- //2. 创建临时节点
- String result = zk.create("/haoworld_tmp", "temp".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
- System.out.println("result:" + result);
二, Watcher
在 ZooKeeper 中, 接口类 Watcher 用于表示一个标准的事件处理器, 其定义了事件通知相关的逻辑, 包含 KeeperState 和 EventType 两个枚举类, 分别代表了通知状态和事件类型, 同时定义了事件的回调方法: process(WatchedEvent event).
2.1 什么是 Watcher 接口
同一个事件类型在不同的通知状态中代表的含义有所不同, 表 7-3 列举了常见的通知状态和事件类型.
Watcher 通知状态与事件类型一览
![-w735](media/15681306896776/15681308320378.jpg)?
表中列举了 ZooKeeper 中最常见的几个通知状态和事件类型.
回调方法 process()
process 方法是 Watcher 接口中的一个回调方法, 当 ZooKeeper 向客户端发送一个 Watcher 事件通知时, 客户端就会对相应的 process 方法进行回调, 从而实现对事件的处理. process 方法的定义如下:
abstract public void process(WatchedEvent event);
这个回调方法的定义非常简单, 我们重点看下方法的参数定义: WatchedEvent.WatchedEvent 包含了每一个事件的三个基本属性: 通知状态 (keeperState), 事件类型(EventType) 和节点路径(path),.ZooKeeper 使用 WatchedEvent 对象来封装服务端事件并传递给 Watcher, 从而方便回调方法 process 对服务端事件进行处理.
提到 WatchedEvent, 不得不讲下 WatcherEvent 实体. 笼统地讲, 两者表示的是同一个事物, 都是对一个服务端事件的封装. 不同的是, WatchedEvent 是一个逻辑事件, 用于服务端和客户端程序执行过程中所需的逻辑对象, 而 WatcherEvent 因为实现了序列化接口, 因此可以用于网络传输.
服务端在生成 WatchedEvent 事件之后, 会调用 getWrapper 方法将自己包装成一个可序列化的 WatcherEvent 事件, 以便通过网络传输到客户端. 客户端在接收到服务端的这个事件对象后, 首先会将 WatcherEvent 还原成一个 WatchedEvent 事件, 并传递给 process 方法处理, 回调方法 process 根据入参就能够解析出完整的服务端事件了.
需要注意的一点是, 无论是 WatchedEvent 还是 WatcherEvent, 其对 ZooKeeper 服务端事件的封装都是机及其简单的. 举个例子来说, 当 / zk-book 这个节点的数据发生变更时, 服务端会发送给客户端一个 "ZNode 数据内容变更" 事件, 客户端只能够接收到如下信
2.2 Watcher 代码
- public class ZkClientWatcher implements Watcher {
- // 集群连接地址
- private static final String CONNECT_ADDRES = "192.168.110.159:2181,192.168.110.160:2181,192.168.110.162:2181";
- // 会话超时时间
- private static final int SESSIONTIME = 2000;
- // 信号量, 让 zk 在连接之前等待, 连接成功后才能往下走.
- private static final CountDownLatch countDownLatch = new CountDownLatch(1);
- private static String LOG_MAIN = "[main]";
- private ZooKeeper zk;
- public void createConnection(String connectAddres, int sessionTimeOut) {
- try {
- zk = new ZooKeeper(connectAddres, sessionTimeOut, this);
- System.out.println(LOG_MAIN + "zk 开始启动连接服务器....");
- countDownLatch.await();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- public boolean createPath(String path, String data) {
- try {
- this.exists(path, true);
- this.zk.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- System.out.println(LOG_MAIN + "节点创建成功, Path:" + path + ",data:" + data);
- } catch (Exception e) {
- e.printStackTrace();
- return false;
- }
- return true;
- }
- /**
- * 判断指定节点是否存在
- *
- * @param path
- * 节点路径
- */
- public Stat exists(String path, boolean needWatch) {
- try {
- return this.zk.exists(path, needWatch);
- } catch (Exception e) {
- e.printStackTrace();
- return null;
- }
- }
- public boolean updateNode(String path,String data) throws KeeperException, InterruptedException {
- exists(path, true);
- this.zk.setData(path, data.getBytes(), -1);
- return false;
- }
- public void process(WatchedEvent watchedEvent) {
- // 获取事件状态
- KeeperState keeperState = watchedEvent.getState();
- // 获取事件类型
- EventType eventType = watchedEvent.getType();
- // zk 路径
- String path = watchedEvent.getPath();
- System.out.println("进入到 process() keeperState:" + keeperState + ", eventType:" + eventType + ", path:" + path);
- // 判断是否建立连接
- if (KeeperState.SyncConnected == keeperState) {
- if (EventType.None == eventType) {
- // 如果建立建立成功, 让后程序往下走
- System.out.println(LOG_MAIN + "zk 建立连接成功!");
- countDownLatch.countDown();
- } else if (EventType.NodeCreated == eventType) {
- System.out.println(LOG_MAIN + "事件通知, 新增 node 节点" + path);
- } else if (EventType.NodeDataChanged == eventType) {
- System.out.println(LOG_MAIN + "事件通知, 当前 node 节点" + path + "被修改....");
- }
- else if (EventType.NodeDeleted == eventType) {
- System.out.println(LOG_MAIN + "事件通知, 当前 node 节点" + path + "被删除....");
- }
- }
- System.out.println("--------------------------------------------------------");
- }
- public static void main(String[] args) throws KeeperException, InterruptedException {
- ZkClientWatcher zkClientWatcher = new ZkClientWatcher();
- zkClientWatcher.createConnection(CONNECT_ADDRES, SESSIONTIME);
- // boolean createResult = zkClientWatcher.createPath("/p15", "pa-644064");
- zkClientWatcher.updateNode("/pa2","7894561");
- }
- }
来源: http://www.bubuko.com/infodetail-3198423.html