ZooKeeperServer, 为所有服务器的父类.
QuorumZooKeeperServer, 其是所有参与选举的服务器的父类, 是抽象类, 其继承了 ZooKeeperServer 类.
LeaderZooKeeperServer,Leader 服务器, 继承了 QuorumZooKeeperServer 类, 也会继承 ZooKeeperServer 中的很多方法.
LearnerZooKeeper, 其是 Learner 服务器的父类, 为抽象类, 也继承了 QuorumZooKeeperServer 类.
FollowerZooKeeperServer,Follower 服务器, 继承了 LearnerZooKeeper.
ObserverZooKeeperServer,Observer 服务器, 继承了 LearnerZooKeeper.
ReadOnlyZooKeeperServer, 只读服务器, 不提供写服务, 继承 QuorumZooKeeperServer.
ZooKeeperServer
1, 类的继承关系
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {}
ZooKeeperServer 是 ZooKeeper 中所有服务器的父类, 其实现了 Session.Expirer 和 ServerStats.Provider 接口, SessionExpirer 中定义了 expire 方法 (表示会话过期) 和 getServerId 方法(表示获取服务器 ID), 而 Provider 则主要定义了获取服务器某些数据的方法.
2, 类属性
- protected static final Logger LOG;
- static {
- LOG = LoggerFactory.getLogger(ZooKeeperServer.class);
- Environment.logEnv("Server environment:", LOG);
- }
- //jmx 服务
- protected ZooKeeperServerBean jmxServerBean;
- protected DataTreeBean jmxDataTreeBean;
- // 默认心跳频率
- public static final int DEFAULT_TICK_TIME = 3000;
- protected int tickTime = DEFAULT_TICK_TIME;
- // 最小会话过期时间
- /** value of -1 indicates unset, use default */
- protected int minSessionTimeout = -1;
- // 最大会话过期时间
- /** value of -1 indicates unset, use default */
- protected int maxSessionTimeout = -1;
- protected SessionTracker sessionTracker;
- // 事务日志快照
- private FileTxnSnapLog txnLogFactory = null;
- // Zookeeper 内存数据库
- private ZKDatabase zkDb;
- private final AtomicLong hzxid = new AtomicLong(0);
- public final static Exception ok = new Exception("No prob");
- // 请求处理器
- protected RequestProcessor firstProcessor;
- protected volatile State state = State.INITIAL;
- protected enum State {
- INITIAL, RUNNING, SHUTDOWN, ERROR
- }
- /**
- * This is the secret that we use to generate passwords. For the moment,
- * it's more of a checksum that's used in reconnection, which carries no
- * security weight, and is treated internally as if it carries no
- * security weight.
- */
- static final private long superSecret = 0XB3415C00L;
- private final AtomicInteger requestsInProcess = new AtomicInteger(0);
- // 未处理的 ChangeRecord
- final Deque<ChangeRecord> outstandingChanges = new ArrayDeque<>();
- // this data structure must be accessed under the outstandingChanges lock
- // 记录 path 对应的 ChangeRecord
- final HashMap<String, ChangeRecord> outstandingChangesForPath =
- new HashMap<String, ChangeRecord>();
- protected ServerCnxnFactory serverCnxnFactory;
- protected ServerCnxnFactory secureServerCnxnFactory;
- // 服务器统计数据
- private final ServerStats serverStats;
- private final ZooKeeperServerListener listener;
- private ZooKeeperServerShutdownHandler zkShutdownHandler;
- private volatile int createSessionTrackerServerId = 1;
3, 核心函数
3.1 loadData
该函数用于加载数据, 其首先会判断内存库是否已经加载设置 zxid, 之后会调用 killSession 函数删除过期的会话
- if(zkDb.isInitialized()){ // 内存数据库已被初始化
- // 设置为最后处理的 Zxid
- setZxid(zkDb.getDataTreeLastProcessedZxid());
- }
- else {
- // 未被初始化, 则加载数据库
- setZxid(zkDb.loadDataBase());
- }
- // Clean up dead sessions
- LinkedList<Long> deadSessions = new LinkedList<Long>();
- for (Long session : zkDb.getSessions()) {// 遍历所有的会话
- if (zkDb.getSessionWithTimeOuts().get(session) == null) {
- deadSessions.add(session);
- }
- }
- for (long session : deadSessions) { // 删除过期的会话
- // XXX: Is lastProcessedZxid really the best thing to use?
- killSession(session, zkDb.getDataTreeLastProcessedZxid());
- }
- // Make a clean snapshot
- // 初始化一个快照
- takeSnapshot();
- 3.2,submitRequest
提交请求, 处理器进行处理
- public void submitRequest(Request si) {
- if (firstProcessor == null) {// 第一个处理器为空
- synchronized (this) {
- try {
- // Since all requests are passed to the request
- // processor it should wait for setting up the request
- // processor chain. The state will be updated to RUNNING
- // after the setup.
- // 服务器调用链还未初始化完成
- while (state == State.INITIAL) {
- wait(1000);
- }
- } catch (InterruptedException e) {
- LOG.warn("Unexpected interruption", e);
- }
- if (firstProcessor == null || state != State.RUNNING) {
- throw new RuntimeException("Not started");
- }
- }
- }
- try {
- touch(si.cnxn);
- // 是否为合法的请求
- boolean validpacket = Request.isValid(si.type);
- if (validpacket) {
- // 调用链第一处理器开始处理
- firstProcessor.proce***equest(si);
- if (si.cnxn != null) {
- incInProcess();
- }
- } else {
- LOG.warn("Received packet at server of unknown type" + si.type);
- new UnimplementedRequestProcessor().proce***equest(si);
- }
- } catch (MissingSessionException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Dropping request:" + e.getMessage());
- }
- } catch (RequestProcessorException e) {
- LOG.error("Unable to process request:" + e.getMessage(), e);
- }
- }
LeaderZooKeeperServer 介绍
1, 类属性
- // 提交请求处理器
- CommitProcessor commitProcessor;
- // 处理链请求第一个处理处理器
- PrepRequestProcessor prepRequestProcessor;
2, 构造方法
- LeaderZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) throws IOException {
- super(logFactory, self.tickTime, self.minSessionTimeout, self.maxSessionTimeout, zkDb, self);
- }
直接调用父类 QuorumZooKeeperServer 的构造函数, 然后再调用 ZooKeeperServer 的构造函数, 逐级构造.
3, 核心函数
- 3.1,setupRequestProcessors
- @Override
- protected void setupRequestProcessors() {
- // 创建 FinalRequestProcessor, 处理链最后一个处理器
- RequestProcessor finalProcessor = new FinalRequestProcessor(this);
- // 创建 ToBeAppliedRequestProcessor
- RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
- // 创建 CommitProcessor, 提交处理器
- commitProcessor = new CommitProcessor(toBeAppliedProcessor,
- Long.toString(getServerId()), false,
- getZooKeeperServerListener());
- // 启动 CommitProcessor
- commitProcessor.start();
- // 创建 ProposalRequestProcessor
- ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
- commitProcessor);
- // 初始化 ProposalProcessor
- proposalProcessor.initialize();
- // 创建 PrepRequestProcessor, 作为以第一个处理链处理器
- prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
- prepRequestProcessor.start();
- // firstProcessor 为 PrepRequestProcessor
- firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
- setupContainerManager();
- }
setupRequestProcessors 函数表示创建处理链, 可以看到其处理链的顺序为 PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor -> Leader.ToBeAppliedRequestProcessor -> FinalRequestProcessor.
zookeeper(14)源码分析 - 服务器(1)
来源: http://www.bubuko.com/infodetail-3337607.html