前言
在网络传输过程中, 字节是最基本也是最小的单元. JAVA NIO 有提供一个 ByteBuffer 容器去装载这些数据, 但是用起来会有点复杂, 经常要在读写间进行切换以及不支持动态扩展等等. 而 netty 为我们提供了一个 ByteBuf 组件, 功能是很强大的, 本文主要对 ByteBuf 进行一些讲解, 中间会穿插着和 ByteBuffer 进行对比.
优势
ByteBuf 与 ByteBuffer 的相比的优势:
读和写用不同的索引.
读和写可以随意的切换, 不需要调用 flip() 方法.
容量能够被动态扩展, 和 StringBuilder 一样.
用其内置的复合缓冲区可实现透明的零拷贝.
支持方法链.
支持引用计数. count == 0,release.
支持池.
下面将会对每一种优势进行详细的解读.
读写索引
ByteBuffer 读写同用 position 索引, 利用 flip() 方法切换读写模式, 而 ByteBuf 读写分不同的索引, 读用 readIndex, 写用 writeIndex, 这样可以更加方便我们进行操作, 省去了 flip 这一步骤. ByteBuffer 与 ByteBuf 两种读写模型会在下面用图解形式给大家进行说明.
bytebuffer.PNG
可以根据下面简单的代码自行测试一下:
- ByteBuffer byteBuffer = ByteBuffer.allocate(8);
- System.err.println("startPosition:" + byteBuffer.position() + ",limit:" + byteBuffer.limit() + ",capacity:" + byteBuffer.capacity());
- byteBuffer.put("abc".getBytes());
- System.err.println("writePosition:" + byteBuffer.position() + ",limit:" + byteBuffer.limit() + ",capacity:" + byteBuffer.capacity());
- byteBuffer.flip();
- System.err.println("readPosition:" + byteBuffer.position() + ",limit:" + byteBuffer.limit() + ",capacity:" + byteBuffer.capacity());
bytebuf.PNG
可以根据下面简单的代码自行测试一下:
- ByteBuf heapBuffer = Unpooled.buffer(8);
- int startWriterIndex = heapBuffer.writerIndex();
- System.err.println("startWriterIndex:" + startWriterIndex);
- int startReadIndex = heapBuffer.readerIndex();
- System.err.println("startReadIndex:" + startReadIndex);
- System.err.println("capacity:" + heapBuffer.capacity());
- System.err.println("========================");
- for (int i = 0; i < 3; i++) {
- heapBuffer.writeByte(i);
- }
- int writerIndex = heapBuffer.writerIndex();
- System.err.println("writerIndex:" + writerIndex);
- heapBuffer.readBytes(2);
- int readerIndex = heapBuffer.readerIndex();
- System.err.println("readerIndex:" + readerIndex);
- System.err.println("capacity:" + heapBuffer.capacity());
动态扩展
ByteBuffer 是不支持动态扩展的, 给定一个具体的 capacity, 一旦 put 进去的数据超过其容量, 就会抛出 java.nio.BufferOverflowException 异常, 而 ByteBuf 完美的解决了这一问题, 支持动态扩展其容量.
零拷贝
netty 提供了 CompositeByteBuf 类实现零拷贝. 大多数情况下, 在进行网络数据传输时我们会将消息分为消息头 head 和消息体 body, 甚至还会有其他部分, 这里我们简单的分为两部分来进行探讨:
以前的做法
- ByteBuffer header = ByteBuffer.allocate(1);
- header.put("a".getBytes());
- header.flip();
- ByteBuffer body = ByteBuffer.allocate(1);
- body.put("b".getBytes());
- body.flip();
- ByteBuffer message = ByteBuffer.allocate(header.remaining() + body.remaining());
- message.put(header);
- message.put(body);
- message.flip();
- while (message.hasRemaining()){
- System.err.println((char)message.get());
- }
这样为了得到完整的消息体相当于对内存进行了多余的两次拷贝, 造成了很大的资源的浪费.
netty 提供的方法
- CompositeByteBuf messageBuf = Unpooled.compositeBuffer();
- ByteBuf headerBuf = Unpooled.buffer(1);
- headerBuf.writeByte('a');
- ByteBuf bodyBuf = Unpooled.buffer(1);
- bodyBuf.writeByte('b');
- messageBuf.addComponents(headerBuf, bodyBuf);
- for (ByteBuf buf : messageBuf) {
- System.out.println((char)buf.readByte());
- System.out.println(buf.toString());
- }
这里通过 CompositeByteBuf 对象将 headerBuf 与 bodyBuf 组合到了一起, 也得到了完整的消息体, 但是并未进行内存上的拷贝. 可以注意下我在上面代码段中进行的 buf.toString() 方法的调用, 得出来的结果是: 指向的还是原来分配的空间地址, 也就证明了零拷贝的观点.
支持引用计数
看一段简单的代码段:
- ByteBuf buffer = Unpooled.buffer(1);
- int i = buffer.refCnt();
- System.err.println("refCnt :" + i); //refCnt : 1
- buffer.retain();
- buffer.retain();
- buffer.retain();
- buffer.retain();
- i = buffer.refCnt();
- System.err.println("refCnt :" + i); //refCnt : 5
- boolean release = buffer.release();
- i = buffer.refCnt();
- System.err.println("refCnt :" + i + "=====" + release); //refCnt : 4 ===== false
- release = buffer.release(4);
- i = buffer.refCnt();
- System.err.println("refCnt :" + i + "=====" + release); //refCnt : 0 ===== true
这里我感觉就是 AQS 差不多的概念, retain 和 lock 类似, release 和 unlock 类似, 内部维护一个计数器, 计数器到 0 的时候就表示已经释放掉了. 往一个已经被 release 掉的 buffer 中去写数据, 会抛出 IllegalReferenceCountException: refCnt: 0 异常.
在 Netty in Action https://book.douban.com/subject/24700704/ 一书中对其的介绍是:
- The idea behind reference counting isn't particularly complex; mostly it involves
- tracking the number of active references to a specified object. A ReferenceCounted
implementation instance will normally start out with an active reference count of 1. As long as the reference count is greater than 0, the object is guaranteed not to be released.When the number of active references decreases to 0, the instance will be released. Note that while the precise meaning of release may be implementation-specific, at the very least an object that has been released should no longer be available for use.
引用计数器实现的原理并不复杂, 仅仅只是涉及到一个指定对象的活动引用, 对象被初始化后引用计数值为 1. 只要引用计数大于 0, 这个对象就不会被释放, 当引用计数减到为 0 时, 这个实例就会被释放, 被释放的对象不应该再被使用.
支持池
Netty 对 ByteBuf 的分配提供了池支持, 具体的类是 PooledByteBufAllocator. 用这个分配器去分配 ByteBuf 可以提升性能以及减少内存碎片. Netty 中默认用 PooledByteBufAllocator 当做 ByteBuf 的分配器. PooledByteBufAllocator 对象可以从 Channel 中或者绑定了 Channel 的 ChannelHandlerContext 中去获取到.
- Channel channel = ...;
- ByteBufAllocator allocator = channel.alloc();
- ...
- ChannelHandlerContext ctx = ...;
- ByteBufAllocator allocator2 = ctx.alloc();
API 介绍 (介绍容易混淆的几个)
创建 ByteBuf
- // 创建一个 heapBuffer, 是在堆内分配的
- ByteBuf heapBuf = Unpooled.buffer(5);
- if (heapBuf.hasArray()) {
- byte[] array = heapBuf.array();
- int offset = heapBuf.arrayOffset() + heapBuf.readerIndex();
- int length = heapBuf.readableBytes();
- handleArray(array, offset, length);
- }
- // 创建一个 directBuffer, 是分配的堆外内存
- ByteBuf directBuf = Unpooled.directBuffer();
- if (!directBuf.hasArray()) {
- int length = directBuf.readableBytes();
- byte[] array = new byte[length];
- directBuf.getBytes(directBuf.readerIndex(), array);
- handleArray(array, 0, length);
- }
这两者的主要区别:
a. 分配的堆外内存空间, 在进行网络传输时就不用进行拷贝, 直接被网卡使用. 但是这些空间想要被 jvm 所使用, 必须拷贝到堆内存中.
b. 分配和释放堆外内存相比堆内存而言, 是相当昂贵的.
c. 使用这两者 buffer 中的数据的方式也略有不同, 见上面的代码段.
读写数据 (readByte writeByte)
- ByteBuf heapBuf = Unpooled.buffer(5);
- heapBuf.writeByte(1);
- System.err.println("writeIndex :" + heapBuf.writerIndex());//writeIndex : 1
- heapBuf.readByte();
- System.err.println("readIndex :" + heapBuf.readerIndex());//readIndex : 1
- heapBuf.setByte(2, 2);
- System.err.println("writeIndex :" + heapBuf.writerIndex());//writeIndex : 1
- heapBuf.getByte(2);
- System.err.println("readIndex :" + heapBuf.readerIndex());//readIndex : 1
进行 readByte 和 writeByte 方法的调用时会改变 readIndex 和 writeIndex 的值, 而调用 set 和 get 方法时不会改变 readIndex 和 writeIndex 的值. 上面的测试案例中打印的 writeIndex 和 readIndex 均为 1, 并未在调用 set 和 get 方法后被改变.
discardReadBytes 方法
先看一张图:
discardReadBytes.PNG
从上面的图中可以观察到, 调用 discardReadBytes 方法后, readIndex 置为 0,writeIndex 也往前移动了 Discardable bytes 长度的距离, 扩大了可写区域. 但是这种做法会严重影响效率, 它进行了大量的拷贝工作. 如果要进行数据的清除操作, 建议使用 clear 方法. 调用 clear() 方法将会将 readIndex 和 writeIndex 同时置为 0, 不会进行内存的拷贝工作, 同时要注意, clear 方法不会清除内存中的内容, 只是改变了索引位置而已.
Derived buffers
这里介绍三个方法 (浅拷贝):
duplicate(): 直接拷贝整个 buffer.
slice(): 拷贝 buffer 中已经写了的数据.
slice(index,length): 拷贝 buffer 中从 index 开始, 长度为 length 的数据.
readSlice(length): 从当前 readIndex 读取 length 长度的数据.
我对上面这几个方法的形容虽然是拷贝, 但是这几个方法并没有实际意义上去复制一个新的 buffer 出来, 它和原 buffer 是共享数据的. 所以说调用这些方法消耗是很低的, 并没有开辟新的空间去存储, 但是修改后会影响原 buffer. 这种方法也就是咱们俗称的浅拷贝.
要想进行深拷贝, 这里可以调用 copy() 和 copy(index,length) 方法, 使用方法和上面介绍的一致, 但是会进行内存复制工作, 效率很低.
测试 demo:
- ByteBuf heapBuf = Unpooled.buffer(5);
- heapBuf.writeByte(1);
- heapBuf.writeByte(1);
- heapBuf.writeByte(1);
- heapBuf.writeByte(1);
- // 直接拷贝整个 buffer
- ByteBuf duplicate = heapBuf.duplicate();
- duplicate.setByte(0, 2);
- System.err.println("duplicate:" + duplicate.getByte(0) + "====heapBuf:" + heapBuf.getByte(0));//duplicate: 2====heapBuf: 2
- // 拷贝 buffer 中已经写了的数据
- ByteBuf slice = heapBuf.slice();
- System.err.println("slice capacity:" + slice.capacity());//slice capacity: 4
- slice.setByte(2, 5);
- ByteBuf slice1 = heapBuf.slice(0, 3);
- System.err.println("slice1 capacity:"+slice1.capacity());//slice1 capacity: 3
- System.err.println("duplicate:" + duplicate.getByte(2) + "====heapBuf:" + heapBuf.getByte(2));//duplicate: 5====heapBuf: 5
上面的所有测试代码均可以在我的中获取 (netty 中的 buffer 模块).
End
来源: https://juejin.im/post/5c468f7be51d45524976275d