Netty 是一个异步的基于事件驱动的网络框架.
为什么要使用 Netty 而不直接使用 JAVA 中的 NIO
1.Netty 支持三种 IO 模型同时支持三种 Reactor 模式.
2.Netty 支持很多应用层的协议, 提供了很多 decoder 和 encoder.
3.Netty 能够解决 TCP 长连接所带来的缺陷 (粘包, 半包等)
4.Netty 支持应用层的 KeepAlive.
5.Netty 规避了 JAVA NIO 中的很多 BUG, 性能更好.
Netty 启动服务端
1. 创建 ServerBootstrap 服务端启动对象.
2. 配置 bossGroup 和 workerGroup, 其中 bossGroup 负责接收连接, workerGroup 负责处理连接的读写就绪事件.
3. 配置父 Channel, 一般为 NioServerSocketChannel.
4. 配置子 Channel 与 Handler 之间的关系.
5. 给父 Channel 配置参数.
6. 给子 Channel 配置参数.
7. 绑定端口, 启动服务.
- private void start() {
- EventLoopGroup bossGroup = new NioEventLoopGroup();
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- ServerBootstrap serverBootstrap = new ServerBootstrap();
- serverBootstrap
- .group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class) // 配置父 Channel
- .childHandler(new ChannelInitializer<SocketChannel>() { // 配置子 Channel 与 Handler 之间的关系
- @Override
- protected void initChannel(SocketChannel socketChannel) {
- // 往 ChannelPipeline 中添加 ChannelHandler
- socketChannel.pipeline().addLast(
- new HttpRequestDecoder(),
- new HttpObjectAggregator(65535),
- new HttpResponseEncoder(),
- new HttpServerHandler()
- );
- }
- })
- .option(ChannelOption.SO_BACKLOG, 128) // 给父 Channel 配置参数
- .childOption(ChannelOption.SO_KEEPALIVE, true); // 给子 Channel 配置参数
- try {
- // 绑定端口, 启动服务
- System.out.println("start server and bind 8888 port ...");
- serverBootstrap.bind(8888).sync();
- } catch (InterruptedException e) {
- bossGroup.shutdownGracefully();
- workerGroup.shutdownGracefully();
- }
- }
Netty 启动客户端
1. 创建 Bootstrap 客户端启动对象.
2. 配置 workerGroup, 负责处理连接的读写就绪事件.
3. 配置父 Channel, 一般为 NioSocketChannel.
4. 给父 Channel 配置参数.
5. 配置父 Channel 与 Handler 之间的关系.
6. 连接服务器.
- private void start() {
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- Bootstrap Bootstrap = new Bootstrap();
- Bootstrap.group(workerGroup)
- .channel(NioSocketChannel.class) // 配置父 Channel
- .option(ChannelOption.SO_KEEPALIVE, true) // 给父 Channel 配置参数
- .handler(new ChannelInitializer<SocketChannel>() { // 配置父 Channel 与 Handler 之间的关系
- @Override
- protected void initChannel(SocketChannel socketChannel) throws Exception {
- socketChannel.pipeline().addLast(new TimeClientHandler());
- }
- });
- try {
- Bootstrap.connect(new InetSocketAddress(8888)).sync(); // 连接服务器
- } catch (InterruptedException e) {
- workerGroup.shutdownGracefully();
- }
- }
ChannelInBoundHandler 接口声明了事件的处理方法
channelActive(): 当建立一个新的 Channel 时调用该方法
handlerAdd(): 当往 Channel 的 ChannelPipeline 中添加 Handler 时调用该方法
handlerRemove(): 当移除 ChannelPipeline 中的 Handler 时调用该方法
channelRead(): 当 Channel 有数据可读时调用该方法
exceptionCaught(): 当在处理事件发生异常时调用该方法
ServerSocketChannel 每接收到一个新的连接时都会建立一个 SocketChannel, 然后调用 ChannelInitializer 的 init 方法初始化 Channel, 方法中配置 Channel 与 Handler 之间的关系, 然后调用 Handler 的 handlerAdd() 和 channelActive() 方法.
关于 ChannelPipeline
ChannelPipeline 底层使用双向链表.
当 Channel 有数据可读时, 会沿着链表从前往后寻找有 IN 性质的 Handler 进行处理.
当 Channel 写入数据时, 会沿着链表从后往前寻找有 OUT 性质的 Handler 进行处理.
关于 write() 和 flush() 方法
graph TB;
S1[Channel 的 write 方法] -- 将数据写入到缓冲区 --> buffer[缓冲区];
S2[Channel 的 flush 方法] -- 发送缓冲区中的数据并清空 --> buffer[缓冲区];
buffer -- 发送 --> S3[SocketChannel];
write(): 将数据写入到缓冲区
flush(): 发送缓冲区中的数据并进行清空
writeAndFlush(): 将数据写入到缓冲区, 同时发送缓冲区中的数据并进行清空
Channel 的 writeAndFlush() 和 flush() 方法会从链表的最后一个节点开始从后往前寻找有 OUT 性质的 Handler 进行处理.
ChannelHandlerContext 的 writeAndFlush() 和 flush() 方法会从当前节点从后往前寻找有 OUT 性质的 Handler 进行处理.
关于写就绪事件
当 SocketChannel 可以写入数据时, 将会触发写就绪事件, 所以一般不能随便监听, 否则将会一直触发.
当 SocketChannel 在写入数据写不进时 (缓冲区已经满了), 向 Selector 传递要监听此 Channel 的写就绪事件, 然后强制发送缓冲区中的数据并进行清空, 此时将会触发写就绪事件, 当 Selector 处理完写就绪事件后, 应当剔除监听此 Channel 的写就绪事件.
为什么说 Netty 中的所有操作都是异步的
Channel 中的所有任务都会放入到其绑定的 EventLoop 的任务队列中, 然后等待被 EventLoop 中的线程处理.
关于 ChannelFuture
由于 Netty 中的所有操作都是异步的, 因此一般会返回 ChannelFuture 对象, 用于存储 Channel 异步执行的结果.
当创建 ChannelFuture 实例时, isDone() 方法返回 false, 仅当 ChannelFuture 被设置成成功或者失败时, isDone() 方法才返回 true.
可以往 ChannelFuture 中添加 ChannelFutureListener, 当任务被执行完毕后由 IO 线程自动调用.
Netty 中的 ByteBuf
ByteBuf 有 readerIndex 和 writerIndex 两个指针, 默认都为 0, 当进行写操作时移动 writerIndex 指针, 读操作时移动 readerIndex 指针.
可读容量 = writerIndex - readerIndex
* 只有 read()/write() 方法才会移动指针, get()/set() 方法不会移动指针.
*ByteBuf 支持动态扩容.
ByteBuf 的创建和管理
使用 ByteBufAllocator 来创建和管理 ByteBuf, 其分别提供 PooledByteBufAllocator 和 UnpooledByteBufAllocator 实现类, 分别代表池化和非池化.
*Netty 同时也提供了 Pooled 和 Unpooled 工具类来创建和管理 ByteBuf.
池化的 ByteBuf(Pooled)
每次使用时都从池中取出一个 ByteBuf 对象, 当使用完毕后再放回到池中.
每个 ByteBuf 都有一个 refCount 属性, 仅当 refCount 属性为 0 时才将 ByteBuf 对象放回到池中.
ByteBuf 的 release() 方法可以使 refCount 属性减 1(一般由最后一个访问 ByteBuf 的 Handler 进行处理)
非池化的 ByteBuf(Unpooled)
每次使用时都创建一个新的 ByteBuf 对象.
使用池化 ByteBuf 的风险
如果每次使用 ByteBuf 后却不进行释放, 那么有可能发生内存泄漏, 对象池中会不停的创建 ByteBuf 对象.
非池化的 ByteBuf 对象能够依赖 JVM 自动进行回收.
关于堆内和堆外的 ByteBuf
池化和非池化的 ByteBufAllocator 中都可以创建堆内和堆外的 ByteBuf 对象.
堆外的 ByteBuf 可以避免在进行 IO 操作时数据从堆内内存复制到操作系统内存的过程, 所以对于 IO 操作来说一般使用堆外的 ByteBuf, 而对于内部业务数据处理来说使用堆内的 ByteBuf.
Netty 支持的 IO 模型
Netty 支持 BIO,NIO,AIO 三种 IO 模型.
* 其中 AIO 模型只在 Netty 的 5.x 版本有提供, 但不建议使用, 因为 Netty 不再维护同时也废除了 5.x 版本, 其原因是在 Linux 中 AIO 比 NIO 强不了多少.
Netty 如何切换 IO 模型
只需要将 EventLoopGroup 和 ServerSocketChannel 换成相应 IO 模型的 API 即可.
Netty 中使用 Reactor 模式
Reactor 单线程模式
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
Reactor 多线程模式
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
* 默认 CPU 核数 x 2 个 EventLoop.
主从 Reactor 多线程模式
- EventLoopGroup bossGroup = new NioEventLoopGroup();
- EventLoopGroup workerGroup = new NioEventLoopGroup();
关于 TCP 的粘包和半包
粘包 (多个数据包被合并成一个进行发送)
- graph LR;
- data1[ABC] --> compact[ABCDEF];
- data2[DEF] --> compact;
compact --send--> net[网络]
半包 (一个数据包被拆分成多个进行发送)
- graph LR;
- data1[ABCDEF] --> part1[ABC];
- data1 --> part2[DEF];
part1 --send--> net1[网络];
part2 --send--> net2[网络];
发生粘包的原因
1. 写入的数据远小于缓冲区的大小, TCP 协议为了性能的考虑, 合并后再进行发送.
发生半包的原因
1. 写入的数据大于缓冲区的大小, 因此必须拆包后再进行传输 (缓冲区已满, 强制 flush)
2. 写入的数据大于协议的 MTU(最大传输单元), 因此必须拆包后再进行传输.
TCP 长连接的缺陷
长连接中可以发送多个请求, 同时 TCP 协议是流式协议, 消息无边界, 所以有一个很棘手的问题, 接收方怎么去知道一个请求中的数据到底是哪里到哪里, 以及一个请求中的数据有可能是粘包后的结果, 同时多个请求中的数据有可能是半包后的结果.
解决方案
1. 使用短连接, 连接开始和连接结束之间的数据就是请求的数据.
2. 使用固定的长度, 每个请求中的数据都使用固定的长度, 接收方以接收到固定长度的数据来确定一个完整的请求数据.
3. 使用指定的分隔符, 每个请求中的数据的末尾都加上一个分隔符, 接收方以分隔符来确定一个完整的请求数据.
4. 使用特定长度的字段去存储请求数据的长度, 接收方根据请求数据的长度来确定一个完整的请求数据.
Netty 对 TCP 长连接缺陷的解决方案
FixedLengthFrameDecoder: 使用固定的长度
DelimiterBasedFrameDecoder: 使用指定的分隔符
LengthFieldBasedFrameDecoder: 使用特定长度的字段去存储请求数据的长度
关于 TCP 的 KeepAlive
正常情况下双方建立连接后是不会断开的, KeepAlive 就是防止连接双方中的任意一方由于意外断开而通知不到对方, 导致对方一直持有连接, 占用资源.
* 建立连接需要三次握手, 正常断开连接需要四次挥手.
KeepAlive 有三个核心参数
net.ipv4.tcp_keepalive_timeout: 连接的超时时间 (默认 7200s)
net.ipv4.tcp_keepalive_intvl: 发送探测包的间隔 (默认 75s)
.NET.ipv4.cp_keepalive_probes: 发送探测包的个数 (默认 9 个)
这三个参数都是系统参数, 会影响部署在机器上的所有应用.
KeepAlive 的开关是在应用层开启的, 只有当应用层开启了 KeepAlive,KeepAlive 才会生效.
java.NET.Socket.setKeepAlive(boolean on);
当连接在指定时间内没有发送请求时, 开启 KeepAlive 的一端就会向对方发送一个探测包, 如果对方没有回应, 则每隔指定时间发送一个探测包, 总共发送指定个探测包, 如果对方都没有回应则认为对方不可用, 断开连接.
为什么要做应用层的 KeepAlive
1.KeepAlive 参数是系统参数, 对于应用来说不够灵活.
2. 默认检测一个不可用的连接所需要的时间太长.
怎么做应用层的 KeepAlive
1. 定时任务
客户端定期向所有已经建立连接的服务端发送心跳检测, 如果服务端连续没有回应指定个心跳检测, 则认为对方不可用, 此时客户端应该重连.
服务端定期向所有已经建立连接的客户端发送心跳检测, 如果客户端连续没有回应指定个心跳检测, 则认为对方不可用, 此时应该断开连接.
2. 计时器
连接在指定时间内没有发送请求则认为对方不可用
Netty 对 KeepAlive 的支持
Netty 开启 KeepAlive
- Bootstrap.option(ChannelOption.SO_KEEPALIVE,true);
- ServerBootstrap.childOption(ChannelOption.SO_KEEPALIVE,true);
Netty 提供的 KeepAlive 机制
Netty 提供的 IdleStateHandler 能够检测处于 Idle 状态的连接.
Idle 状态类型
reader_idle:SocketChannel 在指定时间内都没有数据可读
writer_idle:SocketChannel 在指定时间内没有写入数据
all_idle:SocketChannel 在指定时间内没有数据可读或者没有写入数据
直接将 IdleStateHandler 添加到 ChannelPipeline 即可, 当 Netty 检测到处于 Idle 状态的连接时, 将会自动调用其 Handler 的 userEventTriggered() 方法, 用户只需要在该方法中判断 Idle 状态的类型, 然后做出相应的处理.
关于 HTTP 的 KeepAlive
HTTP 的 KeepAlive 是对长连接和短连接的选择, 并不是保持连接存活的一种机制.
HTTP 是基于请求和响应的, 客户端发送请求给服务端然后等待服务端的响应, 当服务端检测到请求头中包含 Connection:KeepAlive 时, 表示客户端使用长连接, 此时服务端应该保持连接, 当检测到请求头中包含 Connection:close 时, 表示客户端使用短连接, 此时服务端应该主动断开连接.
TCP 并不是基于请求和响应的, 客户端可以发送请求给服务端, 同时服务端也可以发送请求给客户端.
来源: https://www.cnblogs.com/funyoung/p/11978750.html