写在最前面
PRC(Remote Procedure Call) 远程过程调用.通俗的讲就是程序通过 RPC 框架调用远程主机的方法就如同调用本地方法一样.Dubbo 就是这样一个 Rpc 框架,本文主要参考 Dubbo 的设计思路,简易实现了 Rpc 框架. 本文涉及到知识点包括:
Jdk 动态代理
serialization 序列化
Netty 相关
Zookeeper 使用
1,Rpc 框架
Rpc 框架一般分为三个部分,Registry(注册中心),Provider(提供者),Consumer(消费者).
Registry 服务的注册中心,可以通过 zookeeper,redis 等实现.
Provider 服务提供者被调用方,提供服务供消费者调用
Consumer 消费者,通过订阅相应的服务获取需要调用服务的 ip 和端口号调用远程 provider 提供的服务.
2,代理
java 中常见的代理有 JDK 动态代理,Cglib 动态代理,静态代理 (ASM 等字节码技术).
2.1,JDK 代理
举个例子
JDK 代理生成代理对象主要通过
@Override
@SuppressWarnings("unchecked")
public <T> T createProxyBean(Class<T> rpcServiceInterface) {
return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{rpcServiceInterface}, new AryaRpcInvocationHandler());
}
java.lang.reflect.Proxy
类的 newProxyInstance 方法.JDK 代理需要被代理对象必须实现接口.
2.2,Cglib
Cglib 实际上是对 ASM 的易用性封装,Cglib 不需要目标对象必须实现某一个接口,相对 JDK 动态代理更加灵活.
2.3,静态代理
Enhancer en = new Enhancer();
en.setSuperclass(clazz);
en.setCallback(new MethodInterceptor() {
@Override
public Object intercept(Object arg0, Method method, Object[] args, MethodProxy arg3) throws Throwable {
Object o = method.invoke(object, args);
return o;
}
});
return en.create();
通过字节码技术对 class 文件进行修改,使用和学习成本相对较高,需要对 Class 的文件结构以及各种符号引用有比较深的认识,才能较好的使用,因为是对字节码的修改所以相对的性能上也比动态代理要好一些.
3,序列化
我们知道数据在网络上传输都是通过二进制流的形式进行进行的.当 Consumer 调用 Provider 时传输的参数需要先进行序列化,provider 接收到参数时需要进行反序列化才能拿到需要的参数数据,所以序列化的性能对 RPC 的调用性能有很大的影响.目前主流的序列化方式有很多包括:Kryo,Protostuff,hessian.等
Protostuff 是 google 序列化 Protosbuff 的开源实现,项目中我们用到它的序列化方式
Netty 是一个高性能,异步事件驱动的 NIO 框架,它提供了对 TCP,UDP 和文件传输的支持.举个例子: Netty 服务端代码
/**
* @author HODO
*/
public class ProtostuffSerializer implements Serializer {
@Override
public byte[] serialize(Object object) {
Class targetClass = object.getClass();
RuntimeSchema schema = RuntimeSchema.createFrom(targetClass);
LinkedBuffer linkedBuffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
return ProtostuffIOUtil.toByteArray(object, schema, linkedBuffer);
}
@SuppressWarnings("unchecked")
@Override
public <T> T deserialize(byte[] bytes, Class<T> targetClass) {
RuntimeSchema schema = RuntimeSchema.createFrom(targetClass);
T object = (T) schema.newMessage();
ProtostuffIOUtil.mergeFrom(bytes, object, schema);
return object;
}
}
4,Netty
Netty 客服端代码
public class NettyServer {
private ApplicationContext applicationContext;
public NettyServer(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
public void init(int port) {
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
bootstrap.option(ChannelOption.TCP_NODELAY, true);
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
bootstrap.localAddress(port);
bootstrap.childHandler(new ChannelInitializer < SocketChannel > () {@Override public void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline channelPipeline = socketChannel.pipeline();
channelPipeline.addLast(new NettyServerHandler(applicationContext));
}
});
ChannelFuture f = bootstrap.bind().sync();
if (f.isSuccess()) {
System.out.println("Netty端口号:" + port);
}
f.channel().closeFuture().sync();
} catch(Exception e) {
e.printStackTrace();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
* 接收 Rpc 调用结果
public class NettyClient {
private int port;
private String host;
private final CountDownLatch countDownLatch = new CountDownLatch(1);
SerializerFactory serializerFactory = new SerializerFactory();
Serializer serializer = serializerFactory.getSerialize(ProtostuffSerializer.class);
public NettyClient(String host, int port) {
this.port = port;
this.host = host;
}
public NettyClient(String inetAddress) {
if (inetAddress != null && inetAddress.length() != 0) {
String[] strings = inetAddress.split(":");
this.host = strings[0];
this.port = Integer.valueOf(strings[1]);
}
}
public RpcResponse invoker(RpcRequest rpcRequest) throws InterruptedException {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
final NettyClientHandler clientHandler = new NettyClientHandler();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).handler(new ChannelInitializer < SocketChannel > () {@Override protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(clientHandler);
}
});
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)).sync();
serializer.serialize(rpcRequest);
future.channel().writeAndFlush(Unpooled.buffer().writeBytes(serializer.serialize(rpcRequest)));
countDownLatch.await();
// 等待链接关闭
//future.channel().closeFuture().sync();
return clientHandler.getRpcResponse();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
private RpcResponse rpcResponse;
/***/
*
* @param ctx netty 容器
* @param msg 服务端答复消息
5,注册中心 zookeeper
* @throws Exception * /
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
rpcResponse = serializer.deserialize(req, RpcResponse.class);
countDownLatch.countDown();
}
RpcResponse getRpcResponse() {
return rpcResponse;
}
}
}/
选用了 zookeeper 作为注册中心,在建议 Rpc 框架中提供了注册中心的扩展.只要实现 RegistryManager 接口即可.zookeeper 常用的命令行:
1,客服端脚本连接 zookeeper 服务器不指定 - server 默认连接本地服务
. / zkCli - service ip: port
2,创建
create[ - s][ - e] path data acl
创建一个节点 - s -e 分别指定节点的类型和特性:顺序和临时节点默认创建的是临时节点,acl 用于权限控制
3,读取
ls path 只能看指定节点下的一级节点
get path 查看指定节点的数据和属性信息
4,更新
set path data[version]
可以指定更新操作是基于哪一个版本当更新的 path 不存在时报
Node does not exist
5,删除
`delete path [version]``
6,Spring 支持
在框架中还提供了两个注解 @RpcConsumer 和 RpcProvider 在项目中只要引入
在 provider 端容器注入
<dependency>
<groupId>com.yoku.arya</groupId>
<artifactId>arya</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
在 comsumer 端容器注入
@Bean
public RpcProviderProcessor rpcProviderProcessor() {
return new RpcProviderProcessor();
}
@Bean
public RpcConsumerProcessor rpcConsumerProcessor() {
return new RpcConsumerProcessor();
}
来源: https://juejin.im/post/5a5b684c6fb9a01c9e45ec76