写数据是 NIO Channel 实现的另一个比较复杂的功能. 每一个 channel 都有一个 outboundBuffer, 这是一个输出缓冲区. 当调用 channel 的 write 方法写数据时, 这个数据被一系列 ChannelOutboundHandler 处理之后, 它被放进这个缓冲区中, 并没有真正把数据写到 socket channel 中. 然后再调用 channel 的 flush 方法, flush 会把 outboundBuffer 中数据真正写到 socket channel. 正常情况下 flush 之后, 数据已经真正写完了. 但使用 Selector 加非阻塞 socket 的方式写数据, 让写操作变得复杂了. 操作系统为每个 socket 维护了一个数据发送缓冲区, 它的长度 SO_SNDBUF, 每次发送数据, 先把数据写到这个缓冲区中, 操作系统负责把这个发送缓冲区中的数据发送出去, 并清理这个缓冲区. 当向缓冲区写的速率大于系统的发送速率时, 它会被填满,在非阻塞模式下的表现为: 调用 socket 的 write 方法写入长度为 n 数据, 实际写入的数据长度 m 的范围是: 0=<m<n. 这个时候还剩下长度为 n-m 的数据没有写入到 socket, 而数据必须以正确的顺序完整地写入到 socket 中. outboundBuffer 正是为解决这个问题而设计的, 没写进 socket 的剩余数据会以正确的顺序保存在 outboundBuffer 中, 当发送缓冲区中有空间可以写时, 可以从 outboundBuffer 中取出剩余的数据继续写入到 socket 中.
Channel write 实现: 把数据写到 outboundBuffer 中
write 调用栈:
- io.netty.channel.AbstractChannel#write(java.lang.Object)
- io.netty.channel.DefaultChannelPipeline#write(java.lang.Object)
- io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object)
- io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, io.netty.channel.ChannelPromise)
- io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, boolean, io.netty.channel.ChannelPromise)
- io.netty.channel.AbstractChannelHandlerContext#invokeWrite
- io.netty.channel.DefaultChannelPipeline.HeadContext#write
- io.netty.channel.AbstractChannel.AbstractUnsafe#write
write 的主要逻辑在 io.netty.channel.AbstractChannel.AbstractUnsafe#write 中实现, 这个方法把要写的数据 msg 对象放到 outboundBuffer 中. 在执行 close 时, netty 不希望有希望写新的数据, 避免引起不可预料的错误, 因此会把 outboundBuffer 置为 null. 这里在向 outboundBuffer 写数据之前会把对它进行检查, 如果是 null 就抛出错误. 下面是这个 write 方法的实现.
- @Override
- public final void write(Object msg, ChannelPromise promise) {
- assertEventLoop();
- ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
- if (outboundBuffer == null) {
- safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
- ReferenceCountUtil.release(msg);
- return;
- }
- int size;
- try {
- msg = filterOutboundMessage(msg);
- size = pipeline.estimatorHandle().size(msg);
- if (size <0) {
- size = 0;
- }
- } catch (Throwable t) {
- safeSetFailure(promise, t);
- ReferenceCountUtil.release(msg);
- return;
- }
- outboundBuffer.addMessage(msg, size, promise);
- }
第 5-9 行, 对 outboudBuffer 进行检查, 如果是 null 抛出错误. 这个里有个小细节, 用一个局部变量引用 outboundBuffer, 避免由其他线程对 this.outboundBuffer 置空引发错误.
14 行, 调用 filterOutboundMessage 对 msg 进行过滤. 这是一个 protected 方法, 默认实现是什么都没做, 返回输入的 msg 参数. 子类可以覆盖这个方法, 把 msg 转换成期望的类型.
15 行, 计算 msg 的长度.
25 行, 把放入到 outboundBuffer 中.
Channel flush 实现: 把数据真正写到 channel
flush 调用栈:
- io.netty.channel.AbstractChannel#flush
- io.netty.channel.DefaultChannelPipeline#flush
- io.netty.channel.AbstractChannelHandlerContext#flush
- io.netty.channel.AbstractChannelHandlerContext#invokeFlush
- io.netty.channel.DefaultChannelPipeline.HeadContext#flush
- io.netty.channel.AbstractChannel.AbstractUnsafe#flush
- io.netty.channel.AbstractChannel.AbstractUnsafe#flush0
- io.netty.channel.socket.nio.NioSocketChannel#doWrite
- io.netty.channel.nio.AbstractNioByteChannel#doWrite
- io.netty.channel.socket.nio.NioSocketChannel#doWriteBytes
以上是 io.netty.channel.socket.nio.NioSocketChannel 的 flush 调用栈, 对于 io.netty.channel.socket.nio.NioDatagramChannel 来说, 从第 8 行开始变得不同:
- io.netty.channel.AbstractChannel.AbstractUnsafe#flush0
- io.netty.channel.nio.AbstractNioMessageChannel#doWrite
- io.netty.channel.socket.nio.NioDatagramChannel#doWriteMessage
把 Byte 数据流写入 channel
io.netty.channel.socket.nio.NioSocketChannel#doWrite 是 Byte 数据流的写逻辑, io.netty.channel.nio.AbstractNioByteChannel#doWrite 也是, 这两者不同的地方在于前者是在 outboundBuffer 可以转换成 java.nio.ByteBuffer 的情况下执行, 后者是在 outboundBuffer 中的 msg 是 ByteBuf 或 FileRegin 类型时执行. 除此之外其他逻辑都一样:
尽量把 outboundBuffer 中的数据写到 channel 中.
如果 channel 无法写入数据, 在 channel 的 SelectionKey 上注册 OP_WRITE 事件, 等 channel 可写的时候再继续写入.
如写入次数超过限制, 把 flush 操作包装成 task 放到 eventLoop 排队, 等待再次执行.
下面来看看 io.netty.channel.socket.nio.NioSocketChannel#doWrite 的实现代码:
- @Override
- protected void doWrite(ChannelOutboundBuffer in) throws Exception {
- for (;;) {
- int size = in.size();
- if (size == 0) {
- // All written so clear OP_WRITE
- clearOpWrite();
- break;
- }
- long writtenBytes = 0;
- boolean done = false;
- boolean setOpWrite = false;
- // Ensure the pending writes are made of ByteBufs only.
- ByteBuffer[] nioBuffers = in.nioBuffers();
- int nioBufferCnt = in.nioBufferCount();
- long expectedWrittenBytes = in.nioBufferSize();
- SocketChannel ch = javaChannel();
- // Always us nioBuffers() to workaround data-corruption.
- // See https://github.com/netty/netty/issues/2761
- switch (nioBufferCnt) {
- case 0:
- // We have something else beside ByteBuffers to write so fallback to normal writes.
- super.doWrite(in);
- return;
- case 1:
- // Only one ByteBuf so use non-gathering write
- ByteBuffer nioBuffer = nioBuffers[0];
- for (int i = config().getWriteSpinCount() - 1; i>= 0; i --) {
- final int localWrittenBytes = ch.write(nioBuffer);
- if (localWrittenBytes == 0) {
- setOpWrite = true;
- break;
- }
- expectedWrittenBytes -= localWrittenBytes;
- writtenBytes += localWrittenBytes;
- if (expectedWrittenBytes == 0) {
- done = true;
- break;
- }
- }
- break;
- default:
- for (int i = config().getWriteSpinCount() - 1; i>= 0; i --) {
- final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
- if (localWrittenBytes == 0) {
- setOpWrite = true;
- break;
- }
- expectedWrittenBytes -= localWrittenBytes;
- writtenBytes += localWrittenBytes;
- if (expectedWrittenBytes == 0) {
- done = true;
- break;
- }
- }
- break;
- }
- // Release the fully written buffers, and update the indexes of the partially written buffer.
- in.removeBytes(writtenBytes);
- if (!done) {
- // Did not write all buffers completely.
- incompleteWrite(setOpWrite);
- break;
- }
- }
- }
第 5-7 行, 如果 outboundBuffer 中已经没有数据了, 调用 clearOpWrite 方法清除 channel SelectionKey 上的 OP_WRITE 事件.
第 15-17 行, 把 outboundBuffer 转换成 ByteBuffer 类型, 并得到数据长度.
25 行, outboundBuffer 不能转换成 ByteBuffer, 调用 io.netty.channel.nio.AbstractNioByteChannel#doWrite 执行写操作.
29-42,45-57 的逻辑基本已经, 都是尽量把 ByteBuffer 中的数据写到 channel 中, 满足下列条件中的任意一个时, 结束本次写操作:
1. ByteBuffer 中的数据已经写完, 正常结束.
2. channel 已经不能写入数据, 需要在 channel 可以写是继续执行写操作.
3. 者超过 channel config 中写入次数限制, 需要选择合适的实际继续执行写操作.
62 行, 把已经写入到 channel 的数据从 outboundBuffer 中删除.
64-66 行, 如果数据没写完, 调用 incompleteWrite 处理没写完的情况. 当 setOpWrite==true 时, 在 channel 的 SelectionKey 上设置 OP_WRITE 事件, 等 eventLoop 触发这个事件时再继续执行 flush 操作. 否则, 把 flush 包装成 task 放到 eventLoop 中排队执行.
当 NioEventLoop 检测到 OP_WRITE 事件时, 会调用 processSelectedKey 方法处理:
- if ((readyOps & SelectionKey.OP_WRITE) != 0) {
- ch.unsafe().forceFlush();
- }
forceFlush 的调用栈如下:
- io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#forceFlush
- io.netty.channel.AbstractChannel.AbstractUnsafe#flush0
- io.netty.channel.socket.nio.NioSocketChannel#doWrite
- io.netty.channel.nio.AbstractNioByteChannel#doWrite
- io.netty.channel.socket.nio.NioSocketChannel#doWriteBytes
把数据写入 UDP 类型的 channel
- io.netty.channel.nio.AbstractNioMessageChannel#doWrite 是数据报的写逻辑. 相较于 Byte 流类型的数据, 数据报的写逻辑简单一些. 它只是把 outboundBuffer 中的数据报依次写入到 channel 中, 如果 channel 写满了, 在 channel 的 SelectionKey 上设置 OP_WRITE 事件随后退出, 其后 OP_WRITE 事件处理逻辑和 Byte 流写逻辑一样. 真正的写操作在 io.netty.channel.socket.nio.NioDatagramChannel#doWriteMessage 中实现, 这个方法的实现如下:
- @Override
- protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
- final SocketAddress remoteAddress;
- final ByteBuf data;
- if (msg instanceof AddressedEnvelope) {
- @SuppressWarnings("unchecked")
- AddressedEnvelope<ByteBuf, SocketAddress> envelope = (AddressedEnvelope<ByteBuf, SocketAddress>) msg;
- remoteAddress = envelope.recipient();
- data = envelope.content();
- } else {
- data = (ByteBuf) msg;
- remoteAddress = null;
- }
- final int dataLen = data.readableBytes();
- if (dataLen == 0) {
- return true;
- }
- final ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), dataLen);
- final int writtenBytes;
- if (remoteAddress != null) {
- writtenBytes = javaChannel().send(nioData, remoteAddress);
- } else {
- writtenBytes = javaChannel().write(nioData);
- }
- return writtenBytes> 0;
- }
5-9 行, 处理 AddressedEnvelope 类型的数据报, 得到数据报的远程地址和数据.
10-12 行, 发送的是一个 ByteBuf. 没有指定远程地址. 这种情况下需要先调用 channel 的 connect 方法.
20-26 行, 分别针对两种情况发送数据报. 23 行指定了远程地址, 25 行没有指定远程地址, 但调用过了 connect 方法.
来源: https://www.cnblogs.com/brandonli/p/10314962.html