前提
前置文章:
《基于 Netty 和 SpringBoot 实现一个轻量级 RPC 框架 - 协议篇》
《基于 Netty 和 SpringBoot 实现一个轻量级 RPC 框架 - Server 篇》
前一篇文章相对简略地介绍了 RPC 服务端的编写, 而这篇博文最要介绍服务端 (Client) 的实现. RPC 调用一般是面向契约编程的, 而 Client 的核心功能就是: 把契约接口方法的调用抽象为使用 Netty 向 RPC 服务端通过私有协议发送一个请求. 这里最底层的实现依赖于动态代理, 因此动态代理是动态实现接口的最简单方式(如果字节码研究得比较深入, 可以通过字节码编程实现接口). 需要的依赖如下:
- JDK1.8+
- Netty:4.1.44.Final
- SpringBoot:2.2.2.RELEASE
动态代理的简单使用
一般可以通过 JDK 动态代理或者 Cglib 的字节码增强来实现此功能, 为了简单起见, 不引入额外的依赖, 这里选用 JDK 动态代理. 这里重新搬出前面提到的契约接口 HelloService:
- public interface HelloService {
- String sayHello(String name);
- }
接下来需要通过动态代理为此接口添加一个实现:
- public class TestDynamicProxy {
- public static void main(String[] args) throws Exception {
- Class<HelloService> interfaceKlass = HelloService.class;
- InvocationHandler handler = new HelloServiceImpl(interfaceKlass);
- HelloService helloService = (HelloService)
- Proxy.newProxyInstance(interfaceKlass.getClassLoader(), new Class[]{interfaceKlass}, handler);
- System.out.println(helloService.sayHello("throwable"));
- }
- @RequiredArgsConstructor
- private static class HelloServiceImpl implements InvocationHandler {
- private final Class<?> interfaceKlass;
- @Override
- public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
- // 这里应该根据方法的返回值类型去决定返回结果
- return String.format("[%s#%s]方法被调用, 参数列表:%s", interfaceKlass.getName(), method.getName(),
- JSON.toJSONString(args));
- }
- }
- }
- // 控制台输出结果
- [club.throwable.contract.HelloService#sayHello]方法被调用, 参数列表:["throwable"]
这里可以确认两点:
InvocationHandler 实现后会对被代理接口生成一个动态实现类.
动态实现类 (接口) 方法被调用的时候, 实际上是调用 InvocationHandler 对应实例的 invoke()方法, 传入的参数就是当前方法调用的元数据.
Client 端代码实现
Client 端需要通过动态代理为契约接口生成一个动态实现类, 然后提取契约接口调用方法时候所能提供的元数据, 通过这些元数据和 Netty 客户端的支持 (例如 Netty 的 Channel) 基于私有 RPC 协议组装请求信息并且发送请求. 这里先定义一个请求参数提取器接口 RequestArgumentExtractor:
- @Data
- public class RequestArgumentExtractInput {
- private Class<?> interfaceKlass;
- private Method method;
- }
- @Data
- public class RequestArgumentExtractOutput {
- private String interfaceName;
- private String methodName;
- private List<String> methodArgumentSignatures;
- }
- // 接口
- public interface RequestArgumentExtractor {
- RequestArgumentExtractOutput extract(RequestArgumentExtractInput input);
- }
简单实现一下, 解析结果添加到缓存中, 实现类 DefaultRequestArgumentExtractor 代码如下:
- public class DefaultRequestArgumentExtractor implements RequestArgumentExtractor {
- private final ConcurrentMap<CacheKey, RequestArgumentExtractOutput> cache = Maps.newConcurrentMap();
- @Override
- public RequestArgumentExtractOutput extract(RequestArgumentExtractInput input) {
- Class<?> interfaceKlass = input.getInterfaceKlass();
- Method method = input.getMethod();
- String methodName = method.getName();
- Class<?>[] parameterTypes = method.getParameterTypes();
- return cache.computeIfAbsent(new CacheKey(interfaceKlass.getName(), methodName,
- Lists.newArrayList(parameterTypes)), x -> {
- RequestArgumentExtractOutput output = new RequestArgumentExtractOutput();
- output.setInterfaceName(interfaceKlass.getName());
- List<String> methodArgumentSignatures = Lists.newArrayList();
- for (Class<?> klass : parameterTypes) {
- methodArgumentSignatures.add(klass.getName());
- }
- output.setMethodArgumentSignatures(methodArgumentSignatures);
- output.setMethodName(methodName);
- return output;
- });
- }
- @RequiredArgsConstructor
- private static class CacheKey {
- private final String interfaceName;
- private final String methodName;
- private final List<Class<?>> parameterTypes;
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- CacheKey cacheKey = (CacheKey) o;
- return Objects.equals(interfaceName, cacheKey.interfaceName) &&
- Objects.equals(methodName, cacheKey.methodName) &&
- Objects.equals(parameterTypes, cacheKey.parameterTypes);
- }
- @Override
- public int hashCode() {
- return Objects.hash(interfaceName, methodName, parameterTypes);
- }
- }
- }
在不考虑重连, 断连等情况下, 新增一个类 ClientChannelHolder 用于保存 Netty 客户端的 Channel 实例:
- public class ClientChannelHolder {
- public static final AtomicReference<Channel> CHANNEL_REFERENCE = new AtomicReference<>();
- }
接着新增一个契约动态代理工厂(工具类)ContractProxyFactory, 用于为契约接口生成代理类实例:
- public class ContractProxyFactory {
- private static final RequestArgumentExtractor EXTRACTOR = new DefaultRequestArgumentExtractor();
- private static final ConcurrentMap<Class<?>, Object> CACHE = Maps.newConcurrentMap();
- @SuppressWarnings("unchecked")
- public static <T> T ofProxy(Class<T> interfaceKlass) {
- // 缓存契约接口的代理类实例
- return (T) CACHE.computeIfAbsent(interfaceKlass, x ->
- Proxy.newProxyInstance(interfaceKlass.getClassLoader(), new Class[]{interfaceKlass}, (target, method, args) -> {
- RequestArgumentExtractInput input = new RequestArgumentExtractInput();
- input.setInterfaceKlass(interfaceKlass);
- input.setMethod(method);
- RequestArgumentExtractOutput output = EXTRACTOR.extract(input);
- // 封装请求参数
- RequestMessagePacket packet = new RequestMessagePacket();
- packet.setMagicNumber(ProtocolConstant.MAGIC_NUMBER);
- packet.setVersion(ProtocolConstant.VERSION);
- packet.setSerialNumber(SerialNumberUtils.X.generateSerialNumber());
- packet.setMessageType(MessageType.REQUEST);
- packet.setInterfaceName(output.getInterfaceName());
- packet.setMethodName(output.getMethodName());
- packet.setMethodArgumentSignatures(output.getMethodArgumentSignatures().toArray(new String[0]));
- packet.setMethodArguments(args);
- Channel channel = ClientChannelHolder.CHANNEL_REFERENCE.get();
- // 发起请求
- channel.writeAndFlush(packet);
- // 这里方法返回值需要进行同步处理, 相对复杂, 后面专门开一篇文章讲解, 暂时统一返回字符串
- // 如果契约接口的返回值类型不是字符串, 这里方法返回后会抛出异常
- return String.format("[%s#%s]调用成功, 发送了 [%s] 到 NettyServer[%s]", output.getInterfaceName(),
- output.getMethodName(), JSON.toJSONString(packet), channel.remoteAddress());
- }));
- }
- }
最后编写客户端 ClientApplication 的代码:
- @Slf4j
- public class ClientApplication {
- public static void main(String[] args) throws Exception {
- int port = 9092;
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- Bootstrap Bootstrap = new Bootstrap();
- try {
- Bootstrap.group(workerGroup);
- Bootstrap.channel(NioSocketChannel.class);
- Bootstrap.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
- Bootstrap.option(ChannelOption.TCP_NODELAY, Boolean.TRUE);
- Bootstrap.handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
- ch.pipeline().addLast(new LengthFieldPrepender(4));
- ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
- ch.pipeline().addLast(new RequestMessagePacketEncoder(FastJsonSerializer.X));
- ch.pipeline().addLast(new ResponseMessagePacketDecoder());
- ch.pipeline().addLast(new SimpleChannelInboundHandler<ResponseMessagePacket>() {
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, ResponseMessagePacket packet) throws Exception {
- Object targetPayload = packet.getPayload();
- if (targetPayload instanceof ByteBuf) {
- ByteBuf byteBuf = (ByteBuf) targetPayload;
- int readableByteLength = byteBuf.readableBytes();
- byte[] bytes = new byte[readableByteLength];
- byteBuf.readBytes(bytes);
- targetPayload = FastJsonSerializer.X.decode(bytes, String.class);
- byteBuf.release();
- }
- packet.setPayload(targetPayload);
- log.info("接收到来自服务端的响应消息, 消息内容:{}", JSON.toJSONString(packet));
- }
- });
- }
- });
- ChannelFuture future = Bootstrap.connect("localhost", port).sync();
- // 保存 Channel 实例, 暂时不考虑断连重连
- ClientChannelHolder.CHANNEL_REFERENCE.set(future.channel());
- // 构造契约接口代理类实例
- HelloService helloService = ContractProxyFactory.ofProxy(HelloService.class);
- String result = helloService.sayHello("throwable");
- log.info(result);
- future.channel().closeFuture().sync();
- } finally {
- workerGroup.shutdownGracefully();
- }
- }
- }
先启动《基于 Netty 和 SpringBoot 实现一个轻量级 RPC 框架 - Server 篇》一文中的 ServerApplication, 再启动 ClientApplication, 控制台输出如下:
// 服务端日志
2020-01-16 22:34:51 [main] INFO c.throwable.server.ServerApplication - 启动 NettyServer[9092]成功...
2020-01-16 22:36:35 [nioEventLoopGroup-3-1] INFO club.throwable.server.ServerHandler - 服务端接收到: RequestMessagePacket(interfaceName=club.throwable.contract.HelloService, methodName=sayHello, methodArgumentSignatures=[java.lang.String], methodArguments=[PooledUnsafeDirectByteBuf(ridx: 0, widx: 11, cap: 11/144)])
2020-01-16 22:36:35 [nioEventLoopGroup-3-1] INFO club.throwable.server.ServerHandler - 查找目标实现方法成功, 目标类: club.throwable.server.contract.DefaultHelloService, 宿主类: club.throwable.server.contract.DefaultHelloService, 宿主方法: sayHello
2020-01-16 22:36:35 [nioEventLoopGroup-3-1] INFO club.throwable.server.ServerHandler - 服务端输出:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":"\"throwable say hello!\"","serialNumber":"63d386214d30410c9e5f04de03d8b2da","version":1}
- // 客户端日志
- 2020-01-16 22:36:35 [main] INFO c.throwable.client.ClientApplication - [club.throwable.contract.HelloService#sayHello]调用成功, 发送了 [{
- "attachments":{
- },"interfaceName":"club.throwable.contract.HelloService","magicNumber":10086,"messageType":"REQUEST","methodArgumentSignatures":["java.lang.String"],"methodArguments":["throwable"],"methodName":"sayHello","serialNumber":"63d386214d30410c9e5f04de03d8b2da","version":1
- }] 到 NettyServer[localhost/127.0.0.1:9092]
2020-01-16 22:36:35 [nioEventLoopGroup-2-1] INFO c.throwable.client.ClientApplication - 接收到来自服务端的响应消息, 消息内容:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":"\"throwable say hello!\"","serialNumber":"63d386214d30410c9e5f04de03d8b2da","version":1}
小结
Client 端主要负责契约接口调用转换为发送 RPC 协议请求这一步, 核心技术就是动态代理, 在不进行模块封装优化的前提下实现是相对简单的. 这里其实 Client 端还有一个比较大的技术难题没有解决, 上面例子中客户端日志输出如果眼尖的伙伴会发现, Client 端发送 RPC 请求的线程 (main 线程) 和 Client 端接收 Server 端 RPC 响应处理的线程 (nioEventLoopGroup-2-1 线程) 并不相同, 这一点是 Netty 处理网络请求之所以能够如此高效的根源(简单来说就是请求和响应是异步的, 两个流程本来是互不感知的). 但是更多情况下, 我们希望外部请求是同步的, 希望发送 RPC 请求的线程得到响应结果再返回(这里请求和响应有可能依然是异步流程). 下一篇文章会详细分析一下如果对请求 - 响应做同步化处理.
Demo 项目地址:
(c-2-d e-a-20200116)
来源: https://www.cnblogs.com/throwable/p/12203684.html