《Scalable IO in Java》 是 java.util.concurrent 包的作者, 大师 Doug Lea 关于分析与构建可伸缩的高性能 IO 服务的一篇经典文章, 在文章中 Doug Lea 通过各个角度, 循序渐进的梳理了服务开发中的相关问题, 以及在解决问题的过程中服务模型的演变与进化, 文章中基于 Reactor 反应器模式的几种服务模型架构, 也被 Netty,Mina 等大多数高性能 IO 服务框架所采用, 因此阅读这篇文章有助于你更深入了解 Netty,Mina 等服务框架的编程思想与设计模式.
下面是我对《Scalable IO in Java》原文核心内容的一个翻译, 原文连接: http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf
一, 网络服务
在一般的网络或分布式服务等应用程序中, 大都具备一些相同的处理流程, 例如:
1 读取请求数据;
2 对请求数据进行解码;
3 对数据进行处理;
4 对回复数据进行编码;
5 发送回复;
当然在实际应用中每一步的运行效率都是不同的, 例如其中可能涉及到 xml 解析, 文件传输, web 页面的加载, 计算服务等不同功能.
1, 传统的服务设计模式
在一般的网络服务当中都会为每一个连接的处理开启一个新的线程, 我们可以看下大致的示意图:
每一个连接的处理都会对应分配一个新的线程, 下面我们看一段经典的 Server 端 Socket 服务代码:
- class Server implements Runnable {
- public void run() {
- try {
- ServerSocket ss = new ServerSocket(PORT);
- while (!Thread.interrupted())
- new Thread(new Handler(ss.accept())).start();
- // or, single-threaded, or a thread pool
- } catch (IOException ex) {
- /* ... */ }
- }
- static class Handler implements Runnable {
- final Socket socket;
- Handler(Socket s) {
- socket = s;
- }
- public void run() {
- try {
- byte[] input = new byte[MAX_INPUT];
- socket.getInputStream().read(input);
- byte[] output = process(input);
- socket.getOutputStream().write(output);
- } catch (IOException ex) {
- /* ... */ }
- }
- private byte[] process(byte[] cmd) {
- /* ... */ }
- }
- }
2, 构建高性能可伸缩的 IO 服务
在构建高性能可伸缩 IO 服务的过程中, 我们希望达到以下的目标:
1 能够在海量负载连接情况下优雅降级;
2 能够随着硬件资源的增加, 性能持续改进;
3 具备低延迟, 高吞吐量, 可调节的服务质量等特点;
而分发处理就是实现上述目标的一个最佳方式.
3, 分发模式
分发模式具有以下几个机制:
1 将一个完整处理过程分解为一个个细小的任务;
2 每个任务执行相关的动作且不产生阻塞;
3 在任务执行状态被触发时才会去执行, 例如只在有数据时才会触发读操作;
在一般的服务开发当中, IO 事件通常被当做任务执行状态的触发器使用, 在 hander 处理过程中主要针对的也就是 IO 事件;
java.nio 包就很好的实现了上述的机制:
1 非阻塞的读和写
2 通过感知 IO 事件分发任务的执行
所以结合一系列基于事件驱动模式的设计, 给高性能 IO 服务的架构与设计带来丰富的可扩展性;
二, 基于事件驱动模式的设计
基于事件驱动的架构设计通常比其他架构模型更加有效, 因为可以节省一定的性能资源, 事件驱动模式下通常不需要为每一个客户端建立一个线程, 这意味这更少的线程开销, 更少的上下文切换和更少的锁互斥, 但任务的调度可能会慢一些, 而且通常实现的复杂度也会增加, 相关功能必须分解成简单的非阻塞操作, 类似与 GUI 的事件驱动机制, 当然也不可能把所有阻塞都消除掉, 特别是 GC, page faults(内存缺页中断) 等. 由于是基于事件驱动的, 所以需要跟踪服务的相关状态 (因为你需要知道什么时候事件会发生);
下图是 AWT 中事件驱动设计的一个简单示意图, 可以看到, 在不同的架构设计中的基于事件驱动的 IO 操作使用的基本思路是一致的;
三, Reactor 模式
Reactor 也可以称作反应器模式, 它有以下几个特点:
1 Reactor 模式中会通过分配适当的 handler(处理程序) 来响应 IO 事件, 类似与 AWT 事件处理线程;
2 每个 handler 执行非阻塞的操作, 类似于 AWT ActionListeners 事件监听
3 通过将 handler 绑定到事件进行管理, 类似与 AWT addActionListener 添加事件监听;
1, 单线程模式
下图展示的就是单线程下基本的 Reactor 设计模式
首先我们明确下 java.nio 中相关的几个概念:
Channels
支持非阻塞读写的 socket 连接;
Buffers
用于被 Channels 读写的字节数组对象
Selectors
用于判断 channle 发生 IO 事件的选择器
SelectionKeys
负责 IO 事件的状态与绑定
Ok, 接下来我们一步步看下基于 Reactor 模式的服务端设计代码示例:
第一步 Rector 线程的初始化
- class Reactor implements Runnable {
- final Selector selector;
- final ServerSocketChannel serverSocket;
- Reactor(int port) throws IOException {
- selector = Selector.open();
- serverSocket = ServerSocketChannel.open();
- serverSocket.socket().bind(new InetSocketAddress(port));
- serverSocket.configureBlocking(false);
- SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT); // 注册 accept 事件
- sk.attach(new Acceptor()); // 调用 Acceptor() 为回调方法
- }
- public void run() {
- try {
- while (!Thread.interrupted()) {// 循环
- selector.select();
- Set selected = selector.selectedKeys();
- Iterator it = selected.iterator();
- while (it.hasNext())
- dispatch((SelectionKey)(it.next()); //dispatch 分发事件
- selected.clear();
- }
- } catch (IOException ex) { /* ... */ }
- }
- void dispatch(SelectionKey k) {
- Runnable r = (Runnable)(k.attachment()); // 调用 SelectionKey 绑定的调用对象
- if (r != null)
- r.run();
- }
- // Acceptor 连接处理类
- class Acceptor implements Runnable { // inner
- public void run() {
- try {
- SocketChannel c = serverSocket.accept();
- if (c != null)
- new Handler(selector, c);
- }
- catch(IOException ex) { /* ... */ }
- }
- }
- }
第二步 Handler 处理类的初始化
- final class Handler implements Runnable {
- final SocketChannel socket;
- final SelectionKey sk;
- ByteBuffer input = ByteBuffer.allocate(MAXIN);
- ByteBuffer output = ByteBuffer.allocate(MAXOUT);
- static final int READING = 0, SENDING = 1;
- int state = READING;
- Handler(Selector sel, SocketChannel c) throws IOException {
- socket = c;
- c.configureBlocking(false);
- // Optionally try first read now
- sk = socket.register(sel, 0);
- sk.attach(this); // 将 Handler 绑定到 SelectionKey 上
- sk.interestOps(SelectionKey.OP_READ);
- sel.wakeup();
- }
- boolean inputIsComplete() { /* ... */ }
- boolean outputIsComplete() { /* ... */ }
- void process() { /* ... */ }
- public void run() {
- try {
- if (state == READING) read();
- else if (state == SENDING) send();
- } catch (IOException ex) { /* ... */ }
- }
- void read() throws IOException {
- socket.read(input);
- if (inputIsComplete()) {
- process();
- state = SENDING;
- // Normally also do first write now
- sk.interestOps(SelectionKey.OP_WRITE);
- }
- }
- void send() throws IOException {
- socket.write(output);
- if (outputIsComplete()) sk.cancel();
- }
- }
下面是基于 GoF 状态对象模式对 Handler 类的一个优化实现, 不需要再进行状态的判断.
- class Handler { // ...
- public void run() { // initial state is reader
- socket.read(input);
- if (inputIsComplete()) {
- process();
- sk.attach(new Sender());
- sk.interest(SelectionKey.OP_WRITE);
- sk.selector().wakeup();
- }
- }
- class Sender implements Runnable {
- public void run(){ // ...
- socket.write(output);
- if (outputIsComplete()) sk.cancel();
- }
- }
- }
2, 多线程设计模式
在多处理器场景下, 为实现服务的高性能我们可以有目的的采用多线程模式:
1, 增加 Worker 线程, 专门用于处理非 IO 操作, 因为通过上面的程序我们可以看到, 反应器线程需要迅速触发处理流程, 而如果处理过程也就是 process() 方法产生阻塞会拖慢反应器线程的性能, 所以我们需要把一些非 IO 操作交给 Woker 线程来做;
2, 拆分并增加反应器 Reactor 线程, 一方面在压力较大时可以饱和处理 IO 操作, 提高处理能力; 另一方面维持多个 Reactor 线程也可以做负载均衡使用; 线程的数量可以根据程序本身是 CPU 密集型还是 IO 密集型操作来进行合理的分配;
2.1 多线程模式
Reactor 多线程设计模式具备以下几个特点:
1 通过卸载非 IO 操作来提升 Reactor 线程的处理性能, 这类似与 POSA2 中 Proactor 的设计;
2 比将非 IO 操作重新设计为事件驱动的方式更简单;
3 但是很难与 IO 重叠处理, 最好能在第一时间将所有输入读入缓冲区;(这里我理解的是最好一次性读取缓冲区数据, 方便异步非 IO 操作处理数据)
4 可以通过线程池的方式对线程进行调优与控制, 一般情况下需要的线程数量比客户端数量少很多;
下面是 Reactor 多线程设计模式的一个示意图与示例代码 (我们可以看到在这种模式中在 Reactor 线程的基础上把非 IO 操作放在了 Worker 线程中执行):
- class Handler implements Runnable {
- // uses util.concurrent thread pool
- static PooledExecutor pool = new PooledExecutor(...);// 声明线程池
- static final int PROCESSING = 3;
- // ...
- synchronized void read() { // ...
- socket.read(input);
- if (inputIsComplete()) {
- state = PROCESSING;
- pool.execute(new Processer());// 处理程序放在线程池中执行
- }
- }
- synchronized void processAndHandOff() {
- process();
- state = SENDING; // or rebind attachment
- sk.interest(SelectionKey.OP_WRITE);
- }
- class Processer implements Runnable {
- public void run() {
- processAndHandOff();
- }
- }
- }
当你把非 IO 操作放到线程池中运行时, 你需要注意以下几点问题:
1 任务之间的协调与控制, 每个任务的启动, 执行, 传递的速度是很快的, 不容易协调与控制;
2 每个 hander 中 dispatch 的回调与状态控制;
3 不同线程之间缓冲区的线程安全问题;
4 需要任务返回结果时, 任务线程等待和唤醒状态间的切换;
为解决上述问题可以使用 PooledExecutor 线程池框架, 这是一个可控的任务线程池, 主函数采用 execute(Runnable r), 它具备以下功能, 可以很好的对池中的线程与任务进行控制与管理:
1 可设置线程池中最大与最小线程数;
2 按需要判断线程的活动状态, 及时处理空闲线程;
3 当执行任务数量超过线程池中线程数量时, 有一系列的阻塞, 限流的策略;
2.2 基于多个反应器的多线程模式
这是对上面模式的进一步完善, 使用反应器线程池, 一方面根据实际情况用于匹配调节 CPU 处理与 IO 读写的效率, 提高系统资源的利用率, 另一方面在静态或动态构造中每个反应器线程都包含对应的 Selector,Thread,dispatchloop, 下面是一个简单的代码示例与示意图 (Netty 就是基于这个模式设计的, 一个处理 Accpet 连接的 mainReactor 线程, 多个处理 IO 事件的 subReactor 线程):
- Selector[] selectors; // Selector 集合, 每一个 Selector 对应一个 subReactor 线程
- //mainReactor 线程
- class Acceptor { // ...
- public synchronized void run() {
- //...
- Socket connection = serverSocket.accept();
- if (connection != null)
- new Handler(selectors[next], connection);
- if (++next == selectors.length)
- next = 0;
- }
- }
在服务的设计当中, 我们还需要注意与 java.nio 包特性的结合:
一是注意线程安全, 每个 selectors 对应一个 Reactor 线程, 并将不同的处理程序绑定到不同的 IO 事件, 在这里特别需要注意线程之间的同步;
二是 java nio 中文件传输的方式:
1 Memory-mapped files 内存映射文件的方式, 通过缓存区访问文件;
2 Direct buffers 直接缓冲区的方式, 在合适的情况下可以使用零拷贝传输, 但同时这会带来初始化与内存释放的问题 (需要池化与主动释放);
以上就是对《Scalable IO in Java》中核心内容的译文, 限于本人各方面水平有限, 本次翻译也只是便于自己阅读与理解, 其中难免有翻译与认知错误的地方, 望请大家谅解, 如果对这方面的内容感兴趣还是建议大家去阅读原文.
来源: https://www.cnblogs.com/dafanjoy/p/11217708.html