对于 Java NIO, 其主要由三个组件组成: Channel,Selector 和 Buffer. 关于这三个组件的作用主要如下:
Channel 是客户端连接的一个抽象, 当每个客户端连接到服务器时, 服务器都会为其生成一个 Channel 对象;
Selector 则是 Java NIO 实现高性能的关键, 其本质上使用了 IO 多路复用的原理, 通过一个线程不断的监听多个 Channel 连接来实现多所有这些 Channel 事件进行处理, 这样的优点在于只需要一个线程就可以处理大量的客户端连接, 当有客户端事件到达时, 再将其分发出去交由其它线程处理;
Buffer 从字面上讲是一个缓存, 本质上其是一个字节数组, 通过 Buffer, 可以从 Channel 上读取数据, 然后交由下层的处理器进行处理. 这里的 Buffer 的优点在于其封装了一套非常简单的用于读取和写入数据 API.
关于 Channel 和 Selector 的整体结构, 可以通过下图进行的理解, 这也是 IO 多路复用的原理图:
可以看到, 对于每个 Channel 对象, 其只要注册到 Selector 上, 那么 Selector 上监听的线程就会监听这个 Channel 的事件, 当任何一个 Channel 有对应的事件到达时, Selector 就会将该事件分发到下层的应用进行处理.
本文首先会对 Channel,Selector 和 Buffer 的主要 API 进行讲解, 然后会结合一个服务器与客户端的例子来具体讲解它们三者的使用方式.
核心 API
1.1 Channel
对于 Channel, 其主要的 API 如下:
- // 服务器端:
- // 用于创建一个供服务器使用的 ServerSocketChannel 实例
- ServerSocketChannel.open();
- // 绑定一个服务器端口, 从而提供对外的服务
- ServerSocketChannel.bind();
- // 获取一个客户端的 Channel 连接
- ServerSocketChannel.accept();
- // 客户端:
- // 用于创建一个供客户端使用的 SocketChannel 实例
- SocketChannel.open();
- // 连接参数中指定地址和端口对应的服务器
- SocketChannel.connect();
- // ServerSocketChannel 和 SocketChannel 两者兼备的方法
- // 用于指定服务器处理请求的方式是阻塞的还是非阻塞的, 对于 Java NIO 都是以非阻塞的方式进行处理的
- Channel.configureBlocking();
- // 将当前 channel 注册到一个 Selector 上, 该方法会返回注册之后得到的 SelectionKey 对象.
- // 这里在注册 Channel 的时候可以选择 Selector 将关注该 Channel 的哪些事件, 可选的有如下几种:
- // SelectionKey.OP_CONNECT: 监听 Channel 建立连接事件
- // SelectionKey.OP_READ: 监听 Channel 的可读取事件, 也即客户端已经发送数据过来, 此时可以读取
- // SelectionKey.OP_WRITE: 监听 Channel 的可写事件, 即当前可以写入数据到 Channel 中
- Channel.register(Selector, int);
- 1.2 Selector
对于 Selector, 其主要的 API 如下:
- // 创建一个 Selector 实例
- Selector.open();
- // 监听所有注册的 Channel, 一直阻塞知道有任何一个客户端 Channel 有相应的事件到达,
- // 需要注意的是, 这里的 select() 方法返回的是当前接收到是事件数目, 而不是具体的事件,
- // 具体的事件要通过 selectedKeys() 方法获取
- Selector.select();
- // 获取当前所有有事件到达的客户端 Channel 对应的 SelectionKey 实例
- Selector.selectedKeys();
- 1.3 SelectionKey
- // 判断当前收到的 Channel 的事件是否为 OP_CONNECT 事件
- SelectionKey.isConnectable();
- // 判断当前收到的 Channel 的事件是否为 OP_READ 事件
- SelectionKey.isReadable();
- // 判断当前收到的 Channel 的事件是否为 OP_WRITE 事件
- SelectionKey.isWritable();
- // 返回当前 SelectionKey 中所封装的 Channel 对象
- SelectionKey.channel();
从上面的 API 中可以看出, 这里关于 Channel 处理的大致流程是, 首先由 SocketChannel 或者 ServerSocketChannel 调用 open() 方法创建一个 Channel 对象; 然后调用 Channel.register() 方法将当前 Channel 注册到 Selector 中; 接着通过 Selector.select() 方法监听所有注册的 Channel 的连接, 如果有任何一个有事件到达, 此时这些事件会封装到当前客户端 Channel 对应的 SelectionKey 中, 最后通过 SelectionKey 判断具体是什么类型的事件, 然后对这些事件进行处理.
用法示例
2.1 服务器
服务器端使用的是 ServerSocketChannel, 这里主要是通过监听客户端 Channel, 获取数据进行打印, 然后返回一段数据给客户端. 如下是具体的示例:
- public class Server {
- public static void main(String[] args) throws IOException {
- new Server().start();
- }
- private void start() throws IOException {
- // 创建一个服务器 ServerSocketChannel 对象
- ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
- // 将当前服务器绑定到 8080 端口
- serverSocketChannel.bind(new InetSocketAddress(8080));
- // 设置当前 channel 为非阻塞的模式
- serverSocketChannel.configureBlocking(false);
- // 创建一个 Selector 对象
- Selector selector = Selector.open();
- // 将服务器 ServerSocketChannel 注册到 Selector 上, 并且监听与客户端建立连接的事件
- serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
- while (true) {
- // 监听 ServerSocketChannel 上的事件, 每秒钟循环一次,
- // 这里 select() 方法返回的是当前监听得到的事件数目, 为 0 表示当前没有任何事件到达
- if (selector.select(1000) == 0) {
- System.out.println("has no message...");
- continue;
- }
- // 走到这里说明当前有监听的事件到达, 获取所有监听的 Channel 所对应的 SelectionKey 对象,
- // 这里需要注意的是, 前面我们已经将 ServerSocketChannel 注册到 Selector 中了,
- // 因而对于 ServerSocketChannel, 其监听得到的则是 SelectionKey.OP_CONNECT 事件.
- // 但是下面的代码中, 我们也会将与客户端建立的连接 Channel 注册到 Selector 中,
- // 因而这里 Selector 中也会存在接收到的 SelectionKey.OP_READ 和 OP_WRITE 事件.
- Set<SelectionKey> selectionKeys = selector.selectedKeys();
- // 对监听到的事件进行遍历
- Iterator<SelectionKey> iterator = selectionKeys.iterator();
- while (iterator.hasNext()) {
- SelectionKey key = iterator.next();
- // 这里需要注意的是, Selector 在为每个有事件到达的 Channel 建立 SelectionKey 对象
- // 之后, 其并不会将其移除, 如果我们不进行移除, 那么下次循环时该事件还会再被处理一次,
- // 因而这里要调用 remove() 方法移除该 SelectionKey
- iterator.remove();
- // 如果是有新的客户端 Channel 连接建立, 则处理该事件
- if (key.isAcceptable()) {
- accept(key, selector);
- }
- // 如果客户端连接中有可读取的数据, 则处理该事件
- if (key.isReadable()) {
- read(key);
- }
- // 如果可往客户端连接中写入数据, 则处理该事件
- if (key.isValid() && key.isWritable()) {
- write(key);
- }
- }
- }
- }
- private void accept(SelectionKey key, Selector selector) throws IOException {
- // 这里由于只有 ServerSocketChannel 才会有客户端连接建立事件, 因而这里可以直接将
- // Channel 强转为 ServerSocketChannel 对象
- ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
- // 获取客户端的连接
- SocketChannel socketChannel = serverChannel.accept();
- socketChannel.configureBlocking(false);
- // 将客户端连接 Channel 注册到 Selector 中, 并且监听该 Channel 的 OP_READ 事件,
- // 也即等待客户端发送数据到服务器端
- socketChannel.register(selector, SelectionKey.OP_READ);
- }
- private void read(SelectionKey key) throws IOException {
- // 这里只有客户端才会发送数据到服务器, 因而可将其强转为 SocketChannel 对象
- SocketChannel clientChannel = (SocketChannel) key.channel();
- ByteBuffer buffer = ByteBuffer.wrap(new byte[1024]);
- // 从客户端 Channel 中读取数据, 这里 read() 方法返回读取到的数据长度,
- // 如果为 - 1, 则表示客户端断开连接了
- int len = clientChannel.read(buffer);
- if (len == -1) {
- clientChannel.close();
- return;
- }
- // 处理客户端数据
- System.out.println("**********server: read message**********");
- System.out.println(new String(buffer.array(), 0, len));
- // 由于已经读取了客户端数据, 因而这里将对该 Channel 感兴趣的事件修改为
- // SelectionKey.OP_READ 和 OP_WRITE, 用于服务器往该 Channel 中写入数据
- key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
- }
- private void write(SelectionKey key) throws IOException {
- String message = "message from server";
- ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());
- // 由于上面为客户端 Channel 设置了可供写入数据的事件, 因而这里可以往客户端 Channel 写入数据
- SocketChannel clientChannel = (SocketChannel) key.channel();
- if (clientChannel.isOpen()) {
- System.out.println("**********server: write message**********");
- System.out.println(message);
- // 往客户端 Channel 写入数据
- clientChannel.write(buffer);
- }
- // 写入完成后, 监听客户端会继续发送的数据
- if (!buffer.hasRemaining()) {
- key.interestOps(SelectionKey.OP_READ);
- }
- buffer.compact();
- }
- }
2.2 客户端
客户端使用 SocketChannel 连接服务器, 并且会往服务器中写入数据, 然后等待服务器返回数据并且打印出来. 如下是客户端代码:
- public class Client {
- public static void main(String[] args) throws IOException {
- new Client().start();
- }
- private void start() throws IOException {
- // 创建一个客户端 SocketChannel 对象
- SocketChannel channel = SocketChannel.open();
- // 设置客户端 Channel 为非阻塞模式
- channel.configureBlocking(false);
- // 创建一个供给客户端使用的 Selector 对象
- Selector selector = Selector.open();
- // 注册客户端 Channel 到 Selector 中, 这里客户端 Channel 首先监听的是 OP_CONNECT 事件,
- // 因为其首先必须与服务器建立连接, 然后才能发送和读取数据
- channel.register(selector, SelectionKey.OP_CONNECT);
- // 调用客户端 Channel.connect() 方法连接服务器, 需要注意的是, 该方法的调用必须放在
- // 上述 Channel.register() 方法之后, 否则在注册之前客户端就已经注册完成,
- // 此时 Selector 就无法收到 SelectionKey.OP_CONNECT 事件了
- channel.connect(new InetSocketAddress("127.0.0.1", 8080));
- while (true) {
- // 监听客户端 Channel 的事件, 这里会一直等待, 直到有监听的事件到达.
- // 对于客户端, 首先监听到的应该是 SelectionKey.OP_CONNECT 事件,
- // 然后在后续代码中才会将 SelectionKey.OP_READ 和 WRITE 事件注册
- // 到 Selector 中
- selector.select();
- Set<SelectionKey> selectionKeys = selector.selectedKeys();
- Iterator<SelectionKey> iterator = selectionKeys.iterator();
- while (iterator.hasNext()) {
- SelectionKey key = iterator.next();
- iterator.remove();
- // 监听到客户端 Channel 的 SelectionKey.OP_CONNECT 事件, 并且处理该事件
- if (key.isConnectable()) {
- connect(key, selector);
- }
- // 监听到客户端 Channel 的 SelectionKey.OP_WRITE 事件, 并且处理该事件
- if (key.isWritable()) {
- write(key, selector);
- }
- // 监听到客户端 Channel 的 SelectionKey.OP_READ 事件, 并且处理该事件
- if (key.isReadable()) {
- read(key);
- }
- }
- }
- }
- private void connect(SelectionKey key, Selector selector) throws IOException {
- // 由于是客户端 Channel, 因而可以直接强转为 SocketChannel 对象
- SocketChannel channel = (SocketChannel) key.channel();
- channel.finishConnect();
- // 连接建立完成后就监听该 Channel 的 WRITE 事件, 以供客户端写入数据发送到服务器
- channel.register(selector, SelectionKey.OP_WRITE);
- }
- private void write(SelectionKey key, Selector selector) throws IOException {
- SocketChannel channel = (SocketChannel) key.channel();
- String message = "message from client";
- System.out.println("**client: write message**");
- System.out.println(message);
- // 客户端写入数据到服务器 Channel 中
- channel.write(ByteBuffer.wrap(message.getBytes()));
- // 数据写入完成后, 客户端 Channel 监听 OP_READ 事件, 以等待服务器发送数据过来
- channel.register(selector, SelectionKey.OP_READ);
- }
- private void read(SelectionKey key) throws IOException {
- System.out.println("**client: read message**");
- SocketChannel channel = (SocketChannel) key.channel();
- ByteBuffer buffer = ByteBuffer.wrap(new byte[1024]);
- // 接收到客户端 Channel 的 SelectionKey.OP_READ 事件, 说明服务器发送数据过来了,
- // 此时可以从 Channel 中读取数据, 并且进行相应的处理
- int len = channel.read(buffer);
- if (len == -1) {
- channel.close();
- return;
- }
- System.out.println(new String(buffer.array(), 0, len));
- }
- }
2.3 运行结果
服务器:
has no message...
has no message...
has no message...
has no message...
- **server: read message**
- message from client
- **server: write message**
- message from server
客户端:
- **client: write message**
- message from client
- **client: read message**
- message from server
小结
本文首先讲解了 Java NIO 中三大组件的作用, 然后讲解了各个组件主要的方法及其注意事项, 最后通过一个客户端和服务器实例详细讲解了 Java NIO 是如何使用的
来源: http://www.bubuko.com/infodetail-3057705.html