Typical server initialization code with Netty looks like the following:

```java
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
    // configure the server's NIO thread groups
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 1024)
        .childHandler(new ChildChannelHandler());
    // bind the port and wait synchronously for it to succeed
    ChannelFuture f = b.bind(port).sync();
    // wait until the server's listening port is closed
    f.channel().closeFuture().sync();
} finally {
    // shut down gracefully, releasing the thread pool resources
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
}
```
The startup of the event loop groups was covered earlier, and the code above assembles the bootstrap parameters via the builder pattern, so the next thing to look at is the bind process. Both the registration of the channel and the binding of ip:port happen inside the bind method, whose main logic is:

1. initialize the channel
2. register the channel with the selector

NioServerSocketChannel
First, look at channel initialization. The server side uses NioServerSocketChannel, which wraps the JDK's ServerSocketChannel. Initialization proceeds as follows:
```java
// when configuring which channel class to use, the corresponding channelFactory is specified
public B channel(Class<? extends C> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    }
    return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // channelFactory is a ReflectiveChannelFactory
        channel = channelFactory.newChannel();
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.
    return regFuture;
}
```
The channel is created by io.netty.channel.ReflectiveChannelFactory#newChannel, which uses reflection to instantiate it through NioServerSocketChannel's no-arg constructor; that constructor first calls newSocket and then delegates to the constructor below:
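The reflective-factory idea is simple enough to sketch without Netty. The class below is a hypothetical minimal analogue of ReflectiveChannelFactory (names are illustrative, not Netty's actual code): it captures the no-arg constructor once and invokes it for every new instance.

```java
import java.lang.reflect.Constructor;

// hypothetical minimal analogue of Netty's ReflectiveChannelFactory:
// cache the no-arg constructor, invoke it for each newInstance() call
class ReflectiveFactory<T> {
    private final Constructor<? extends T> constructor;

    ReflectiveFactory(Class<? extends T> clazz) throws NoSuchMethodException {
        this.constructor = clazz.getConstructor();
    }

    T newInstance() throws Exception {
        return constructor.newInstance();
    }
}

public class FactoryDemo {
    public static void main(String[] args) throws Exception {
        // StringBuilder stands in for a channel class with a public no-arg constructor
        ReflectiveFactory<StringBuilder> f = new ReflectiveFactory<>(StringBuilder.class);
        System.out.println(f.newInstance().getClass().getSimpleName());
    }
}
```

Caching the Constructor up front also means a missing no-arg constructor fails fast at bootstrap configuration time rather than at the first connection.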
```java
// when creating the server channel, newSocket is called first, then the constructor below
public NioServerSocketChannel(ServerSocketChannel channel) {
    // set the events this socket listens for; since this is a server, OP_ACCEPT must be added
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

// io.netty.channel.socket.nio.NioServerSocketChannel#newSocket
private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        /**
         * Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
         * {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
         *
         * See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
         */
        return provider.openServerSocketChannel();
    } catch (IOException e) {
        throw new ChannelException("Failed to open a server socket.", e);
    }
}

ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
    super(sp);
    // create a socket; what is returned is the file descriptor for that socket
    this.fd = Net.serverSocket(true);
    this.fdVal = IOUtil.fdVal(fd);
    this.state = ST_INUSE;
}

// sun.nio.ch.Net#serverSocket
static FileDescriptor serverSocket(boolean stream) {
    // socket0 is a native method that returns a Linux file descriptor as an int;
    // newFD wraps it in a Java FileDescriptor
    return IOUtil.newFD(socket0(isIPv6Available(), stream, true));
}
```

```c
// jdk/src/solaris/native/sun/nio/ch/Net.c
JNIEXPORT int JNICALL
Java_sun_nio_ch_Net_socket0(JNIEnv *env, jclass cl, jboolean preferIPv6,
                            jboolean stream, jboolean reuse)
{
    // ... omitted ...
    // call socket() to create a socket and return the corresponding file descriptor
    fd = socket(domain, type, 0);
    if (fd < 0) {
        return handleSocketError(env, errno);
    }
    // ... omitted ...
    return fd;
}
```
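The newSocket path can be exercised directly with the JDK, no Netty required. This is a sketch of the same call Netty makes: open the server channel straight from the SelectorProvider (which, per #2308, avoids the provider lookup inside ServerSocketChannel.open()).

```java
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;

public class OpenDemo {
    public static void main(String[] args) throws Exception {
        // the same call NioServerSocketChannel#newSocket makes:
        // open the channel directly from the provider
        SelectorProvider provider = SelectorProvider.provider();
        ServerSocketChannel ch = provider.openServerSocketChannel();
        System.out.println(ch.isOpen());   // a socket (fd) now backs this channel
        ch.close();
        System.out.println(ch.isOpen());   // closing releases the fd
    }
}
```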
It is not hard to see that channel initialization boils down to creating a socket. Next, look at how the channel is registered.
```java
// config() returns the ServerBootstrapConfig
// group() returns the parentGroup, i.e. bossGroup in the opening example, a NioEventLoopGroup
// so this calls NioEventLoopGroup.register, inherited from MultithreadEventLoopGroup
ChannelFuture regFuture = config().group().register(channel);

// io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel)
public ChannelFuture register(Channel channel) {
    // on bossGroup, next() picks one NioEventLoop and calls NioEventLoop.register,
    // which is inherited from SingleThreadEventLoop
    return next().register(channel);
}

// io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel)
public ChannelFuture register(Channel channel) {
    // registration goes through a promise, so it can complete asynchronously
    return register(new DefaultChannelPromise(channel, this));
}

// io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.ChannelPromise)
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    // channel() returns the NioServerSocketChannel
    // unsafe() returns io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe
    // so this calls NioMessageUnsafe.register, inherited from AbstractUnsafe
    promise.channel().unsafe().register(this, promise);
    return promise;
}

// io.netty.channel.AbstractChannel.AbstractUnsafe#register
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    // ... omitted ...
    // the current thread is main while eventLoop is a thread from bossGroup,
    // so inEventLoop() returns false and register0 runs on the event loop thread
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            // execute on the eventLoop
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            // ... omitted ...
        }
    }
}

private void register0(ChannelPromise promise) {
    try {
        // ... omitted ...
        // this mainly calls ServerSocketChannelImpl.register; during registration the
        // file descriptor to be monitored is added to the EPollArrayWrapper
        doRegister();
        neverRegistered = false;
        registered = true;
        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        pipeline.invokeHandlerAddedIfNeeded();
        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                beginRead();
            }
        }
    } catch (Throwable t) {
        // ... omitted ...
    }
}
```
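The inEventLoop() dispatch above is a general pattern: run the task inline if the caller already is the loop thread, otherwise hand it to the loop's queue so that all channel state is touched by a single thread. A minimal sketch with a plain single-thread executor (names like InEventLoopDemo are illustrative, not Netty code):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class InEventLoopDemo {
    public static void main(String[] args) throws Exception {
        // remember which thread is "the" loop thread, as Netty's event loop does
        AtomicReference<Thread> loopThread = new AtomicReference<>();
        ExecutorService loop = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "event-loop");
            loopThread.set(t);
            return t;
        });

        Runnable register0 = () ->
            System.out.println("register0 on " + Thread.currentThread().getName());

        // the main thread is not the loop thread, so the task is queued,
        // exactly like the else branch of AbstractUnsafe#register
        if (Thread.currentThread() == loopThread.get()) {
            register0.run();
        } else {
            loop.execute(register0);
        }
        loop.shutdown();
        loop.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Because every mutation is funneled onto one thread, register0 (and later bind/connect) never needs locks on channel state.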
Now look at what happens during channel registration:
```java
// sun.nio.ch.SelectorImpl#register
// here ch is the ServerSocketChannelImpl
// attachment is the NioServerSocketChannel
// ops is 0, so no events to monitor are registered at this point
// selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
protected final SelectionKey register(AbstractSelectableChannel ch,
                                      int ops,
                                      Object attachment)
{
    if (!(ch instanceof SelChImpl))
        throw new IllegalSelectorException();
    // create a SelectionKeyImpl
    SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
    k.attach(attachment);
    synchronized (publicKeys) {
        // calls sun.nio.ch.EPollSelectorImpl#implRegister
        implRegister(k);
    }
    // set the events this channel is interested in
    k.interestOps(ops);
    return k;
}

protected void implRegister(SelectionKeyImpl ski) {
    if (closed)
        throw new ClosedSelectorException();
    SelChImpl ch = ski.channel;
    int fd = Integer.valueOf(ch.getFDVal());
    fdToKey.put(fd, ski);
    // pollWrapper is the Java counterpart of the data structures epoll uses to track monitored events;
    // its add method calls setUpdateEvents to record which events this socket monitors
    pollWrapper.add(fd);
    keys.add(ski);
}

/**
 * struct epoll_event {
 *     __uint32_t events;
 *     epoll_data_t data;
 * };
 * Since the number of sockets to monitor is unknown up front, the JDK defines MAX_UPDATE_ARRAY_SIZE.
 * For fds below MAX_UPDATE_ARRAY_SIZE, each socket's monitored events are stored in the eventsLow array,
 * indexed by the socket's file descriptor; for fds at or above MAX_UPDATE_ARRAY_SIZE they are kept in the
 * EPollArrayWrapper#eventsHigh map instead.
 *
 * Note that at this point setUpdateEvents is called with events = 0, i.e. no event types are being monitored yet.
 */
private void setUpdateEvents(int fd, byte events, boolean force) {
    if (fd < MAX_UPDATE_ARRAY_SIZE) {
        if ((eventsLow[fd] != KILLED) || force) {
            eventsLow[fd] = events;
        }
    } else {
        Integer key = Integer.valueOf(fd);
        if (!isEventsHighKilled(key) || force) {
            eventsHigh.put(key, Byte.valueOf(events));
        }
    }
}
```
Note that the code above has not yet set the events the channel monitors. The monitored event type is actually set in beginRead, which is called when the channel becomes active:
```java
// io.netty.channel.nio.AbstractNioChannel#doBeginRead
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }
    readPending = true;
    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        // readInterestOp is 16: the NioServerSocketChannel constructor specified that
        // this channel monitors accept events
        // this is where the socket's monitored events are actually set
        // the call below eventually reaches sun.nio.ch.EPollArrayWrapper#setInterest
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

// sun.nio.ch.EPollArrayWrapper#setInterest
void setInterest(int fd, int mask) {
    synchronized (updateLock) {
        // record the file descriptor and events
        int oldCapacity = updateDescriptors.length;
        if (updateCount == oldCapacity) {
            int newCapacity = oldCapacity + INITIAL_PENDING_UPDATE_SIZE;
            int[] newDescriptors = new int[newCapacity];
            System.arraycopy(updateDescriptors, 0, newDescriptors, 0, oldCapacity);
            updateDescriptors = newDescriptors;
        }
        updateDescriptors[updateCount++] = fd;
        // events are stored as bytes for efficiency reasons
        byte b = (byte)mask;
        assert (b == mask) && (b != KILLED);
        // setUpdateEvents was covered above; it records event mask b for this socket's file descriptor
        setUpdateEvents(fd, b, false);
    }
}
```
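The register-with-0-then-raise-interest sequence can be reproduced with plain JDK NIO. This sketch mirrors what doRegister() and doBeginRead() do for a server channel: the key exists as soon as register(selector, 0) returns, but the selector only starts watching OP_ACCEPT once interestOps is updated.

```java
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

public class RegisterDemo {
    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();
        ServerSocketChannel ch = ServerSocketChannel.open();
        ch.configureBlocking(false);

        // register with ops = 0, like Netty's doRegister():
        // the key exists but no event is being monitored yet
        SelectionKey key = ch.register(selector, 0);
        System.out.println(key.interestOps());

        // what doBeginRead() later does for a server channel
        // (SelectionKey.OP_ACCEPT is the readInterestOp value 16 mentioned above)
        key.interestOps(key.interestOps() | SelectionKey.OP_ACCEPT);
        System.out.println(key.interestOps() == SelectionKey.OP_ACCEPT);

        ch.close();
        selector.close();
    }
}
```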
At this point the serverSocketChannel has been registered and its interest events have been set. The last step is binding ip:port:
```java
public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException {
    synchronized (lock) {
        // ... omitted ...
        // calls the native bind, which ends in the Linux bind() system call
        Net.bind(fd, isa.getAddress(), isa.getPort());
        // finally listen() puts the server socket's file descriptor into listening state
        Net.listen(fd, backlog < 1 ? 50 : backlog);
        synchronized (stateLock) {
            localAddress = Net.localAddress(fd);
        }
    }
    return this;
}
```
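From user code, this whole bind + listen sequence is a single JDK call. A minimal sketch (port 0 asks the kernel for an ephemeral port, so it runs anywhere):

```java
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

public class BindDemo {
    public static void main(String[] args) throws Exception {
        ServerSocketChannel ch = ServerSocketChannel.open();
        // bind(addr, backlog) ends in the native bind() + listen() shown above;
        // 1024 matches the SO_BACKLOG value from the bootstrap example
        ch.bind(new InetSocketAddress(0), 1024);
        System.out.println(ch.socket().isBound());
        ch.close();
    }
}
```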
Summary
During bind, the server mainly initializes the NioServerSocketChannel, registers that channel with the selector, and adds the events the channel needs to monitor. After that, the socketChannel can listen on the port and accept requests from clients.
Source: https://www.cnblogs.com/sunshine-2015/p/9357760.html