Netty 是由 JBOSS 提供的一个 java 开源框架. Netty 提供异步的, 事件驱动的网络应用程序框架和工具, 用以快速开发高性能, 高可靠性的网络服务器和客户端程序. 也就是说, Netty 是一个基于 NIO 的客户, 服务器端编程框架, 使用 Netty 可以确保你快速和简单的开发出一个网络应用, 例如实现了某种协议的客户, 服务端应用. Netty 相当于简化和流线化了网络应用的编程开发过程, 例如: 基于 TCP 和 UDP 的 socket 服务开发."快速" 和 "简单" 并不用产生维护性或性能上的问题. Netty 是一个吸收了多种协议 (包括 FTP,SMTP,HTTP 等各种二进制文本协议) 的实现经验, 并经过相当精心设计的项目. 最终, Netty 成功的找到了一种方式, 在保证易于开发的同时还保证了其应用的性能, 稳定性和伸缩性
本文讲解 SpringBoot 如何使用 Netty 服务端和客户端的简单案例以及相关参数解释
一, Netty 服务端
1, 导入依赖
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter</artifactId>
- </dependency>
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-all</artifactId>
- <version>4.1.36.Final</version>
- </dependency>
2, 编写 Netty 服务端处理器
- /**
- * @author Gjing
- *
- * netty 服务端处理器
- **/
- @Slf4j
- public class NettyServerHandler extends ChannelInboundHandlerAdapter {
- /**
- * 客户端连接会触发
- */
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- log.info("Channel active......");
- }
- /**
- * 客户端发消息会触发
- */
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- log.info("服务器收到消息: {}", msg.toString());
- ctx.write("你也好哦");
- ctx.flush();
- }
- /**
- * 发生异常触发
- */
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- cause.printStackTrace();
- ctx.close();
- }
- }
3, 编写 Netty 服务端初始化器
- /**
- * @author Gjing
- *
- * netty 服务初始化器
- **/
- public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
- @Override
- protected void initChannel(SocketChannel socketChannel) throws Exception {
- // 添加编解码
- socketChannel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
- socketChannel.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
- socketChannel.pipeline().addLast(new NettyServerHandler());
- }
- }
4, 编写 Netty 服务启动
- /**
- * @author Gjing
- * <p>
- * 服务启动监听器
- **/
- @Component
- @Slf4j
- public class NettyServer {
- public void start(InetSocketAddress socketAddress) {
- //new 一个主线程组
- EventLoopGroup bossGroup = new NioEventLoopGroup(1);
- //new 一个工作线程组
- EventLoopGroup workGroup = new NioEventLoopGroup(200);
- ServerBootstrap Bootstrap = new ServerBootstrap()
- .group(bossGroup, workGroup)
- .channel(NioServerSocketChannel.class)
- .childHandler(new ServerChannelInitializer())
- .localAddress(socketAddress)
- // 设置队列大小
- .option(ChannelOption.SO_BACKLOG, 1024)
- // 两小时内没有数据的通信时, TCP 会自动发送一个活动探测数据报文
- .childOption(ChannelOption.SO_KEEPALIVE, true);
- // 绑定端口, 开始接收进来的连接
- try {
- ChannelFuture future = Bootstrap.bind(socketAddress).sync();
- log.info("服务器启动开始监听端口: {}", socketAddress.getPort());
- future.channel().closeFuture().sync();
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- // 关闭主线程组
- bossGroup.shutdownGracefully();
- // 关闭工作线程组
- workGroup.shutdownGracefully();
- }
- }
- }
5, 启动类
- @SpringBootApplication
- public class ServerApplication {
- public static void main(String[] args) {
- SpringApplication.run(ServerApplication.class, args);
- // 启动服务端
- NettyServer nettyServer = new NettyServer();
- nettyServer.start(new InetSocketAddress("127.0.0.1", 8090));
- }
- }
6, 启动结果
二, Netty 客户端
1, 添加依赖
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter</artifactId>
- </dependency>
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-all</artifactId>
- <version>4.1.36.Final</version>
- </dependency>
2, 编写客户端处理器
- /**
- * @author Gjing
- *
- * 客户端处理器
- **/
- @Slf4j
- public class NettyClientHandler extends ChannelInboundHandlerAdapter {
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- log.info("客户端 Active .....");
- }
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- log.info("客户端收到消息: {}", msg.toString());
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- cause.printStackTrace();
- ctx.close();
- }
- }
3, 编写客户端初始化器
- /**
- * @author Gjing
- * 客户端初始化器
- **/
- public class NettyClientInitializer extends ChannelInitializer<SocketChannel> {
- @Override
- protected void initChannel(SocketChannel socketChannel) throws Exception {
- socketChannel.pipeline().addLast("decoder", new StringDecoder());
- socketChannel.pipeline().addLast("encoder", new StringEncoder());
- socketChannel.pipeline().addLast(new NettyClientHandler());
- }
- }
4, 编写客户端
- /**
- * @author Gjing
- **/
- @Component
- @Slf4j
- public class NettyClient {
- public void start() {
- EventLoopGroup group = new NioEventLoopGroup();
- Bootstrap Bootstrap = new Bootstrap()
- .group(group)
- // 该参数的作用就是禁止使用 Nagle 算法, 使用于小数据即时传输
- .option(ChannelOption.TCP_NODELAY, true)
- .channel(NioSocketChannel.class)
- .handler(new NettyClientInitializer());
- try {
- ChannelFuture future = Bootstrap.connect("127.0.0.1", 8090).sync();
- log.info("客户端成功....");
- // 发送消息
- future.channel().writeAndFlush("你好啊");
- // 等待连接被关闭
- future.channel().closeFuture().sync();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }finally {
- group.shutdownGracefully();
- }
- }
- }
5, 启动类
- @SpringBootApplication
- public class ClientApplication {
- public static void main(String[] args) {
- SpringApplication.run(ClientApplication.class, args);
- // 启动 netty 客户端
- NettyClient nettyClient = new NettyClient();
- nettyClient.start();
- }
- }
6, 启动结果
客户端
服务端
三, ChannelOption 参数详解
1,ChannelOption.SO_BACKLOG
ChannelOption.SO_BACKLOG 对应的是 tcp/ip 协议 listen 函数中的 backlog 参数, 函数 listen(int socketfd,int backlog)用来初始化服务端可连接队列, 服务端处理客户端连接请求是顺序处理的, 所以同一时间只能处理一个客户端连接, 多个客户端来的时候, 服务端将不能处理的客户端连接请求放在队列中等待处理, backlog 参数指定了队列的大小
2,ChannelOption.SO_REUSEADDR
ChanneOption.SO_REUSEADDR 对应于套接字选项中的 SO_REUSEADDR, 这个参数表示允许重复使用本地地址和端口, 比如, 某个服务器进程占用了 TCP 的 80 端口进行监听, 此时再次监听该端口就会返回错误, 使用该参数就可以解决问题, 该参数允许共用该端口, 这个在服务器程序中比较常使用, 比如某个进程非正常退出, 该程序占用的端口可能要被占用一段时间才能允许其他进程使用, 而且程序死掉以后, 内核一需要一定的时间才能够释放此端口, 不设置 SO_REUSEADDR 就无法正常使用该端口.
3,ChannelOption.SO_KEEPALIVE
Channeloption.SO_KEEPALIVE 参数对应于套接字选项中的 SO_KEEPALIVE, 该参数用于设置 TCP 连接, 当设置该选项以后, 连接会测试链接的状态, 这个选项用于可能长时间没有数据交流的连接. 当设置该选项以后, 如果在两小时内没有数据的通信时, TCP 会自动发送一个活动探测数据报文
4,ChannelOption.SO_SNDBUF 和 ChannelOption.SO_RCVBUF
ChannelOption.SO_SNDBUF 参数对应于套接字选项中的 SO_SNDBUF,ChannelOption.SO_RCVBUF 参数对应于套接字选项中的 SO_RCVBUF 这两个参数用于操作接收缓冲区和发送缓冲区的大小, 接收缓冲区用于保存网络协议站内收到的数据, 直到应用程序读取成功, 发送缓冲区用于保存发送数据, 直到发送成功.
5,ChannelOption.SO_LINGER
ChannelOption.SO_LINGER 参数对应于套接字选项中的 SO_LINGER,Linux 内核默认的处理方式是当用户调用 close()方法的时候, 函数返回, 在可能的情况下, 尽量发送数据, 不一定保证会发生剩余的数据, 造成了数据的不确定性, 使用 SO_LINGER 可以阻塞 close()的调用时间, 直到数据完全发送
6,ChannelOption.TCP_NODELAY
ChannelOption.TCP_NODELAY 参数对应于套接字选项中的 TCP_NODELAY, 该参数的使用与 Nagle 算法有关, Nagle 算法是将小的数据包组装为更大的帧然后进行发送, 而不是输入一次发送一次, 因此在数据包不足的时候会等待其他数据的到了, 组装成大的数据包进行发送, 虽然该方式有效提高网络的有效负载, 但是却造成了延时, 而该参数的作用就是禁止使用 Nagle 算法, 使用于小数据即时传输, 于 TCP_NODELAY 相对应的是 TCP_CORK, 该选项是需要等到发送的数据量最大的时候, 一次性发送数据, 适用于文件传输.
7,IP_TOS
IP 参数, 设置 IP 头部的 Type-of-Service 字段, 用于描述 IP 包的优先级和 QoS 选项.
8,ALLOW_HALF_CLOSURE
Netty 参数, 一个连接的远端关闭时本地端是否关闭, 默认值为 False. 值为 False 时, 连接自动关闭; 为 True 时, 触发 ChannelInboundHandler 的 userEventTriggered()方法, 事件为 ChannelInputShutdownEvent.
四, Netty 的 future.channel().closeFuture().sync(); 到底有什么用?
主线程执行到这里就 wait 子线程结束, 子线程才是真正监听和接受请求的, closeFuture()是开启了一个 channel 的监听器, 负责监听 channel 是否关闭的状态, 如果监听到 channel 关闭了, 子线程才会释放, syncUninterruptibly()让主线程同步等待子线程结果
本文只讲解了简单的基本使用, 如果需要更深层的使用, 可以前往官网学习, 本 demo 源代码地址: SpringBoot-netty
来源: https://yq.aliyun.com/articles/706893