NIO.2 概览
NIO.2 也就是人们常说的 AIO, 在 Java 7 中引入了 NIO 的改进版 NIO 2, 它是异步非阻塞的 IO 方式.
AIO 的核心概念就是发起非阻塞方式的 I/O 操作, 立即响应, 却不立即返回结果, 当 I/O 操作完成时通知.
这篇文章主要介绍 NIO 2 的异步通道 API 的一些内容, 后续文章再分析 NIO.2 的其他特性
异步通道 API
从 Java 7 开始, java.nio.channel 包中新增加 4 个异步通道:
- AsynchronousSocketChannel
- AsynchronousServerSocketChannel
- AsynchronousFileChannel
- AsynchronousDatagramChannel
这些类在风格上与 NIO 通道 API 相似, 他们共享相同的方法与参数结构体, 并且大多数对于 NIO 通道类可用的参数, 对于新的异步版本仍然可用.
异步通道 API 提供了两种对已启动异步操作的监测与控制机制:
第一种通过返回一个 java.util.concurrent.Future 对象来表示异步操作的结果
第二种是通过传递给操作一个新类的对象 java.nio.channels.CompletionHandler 来完成, 它会定义在操作完毕后所执行的处理程序方法.
Future
从 Java 1.5 开始, 引入了 Future 接口, 使用该接口可以在任务执行完毕之后得到任务执行结果. 在 NIO 2 中, Future 对象表示异步操作的结果, 假设我们要创建一个服务器来监听客户端连接, 打开 AsynchronousServerSocketChannel 并将其绑定到类似于 ServerSocketChannel 的地址:
- AsynchronousServerSocketChannel server
- = AsynchronousServerSocketChannel.open().bind(null);
复制代码
方法 bind() 将一个套接字地址作为其参数, 这里传递了一个 Null 地址, 它会自动将套接字绑定到本地主机地址, 并使用空闲的临时端口, 就像传统的 ServerSocket 设置 0 端口一样, 也是使用操作系统随机分配的临时端口. 然后调用服务器的 accept()方法:
Future<AsynchronousSocketChannel> future = server.accept();
复制代码
当我们在 NIO 中调用 ServerSocketChannel 的 accept()方法时, 它会阻塞, 直到从客户端收到传入连接. 但是 AsynchronousServerSocketChannel 的 accept() 方法会立即返回 Future 对象.
Future 对象的泛型类型是操作的返回类型, 在上面的例子, 它是 AsynchronousSocketChannel , 但它也可以是 Integer 或 String , 具体取决于操作的最终返回类型.
我们可以使用 Future 对象来查询操作的状态
future.isDone();
复制代码
如果基础操作已经完成, 则此 API 返回 true, 请注意, 在这种情况下, 完成可能意味着正常终止, 异常, 或者取消.
我们还可以明确检查操作是否被取消, 如果操作在正常完成之前被取消, 则它返回 true. 如下:
future.isCancelled();
复制代码
实际的取消操作, 如下:
future.cancel(true)
复制代码
cancel()方法可利用一个布尔标志来指出执行接受的线程是否可被中断.
要检索操作结果, 我们使用 get()方法, 该方法将阻塞等待结果的返回:
AsynchronousSocketChannel client= future.get();
复制代码
另外, 我们也可以设置阻塞时间, 下例设置为 10s:
AsynchronousSocketChannel worker = future.get(10, TimeUnit.SECONDS);
复制代码
CompletionHandler
使用 Future 来处理操作的替代方法是使用 CompletionHandler 类的回调机制. 异步通道允许指定完成处理程序以使用操作的结果:
- AsynchronousServerSocketChannel listener
- = AsynchronousServerSocketChannel.open().bind(null);
- listener.accept(
- attachment, new CompletionHandler<AsynchronousSocketChannel, Object>() {
- public void completed(
- AsynchronousSocketChannel client, Object attachment) {
- // do whatever with client
- }
- public void failed(Throwable exc, Object attachment) {
- // handle failure
- }
- });
复制代码
I/O 操作成功完成时, 将调用已完成的回调 API. 如果操作失败, 则调用失败的 API.
异步通道 API 实例
服务端 (with Future)
下面是使用 Future 的方式构建服务端.
- public class AsyncEchoServer {
- private AsynchronousServerSocketChannel server;
- private Future<AsynchronousSocketChannel> future;
- private AsynchronousSocketChannel worker;
- public AsyncEchoServer() throws IOException, ExecutionException, InterruptedException {
- System.out.println("Open Server Channel");
- server = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress("127.0.0.1", 9090));
- future = server.accept();
- }
- public void runServer() throws ExecutionException, InterruptedException, IOException, TimeoutException {
- // 获取操作结果
- worker = future.get();
- if (worker != null && worker.isOpen()) {
- ByteBuffer buffer = ByteBuffer.allocate(100);
- // 将通道中的数据写入缓冲区
- worker.read(buffer).get(10,TimeUnit.SECONDS);
- System.out.println("received from client:" + new String(buffer.array()));
- }
- server.close();
- }
- public static void main(String[] args) throws InterruptedException, ExecutionException, IOException, TimeoutException {
- AsyncEchoServer server = new AsyncEchoServer();
- server.runServer();
- }
- }
复制代码
服务端(With CompletionHandler)
下面我们将了解如何使用 CompletionHandler 方法而不是 Future 方法实现相同的服务端代码.
- public class AsyncEchoServerWithCallBack {
- private AsynchronousServerSocketChannel server;
- private AsynchronousSocketChannel worker;
- private AsynchronousChannelGroup group;
- public AsyncEchoServerWithCallBack() throws IOException, ExecutionException, InterruptedException {
- System.out.println("Open Server Channel");
- group = AsynchronousChannelGroup.withFixedThreadPool(10, Executors.defaultThreadFactory());
- server = AsynchronousServerSocketChannel.open(group).bind(new InetSocketAddress("127.0.0.1", 9090));
- // 当有新连接建立时会调用 CompletionHandler 接口实现对象中的 completed()方法
- server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
- @Override
- public void completed(AsynchronousSocketChannel result, Object attachment) {
- if (server.isOpen()) {
- server.accept(null, this);
- }
- worker = result;
- if ((worker != null) && (worker.isOpen())) {
- ByteBuffer byteBuffer = ByteBuffer.allocate(100);
- worker.read(byteBuffer);
- System.out.println("received the client:"+new String(byteBuffer.array()));
- }
- }
- @Override
- public void failed(Throwable exc, Object attachment) {
- //TODO
- }
- });
- }
- public static void main(String[] args) throws InterruptedException, ExecutionException, IOException, TimeoutException {
- AsyncEchoServerWithCallBack server = new AsyncEchoServerWithCallBack();
- }
- }
复制代码
当有新连接建立时会调用 CompletionHandler 接口实现对象中的 completed()方法, 当出现错误时, 会调用 failed 方法.
accept 方法的第一个参数可以是一个任意类型的对象, 称为调用时的 "附加对象". 附件对象在 accept()方法调用时传入, 可以在 CompletionHandler 接口的实现对象中从 completed 和 failed 方法的参数 (attachment) 中获取, 这样就可以进行数据的传递. 使用 CompletionHandler 接口的方法都支持使用附件对象来传递数据.
AsynchronousChannelGroup 类
异步通道在处理 I/O 请求时, 需要使用一个 AsynchronousChannelGroup 类, 该类的对象表示的是一个异步通道的分组, 每一个分组都有一个线程池与之对应, 需要使用 AsynchronousChannelGroup 类的静态工厂方法 withFixedThreadPool,withCachedThreadPool 或者 withThreaPool 设置线程池. 这个线程池中的线程用来处理 I/O 事件. 多个异步通道可以共用一个分组的线程池资源.
调用 AsynchronousSocketChannel 和 AsynchronousServerSocketChannel 类的 open 方法 打开异步套接字通道时, 可以传入一个 AsynchronousChannelGroup 类的对象. 如果调用的 open 方法没有传入 AsynchronousChannelGroup 类的对象, 默认使用系统提供的分组, 系统分组对应的线程池中的线程是守护线程, 如果使用默认分组, 程序启动之后很快就退出了, 因为系统分组使用的守护线程不会阻止虚拟机的退出.
客户端
- public class AsyncEchoClient {
- private AsynchronousSocketChannel client;
- private Future<Void> future;
- public AsyncEchoClient() throws IOException {
- // 打开一个异步 channel
- System.out.println("Open client channel");
- client = AsynchronousSocketChannel.open();
- // 连接本地端口和地址, 在连接成功后不返回任何内容, 但是, 我们仍然可以使用 Future 对象来监视异步操作的状态
- System.out.println("Connect to server");
- future = client.connect(new InetSocketAddress("127.0.0.1", 9090));
- }
- /**
- * 向服务端发送消息
- *
- * @param message
- * @return
- */
- public void sendMessage(String message) throws ExecutionException, InterruptedException {
- if (!future.isDone()) {
- future.cancel(true);
- return;
- }
- // 将一个字节数组封装到 ByteBuffer 中
- ByteBuffer byteBuffer = ByteBuffer.wrap(message.getBytes());
- System.out.println("Sending message to the server");
- // 将数据写入通道
- int numberBytes = client.write(byteBuffer).get();
- byteBuffer.clear();
- }
- public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
- AsyncEchoClient client = new AsyncEchoClient();
- client.sendMessage("hello world");
- }
- }
复制代码
测试结果
客户端:
- Open client channel
- Connect to server
复制代码
服务端:
- Open Server Channel
- received the client: hello world
复制代码
Java NIO 2 异步 IO 的体现
我们都知道由 JDK 1.7 提供的 NIO 2.0 新增了异步的套接字通道, 它是真正的异步 IO, 在异步 IO 操作的时候可以传递变量, 当操作完成之后会回调相关的方法. 那么 NIO 2 的异步非阻塞特性是如何体现的呢? 从之前的描述就可以窥见很多细节:
异步的体现
以 AsynchronousServerSocketChannel 为例, 当调用该类的对象的 accept()方法时, 其返回了一个 Future<AsynchronousSocketChannel > 对象, 调用 accept()方法就像调用传统 I/O 中的 ServerSocket 的 accept()一样, 本质上都是接收客户端连接请求, 只不过 AsynchronousServerSocketChannel 对象没有一直阻塞等待, 而是立马返回一个 Future 对象, 利用 Future 的 get 方法去获取连接结果, Future 对象就是异步操作的结果, 我们还可以利用 Future 的 isDone 方法查询操作完成的状态, 这就是异步的体现.
当然在使用 CompletionHandler 方法中一样的道理, 有新连接建立时会回调 CompletionHandler 接口实现对象中的 completed()方法, 当出现错误时, 会调用 failed 方法.
非阻塞的体现
当调用 AsynchronousServerSocketChannel 对象的 accept()方法后, 返回 Future 对象, 此时线程可以接着干其他事情, 这是非阻塞的, 要想获得操作结果, 就调用 Future 的 isDone 方法查询操作是否完毕, 使用 get()去获取结果, 典型的非阻塞操作. 而在传统 I/O 模型中, 套接字类对象的 accept 方法会一直阻塞等待, 直到有新连接接入进来才停止阻塞.
小结
NIO.2, 也叫 AIO, 了解其异步通道 API, 也能更好地帮助我们去理解异步 IO 操作. 当我们学习 NIO2 的 API 时, 也可以对照 NIO 中的通道 API 进行学习, 它们还是有很多相似的地方.
参考资料 & 鸣谢
NIO.2 入门, 第 1 部分 异步通道 API https://www.ibm.com/developerworks/cn/java/j-nio2-1/index.html
A Guide to NIO2 Asynchronous Socket Channel https://www.baeldung.com/java-nio2-async-socket-channel
深入理解 Java 7: 核心技术与最佳实践 https://book.douban.com/subject/10734875/
来源: https://juejin.im/post/5b87ff2be51d4538a108c912