上次博客我们说了一下 zookeeper 的配置文件, 以及命令的使用 https://www.cnblogs.com/cxiaocai/p/11597465.html. 我们这次来说一下我们的 zookeeper 的集群配置和 java 的 API 相关操作.
集群:
一般情况下我们用 zookeeper 来做任务调度中心的, 所以一定要做到高可用的, 单机的不可能做到永不宕机, 我们也不会信任他单机的永不宕机, 这时我们就需要做集群处理, 来实现我们的高可用.
配置集群时, 我们尽可能采用奇数的服务器来配置, 什么意思呢? 尽力采用 3,5,7,9 台服务器来配置, 原因是 zookeeper 会默认识别半数以上服务器正常运行, 才认为 zookeeper 是正常运行的, 比如我们现在部署 4 台 zookeeper 服务器, 这时其中两台宕机了, 这时 zookeeper 会认为这个集群时不可用的, 同理我们如果是 5 台服务器的情况, 有两台宕机了, 可以正常运行, 三台宕机了, 才被认为是不可用的, 这个很重要, 包括后面的选举机制也是这样的. 资金有限啊, 我先用 3 台服务器搭建一下 zookeeper 集群.
1. 集群配置
下载解压什么的就不说了啊, 上次都说过了, 我们直接看下配置文件吧. 和单机配置基本一致, 我们看到 dataDir=/tmp/zookeeper, 也就是我们的数据存储路径, 分别建立三个文件 myid, 内部输入 1-255 的数字
每台服务器别重复, 切记一定建立在配置文件 dataDir 对应的目录下, 不然启动会报找不到 myid 文件的错误, 没有对应 / tmp/zookeeper 目录的, 可以启动一下 zookeeper 再关闭就有文件夹了, 或者自己手动创建也行.
再每一个配置文件内加入配置
- server.1=172.16.140.106:2888:3888
- server.2=172.16.140.105:2888:3888
- server.3=172.16.214.74:2888:3888
server.myid(myid 文件的数字)=ip(与 myid 相对应的 IP): 集群之间相互通讯的 IP: 选举时通讯的 IP. 三分配置文件都是一样的.
我们来分别启动一下. 说到这里我们的集群配置也就成功了.
启动成功以后, 我们分别输入./bin/zkServer.sh status 我们可以看到我们的服务器角色
2. 角色:
leader 主节点, 又名领导者. 用于写入数据, 通过选举产生, 如果宕机将会选举新的主节点.
follower 子节点, 又名追随者. 用于实现数据的读取. 同时他也是主节点的备选节点, 并用拥有投票权.
observer 次级子节点, 又名观察者. 用于读取数据, 与 fllower 区别在于没有投票权, 不能选为主节点. 并且在计算集群可用状态时不会将 observer 计算入内. 也就是我们的半数原则计算.
observer 配置:
只要在集群配置中加上 observer 后缀即可, 示例如下:
server.3=127.0.0.1:2889:3889:observer
选举机制:
先说一个简单的, 投票机制的. 假设我们现在有 1,2,3,4,5 五个 follower 要进行选举.
简单流程就是这样的, 第一轮都认为自己很可以, 自己要当选 leader, 但是选举流程失败了, 还得继续, 接下来会把自己的票全盘拖出给自己临近的 id,1 就会给 2 一票, 2 现在有了两票了, 发现还是不够半数啊, 半数是 2.5 啊, 算了还得继续, 2 又把自己的两票都给了 3,3 这时获得了 3 票了, 大于半数了, 当选 leader.
每轮选举结束后都会统一来处理, 如果一轮投票就发现 server1 的 zxid 较大, 那么直接 server1 会当选 leader.
优先检查 ZXID.ZXID 比较大的服务器优先作为 Leader.
如果 ZXID 相同, 那么就比较 myid.myid 较大的服务器作为 Leader 服务器.
留下一个思考题, 5 台服务器, 如果启动可以指定 4 号为 leader 服务 .
javaAPI 相关操作
maven 的 pom 文件内加入
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <version>3.5.5</version>
- </dependency>
首先我们先初始化我们的 Zookeeper 的连接
connectString -> 连接 String 连接串, 包括 ip + 端口 , 集群模式下用逗号隔开 192.168.0.149:2181,192.168.0.150:2181
sessionTimeout-> 会话超时时间, 类型 int, 该值不能超过服务端所设置的 minSessionTimeout(默认 2s) 和 maxSessionTimeout(默认 60s), 单位毫秒
watcher-> 会话监听器 Watcher, 服务端事件将会触该监听
sessionId -> 自定义会话 ID long
sessionPasswd ->byte[] 会话密码
canBeReadOnly ->boolean 该连接是否为只读的
hostProvider ->HostProvider 服务端地址提供者, 指示客户端如何选择某个服务来调用, 默认采用 StaticHostProvider 实现
- @Before
- public void init() throws IOException {
- String conn = "47.111.109.3:2181"; // 连接字符串
- int sessionTimeout = 4000; // 连接超时时间
- zooKeeper = new ZooKeeper(conn, sessionTimeout, new Watcher() {
- public void process(WatchedEvent watchedEvent) {
- }
- });
- }
我们先来看看我们的新增节点, 删除节点等操作吧.
查看节点:
我们使用 getData 方法, 添加三个参数, 分别是路径, 是否监听, 和返回值 (状态 stat).
- /**
- * 获取数据
- *
- * @throws KeeperException
- * @throws InterruptedException
- */
- @Test
- public void getData() throws KeeperException, InterruptedException {
- byte[] data = zooKeeper.getData("/root", false, null);
- System.out.println(new String(data));
- }
添加监听:
- /**
- * 添加监听, 结果在初始化里
- *
- * @throws KeeperException
- * @throws InterruptedException
- */
- @Test
- public void getWatchData() throws KeeperException, InterruptedException {
- byte[] data = zooKeeper.getData("/root", true, null);
- System.out.println(new String(data));
- Thread.sleep(Long.MAX_VALUE);
- }
这个监听是一次性的, 而且结果在我们的初始化的 watch 里, 初始化方法改为
- @Before
- public void init() throws IOException {
- String conn = "47.111.109.3:2181"; // 连接字符串
- int sessionTimeout = 4000; // 连接超时时间
- zooKeeper = new ZooKeeper(conn, sessionTimeout, new Watcher() {
- public void process(WatchedEvent watchedEvent) {
- System.out.println(watchedEvent.getPath());
- }
- });
- }
永久监听设置, 我们只要将 Watcher 新建一下就可以了吗... 我们来看一下实现
- /**
- * 永久监听
- *
- * @throws KeeperException
- * @throws InterruptedException
- */
- @Test
- public void getWatchDataForever() throws KeeperException, InterruptedException {
- byte[] data = zooKeeper.getData("/root", new Watcher() {
- public void process(WatchedEvent watchedEvent) {
- try {
- zooKeeper.getData(watchedEvent.getPath(),this,null);
- } catch (KeeperException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println(watchedEvent.getPath());
- }
- }, null);
- System.out.println(new String(data));
- Thread.sleep(Long.MAX_VALUE);
- }
- stat:
- /**
- * Stat
- *
- * @throws KeeperException
- * @throws InterruptedException
- */
- @Test
- public void getDataStat() throws InterruptedException, KeeperException {
- Stat stat = new Stat();
- zooKeeper.getData("/root",false, stat);
- System.out.println(stat);
- }
输出结果和我们命令 stat 的输入其实是完全一致的, 只不过一个事 16 进制, 一个事 10 进制的, 可以自己对比一下.
获取子节点 :
- /**
- * 获取子节点
- *
- * @throws KeeperException
- * @throws InterruptedException
- */
- @Test
- public void getWatchChildDataForever() throws KeeperException, InterruptedException {
- List<String> children = zooKeeper.getChildren("/root", false);
- for (int i = 0; i <children.size(); i++) {
- System.out.println(children.get(i));
- }
- }
删除节点:
- /**
- * 删除节点
- *
- * @throws KeeperException
- * @throws InterruptedException
- */
- @Test
- public void deletePath() throws KeeperException, InterruptedException {
- zooKeeper.delete("/root/d2", 0);
- }
创建节点:
- /**
- * 创建节点
- *
- * @throws KeeperException
- * @throws InterruptedException
- */
- @Test
- public void createPath() throws KeeperException, InterruptedException {
- List<ACL> acl = new ArrayList<ACL>();
- int perm = ZooDefs.Perms.ADMIN | ZooDefs.Perms.CREATE | ZooDefs.Perms.READ;
- ACL aclObj = new ACL(perm, new Id("world", "anyone"));
- acl.add(aclObj);
- zooKeeper.create("/root/d4", "hello".getBytes(), acl, CreateMode.CONTAINER);
- }
说到这我们的 API 和集群操作就差不多说完了.
这次代码不多, 就先不上传了, 写完下次博客再一起上传.
来源: https://www.cnblogs.com/cxiaocai/p/11607286.html