在 zk 服务器集群启动过程中, 经 QuorumPeerMain 中, 不光会创建 ZooKeeperServer 对象, 同时会生成 QuorumPeer 对象, 代表了 ZooKeeper 集群中的一台机器. 在整个机器运行期间, 负责维护该机器的运行状态, 同时会根据情况发起 Leader 选举. 下图是 《从 PAXOS 到 ZOOKEEPER 分布式一致性原理与实践》的服务器启动流程.
QuorumPeer 是一个独立的线程, 维护着 zk 机器的状态.
- @Overridepublic synchronized void start() {
- loadDataBase();
- cnxnFactory.start();
- startLeaderElection(); super.start();
- }
本次主要介绍的是选举相关的内容, 至于其他操作可以看其他博客. 之后的行文都是从 startLeaderElection 中衍生出来的.
基本概念:
SID: 服务器 ID, 用来标示 ZooKeeper 集群中的机器, 每台机器不能重复, 和 myid 的值一直
ZXID: 事务 IDVote: 选票, 具体的数据结构后面有
Quorum: 过半机器数
选举轮次: logicalclock,zk 服务器 Leader 选举的轮次
服务器类型:
在 zk 中, 引入了 Leader,Follwer 和 Observer 三种角色. zk 集群中的所有机器通过一个 Leader 选举过程来选定一台被称为 Leader 的机器, Leader 服务器为客户端提供读和写服务. Follower 和 Observer 都能够提供读服务, 唯一的区别在于, Observer 机器不参与 Leader 选举过程, 也不参与写操作的过半写成功策略. 因此, Observer 存在的意义是: 在不影响写性能的情况下提升集群的读性能.
服务器状态:
+ LOOKING:Leader 选举阶段 + FOLLOWING:Follower 服务器和 Leader 保持同步状态 + LEADING:Leader 服务器作为主进程领导状态.+ OBSERVING: 观察者状态, 表明当前服务器是 Observer, 不参与投票
选举的目的就是选择出合适的 Leader 机器, 由 Leader 机器决定事务性的 Proposal 处理过程, 实现类两阶段提交协议 (具体是 ZAB 协议)
QuorumPeer 维护集群机器状态
QuorumPeer 的职责就是不断地检测当前的 zk 机器的状态, 执行对应的逻辑, 简单来说, 就是根据服务所处的不同状态执行不同的逻辑. 删除了一部分逻辑后, 代码如下:
- @Overridepublic void run() {
- setName("QuorumPeer" + "[myid=" + getId() + "]" +
- cnxnFactory.getLocalAddress());
- try {
- while (running) {
- switch (getPeerState()) {
- case LOOKING:
- LOG.info("LOOKING");
- try {
- setBCVote(null); setCurrentVote(makeLEStrategy().lookForLeader());
- }
- catch (Exception e) {
- LOG.warn("Unexpected exception", e); setPeerState(ServerState.LOOKING);
- }
- break;
- case OBSERVING:
- try {
- LOG.info("OBSERVING"); setObserver(makeObserver(logFactory)); observer.observeLeader();
- } catch (Exception e) { LOG.warn("Unexpected exception",e );
- } finally {
- observer.shutdown();
- setObserver(null);
- setPeerState(ServerState.LOOKING);
- }
- break;
- case FOLLOWING:
- try {
- LOG.info("FOLLOWING"); setFollower(makeFollower(logFactory)); follower.followLeader();
- } catch (Exception e) {
- LOG.warn("Unexpected exception",e);
- } finally {
- follower.shutdown();
- setFollower(null);
- setPeerState(ServerState.LOOKING);
- }
- break;
- case LEADING:
- LOG.info("LEADING");
- try {
- setLeader(makeLeader(logFactory));
- leader.lead();
- setLeader(null);
- } catch (Exception e) { LOG.warn("Unexpected exception",e);
- } finally {
- if (leader != null) {
- leader.shutdown("Forcing shutdown"); setLeader(null);
- }
- setPeerState(ServerState.LOOKING);
- }
- break;
- }
- }
- } finally {
- LOG.warn("QuorumPeer main thread exited");
- }
- }
当机器处于 LOOKING 状态时, QuorumPeer 会进行选举, 但是具体的逻辑并不是由 QuorumPeer 来负责的, 整体的投票过程独立出来了, 从逻辑执行的角度看, 整个过程设计到两个主要的环节:
与其他的 zk 集群通信的过程
实现具体的选举算法
而 QuorumPeer 中默认使用的选举算法是 FastLeaderElection, 之后的分析也是基于 FastLeaderElection 而言的.
选举过程中的整体架构
在集群启动的过程中, QuorumPeer 会根据配置实现不同的选举策略 this.electionAlg = createElectionAlgorithm(electionType);
- protected Election createElectionAlgorithm(int electionAlgorithm){
- Election le=null; switch (electionAlgorithm) { case 3:
- QuorumCnxManager qcm = new QuorumCnxManager(this);
- QuorumCnxManager.Listener listener = qcm.listener; if(listener != null){
- listener.start();
- le = new FastLeaderElection(this, qcm);
- } else {
- LOG.error("Null listener when initializing cnx manager");
- } break; default: assert false;
- } return le;
- }
如果 ClientCnxn 是 zk 客户端中处理 IO 请求的管理器, QuorumCnxManager 是 zk 集群间负责选举过程中网络 IO 的管理器, 在每台服务器启动的时候, 都会启动一个 QuorumCnxManager, 用来维持各台服务器之间的网络通信.
对于每一台 zk 机器, 都需要建立一个 TCP 的端口监听, 在 QuorumCnxManager 中交给 Listener 来处理, 使用的是 Socket 的阻塞式 IO(默认监听的端口是 3888, 是在 config 文件里面设置的). 在两两相互连接的过程中, 为了避免两台机器之间重复地创建 TCP 连接, zk 制定了连接的规则: 只允许 SID 打的服务器主动和其他服务器建立连接. 实现的方式也比较简单, 在 receiveConnection 中, 服务器会对比与自己建立连接的服务器的 SID, 判断是否接受请求, 如果自己的 SID 更大, 那么会断开连接, 然后自己主动去和远程服务器建立连接. 这段逻辑是由 Listener 来做的, 且 Listener 独立线程, receivedConnection, 建立连接后的示意图:
QuorumCnxManager 是连接的管家, 具体的 TCP 连接交给了 Listener, 但是对于选票的管理, 内部还维护了一系列的队列:
recvQueue: 消息接收队列, 用来存放那些从其他服务器接收到的消息, 单独的队列
分组队列 (quorumCnxManager 中将 zk 集群中的每台机器按照 SID 单独分组形成队列集合):
queueSendMap: 消息发送队列, 用于保存待发送的消息. new ConcurrentHashMap
senderWorderMap: 发送器集合. 每个 SendWorder 消息发送器, 都对应一台远程 zk 服务器, 负责消息的发放.
lastMessageSent: 最近发送过的消息, 按照 SID 分组
基本的通信流程如下:
以上内容主要是建立各台 zk 服务器之间的连接通信过程, 具体的选举策略 zk 抽象成了, 主要分析的是 FastLeaderElection 方式 (选举算法的核心部分):
- public interface Election {
- public Vote lookForLeader() throws InterruptedException; public void shutdown();
- }
FastLeaderElection 选举算法
上面说过 QuorumPeer 检测到当前服务器的状态是 LOOKING 的时候, 就会进行新一轮的选举, 通过 setCurrentVote(makeLEStrategy().lookForLeader()); 也就是 FastLeaderElection 的 lookForLeader 来进行初始选择, 实现的方式也很简单, 主要的逻辑在 FastLeaderElection.lookForLeader 中实现:
基本流程先说明一下:
QuorumPeer 会轮询检查当前服务器状态, 如果发现 State 是 LOOKING, 调用 Election 的 lookForLeader 来开始新一轮的选举
FastLeaderElection 会首先将 logicallock++, 表示新的一轮选举开始了
构造初始的选票, Vote 的内容就是选自己, 然后通知 zk 集群中的其他机器
FastLeaderElection 会一直轮询查状态, 只要是 LOOKING 态, 就会从 recvqueue 中获取其他服务器同步的选票信息, 为了方便说明, 记录为 n
根据 n 的票选信息状态, 做相关的操作
LOOKING: 都处于无 Leader 态, 比较一下选票的优劣, 看是否更新自己的选票, 如果更新了就同时通知给其他服务器
FOLLOWING,LEADING: 说明集群中已经有 Leader 存在, 更新一下自己的状态, 结束本轮投票
OBSERVING: 这票没什么卵用, 直接舍弃 (OBSERVER 是不参与投票的)
根据上面的流程, 可以大概说明一下 FasterLeaderElection 确定选票更优的策略:
如果外部投票中被推举的 Leader 服务器选举轮次大于自身的轮次, 那么就更新选票
如果选举轮次一致, 就对比两者的 ZXID,ZAB 协议中 ZXID 越大的留存的信息也越多, 因此如果 ZXID 大于自己的, 那么就更新选票
如果 ZXID 也一致, 对比两者的 SID,SID 大, 则优先级高
总结:
以上就是 zk 的默认选票流程, 按照 ZAB 协议的两种状态分析:
初始化的时候, 处于同一轮次进行投票直到投票选择出一个 Leader
崩溃恢复阶段:
Leader 服务器挂了, 那么经历的和初始化流程类似的过程, 选择 Leader
Follower 服务器挂了, 那么自己在执行选举的过程中, 会收到其他服务器给的 Leader 选票信息, 也可以确定 Leader 所属
来源: http://server.51cto.com/sOS-588269.htm