Java NIO 是 new IO 的简称, 是一种可以替代 Java IO 的一套新的 IO 机制. 它提供了一套不同于 Java 标准 IO 的操作机制, 严格来说, NIO 与并发并无直接关系, 但是使用 NIO 技术可以大大提高线程的使用效率. Java NIO 设计的基础内容有通道 (Channel), 缓冲区 (Buffer),Selector(选择器). 下面说说这几个内容
1) 通道 (Channel)
Channel:Channel 是一对象, 可以通过它读取和写入数据. 可以把它看做是 IO 中的流, 不同的是:
Channel 是双向的, 既可以读又可以写, 而流是单向的
Channel 可以进行异步的读写
对 Channel 的读写必须通过 buffer 对象
正如上面提到的, 所有数据都通过 Buffer 对象处理, 所以不会将字节写入到 Channel 中, 而是将数据写入到 Buffer 中; 不会从 Channel 中读取字节, 而是将数据从 Channel 读入 Buffer, 再从 Buffer 获取这个字节. Channel 可以比流更好地反映出底层操作系统的真实情况. 特别是在 Unix 模型中, 底层操作系统通常都是双向的. 在 Java NIO 中的 Channel 主要有如下几种类型:
FileChannel: 从文件读取数据的
DatagramChannel: 读写 UDP 网络协议数据
SocketChannel: 读写 TCP 网络协议数据
ServerSocketChannel: 可以监听 TCP 连接
2) 缓冲区 (Buffer)
Buffer 是一对象, 它包含一些要写入或者读到的 Stream 对象. 应用程序不能直接对 Channel 进行读写操作, 而必须通过 Buffer 来进行, 即 Channel 是通过 Buffer 来读写数据的. 在 NIO 中, 所有的数据都是用 Buffer 处理的, 它是 NIO 读写数据的中转池. Buffer 实质上是一个数组, 通常是一个字节数据, 但也可以是其他类型的数组. 但一个缓冲区不仅仅是一个数组, 重要的是它提供了对数据的结构化访问, 而且还可以跟踪系统的读写进程. 使用 Buffer 读写数据一般遵循以下四个步骤:
写入数据到 Buffer;
调用 flip() 方法;
从 Buffer 中读取数据;
调用 clear() 方法或者 compact() 方法.
当向 Buffer 写入数据时, Buffer 会记录下写了多少数据. 一旦要读取数据, 需要通过 flip() 方法将 Buffer 从写模式切换到读模式. 在读模式下, 可以读取之前写入到 Buffer 的所有数据. 一旦读完了所有的数据, 就需要清空缓冲区, 让它可以再次被写入. 有两种方式能清空缓冲区: 调用 clear() 或 compact() 方法. clear() 方法会清空整个缓冲区. compact() 方法只会清除已经读过的数据. 任何未读的数据都被移到缓冲区的起始处, 新写入的数据将放到缓冲区未读数据的后面. Buffer 主要有如下几种:
- ByteBuffer
- CharBuffer
- DoubleBuffer
- FloatBuffer
- IntBuffer
- LongBuffer
- ShortBuffer
CopyFile 执行三个基本的操作: 创建一个 Buffer, 然后从源文件读取数据到缓冲区, 然后再将缓冲区写入目标文件.
- public static void copyFileUseNIO(String src,String dst) throws IOException{
- // 声明源文件和目标文件
- FileInputStream fi=new FileInputStream(new File(src));
- FileOutputStream fo=new FileOutputStream(new File(dst));
- // 获得传输通道 channel
- FileChannel inChannel=fi.getChannel();
- FileChannel outChannel=fo.getChannel();
- // 获得容器 buffer
- ByteBuffer buffer=ByteBuffer.allocate(1024);
- while(true){
- // 判断是否读完文件
- int eof =inChannel.read(buffer);
- if(eof==-1){
- break;
- }
- // 重设一下 buffer 的 position=0,limit=position
- buffer.flip();
- // 开始写
- outChannel.write(buffer);
- // 写完要重置 buffer, 重设 position=0,limit=capacity
- buffer.clear();
- }
- inChannel.close();
- outChannel.close();
- fi.close();
- fo.close();
- }
三)Selector(选择器对象)
Selector 是一个对象, 它可以注册到很多个 Channel 上, 监听各个 Channel 上发生的事件, 并且能够根据事件情况决定 Channel 读写. 这样, 通过一个线程管理多个 Channel, 就可以处理大量网络连接了. 有了 Selector, 我们就可以利用一个线程来处理所有的 channels. 线程之间的切换对操作系统来说代价是很高的, 并且每个线程也会占用一定的系统资源. 所以, 对系统来说使用的线程越少越好. Selector 就是注册对各种 I/O 事件的地方, 而且当那些事件发生时, 就是这个对象告诉您所发生的事件.
Selector selector = Selector.open();
为了能让 Channel 和 Selector 配合使用, 我们需要把 Channel 注册到 Selector 上. 通过调用 channel.register() 方法来实现注册:
- channel.configureBlocking(false);
- SelectionKey key =channel.register(selector,SelectionKey.OP_READ);
注意, 注册的 Channel 必须设置成异步模式 才可以, 否则异步 IO 就无法工作, 这就意味着我们不能把一个 FileChannel 注册到 Selector, 因为 FileChannel 没有异步模式, 但是网络编程中的 SocketChannel 是可以的.
register() 的调用的返回值是一个 SelectionKey, 代表这个通道在此 Selector 上注册. 当某个 Selector 通知您某个传入事件时, 它是通过提供对应于该事件的 SelectionKey 来进行的. SelectionKey 还可以用于取消通道的注册.
SelectionKey 中包含如下属性:
(1)interestSet
把 Channel 注册到 Selector 来监听感兴趣的事件, interestSet 就是你要选择的感兴趣的事件的集合. 可以通过 SelectionKey 对象来读写 interest set:
- int interestSet = selectionKey.interestOps();
- boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT;
- boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
- boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;
- boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
通过上面例子可以看到, 我们可以通过用 & 和 SelectionKey 中的常量做运算, 从 SelectionKey 中找到我们感兴趣的事件.
(2)readySet
readySet 是通道已经准备就绪进行操作的集合. 在一次选 Selection 之后, 你应该会首先访问这个 readySet.Selection 将在下一小节进行解释. 可以这样访问 ready 集合, 也可以用像检测 interest 集合那样的方法, 来检测 Channel 中什么事件或操作已经就绪:
- int readySet = selectionKey.readyOps();
- selectionKey.isAcceptable();
- selectionKey.isConnectable();
- selectionKey.isReadable();
- selectionKey.isWritable();
(3)Channel 和 Selector
我们可以通过 SelectionKey 获得 Selector 和注册的 Channel:
- Channel channel = selectionKey.channel();
- Selector selector = selectionKey.selector();
(4)Attach 一个对象
可以将一个对象或者更多信息 attach 到 SelectionKey 上, 这样就能识别某个给定的通道. 例如, 可以附加与通道一起使用的 Buffer, 或包含聚集数据对象. 使用方法如下:
- selectionKey.attach(theObject);
- Object attachedObj = selectionKey.attachment();
还可以在用 register() 方法向 Selector 注册 Channel 的时候附加对象. 如:
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
NIO 多路复用
主要步骤和元素:
首先, 通过 Selector.open() 创建一个 Selector, 作为类似调度员的角色.
然后, 创建一个 ServerSocketChannel, 并且向 Selector 注册, 通过指定 SelectionKey.OP_ACCEPT, 告诉调度员, 它关注的是新的连接请求.
注意, 为什么我们要明确配置非阻塞模式呢? 这是因为阻塞模式下, 注册操作是不允许的, 会抛出 IllegalBlockingModeException 异常.
Selector 阻塞在 select 操作, 当有 Channel 发生接入请求, 就会被唤醒.
在具体的方法中, 通过 SocketChannel 和 Buffer 进行数据操作
IO 都是同步阻塞模式, 所以需要多线程以实现多任务处理. 而 NIO 则是利用了单线程轮询事件的机制, 通过高效地定位就绪的 Channel, 来决定做什么, 仅仅 select 阶段是阻塞的, 可以有效避免大量客户端连接时, 频繁线程切换带来的问题, 应用的扩展能力有了非常大的提高
下面用 NIO 设计一个 Echo 服务器:
首先定义一个 Selector 和线程池
- private Selector selector;
- private ExecutorService tp = Executors.newCachedThreadPool();
selector 处理所有的网络连接, tp 线程池处理每一个客户端请求. 为了统计服务器线程在客户端花费的时间, 还需要定义一个时间统计有关的变量, 用于统计在某一个 Socket 上花费的时间, time_stat 的 key 为 Socket,value 为时间戳:
public static Map<Socket,Long> time_stat = new HashMap<Socket,Long>(10240);
下面来看一下 NIO 服务器的核心代码, startServer() 方法用于启动 NIO Server.
- private void startServer() throws IOException{
- this.selector = SelectorProvider.provider().openSelector();
- ServerSocketChannel ssc = ServerSocketChannel.open(); // 服务端 SocketChannel
- ssc.configureBlocking(false); // 设置为非阻塞模式
- InetSocketAddress isa = new InetSocketAddress(InetAddress.getLocalHost(),8000);// 使用 8000 端口
- ssc.socket().bind(isa);
- SelectionKey acceptKey = ssc.register(selector, SelectionKey.OP_ACCEPT); // 将 ServerSocketChannel 绑定到 Selector 上, 感兴趣的时间为 Accept
- for(;;){ // 主要任务是等待 - 分发网络消息
- this.selector.select(); // 阻塞方法, 如果当前没有准备好的的数据, 就会等待, 如果有的话返回已经准备好的 SelectionKey 数量
- Set<SelectionKey> readyKeys = this.selector.selectedKeys(); // 获取准备好的 SelectionKey
- Iterator<SelectionKey> i = readyKeys.iterator();
- long e = 0;
- while(i.hasNext()){
- SelectionKey sk = i.next();
- i.remove();// 处理一个删除一个, 不然可能重复处理
- if(sk.isAcceptable()){
- doAccept(sk);
- }else if(sk.isValid() && sk.isReadable()){// 判断是否可以读
- if(!time_stat.containsKey(((SocketChannel) sk.channel()).socket())){
- time_stat.put(((SocketChannel) sk.channel()).socket(), System.currentTimeMillis());
- }
- doRead(sk);
- }else if(sk.isValid() && sk.isWritable()){ // 判断是否可以写
- doWrite(sk);
- e = System.currentTimeMillis();
- long b = time_stat.remove(((SocketChannel) sk.channel()).socket());
- System.out.println("spend:"+(b-e)+"ms");
- }
- }
- }
- }
在了解服务端整体框架后, 下面从具体的方法中看看几个主要方法的使用:
- private void doAccept(SelectionKey sk) {
- ServerSocketChannel server = (ServerSocketChannel) sk.channel();
- SocketChannel clientChannel;
- try {
- clientChannel = server.accept();
- clientChannel.configureBlocking(false);// 非阻塞
- SelectionKey clientKey = clientChannel.register(selector, SelectionKey.OP_READ);// 将 Channel 注册到 Selector 上, 并告诉 Selector 对读感兴趣, Channel 准备好读时给线程一个通知
- EchoClient ec = new EchoClient();
- clientKey.attach(ec);// 客户端实例作为附件, 附加到表示这个连接的 SelectionKey 上, 可以在整个连接过程共享 ec
- InetAddress clientAddress = clientChannel.socket().getInetAddress();
- System.out.println("Accepted connection from"+clientAddress.getHostAddress());
- } catch (Exception e) {}
- }
EchoClient 封装一个队列, 保存在需要恢复给这个客户端所有信息上, 这样再进行回复, 只要 outq 对象中弹出元素即可.
- public class EchoClient {
- private LinkedList<ByteBuffer> outq;
- public EchoClient() {
- this.outq = new LinkedList<ByteBuffer>();
- }
- public LinkedList<ByteBuffer> getOutq() {
- return outq;
- }
- public void enqueue(ByteBuffer bb) {
- this.outq.addFirst(bb);
- }
- }
下面看看 doRead() 方法的实现.
- private void doRead(SelectionKey sk) {
- SocketChannel c = (SocketChannel) sk.channel();
- ByteBuffer bb = ByteBuffer.allocate(8192);
- int len;
- try {
- len = c.read(bb);// 存放读取的数据
- if(len<0){
- disconnect(sk);
- return;
- }
- } catch (Exception e) {
- System.out.println("Failed to read from client!");
- e.printStackTrace();
- disconnect(sk);
- return;
- }
- bb.flip();
- tp.execute(new HandleMsg(sk,bb)); // 线程池处理数据
- }
HandleMsg 的实现很简单:
- public class HandleMsg implements Runnable{
- SelectionKey sk;
- ByteBuffer bb;
- public HandleMsg(SelectionKey sk,ByteBuffer bb){
- this.sk = sk;
- this.bb = bb;
- }
- @Override
- public void run() {
- EchoClient ec = (EchoClient) sk.attachment();
- ec.enqueue(bb);// 将收到的数据压入队列, 业务逻辑也可以在这个地方处理了
- sk.interestOps(SelectionKey.OP_READ|SelectionKey.OP_WRITE);
- selector.wakeup();// 强迫 Selector 立即返回
- }
- }
doWrite() 代码如下, 这个方法拿到的 sk 和 doread() 方法拿到的是同一个, 通过这个 sk 可以操作共享的 EchoClient
- private void doWrite(SelectionKey sk) {
- SocketChannel c = (SocketChannel) sk.channel();
- EchoClient ec = (EchoClient) sk.attachment();
- LinkedList<ByteBuffer> outq = ec.getOutq();
- ByteBuffer bb = outq.getLast();// 列表顶部元素, 写回客户端
- try {
- int len = c.write(bb);
- if(len == -1){
- disconnect(sk);
- return;
- }
- if(bb.remaining()== 0){
- outq.removeLast();// 缓冲区已经完成写, 删除它
- }
- } catch (Exception e) {
- System.out.println("Failed to write to client.");
- e.printStackTrace();
- disconnect(sk);
- return;
- }
- if(outq.size()==0){
- sk.interestOps(SelectionKey.OP_READ);
- }
- }
下面用 NIO 设计一个客户端
首先初始化 Selector 和 Channel
- private Selector selector;
- public void init(String ip,int port) throws IOException{
- SocketChannel s = SocketChannel.open();
- s.configureBlocking(false);
- this.selector = SelectorProvider.provider().openSelector();
- s.connect(new InetSocketAddress(ip,port));// 并不定连接成功, 需要 finishConnect() 确认
- s.register(selector, SelectionKey.OP_CONNECT);
- }
程序的工作执行逻辑, 主要两件事, 一个是链接就绪的 Connect, 一个是刻度的 read() 事件:
- public void working() throws IOException{
- while(true){
- if(!this.selector.isOpen()){
- break;
- }
- this.selector.select();
- Iterator<SelectionKey> i = this.selector.selectedKeys().iterator();
- while(i.hasNext()){
- SelectionKey key = i.next();
- i.remove();
- if(key.isConnectable()){
- connect(key);// 判断有没有完成连接, 没有的话使用 finishConnect() 方法完成连接, 并向 Channel 中写入数据及感兴趣的事情
- }else if(key.isReadable()){
- read(key);
- }
- }
- }
- }
下面是 read 事件
- private void read(SelectionKey key) throws IOException {
- SocketChannel c = (SocketChannel) key.channel();
- ByteBuffer buffer = ByteBuffer.allocate(100);
- c.read(buffer);
- byte[] bs = buffer.array();
- String msg = new String(bs).trim();
- System.out.println("客户端收到信息:"+msg);
- c.close();
- key.selector().close();
- }
来源: http://www.bubuko.com/infodetail-3152994.html