知识点:
阻塞的概念, 同步异步的区别
Bio 及多路复用
NIO 概要
NIO 之 Buffer(缓冲区)
NIO 之 Channel(通路)
NIO 之 Selector(选择器)
NIO 之 Reactor(反应堆)
基于 NIO 的聊天室实例
学习 NIO 我们先了解前置概念:
1) 阻塞和非阻塞
阻塞和非阻塞是进程在访问数据的时候, 数据是否准备就绪的一种处理方式, 当数据没有准备的时候
阻塞: 往往需要等待缓冲区中的数据准备好过后才处理其他事情, 否则就一直等待. 非阻塞: 当我们的进程范文我们的数据缓冲区的
2) 同步 异步区别
基于应用程序和操作系统处理 IO 事件采取的方式来区分:
异步: 同一时刻可以处理多个 io 读写, 应用程序等待操作系统通知
同步: 同一时间只能处理一条 io 读写, 应用程序直接参与 io 读写
我们接着看下图
简单的说, 必须等待数据接受完毕之后才能处理, 否则一直阻塞, 形象地说就好比一个人去买奶茶, 但是奶茶店前排了很多人的队, 你就在队伍后面排队等待, 期间你啥都做不了, 这就是 bio. 然后我们看下 nio 的多路复用
我们正式开始学习 NIO
一 JAVA NIO 之概念
Java NIO 是 java 1.4, 之后新出的一套 IO 接口 NIO 中的 N 可以理解为 Non-blocking, 有些人会认为是 new, 其实也没错. BIO(Block IO) 和 Nio(Non-Block IO) 的对比
Nio 主要用到的是块, 所以 nio 效率比 io 高.
JavaAPI 中有俩套 nio:
1) 针对标准输入输出 nio
2) 网络编程 nio
Io 以流的形式处理数据, nio 以块的形式处理数据. 面向流的 io 一次处理一个字节, 一个输入流产生了一个字节, 一个输出流就消费一个字节.
面向块的 io, 每个操作都在一步中产生或者消费一个数据块.
它读取数据方式和写数据的方式否必须要通过通道来操作缓冲区实现.
核心组件包括 Channels Buffers Selectors
二 Java NIO 之 Buffer(缓冲区)
1) Buffer 介绍: 缓冲区, 本质就是一个数组, 但是它是特殊的数组, 缓冲区对象内置了一些机制, 能够追踪和记录缓冲区的状态变化情况, 如果我们使用 get 方法从缓冲区中获取数据或者用 put 方法吧数据写入缓冲区, 都会引起缓冲区的状态变化
在缓冲区中, 最重要的属性是如下三个, 他们一起合作完成了对缓冲区内容状态的变化跟踪
1)position: 指定了下一个将要被写入或者读取的元素索引, 它的值由 get()/put() 方法自动更新, 在新创建一个 Buffer 对象时, position 被初始化为 0
2)limit: 操作缓冲区的可操作空间和可操作范围, 指定还有多少数据需要去除, 或者还有多少空间可以放入数据
3)capacity: 指定了可以存储在缓冲区中的最大数据容量, 实际上, 它指定了底层数组的大小, 或者至少是指定了准许我们使用的底层数组的容量.
以上三个属性值之间有一些相对的大小的关系: 0<=position<=limit<=capacity
如果我们创建了一个新的容量为 10 的 bytebuffer 对象, 在初始化的时候. position 设置为 0,limit 和 capacity 被设置为 10, 在以后使用 bytebuffer 对象过程中, capacity 的值不会再发生变化, 而其他俩个值会顺着使用而变化 如下图:
现在我们可以从通道中读取一些数据到缓冲区, 注意从通道读取数据, 相当于往缓冲区中写入数据. 如果读取四个自己的数据, 则此时的 position 为 4, 即下一个将要被写入的字节索引为 4, 而 limit 依旧是 10
下一步把读取的数据写入到输出通道, 相当于从缓冲区读取数据, 在此之前, 必须调用 flip() 方法, 该方法将完成俩件事: 1) 把 limit 设置成 position 值
2) 把 position 值设置为 0
[flip] 需要将缓冲区数据取出来解析, 固定住
取出之后调用 clear 方法 回归到最初的状态.
- package com.Allen.buffer;
- import java.io.FileInputStream;
- import java.io.FileNotFoundException;
- import java.io.IOException;
- import java.nio.ByteBuffer;
- import java.nio.channels.FileChannel;
- public class testBufferDemo01 {
- public static void main(String[] args) throws IOException {
- String fileURL="F://a.txt";
- FileInputStream fis=new FileInputStream(fileURL);
- // 获取通路
- FileChannel channel=fis.getChannel();
- // 定义缓冲区大小
- ByteBuffer buffer=ByteBuffer.allocate(10);
- output("init", buffer);
- // 先读
- channel.read(buffer);
- output("read", buffer);
- buffer.flip();
- output("flip", buffer);
- while (buffer.hasRemaining()) {
- byte b=buffer.get();
- }
- output("get", buffer);
- buffer.clear();
- output("clear", buffer);
- fis.close();
- }
- public static void output(String string,ByteBuffer buffer){
- System.out.println(string);
- System.out.println(buffer.capacity()+":"+buffer.position()+":"+buffer.limit());
- }
- }
结果
三 Java NIO 之 Channel(通路)
通道是个对象, 通过它可以读取和写入数据, 所有的数据都是通过 buffer 对象来处理. 我们永远不会把字节直接写入通道, 相反是吧数据写入包含一个或者多个字节的缓冲区. 同样不会直接读取字节, 而是把数据从通道读入缓冲区, 再从缓冲区获取这个字节, nio 中提供了多种通道对象, 而所有的通道对象都实现了 channel 接口.
使用 nIo 读取数据]
任何时候读取数据, 都不是直接从通道中读取, 而是从通道读取到缓冲区, 所以使用 NIO 读取数据可以分成下面三个步骤
1) 从 FileInputStream 获取 Channel
2) 创建 Buffer
3) 将数据从 Channel 读取到 Buffer 中
下面就是一个 nio 读复制文件的实例
- package com.allen.test;
- import java.io.FileInputStream;
- import java.io.FileNotFoundException;
- import java.io.FileOutputStream;
- import java.io.IOException;
- import java.nio.ByteBuffer;
- import java.nio.channels.FileChannel;
- public class testNio {
- public static void main(String[] args) throws IOException {
- String oldFileUrl="E://1.txt";
- String newFileUrl="E://2.txt";
- FileInputStream fis=new FileInputStream(oldFileUrl);
- FileChannel inChannel=fis.getChannel();
- ByteBuffer bf=ByteBuffer.allocate(1024);
- FileOutputStream fos=new FileOutputStream(newFileUrl);
- FileChannel outChannel=fos.getChannel();
- while(true){
- int eof=inChannel.read(bf);
- if(eof==-1){
- break;
- }else{
- bf.flip();
- outChannel.write(bf);
- bf.clear();
- }
- }
- inChannel.close();
- fis.close();
- outChannel.close();
- fos.close();
- }
- }
四 JAVA NIO 之 Selector(选择器)
Selector 一般称 为选择器 , 当然你也可以翻译为 多路复用器 . 它是 Java NIO 核心组件中的一个, 用于检查一个或多个 NIO Channel(通道) 的状态是否处于可读, 可写. 如此可以实现单线程管理多个 channels, 也就是可以管理多个网络链接. 使用 Selector 的好处在于: 使用更少的线程来就可以来处理通道了, 相比使用多个线程, 避免了线程上下文切换带来的开销.
有了 selector, 可以用一个线程处理所有的 channel. 线程之间的切换对操作系统来说, 代建是很高的, 并且每个线程也会占用一定的系统资源, 所以对于系统而言, 线程越少越好 (但是也不是绝对的, 若 CPU 有多个内核, 不使用多任务是在浪费 CPU 能力)
Selector selector=Selector.open();
注册 channel 到 selector 上
- Channel.configureBlocking(false)
- SelectionKey key=channel.register(selector,SelectionKey.OP_READ)
注册到 server 上的 channel 必须设置成异步模式, 否则异步 io 无法工作, 这就意味着我们不可以把一个 Filechannel 注册到 selector, 因为 filechannel 没有异步模式, 但是 socketchannel 有异步模式
Register 方法的第二个参数, 它是一个 interst set , 意思是注册的 selector 对 channel 中的那些事务感兴趣. 事件分成四种: read write connect accept, 通道触发一个时间指该事件已经 Read, 所有某个 channel 成功连接到另一个服务器称之为 connect ready. 一个 serversocketchanel 准备好接受新的连接称为 connect ready. 一个数据可读的通道可以说 read ready. 等待写数据的通道 write ready.
- Wirte:SelectionKey.OP_WRITE
- Read:SelectionKey.OP_READ
- Accept:SelectionKey.OP_ACCEPT
- Connect:SelectionKey.OP_CONNECT
若是对多个事件感情求, 可以写为 (用 or)
Int interest=SelectionKey.OP_READ|SelectionKey.OP_ACCEPT
SelectionKey 表示通道在 selector 上这个注册, 通过 SelectionKey 可以得到 selector 和注册的 channel.selector 感兴趣的事. 一旦向 selector 注册了一个或者多个通道, 可以调用重载的 select 方法返回你所感兴趣的事件已经准备就绪的通道.
五 JAVA NIO 之 Reactor(反应堆)
阻塞 / IO 通信模型
java 在上图客户端增多的情况下右边的线程会出现不可控的情况.
引入了 pool 的概念,
所以 Nio 是 jdk1.4 开始使用的, 可以说是想新 io, 也可以说是非阻塞 io
以下是 nio 工作原理:
1) 由一个专门的线程去处理所有的 io 事件并且负责分发
2) 事件驱动机制, 时间到的时候触发, 而不是同步地去监听事件
3) 线程通信, 线程之间通过 wait,notify 等方式通信, 保证每次上下文切换都是有意义的, 减少无畏的线程切换.
六实例
服务器
- package com.allen.nio;
- import java.io.IOException;
- import java.NET.InetSocketAddress;
- import java.nio.ByteBuffer;
- import java.nio.channels.Channel;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.Selector;
- import java.nio.channels.ServerSocketChannel;
- import java.nio.channels.SocketChannel;
- import java.nio.charset.Charset;
- import java.util.HashSet;
- import java.util.Iterator;
- import java.util.Set;
- /**
- * 网络多客户端聊天室
- * 功能 1: 客户端通过 Java NIO 连接到服务端, 支持多客户端的连接
- * 功能 2: 客户端初次连接时, 服务端提示输入昵称, 如果昵称已经有人使用, 提示重新输入, 如果昵称唯一, 则登录成功, 之后发送消息都需要按照规定格式带着昵称发送消息
- * 功能 3: 客户端登录后, 发送已经设置好的欢迎信息和在线人数给客户端, 并且通知其他客户端该客户端上线
- * 功能 4: 服务器收到已登录客户端输入内容, 转发至其他登录客户端.
- *
- * TODO 客户端下线检测
- */
- public class NIOServer {
- private int port = 8080;
- private Charset charset = Charset.forName("UTF-8");
- // 用来记录在线人数, 以及昵称
- private static HashSet<String> users = new HashSet<String>();
- private static String USER_EXIST = "系统提示: 该昵称已经存在, 请换一个昵称";
- // 相当于自定义协议格式, 与客户端协商好
- private static String USER_CONTENT_SPILIT = "#@#";
- private Selector selector = null;
- public NIOServer(int port) throws IOException{
- this.port = port;
- // 要想富, 先修路
- // 先把通道打开
- ServerSocketChannel server = ServerSocketChannel.open();
- // 设置高速公路的关卡
- server.bind(new InetSocketAddress(this.port));
- server.configureBlocking(false);
- // 开门迎客, 排队叫号大厅开始工作
- selector = Selector.open();
- // 告诉服务叫号大厅的工作人员, 你可以接待了 (事件)
- server.register(selector, SelectionKey.OP_ACCEPT);
- System.out.println("服务已启动, 监听端口是:" + this.port);
- }
- public void listener() throws IOException{
- // 死循环, 这里不会阻塞
- //CPU 工作频率可控了, 是可控的固定值
- while(true) {
- // 在轮询, 我们服务大厅中, 到底有多少个人正在排队
- int wait = selector.select();
- if(wait == 0) continue; // 如果没有人排队, 进入下一次轮询
- // 取号, 默认给他分配个号码 (排队号码)
- Set<SelectionKey> keys = selector.selectedKeys(); // 可以通过这个方法, 知道可用通道的集合
- Iterator<SelectionKey> iterator = keys.iterator();
- while(iterator.hasNext()) {
- SelectionKey key = (SelectionKey) iterator.next();
- // 处理一个, 号码就要被消除, 打发他走人 (别在服务大厅占着茅坑不拉屎了)
- // 过号不候
- iterator.remove();
- // 处理逻辑
- process(key);
- }
- }
- }
- public void process(SelectionKey key) throws IOException {
- // 判断客户端确定已经进入服务大厅并且已经可以实现交互了
- if(key.isAcceptable()){
- ServerSocketChannel server = (ServerSocketChannel)key.channel();
- SocketChannel client = server.accept();
- // 非阻塞模式
- client.configureBlocking(false);
- // 注册选择器, 并设置为读取模式, 收到一个连接请求, 然后起一个 SocketChannel, 并注册到 selector 上, 之后这个连接的数据, 就由这个 SocketChannel 处理
- client.register(selector, SelectionKey.OP_READ);
- // 将此对应的 channel 设置为准备接受其他客户端请求
- key.interestOps(SelectionKey.OP_ACCEPT);
- // System.out.println("有客户端连接, IP 地址为 :" + sc.getRemoteAddress());
- client.write(charset.encode("请输入你的昵称"));
- }
- // 处理来自客户端的数据读取请求
- if(key.isReadable()){
- // 返回该 SelectionKey 对应的 Channel, 其中有数据需要读取
- SocketChannel client = (SocketChannel)key.channel();
- // 往缓冲区读数据
- ByteBuffer buff = ByteBuffer.allocate(1024);
- StringBuilder content = new StringBuilder();
- try{
- while(client.read(buff)> 0)
- {
- buff.flip();
- content.append(charset.decode(buff));
- }
- // System.out.println("从 IP 地址为:" + sc.getRemoteAddress() + "的获取到消息:" + content);
- // 将此对应的 channel 设置为准备下一次接受数据
- key.interestOps(SelectionKey.OP_READ);
- }catch (IOException io){
- key.cancel();
- if(key.channel() != null)
- {
- key.channel().close();
- }
- }
- if(content.length()> 0) {
- String[] arrayContent = content.toString().split(USER_CONTENT_SPILIT);
- // 注册用户
- if(arrayContent != null && arrayContent.length == 1) {
- String nickName = arrayContent[0];
- if(users.contains(nickName)) {
- client.write(charset.encode(USER_EXIST));
- } else {
- users.add(nickName);
- int onlineCount = onlineCount();
- String message = "欢迎" + nickName + "进入聊天室! 当前在线人数:" + onlineCount;
- broadCast(null, message);
- }
- }
- // 注册完了, 发送消息
- else if(arrayContent != null && arrayContent.length> 1) {
- String nickName = arrayContent[0];
- String message = content.substring(nickName.length() + USER_CONTENT_SPILIT.length());
- message = nickName + "说 :" + message;
- if(users.contains(nickName)) {
- // 不回发给发送此内容的客户端
- broadCast(client, message);
- }
- }
- }
- }
- }
- //TODO 要是能检测下线, 就不用这么统计了
- public int onlineCount() {
- int res = 0;
- for(SelectionKey key : selector.keys()){
- Channel target = key.channel();
- if(target instanceof SocketChannel){
- res++;
- }
- }
- return res;
- }
- public void broadCast(SocketChannel client, String content) throws IOException {
- // 广播数据到所有的 SocketChannel 中
- for(SelectionKey key : selector.keys()) {
- Channel targetchannel = key.channel();
- // 如果 client 不为空, 不回发给发送此内容的客户端
- if(targetchannel instanceof SocketChannel && targetchannel != client) {
- SocketChannel target = (SocketChannel)targetchannel;
- target.write(charset.encode(content));
- }
- }
- }
- public static void main(String[] args) throws IOException {
- new NIOServer(8080).listener();
- }
- }
客户端
- package com.allen.nio;
- import java.io.IOException;
- import java.NET.InetSocketAddress;
- import java.nio.ByteBuffer;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.Selector;
- import java.nio.channels.SocketChannel;
- import java.nio.charset.Charset;
- import java.util.Iterator;
- import java.util.Scanner;
- import java.util.Set;
- public class NIOClient {
- private final InetSocketAddress serverAdrress = new InetSocketAddress("localhost", 8080);
- private Selector selector = null;
- private SocketChannel client = null;
- private String nickName = "";
- private Charset charset = Charset.forName("UTF-8");
- private static String USER_EXIST = "系统提示: 该昵称已经存在, 请换一个昵称";
- private static String USER_CONTENT_SPILIT = "#@#";
- public NIOClient() throws IOException{
- // 不管三七二十一, 先把路修好, 把关卡开放
- // 连接远程主机的 IP 和端口
- client = SocketChannel.open(serverAdrress);
- client.configureBlocking(false);
- // 开门接客
- selector = Selector.open();
- client.register(selector, SelectionKey.OP_READ);
- }
- public void session(){
- // 开辟一个新线程从服务器端读数据
- new Reader().start();
- // 开辟一个新线程往服务器端写数据
- new Writer().start();
- }
- private class Writer extends Thread{
- @Override
- public void run() {
- try{
- // 在主线程中 从键盘读取数据输入到服务器端
- Scanner scan = new Scanner(System.in);
- while(scan.hasNextLine()){
- String line = scan.nextLine();
- if("".equals(line)) continue; // 不允许发空消息
- if("".equals(nickName)) {
- nickName = line;
- line = nickName + USER_CONTENT_SPILIT;
- } else {
- line = nickName + USER_CONTENT_SPILIT + line;
- }
- // client.register(selector, SelectionKey.OP_WRITE);
- client.write(charset.encode(line));//client 既能写也能读, 这边是写
- }
- scan.close();
- }catch(Exception e){
- }
- }
- }
- private class Reader extends Thread {
- public void run() {
- try {
- // 轮询
- while(true) {
- int readyChannels = selector.select();
- if(readyChannels == 0) continue;
- Set<SelectionKey> selectedKeys = selector.selectedKeys(); // 可以通过这个方法, 知道可用通道的集合
- Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
- while(keyIterator.hasNext()) {
- SelectionKey key = (SelectionKey) keyIterator.next();
- keyIterator.remove();
- process(key);
- }
- }
- }
- catch (IOException io){
- }
- }
- private void process(SelectionKey key) throws IOException {
- if(key.isReadable()){
- // 使用 NIO 读取 Channel 中的数据, 这个和全局变量 client 是一样的, 因为只注册了一个 SocketChannel
- //client 既能写也能读, 这边是读
- SocketChannel sc = (SocketChannel)key.channel();
- ByteBuffer buff = ByteBuffer.allocate(1024);
- String content = "";
- while(sc.read(buff)> 0)
- {
- buff.flip();
- content += charset.decode(buff);
- }
- // 若系统发送通知名字已经存在, 则需要换个昵称
- if(USER_EXIST.equals(content)) {
- nickName = "";
- }
- System.out.println(content);
- key.interestOps(SelectionKey.OP_READ);
- }
- }
- }
- public static void main(String[] args) throws IOException
- {
- new NIOClient().session();
- }
- }
来源: https://juejin.im/post/5c21ed1bf265da61171cc11b