一 简单概念
RPC: ( Remote Procedure Call), 远程调用过程, 是通过网络调用远程计算机的进程中某个方法, 从而获取到想要的数据, 过程如同调用本地的方法一样.
阻塞 IO : 当阻塞 I/O 在调用 InputStream.read()方法是阻塞的, 一直等到数据到来时才返回, 同样 ServerSocket.accept()方法时, 也是阻塞, 直到有客户端连接才返回, I/O 通信模式如下:
缺点: 当客户端多时, 会创建大量的处理线程, 并且为每一个线程分配一定的资源; 阻塞可能带来频繁切换上下文, 这时引入 NIO
NIO : jdk1.4 引入的(NEW Input/Output), 是基于通过和缓存区的 I/O 方式,(插入一段题外话, 学的多忘得也多, 之前有认真研究过 NIO, 后来用到的时候, 忘得一干二净, 所以学习一些东西, 经常返回看看),NIO 是一种非阻塞的 IO 模型, 通过不断轮询 IO 事件是否就绪, 非阻塞是指线程在等待 IO 的时候, 可以做其他的任务, 同步的核心是 Selector,Selector 代替线程本省的轮询 IO 事件, 避免了阻塞同时减少了不必要的线程消耗; 非阻塞的核心是通道和缓存区, 当 IO 事件的就绪时, 可以将缓存区的数据写入通道
其工作原理:
1 由专门的线程来处理所有的 IO 事件, 并且负责转发
2 事件驱动机制: 事件到的时候才触发, 而不是同步监视
3 线程通讯: 线程之间通讯通过 wait,notify 等方式通讯, 保证每次上下文切换都是有意义的, 减少没必要的线程切换
通道 : 是对原 I/O 包中流的模拟, 所有数据必须通过 Channel 对象, 常见的通道 FileChannel,SocketChannel,ServerSocketChannel,DatagramChannel(UDP 协议向网络连接的两端读写数据)
Buffer 缓存区 : 实际上是一个容器, 一个连续的数组, 任何读写的数据都经过 Buffer
Netty : 是由 JBOSS 提供的一个 java 开源框架, 是一个高性能, 异步事件驱动的 NIO 框架, 基于 JAVA NIO 提供的 API 实现, 他提供了 TCP UDP 和文件传输的支持,, 所有操作都是异步非阻塞的. 通过 Futrue-Listener 机制, 本质就是 Reactor 模式的现实, Selector 作为多路复用器, EventLoop 作为转发器, 而且, netty 对 NIO 中 buffer 做优化, 大大提高了性能
二 Netty 客户端和服务端的
Netty 中 Bootstrap 和 Channel 的生命周期
Bootstrap 简介
Bootstarp: 引导程序, 将 ChannelPipeline,ChannelHandler,EventLoop 进行整体关联
Bootstrap 具体分为两个实现
ServerBootstrap: 用于服务端, 使用一个 ServerChannel 接收客户端的连接, 并创建对应的子 Channel
Bootstrap: 用于客户端, 只需要一个单独的 Channel, 配置整个 Netty 程序, 串联起各个组件
二者的主要区别:
1 ServerBootstrap 用于 Server 端, 通过调用 bind()绑定一个端口监听连接, Bootstrap 用于 Client 端, 需要调用 connect()方法来连接服务器端, 我们也可以调用 bind()方法接收返回 ChannelFuture 中 Channel
2 客户端的 Bootstrap 一般用一个 EventLoopGroup, 而服务器的 ServerBootstrap 会用两个第一个 EventLoopGroup 专门负责绑定到端口监听连接事件, 而第二个 EventLoopGroup 专门用来处处理每个接收的连接, 这样大大提高了并发量
public class Server { public static void main(String[] args) throws Exception { // 1 创建线两个事件循环组 // 一个是用于处理服务器端接收客户端连接的 // 一个是进行网络通信的 (网络读写的) EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); // 2 创建辅助工具类 ServerBootstrap, 用于服务器通道的一系列配置 ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) // 绑定俩个线程组 .channel(NioServerSocketChannel.class) // 指定 NIO 的模式. NioServerSocketChannel 对应 TCP, NioDatagramChannel 对应 UDP .option(ChannelOption.SO_BACKLOG, 1024) // 设置 TCP 缓冲区 .option(ChannelOption.SO_SNDBUF, 32 * 1024) // 设置发送缓冲大小 .option(ChannelOption.SO_RCVBUF, 32 * 1024) // 这是接收缓冲大小 .option(ChannelOption.SO_KEEPALIVE, true) // 保持连接 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { //SocketChannel 建立连接后的管道 // 3 在这里配置 通信数据的处理逻辑, 可以 addLast 多个... sc.pipeline().addLast(new ServerHandler()); } }); // 4 绑定端口, bind 返回 future(异步), 加上 sync 阻塞在获取连接处 ChannelFuture cf1 = b.bind(8765).sync(); //ChannelFuture cf2 = b.bind(8764).sync(); // 可以绑定多个端口 // 5 等待关闭, 加上 sync 阻塞在关闭请求处 cf1.channel().closeFuture().sync(); //cf2.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } } public class ServerHandler extends ChannelHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("server channel active..."); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "utf-8"); System.out.println("Server :" + body ); String response = "返回给客户端的响应:" + body ; ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes())); // future 完成后触发监听器, 此处是写完即关闭(短连接). 因此需要关闭连接时, 要通过 server 端关闭. 直接关闭用方法 ctx[.channel()].close() //.addListener(ChannelFutureListener.CLOSE); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("读完了"); ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception { ctx.close(); } } public class Client { public static void main(String[] args) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(new ClientHandler()); } }); ChannelFuture cf1 = b.connect("127.0.0.1", 8765).sync(); //ChannelFuture cf2 = b.connect("127.0.0.1", 8764).sync(); // 可以使用多个端口 // 发送消息, Buffer 类型. write 需要 flush 才发送, 可用 writeFlush 代替 cf1.channel().writeAndFlush(Unpooled.copiedBuffer("777".getBytes())); cf1.channel().writeAndFlush(Unpooled.copiedBuffer("666".getBytes())); Thread.sleep(2000); cf1.channel().writeAndFlush(Unpooled.copiedBuffer("888".getBytes())); //cf2.channel().writeAndFlush(Unpooled.copiedBuffer("999".getBytes())); cf1.channel().closeFuture().sync(); //cf2.channel().closeFuture().sync(); group.shutdownGracefully(); } } public class ClientHandler extends ChannelHandlerAdapter{ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "utf-8"); System.out.println("Client :" + body ); } finally { // 记得释放 xxxHandler 里面的方法的 msg 参数: 写(write) 数据, msg 引用将被自动释放不用手动处理; 但只读数据时,! 必须手动释放引用数 ReferenceCountUtil.release(msg); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
其他组件:
Handle: 为了支持各种协议和处理数据的方式, 可以是连接, 数据接收, 异常, 数据格式转换等
ChannelHandler
ChannelInboundHandler : 最常用的 Handler, 作用是处理接收数据的事件, 来处理我们的核心业务逻辑.
ChannelInitializer :, 当一个链接建立时, 我们需要知道怎么来接收或者发送数据, 当然, 我们有各种各样的 Handler 实现来处理它, 那么 ChannelInitializer 便是用来配置这些 Handler, 它会提供一个 ChannelPipeline, 并把 Handler 加入到 ChannelPipeline.
ChannelPipeline : 一个 Netty 应用基于 ChannelPipeline 机制, 这种机制依赖 EventLoop 和 EventLoopGroup, 这三个都和事件或者事件处理相关
EventLoop : 为 Channel 处理 IO 操作, 一个 EventLoop 可以为多个 Channel 服务
EventLoopGroup : 包含多个 EventLoop
Channel : 代表一个 Socket 连接
Future : 在 Netty 中所有的 IO 操作都是异步的,, 因此我们不知道, 过来的请求是否被处理了, 所以我们注册一个监听, 当操作执行成功或者失败时监听自动触发, 所有操作都会返回一个 ChannelFutrue
ChannelFuture
Netty 是一个非阻塞的, 事件驱动的, 网络编程框架, 我们通过一张图理解一下, Channel,EventLoop 以及 EventLoopGroup 之间的关系
解释一下, 当一个连接过来, Netty 首先会注册一个 channel, 然后 EventLoopGroup 会分配一个 EventLoop 绑定到这个 channel, 在这个 channel 的整个生命周期过程中, 这个 EventLoop 一直为他服务, 这个玩意就是一个线程
这下聊一下 Netty 如何处理数据?
前面有讲到, handler 数据处理核心,, 而 ChannelPipeline 负责安排 Handler 的顺序和执行, 我们可以这样理解, 数据在 ChannelPipeline 中流动, 其中 ChannelHandler 就是一个个阀门, 这些数据都会经过每一个 ChannelHandler 并且被他处理, 其中 ChannelHandler 的两个子类 ChannelOutboundHandler 和 ChannelInboundHandler, 根据不同的流向, 选择不同的 Handler
由图可以看出, 一个数据流进入 ChannelPipeline 时, 一个一个 handler 挨着执行, 各个 handler 的数据传递, 这需要调用方法中 ChannelHandlerContext 来操作, 而这个 ChannelHandlerContext 可以用来读写 Netty 中的数据流
三 Netty 中的业务处理
netty 中会有很多 Handler. 具体哪一种 Handler 还要看继承是 InboundAdapter 还是 OutboundAdapter,Netty 中提供一系列的 Adapter 来帮助我们简化开发, 在 ChannelPipeline 中的每一个 handler 都负责把 Event 传递个洗下一个 handler, 有这些 adapter, 这些工作可以自动完成,, 我们只需覆盖我们真正实现的部分即可, 接下来比较常用的三种 ChannelHandler
Encoders 和 Decodeers
我们在网络传输只能传输字节流, 在发送数据时, 把我们的 message 转换成 bytes 这个过程叫 Encode(编码), 相反, 接收数据, 需要把 byte 转换成 message, 这个过程叫 Decode(解码)
Domain Logic
我们真正关心的如何处理解码以后的数据, 我们真正的业务逻辑便是接收处理的数据, Netty 提供一个常用的基类就是 SimpleChannelInboundHandler<T>, 其中 T 就是 Handler 处理的数据类型, 消息到达这个 Handler, 会自动调用这个 Handler 中的 channelRead0(ChannelHandlerContext,T)方法, T 就是传过来的数据对象
四 基于 netty 实现的 Rpc 的例子
这是我的 GitHub 上项目的位置
https://github.com/developerxiaofeng/rpcByNetty
项目目录结构如下
详细的项目细节看类中的注释, 很详细哦.
获取资料:
最后给大家分享一些学习资料, 里面包括:(BATJ 面试资料, 高可用, 高并发, 高性能及分布式, Jvm 性能调优, Spring 源码, MyBatis,Netty,Redis,Kafka,MySQL,Zookeeper,Tomcat,Docker,Dubbo,Nginx 等多个知识点的架构资料)和 Java 进阶学习路线图.
领取方式: 加微信号 weixin99ting 备注 :(资料) 即可获取.
最后, 祝大家早日学有所成!
来源: https://www.cnblogs.com/L10086/p/11437592.html