Tomcat 对 BIO 和 NIO 两种模型都进行了实现, 其中 BIO 的实现理解起来比较简单, 而 NIO 的实现就比较复杂了, 并且它跟常用的 Reactor 模型也略有不同, 具体设计如下:
可以看出多了一个 BlockPoller 的设计, 这是因为在 Servlet 规范中 ServletInputStream 和 ServletOutputStream 是阻塞的, 所以请求体和响应体的读取和发送需要阻塞处理. 请求行读取和 SSL 握手使用非阻塞的 Poller 处理. 一次连接基本的处理流程是:
Acceptor 接收 TCP 连接, 并将其注册到 Poller 上
Poller 发现通道有就绪的 I/O 事件, 将事件分配给线程池中的线程处理
线程池线程首先在 Poller 上非阻塞完成请求行和 SSL 握手的处理, 然后通过容器调用 Servlet, 生成响应, 最后如果需要读取请求体或者发送响应, 那就会将通道注册到 BlockPoller 上模拟阻塞完成
接下来分析核心代码的实现, 源码来自 Tomcat 6.0.53 版本, 之所以使用这个版本是因为看起来简单直观没有太多的抽象, 也不影响来理解核心的处理逻辑. 首先看下连接处理的方法调用情况, 可右键直接打开图片查看大图:
相关类或接口的功能如下:
Acceptor: 阻塞监听和接收通道连接
Poller: 事件多路复用器, 通知 I/O 事件的发生并分配合适的处理器
PollerEvent: 是对通道, SelectionKey 的附加对象和通道关注事件的封装
SocketProcessor: 线程池调度单元, 它处理 SSL 握手, 调用 Handler 解析协议
Handler: 通道处理接口, 用于适配多种协议, 如 HTTP,AJP
NioEndpoint: 服务启停初始化的入口
NioSelectorPool: 提供一个阻塞读写使用的 Selector 池和一个单例 Selector
NioBlockingSelector: 提供阻塞读和写的方法
BlockPoller: 与 NioBlockingSelector 配合完成模拟阻塞
NioChannel: 封装 SocketChannel 和读写使用的 ByteBuffer
KeyAttachment: Key 的附加对象, 它包含通道最后访问时间和用于模拟阻塞使用的读写闭锁
1. Acceptor 注册通道到 Poller 上
Acceptor 和 Poller 分属两个不同的线程, 通常情况下 Poller 阻塞在 select() 方法的调用上, 此方法会锁住内部的 publicKeys 集合, 所以 Acceptor 接收到通道连接不能直接注册到 Poller 上, 否则会造成死锁. Tomcat 使用生产者 - 消费者模式来进行并发协作, 缓冲区使用的是 ConcurrentLinkedQueue 无界队列.
Acceptor 接收到连接的 SocketChannel 后, 将其配置成非阻塞模式, 封装成 NioChannel, 最后调用 getPoller0().register(NioChannel) 加入到某个 Poller 的事件队列中.
- public void register(final NioChannel socket) {
- socket.setPoller(this); // 关联此 Poller
- KeyAttachment key = keyCache.poll();
- final KeyAttachment ka = key!=null?key:new KeyAttachment();
- // 重置或者初始化 KeyAttachment 对象
- ka.reset(this,socket,getSocketProperties().getSoTimeout());
- PollerEvent r = eventCache.poll();
- // 声明此通道关注的事件
- ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
- // 将此通道和 SelectionKey 附件对象封装成 PollerEvent 对象
- if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
- else r.reset(socket,ka,OP_REGISTER);
- // 加入到 Poller 的 events 队列中
- addEvent(r);
- }
- public void addEvent(Runnable event) {
- events.offer(event); // 插入队列
- if ( wakeupCounter.incrementAndGet() == 0 )
- selector.wakeup(); // 唤醒 Selector
- }
Poler 有个 events() 方法, 用于遍历事件队列进行处理, events() 会在 select 调用超时或者被唤醒且没有通道发生 I/O 事件时被调用, 代码如下:
- public boolean events() {
- boolean result = false;
- Runnable r = null;
- // 遍历事件队列
- while ( (r = events.poll()) != null ) {
- result = true;// 有事件待处理
- try {
- r.run(); // 本质调用的是 PollerEvent.run()
- if ( r instanceof PollerEvent ) {
- // 重置并缓存 PollerEvent 对象
- ((PollerEvent)r).reset();
- eventCache.offer((PollerEvent)r);
- }
- } catch ( Throwable x ) {
- log.error("",x);
- }
- }
- return result;
- }
可以看出这里有个关键对象 PollerEvent, 它内部有个 interestOps 属性, 表示要处理的事件类型, 它有三个可能的值分别是:
NioEndpoint.OP_REGISTER: 通道注册事件
SelectionKey.OP_READ: 通道重新声明在 Poller 上关注读事件
SelectionKey.OP_WRITE: 通道重新声明在 Poller 上关注写事件
OP_REGISTER 的处理就是将通道注册到 Selector 上的最终实现, 代码如下:
- if ( interestOps == OP_REGISTER ) {
- try {
- // 将 SocketChannel 注册到 Poller 的 Selector 上并指定关注的事件和附加对象
- socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key);
- } catch (Exception x) {
- log.error("", x);
- }
- }
至此已完成了通道注册, 接下来看一下 PollerEvent 为什么还要处理 OP_READ 和 OP_WRITE 事件.
2. PollerEvent 对 OP_READ 和 OP_WRITE 的处理
PollerEvent(又或者说 Poller)要处理读写事件, 就是因为程序需要一次非阻塞的读或写操作. 一开始通道是在 Poller 上声明关注的事件, 但是在发生 I/O 事件后, Poller 就会把此通道就绪的事件从它关注的事件中移除(原因见下文), 所以如果需要非阻塞的读或写, 只能再次在这个 Poller 上重新声明.
解析请求行是非阻塞的, 解析过程中, 由于 TCP 存在粘包 / 拆包的问题, 可能导致数据读取不完整, 需要再次从通道读取, 此时就要在关联的 Poller 上重新关注读事件, 核心代码:
- // 拿到通道在 Poller 上对应的 SelectionKey
- final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
- try {
- boolean cancel = false;
- if (key != null) {
- ...
- // 将 interestOps 合并到 key 现有关注的事件集合中
- int ops = key.interestOps() | interestOps;
- // 更新 key 和 附加对象关注的操作
- att.interestOps(ops);
- key.interestOps(ops);
- att.setCometOps(ops);
- } else {
- cancel = true;
- }
- }catch (CancelledKeyException ckx) {}
3. Poller 对 I/O 事件的处理
Poller 就是 Reactor, 主要功能是将就绪的 SelectionKey 分配给处理器处理, 此外它还检查通道是否超时. 它在调用 select 方法时会根据条件确定是阻塞还是非阻塞, 代码如下:
- if ( !close ) {
- if (wakeupCounter.getAndSet(-1)> 0) {
- // wakeupCounter 大于 0, 意味着 Acceptor 接收了大量连接, 产生大量 PollerEvent 急
- // 需 Poller 消费处理, 此时进行一次非阻塞调用
- keyCount = selector.selectNow();// 非阻塞直接返回
- } else {
- // wakeupCounter 等于 0, 阻塞等待 IO 事件发生或被唤醒
- keyCount = selector.select(selectorTimeout);
- }
- wakeupCounter.set(0);
- }
当有通道 I/O 事件就绪时, Poller 将会创建一个 SocketProcessor 提交线程池处理, 具体代码不再贴出. 在这个过程中有一个将当前就绪的事件从 SelectionKey 中移除的操作, 这是为了后续能够在 BlockPoller 上阻塞读写时, 防止多个线程的干扰, 具体代码如下:
- protected void unreg(SelectionKey sk, KeyAttachment attachment, int readyOps) {
- // 取反再与 - 表示从 sk.interestOps() 中清除 readyOps 所在的位
- reg(sk,attachment,sk.interestOps()& (~readyOps));
- }
- protected void reg(SelectionKey sk, KeyAttachment attachment, int intops) {
- sk.interestOps(intops);
- attachment.interestOps(intops);
- //attachment.setCometOps(intops);
- }
检查超时的方法是 Poller.timedout(keyCount, hasEvents), 它在 Poller 的每次循环上都被调用, 但不是每次都处理超时, 因为这会产生过多的负载, 而超时可等待几秒钟再超时也没事. Poler 有一个名为 nextExpiration 的成员变量, 它表示检查超时的最短时间间隔, 在这个时间内, 如果只是 select() 调用超时 (表示负载不大) 会执行处理超时.
4. SocketProcessor 的处理
SocketProcessor 处理 SSL 握手和调用 Handler 进行实际的 I/O 操作. Handler 的子类 Http11ConnectionHandler 会创建 一个 Http11NioProcessor 对象最终处理 Socket, 这里不分析具体的协议处理, 来看看几种处理结果:
- public SocketState process(NioChannel socket) {
- Http11NioProcessor processor = null;
- try {
- processor = connections.remove(socket);
- ...
- SocketState state = processor.process(socket);
- if (state == SocketState.LONG) {
- // 在处理 request 和生成 response 之间, 保持 socket 和此 processor 的关联
- connections.put(socket, processor);
- // 通常是收到了不完整的请求行, 再次以 OP_READ 注册到 Poller 上
- socket.getPoller().add(socket);
- } else if (state == SocketState.OPEN) {
- // 长连接, Http 保活, 回收 processor
- release(socket, processor);
- // 此时已处理一个完整的请求并响应, 再次注册到 Poller 上, 等待处理下个请求
- socket.getPoller().add(socket);
- } else if (state == SocketState.SENDFILE) {
- // 处理文件
- connections.put(socket, processor);
- } else {
- // 连接关闭, 回收 processor
- release(socket, processor);
- }
- return state;
- } catch (...) {...}
- release(socket, processor);
- return SocketState.CLOSED;
- }
5. 模拟阻塞的实现
模拟阻塞是通过 NioBlockingSelector 和 BlockPoller, 以及 KeyAttachment 中的两个 CountDownLatch 读写闭锁合作完成. 这里分析阻塞读, 阻塞写的实现类似. 一般的, 在读取 POST 请求参数时会使用模拟阻塞完成, 来看下 NioBlockingSelector.read() 方法的具体实现:
- public int read(ByteBuffer buf, NioChannel socket, long readTimeout) throws IOException {
- // 拿到通道在 Poller 上注册的 key
- SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
- if ( key == null ) throw new IOException("Key no longer registered");
- KeyReference reference = new KeyReference();
- // key 的附加对象
- KeyAttachment att = (KeyAttachment) key.attachment();
- int read = 0; // 读取的字节数
- boolean timedout = false; // 是否超时
- int keycount = 1; //assume we can write 假设通道可读
- long time = System.currentTimeMillis(); //start the timeout timer
- try {
- while ( (!timedout) && read == 0) {
- if (keycount> 0) { //only read if we were registered for a read
- // 尝试读取一次, 如果通道无数据可读则返回 0, 若连接断开则返回 -1
- int cnt = socket.read(buf);
- if (cnt == -1) throw new EOFException();
- read += cnt;
- if (cnt> 0) break;
- }
- try {
- // 初始化读闭锁
- if ( att.getReadLatch()==null || att.getReadLatch().getCount()==0) att.startReadLatch(1);
- // 将此通道注册到 BlockPoller, 关注读取事件
- poller.add(att,SelectionKey.OP_READ, reference);
- // 阻塞等待通道可读
- att.awaitReadLatch(readTimeout,TimeUnit.MILLISECONDS);
- }catch (InterruptedException ignore) {
- Thread.interrupted();
- }
- if ( att.getReadLatch()!=null && att.getReadLatch().getCount()> 0) {
- // 被打断了, 但是没有接收到 blockPoller 的提醒
- keycount = 0;
- // 继续循环等待可读
- }else {
- // 通道可读, 重置读闭锁
- keycount = 1;
- att.resetReadLatch();
- }
- if (readTimeout> 0 && (keycount == 0)) // 如果超时了, 则不再读取, 抛异常
- timedout = (System.currentTimeMillis() - time)>= readTimeout;
- } //while
- if (timedout)
- throw new SocketTimeoutException();
- } finally {
- poller.remove(att,SelectionKey.OP_READ); // 移除注册
- if (timedout && reference.key!=null) {
- poller.cancelKey(reference.key); // 超时取消
- }
- reference.key = null;
- }
- return read;
- }
BlockPoller 实现逻辑与 Poller 大致相同, 不同的地方在于对就绪 key 的处理, 核心代码如下:
- Iterator<SelectionKey> iterator = keyCount> 0 ? selector.selectedKeys().iterator() : null;
- while (run && iterator != null && iterator.hasNext()) {
- SelectionKey sk = iterator.next();
- KeyAttachment attachment = (KeyAttachment)sk.attachment();
- try {
- attachment.access();
- iterator.remove();
- // 移除已就绪的事件
- sk.interestOps(sk.interestOps() & (~sk.readyOps()));
- // 可读或可写时减少对应闭锁的值, 此时阻塞在 NioBlockingSelector.read() 上的线程继续执行读取
- if ( sk.isReadable() ) {
- countDown(attachment.getReadLatch());
- }
- if (sk.isWritable()) {
- countDown(attachment.getWriteLatch());
- }
- }catch (CancelledKeyException ckx) {
- if (sk!=null) sk.cancel();
- countDown(attachment.getReadLatch());
- countDown(attachment.getWriteLatch());
- }
- }//while
6. 小结
至此, 本文对连接的接收, 分发以及模拟阻塞的核心代码实现进行了分析, 为了更好的理解内部流程, 尽可能的使用简洁的代码仿写了这部分功能.
源码地址: https://github.com/tonwu/rxtomcat 位于 rxtomcat.NET 模块
7. Tomcat 8.5 版本变化
7.1 替换缓存数据结构
Tomcat 对 PollerEvent,NioChannel 和 Processor 对象进行了缓存, 目的是减少 GC 提高系统性能, 这是一种用空间换时间, 被称为对象池的优化手段. 从版本 8.* 开始, 缓存数据结构从 ConcurrentLinkedQueue 换成了自定义的同步栈 SynchronizedStack.SynchronizedStack 的 javadoc 明确说明:
当需要创建一个无需缩小的可重用对象池时, 这是 ConcurrentLinkedQueue 无 GC 的主要替代方案. 目的是尽可能快地以最少的垃圾提供最少的所需功能.
在这个特殊的情况下, ConcurrentLinkedQueue 有很多功能是不需要的, 所以就实现了一个有重点的类, 可以专注完成一件事, 来提升性能. 但它不是 ConcurrentLinkedQueue 的替代品.
7.2 LimitLatch
Acceptor 在接收连接前添加了一个 LimitLatch(类似信号量)来控制总连接数. 分析下如果不加有什么现象, 在极端情况下, 线程池没有空闲线程并且它内部的队列已满, 当有通道发生可读或可写事件时, Poller 会关闭此通道, 此时系统负载已达到最高, 如果 Acceptor 还在继续接收连接并请求注册, 而不加限制, 那么就会一直重复 PollerEvent 入队出队和 Poller 单纯关闭通道的操作, 浪费系统资源.
来源: https://www.cnblogs.com/wskwbog/p/10542194.html