LearnerZooKeeperServer 是所有 Follower 和 Observer 的父类, 在 LearnerZooKeeperServer 里有 2 个重要的属性:
- // 提交请求处理器
- protected CommitProcessor commitProcessor;
- // 同步处理器
- protected SyncRequestProcessor syncProcessor;
FollowerZooKeeperServer 和 ObserverZooKeeperServer 都继承了 LearnerZooKeeperServer 服务器.
1,FollowerZooKeeperServer
1.1, 类属性
- // 待同步的请求
- ConcurrentLinkedQueue<Request> pendingSyncs;
- // 待处理的事务请求
- LinkedBlockingQueue<Request> pendingTxns = new LinkedBlockingQueue<Request>();
1.2, 核心函数
1.2.1,setupRequestProcessors
构建请求处理链, FollowerZooKeeperServer 的请求处理链是:
- FollowerRequestProcessor -> CommitProcessor ->FinalRequestProcessor
- @Override
- protected void setupRequestProcessors() {
- // 最后的处理器
- RequestProcessor finalProcessor = new FinalRequestProcessor(this);
- // 第二个处理器
- commitProcessor = new CommitProcessor(finalProcessor,
- Long.toString(getServerId()), true, getZooKeeperServerListener());
- commitProcessor.start();
- // 第一个请求处理器 FollowerRequestProcessor
- firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
- ((FollowerRequestProcessor) firstProcessor).start();
- syncProcessor = new SyncRequestProcessor(this,
- new SendAckRequestProcessor((Learner)getFollower()));
- syncProcessor.start();
- }
- 1.2.2,logRequest
该函数将请求进行记录 (放入到对应的队列中), 等待处理.
- public void logRequest(TxnHeader hdr, Record txn) {
- Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
- //zxid 不等于 0, 说明此服务器已经处理过请求
- if ((request.zxid & 0xffffffffL) != 0) {
- // 将该请求放入 pendingTxns 中, 等待事务处理
- pendingTxns.add(request);
- }
- // 使用 SyncRequestProcessor 处理请求 (其会将请求放在队列中, 异步进行处理)
- syncProcessor.proce***equest(request);
- }
- 1.2.3,commit
函数会提交 zxid 对应的请求 (pendingTxns 的队首元素), 其首先会判断队首请求对应的 zxid 是否为传入的 zxid, 然后再进行移除和提交 (放在 committedRequests 队列中).
- public void commit(long zxid) {
- // 没有还在等待处理的事务
- if (pendingTxns.size() == 0) {
- LOG.warn("Committing" + Long.toHexString(zxid)
- + "without seeing txn");
- return;
- }
- // 队首元素的 zxid
- long firstElementZxid = pendingTxns.element().zxid;
- // 如果队首元素的 zxid 不等于需要提交的 zxid, 则退出程序
- if (firstElementZxid != zxid) {
- LOG.error("Committing zxid 0x" + Long.toHexString(zxid)
- + "but next pending txn 0x"
- + Long.toHexString(firstElementZxid));
- System.exit(12);
- }
- // 从待处理事务请求队列中移除队首请求
- Request request = pendingTxns.remove();
- // 提交该请求
- commitProcessor.commit(request);
- }
- 2,ObserverZooKeeperServer
2.1, 类属性
- // 同步处理器是否可用, 系统参数控制
- private boolean syncRequestProcessorEnabled = this.self.getSyncEnabled();
- // 待同步请求队列
- ConcurrentLinkedQueue<Request> pendingSyncs =
- new ConcurrentLinkedQueue<Request>();
2.2, 核心方法
2.2.1,setupRequestProcessors
构建请求处理链, ObserverZooKeeperServer 的请求处理链是: ObserverRequestProcessor->CommitProcessor->FinalRequestProcessor, 可能会存在 SyncRequestProcessor.
- @Override
- protected void setupRequestProcessors() {
- // We might consider changing the processor behaviour of
- // Observers to, for example, remove the disk sync requirements.
- // Currently, they behave almost exactly the same as followers.
- RequestProcessor finalProcessor = new FinalRequestProcessor(this);
- commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
- commitProcessor.start();
- firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
- ((ObserverRequestProcessor) firstProcessor).start();
- /*
- * Observer should write to disk, so that the it won't request
- * too old txn from the leader which may lead to getting an entire
- * snapshot.
- *
- * However, this may degrade performance as it has to write to disk
- * and do periodic snapshot which may double the memory requirements
- */
- // 是否使用同步处理器, 看系统参数配置, 会影响性能
- if (syncRequestProcessorEnabled) {
- syncProcessor = new SyncRequestProcessor(this, null);
- syncProcessor.start();
- }
- }
- 2.2.2,commitRequest
同步处理器可用, 则使用同步处理器进行处理 (放入同步处理器的 queuedRequests 队列中), 然后提交请求 (放入提交请求处理器的 committedRequests 队列中)
- public void commitRequest(Request request) {
- if (syncRequestProcessorEnabled) {
- // Write to txnlog and take periodic snapshot
- // 写事务日志, 并定期快照
- syncProcessor.proce***equest(request);
- }
- commitProcessor.commit(request);
- }
来源: http://www.bubuko.com/infodetail-3343142.html