前面经过 channel 初始化, 注册, 所需要的数据结构 (epoll_event) 基本上准备好了, serverSocket 也处于监听状态, 可以接收来自客户端的请求了. NioServerSocketChannel 注册在了 NioEventLoop#selector, 在注册过程中启动了 NioEventLoop,run 方法会循环执行, 每次循环都会执行 select 和执行所有的 task. 如果 select 有事件, 则会处理收到的事件.
- private void processSelectedKeys() {
- if (selectedKeys != null) {
- // 是否使用优化过的 selectionKey
- processSelectedKeysOptimized();
- } else {
- processSelectedKeysPlain(selector.selectedKeys());
- }
- }
前面在 NioEventLoop 初始化的时候说过关于 selectionKey 优化的问题, 这里不再赘述. 两种方式主要是遍历 selectionKey 的方式不同, 具体处理事件的调用是一样的. 这里以 processSelectedKeysOptimized 为例.
- accept
- private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
- // channel 是 NioServerSocketChannel
- // unsafe 是 NioMessageUnsafe
- final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
- // 省略中间代码...
- if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
- // 调用 NioMessageUnsafe.read
- unsafe.read();
- }
- } catch (CancelledKeyException ignored) {
- unsafe.close(unsafe.voidPromise());
- }
- public void read() {
- // 省略中间代码...
- // 由于是 ServerSocket, 只负责 accept, 如果有 IO 事件说明就是有新的客户端连接, 所以这里就是创建 NioSocketChannel
- int localRead = doReadMessages(readBuf);
- if (localRead == 0) {
- break;
- }
- if (localRead <0) {
- closed = true;
- break;
- }
- allocHandle.incMessagesRead(localRead);
- } while (allocHandle.continueReading());
- } catch (Throwable t) {
- exception = t;
- }
- int size = readBuf.size();
- for (int i = 0; i < size; i ++) {
- readPending = false;
- // 注册刚刚创建的 NioSocketChannel
- pipeline.fireChannelRead(readBuf.get(i));
- }
- readBuf.clear();
- allocHandle.readComplete();
- pipeline.fireChannelReadComplete();
- // 省略中间代码...
- }
- }
- protected int doReadMessages(List<Object> buf) throws Exception {
- // 调用 java.nio.channels.ServerSocketChannel#accept 来创建 SocketChannel
- SocketChannel ch = SocketUtils.accept(javaChannel());
- try {
- if (ch != null) {
- // 创建 NioSocketChannel
- buf.add(new NioSocketChannel(this, ch));
- return 1;
- }
- } catch (Throwable t) {
- // 省略中间代码...
- }
- return 0;
- }
上面创建了 NioSocketChannel 之后, 接下来注册所有客户端连接的 NioSocketChannel, 调用的是 DefaultChannelPipeline#fireChannelRead 方法, 接下来是执行 pipeline 中的 handler, 在初始化的时候添加了 LoggingHandler (如果启动的时候配置了的话), 那么目前 pipeline 中的 handler 有
io.netty.channel.DefaultChannelPipeline$HeadContext:pipeline 创建的时候默认的第一个 handler
io.netty.handler.logging.LoggingHandler: 启动的时候用户配置的 handler
io.netty.bootstrap.ServerBootstrap$ServerBootstrapAcceptor
io.netty.channel.DefaultChannelPipeline$TailContext:pipeline 创建的时候默认的最后一个 handler
下面看下 ServerBootstrap$ServerBootstrapAcceptor 是什么时候添加到 handler 的
- // io.netty.bootstrap.ServerBootstrap#init
- // 这个方法是 NioServerSocketChannel 初始化的时候调用的
- void init(Channel channel) throws Exception {
- // 省略中间代码...
- p.addLast(new ChannelInitializer<Channel>() {
- @Override
- public void initChannel(final Channel ch) throws Exception {
- final ChannelPipeline pipeline = ch.pipeline();
- ChannelHandler handler = config.handler();
- if (handler != null) {
- pipeline.addLast(handler);
- }
- ch.eventLoop().execute(new Runnable() {
- @Override
- public void run() {
- // 在 pipeline 中添加 ServerBootstrapAcceptor
- pipeline.addLast(new ServerBootstrapAcceptor(
- ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
- }
- });
- }
- });
- }
之所以说 ServerBootstrapAcceptor, 是因为 NioSocketChannel 的 register 过程是这个 handler 的 channelRead 方法开始的
- public void channelRead(ChannelHandlerContext ctx, Object msg) {
- final Channel child = (Channel) msg;
- child.pipeline().addLast(childHandler);
- setChannelOptions(child, childOptions, logger);
- // 配置 NioSocketChannel
- for (Entry<AttributeKey<?>, Object> e: childAttrs) {
- child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
- }
- try {
- // 这里 childGroup 就是一开始我们配置的 workerGroup
- // 所以调用的是 io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel)
- childGroup.register(child).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- forceClose(child, future.cause());
- }
- }
- });
- } catch (Throwable t) {
- forceClose(child, t);
- }
- }
接下来的注册过程和 NioServerSocketChannel 的注册过程是类似的, 创建 socket, 创建 SelectionKeyImpl 等. 只不过 NioSocketChannel 不监听 accept 事件.
read
上面在接收到来自客户端的连接请求后, 将 NioSocketChannel 注册到 selector 上, 这个 selector 也是在 NioEventLoop 里面的, 后面和这个客户端的通信都会通过这个 channel 进行, 如果客户端发送来数据, 也是 selector 收到读事件通知, 然后调用 processSelectedKey 来处理 read 事件.
- private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
- // channel 是 NioSocketChannel
- // unsafe 是 NioSocketChannelUnsafe
- final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
- // 省略中间代码...
- if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
- // 调用 NioByteUnsafe.read
- unsafe.read();
- }
- } catch (CancelledKeyException ignored) {
- unsafe.close(unsafe.voidPromise());
- }
- public final void read() {
- final ChannelConfig config = config();
- if (shouldBreakReadReady(config)) {
- clearReadPending();
- return;
- }
- final ChannelPipeline pipeline = pipeline();
- // PooledByteBufAllocator, 默认的内存申请管理器
- final ByteBufAllocator allocator = config.getAllocator();
- // AdaptiveRecvByteBufAllocator$HandleImpl
- final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
- allocHandle.reset(config);
- ByteBuf byteBuf = null;
- boolean close = false;
- try {
- do {
- // 申请内存
- byteBuf = allocHandle.allocate(allocator);
- // 读取数据
- allocHandle.lastBytesRead(doReadBytes(byteBuf));
- if (allocHandle.lastBytesRead() <= 0) {
- // nothing was read. release the buffer.
- byteBuf.release();
- byteBuf = null;
- close = allocHandle.lastBytesRead() < 0;
- if (close) {
- // There is nothing left to read as we received an EOF.
- readPending = false;
- }
- break;
- }
- allocHandle.incMessagesRead(1);
- readPending = false;
- // 执行 pipeline 中的 handler
- pipeline.fireChannelRead(byteBuf);
- byteBuf = null;
- } while (allocHandle.continueReading());
- // 省略中间代码
- }
- }
一般来说 NioSocketChannel 中的 handler 包括
io.netty.channel.DefaultChannelPipeline$HeadContext
org.lep.test.netty.protocol.custom.codec.NettyMessageDecoder: 自定义的解码器
org.lep.test.netty.protocol.custom.codec.NettyMessageEncoder: 自定义的编码器
org.lep.test.netty.protocol.custom.server.LoginAuthRespHandler: 自定义的 handler
org.lep.test.netty.protocol.custom.server.HeartBeatRespHandler: 自定义的 handler
io.netty.channel.DefaultChannelPipeline$TailContext
netty 提供了一些基本的编解码功能, 自己可以根据实际需要扩展使用, 然后自定义自己的逻辑处理 handler.
上面还涉及到内存的分配部分留在下一节介绍.
总结
read 事件处理过程:
接收到 read 事件
分配内存, 初始化 buffer
调用 channel.read 将数据读取到 buffer 中
执行 pipeline 中的 handler, 包括了编解码的 handler, 自定义的 handler 来处理数据
来源: https://www.cnblogs.com/sunshine-2015/p/9369282.html