1 Reactor 单线程案例代码热热身
如下是单线程的 JAVA NIO 编程模型.
首先服务端创建 ServerSocketChannel 对象, 并注册到 Select 上 OP_ACCEPT 事件, 然后 ServerSocketChannel 负责监听指定端口上的连接请求.
客户端一旦连接上 ServerSocketChannel, 就会触发 Acceptor 来处理 OP_ACCEPT 事件, 并为来自客户端的连接创建 Socket Channel, 并设置为非阻塞模式, 并在其 Selector 上注册 OP_READ 或者 OP_WRITE, 最终实现客户端与服务端的连接建立和数据通道打通.
当客户端向建立的 SocketChannel 发送请求时, 服务端的 Selector 就会监听到 OP_READ 事件, 并触发相应的处理逻辑. 当服务端向客户端写数据时, 会触发服务端 Selector 的 OP_WRITE 事件, 从而执行响应的处理逻辑.
这里有一个明显的问题, 就是所有时间的处理逻辑都是在 Acceptor 单线程完成的, 在并发连接数较小, 数据量较小的场景下, 是没有问题的, 但是......
Selector 允许一个单一的线程来操作多个 Channel. 如果我们的应用程序中使用了多个 Channel, 那么使用 Selector 很方便的实现这样的目的, 但是因为在一个线程中使用了多个 Channel, 因此也会造成了每个 Channel 传输效率的降低.
优化点在于: 通道连接 | 读取或写入 | 业务处理均采用单线程来处理. 通过线程池或者 MessageQueue 共享队列, 进一步优化了高并发的处理要求, 这样就解决了同一时间出现大量 I/O 事件时, 单独的 Select 就可能在分发事件时阻塞(或延时), 而成为瓶颈的问题.
- public class NioEchoServer { private static final int BUF_SIZE = 256; private static final int TIMEOUT = 3000;
- public static void main(String args[]) throws Exception {
- // 打开服务端 Socket
- ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
- // 打开 Selector
- Selector selector = Selector.open();
- // 服务端 Socket 监听 8080 端口, 并配置为非阻塞模式
- serverSocketChannel.socket().bind(new InetSocketAddress(8080));
- serverSocketChannel.configureBlocking(false);
- // 将 channel 注册到 selector 中.
- // 通常我们都是先注册一个 OP_ACCEPT 事件, 然后在 OP_ACCEPT 到来时, 再将这个 Channel 的 OP_READ
- // 注册到 Selector 中.
- serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
- while (true) {
- // 通过调用 select 方法, 阻塞地等待 channel I/O 可操作
- if (selector.select(TIMEOUT) == 0) {
- System.out.print(".");
- continue;
- }
- // 获取 I/O 操作就绪的 SelectionKey, 通过 SelectionKey 可以知道哪些 Channel 的哪类 I/O 操作已经就绪.
- Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
- while (keyIterator.hasNext()) {
- SelectionKey key = keyIterator.next();
- // 当获取一个 SelectionKey 后, 就要将它删除, 表示我们已经对这个 IO 事件进行了处理.
- keyIterator.remove();
- if (key.isAcceptable()) {
- // 当 OP_ACCEPT 事件到来时, 我们就有从 ServerSocketChannel 中获取一个 SocketChannel,
- // 代表客户端的连接
- // 注意, 在 OP_ACCEPT 事件中, 从 key.channel() 返回的 Channel 是 ServerSocketChannel.
- // 而在 OP_WRITE 和 OP_READ 中, 从 key.channel() 返回的是 SocketChannel.
- SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
- clientChannel.configureBlocking(false);
- // 在 OP_ACCEPT 到来时, 再将这个 Channel 的 OP_READ 注册到 Selector 中.
- // 注意, 这里我们如果没有设置 OP_READ 的话, 即 interest set 仍然是 OP_CONNECT 的话, 那么 select 方法会一直直接返回.
- clientChannel.register(key.selector(), OP_READ, ByteBuffer.allocate(BUF_SIZE));
- }
- if (key.isReadable()) {
- SocketChannel clientChannel = (SocketChannel) key.channel();
- ByteBuffer buf = (ByteBuffer) key.attachment();
- long bytesRead = clientChannel.read(buf);
- if (bytesRead == -1) {
- clientChannel.close();
- } else if (bytesRead> 0) {
- key.interestOps(OP_READ | SelectionKey.OP_WRITE);
- System.out.println("Get data length:" + bytesRead);
- }
- }
- if (key.isValid() && key.isWritable()) {
- ByteBuffer buf = (ByteBuffer) key.attachment();
- buf.flip();
- SocketChannel clientChannel = (SocketChannel) key.channel();
- clientChannel.write(buf);
- if (!buf.hasRemaining()) {
- key.interestOps(OP_READ);
- }
- buf.compact();
- }
- }
- }
- }
}
2 Kafka Reactor 模式设计思路
时间原因, 后续补全
来源: https://juejin.im/post/5c038fd6e51d45100653e146