smart-socket 实现 RPC
RPC 是目前被广泛应用于互联网服务的一项技术, 关于它的基本介绍大家可通过百度了解一下, 此处不再赘述. 正所谓读万卷书不如行万里路, 原理性的文章看的再多都不如亲自实现一遍 RPC, 方可对其了解的更加透彻. 本文将以纯技术视角, 为大家演示一下 RPC 的工作原理与实现方案.
正式开始之前, 先罗列一下实现 RPC 需要运用到的技术点:
通信
序列化 / 反序列化
反射
动态代理
在具体实现上除了通信部分我们选用 smart-socket 来辅助, 其余包括序列化 / 反序列化, 反射, 动态代理等部分我们将采用 JDK 提供的解决方案, 待您掌握 RPC 后可再尝试结合第三方技术来重构 RPC.
名词解释
Provider
RPC 服务提供者
Consumer
RPC 服务调用者
消息通信
既然 RPC 是跨网络通信服务, 那我们先制定通信规则, 该部分的内容涉及到通信, 序列化 / 反序列化技术.
通信协议
通信协议我们采用最简单的 length+data 模式, 编解码的实现算法如下. 作为示例我们假设 readBuffer 足够容纳一个完整的消息, 协议中的 data 部分便是 RPC 服务序列化后的 byte 数组, Provider/Consumer 则必须对 byte 数组完成反序列化后才能继续 RPC 服务处理.
- public class RpcProtocol implements Protocol<byte[]> {
- private static final int INTEGER_BYTES = Integer.SIZE / Byte.SIZE;
- @Override
- public byte[] decode(ByteBuffer readBuffer, AioSession<byte[]> session, boolean eof) {
- int remaining = readBuffer.remaining();
- if (remaining <INTEGER_BYTES) {
- return null;
- }
- int messageSize = readBuffer.getInt(readBuffer.position());
- if (messageSize> remaining) {
- return null;
- }
- byte[] data = new byte[messageSize - INTEGER_BYTES];
- readBuffer.getInt();
- readBuffer.get(data);
- return data;
- }
- @Override
- public ByteBuffer encode(byte[] msg, AioSession<byte[]> session) {
- ByteBuffer byteBuffer = ByteBuffer.allocate(msg.length + INTEGER_BYTES);
- byteBuffer.putInt(byteBuffer.capacity());
- byteBuffer.put(msg);
- byteBuffer.flip();
- return byteBuffer;
- }
- }
RPC 请求消息
RPC 请求消息由 Consumer 发送, Consumer 需要在请求消息中提供足够信息以供 Provider 准确识别服务接口. 核心要素包括:
uuid
请求消息唯一标识, 用于关联, 识别响应消息.
interfaceClass
Consumer 要调用的 API 接口名
method
Consumer 要执行的 API 接口方法名
paramClassList
Consumer 调用的方法入参类型, 用于区分同方法名不同入参的情况
params
Consumer 执行方法传入的参数值
- public class RpcRequest implements Serializable {
- /**
- * 消息的唯一标识
- */
- private final String uuid = UUID.randomUUID().toString();
- /**
- * 接口名称
- */
- private String interfaceClass;
- /**
- * 调用方法
- */
- private String method;
- /**
- * 参数类型字符串
- */
- private String[] paramClassList;
- /**
- * 入参
- */
- private Object[] params;
- getX/setX()
- }
RPC 响应消息
RPC 响应消息为 Provider 将接口执行结果响应给 Consumer 的载体.
uuid
与 RPC 请求消息同值
returnObject
RPC 接口执行返回值
returnType
RPC 接口返回值类型
exception
RPC 执行异常信息, 如果出现异常的话.
- public class RpcResponse implements Serializable {
- /**
- * 消息的唯一标示, 与对应的 RpcRequest uuid 值相同
- */
- private String uuid;
- /**
- * 返回对象
- */
- private Object returnObject;
- /**
- * 返回对象类型
- */
- private String returnType;
- /**
- * 异常
- */
- private String exception;
- public RpcResponse(String uuid) {
- this.uuid = uuid;
- }
- getX/setX()
- }
通过上述内容便完成 RPC 通信的消息设计, 至于 RpcRequest,RpcResponse 如何转化为通信协议要求的 byte 数组格式, 我们采用 JDK 提供的序列化方式 (生产环境不建议使用).
序列化
- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
- ObjectOutput objectOutput = new ObjectOutputStream(byteArrayOutputStream);
- objectOutput.writeObject(request);
- aioSession.write(byteArrayOutputStream.toByteArray());
反序列化
- ObjectInputStream objectInput = new ObjectInputStream(new ByteArrayInputStream(msg));
- RpcResponse resp = (RpcResponse) objectInput.readObject();
RPC 服务实现
通过上文方案我们解决了 RPC 的通信问题, 接下来便得根据通信消息实现服务能力.
Consumer
由于 RPC 的 Consumer 端只有接口, 没有具体实现, 但在使用上我们又期望跟本地服务有同样的使用体验. 因此我们需要将接口实例化成对象, 并使其具备跨应用服务能力, 此处便运用到动态代理. 当 Consumer 调用 RPC 接口时, 代理类内部发送请求消息至 Provider 并获取结果.
- obj = (T) Proxy.newProxyInstance(getClass().getClassLoader(), new Class[]{remoteInterface},
- new InvocationHandler() {
- @Override
- public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
- RpcRequest req = new RpcRequest();
- req.setInterfaceClass(remoteInterface.getName());
- req.setMethod(method.getName());
- Class<?>[] types = method.getParameterTypes();
- if (!ArrayUtils.isEmpty(types)) {
- String[] paramClass = new String[types.length];
- for (int i = 0; i <types.length; i++) {
- paramClass[i] = types[i].getName();
- }
- req.setParamClassList(paramClass);
- }
- req.setParams(args);
- RpcResponse rmiResp = sendRpcRequest(req);
- if (StringUtils.isNotBlank(rmiResp.getException())) {
- throw new RuntimeException(rmiResp.getException());
- }
- return rmiResp.getReturnObject();
- }
- });
- Provider
Provider 可将其提供的 RPC 服务维护在集合里, 采用 Map 存储即可, key 为暴露的接口名, value 为接口的具体实现. 一旦 Provider 接受到 RPC 的请求消息, 只需根据请求消息内容找到并执行对应的服务, 最后将返回结果以消息的形式返回至 Consumer 即可.
- ObjectInputStream objectInput = new ObjectInputStream(new ByteArrayInputStream(msg));
- RpcRequest req = (RpcRequest) objectInput.readObject();
- RpcResponse resp = new RpcResponse(req.getUuid());
- try {
- String[] paramClassList = req.getParamClassList();
- Object[] paramObjList = req.getParams();
- // 获取入参类型
- Class<?>[] classArray = null;
- if (paramClassList != null) {
- classArray = new Class[paramClassList.length];
- for (int i = 0; i <classArray.length; i++) {
- Class<?> clazz = primitiveClass.get(paramClassList[i]);
- if (clazz == null) {
- classArray[i] = Class.forName(paramClassList[i]);
- } else {
- classArray[i] = clazz;
- }
- }
- }
- // 调用接口
- Object impObj = impMap.get(req.getInterfaceClass());
- if (impObj == null) {
- throw new UnsupportedOperationException("can not find interface:" + req.getInterfaceClass());
- }
- Method method = impObj.getClass().getMethod(req.getMethod(), classArray);
- Object obj = method.invoke(impObj, paramObjList);
- resp.setReturnObject(obj);
- resp.setReturnType(method.getReturnType().getName());
- } catch (InvocationTargetException e) {
- LOGGER.error(e.getMessage(), e);
- resp.setException(e.getTargetException().getMessage());
- } catch (Exception e) {
- LOGGER.error(e.getMessage(), e);
- resp.setException(e.getMessage());
- }
- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
- objectOutput = new ObjectOutputStream(byteArrayOutputStream);
- objectOutput.writeObject(resp);
- session.write(byteArrayOutputStream.toByteArray());
测试 RPC 服务
服务端定义接口 DemoApi, 并将其实现示例 DemoApiImpl 注册至 Provider 中.
- public class Provider {
- public static void main(String[] args) throws IOException {
- RpcProviderProcessor rpcProviderProcessor = new RpcProviderProcessor();
- AioQuickServer<byte[]> server = new AioQuickServer<>(8888, new RpcProtocol(), rpcProviderProcessor);
- server.start();
- rpcProviderProcessor.publishService(DemoApi.class, new DemoApiImpl());
- }
- }
Consumer 调用 RPC 接口 test,sum 获得执行结果.
- public class Consumer {
- public static void main(String[] args) throws InterruptedException, ExecutionException, IOException {
- RpcConsumerProcessor rpcConsumerProcessor = new RpcConsumerProcessor();
- AioQuickClient<byte[]> consumer = new AioQuickClient<>("localhost", 8888, new RpcProtocol(), rpcConsumerProcessor);
- consumer.start();
- DemoApi demoApi = rpcConsumerProcessor.getObject(DemoApi.class);
- System.out.println(demoApi.test("smart-socket"));
- System.out.println(demoApi.sum(1, 2));
- }
- }
总结
本文简要描述了 RPC 服务实现的关键部分, 但是提供稳定可靠的 RPC 服务还有很多细节需要考虑, 有兴趣的朋友可自行研究. 本文示例的完整代码可从 smart-socket 项目中获取.
来源: https://juejin.im/entry/5b39acb3f265da59a50b3572