卡车
卡车指的是 java 原生类 ByteBuffer, 这兄弟在 NIO 界大名鼎鼎, 与 ChannelSelector 的铁三角组合构筑了 NIO 的核心之所以称它为卡车, 只因编程思想中有段比喻:
我们可以把它想象成一个煤矿, 通道 (Channel) 是一个包含煤层 (数据) 的矿藏, 而缓冲器 (ByteBuffer) 则是派送到矿藏中的卡车卡车满载煤炭而归, 我们再从卡车上获得煤炭也就是说, 我们并没有直接和通道交互; 我们只是和缓冲器交互, 并把缓冲器派送到通道
那么升级版卡车, 自然指的就是 ByteBuf
结构和功能
Netty 之所以再次打造了升级版的缓冲器, 显然是不满 ByteBuffer 中的某些弊端
ByteBuffer 长度固定
使用者经常需要调用 flip()rewind()方法调整 position 的位置, 不方便
API 功能有限
ByteBuffer 中有三个重要的位置属性: positionlimitcapacity, 一个写操作之后大概是这样的
如若想进行读操作, 那么 flip()的调用是少不了的, 从图中不难看出, 目前 position 到 limit 啥也没有
调用 flip()之后则不一样了(我们不一样~):
而 ByteBuf 的人设则不相同, 它的两个位置属性 readIndexwriteIndex, 分别和读操作写操作相对应写不操作 readIndex, 读不操作 writeIndex, 两者不会相互干扰这里盗几张图说明下好了:
初始状态
写入 N 个字节
读取 M 个 (M<N) 字节
释放已读缓存 discardReadBytes
重点在于 ByteBuf 的 read 和 write 相关方法, 已经封装好了对 readIndexwriteIndex 位置索引的操作, 不需要使用者繁琐的 flip()且 write()方法中, ByteBuf 设计了自动扩容, 这一点后续章节会进行详细说明
功能方面, 主要关注两点:
Derived buffers, 类似于数据库视图 ByteBuf 提供了多个接口用于创建某 ByteBuf 的视图或复制 ByteBuf:
duplicate: 返回当前 ByteBuf 的复制对象, 缓冲区内容共享(修改复制的 ByteBuf, 原来的 ByteBuf 内容也随之改变), 索引独立维护
copy: 内容和索引都独立
slice: 返回当前 ByteBuf 的可读子缓冲区, 内容共享, 索引独立
转换成 ByteBuffer
nio 的 SocketChanel 进行网络操作, 还是操作的 java 原生的 ByteBuffer, 所以 ByteBuf 转换成 ByteBuffer 的需求还是有市场的
ByteBuffer nioBuffer(): 当前 ByteBuf 的可读缓冲区转换成 ByteBuffer, 缓冲区内容共享, 索引独立需要指出的是, 返回后的 ByteBuffer 无法感知原 ByteBuf 的动态扩展操作
ByteBuf 星系
称之为星系, 是因为 ByteBuf 一脉涉及到的类实在太多了, 但多而不乱, 归功于类关系结构的设计
类关系结构
依然盗图:
从内存分配角度, ByteBuf 可分为两类
堆内存 HeapByteBuf 字节缓冲区
直接内存 DirectByteBuf 字节缓冲区
从内存回收角度, ByteBuf 也可分为两类:
普通缓冲区 UnpooledByteBuf
池化缓冲区 PooledByteBuf
纵观该关继承节构, 给我留下的印象就是每层各司其职: 读操作以及其它的一些公共功能由父类实现, 差异化功能由子类实现
下面聊下笔者感兴趣的几个点
AbstractByteBuf 的写操作簇
AbstractByteBuf 的写操作有很多, 这里以
writeBytes(byte[] src, int srcIndex, int length)
方法为例
- @Override
- public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
- ensureWritable(length); // 一确保可写, 对边界进行验证
- setBytes(writerIndex, src, srcIndex, length); // 二写入操作, 不同类型的子类实现方式不同
- writerIndex += length;
- return this;
- }
注释部分分别展开看下
注释一确保可写, 对边界进行验证
跟调用栈
ensureWritable -> ensureWritable0
, 观察 ensureWritable0 方法
- final void ensureWritable0(int minWritableBytes) {
- ensureAccessible(); // 确保对象可用
- if (minWritableBytes <= writableBytes()) {
- return;
- }
- if (minWritableBytes > maxCapacity - writerIndex) {
- throw new IndexOutOfBoundsException(String.format(
- "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
- writerIndex, minWritableBytes, maxCapacity, this));
- }
- // Normalize the current capacity to the power of 2.
- // 三计算扩容量
- int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);
- // Adjust to the new capacity.
- capacity(newCapacity); // 四内存分配
- }
比较
先对要写入的字节数 minWritableBytes 进行判断: 如果 minWritableBytes < capacity - writeIndex, 那么很好, 不需要扩容; 如果 minWritableBytes > maxCapacity - writerIndex, 也就是要写入字节数超过了允许的最大字节数, 直接抛出越界异常 IndexOutOfBoundsException
眼尖的朋友可能发现了, 两次用来判断的上界并不相同 capacity / maxCapacitymaxCapacity 是 AbstractByteBuf 的属性, 而 capacity 设定在其子类中简单看下
UnpooledDirectByteBuf
的构造函数:
- public UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
- super(maxCapacity); // 为 AbstractByteBuf 的 maxCapacity 属性赋值
- /**
- *
- * 省略无关部分
- */
- setByteBuffer(ByteBuffer.allocateDirect(initialCapacity)); //capacity 赋值
- }
也就是说, ByteBuf 的结构, 可看成这样:
扩容计算
- @Override
- public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
- if (minNewCapacity < 0) {
- throw new IllegalArgumentException("minNewCapacity:" + minNewCapacity + "(expected: 0+)");
- }
- if (minNewCapacity > maxCapacity) {
- throw new IllegalArgumentException(String.format(
- "minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
- minNewCapacity, maxCapacity));
- }
- /**
- * 设置阀值为 4MB
- * 1. 如果扩展的容量大于阀值, 对扩张后的内存和最大内存进行比较: 大于最大长度使用最大长度, 否则步进 4M
- * 2. 如果需要扩展的容量小于阀值, 以 64 进行计数倍增: 64->128->256; 为防止倍增过猛, 最后与最大值再次进行比较
- */
- final int threshold = CALCULATE_THRESHOLD; // 4 MiB page
- if (minNewCapacity == threshold) {
- return threshold;
- }
- // If over threshold, do not double but just increase by threshold.
- if (minNewCapacity > threshold) {
- int newCapacity = minNewCapacity / threshold * threshold;
- if (newCapacity > maxCapacity - threshold) {
- newCapacity = maxCapacity;
- } else {
- newCapacity += threshold;
- }
- return newCapacity;
- }
- // Not over threshold. Double up to 4 MiB, starting from 64.
- int newCapacity = 64;
- while (newCapacity < minNewCapacity) {
- newCapacity <<= 1;
- }
- return Math.min(newCapacity, maxCapacity);
- }
具体的扩容策略, 已拍入注释中, 尽可查看!
注释二写入操作, 不同类型的子类实现方式不同
对比下
UnpooledDirectByteBuf
和 UnpooledHeapByteBuf 的实现
- UnpooledDirectByteBuf
- @Override
- public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
- checkSrcIndex(index, length, srcIndex, src.length);
- ByteBuffer tmpBuf = internalNioBuffer(); // 分配
- tmpBuf.clear().position(index).limit(index + length);
- tmpBuf.put(src, srcIndex, length);
- return this;
- }
- UnpooledHeapByteBuf
- @Override
- public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
- checkSrcIndex(index, length, srcIndex, src.length);
- System.arraycopy(src, srcIndex, array, index, length); // 分配
- return this;
- }
篇幅有限, 不展开说了, 结论就是:
UnpooledDirectByteBuf 的底层实现为
ByteBuffer.allocateDirect
, 分配时复制体通过 buffer.duplicate()获取复制体; 而 UnpooledHeapByteBuf 的底层实现为 byte[], 分配时通过 System.arraycopy 方法拷贝副本
- AbstractReferenceCountedByteBuf
- AbstractReferenceCountedByteBuf
的名字就挺有意思引用计数, 一副 JVM 垃圾回收的即视感而事实上, 也差不多一个意思
看下类属性:
- private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater =
- AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
- private volatile int refCnt;
以原子方式更新属性的
AtomicIntegerFieldUpdater
起了关键作用, 将会对 volatile 修饰的 refCnt 进行更新, 见 retain 方法(下面展示的是 retain 的关键部分 retain0):
- private ByteBuf retain0(final int increment) {
- int oldRef = refCntUpdater.getAndAdd(this, increment);
- if (oldRef <= 0 || oldRef + increment < oldRef) {
- // Ensure we don't resurrect (which means the refCnt was 0) and also that we encountered an overflow.
- refCntUpdater.getAndAdd(this, -increment);
- throw new IllegalReferenceCountException(oldRef, increment);
- }
- return this;
- }
源码阅读很有意思的一点就是能看到些自己不熟悉的类, 比如
AtomicIntegerFieldUpdater
我以前就没接触过!
内存池
内存池可有效的提升效率, 道理和线程池数据库连接池相通, 即省去了重复创建销毁的过程
到目前为止, 看到的都是 ByteBuf 中的各 Unpooled 实现, 而池化版的 ByteBuf 没怎么提过为何如此? 因为池化的实现较复杂, 以我现在的功力尚不能完全掌握透彻
先聊下内存池的设计思路, 涨涨姿势:
为了集中集中管理内存的分配和释放, 同事提高分配和释放内存时候的性能, 很多框架和应用都会通过预先申请一大块内存, 然后通过提供相应的分配和释放接口来使用内存这样一来, 堆内存的管理就被集中到几个类或函数中, 由于不再频繁使用系统调用来申请和释放内存, 应用或系统的性能也会大大提高 节选自 Netty 权威指南
Netty 的 ByteBuf 内存池也是按照这个思路搞的首先, 看下官方注释:
/**
* Notation: The following terms are important to understand the code
* > page - a page is the smallest unit of memory chunk that can be allocated
* > chunk - a chunk is a collection of pages
* > in this code chunkSize = 2^{maxOrder} * pageSize
*/
这里面有两个重要的概念 page(页)和 chunk(块),chunk 管理多个 page 组成二叉树结构, 大概就是这个样子:
选择二叉树是有原因的:
/**
* To search for the first offset in chunk that has at least requested size available we construct a
* complete balanced binary tree and store it in an array (just like heaps) - memoryMap
*/
为了在 chunk 中找到至少可用的 size 的偏移量 offset
继线性结构后, 人们又发明了树形结构的意义在于提升查询效率, 也同样是这里选择二叉树的原因
小于一个 page 的内存, 直接在 PoolSubpage 中分配完成
某块内存是否分配, 将通过状态位进行标识
后记
一如既往的啰嗦几句, 最近工作忙, 更新文章较慢, 希望自己能坚持, 如发现问题望大家指正!
thanks..
来源: https://segmentfault.com/a/1190000013523875