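TCP is a stream protocol: the data it carries is a continuous sequence of bytes with no message boundaries. As a result, several application messages can arrive glued together in a single read (sticky packets), or one message can arrive split across several reads (half packets). To parse TCP data correctly, the receiver has to split the stream back into messages itself. Common causes of sticky packets:
1. The application writes more bytes than the socket send buffer can hold.
2. TCP segments the data according to the MSS.
3. The Ethernet frame payload exceeds the MTU, so IP fragmentation takes place.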
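The loss of message boundaries can be reproduced without a network. The sketch below is only a simulation: it concatenates three requests into one byte array and reads them back in fixed-size chunks. The class name `StreamBoundaryDemo`, its helper methods, and the chunk size of 24 are assumptions made for illustration; a real TCP connection delivers chunks of unpredictable size.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StreamBoundaryDemo {

    /** Concatenates the messages the way a byte stream would carry them. */
    public static byte[] toStream(List<String> messages) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (String m : messages) {
            byte[] bytes = m.getBytes(StandardCharsets.UTF_8);
            out.write(bytes, 0, bytes.length);
        }
        return out.toByteArray();
    }

    /** Reads the stream in fixed-size chunks, ignoring where messages begin or end. */
    public static List<String> readInChunks(byte[] stream, int chunkSize) throws IOException {
        List<String> chunks = new ArrayList<>();
        InputStream in = new ByteArrayInputStream(stream);
        byte[] buffer = new byte[chunkSize];
        int n;
        while ((n = in.read(buffer)) != -1) {
            chunks.add(new String(buffer, 0, n, StandardCharsets.UTF_8));
        }
        return chunks;
    }

    public static void main(String[] args) throws IOException {
        List<String> sent = Arrays.asList(
                "QUERY TIME ORDER\n", "QUERY TIME ORDER\n", "QUERY TIME ORDER\n");
        // 24 is an arbitrary chunk size standing in for whatever the network delivers.
        for (String chunk : readInChunks(toStream(sent), 24)) {
            // Show line breaks as a visible "\n" so each chunk prints on one line.
            System.out.println("[" + chunk.replace("\n", "\\n") + "]");
        }
    }
}
```

Each request is 17 bytes long, so the 51-byte stream comes back as three chunks of 24, 24 and 3 bytes. None of them lines up with a request: the first holds one full request plus the start of the second, and the last holds only the tail of the third.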
Server side: TimeServer
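- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- public class TimeServer {
-     public void bind(int port) throws Exception {
-         EventLoopGroup bossGroup = new NioEventLoopGroup();
-         EventLoopGroup workerGroup = new NioEventLoopGroup();
-         ServerBootstrap bootstrap = new ServerBootstrap();
-         try {
-             bootstrap.group(bossGroup, workerGroup)
-                     .channel(NioServerSocketChannel.class)
-                     .option(ChannelOption.SO_BACKLOG, 1024)
-                     .childHandler(new ChildChannelHandler());
-             // Bind the port and wait synchronously until binding succeeds.
-             ChannelFuture future = bootstrap.bind(port).sync();
-             // Block until the server's listening channel is closed.
-             future.channel().closeFuture().sync();
-         } finally {
-             // Shut down both thread groups gracefully.
-             bossGroup.shutdownGracefully();
-             workerGroup.shutdownGracefully();
-         }
-     }
-     private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
-         @Override
-         protected void initChannel(SocketChannel ch) throws Exception {
-             // ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
-             // ch.pipeline().addLast(new StringDecoder());
-             ch.pipeline().addLast("timeServerHandler", new TimeServerHandler());
-         }
-     }
-     public static void main(String[] args) throws Exception {
-         int port = 443;
-         new TimeServer().bind(port);
-     }
- }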
TimeServerHandler
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandlerAdapter;
- import java.nio.charset.StandardCharsets;
- public class TimeServerHandler extends ChannelInboundHandlerAdapter {
-     private int count;
-     @Override
-     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-         System.out.println("Server start read");
-         ByteBuf buf = (ByteBuf) msg;
-         byte[] req = new byte[buf.readableBytes()];
-         buf.readBytes(req);
-         buf.release(); // this handler consumes msg, so it must release the buffer
-         // Assumes every read carries exactly one request ending in a line separator.
-         String body = new String(req, StandardCharsets.UTF_8)
-                 .substring(0, req.length - System.getProperty("line.separator").length());
-         // String body = (String) msg;
-         System.out.println("The time server receive order : " + body + "; the count is : " + ++count);
-         String currentTime = "Query Time Order".equalsIgnoreCase(body)
-                 ? new java.util.Date(System.currentTimeMillis()).toString()
-                 : "Bad Order";
-         currentTime = currentTime + System.getProperty("line.separator");
-         ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
-         // ctx.writeAndFlush(resp);
-         ctx.write(resp);
-     }
-     @Override
-     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
-         ctx.flush();
-     }
-     @Override
-     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-         ctx.close();
-     }
- }
Client side: TimeClient
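- import io.netty.bootstrap.Bootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioSocketChannel;
- public class TimeClient {
-     public void connect(int port, String host) throws Exception {
-         // Configure the client-side NIO thread group.
-         EventLoopGroup group = new NioEventLoopGroup();
-         Bootstrap client = new Bootstrap();
-         try {
-             client.group(group)
-                     .channel(NioSocketChannel.class)
-                     .option(ChannelOption.TCP_NODELAY, true)
-                     .handler(new ChannelInitializer<SocketChannel>() {
-                         @Override
-                         protected void initChannel(SocketChannel ch) throws Exception {
-                             // ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
-                             // ch.pipeline().addLast(new StringDecoder());
-                             ch.pipeline().addLast("timeClientHandler", new TimeClientHandler());
-                         }
-                     });
-             // Start the connection and wait until it is established.
-             ChannelFuture future = client.connect(host, port).sync();
-             // Block until the client channel is closed.
-             future.channel().closeFuture().sync();
-         } finally {
-             // Shut down the thread group gracefully.
-             group.shutdownGracefully();
-         }
-     }
-     public static void main(String[] args) {
-         int port = 443;
-         TimeClient client = new TimeClient();
-         try {
-             client.connect(port, "127.0.0.1");
-         } catch (Exception e) {
-             e.printStackTrace();
-         }
-     }
- }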
TimeClientHandler
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandlerAdapter;
- import java.nio.charset.StandardCharsets;
- public class TimeClientHandler extends ChannelInboundHandlerAdapter {
-     private int count;
-     private final byte[] req;
-     public TimeClientHandler() {
-         req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
-     }
-     @Override
-     public void channelActive(ChannelHandlerContext ctx) throws Exception {
-         // Send 100 requests back to back so that TCP is likely to coalesce them.
-         for (int i = 0; i < 100; i++) {
-             ByteBuf message = Unpooled.buffer(req.length);
-             message.writeBytes(req);
-             ctx.writeAndFlush(message);
-         }
-     }
-     @Override
-     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-         ByteBuf buf = (ByteBuf) msg;
-         byte[] resp = new byte[buf.readableBytes()];
-         buf.readBytes(resp);
-         buf.release(); // this handler consumes msg, so it must release the buffer
-         String body = new String(resp, StandardCharsets.UTF_8);
-         // String body = (String) msg;
-         System.out.println("NOW is: " + body + "; the counter is " + ++count);
-     }
-     @Override
-     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-         ctx.close();
-     }
- }
Server output:
- Server start read
- The time server receive order : QUERY TIME ORDER
- QUERY TIME ORDER
- ...
- QUERY TIME ORDER
- QUERY TIME ORD; the count is : 1
- Server start read
- The time server receive order :
- QUERY TIME ORDER
- ...
- QUERY TIME ORDER
- QUERY TIME ORDER; the count is : 2
Client output:
- NOW is: Bad Order
- Bad Order
- ; the counter is 1
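Clearly, sticky and split packets lead to half-packet reads, so the result is not the intended one: the client sent 100 requests, but the server counted only 2 reads, and the client received a single read containing two "Bad Order" replies instead of 100 timestamps. Netty provides several decoders for handling half packets; the rest of this article uses LineBasedFrameDecoder to solve the problem.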
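To see why the server answers "Bad Order", it helps to run the handler's parsing step on its own. The sketch below mirrors the substring-and-compare logic of TimeServerHandler in plain Java. The class `HalfPacketDemo`, the method `handle`, the `"<current time>"` placeholder and the fixed `"\n"` separator are all assumptions made for illustration, not part of the original program.

```java
import java.nio.charset.StandardCharsets;

public class HalfPacketDemo {

    /**
     * Mirrors the parsing in TimeServerHandler: strip one trailing separator
     * from whatever bytes arrived in this read, then compare with the command.
     */
    public static String handle(byte[] req, String separator) {
        String body = new String(req, StandardCharsets.UTF_8)
                .substring(0, req.length - separator.length());
        // A fixed placeholder replaces the real timestamp to keep the output stable.
        return "QUERY TIME ORDER".equalsIgnoreCase(body) ? "<current time>" : "Bad Order";
    }

    public static void main(String[] args) {
        String sep = "\n"; // assumption: a one-character line separator
        byte[] single = ("QUERY TIME ORDER" + sep).getBytes(StandardCharsets.UTF_8);
        byte[] merged = ("QUERY TIME ORDER" + sep + "QUERY TIME ORDER" + sep)
                .getBytes(StandardCharsets.UTF_8);
        byte[] partial = "QUERY TIME ORD".getBytes(StandardCharsets.UTF_8);

        System.out.println(handle(single, sep));  // one request per read
        System.out.println(handle(merged, sep));  // two requests glued together
        System.out.println(handle(partial, sep)); // a request cut off mid-way
    }
}
```

Only the first call matches the command. The merged and partial inputs both fall through to "Bad Order", because the handler treats each read as exactly one request.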
Server side: the TimeServer class needs its initChannel method changed to add the LineBasedFrameDecoder and StringDecoder decoders. The modified method:
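- // Requires io.netty.handler.codec.LineBasedFrameDecoder and io.netty.handler.codec.string.StringDecoder.
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
-     ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
-     ch.pipeline().addLast(new StringDecoder());
-     ch.pipeline().addLast("timeServerHandler", new TimeServerHandler());
- }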
The TimeServerHandler class needs its channelRead method changed: with the decoders in place, the msg it receives has already been decoded into a string. The code:
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-     // System.out.println("Server start read");
-     // ByteBuf buf = (ByteBuf) msg;
-     // byte[] req = new byte[buf.readableBytes()];
-     // buf.readBytes(req);
-     // String body = new String(req, "UTF-8").substring(0, req.length - System.getProperty("line.separator").length());
-     // The upstream decoders deliver one complete line, without its line break, as a String.
-     String body = (String) msg;
-     System.out.println("The time server receive order : " + body + "; the count is : " + ++count);
-     String currentTime = "Query Time Order".equalsIgnoreCase(body)
-             ? new java.util.Date(System.currentTimeMillis()).toString()
-             : "Bad Order";
-     currentTime = currentTime + System.getProperty("line.separator");
-     ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
-     ctx.writeAndFlush(resp);
-     // ctx.write(resp);
- }
Client code: TimeClient changes its initChannel method in the same way as the server, adding the LineBasedFrameDecoder and StringDecoder decoders. The modified code:
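- // Requires io.netty.handler.codec.LineBasedFrameDecoder and io.netty.handler.codec.string.StringDecoder.
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
-     ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
-     ch.pipeline().addLast(new StringDecoder());
-     ch.pipeline().addLast("timeClientHandler", new TimeClientHandler());
- }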
The TimeClientHandler class needs its channelRead method changed as follows:
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-     // ByteBuf buf = (ByteBuf) msg;
-     // byte[] req = new byte[buf.readableBytes()];
-     // buf.readBytes(req);
-     // String body = new String(req, "UTF-8");
-     String body = (String) msg;
-     System.out.println("NOW is: " + body + "; the counter is " + ++count);
- }
After running, the server output is:
- The time server receive order : QUERY TIME ORDER; the count is : 1
- The time server receive order : QUERY TIME ORDER; the count is : 2
- The time server receive order : QUERY TIME ORDER; the count is : 3
- The time server receive order : QUERY TIME ORDER; the count is : 4
- The time server receive order : QUERY TIME ORDER; the count is : 5
- The time server receive order : QUERY TIME ORDER; the count is : 6
- The time server receive order : QUERY TIME ORDER; the count is : 7
- ......
- The time server receive order : QUERY TIME ORDER; the count is : 98
- The time server receive order : QUERY TIME ORDER; the count is : 99
- The time server receive order : QUERY TIME ORDER; the count is : 100
The client output is:
- NOW is: Thu Jun 08 17:47:06 CST 2017; the counter is 1
- NOW is: Thu Jun 08 17:47:06 CST 2017; the counter is 2
- NOW is: Thu Jun 08 17:47:06 CST 2017; the counter is 3
- ......
- NOW is: Thu Jun 08 17:47:06 CST 2017; the counter is 94
- NOW is: Thu Jun 08 17:47:06 CST 2017; the counter is 95
- NOW is: Thu Jun 08 17:47:06 CST 2017; the counter is 96
- NOW is: Thu Jun 08 17:47:06 CST 2017; the counter is 97
- NOW is: Thu Jun 08 17:47:06 CST 2017; the counter is 98
- NOW is: Thu Jun 08 17:47:06 CST 2017; the counter is 99
- NOW is: Thu Jun 08 17:47:06 CST 2017; the counter is 100
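The results now match expectations exactly, which shows that LineBasedFrameDecoder together with StringDecoder solves the half-packet reads caused by TCP sticky packets. No extra code has to be written, so they are convenient to use.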
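LineBasedFrameDecoder scans the readable bytes in the ByteBuf one by one, looking for "\n" or "\r\n". When it finds one, that position marks the end of the line, and the bytes from the reader index up to that position form one line. In other words, it is a decoder that uses the line break as its end-of-frame marker. The constructor argument (1024 in the code above) is the maximum frame length; a longer run of bytes without a line break raises a TooLongFrameException. StringDecoder is very simple: it converts the received object into a string and passes it on to the next handler. Combined, LineBasedFrameDecoder and StringDecoder form a line-based text decoder designed to handle TCP sticky and split packets.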
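The scanning idea described above can be sketched in plain Java. This is not Netty's implementation: the class name `SimpleLineFramer` and its `feed` method are hypothetical, and the sketch leaves out the maximum-length check that the real decoder performs. It only shows how leftover bytes are kept between reads until a line break completes the frame.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class SimpleLineFramer {
    // Bytes received so far that do not yet end in a line break.
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    /** Feeds one chunk of bytes and returns every line that this chunk completes. */
    public List<String> feed(byte[] chunk) {
        List<String> lines = new ArrayList<>();
        for (byte b : chunk) {
            if (b == '\n') {
                byte[] line = pending.toByteArray();
                int len = line.length;
                // Treat "\r\n" like "\n" by dropping a trailing carriage return.
                if (len > 0 && line[len - 1] == '\r') {
                    len--;
                }
                lines.add(new String(line, 0, len, StandardCharsets.UTF_8));
                pending.reset();
            } else {
                pending.write(b);
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        SimpleLineFramer framer = new SimpleLineFramer();
        // Three reads whose boundaries do not line up with the requests.
        String[] reads = {"QUERY TIME ORDER\nQUERY T", "IME ORDER\r\nQUERY TIME ORD", "ER\n"};
        for (String read : reads) {
            for (String line : framer.feed(read.getBytes(StandardCharsets.UTF_8))) {
                System.out.println("frame: " + line);
            }
        }
    }
}
```

Each of the three reads completes exactly one request, so the program prints three identical frames even though no read contained a whole number of requests. Decoding the bytes into a String only after a full line is available is also what keeps a multi-byte UTF-8 character from being split across two reads.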
Source: http://blog.csdn.net/wangshuang1631/article/details/72931415