在中我们介绍了 Java 基本 IO,也就是阻塞式 IO(BIO),在 JDK1.4 版本后推出了新的 IO 系统(NIO),也可以理解为非阻塞 IO(Non-Blocking IO)。引用《Java NIO》中的一段话来解释一下 NIO 出现的原因:
操作系统与 Java 基于流的 I/O 模型有些不匹配。操作系统要移动的是大块数据(缓冲区),这往往是在硬件直接存储器存取( DMA)的协助下完成的。而 JVM 的 I/O 类喜欢操作小块数据——单个字节、几行文本。结果,操作系统送来整缓冲区的数据, java.io 的流数据类再花大量时间把它们拆成小块,往往拷贝一个小块就要往返于几层对象。操作系统喜欢整卡车地运来数据, java.io 类则喜欢一铲子一铲子地加工数据。有了 NIO,就可以轻松地把一卡车数据备份到您能直接使用的地方( ByteBuffer 对象)。但是 Java 里的 RandomAccessFile 类是比较接近操作系统的方式。
可以看出 Java 原生的 IO 模型之所以慢,是因为与操作系统的操作方式不匹配造成的,那么 NIO 之所以比 BIO 快主要就是用到了缓冲区相关的技术,接下来慢慢介绍这些技术点。
下图描述了操作系统中数据是如何从外部存储向运行中的进程内存区域移动的过程:进程使用 read() 系统调用要求缓冲区被填充满。内核随即向磁盘控制器发出指令,要求其从磁盘读取数据。磁盘控制器通过 DMA 直接把磁盘上的数据写入缓冲区,这一步不需要 CPU 的参与。当缓冲区填满时,内核将数据从临时缓冲区拷贝到进程执行 read() 调用时指定的缓冲区。
这里需要主要为什么要执行系统调用这样一个中间步骤而不是直接 DMA 到进程的缓冲区,是因为用户空间是无法直接操作硬件的,另外磁盘这种块存储设备操作的是固定大小的数据块,而用户请求的则是非规则大小的数据,内核空间在这里的作用就是分解、重组的作用。
Java NIO 主要依赖的组件有三个:缓冲区 Buffer、通道 Channel 和选择器 Selector。
Buffer 家族主要有这么些个成员,根据类名也大概能猜到它们的用处,用的最多的是 ByteBuffer,在下面的例子中也会主要用到它。
在这里就不仔细讲 Buffer 类的 API 了,因为需要用的时候可以去查 Java Doc,而以几个常用的操作来讲述一下怎么使用 Buffer。
容量(capacity):缓冲区的最大大小
上界(limit):缓冲区当前的大小
位置(position):下一个要读写的位置,由 get() 和 put() 更新
标记(mark): 备忘位置,由 mark() 来指定 mark = position,由 reset() 来指定 position=mark
它们之间的大小关系:
0 <= mark <= position <= limit <= capacity
一种最常用的方式是:
- ByteBuffer buffer = ByteBuffer.allocate(1024);
这种方法是创建一个 1024 字节大小的缓冲区。也可以用下面这种方式来包装自己创建的字节数组。
byte[] bytes = new byte[1024];
ByteBuffer buffer = ByteBuffer.wrap(bytes);
Buffer 在填充完毕后需要传递到一个通道中,这时如果直接读取 Buffer,其实是什么都读不到的。因为 Buffer 的设计中是有一个指针概念的,指向当前的位置,当一个 Buffer 填充完毕时指针是指向末尾的,因此在读取时应该将指针指向 Buffer 的头部,简单的方法就是使用下面这个方法:
- Buffer.flip();
flip 的实现如下:
- public final Buffer flip() {
- limit = position;
- position = 0;
- mark = -1;
- return this;
- }
可以看出 flip 其实是把当前的 limit 从 capacity 变成了 position,又把 position 放到了缓冲区的起点,并取消了 mark。
- Buffer.clear();
clear 的实现如下:
- public final Buffer clear() {
- position = 0;
- limit = capacity;
- mark = -1;
- return this;
- }
clear 函数就是将 position 放到起点,并重置 limiti 为 capacity,以及取消 mark。
- Buffer.rewind();
rewind 的实现如下:
- public final Buffer rewind() {
- position = 0;
- mark = -1;
- return this;
- }
rewind 和 flip 的区别在于没有改变 limit 的值。
- Buffer.compact()
开始我不是很理解 Channel 这个东西为什么要存在,看了书才慢慢明白,缓冲区为我们装载了数据,但是数据的写入和读取并不能直接进行 read() 和 write() 这样的系统调用,而是 JVM 为我们提供了一层对系统调用的封装。而 Channel 可以用最小的开销来访问操作系统本身的 IO 服务,这就是为什么要有 Channel 的原因。
下面来讲讲常用的几个 Channel 类及其常用的方法。
I/O 从广义上可以分为 File I/O 和 Stream I/O,对应到通道来说就有文件通道和 socket 通道,具体的说是 FileChannle 类和 SocketChannel、ServerSocketChannel 和 DatagramChannel 类。
它们之间的区别还是很大的,从继承关系上来看:
- public abstract class FileChannel extends AbstractInterruptibleChannel implements SeekableByteChannel,
- GatheringByteChannel,
- ScatteringByteChannel
FileChannel 主要是继承了可中断接口,而对于 socket 相关的 Channel 类都继承 AbstractSelectableChannel,这是选择器(Selector)相关的通道,在下一节中具体讲解。
- public abstract class SocketChannel extends AbstractSelectableChannel implements ByteChannel,
- ScatteringByteChannel,
- GatheringByteChannel,
- NetworkChannel
FileChannel 只能通过工厂方法来实例化,那就是调用 RandomAccessFile、FileInputStream 和 FileOutputStream 的 getChannel() 方法。如:
- RandomAccessFile file = new RandomAccessFile("a.txt", "r");
- FileChannel fc = file.getChannel();
先看看 FileChannel 提供的方法句柄:
- public abstract int read(ByteBuffer dst) throws IOException; //把通道中数据传到目的缓冲区中,dst是destination的缩写
- public abstract int write(ByteBuffer src) throws IOException; //把源缓冲区中的内容写到指定的通道中去
从句柄可以看出 FileChannel 是既可以读又可以写的,是全双工的。下面这个例子用来展示 FileChannel 是如何进行读和写的。
- public class FileChannelTest {
- public static void readFile(String path) throws IOException {
- FileChannel fc = new FileInputStream(path).getChannel();
- ByteBuffer buffer = ByteBuffer.allocate(128);
- StringBuilder sb = new StringBuilder();
- while ((fc.read(buffer)) >= 0) {
- //翻转指针
- buffer.flip();
- //remaining = limit - position
- byte[] bytes = new byte[buffer.remaining()];
- buffer.get(bytes);
- String string = new String(bytes, "UTF-8");
- sb.append(string);
- //清空buffer
- buffer.clear();
- }
- System.out.println(sb.toString());
- }
- public static void writeFile(String path, String string) throws IOException {
- FileChannel fc = new FileOutputStream(path).getChannel();
- ByteBuffer buffer = ByteBuffer.allocate(10);
- int current = 0;
- int len = string.getBytes().length;
- while (current < len) {
- for (int i = 0; i < 10; i++) {
- if (current + i >= len) break;
- buffer.put(string.getBytes()[current + i]);
- }
- current += buffer.position();
- buffer.flip();
- fc.write(buffer);
- buffer.clear();
- }
- }
- public static void main(String[] args) throws IOException {
- String in ="D:/in.txt";
- String out = "D:/out.txt";
- readFile( in );
- writeFile(out, "hello world");
- readFile(out);
- }
- }
分析一下上面这段代码,在 readFile() 函数中,通过 FileInputStream.getChannel() 得到 FileChannel 对象,并创建 ByteBuffer 对象,接着利用 FileChannel 的 read 方法填充 buffer,得到填充完的 buffer 之后我们将 buffer 的当前指针翻转一下接着利用 buffer 的 get 方法把数据放到 byte 数组中,接着就可以读取数据了。
读取文件的整个过程相比原生的 I/O 方法还是略显麻烦,但是我们如果把数据看成一堆煤矿,把 ByteBuffer 看成装煤的矿车,而 FileChannel 看成是运煤的矿道,那么上面的过程就演变成了:先打通一条矿道,然后把煤矿装在小车里运出来。形象的记忆更利于理解这个过程。
而 writeFile() 函数也是类似,为了更好的理解 Buffer 的属性,我特意将 buffer 的大小设置为 10,为要写入的字符串长度为 11 个字节。首先还是通过 FileOutputStream.getChannel() 方法得到 FileChannel 对象,并创建一个 10 字节大小的缓冲区,接着定义一个整型变量 current 指向要写入的字符串的当前下标,每次向 buffer 中 put10 个字节,并更新 current,通过 buffer.position() 方法可以得到 buffer 被填充之后指针的位置,也就是缓冲区里的字节个数,然后翻转指针,最后通过 FileChannel.write(buffer) 方法将 buffer 写入到文件中。
同样考虑一下形象化的过程:我们首先把煤矿装入小车 (buffer.put()),并打开一条通往矿山的矿道 (FileOutputStream.getChannel()),接着把煤矿运输进去 (FileChannel.write(buffer))。还是很容易理解的吧!
在中介绍了阻塞式 TCP 的使用,接下来会介绍一下非阻塞式的 TCP 使用。
Socket 通道与文件通道有着不同的特征,最显著的就是可以运行非阻塞模式并且是可以选择的。在 2.2.1 节中我们讲到 Socket 通道都继承自 AbstractSelectableChannel 类,而文件通道没有,而这个类就是 Socket 通道拥有非阻塞和可选择特点的关键。下面是 SelectableChannel 的几个方法句柄:
- public abstract boolean isBlocking();
- public abstract SelectableChannel configureBlocking(boolean block) throws IOException;
从这两个方法句柄可以看到,设置一个 socket 通道的非阻塞模式只需要:
- socketChannel.configureBlocking(false)
即可。而有条件的选择 (readiness selection) 是一种可以用来查询通道的机制,该查询可以判断通道是否准备好执行一个目标操作,比如 read、write 或 accept。这个特性是在 SelectableChannel 类和 SelectionKey 类中进行了定义。
- public static final int OP_READ = 1 << 0;
- public static final int OP_WRITE = 1 << 2;
- public static final int OP_CONNECT = 1 << 3;
- public static final int OP_ACCEPT = 1 << 4;
SelectionKey 中的四个常量定义了 socket 通道的四种状态,而 SelectableChannel 的 register 方法正好返回了 SelectionKey 对象。
- public abstract SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException;
socket 通道与文件通道不同,并不是通过 socket.getChannel() 来创建对象(尽管 socket 对象有这个方法),而是通过 SocketChannel.open() 这样的静态工厂方法去创建对象。每一个 socket 通道有与之关联的一个 socket 对象,却并不是所有的 socket 对象都有一个关联的通道,如果用传统的方法创建了一个 socket 对象,则它不会有一个关联的通道并且 getChannel() 方法总是返回 null。
- SocketChannel sc = SocketChannel.open();
- sc.configureBlocking(false);
这样就创建了一个非阻塞的 socket 通道。
- public abstract class ServerSocketChannel extends AbstractSelectableChannel {
- public static ServerSocketChannel open() throws IOException;
- public abstract ServerSocket socket();
- public abstract ServerSocket accept() throws IOException;
- public final int validOps();
- }
ServerSocketChannel 与 SocketChannel 和 DatagramChannel 不同,它本身是不传输数据的,提供的接口非常简单,如果要进行数据读写,需要通过 ServerSocketChannel.socket() 方法返回一个与之关联的 ServerSocket 对象来进行。
- ServerSocketChannel ssc = ServerSocketChannel.open();
- ServerSocket ss = ssc.socket();
- ss.bind(new InetSocketAddress(port));
ServerSocketChannel 同 ServerSocket 一样也有 accept() 方法,当调用 ServerSocket 的 accept() 函数时只能是阻塞式的,而调用 ServerSocketChannel 的 accept() 函数却可以是非阻塞式。
下面这个例子展示了 ServerSocketChannel 的用法:
- public class Server {
- static int port = 20001;
- public static void main(String[] args) throws IOException,
- InterruptedException {
- ServerSocketChannel ssc = ServerSocketChannel.open();
- ssc.socket().bind(new InetSocketAddress(port));
- ssc.configureBlocking(false);
- String string = "hello client";
- ByteBuffer buffer = ByteBuffer.wrap(string.getBytes());
- ByteBuffer in =ByteBuffer.allocate(1024);
- System.out.println("Server wait for connection...");
- while (true) {
- SocketChannel sc = ssc.accept();
- if (sc == null) {
- TimeUnit.SECONDS.sleep(1);
- } else {
- //rewind只是将position调到0,不会改变Limit的值,而flip是将limit调整成position,再把position改成0
- System.out.println("Server get a connection...");
- sc.read( in ); in .flip();
- buffer.rewind();
- sc.write(buffer);
- System.out.println("From client:" + new String( in .array(), "UTF-8"));
- }
- }
- }
- }
选择器其实是一种多路复用机制在 Java 语言中的应用,在学习 Selector 之前有必要学习一下 I/O 多路复用的概念。
在中我们已经看到对于每个客户端请求都分配一个线程的设计,或者是利用线程池来处理客户端请求,但是这样的设计对于处理客户端有大量请求的情况都束手无策。原因在于首先线程非常消耗系统资源,其次阻塞式的设计在某一个请求发送的数据很大时会使其他请求等待很久。那么究竟有没有其他方法来解决这个问题呢?早在上世纪 80 年代在 Unix 系统中就已经提出 select 模型来解决这个问题,在之后对 select 进行优化又提出了 poll 模型和 epoll 模型(Linux 专有)。
select/poll/epoll 其实都是一种多路复用模型,什么是多路复用,开始听见这个名词我也是一脸懵逼,觉得好像很高大上很难理解的样子。后面通过看书和看知乎上的形象化描述,慢慢理解了其实多路复用也没有想象着那么难。我们如果把每个客户端请求看成一个电路,如下图,那么是否有必要为每条电路都分配一条专有的线路呢?还是当电流来了进行开关切换?很明显,后者只需要一个开关就可以节省大量的不必要开销。select 模型其实就是这样做的,监控所有的 socket 请求,当某个 socket 准备好(read/write/accept)时就进行处。但是如何做到监控所有 socket 的状态呢,select 做的很简单,也许你也想到了,就是去轮询所有 socket 的状态,这样很明显当 socket 数量比较大时效率非常低。并且 select 对于监控的 socket 数量有限制,默认是 1024 个。poll 模型进行了一些改进,但是并没有本质的改变。到了 epoll 模型,就有了非常大的改观。假象另一个场景,如果你是一个监考老师,考试结束时要去收卷子,你按照常理一个一个的收着,一旦有一个学生还没写完,于是你就会卡(阻塞)在那,并且整个轮询一遍下来非常慢。所以你想到了吗?让那些已经做完的学生举手告知你他已经做完了,你再过去收一下卷子即可。这样很明显阻塞会大幅度减少。这就是 epoll,让那些已经准备好的 socket 发出通知,然后来处理它。
如果还是不理解,可以看看。
好了,废话这么多,已经是可以理解多路复用是什么了。Java 语言直到 JDK1.4 版本才有多路复用这个概念,很大原因也是因为没人用 Java 去写服务器,例如著名的 Apache 和 Nginx 都是用 C/C++ 写的。接下来对 NIO 中多路复用的实现进行介绍。
NIO 处理多路复用请求只需要三个组件:可选择的通道(SelectableChannels)、选择器(Selector)和选择键(SelectionKey),他们之间的关系如下图所示:
可选择的通道可以主动注册到一个选择器上,并指定对哪些动作是感兴趣的。这个注册行为会返回一个选择键,选择键封装了该通道和选择器之间的注册关系,包含两个比特集:指示该注册关系所关心的通道操作;通道已经准备好的操作。选择器是核心组件,它管理着注册在其上的通道集合的信息和它们的就绪状态。值得注意的是,通道在注册到一个选择器之前,必须设置为非阻塞模式。原因在。
通过静态工厂方法创建一个选择器。
- Selector selector = Selector.open();
这是通道拥有的方法,先看看方法句柄:
- public abstract SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException;
- public final SelectionKey register(Selector sel, int ops) throws ClosedChannelException {
- return register(sel, ops, null);
- }
值得注意的是第二个参数 ops,这个参数表示了该通道感兴趣的操作,所有的操作包括读(read)、写(write)、连接(connect)和接受(accept)。并不是所有通道都支持这些操作,比如 SocketChannel 就没有 accept 这个操作,因为这是专属于 ServerSocketChannel 的操作。可以通过调用 Channel.validOps() 来查询支持的操作。
第三个参数是用来传递一个对象的引用,在调用新生成的选择键的 attach() 方法时会返回该对象的引用。
选择器的核心功能是选择过程,选择器实际上是对 select()、poll() 等本地系统调用的一个封装。每一个选择器会维护三个键集合:已注册的键集合、已选择的键集合和已取消的键集合。通过执行 Selector.select()、Selector.select(int timeout) 或 Selector.selectNow(),选择过程被调用,这时会执行以下步骤:
好不容易才写完上面这段,因为我在看原书的时候看了 2-3 遍才看懂,过程还是比较复杂的,我觉得是时候去看看 Unix 中的 select() 是怎么做的,也许这样更利于理解这个选择过程。
说了这么多原理,不知道你晕没晕,反正我是快晕了。这时候来一段实战代码,告诉你了解了这么多,到底该怎么用!
通常的做法如下:在选择器上调用一次 select 操作(这会更新已选择键的集合),然后遍历 selectedKeys 返回的键的集合。接着键将从已选择的键的集合中被移除(通过 Iterator.remove() 方法),然后检测下一个键。完成后,继续下一次 select 操作。
服务端程序演示:
- public class SelectorTest {
- public static void main(String[] args) throws IOException {
- new SelectorTest().select();
- }
- public void select() throws IOException {
- //创建选择器
- Selector selector = Selector.open();
- //创建serverChannel
- ServerSocketChannel ssc = ServerSocketChannel.open();
- //设置为非阻塞模式
- ssc.configureBlocking(false);
- //绑定监听的地址
- ssc.socket().bind(new InetSocketAddress(20000), 1024);
- //将serverChannel注册到选择器上,监听accept事件,返回选择键
- ssc.register(selector, SelectionKey.OP_ACCEPT);
- while (true) {
- //此次选择过程准备就绪的通道数量
- int num = selector.select();
- if (num == 0) {
- //若没有准备好的就继续循环
- continue;
- }
- //返回已就绪的键集合
- Iterator < SelectionKey > it = selector.selectedKeys().iterator();
- while (it.hasNext()) {
- SelectionKey key = it.next();
- handle(selector, key);
- //因为已经处理了该键,所以把当前的key从已选择的集合中去除
- it.remove();
- }
- }
- }
- public void handle(Selector selector, SelectionKey key) throws IOException {
- if (key.isValid()) {
- //当一个ServerChannel为accept状态时,注册这个ServerChannel的SocketChannel为可读取状态
- if (key.isAcceptable()) {
- ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
- SocketChannel channel = serverChannel.accept();
- //把通道注册到选择器之前要设置为非阻塞,否则会报异常
- serverChannel.configureBlocking(false);
- channel.register(selector, SelectionKey.OP_READ);
- }
- //如果channel是可读取状态,则读取其中的数据
- if (key.isReadable()) {
- //只有SocketChannel才能读写数据,所以如果是可读取状态,只能是SocketChannel
- SocketChannel sc = (SocketChannel) key.channel();
- ByteBuffer in =ByteBuffer.allocate(1024);
- //将socketChannel中的数据读入到buffer中,返回当前字节的位置
- int readBytes = sc.read( in );
- if (readBytes > 0) {
- //把buffer的position指针指向buffer的开头
- in .flip();
- byte[] bytes = new byte[ in .remaining()]; in .get(bytes);
- String body = new String(bytes, "UTF-8");
- System.out.println("The server receive : " + body);
- //把response输出到socket中
- doWrite(sc, "Hello client");
- } else if (readBytes < 0) {
- key.cancel();
- sc.close();
- }
- }
- }
- }
- private void doWrite(SocketChannel sc, String response) throws IOException {
- //把服务器端返回的数据写到socketChannel中
- if (response == null && response.trim().length() > 0) {
- byte[] bytes = response.getBytes();
- ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
- writeBuffer.put(bytes);
- writeBuffer.flip();
- sc.write(writeBuffer);
- }
- }
- }
代码相较于复杂了太多倍,但是基本思路跟我上面那段话写的是一样的,而且基本每一段代码都写了注释,耐心看下去肯定看的懂。我就不再解释这段代码啦。
客户端演示:
- public class Client {
- public static final int PORT = 20000;
- public static final String HOST = "127.0.0.1";
- private volatile boolean stop = false;
- public static void main(String[] args) throws IOException {
- new Client().select();
- }
- public void select() throws IOException {
- // 创建选择器
- Selector selector = Selector.open();
- // 创建SocketChannel
- SocketChannel sc = SocketChannel.open();
- // 设置为非阻塞模式
- sc.configureBlocking(false);
- try {
- doConnect(selector, sc);
- } catch(Exception e) {
- e.printStackTrace();
- System.exit(1);
- }
- while (!stop) {
- int num = selector.select();
- if (num == 0) {
- continue;
- }
- Iterator < SelectionKey > it = selector.selectedKeys().iterator();
- while (it.hasNext()) {
- SelectionKey key = it.next();
- try {
- handleKeys(selector, key);
- } catch(Exception e) {
- e.printStackTrace();
- if (key != null) {
- key.cancel();
- if (key.channel() != null) {
- key.channel().close();
- }
- }
- }
- // 因为已经处理了该键,所以把当前的key从已选择的集合中去除
- it.remove();
- }
- }
- if (selector != null) {
- selector.close();
- }
- }
- private void doWrite(SocketChannel sc, String response) throws IOException {
- if (response != null && response.trim().length() > 0) {
- byte[] bytes = response.getBytes();
- ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
- writeBuffer.put(bytes);
- writeBuffer.flip();
- sc.write(writeBuffer);
- if (!writeBuffer.hasRemaining()) {
- System.out.println("Send msg successfully");
- }
- }
- }
- private void handleKeys(Selector selector, SelectionKey key) throws IOException {
- if (key.isValid()) {
- SocketChannel sc = (SocketChannel) key.channel();
- // 判断是否连接成功
- if (key.isConnectable()) {
- if (sc.finishConnect()) {
- sc.register(selector, SelectionKey.OP_READ);
- doWrite(sc, "Hello Server");
- } else {
- System.exit(1);
- }
- }
- if (key.isReadable()) {
- ByteBuffer in =ByteBuffer.allocate(1024);
- // 将socketChannel中的数据读入到buffer中,返回当前字节的位置
- int readBytes = sc.read( in );
- if (readBytes > 0) {
- // 把buffer的position指针指向buffer的开头
- in .flip();
- byte[] bytes = new byte[ in .remaining()]; in .get(bytes);
- String body = new String(bytes, "UTF-8");
- System.out.println("The Client receive : " + body);
- this.stop = true;
- } else if (readBytes < 0) {
- // 对端链路关闭
- key.cancel();
- sc.close();
- } else {
- // 读到0字节,忽略
- }
- }
- }
- }
- private void doConnect(Selector selector, SocketChannel sc) throws IOException {
- if (sc.connect(new InetSocketAddress(HOST, PORT))) {
- System.out.println("Client connect successfully...");
- // 如果直接连接成功,则注册读操作
- sc.register(selector, SelectionKey.OP_READ);
- doWrite(sc, "Hello server!");
- } else {
- // 如果没有连接成功,则注册连接操作
- sc.register(selector, SelectionKey.OP_CONNECT);
- }
- }
- }
客户端跟服务端很相似,唯一不同的是服务端需要监测的 socket 行为是 OP_ACCEPT 和 OP_READ,而客户端需要监控的是 OP_CONNECT 和 OP_READ,其他的区别不是很大。
依次运行服务器端和客户端,结果如下:
代码在我的上也可以找到。
花了大概三天的时间,把《Java NIO》这本书看了一遍并记录了下来学习过程,并且结合《Netty 权威指南》中的例子去实践了一下,慢慢感觉到 NIO 的魅力。反思一下学习的比较慢的原因,应该是对 Unix 上的 I/O 模型不熟悉导致的,所以觉得接下来好好学习一下 select、poll、epoll,加深对多路复用的理解。
本文中可能存在理解有偏差的地方,也请多多指正。
1.
2.
来源: http://www.cnblogs.com/puyangsky/p/5840873.html