自己动手实现 rpc 服务调用框架
本文利用 java 自带的 socket 编程实现了一个简单的 rpc 调用框架, 由两个工程组成分别名为 battercake-provider(服务提供者),battercake-consumer(服务调用者).
服务提供者
本部分的工程为 battercake-provider, 项目结构图如下图所示
先上 rpc 框架调用部分的代码, RpcProvider, 该部分代码可以总结为两步
将需要发布的服务存储在一个内存变量 serviceList 中
启动 socket,server.accept() 方法阻塞在那, 监听输入
针对每一个请求, 单独启动一个线程处理
- package com.rjzheng.rpc;
- import java.net.ServerSocket;
- import java.net.Socket;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.List;
- /**
- * RPC 服务提供器
- * @author zhengrongjun
- *
- */
- public class RpcProvider {
- // 存储注册的服务列表
- private static List<Object> serviceList;
- /**
- * 发布 rpc 服务
- * @param object
- * @param port
- * @throws Exception
- */
- public static void export(int port,Object... services) throws Exception {
- serviceList=Arrays.asList(services);
- ServerSocket server = new ServerSocket(port);
- Socket client = null;
- while (true) {
- // 阻塞等待输入
- client = server.accept();
- // 每一个请求, 启动一个线程处理
- new Thread(new ServerThread(client,serviceList)).start();
- }
- }
- }
接下来 ServerThread 线程处理类的代码, ServerThread 主要做以下几个步骤
读取客户端发送的服务名
判断服务是否发布
如果发布, 则走反射逻辑, 动态调用, 返回结果
如果未发布, 则返回提示通知
- package com.rjzheng.rpc;
- import java.io.IOException;
- import java.io.ObjectInputStream;
- import java.io.ObjectOutputStream;
- import java.lang.reflect.Method;
- import java.net.Socket;
- import java.util.List;
- public class ServerThread implements Runnable {
- private Socket client = null;
- private List<Object> serviceList = null;
- public ServerThread(Socket client, List<Object> service) {
- this.client = client;
- this.serviceList = service;
- }
- @Override
- public void run() {
- ObjectInputStream input = null;
- ObjectOutputStream output = null;
- try {
- input = new ObjectInputStream(client.getInputStream());
- output = new ObjectOutputStream(client.getOutputStream());
- // 读取客户端要访问那个 service
- Class serviceClass = (Class) input.readObject();
- // 找到该服务类
- Object obj = findService(serviceClass);
- if (obj == null) {
- output.writeObject(serviceClass.getName() + "服务未发现");
- } else {
- // 利用反射调用该方法, 返回结果
- try {
- String methodName = input.readUTF();
- Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
- Object[] arguments = (Object[]) input.readObject();
- Method method = obj.getClass().getMethod(methodName, parameterTypes);
- Object result = method.invoke(obj, arguments);
- output.writeObject(result);
- } catch (Throwable t) {
- output.writeObject(t);
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- try {
- client.close();
- input.close();
- output.close();
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- }
- private Object findService(Class serviceClass) {
- // TODO Auto-generated method stub
- for (Object obj : serviceList) {
- boolean isFather = serviceClass.isAssignableFrom(obj.getClass());
- if (isFather) {
- return obj;
- }
- }
- return null;
- }
- }
接下来是使用的部分
先创建一个微服务, 接口如下
- package com.rjzheng.service;
- public interface BatterCakeService {
- /**
- * 卖煎饼的服务
- * @param name
- * @return
- */
- public String sellBatterCake(String name);
- }
实现类如下
- package com.rjzheng.service.impl;
- import com.rjzheng.service.BatterCakeService;
- public class BatterCakeServiceImpl implements BatterCakeService {
- @Override
- public String sellBatterCake(String name) {
- // TODO Auto-generated method stub
- return name+"煎饼, 卖的特别好";
- }
- }
接下来就是发布服务
- package com.rjzheng.start;
- import com.rjzheng.rpc.RpcProvider;
- import com.rjzheng.service.BatterCakeService;
- import com.rjzheng.service.impl.BatterCakeServiceImpl;
- public class RpcBootStrap {
- public static void main(String[] args) throws Exception {
- BatterCakeService batterCakeService =new BatterCakeServiceImpl();
- // 发布卖煎饼的服务, 注册在 20006 端口
- RpcProvider.export(20006,batterCakeService);
- }
- }
服务消费者
本部分的工程为 battercake-consumer, 项目结构图如下图所示
先上 rpc 框架调用部分的代码 RpcConsumer, 步骤分两步
封装一个代理类处理器
返回 service 的代理类对象
- package com.rjzheng.rpc;
- import java.lang.reflect.Proxy;
- public class RpcConsumer {
- public static <T> T getService(Class<T> clazz,String ip,int port) {
- ProxyHandler proxyHandler =new ProxyHandler(ip,port);
- return (T)Proxy.newProxyInstance(RpcConsumer.class.getClassLoader(), new Class<?>[] {clazz}, proxyHandler);
- }
- }
接下来上代理类处理器的代码, 代理类处理步骤分以下几步
建立 socket 连接
封装请求数据, 发送给服务提供者
返回结果
- package com.rjzheng.rpc;
- import java.io.ObjectInputStream;
- import java.io.ObjectOutputStream;
- import java.lang.reflect.InvocationHandler;
- import java.lang.reflect.Method;
- import java.net.Socket;
- import com.rjzheng.service.BatterCakeService;
- public class ProxyHandler implements InvocationHandler {
- private String ip;
- private int port;
- public ProxyHandler(String ip, int port) {
- // TODO Auto-generated constructor stub
- this.ip = ip;
- this.port = port;
- }
- @Override
- public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
- // TODO Auto-generated method stub
- Socket socket = new Socket(this.ip, this.port);
- ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
- ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
- try {
- output.writeObject(proxy.getClass().getInterfaces()[0]);
- output.writeUTF(method.getName());
- output.writeObject(method.getParameterTypes());
- output.writeObject(args);
- output.flush();
- Object result = input.readObject();
- if(result instanceof Throwable) {
- throw (Throwable) result;
- }
- return result;
- } finally {
- socket.shutdownOutput();
- }
- }
- }
接下来建立一个测试类 RpcTest 如下 (跑该测试类前, 记得运行在 battercake-provider 端的 RpcBootstrap 类发布 BatterCakeService 服务)
- package com.rjzheng.start;
- import com.rjzheng.rpc.RpcConsumer;
- import com.rjzheng.service.BatterCakeService;
- public class RpcTest {
- public static void main(String[] args) {
- BatterCakeService batterCakeService=RpcConsumer.getService(BatterCakeService.class, "127.0.0.1", 20006);
- String result=batterCakeService.sellBatterCake("双蛋");
- System.out.println(result);
- }
- }
输出结果如下
双蛋煎饼, 卖的特别好
至此, 我们就实现了一个简易的 rpc 服务调用框架
来源: https://www.cnblogs.com/rjzheng/p/8798556.html