Netty 源码分析第六章: 解码器
概述:
在我们上一个章节遗留过一个问题, 就是如果 Server 在读取客户端的数据的时候, 如果一次读取不完整, 就触发 channelRead 事件, 那么 Netty 是如何处理这类问题的, 在这一章中, 会对此做详细剖析
之前的章节我们学习过 pipeline, 事件在 pipeline 中传递, handler 可以将事件截取并对其处理, 而之后剖析的编解码器, 其实就是一个 handler, 截取 byteBuf 中的字节, 然后组建成业务需要的数据进行继续传播
编码器, 通常是 OutBoundHandler, 也就是以自身为基准, 对那些对外流出的数据做处理, 所以也叫编码器, 将数据经过编码发送出去
解码器, 通常是 inboundHandler, 也就是以自身为基准, 对那些流向自身的数据做处理, 所以也叫解码器, 将对向的数据接收之后经过解码再进行使用
同样, 在 netty 的编码器中, 也会对半包和粘包问题做相应的处理
什么是半包, 顾名思义, 就是不完整的数据包, 因为 netty 在轮询读事件的时候, 每次将 channel 中读取的数据, 不一定是一个完整的数据包, 这种情况, 就叫半包
粘包同样也不难理解, 如果 client 往 server 发送数据包, 如果发送频繁很有可能会将多个数据包的数据都发送到通道中, 如果在 server 在读取的时候可能会读取到超过一个完整数据包的长度, 这种情况叫粘包
有关半包和粘包, 入下图所示:
6-0-1
netty 对半包的或者粘包的处理其实也很简单, 通过之前的学习, 我们知道, 每个 handler 是和 channel 唯一绑定的, 一个 handler 只对应一个 channel, 所以将 channel 中的数据读取时候经过解析, 如果不是一个完整的数据包, 则解析失败, 将这块数据包进行保存, 等下次解析时再和这个数据包进行组装解析, 直到解析到完整的数据包, 才会将数据包进行向下传递
具体流程是在代码中如何体现的呢? 我们进入到源码分析中
第一节: ByteToMessageDecoder
ByteToMessageDecoder 解码器, 顾名思义, 是一个将 Byte 解析成消息的解码器,
我们看他的定义:
- public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter{
- // 类体省略
- }
这里继承了 ChannelInboundHandlerAdapter, 根据之前的学习, 我们知道, 这是个 inbound 类型的 handler, 也就是处理流向自身事件的 handler
其次, 该类通过 abstract 关键字修饰, 说明是个抽象类, 在我们实际使用的时候, 并不是直接使用这个类, 而是使用其子类, 类定义了解码器的骨架方法, 具体实现逻辑交给子类, 同样, 在半包处理中也是由该类进行实现的
netty 中很多解码器都实现了这个类, 并且, 我们也可以通过实现该类进行自定义解码器
我们重点关注一下该类的一个属性:
ByteBuf cumulation;
这个属性, 就是有关半包处理的关键属性, 从概述中我们知道, netty 会将不完整的数据包进行保存, 这个数据包就是保存在这个属性中
之前的学习我们知道, ByteBuf 读取完数据会传递 channelRead 事件, 传播过程中会调用 handler 的 channelRead 方法, ByteToMessageDecoder 的 channelRead 方法, 就是编码的关键部分
我们看其 channelRead 方法:
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- // 如果 message 是 byteBuf 类型
- if (msg instanceof ByteBuf) {
- // 简单当成一个 arrayList, 用于盛放解析到的对象
- CodecOutputList out = CodecOutputList.newInstance();
- try {
- ByteBuf data = (ByteBuf) msg;
- // 当前累加器为空, 说明这是第一次从 io 流里面读取数据
- first = cumulation == null;
- if (first) {
- // 如果是第一次, 则将累加器赋值为刚读进来的对象
- cumulation = data;
- } else {
- // 如果不是第一次, 则把当前累加的数据和读进来的数据进行累加
- cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
- }
- // 调用子类的方法进行解析
- callDecode(ctx, cumulation, out);
- } catch (DecoderException e) {
- throw e;
- } catch (Throwable t) {
- throw new DecoderException(t);
- } finally {
- if (cumulation != null && !cumulation.isReadable()) {
- numReads = 0;
- cumulation.release();
- cumulation = null;
- } else if (++ numReads>= discardAfterReads) {
- numReads = 0;
- discardSomeReadBytes();
- }
- // 记录 list 长度
- int size = out.size();
- decodeWasNull = !out.insertSinceRecycled();
- // 向下传播
- fireChannelRead(ctx, out, size);
- out.recycle();
- }
- } else {
- // 不是 byteBuf 类型则向下传播
- ctx.fireChannelRead(msg);
- }
- }
这方法比较长, 带大家一步步剖析
首先判断如果传来的数据是 ByteBuf, 则进入 if 块中
CodecOutputList out = CodecOutputList.newInstance() 这里就当成一个 ArrayList 就好, 用于盛放解码完成的数据
ByteBuf data = (ByteBuf) msg 这步将数据转化成 ByteBuf
first = cumulation == null 这里表示如果 cumulation == null, 说明没有存储板半包数据, 则将当前的数据保存在属性 cumulation 中
如果 cumulation != null , 说明存储了半包数据, 则通过 cumulator.cumulate(ctx.alloc(), cumulation, data)将读取到的数据和原来的数据进行累加, 保存在属性 cumulation 中
我们看 cumulator 属性:
private Cumulator cumulator = MERGE_CUMULATOR;
这里调用了其静态属性 MERGE_CUMULATOR, 我们跟过去:
- public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
- @Override
- public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
- ByteBuf buffer;
- // 不能到过最大内存
- if (cumulation.writerIndex()> cumulation.maxCapacity() - in.readableBytes()
- || cumulation.refCnt()> 1) {
- buffer = expandCumulation(alloc, cumulation, in.readableBytes());
- } else {
- buffer = cumulation;
- }
- // 将当前数据 buffer
- buffer.writeBytes(in);
- in.release();
- return buffer;
- }
- };
这里创建了 Cumulator 类型的静态对象, 并重写了 cumulate 方法, 这里 cumulate 方法, 就是用于将 ByteBuf 进行拼接的方法:
方法中, 首先判断 cumulation 的写指针 + in 的可读字节数是否超过了 cumulation 的最大长度, 如果超过了, 将对 cumulation 进行扩容, 如果没超过, 则将其赋值到局部变量 buffer 中
然后将 in 的数据写到 buffer 中, 将 in 进行释放, 返回写入数据后的 ByteBuf
回到 channelRead 方法中:
最后通过 callDecode(ctx, cumulation, out)方法进行解码, 这里传入了 Context 对象, 缓冲区 cumulation 和集合 out:
我们跟到 callDecode(ctx, cumulation, out)方法中:
- protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
- try {
- // 只要累加器里面有数据
- while (in.isReadable()) {
- int outSize = out.size();
- // 判断当前 List 是否有对象
- if (outSize> 0) {
- // 如果有对象, 则向下传播事件
- fireChannelRead(ctx, out, outSize);
- // 清空当前 list
- out.clear();
- // 解码过程中如 ctx 被 removed 掉就 break
- if (ctx.isRemoved()) {
- break;
- }
- outSize = 0;
- }
- // 当前可读数据长度
- int oldInputLength = in.readableBytes();
- // 子类实现
- // 子类解析, 解析玩对象放到 out 里面
- decode(ctx, in, out);
- if (ctx.isRemoved()) {
- break;
- }
- //List 解析前大小 和解析后长度一样(什么没有解析出来)
- if (outSize == out.size()) {
- // 原来可读的长度 == 解析后可读长度
- // 说明没有读取数据(当前累加的数据并没有拼成一个完整的数据包)
- if (oldInputLength == in.readableBytes()) {
- // 跳出循环(下次在读取数据才能进行后续的解析)
- break;
- } else {
- // 没有解析到数据, 但是进行读取了
- continue;
- }
- }
- //out 里面有数据, 但是没有从累加器读取数据
- if (oldInputLength == in.readableBytes()) {
- throw new DecoderException(
- StringUtil.simpleClassName(getClass()) +
- ".decode() did not read anything but decoded a message.");
- }
- if (isSingleDecode()) {
- break;
- }
- }
- } catch (DecoderException e) {
- throw e;
- } catch (Throwable cause) {
- throw new DecoderException(cause);
- }
- }
这里首先循环判断传入的 ByteBuf 是否有可读字节, 如果还有可读字节说明没有解码完成, 则循环继续解码
然后判断集合 out 的大小, 如果大小大于 1, 说明 out 中盛放了解码完成之后的数据, 然后将事件向下传播, 并清空 out
因为我们第一次解码 out 是空的, 所以这里不会进入 if 块, 这部分我们稍后分析, 这里继续往下看
通过 int oldInputLength = in.readableBytes() 获取当前 ByteBuf, 其实也就是属性 cumulation 的可读字节数, 这里就是一个备份用于比较, 我们继续往下看:
decode(ctx, in, out)方法是最终的解码操作, 这部会读取 cumulation 并且将解码后的数据放入到集合 out 中, 在 ByteToMessageDecoder 中的该方法是一个抽象方法, 让子类进行实现, 我们使用的 netty 很多的解码都是继承了 ByteToMessageDecoder 并实现了 decode 方法从而完成了解码操作, 同样我们也可以遵循相应的规则进行自定义解码器, 在之后的小节中会讲解 netty 定义的解码器, 并剖析相关的实现细节, 这里我们继续往下看:
if (outSize == out.size()) 这个判断表示解析之前的 out 大小和解析之后 out 大小进行比较, 如果相同, 说明并没有解析出数据, 我们进入到 if 块中:
if (oldInputLength == in.readableBytes()) 表示 cumulation 的可读字节数在解析之前和解析之后是相同的, 说明解码方法中并没有解析数据, 也就是当前的数据并不是一个完整的数据包, 则跳出循环, 留给下次解析, 否则, 说明没有解析到数据, 但是读取了, 所以跳过该次循环进入下次循环
最后判断 if (oldInputLength == in.readableBytes()) , 这里代表 out 中有数据, 但是并没有从 cumulation 读数据, 说明这个 out 的内容是非法的, 直接抛出异常
我们回到 channRead 方法中:
我们关注 finally 中的内容:
- finally {
- if (cumulation != null && !cumulation.isReadable()) {
- numReads = 0;
- cumulation.release();
- cumulation = null;
- } else if (++ numReads>= discardAfterReads) {
- numReads = 0;
- discardSomeReadBytes();
- }
- // 记录 list 长度
- int size = out.size();
- decodeWasNull = !out.insertSinceRecycled();
- // 向下传播
- fireChannelRead(ctx, out, size);
- out.recycle();
- }
首先判断 cumulation 不为 null, 并且没有可读字节, 则将累加器进行释放, 并设置为 null
之后记录 out 的长度, 通过 fireChannelRead(ctx, out, size)将 channelRead 事件进行向下传播, 并回收 out 对象
我们跟到 fireChannelRead(ctx, out, size)方法中:
- static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
- // 遍历 List
- for (int i = 0; i <numElements; i ++) {
- // 逐个向下传递
- ctx.fireChannelRead(msgs.getUnsafe(i));
- }
- }
这里遍历 out 集合, 并将里面的元素逐个向下传递
以上就是有关解码的骨架逻辑
来源: http://www.bubuko.com/infodetail-2905456.html