一、前言
前面已经学习了 NIOServerCnxn,接着继续学习 NettyServerCnxn。
二、NettyServerCnxn 源码分析
2.1 类的继承关系
- public class NettyServerCnxn extends ServerCnxn {}
说明:NettyServerCnxn 继承了 ServerCnxn 抽象类,使用 Netty 框架来高效处理与客户端之间的通信。
2.2 类的内部类
1. SendBufferWriter 类
SendBufferWriter
- private class SendBufferWriter extends Writer {
- private StringBuffer sb = new StringBuffer();
- /**
- * Check if we are ready to send another chunk.
- * @param force force sending, even if not a full chunk
- */
- // 是否准备好发送另一块
- private void checkFlush(boolean force) {
- if ((force && sb.length() > 0) || sb.length() > 2048) { // 当强制发送并且sb大小大于0,或者sb大小大于2048即发送缓存
- sendBuffer(ByteBuffer.wrap(sb.toString().getBytes()));
- // clear our internal buffer
- sb.setLength(0);
- }
- }
- @Override
- public void close() throws IOException {
- if (sb == null) return;
- // 关闭之前需要强制性发送缓存
- checkFlush(true);
- sb = null; // clear out the ref to ensure no reuse
- }
- @Override
- public void flush() throws IOException {
- checkFlush(true);
- }
- @Override
- public void write(char[] cbuf, int off, int len) throws IOException {
- sb.append(cbuf, off, len);
- checkFlush(false);
- }
- }
说明:与 NIOServerCnxn 中相同,该类用来将给客户端的响应进行分块,不再累赘。
2. ResumeMessageEvent 类
ResumeMessageEvent
- static class ResumeMessageEvent implements MessageEvent {
- // 通道
- Channel channel;
- // 构造函数
- ResumeMessageEvent(Channel channel) {
- this.channel = channel;
- }
- @Override
- public Object getMessage() {return null;}
- @Override
- public SocketAddress getRemoteAddress() {return null;}
- @Override
- public Channel getChannel() {return channel;}
- @Override
- public ChannelFuture getFuture() {return null;}
- };
说明:ResumeMessageEvent 继承 MessageEvent,其表示消息的传输或接收。
3. CommandThread 类
CommandThread
- private abstract class CommandThread
- /*extends Thread*/
- {
- PrintWriter pw;
- CommandThread(PrintWriter pw) {
- this.pw = pw;
- }
- public void start() {
- run();
- }
- public void run() {
- try {
- commandRun();
- } catch(IOException ie) {
- LOG.error("Error in running command ", ie);
- } finally {
- cleanupWriterSocket(pw);
- }
- }
- public abstract void commandRun() throws IOException;
- }
说明:其与 NIOServerCnxn 中类似,也是每个子类对应着一个命令,值得注意的是针对每个 CMD 命令,其仅仅使用一个线程来处理。
2.3 类的属性
类的属性
- public class NettyServerCnxn extends ServerCnxn {
- // 日志
- Logger LOG = LoggerFactory.getLogger(NettyServerCnxn.class);
- // 通道
- Channel channel;
- // 通道缓存
- ChannelBuffer queuedBuffer;
- // 节流与否
- volatile boolean throttled;
- // Byte缓冲区
- ByteBuffer bb;
- // 四个字节的缓冲区
- ByteBuffer bbLen = ByteBuffer.allocate(4);
- // 会话ID
- long sessionId;
- // 会话超时时间
- int sessionTimeout;
- // 计数
- AtomicLong outstandingCount = new AtomicLong();
- /** The ZooKeeperServer for this connection. May be null if the server
- * is not currently serving requests (for example if the server is not
- * an active quorum participant.
- */
- // Zookeeper服务器
- private volatile ZooKeeperServer zkServer;
- // NettyServerCnxn工厂
- NettyServerCnxnFactory factory;
- // 初始化与否
- boolean initialized;
- // 四个字节
- private static final byte[] fourBytes = new byte[4];
- private static final String ZK_NOT_SERVING =
- "This ZooKeeper instance is not currently serving requests";
- }
说明:NettyServerCnxn 维护了与客户端之间的通道缓冲、缓冲区及会话的相关属性。
2.4 类的构造函数
构造函数
- NettyServerCnxn(Channel channel, ZooKeeperServer zks, NettyServerCnxnFactory factory) {
- // 给属性赋值
- this.channel = channel;
- this.zkServer = zks;
- this.factory = factory;
- if (this.factory.login != null) { // 需要登录信息(用户名和密码登录)
- this.zooKeeperSaslServer = new ZooKeeperSaslServer(factory.login);
- }
- }
说明:构造函数对 NettyServerCnxn 中的部分重要属性进行了赋值,其中还涉及到是否需要用户登录。
2.5 核心函数分析
1. receiveMessage 函数
receiveMessage
- public void receiveMessage(ChannelBuffer message) {
- try {
- while(message.readable() && !throttled) { // 当writerIndex > readerIndex,并且不节流时,满足条件
- if (bb != null) { // 不为null
- if (LOG.isTraceEnabled()) {
- LOG.trace("message readable " + message.readableBytes()
- + " bb len " + bb.remaining() + " " + bb);
- ByteBuffer dat = bb.duplicate();
- dat.flip();
- LOG.trace(Long.toHexString(sessionId)
- + " bb 0x"
- + ChannelBuffers.hexDump(
- ChannelBuffers.copiedBuffer(dat)));
- }
- if (bb.remaining() > message.readableBytes()) { // bb剩余空间大于message中可读字节大小
- // 确定新的limit
- int newLimit = bb.position() + message.readableBytes();
- bb.limit(newLimit);
- }
- // 将message写入bb中
- message.readBytes(bb);
- // 重置bb的limit
- bb.limit(bb.capacity());
- if (LOG.isTraceEnabled()) {
- LOG.trace("after readBytes message readable "
- + message.readableBytes()
- + " bb len " + bb.remaining() + " " + bb);
- ByteBuffer dat = bb.duplicate();
- dat.flip();
- LOG.trace("after readbytes "
- + Long.toHexString(sessionId)
- + " bb 0x"
- + ChannelBuffers.hexDump(
- ChannelBuffers.copiedBuffer(dat)));
- }
- if (bb.remaining() == 0) { // 已经读完message,表示内容已经全部接收
- // 统计接收信息
- packetReceived();
- // 翻转,可读
- bb.flip();
- ZooKeeperServer zks = this.zkServer;
- if (zks == null) { // Zookeeper服务器为空
- throw new IOException("ZK down");
- }
- if (initialized) { // 未被初始化
- // 处理bb中包含的包信息
- zks.processPacket(this, bb);
- if (zks.shouldThrottle(outstandingCount.incrementAndGet())) { // 是否已经节流
- // 不接收数据
- disableRecvNoWait();
- }
- } else { // 已经初始化
- LOG.debug("got conn req request from "
- + getRemoteSocketAddress());
- // 处理连接请求
- zks.processConnectRequest(this, bb);
- initialized = true;
- }
- bb = null;
- }
- } else { // bb为null
- if (LOG.isTraceEnabled()) {
- LOG.trace("message readable "
- + message.readableBytes()
- + " bblenrem " + bbLen.remaining());
- // 复制bbLen缓冲
- ByteBuffer dat = bbLen.duplicate();
- // 翻转
- dat.flip();
- LOG.trace(Long.toHexString(sessionId)
- + " bbLen 0x"
- + ChannelBuffers.hexDump(
- ChannelBuffers.copiedBuffer(dat)));
- }
- if (message.readableBytes() < bbLen.remaining()) { // bb剩余空间大于message中可读字节大小
- // 重设bbLen的limit
- bbLen.limit(bbLen.position() + message.readableBytes());
- }
- // 将message内容写入bbLen中
- message.readBytes(bbLen);
- // 重置bbLen的limit
- bbLen.limit(bbLen.capacity());
- if (bbLen.remaining() == 0) { // 已经读完message,表示内容已经全部接收
- // 翻转
- bbLen.flip();
- if (LOG.isTraceEnabled()) {
- LOG.trace(Long.toHexString(sessionId)
- + " bbLen 0x"
- + ChannelBuffers.hexDump(
- ChannelBuffers.copiedBuffer(bbLen)));
- }
- // 读取position后四个字节
- int len = bbLen.getInt();
- if (LOG.isTraceEnabled()) {
- LOG.trace(Long.toHexString(sessionId)
- + " bbLen len is " + len);
- }
- // 清除缓存
- bbLen.clear();
- if (!initialized) { // 未被初始化
- if (checkFourLetterWord(channel, message, len)) { // 是否是四个字母的命令
- return;
- }
- }
- if (len < 0 || len > BinaryInputArchive.maxBuffer) {
- throw new IOException("Len error " + len);
- }
- // 根据len重新分配缓冲,以便接收内容
- bb = ByteBuffer.allocate(len);
- }
- }
- }
- } catch(IOException e) {
- LOG.warn("Closing connection to " + getRemoteSocketAddress(), e);
- close();
- }
- }
说明:该函数用于接收 ChannelBuffer 中的数据,函数在 while 循环体中,当 writerIndex 大于 readerIndex(表示 ChannelBuffer 中还有可读内容) 且 throttled 为 false 时执行 while 循环体,该函数大致可以分为两部分,首先是当 bb 不为空时,表示已经准备好读取 ChannelBuffer 中的内容,其流程如下
- if (bb != null) { // 不为null,表示已经准备好读取message
- if (LOG.isTraceEnabled()) {
- LOG.trace("message readable " + message.readableBytes()
- + " bb len " + bb.remaining() + " " + bb);
- ByteBuffer dat = bb.duplicate();
- dat.flip();
- LOG.trace(Long.toHexString(sessionId)
- + " bb 0x"
- + ChannelBuffers.hexDump(
- ChannelBuffers.copiedBuffer(dat)));
- }
- if (bb.remaining() > message.readableBytes()) { // bb剩余空间大于message中可读字节大小
- // 确定新的limit
- int newLimit = bb.position() + message.readableBytes();
- bb.limit(newLimit);
- }
- // 将message写入bb中
- message.readBytes(bb);
- // 重置bb的limit
- bb.limit(bb.capacity());
- if (LOG.isTraceEnabled()) {
- LOG.trace("after readBytes message readable "
- + message.readableBytes()
- + " bb len " + bb.remaining() + " " + bb);
- ByteBuffer dat = bb.duplicate();
- dat.flip();
- LOG.trace("after readbytes "
- + Long.toHexString(sessionId)
- + " bb 0x"
- + ChannelBuffers.hexDump(
- ChannelBuffers.copiedBuffer(dat)));
- }
- if (bb.remaining() == 0) { // 已经读完message,表示内容已经全部接收
- // 统计接收信息
- packetReceived();
- // 翻转,可读
- bb.flip();
- ZooKeeperServer zks = this.zkServer;
- if (zks == null) { // Zookeeper服务器为空
- throw new IOException("ZK down");
- }
- if (initialized) { // 未被初始化
- // 处理bb中包含的包信息
- zks.processPacket(this, bb);
- if (zks.shouldThrottle(outstandingCount.incrementAndGet())) { // 是否已经节流
- // 不接收数据
- disableRecvNoWait();
- }
- } else { // 已经初始化
- LOG.debug("got conn req request from "
- + getRemoteSocketAddress());
- // 处理连接请求
- zks.processConnectRequest(this, bb);
- initialized = true;
- }
- bb = null;
- }
- }
其中主要的部分是判断 bb 的剩余空间是否大于 message 中的内容,简单而言,就是判断 bb 是否还有足够空间存储 message 内容,然后设置 bb 的 limit,之后将 message 内容读入 bb 缓冲中,之后再次确定时候已经读完 message 内容,统计接收信息,再根据是否已经初始化来处理包或者是连接请求,其中的请求内容都存储在 bb 中。而当 bb 为空时,其流程如下
- else { // bb为null
- if (LOG.isTraceEnabled()) {
- LOG.trace("message readable "
- + message.readableBytes()
- + " bblenrem " + bbLen.remaining());
- // 复制bbLen缓冲
- ByteBuffer dat = bbLen.duplicate();
- // 翻转
- dat.flip();
- LOG.trace(Long.toHexString(sessionId)
- + " bbLen 0x"
- + ChannelBuffers.hexDump(
- ChannelBuffers.copiedBuffer(dat)));
- }
- if (message.readableBytes() < bbLen.remaining()) { // bb剩余空间大于message中可读字节大小
- // 重设bbLen的limit
- bbLen.limit(bbLen.position() + message.readableBytes());
- }
- // 将message内容写入bbLen中
- message.readBytes(bbLen);
- // 重置bbLen的limit
- bbLen.limit(bbLen.capacity());
- if (bbLen.remaining() == 0) { // 已经读完message,表示内容已经全部接收
- // 翻转
- bbLen.flip();
- if (LOG.isTraceEnabled()) {
- LOG.trace(Long.toHexString(sessionId)
- + " bbLen 0x"
- + ChannelBuffers.hexDump(
- ChannelBuffers.copiedBuffer(bbLen)));
- }
- // 读取position后四个字节
- int len = bbLen.getInt();
- if (LOG.isTraceEnabled()) {
- LOG.trace(Long.toHexString(sessionId)
- + " bbLen len is " + len);
- }
- // 清除缓存
- bbLen.clear();
- if (!initialized) { // 未被初始化
- if (checkFourLetterWord(channel, message, len)) { // 是否是四个字母的命令
- return;
- }
- }
- if (len < 0 || len > BinaryInputArchive.maxBuffer) {
- throw new IOException("Len error " + len);
- }
- // 根据len重新分配缓冲,以便接收内容
- bb = ByteBuffer.allocate(len);
- }
- }
当 bb 为空时,表示还没有给 bb 分配足够的内存空间来读取 message,首先还是将 message 内容(后续内容的长度)读入 bbLen 中,然后再确定读入的内容代表后续真正内容的长度 len,然后再根据 len 来为 bb 分配存储空间,方便后续读取真正的内容。
2. sendResponse 函数
- public void sendResponse(ReplyHeader h, Record r, String tag)
- throws IOException {
- if (!channel.isOpen()) {
- return;
- }
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- // Make space for length
- BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
- try {
- // 向baos中写入四个字节(空)
- baos.write(fourBytes);
- // 写入记录
- bos.writeRecord(h, "header");
- if (r != null) {
- // 写入记录
- bos.writeRecord(r, tag);
- }
- // 关闭
- baos.close();
- } catch (IOException e) {
- LOG.error("Error serializing response");
- }
- // 转化为Byte Array
- byte b[] = baos.toByteArray();
- // 将Byte Array封装成ByteBuffer
- ByteBuffer bb = ByteBuffer.wrap(b);
- bb.putInt(b.length - 4).rewind();
- // 发送缓冲
- sendBuffer(bb);
- if (h.getXid() > 0) {
- // zks cannot be null otherwise we would not have gotten here!
- if (!zkServer.shouldThrottle(outstandingCount.decrementAndGet())) {
- enableRecv();
- }
- }
- }
说明:其首先会将 header 和 record 都写入 baos,之后再将 baos 转化为 ByteBuffer,之后在调用 sendBuffer 来发送缓冲,而 sendBuffer 完成的操作是将 ByteBuffer 写入 ChannelBuffer 中。
3. process 函数
- public void process(WatchedEvent event) {
- // 创建响应头
- ReplyHeader h = new ReplyHeader(-1, -1L, 0);
- if (LOG.isTraceEnabled()) {
- ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
- "Deliver event " + event + " to 0x"
- + Long.toHexString(this.sessionId)
- + " through " + this);
- }
- // Convert WatchedEvent to a type that can be sent over the wire
- WatcherEvent e = event.getWrapper();
- try {
- // 发送响应
- sendResponse(h, e, "notification");
- } catch (IOException e1) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Problem sending to " + getRemoteSocketAddress(), e1);
- }
- close();
- }
- }
说明:首先创建 ReplyHeader,然后再调用 sendResponse 来发送响应,最后调用 close 函数进行后续关闭处理。
三、总结
本篇博文讲解了基于 Netty 完成服务端与客户端之间的通信,其效率相对较高,也谢谢各位园友的观看~
来源: http://www.cnblogs.com/leesf456/p/6486454.html