zookeeper
为了向您介绍ZooKeeper Java API,我们在这里开发了一个非常简单的观看客户端。该ZooKeeper客户端通过启动或停止程序来观察ZooKeeper节点的更改并进行响应。
要求
有四个要求:
1.它作为参数:
ZooKeeper服务的地址
那么znode的名字就是被观看的
具有参数的可执行文件
2.它获取与znode相关联的数据,并启动可执行文件。
3.如果znode更改,客户端将重新启动内容并重新启动可执行文件。
4.如果znode消失,客户端将杀死可执行文件。
程序设计
通常,ZooKeeper应用程序分为两个单元,一个维护连接,另一个用于监视数据。在此应用程序中,名为Executor的类维护ZooKeeper连接,并且名为DataMonitor的类监视ZooKeeper树中的数据。
此外,Executor包含主线程并包含执行逻辑。它负责什么样的用户交互,以及与您作为参数传递的可执行程序的交互以及根据znode的状态关闭和重新启动示例。
1. Executor.java
- package com.hellojd.cloud;
- import org.apache.zookeeper.KeeperException;
- import org.apache.zookeeper.WatchedEvent;
- import org.apache.zookeeper.Watcher;
- import org.apache.zookeeper.ZooKeeper;
- import java.io.FileOutputStream;
- import java.io.IOException;
- import java.io.InputStream;
- import java.io.OutputStream;
- /**
- * A simple example program to use DataMonitor to start and
- * stop executables based on a znode. The program watches the
- * specified znode and saves the data that corresponds to the
- * znode in the filesystem. It also starts the specified program
- * with the specified arguments when the znode exists and kills
- * the program if the znode goes away.
- */
- public class Executor
- implements Watcher, Runnable, DataMonitor.DataMonitorListener
- {
- DataMonitor dm;
- ZooKeeper zk;
- String filename;
- String exec[];
- Process child;
- public Executor(String hostPort, String znode, String filename,
- String exec[]) throws KeeperException, IOException {
- this.filename = filename;
- this.exec = exec;
- zk = new ZooKeeper(hostPort, 3000, this);
- dm = new DataMonitor(zk, znode, null, this);
- }
- /**
- * @param args
- */
- public static void main(String[] args) {
- if (args.length < 4) {
- System.err
- .println("USAGE: Executor hostPort znode filename program [args ...]");
- System.exit(2);
- }
- String hostPort = args[0];
- String znode = args[1];
- String filename = args[2];
- String exec[] = new String[args.length - 3];
- System.arraycopy(args, 3, exec, 0, exec.length);
- try {
- new Executor(hostPort, znode, filename, exec).run();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- /***************************************************************************
- * We do process any events ourselves, we just need to forward them on.
- *
- * @see org.apache.zookeeper.Watcher#process(org.apache.zookeeper.proto.WatcherEvent)
- */
- public void process(WatchedEvent event) {
- System.out.println("Watcher process");
- dm.process(event);
- }
- public void run() {
- try {
- synchronized (this) {
- while (!dm.dead) {
- wait();
- }
- }
- } catch (InterruptedException e) {
- }
- }
- //以响应ZooKeeper连接永久消失。
- public void closing(int rc) {
- synchronized (this) {
- notifyAll();
- }
- }
- static class StreamWriter extends Thread {
- OutputStream os;
- InputStream is;
- StreamWriter(InputStream is, OutputStream os) {
- this.is = is;
- this.os = os;
- start();
- }
- public void run() {
- byte b[] = new byte[80];
- int rc;
- try {
- while ((rc = is.read(b)) > 0) {
- os.write(b, 0, rc);
- }
- } catch (IOException e) {
- }
- }
- }
- public void exists(byte[] data) {
- if (data == null) {
- if (child != null) {
- System.out.println("Killing process");
- child.destroy();
- try {
- child.waitFor();
- } catch (InterruptedException e) {
- }
- }
- child = null;
- } else {
- if (child != null) {
- System.out.println("Stopping child");
- child.destroy();
- try {
- child.waitFor();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- //保存znode数据至文件
- try {
- FileOutputStream fos = new FileOutputStream(filename);
- fos.write(data);
- fos.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- try {
- System.out.println("Starting child");
- child = Runtime.getRuntime().exec(exec);
- new StreamWriter(child.getInputStream(), System.out);
- new StreamWriter(child.getErrorStream(), System.err);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- }
2. DataMonitor.java
- /**
- * A simple class that monitors the data and existence of a ZooKeeper
- * node. It uses asynchronous ZooKeeper APIs.
- */
- package com.hellojd.cloud;
- import java.util.Arrays;
- import org.apache.zookeeper.KeeperException;
- import org.apache.zookeeper.WatchedEvent;
- import org.apache.zookeeper.Watcher;
- import org.apache.zookeeper.ZooKeeper;
- import org.apache.zookeeper.AsyncCallback.StatCallback;
- import org.apache.zookeeper.KeeperException.Code;
- import org.apache.zookeeper.data.Stat;
- /**
- * 另一方面,DataMonitorListener接口不是ZooKeeper API的一部分。 它是一个完全定制的界面,专为此示例应用程序而设计。
- * DataMonitor对象使用它来回传给它的容器,它也是Executor对象。
- */
- public class DataMonitor implements StatCallback {
- //Executor或一些类似Executor的对象“拥有”ZooKeeper连接,但可以将事件委托给其他事件到其他对象。
- ZooKeeper zk;
- String znode;
- Watcher chainedWatcher;
- boolean dead;
- //简单地将这些事件转发到DataMonitor来决定如何处理它们
- DataMonitorListener listener;
- byte prevData[];
- // 它主要是异步和事件驱动
- public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher,
- DataMonitorListener listener) {
- this.zk = zk;
- this.znode = znode;
- this.chainedWatcher = chainedWatcher;
- this.listener = listener;
- // Get things started by checking if the node exists. We are going
- // to be completely event driven
- zk.exists(znode, true, this, null);
- }
- /**
- * 该接口在DataMonitor类中定义,并在Executor类中实现。 当调用Executor.exists()时,执行器根据要求决定是启动还是关闭。
- * 当znode不再存在时,需要说的是杀死可执行文件。
- */
- public interface DataMonitorListener {
- /**
- * The existence status of the node has changed.
- */
- void exists(byte data[]);
- /**
- * The ZooKeeper session is no longer valid.
- *
- * @param rc
- * the ZooKeeper reason code
- */
- void closing(int rc);
- }
- //响应ZooKeeper状态的更改
- public void process(WatchedEvent event) {
- String path = event.getPath();
- if (event.getType() == Watcher.Event.EventType.None) {
- // We are are being told that the state of the
- // connection has changed
- switch (event.getState()) {
- case SyncConnected:
- // In this particular example we don‘t need to do anything
- // here - watches are automatically re-registered with
- // server and any watches triggered while the client was
- // disconnected will be delivered (in order of course)
- break;
- case Expired:
- // It‘s all over
- dead = true;
- listener.closing(KeeperException.Code.SessionExpired);
- break;
- }
- } else {
- if (path != null && path.equals(znode)) {
- // Something has changed on the node, let‘s find out
- zk.exists(znode, true, this, null);
- }
- }
- if (chainedWatcher != null) {
- chainedWatcher.process(event);
- }
- }
- public void processResult(int rc, String path, Object ctx, Stat stat) {
- /**
- * 首先检查znode存在,致命错误和可恢复错误的错误代码。
- * 如果文件(或znode)存在,它将从znode获取数据,然后调用Executor的exists()回调,
- * 如果状态已更改。 注意,它不必对getData调用执行异常处理,因为它具有挂起的任何可能导致错误的监视器:
- * 如果节点在调用ZooKeeper.getData()之前被删除,则由ZooKeeper设置的监视事件 .exists()触发回调;
- *如果发生通信错误,连接回显将触发连接监视事件。
- */
- boolean exists;
- switch (rc) {
- case Code.Ok:
- exists = true;
- break;
- case Code.NoNode:
- exists = false;
- break;
- case Code.SessionExpired:
- case Code.NoAuth:
- dead = true;
- listener.closing(rc);
- return;
- default:
- // Retry errors
- zk.exists(znode, true, this, null);
- return;
- }
- //文件(或znode)存在
- byte b[] = null;
- if (exists) {
- try {
- b = zk.getData(znode, false, null);
- } catch (KeeperException e) {
- // We don‘t need to worry about recovering now. The watch
- // callbacks will kick off any exception handling
- e.printStackTrace();
- } catch (InterruptedException e) {
- return;
- }
- }
- if ((b == null && b != prevData)
- || (b != null && !Arrays.equals(prevData, b))) {
- listener.exists(b);
- prevData = b;
- }
- }
- }
调试:
参数列表:192.168.0.10:2181 /hellojd_node filename calc
192.168.0.10:2181:ZK地址
/hellojd_node :监视node
filename :备份数据文件
calc:命令
来源: http://www.bubuko.com/infodetail-2362193.html