第一章: 手动搭建 I/O 网络通信框架 1:Socket 和 ServerSocket 入门实战, 实现单聊
第二章: 手动搭建 I/O 网络通信框架 2:BIO 编程模型实现群聊
在第二章中用 BIO 编程模型, 简单的实现了一个聊天室. 但是其最大的问题在解释 BIO 时就已经说了: ServerSocket 接收请求时 (accept() 方法),InputStream,OutputStream(输入输出流的读和写)都是阻塞的. 还有一个问题就是线程池, 线程多了, 服务器性能耗不起. 线程少了, 在聊天室这种场景下, 让用户等待连接肯定不可取. 今天要说到的 NIO 编程模型就很好的解决了这几个问题. 有两个主要的替换地方:
1. 用 Channel 代替 Stream.2. 使用 Selector 监控多条 Channel, 起到类似线程池的作用, 但是它只需一条线程.
既然要用 NIO 编程模型, 那就要说说它的三个主要核心: Selector,Channel,Buffer. 它们的关系是: 一个 Selector 管理多个 Channel, 一个 Channel 可以往 Buffer 中写入和读取数据. Buffer 名叫缓冲区, 底层其实是一个数组, 会提供一些方法往数组写入读取数据.
Buffer:
不太了解 Buffer 的可以看看这个: https://blog.csdn.net/czx2018/article/details/89502699
常用 API:
allocate() - 初始化一块缓冲区
put() - 向缓冲区写入数据
get() - 向缓冲区读数据
filp() - 将缓冲区的读写模式转换
clear() - 这个并不是把缓冲区里的数据清除, 而是利用后来写入的数据来覆盖原来写入的数据, 以达到类似清除了老的数据的效果
compact() - 从读数据切换到写模式, 数据不会被清空, 会将所有未读的数据 copy 到缓冲区头部, 后续写数据不会覆盖, 而是在这些数据之后写数据
mark() - 对 position 做出标记, 配合 reset 使用
reset() - 将 position 置为标记值
简单地说: Buffer 实质上是个数组, 有两个关键的指针, 一个 position 代表当前数据写入到哪了, 一个 limit 代表限制. 初始化时设置了数组长度, 这 limit 就是数组的长度. 如: 设置 intBuffer.allocate(10), 最大存储 10 个 int 数据, 写入 5 五个数据后, 需要读取数据了. 用 filp()转换读写模式后, limit=position,position=0. 也就是说从 0 开始读, 只能读到第五个. 读完后这个缓冲区就需要 clear()了, 实际上并没有真的去清空数据, 而是 position 和 limit 两个指针又回到了初始化的位置, 接着又可以写入数据了, 反正数组下标相同重新写入数据会覆盖, 就没必要真的去清空了.
Channel:
Channel(通道)主要用于传输数据, 然后从 Buffer 中写入或读取. 它们两个结合起来虽然和流有些相似, 但主要有以下几点区别:
1. 流是单向的, 可以发现 Stream 的输入流和输出流是独立的, 它们只能输入或输出. 而通道既可以读也可以写.
2. 通道本身不能存放数据, 只能借助 Buffer.
3.Channel 支持异步.
Channel 有如下三个常用的类: FileChannel,SocketChannel,ServerSocketChannel. 从名字也可以看出区别, 第一个是对文件数据的读写, 后面两个则是针对 Socket 和 ServerSocket, 这里我们只是用后面两个. 更详细的用法可以看: https://www.cnblogs.com/snailclimb/p/9086335.html, 下面的代码中也会用到, 会有详细的注释.
Selector
多个 Channel 可以注册到 Selector, 就可以直接通过一个 Selector 管理多个通道. Channel 在不同的时间或者不同的事件下有不同的状态:
1. 客户端的 SocketChannel 和服务器端建立连接, SocketChannel 状态就是 Connect.
2. 服务器端的 ServerSocketChannel 接收了客户端的请求, ServerSocketChannel 状态就是 Accept.
3. 当 SocketChannel 或 ServerSocketChannel 有数据可读, 那么它们的状态就是 Read.
4. 当可以向 Channel 中写数据时, 那么它们的状态就是 Write.
具体的使用见下面代码注释或看 https://www.cnblogs.com/snailclimb/p/9086334.html
NIO 编程模型
NIO 编程模型工作流程:
1. 首先会创建一个 Selector, 用来监视管理各个不同的 Channel, 也就是不同的客户端. 相当于取代了原来 BIO 的线程池, 但是它只需一个线程就可以处理多个 Channel, 没有了线程上下文切换带来的消耗, 很好的优化了性能.
2. 创建一个 ServerSocketChannel 监听通信端口, 并注册到 Selector, 让 Seletor 监视这个通道的 Accept 状态, 也就是接收客户端请求的状态.
3. 此时客户端 ClientA 请求服务器, 那么 Selector 就知道了有客户端请求进来. 这时候我们可以得到客户端的 SocketChannel, 并为这个通道注册 Read 状态, 也就是 Selector 会监听 ClientA 发来的消息.
4. 一旦接收到 ClientA 的消息, 就会用其他客户端的 SocketChannel 的 Write 状态, 向它们转发 ClientA 的消息.
上代码之前, 还是先说说各个类的作用:
相比较 BIO 的代码, NIO 的代码还少了一个类, 那就是服务器端的工作线程类. 没了线程池, 自然也不需要一个单独的线程去服务客户端. 客户端还是需要一个单独的线程去等待用户输入, 因为用户随时都可能输入信息, 这个没法预见, 只能阻塞式的等待.
ChatServer: 服务器端的唯一的类, 作用就是通过 Selector 监听 Read 和 Accept 事件, 并针对这些事件的类型, 进行不同的处理, 如连接, 转发.
ChatClient: 客户端, 通过 Selector 监听 Read 和 Connect 事件. Read 事件就是获取服务器转发的消息然后显示出来; Connect 事件就是和服务器建立连接, 建立成功后就可以发送消息.
UserInputHandler: 专门等待用户输入的线程, 和 BIO 没区别.
- ChatServer
- public class ChatServer {
- // 设置缓冲区的大小, 这里设置为 1024 个字节
- private static final int BUFFER = 1024;
- //Channel 都要配合缓冲区进行读写, 所以这里创建一个读缓冲区和一个写缓冲区
- //allocate()静态方法就是设置缓存区大小的方法
- private ByteBuffer read_buffer = ByteBuffer.allocate(BUFFER);
- private ByteBuffer write_buffer = ByteBuffer.allocate(BUFFER);
- // 为了监听端口更灵活, 再不写死了, 用一个构造函数设置需要监听的端口号
- private int port;
- public ChatServer(int port) {
- this.port = port;
- }
- private void start() {
- // 创建 ServerSocketChannel 和 Selector 并打开
- try (ServerSocketChannel server = ServerSocketChannel.open(); Selector selector = Selector.open()) {
- //[重点, 实现 NIO 编程模型的关键] configureBlocking 设置 ServerSocketChannel 为非阻塞式调用, Channel 默认的是阻塞的调用方式
- server.configureBlocking(false);
- // 绑定监听端口, 这里不是给 ServerSocketChannel 绑定, 而是给 ServerSocket 绑定, socket()就是获取通道原生的 ServerSocket 或 Socket
- server.socket().bind(new InetSocketAddress(port));
- // 把 server 注册到 Selector 并监听 Accept 事件
- server.register(selector, SelectionKey.OP_ACCEPT);
- System.out.println("启动服务器, 监听端口:" + port);
- while (true) {
- //select()会返回此时触发了多少个 Selector 监听的事件
- if(selector.select()>0) {
- // 获取这些已经触发的事件, selectedKeys()返回的是触发事件的所有信息
- Set<SelectionKey> selectionKeys = selector.selectedKeys();
- // 循环处理这些事件
- for (SelectionKey key : selectionKeys) {
- handles(key, selector);
- }
- // 处理完后清空 selectedKeys, 避免重复处理
- selectionKeys.clear();
- }
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- // 处理事件的方法
- private void handles(SelectionKey key, Selector selector) throws IOException {
- // 当触发了 Accept 事件, 也就是有客户端请求进来
- if (key.isAcceptable()) {
- // 获取 ServerSocketChannel
- ServerSocketChannel server = (ServerSocketChannel) key.channel();
- // 然后通过 accept()方法接收客户端的请求, 这个方法会返回客户端的 SocketChannel, 这一步和原生的 ServerSocket 类似
- SocketChannel client = server.accept();
- client.configureBlocking(false);
- // 把客户端的 SocketChannel 注册到 Selector, 并监听 Read 事件
- client.register(selector, SelectionKey.OP_READ);
- System.out.println("客户端 [" + client.socket().getPort() + "] 上线啦!");
- }
- // 当触发了 Read 事件, 也就是客户端发来了消息
- if (key.isReadable()) {
- SocketChannel client = (SocketChannel) key.channel();
- // 获取消息
- String msg = receive(client);
- System.out.println("客户端[" + client.socket().getPort() + "]:" + msg);
- // 把消息转发给其他客户端
- sendMessage(client, msg, selector);
- // 判断用户是否退出
- if (msg.equals("quit")) {
- // 解除该事件的监听
- key.cancel();
- // 更新 Selector
- selector.wakeup();
- System.out.println("客户端 [" + client.socket().getPort() + "] 下线了!");
- }
- }
- }
- // 编码方式设置为 utf-8, 下面字符和字符串互转时用得到
- private Charset charset = Charset.forName("UTF-8");
- // 接收消息的方法
- private String receive(SocketChannel client) throws IOException {
- // 用缓冲区之前先清空一下, 避免之前的信息残留
- read_buffer.clear();
- // 把通道里的信息读取到缓冲区, 用 while 循环一直读取, 直到读完所有消息. 因为没有明确的类似 \ n 这样的结尾, 所以要一直读到没有字节为止
- while (client.read(read_buffer)> 0) ;
- // 把消息读取到缓冲区后, 需要转换 buffer 的读写状态, 不明白的看看前面的 Buffer 的讲解
- read_buffer.flip();
- return String.valueOf(charset.decode(read_buffer));
- }
- // 转发消息的方法
- private void sendMessage(SocketChannel client, String msg, Selector selector) throws IOException {
- msg = "客户端[" + client.socket().getPort() + "]:" + msg;
- // 获取所有客户端, keys()与前面的 selectedKeys 不同, 这个是获取所有已经注册的信息, 而 selectedKeys 获取的是触发了的事件的信息
- for (SelectionKey key : selector.keys()) {
- // 排除服务器和本客户端并且保证 key 是有效的, isValid()会判断 Selector 监听是否正常, 对应的通道是保持连接的状态等
- if (!(key.channel() instanceof ServerSocketChannel) && !client.equals(key.channel()) && key.isValid()) {
- SocketChannel otherClient = (SocketChannel) key.channel();
- write_buffer.clear();
- write_buffer.put(charset.encode(msg));
- write_buffer.flip();
- // 把消息写入到缓冲区后, 再把缓冲区的内容写到客户端对应的通道中
- while (write_buffer.hasRemaining()) {
- otherClient.write(write_buffer);
- }
- }
- }
- }
- public static void main(String[] args) {
- new ChatServer(8888).start();
- }
- }
- ChatClient
- public class ChatClient {
- private static final int BUFFER = 1024;
- private ByteBuffer read_buffer = ByteBuffer.allocate(BUFFER);
- private ByteBuffer write_buffer = ByteBuffer.allocate(BUFFER);
- // 声明成全局变量是为了方便下面一些工具方法的调用, 就不用 try with resource 了
- private SocketChannel client;
- private Selector selector;
- private Charset charset = Charset.forName("UTF-8");
- private void start() {
- try {
- client=SocketChannel.open();
- selector=Selector.open();
- client.configureBlocking(false);
- // 注册 channel, 并监听 SocketChannel 的 Connect 事件
- client.register(selector, SelectionKey.OP_CONNECT);
- // 请求服务器建立连接
- client.connect(new InetSocketAddress("127.0.0.1", 8888));
- // 和服务器一样, 不停的获取触发事件, 并做相应的处理
- while (true) {
- selector.select();
- Set<SelectionKey> selectionKeys = selector.selectedKeys();
- for (SelectionKey key : selectionKeys) {
- handle(key);
- }
- selectionKeys.clear();
- }
- } catch (IOException e) {
- e.printStackTrace();
- }catch (ClosedSelectorException e){
- // 当用户输入 quit 时, 在 send()方法中, selector 会被关闭, 而在上面的无限 while 循环中, 可能会使用到已经关闭了的 selector.
- // 所以这里捕捉一下异常, 做正常退出处理就行了. 不会对服务器造成影响
- }
- }
- private void handle(SelectionKey key) throws IOException {
- // 当触发 connect 事件, 也就是服务器和客户端建立连接
- if (key.isConnectable()) {
- SocketChannel client = (SocketChannel) key.channel();
- //finishConnect()返回 true, 说明和服务器已经建立连接. 如果是 false, 说明还在连接中, 还没完全连接完成
- if(client.finishConnect()){
- // 新建一个新线程去等待用户输入
- new Thread(new UserInputHandler(this)).start();
- }
- // 连接建立完成后, 注册 read 事件, 开始监听服务器转发的消息
- client.register(selector,SelectionKey.OP_READ);
- }
- // 当触发 read 事件, 也就是获取到服务器的转发消息
- if(key.isReadable()){
- SocketChannel client = (SocketChannel) key.channel();
- // 获取消息
- String msg = receive(client);
- System.out.println(msg);
- // 判断用户是否退出
- if (msg.equals("quit")) {
- // 解除该事件的监听
- key.cancel();
- // 更新 Selector
- selector.wakeup();
- }
- }
- }
- // 获取消息
- private String receive(SocketChannel client) throws IOException{
- read_buffer.clear();
- while (client.read(read_buffer)>0);
- read_buffer.flip();
- return String.valueOf(charset.decode(read_buffer));
- }
- // 发送消息
- public void send(String msg) throws IOException{
- if(!msg.isEmpty()){
- write_buffer.clear();
- write_buffer.put(charset.encode(msg));
- write_buffer.flip();
- while (write_buffer.hasRemaining()){
- client.write(write_buffer);
- }
- if(msg.equals("quit")){
- selector.close();
- }
- }
- }
- public static void main(String[] args) {
- new ChatClient().start();
- }
- }
- UserInputHandler
- public class UserInputHandler implements Runnable {
- ChatClient client;
- public UserInputHandler(ChatClient chatClient) {
- this.client=chatClient;
- }
- @Override
- public void run() {
- BufferedReader read=new BufferedReader(
- new InputStreamReader(System.in)
- );
- while (true){
- try {
- String input=read.readLine();
- client.send(input);
- if(input.equals("quit"))
- break;
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- }
测试运行: 之前用的是 win10 的终端运行的, 以后直接用 IDEA 运行, 方便些. 不过一个类同时运行多个, 以实现多个客户端的场景, 需要先做以下设置
设置完后, 就可以同时运行两个 ChatClient 了, 上图中得 Unnamed 就是第二个 ChatClient, 选中后点击右边运行按钮就行了. 效果如下:
来源: https://www.cnblogs.com/lbhym/p/12698309.html