前面的第一篇文章中, 我以 spark 中的 netty 客户端的创建为切入点, 分析了 netty 的客户端引导类 Bootstrap 的参数设置以及启动过程. 显然, 我们还有另一个重要的部分 -- 服务端的初始化和启动过程没有探究, 所以这一节, 我们就来从源码层面详细分析一下 netty 的服务端引导类 ServerBootstrap 的启动过程.
spark 中 netty 服务端的创建
我们仍然以 spark 中对 netty 的使用为例, 以此为源码分析的切入点, 首先我们看一下 spark 的 NettyRpc 模块中创建 netty 服务端引导类的代码:
TransportServer.init
TransportServer 的构造方法中会调用 init 方法, ServerBootstrap 类就是在 init 方法中被创建并初始化以及启动的.
这个方法主要分为三块:
创建 ServerBootstrap 对象, 并设置各种参数. 我们看到, 这里的 bossGroup 和 workerGroup 是同一个线程组, 此外还设置了 socket 的一些参数如排队的连接数, 接收缓冲区, 发送缓冲区大小等.
设置 childHandler 参数, 之所以把这个参数的设置单独拿出来就是为了凸显这个参数的重要性, childHandler 参数是用户实现时间处理逻辑的地方
最后将服务端绑定到某个端口, 同时在绑定的过程中也会启动服务端, 开始监听 io 事件.
很显然, ServerBootstrap 的启动入口就是 bind 方法.
- // 初始化 netty 服务端
- private void init(String hostToBind, int portToBind) {
- // io 模式, 有两种选项 NIO, EPOLL
- IOMode ioMode = IOMode.valueOf(conf.ioMode());
- // 创建 bossGroup 和 workerGroup, 即主线程组合子线程组
- EventLoopGroup bossGroup =
- NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server");
- EventLoopGroup workerGroup = bossGroup;
- // 缓冲分配器, 分为堆内存和直接内存
- PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
- conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());
- // 创建一个 netty 服务端引导对象, 并设置相关参数
- Bootstrap = new ServerBootstrap()
- .group(bossGroup, workerGroup)
- .channel(NettyUtils.getServerChannelClass(ioMode))
- .option(ChannelOption.ALLOCATOR, allocator)
- .childOption(ChannelOption.ALLOCATOR, allocator);
- // 内存使用的度量对象
- this.metrics = new NettyMemoryMetrics(
- allocator, conf.getModuleName() + "-server", conf);
- // 排队的连接数
- if (conf.backLog()> 0) {
- Bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
- }
- // socket 接收缓冲区大小
- if (conf.receiveBuf()> 0) {
- Bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());
- }
- // socket 发送缓冲区大小
- if (conf.sendBuf()> 0) {
- Bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
- }
- // 子 channel 处理器
- Bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) {
- RpcHandler rpcHandler = appRpcHandler;
- for (TransportServerBootstrap Bootstrap : bootstraps) {
- rpcHandler = Bootstrap.doBootstrap(ch, rpcHandler);
- }
- context.initializePipeline(ch, rpcHandler);
- }
- });
- InetSocketAddress address = hostToBind == null ?
- new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind);
- // 绑定到 ip 地址和端口
- channelFuture = Bootstrap.bind(address);
- // 同步等待绑定成功
- channelFuture.syncUninterruptibly();
- port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
- logger.debug("Shuffle server started on port: {}", port);
- }
- AbstractBootstrap.init(SocketAddress localAddress)
这里的校验主要是对 group 和 channelFactory 的非空校验
- public ChannelFuture bind(SocketAddress localAddress) {
- validate();
- return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
- }
- AbstractBootstrap.doBind
这个方法, 我们之前在分析 Bootstrap 的启动过程时提到过, 它的主要作用如下:
通过反射根据传入的 channel 类型创建一个具体的 channel 对象
调用 init 方法对这个 channel 对象进行初始化
将初始化完成的 channel 对象注册到一个 EventLoop 线程上
之前, 我们分析了 NioSocketChannel 的构造过程, 以及 Bootstarp 中对 channel 的初始化过程,
本节我们要分析 NioServerSocketChannel 的构造过程, 以及 ServerBootstrap 的 init 方法的实现.
- private ChannelFuture doBind(final SocketAddress localAddress) {
- // 创建一个 channel, 并对这个 channel 做一些初始化工作
- final ChannelFuture regFuture = initAndRegister();
- final Channel channel = regFuture.channel();
- if (regFuture.cause() != null) {
- return regFuture;
- }
- if (regFuture.isDone()) {
- // At this point we know that the registration was complete and successful.
- ChannelPromise promise = channel.newPromise();
- // 将这个 channel 绑定到指定的地址
- doBind0(regFuture, channel, localAddress, promise);
- return promise;
- } else {// 对于尚未注册成功的情况, 采用异步的方式, 即添加一个回调
- // Registration future is almost always fulfilled already, but just in case it's not.
- final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
- regFuture.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- Throwable cause = future.cause();
- if (cause != null) {
- // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
- // IllegalStateException once we try to access the EventLoop of the Channel.
- promise.setFailure(cause);
- } else {
- // Registration was successful, so set the correct executor to use.
- // See https://github.com/netty/netty/issues/2586
- promise.registered();
- doBind0(regFuture, channel, localAddress, promise);
- }
- }
- });
- return promise;
- }
- }
NioServerSocketChannel 的构造方法
这里通过调用 jdk 的 API 创建了一个 ServerSocketChannel.
- public NioServerSocketChannel() {
- this(newSocket(DEFAULT_SELECTOR_PROVIDER));
- }
与 NioSocketChannelConfig 类似, NioServerSocketChannelConfig 也是一种门面模式, 是对 NioServerSocketChannel 中的参数接口的封装.
此外, 我们注意到, 这里规定了 NioServerSocketChannel 的初始的感兴趣的事件是 ACCEPT 事件, 即默认会监听请求建立连接的事件.
而在 NioSocketChannel 中的初始感兴趣的事件是 read 事件.
所以, 这里与 NioSocketChannel 构造过程最主要的不同就是初始的感兴趣事件不同.
- public NioServerSocketChannel(ServerSocketChannel channel) {
- super(null, channel, SelectionKey.OP_ACCEPT);
- config = new NioServerSocketChannelConfig(this, javaChannel().socket());
- }
这里首先调用了父类的构造方法, 最终调用了 AbstractNioChannel 类的构造方法, 这个过程我们在之前分析 NioSocketChannel 初始化的时候已经详细说过, 主要就是创建了内部的 Unsafe 对象和 ChannelPipeline 对象.
ServerBootstrap.init
分析完了 channel 的构造过程, 我们再来看一下 ServerBootstrap 是怎么对 channel 对象进行初始化的.
设置参数, 设置属性
获取子 channel 的参数和属性, 以便在有新的连接时给新创建的 channel 设置参数和属性
给 serverChannel 中添加一个重要的 handler, 这个 handler 中实现了对新创建的 channel 的处理逻辑.
所以, 很显然, 我们接下来就要看一下这个特殊的 handler,ServerBootstrapAcceptor 的 read 方法.
- void init(Channel channel) throws Exception {
- final Map<ChannelOption<?>, Object> options = options0();
- // 设置参数
- synchronized (options) {
- setChannelOptions(channel, options, logger);
- }
- // 设置属性
- final Map<AttributeKey<?>, Object> attrs = attrs0();
- synchronized (attrs) {
- for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
- @SuppressWarnings("unchecked")
- AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
- channel.attr(key).set(e.getValue());
- }
- }
- ChannelPipeline p = channel.pipeline();
- // 子 channel 的 group 和 handler 参数
- final EventLoopGroup currentChildGroup = childGroup;
- final ChannelHandler currentChildHandler = childHandler;
- final Entry<ChannelOption<?>, Object>[] currentChildOptions;
- final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
- synchronized (childOptions) {
- currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
- }
- synchronized (childAttrs) {
- currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
- }
- // 添加处理器
- p.addLast(new ChannelInitializer<Channel>() {
- @Override
- public void initChannel(final Channel ch) throws Exception {
- final ChannelPipeline pipeline = ch.pipeline();
- // 一般情况下, 对于 ServerBootstrap 用户无需设置 handler
- ChannelHandler handler = config.handler();
- if (handler != null) {
- pipeline.addLast(handler);
- }
- // 这里添加了一个关键的 handler, 并且顺手启动了对应的 EventLoop 的线程
- ch.eventLoop().execute(new Runnable() {
- @Override
- public void run() {
- pipeline.addLast(new ServerBootstrapAcceptor(
- ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
- }
- });
- }
- });
- }
- NioEventLoop.processSelectedKey(SelectionKey k, AbstractNioChannel ch)
在分析 ServerBootstrapAcceptor 之前, 我们首先来回顾一下 NioEventLoop 的循环中, 对于 accept 事件的处理逻辑, 这里截取其中的一小段代码:
- // 处理 read 和 accept 事件
- if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
- unsafe.read();
- }
可见, 对于 accept 事件和 read 事件一样, 调用 NioUnsafe 的 read 方法
AbstractNioMessageChannel.NioMessageUnsafe.read
因为 NioServerSocketChannel 继承了 AbstractNioMessageChannel, 并且 read 方法的实现也是在 AbstractNioMessageChannel 中,
doReadMessages 是一个抽象方法, 在 NioServerSocketChannel 的实现中, 这个方法调用 jdk 的 API 接收一个连接, 并包装成 NioSocketChannel 对象
以读取到的 channel 对象作为消息, 在 channelPipeline 中触发一个读事件
根据前面对 channelPipeline 的分析, 我们知道, 读事件对从头结点开始, 向尾节点传播. 上面我们也提到了, 对于初始的那个 NioServerSocketChannel, 会在 ServerBootstarp 的 init 方法中向这个 channel 的处理链中加入一个 ServerBootstrapAcceptor 处理器, 所以, 很显然, 接下来我们应该分析 ServerBootstrapAcceptor 中对读事件的处理.
- public void read() {
- // 确认当前代码的执行是在 EventLoop 的线程中
- assert eventLoop().inEventLoop();
- final ChannelConfig config = config();
- final ChannelPipeline pipeline = pipeline();
- final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
- allocHandle.reset(config);
- boolean closed = false;
- Throwable exception = null;
- try {
- try {
- do {
- // 这里读取到的是建立的连接对应的 channel,
- // jdk 的 socketChannel 被包装成了 netty 的 NioSocketChannel
- int localRead = doReadMessages(readBuf);
- if (localRead == 0) {
- break;
- }
- if (localRead <0) {
- closed = true;
- break;
- }
- allocHandle.incMessagesRead(localRead);
- } while (allocHandle.continueReading());
- } catch (Throwable t) {
- exception = t;
- }
- int size = readBuf.size();
- for (int i = 0; i < size; i ++) {
- readPending = false;
- // 把接收到的每一个 channel 作为消息, 在 channelPipeline 中触发一个读事件
- pipeline.fireChannelRead(readBuf.get(i));
- }
- readBuf.clear();
- allocHandle.readComplete();
- // 最后触发一个读完成的事件
- pipeline.fireChannelReadComplete();
- if (exception != null) {
- closed = closeOnReadError(exception);
- pipeline.fireExceptionCaught(exception);
- }
- if (closed) {
- inputShutdown = true;
- if (isOpen()) {
- close(voidPromise());
- }
- }
- } finally {
- // Check if there is a readPending which was not processed yet.
- // This could be for two reasons:
- //* The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
- //* The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
- //
- // See https://github.com/netty/netty/issues/2254
- if (!readPending && !config.isAutoRead()) {
- removeReadOp();
- }
- }
- }
- }
- ServerBootstrapAcceptor.channelRead
代码逻辑还是比较简单的, 因为有了前面的铺垫, 即在 ServerBootstrap 的 init 方法对创始的那个 serverChannel 进行初始化时, 将用户设置的子 channel 的参数, 属性, 子 channel 的 handler 和子 group 等参数作为构造参数全部传给了 ServerBootstrapAcceptor, 所以在这里直接用就行了.
其实这里的子 channel 的初始化和注册过程和 Bootstrap 中对一个新创建的 channel 的初始化过程基本一样, 区别在于 Bootstrap 中 channel 是用户代码通过调用 connect 方法最终在 initAndregistry 中通过反射构造的一个对象; 而在服务端, 通过监听 ServerSocketChannel 的 accept 事件, 当有新的连接建立请求时, 会自动创建一个 SocketChannel(jdk 的代码实现), 然后 NioServerSocketChannel 将其包装成一个 NioSocketChannel, 并作为消息在传递给处理器, 所以在 ServerSocketChannel 中的子 channel 的创建是由底层的 jdk 的库实现的.
- public void channelRead(ChannelHandlerContext ctx, Object msg) {
- // 类型转换, 这里的强制转换是安全的的,
- // 是由各种具体的 AbstractNioMessageChannel 子类型的实现保证的
- // 各种具体的 AbstractNioMessageChannel 子类型的读方法确保它们读取并最终返回的是一个 Channel 类型
- final Channel child = (Channel) msg;
- // 给子 channel 添加 handler
- child.pipeline().addLast(childHandler);
- // 给子 channel 设置参数
- setChannelOptions(child, childOptions, logger);
- // 给子 channel 设置属性
- for (Entry<AttributeKey<?>, Object> e: childAttrs) {
- child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
- }
- try {
- // 将子 channel 注册到子 group 中
- childGroup.register(child).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- forceClose(child, future.cause());
- }
- }
- });
- } catch (Throwable t) {
- forceClose(child, t);
- }
- }
- AbstractBootstrap.doBind0
回到 doBind 方法中, 在完成了 channel 的构造, 初始化和注册逻辑后, 接下来就要把这个 server 类型的 channel 绑定到一个地址上, 这样才能接受客户端建立连接的请求.
从代码中可以看出, 调用了 channel 的 bind 方法实现绑定的逻辑.
- private static void doBind0(
- final ChannelFuture regFuture, final Channel channel,
- final SocketAddress localAddress, final ChannelPromise promise) {
- // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
- // the pipeline in its channelRegistered() implementation.
- channel.eventLoop().execute(new Runnable() {
- @Override
- public void run() {
- if (regFuture.isSuccess()) {
- // 调用了 channel.bind 方法完成绑定的逻辑
- channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
- } else {
- promise.setFailure(regFuture.cause());
- }
- }
- });
- }
- AbstractChannel.bind
bind 操作的传递是从尾节点开始向前传递, 所以我们直接看 Headcontext 对于 bind 方法的实现
- public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
- return pipeline.bind(localAddress, promise);
- }
- DefaultChannelPipeline.bind
- public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
- return tail.bind(localAddress, promise);
- }
- HeadContext.bind
调用了 unsafe 的 bind 方法.
- public void bind(
- ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
- unsafe.bind(localAddress, promise);
- }
因为后面右有几个事件的触发, 每个触发事件都是通过 channel 的相关方法来触发, 然后又是通过 channelpipeline 的传递事件, 这些事件最后基本都是由 HeadContext 处理了, 所以这里我只简单地叙述一下后面的 大概逻辑, 代码比较繁琐, 而且很多都是相同的调用过程, 所以就不贴代码了.
通过前面的分析, 我们知道首先通过 channel 触发了一个 bind 操作, 这个操作的实现最终由 HeadCOntex 实现, HeadContex 的实现中是调用了 unsafe.bind
bind 的实现逻辑中, 首先通过 jdk 的 API 完成了 ServerSocketChannel 的绑定, 然后又触发了一个 channelActive 的事件, 这个事件的处理最终也是有 HeadContext 实现
在 HeadContext 对 channelActive 操作的实现中, 触发了一个 read() 操作, 注意这里的这个 read 方法是不带参数的, 是 ChannelOutboundInvoker 接口中定义的一个方法, 也是有 HeadContext 实现
HeadContext 对 read 操作的实现中, 调用了 Unsafe.beginRead 方法, 经过几个子类的具体实现后, 最终由 AbstractNioChannel.doBeginRead 实现具体的开始读的逻辑,
从代码中可以看出来, 最终调用了 jdk 的 API, 将感兴趣的事件添加到 selectionKey 中. 通过前面的 分析, 我们知道对于 NioSocketChannel, 它的感兴趣的读事件类型是 SelectionKey.OP_READ, 也就是读事件;
而对于 NioServerSocketChannel, 根据前面对其构造方法的分析, 它的感兴趣的事件是 SelectionKey.OP_ACCEPT, 也就是建立连接的事件.
- protected void doBeginRead() throws Exception {
- // Channel.read() or ChannelHandlerContext.read() was called
- final SelectionKey selectionKey = this.selectionKey;
- if (!selectionKey.isValid()) {
- return;
- }
- readPending = true;
- // 将读事件类型加入到 selectionKey 的感兴趣的事件中
- // 这样 jdk 底层的 selector 就会监听相应类型的事件
- final int interestOps = selectionKey.interestOps();
- if ((interestOps & readInterestOp) == 0) {
- selectionKey.interestOps(interestOps | readInterestOp);
- }
- }
总结
到这里, 我们就把 ServerBootstrap 的主要功能代码分析完了, 这里面主要包括三个方面:
ServerBootstrap 中对 server 类型的 channel 的初始化, 包括最重要的 handler----ServerBootstrapAcceptor 的添加
ServerBootstrapAcceptor 中对于新创建的子 channel 的处理, 包括初始化和注册的逻辑
将 serverChannel 绑定到具体的地址上, 绑定过程中也启动了对应的注册的线程.
来源: https://www.cnblogs.com/zhuge134/p/11108493.html