概述
一个问题
编码器实现了
ChannelOutboundHandler
, 并将出站数据从 一种格式转换为另一种格式, 和我们方才学习的解码器的功能正好相反. Netty 提供了一组类, 用于帮助你编写具有以下功能的编码器:
将消息编码为字节
将消息编码为消息
我们将首先从抽象基类 MessageToByteEncoder 开始来对这些类进行考察
1 抽象类 MessageToByteEncoder
MessageToByteEncoder API
解码器通常需要在 Channel 关闭之后产生最后一个消息 (因此也就有了 decodeLast() 方法)
这显然不适于编码器的场景 -- 在连接被关闭之后仍然产生一个消息是毫无意义的
1.1 ShortToByteEncoder
其接受一 Short 型实例作为消息, 编码为 Short 的原子类型值, 并写入 ByteBuf, 随后转发给 ChannelPipeline 中的下一个 ChannelOutboundHandler
每个传出的 Short 值都将会占用 ByteBuf 中的 2 字节
ShortToByteEncoder
1.2 Encoder
Netty 提供了一些专门化的 MessageToByteEncoder, 可基于此实现自己的编码器
webSocket08FrameEncoder 类提供了一个很好的实例
2 抽象类 MessageToMessageEncoder
你已经看到了如何将入站数据从一种消息格式解码为另一种
为了完善这幅图, 将展示 对于出站数据将如何从一种消息编码为另一种. MessageToMessageEncoder 类的 encode()方法提供了这种能力
MessageToMessageEncoderAPI
为了演示, 使用
IntegerToStringEncoder
扩展了
MessageToMessageEncoder
编码器将每个出站 Integer 的 String 表示添加到了该 List 中
IntegerToStringEncoder 的设计
关于有趣的 MessageToMessageEncoder 的专业用法, 请查看 io.netty.handler. codec.protobuf.ProtobufEncoder 类, 它处理了由 Google 的 Protocol Buffers 规范所定义 的数据格式.
一个 java 对象最后是如何转变成字节流, 写到 socket 缓冲区中去的
pipeline 中的标准链表结构
java 对象编码过程
write: 写队列
flush: 刷新写队列
writeAndFlush: 写队列并刷新
pipeline 中的标准链表结构
标准的 pipeline 链式结构
数据从 head 节点流入, 先拆包, 然后解码成业务对象, 最后经过业务 Handler 处理, 调用 write, 将结果对象写出去
而写的过程先通过 tail 节点, 然后通过 encoder 节点将对象编码成 ByteBuf, 最后将该 ByteBuf 对象传递到 head 节点, 调用底层的 Unsafe 写到 JDK 底层管道
Java 对象编码过程
为什么我们在 pipeline 中添加了 encoder 节点, java 对象就转换成 netty 可以处理的 ByteBuf, 写到管道里?
我们先看下调用 write 的 code
业务处理器接受到请求之后, 做一些业务处理, 返回一个 user
然后, user 在 pipeline 中传递
AbstractChannel#
DefaultChannelPipeline#
AbstractChannelHandlerContext#
AbstractChannelHandlerContext#
情形一
AbstractChannelHandlerContext#
AbstractChannelHandlerContext#
情形二
AbstractChannelHandlerContext#
AbstractChannelHandlerContext#invokeWrite0
AbstractChannelHandlerContext#invokeFlush0
handler 如果不覆盖 flush 方法, 就会一直向前传递直到 head 节点
落到 Encoder 节点, 下面是 Encoder 的处理流程
按照简单自定义协议, 将 Java 对象 User 写到传入的参数 out 中, 这个 out 到底是什么?
需知 User 对象, 从 BizHandler 传入到 MessageToByteEncoder 时, 首先传到 write
1. 判断当前 Handelr 是否能处理写入的消息(匹配对象)
判断该对象是否是该类型参数匹配器实例可匹配到的类型
TypeParameterMatcher#
具体实例
2 分配内存
3 编码实现
调用 encode, 这里就调回到 Encoder 这个 Handler 中
其为抽象方法, 因此自定义实现类实现编码方法
4 释放对象
既然自定义 Java 对象转换成 ByteBuf 了, 那么这个对象就已经无用, 释放掉 (当传入的 msg 类型是 ByteBuf 时, 就不需要自己手动释放了)
5 传播数据
//112 如果 buf 中写入了数据, 就把 buf 传到下一个节点, 直到 header 节点
6 释放内存
- //115 否则, 释放 buf, 将空数据传到下一个节点
- // 120 如果当前节点不能处理传入的对象, 直接扔给下一个节点处理
- // 127 当 buf 在 pipeline 中处理完之后, 释放
Encoder 处理传入的 Java 对象
判断当前 Handler 是否能处理写入的消息
如果能处理, 进入下面的流程
否则, 直接扔给下一个节点处理
将对象强制转换成 Encoder 可以处理的 Response 对象
分配一个 ByteBuf
调用 encoder, 即进入到 Encoder 的 encode 方法, 该方法是用户代码, 用户将数据写入 ByteBuf
既然自定义 Java 对象转换成 ByteBuf 了, 那么这个对象就已经无用了, 释放掉(当传入的 msg 类型是 ByteBuf 时, 无需自己手动释放)
如果 buf 中写入了数据, 就把 buf 传到下一个节点, 否则, 释放 buf, 将空数据传到下一个节点
最后, 当 buf 在 pipeline 中处理完之后, 释放节点
总结就是, Encoder 节点分配一个 ByteBuf, 调用 encode 方法, 将 Java 对象根据自定义协议写入到 ByteBuf, 然后再把 ByteBuf 传入到下一个节点, 在我们的例子中, 最终会传入到 head 节点
- public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
- unsafe.write(msg, promise);
- }
这里的 msg 就是前面在 Encoder 节点中, 载有 java 对象数据的自定义 ByteBuf 对象
write - 写 buffer 队列
ChannelOutboundInvoker#
write(Object msg, boolean flush, ChannelPromise promise)
HeadContext in DefaultChannelPipeline#write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
Unsafe in Channel#write(Object msg, ChannelPromise promise)
以下过程分三步讲解
direct ByteBuf
AbstractChannel#filterOutboundMessage(Object msg)
首先, 调用 assertEventLoop 确保该方法的调用是在 reactor 线程中
然后, 调用
filterOutboundMessage()
, 将待写入的对象过滤, 把非 ByteBuf 对象和 FileRegion 过滤, 把所有的非直接内存转换成直接内存 DirectBuffer
AbstractNioChannel#newDirectBuffer
插入写队列
接下来, 估算出需要写入的 ByteBuf 的 size
最后, 调用 ChannelOutboundBuffer 的 addMessage(msg, size, promise) 方法, 所以, 接下来, 我们需要重点看一下这个方法干了什么事情
ChannelOutboundBuffer
想要理解上面这段代码, 须掌握写缓存中的几个消息指针
ChannelOutboundBuffer 里面的数据结构是一个单链表结构, 每个节点是一个 Entry,Entry 里面包含了待写出 ByteBuf 以及消息回调 promise 下面分别是
三个指针的作用
flushedEntry
表第一个被写到 OS Socket 缓冲区中的节点
ChannelOutboundBuffer
unFlushedEntry
表第一个未被写入到 OS Socket 缓冲区中的节点
ChannelOutboundBuffer
tailEntry
表
ChannelOutboundBuffer
缓冲区的最后一个节点
ChannelOutboundBuffer
图解过程
初次调用 write 即 addMessage 后
fushedEntry 指向空, unFushedEntry 和 tailEntry 都指向新加入节点
第二次调用 addMessage 后
第 n 次调用 addMessage 后
可得, 调用 n 次 addMessage 后
flushedEntry 指针一直指向 null, 表此时尚未有节点需写到 Socket 缓冲区
unFushedEntry 后有 n 个节点, 表当前还有 n 个节点尚未写到 Socket 缓冲区
设置写状态
ChannelOutboundBuffer#addMessage
统计当前有多少字节需要需要被写出
ChannelOutboundBuffer#addMessage(Object msg, int size, ChannelPromise promise)
当前缓冲区中有多少待写字节
ChannelOutboundBuffer#
ChannelConfig#getWriteBufferHighWaterMark()
所以默认不能超过 64k
WriteBufferWaterMark
自旋锁 + CAS 操作, 通过 pipeline 将事件传播到 channelhandler 中监控
flush: 刷新 buffer 队列
添加刷新标志并设置写状态
不管调用 channel.flush(), 还是 ctx.flush(), 最终都会落地到 pipeline 中的 head 节点
DefaultChannelPipeline#flush
之后进入到 AbstractUnsafe
AbstractChannel#flush()
flush 方法中, 先调用
ChannelOutboundBuffer#addFlush
ChannelOutboundBuffer#decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability)
和之前那个实例相同, 不再赘述
结合前面的图来看, 上述过程即
首先拿到 unflushedEntry 指针, 然后将 flushedEntry 指向 unflushedEntry 所指向的节点, 调用完毕后
遍历 buffer 队列, 过滤 bytebuf
接下来, 调用 flush0()
发现这里的核心代码就一个 doWrite
AbstractChannel#
AbstractNioByteChannel
继续跟
- protected void doWrite(ChannelOutboundBuffer in) throws Exception {
- int writeSpinCount = -1;
- boolean setOpWrite = false;
- for (;;) {
- // 拿到第一个需要 flush 的节点的数据
- Object msg = in.current();
- if (msg instanceof ByteBuf) {
- boolean done = false;
- long flushedAmount = 0;
- // 拿到自旋锁迭代次数
- if (writeSpinCount == -1) {
- writeSpinCount = config().getWriteSpinCount();
- }
- // 自旋, 将当前节点写出
- for (int i = writeSpinCount - 1; i>= 0; i --) {
- int localFlushedAmount = doWriteBytes(buf);
- if (localFlushedAmount == 0) {
- setOpWrite = true;
- break;
- }
- flushedAmount += localFlushedAmount;
- if (!buf.isReadable()) {
- done = true;
- break;
- }
- }
- in.progress(flushedAmount);
- // 写完之后, 将当前节点删除
- if (done) {
- in.remove();
- } else {
- break;
- }
- }
- }
- }
第一步, 调用 current()先拿到第一个需要 flush 的节点的数据
ChannelOutboundBuffer#current
第二步, 拿到自旋锁的迭代次数
第三步 调用 JDK 底层 API 进行自旋写
自旋的方式将 ByteBuf 写到 JDK NIO 的 Channel
强转为 ByteBuf, 若发现没有数据可读, 直接删除该节点
拿到自旋锁迭代次数
image.PNG
在并发编程中使用自旋锁可以提高内存使用率和写的吞吐量, 默认值为 16
ChannelConfig
继续看源码
AbstractNioByteChannel#
javaChannel(), 表明 JDK NIO Channel 已介入此次事件
NioSocketChannel#
ByteBuf#readBytes(GatheringByteChannel out, int length)
得到向 JDK 底层已经写了多少字节
PooledDirectByteBuf#
从 Netty 的 bytebuf 写到 JDK 底层的 bytebuffer
第四步, 删除该节点
节点的数据已经写入完毕, 接下来就需要删除该节点
首先拿到当前被 flush 掉的节点(flushedEntry 所指)
然后拿到该节点的回调对象 ChannelPromise, 调用 removeEntry()移除该节点
这里是逻辑移除, 只是将 flushedEntry 指针移到下个节点, 调用后
随后, 释放该节点数据的内存, 调用 safeSuccess 回调, 用户代码可以在回调里面做一些记录, 下面是一段 Example
- ctx.write(xx).addListener(new GenericFutureListener<Future<? super Void>>() {
- @Override
- public void operationComplete(Future<? super Void> future) throws Exception {
- // 回调
- }
- })
最后, 调用 recycle, 将当前节点回收
writeAndFlush: 写队列并刷新
writeAndFlush 在某个 Handler 中被调用之后, 最终会落到 TailContext 节点
- public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
- write(msg, true, promise);
- return promise;
- }
AbstractChannelHandlerContext#
AbstractChannelHandlerContext#
最终, 通过一个 boolean 变量, 表示是调用 invokeWriteAndFlush, 还是 invokeWrite,invokeWrite 便是我们上文中的 write 过程
AbstractChannelHandlerContext#
可以看到, 最终调用的底层方法和单独调用 write 和 flush 一样的
由此看来, invokeWriteAndFlush 基本等价于 write 之后再来一次 flush
总结
调用 write 并没有将数据写到 Socket 缓冲区中, 而是写到了一个单向链表的数据结构中, flush 才是真正的写出
writeAndFlush 等价于先将数据写到 netty 的缓冲区, 再将 netty 缓冲区中的数据写到 Socket 缓冲区中, 写的过程与并发编程类似, 用自旋锁保证写成功
netty 中的缓冲区中的 ByteBuf 为 DirectByteBuf
当 BizHandler 通过 writeAndFlush 方法将自定义对象往前传播时, 其实可以拆分成两个过程
通过 pipeline 逐渐往前传播, 传播到其中的一个 encode 节点后, 其负责重写 write 方法将自定义的对象转化为 ByteBuf, 接着继续调用 write 向前传播
pipeline 中的编码器原理是创建一个 ByteBuf, 将 Java 对象转换为 ByteBuf, 然后再把 ByteBuf 继续向前传递, 若没有再重写了, 最终会传播到 head 节点, 其中缓冲区列表拿到缓存写到 JDK 底层 ByteBuffer
来源: http://www.jianshu.com/p/6f45540d4b62