通过本章学习, 笔者希望你能掌握 EventLoopGroup 的工作流程, ServerBootstrap 的启动流程, ChannelPipeline 是如何操作管理 Channel. 只有清楚这些, 才能更好的了解和使用 Netty. 还在等什么, 快来学习吧!
知识结构图:
技术: Netty, 拆包粘包, 服务启动流程
说明: 若你对 NIO 有一定的了解, 对于本章知识来说有很大的帮助! NIO 教程
源码: https://github.com/ITDragonBlog/daydayup/tree/master/Netty/netty-stu
Netty 重要组件
这里让你清楚了解 ChannelPipeline,ChannelHandlerContext,ChannelHandler,Channel 四者之间的关系.
这里让你清楚了解 NioEventLoopGroup,NioEventLoop,Channel 三者之间的关系.
这里让你清楚了解 ServerBootstrap,Channel 两者之间的关系.
看懂了这块的理论知识, 后面 Netty 拆包粘包的代码就非常的简单.
Channel
Channel : Netty 最核心的接口. NIO 通讯模式中通过 Channel 进行 Socket 套接字的读, 写和同时读写操作.
ChannelHandler : 因为直接使用 Channel 会比较麻烦, 所以在 Netty 编程中通过 ChannelHandler 间接操作 Channel, 从而简化开发.
ChannelPipeline : 可以理解为一个管理 ChandlerHandler 的链表. 对 Channel 进行操作时, Pipeline 负责从尾部依次调用每一个 Handler 进行处理. 每个 Channel 都有一个属于自己的 ChannelPipeline.
ChannelHandlerContext : ChannelPipeline 通过 ChannelHandlerContext 间接管理每个 ChannelHandler.
如下图所示, 结合代码, 在服务器初始化和客户端创建连接的过程中加了四个 Handler, 分别是日志事务, 字符串分割解码器, 接受参数转字符串解码器, 处理任务的 Handler.
NioEventLoopGroup
EventLoopGroup : 本质是个线程池, 继承了 ScheduledExecutorService 定时任务线程池.
NioEventLoopGroup : 是用来处理 NIO 通信模式的线程池. 每个线程池有 N 个 NioEventLoop 来处理 Channel 事件, 每一个 NioEventLoop 负责处理 N 个 Channel.
NioEventLoop : 负责不停地轮询 IO 事件, 处理 IO 事件和执行任务, 类比多路复用器, 细化分三件事.
1 轮询注册到 Selector 上所有的 Channel 的 IO 事件
2 处理产生网络 IO 事件的 Channel
3 处理队列中的任务
ServerBootstrap
本章重点, Netty 是如何通过 NIO 辅助启动类来初始化 Channel 的? 先看下面的源码.
@Override void init(Channel channel) throws Exception {
final Map < ChannelOption < ?>,
Object > options = options0();
synchronized(options) {
setChannelOptions(channel, options, logger);
}
final Map < AttributeKey < ?>,
Object > attrs = attrs0();
synchronized(attrs) {
for (Entry < AttributeKey < ?>, Object > e: attrs.entrySet()) {@SuppressWarnings("unchecked") AttributeKey < Object > key = (AttributeKey < Object > ) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry < ChannelOption < ?>,
Object > [] currentChildOptions;
final Entry < AttributeKey < ?>,
Object > [] currentChildAttrs;
synchronized(childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized(childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
p.addLast(new ChannelInitializer < Channel > () {@Override public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {@Override public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
服务器启动和连接过程:
第一步: 是给 Channel 设置 options 和 attrs,
第二步: 复制 childGroup,childHandler,childOptions 和 childAttrs 等待服务器和客户端连接,
第三步: 实例化一个 ChannelInitializer, 添加到 Pipeline 的末尾.
第四步: 当 Channel 注册到 NioEventLoop 时, ChannelInitializer 触发 initChannel 方法, pipeline 装入自定义的 Handler, 给 Channel 设置一下 child 配置.
小结:
1 group,options,attrs,handler, 是在服务器端初始化时配置, 是 AbstractBootstrap 的方法.
2 childGroup,childOption,childAttr,childHandler, 是在服务器与客户端建立 Channel 后配置, 是 ServerBootstrap 的方法.
3 Bootstrap 和 ServerBootstrap 都继承了 AbstractBootstrap 类.
4 若不设置 childGroup, 则默认取 group 值.
5 Bootstrap 和 ServerBootstrap 启动服务时, 都会执行验证方法, 判断必填参数是否都有配置.
Netty 拆包粘包
这里通过介绍 Netty 拆包粘包问题来对 Netty 进行入门学习.
在基于流的传输中, 即便客户端发送独立的数据包, 操作系统也会将其转换成一串字节队列, 而服务端一次读取到的字节数又不确定. 再加上网络传输的快慢. 服务端很难完整的接收到数据.
常见的拆包粘包方法有三种
1 服务端设置一次接收字节的长度. 若服务端接收的字节长度不满足要求则一直处于等待. 客户端为满足传输的字节长度合格, 可以考虑使用空格填充.
2 服务端设置特殊分隔符. 客户端通过特殊分隔符粘包, 服务端通过特殊分隔符拆包.
3 自定义协议. 数据传输一般分消息头和消息体, 消息头中包含了数据的长度. 服务端先接收到消息头, 得知需要接收 N 个数据, 然后服务端接收直到数据为 N 个为止.
本章采用第二种, 用特殊分隔符的方式.
创建服务端代码流程
第一步: 准备两个线程池. 一个用于接收事件的 boss 线程池, 另一个用于处理事件的 worker 线程池.
第二步: 服务端实例化 ServerBootstrap NIO 服务辅助启动类. 用于简化提高开发效率.
第三步: 配置服务器启动参数. 比如 channel 的类型, 接收 channel 的 EventLoop, 初始化的日志打印事件, 建立连接后的事件 (拆包, 对象转字符串, 自定义事件), 初始化的配置和建立连接后的配置.
第四步: 绑定端口, 启动服务. Netty 会根据第三步配置的参数启动服务.
第五步: 关闭资源.
package com.itdragon.delimiter;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
public class ITDragonServer {
private static final Integer PORT = 8888; // 被监听端口号
private static final String DELIMITER = "_$"; // 拆包分隔符
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // 用于接收进来的连接
EventLoopGroup workerGroup = new NioEventLoopGroup(); // 用于处理进来的连接
try {
ServerBootstrap serverbootstrap = new ServerBootstrap(); // 启动 NIO 服务的辅助启动类
serverbootstrap.group(bossGroup, workerGroup) // 分别设置 bossGroup, workerGroup 顺序不能反
.channel(NioServerSocketChannel.class) // Channel 的创建工厂, 启动服务时会通过反射的方式来创建一个 NioServerSocketChannel 对象
.handler(new LoggingHandler(LogLevel.INFO)) // handler 在初始化时就会执行, 可以设置打印日志级别
.childHandler(new ChannelInitializer < SocketChannel > () { // childHandler 会在客户端成功 connect 后才执行, 这里实例化 ChannelInitializer
@Override protected void initChannel(SocketChannel socketChannel) throws Exception { // initChannel 方法执行后删除实例 ChannelInitializer, 添加以下内容
ByteBuf delimiter = Unpooled.copiedBuffer(DELIMITER.getBytes()); // 获取特殊分隔符的 ByteBuffer
socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(128, delimiter)); // 设置特殊分隔符用于拆包
// socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(8)); 设置指定长度分割
socketChannel.pipeline().addLast(new StringDecoder()); // 设置字符串形式的解码
socketChannel.pipeline().addLast(new ITDragonServerHandler()); // 自定义的服务器处理类, 负责处理事件
}
}).option(ChannelOption.SO_BACKLOG, 128) // option 在初始化时就会执行, 设置 tcp 缓冲区
.childOption(ChannelOption.SO_KEEPALIVE, true); // childOption 会在客户端成功 connect 后才执行, 设置保持连接
ChannelFuture future = serverbootstrap.bind(PORT).sync(); // 绑定端口, 阻塞等待服务器启动完成, 调用 sync() 方法会一直阻塞等待 channel 的停止
future.channel().closeFuture().sync(); // 等待关闭 , 等待服务器套接字关闭
} catch(Exception e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully(); // 关闭线程组, 先打开的后关闭
bossGroup.shutdownGracefully();
}
}
}
核心参数说明
NioEventLoopGroup : 是用来处理 I/O 操作的多线程事件循环器. Netty 提供了许多不同的 EventLoopGroup 的实现用来处理不同传输协议.
ServerBootstrap : 启动 NIO 服务的辅助启动类. 先配置 Netty 服务端启动参数, 执行 bind(PORT) 方法才算真正启动服务.
group : 注册 EventLoopGroup
channel : channelFactory, 用于配置通道的类型.
handler : 服务器始化时就会执行的事件.
childHandler : 服务器在和客户端成功连接后会执行的事件.
initChannel : channelRegistered 事件触发后执行, 删除 ChannelInitializer 实例, 添加该方法体中的 handler.
option : 服务器始化的配置.
childOption : 服务器在和客户端成功连接后的配置.
SocketChannel : 继承了 Channel, 通过 Channel 可以对 Socket 进行各种操作.
ChannelHandler : 通过 ChannelHandler 来间接操纵 Channel, 简化了开发.
ChannelPipeline : 可以看成是一个 ChandlerHandler 的链表.
ChannelHandlerContext : ChannelPipeline 通过 ChannelHandlerContext 来间接管理 ChannelHandler.
自定义服务器处理类
第一步: 继承 ChannelInboundHandlerAdapter, 其父类已经实现了 ChannelHandler 接口, 简化了开发.
第二步: 覆盖 chanelRead() 事件处理方法 , 每当服务器从客户端收到新的数据时, 该方法会在收到消息时被调用.
第三步: 释放 ByteBuffer,ByteBuf 是一个引用计数对象, 这个对象必须显示地调用 release() 方法来释放.
第四步: 异常处理, 即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时触发. 在大部分情况下, 捕获的异常应该被记录下来并且把关联的 channel 给关闭掉.
package com.itdragon.delimiter;
import com.itdragon.utils.ITDragonUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
public class ITDragonServerHandler extends ChannelInboundHandlerAdapter {
private static final String DELIMITER = "_$"; // 拆包分隔符
@Override public void channelRead(ChannelHandlerContext chc, Object msg) {
try {
// 普通读写数据
/* 设置字符串形式的解码 new StringDecoder() 后可以直接使用
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "utf-8");
*/
System.out.println("Netty Server :" + msg.toString());
// 分隔符拆包
String response = ITDragonUtil.cal(msg.toString()) + DELIMITER;
chc.channel().writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));
} catch(Exception e) {
e.printStackTrace();
} finally {
ReferenceCountUtil.release(msg); // 写入方法 writeAndFlush ,Netty 已经释放了
}
}
// 当出现 Throwable 对象才会被调用
@Override public void exceptionCaught(ChannelHandlerContext chc, Throwable cause) {
// 这个方法的处理方式会在遇到不同异常的情况下有不同的实现, 比如你可能想在关闭连接之前发送一个错误码的响应消息.
cause.printStackTrace();
chc.close();
}
}
客户端启动流程
第一步: 创建一个用于发送请求的线程池.
第二步: 客户端实例化 Bootstrap NIO 服务启动辅助类, 简化开发.
第三步: 配置参数, 粘包, 发送请求.
第四步: 关闭资源.
值得注意的是, 和 ServerBootstrap 不同, 它并没有 childHandler 和 childOption 方法.
package com.itdragon.delimiter;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
public class ITDragonClient {
private static final Integer PORT = 8888;
private static final String HOST = "127.0.0.1";
private static final String DELIMITER = "_$"; // 拆包分隔符
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer < SocketChannel > () {@Override protected void initChannel(SocketChannel socketChannel) throws Exception {
ByteBuf delimiter = Unpooled.copiedBuffer(DELIMITER.getBytes());
// 设置特殊分隔符
socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(128, delimiter));
// 设置指定长度分割 不推荐, 两者选其一
// socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(8));
socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(new ITDragonClientHandler());
}
}).option(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture future = bootstrap.connect(HOST, PORT).sync(); // 建立连接
future.channel().writeAndFlush(Unpooled.copiedBuffer(("1+1" + DELIMITER).getBytes()));
future.channel().writeAndFlush(Unpooled.copiedBuffer(("6+1" + DELIMITER).getBytes()));
future.channel().closeFuture().sync();
} catch(Exception e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
}
客户端请求接收类
和服务器处理类一样, 这里只负责打印数据.
package com.itdragon.delimiter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
public class ITDragonClientHandler extends ChannelInboundHandlerAdapter {@Override public void channelRead(ChannelHandlerContext chc, Object msg) {
try {
/* 设置字符串形式的解码 new StringDecoder() 后可以直接使用
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "utf-8");
*/
System.out.println("Netty Client :" + msg);
} catch(Exception e) {
e.printStackTrace();
} finally {
ReferenceCountUtil.release(msg);
}
}
public void exceptionCaught(ChannelHandlerContext chc, Throwable cause) {
cause.printStackTrace();
chc.close();
}
}
打印结果
一月 29, 2018 11:31:10 上午 io.netty.handler.logging.LoggingHandler channelRegistered
信息: [id: 0xcf3a3ac1] REGISTERED
一月 29, 2018 11:31:11 上午 io.netty.handler.logging.LoggingHandler bind
信息: [id: 0xcf3a3ac1] BIND: 0.0.0.0/0.0.0.0:8888
一月 29, 2018 11:31:11 上午 io.netty.handler.logging.LoggingHandler channelActive
信息: [id: 0xcf3a3ac1, L:/0:0:0:0:0:0:0:0:8888] ACTIVE
一月 29, 2018 11:31:18 上午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xcf3a3ac1, L:/0:0:0:0:0:0:0:0:8888] READ: [id: 0xf1b8096b, L:/127.0.0.1:8888 - R:/127.0.0.1:4777]
一月 29, 2018 11:31:18 上午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0xcf3a3ac1, L:/0:0:0:0:0:0:0:0:8888] READ COMPLETE
Netty Server : 1+1
Netty Server : 6+1
Netty Client :2
Netty Client :7
从日志中可以看出 Channel 的状态从 REGISTERED ---> ACTIVE ---> READ ---> READ COMPLETE. 服务端也是按照特殊分割符拆包.
总结
看完本章, 你必须要掌握的三个知识点: NioEventLoopGroup,ServerBootstrap,ChannelHandlerAdapter
1 NioEventLoopGroup 本质就是一个线程池, 管理多个 NioEventLoop, 一个 NioEventLoop 管理多个 Channel.
2 NioEventLoop 负责不停地轮询 IO 事件, 处理 IO 事件和执行任务.
3 ServerBootstrap 是 NIO 服务的辅助启动类, 先配置服务参数, 后执行 bind 方法启动服务.
4 Bootstrap 是 NIO 客户端的辅助启动类, 用法和 ServerBootstrap 类似.
5 Netty 使用 FixedLengthFrameDecoder 固定长度拆包, DelimiterBasedFrameDecoder 分隔符拆包.
到这里, Netty 的拆包粘包, 以及 Netty 的重要组件, 服务器启动流程到这里就结束了, 如果觉得不错可以点一个 ** "推荐" ** , 也可以 ** "关注" ** 我哦.
优质文章
http://blog.csdn.net/spiderdog/article/category/1800249 https://www.jianshu.com/p/c5068caab217
来源: https://www.cnblogs.com/itdragon/p/8365694.html