一, 同步调用
默认情况下, 我们通过 Dubbo 调用一个服务, 需得等服务端执行完全部逻辑, 方法才得以返回. 这个就是同步调用.
但大家是否考虑过另外一个问题, Dubbo 底层网络通信采用 Netty, 而 Netty 是异步的; 那么它是怎么将请求转换成同步的呢?
首先我们来看请求方, 在 DubboInvoker 类中, 它有三种不同的调用方式.
- protected Result doInvoke(final Invocation invocation) throws Throwable {
- try {
- boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
- boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
- int timeout = getUrl().getMethodParameter(methodName, "timeout", 1000);
- // 忽略返回值
- if (isOneway) {
- boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
- currentClient.send(inv, isSent);
- RpcContext.getContext().setFuture(null);
- return new RpcResult();
- // 异步调用
- } else if (isAsync) {
- ResponseFuture future = currentClient.request(inv, timeout);
- RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
- return new RpcResult();
- // 同步调用
- } else {
- RpcContext.getContext().setFuture(null);
- return (Result) currentClient.request(inv, timeout).get();
- }
- }
- }
可以看到, 上面的代码有三个分支, 分别是: 忽略返回值调用, 异步调用和同步调用. 我们重点先看 return (Result) currentClient.request(inv, timeout).get();
关于上面这句代码, 它包含两个动作: 先调用 currentClient.request 方法, 通过 Netty 发送请求数据; 然后调用其返回值的 get 方法, 来获取返回值.
1, 发送请求
这一步主要是将请求方法封装成 Request 对象, 通过 Netty 将数据发送到服务端, 然后返回一个 DefaultFuture 对象.
- public ResponseFuture request(Object request, int timeout) throws RemotingException {
- // 如果客户端已断开连接
- if (closed) {
- throw new RemotingException(".......");
- }
- // 封装请求信息
- Request req = new Request();
- req.setVersion("2.0.0");
- req.setTwoWay(true);
- req.setData(request);
- // 构建 DefaultFuture 对象
- DefaultFuture future = new DefaultFuture(channel, req, timeout);
- try {
- // 通过 Netty 发送网络数据
- channel.send(req);
- } catch (RemotingException e) {
- future.cancel();
- throw e;
- }
- return future;
- }
如上代码, 逻辑很清晰. 关于看它的返回值是一个 DefaultFuture 对象, 我们再看它的构造方法.
- public DefaultFuture(Channel channel, Request request, int timeout) {
- this.channel = channel;
- this.request = request;
- this.id = request.getId();
- this.timeout = timeout> 0 ? timeout :
- channel.getUrl().getPositiveParameter("timeout", 1000);
- // 当前 Future 和请求信息的映射
- FUTURES.put(id, this);
- // 当前 Channel 和请求信息的映射
- CHANNELS.put(id, channel);
- }
在这里, 我们必须先对 Future 有所了解. Future 模式是多线程开发中非常常见的一种设计模式, 在这里我们返回这个对象后, 调用其 get 方法来获得返回值.
2, 获取返回值
我们接着看 get 方法.
- public Object get(int timeout) throws RemotingException {
- // 设置默认超时时间
- if (timeout <= 0) {
- timeout = Constants.DEFAULT_TIMEOUT;
- }
- // 判断 如果操作未完成
- if (!isDone()) {
- long start = System.currentTimeMillis();
- lock.lock();
- try {
- // 通过加锁, 等待
- while (!isDone()) {
- done.await(timeout, TimeUnit.MILLISECONDS);
- if (isDone() || System.currentTimeMillis() - start> timeout) {
- break;
- }
- }
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- } finally {
- lock.unlock();
- }
- if (!isDone()) {
- throw new TimeoutException(sent> 0, channel, getTimeoutMessage(false));
- }
- }
- // 返回数据
- return returnFromResponse();
- }
- // 获取返回值 response
- private Object returnFromResponse() throws RemotingException {
- Response res = response;
- if (res == null) {
- throw new IllegalStateException("response cannot be null");
- }
- if (res.getStatus() == Response.OK) {
- return res.getResult();
- }
- if (res.getStatus() == 30 || res.getStatus() == 31) {
- throw new TimeoutException(res.getStatus() == 31, channel, res.getErrorMessage());
- }
- throw new RemotingException(channel, res.getErrorMessage());
- }
如上代码, 我们重点来看 get 方法. 我们总结下它的运行流程:
判断超时时间, 小于 0 则设置默认值
判断操作是否已完成, 即 response 是否为空; 如果已完成, 获取返回值, 并返回
如果操作未完成, 加锁, 等待; 获得通知后, 再次判断操作是否完成. 若完成, 获取返回值, 并返回.
那么我们就会想到两个问题, response 在哪里被赋值, await 在哪里被通知.
在 Netty 读取到网络数据后, 其中会调用到 HeaderExchangeHandler 中的方法, 我们来看一眼就明白了.
- public class HeaderExchangeHandler implements ChannelHandlerDelegate {
- // 处理返回信息
- static void handleResponse(Channel channel, Response response) throws RemotingException {
- if (response != null && !response.isHeartbeat()) {
- DefaultFuture.received(channel, response);
- }
- }
- }
上面说的很清楚, 如果 response 不为空, 并且不是心跳数据, 就调用 DefaultFuture.received, 在这个方法里面, 主要就是根据返回信息的 ID 找到对应的 Future, 然后通知.
- public static void received(Channel channel, Response response)
- try {
- // 根据返回信息中的 ID 找到对应的 Future
- DefaultFuture future = FUTURES.remove(response.getId());
- if (future != null) {
- // 通知方法
- future.doReceived(response);
- } else {
- logger.warn("......");
- }
- } finally {
- // 处理完成, 删除 Future
- CHANNELS.remove(response.getId());
- }
- }
future.doReceived(response); 就很简单了, 它就回答了我们上面的那两个小问题. 赋值 response 和 await 通知.
- private void doReceived(Response res) {
- lock.lock();
- try {
- // 赋值 response
- response = res;
- if (done != null) {
- // 通知方法
- done.signal();
- }
- } finally {
- lock.unlock();
- }
- if (callback != null) {
- invokeCallback(callback);
- }
- }
通过以上方式, Dubbo 就完成了同步调用. 我们再总结下它的整体流程:
将请求封装为 Request 对象, 并构建 DefaultFuture 对象, 请求 ID 和 Future 对应.
通过 Netty 发送 Request 对象, 并返回 DefaultFuture 对象.
调用
DefaultFuture.get()
等待数据回传完成.
服务端处理完成, Netty 处理器接收到返回数据, 通知到 DefaultFuture 对象.
get 方法返回, 获取到返回值.
二, 异步调用
如果想使用异步调用的方式, 我们就得配置一下. 在消费者端配置文件中
- <dubbo:reference id="infoUserService"
- interface="com.viewscenes.netsupervisor.service.InfoUserService"
- async="true"/>
然后我们再看它的实现方法
- if (isAsync) {
- ResponseFuture future = currentClient.request(inv, timeout);
- RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
- return new RpcResult();
- }
可以看到, 它同样是通过 currentClient.request 返回的 Future 对象, 但并未调用其 get 方法; 而是将 Future 对象封装成 FutureAdapter, 然后设置到 RpcContext.getContext()
RpcContext 是 Dubbo 中的一个上下文信息, 它是一个 ThreadLocal 的临时状态记录器. 我们重点看它的 setFuture 方法.
- public class RpcContext {
- private static final ThreadLocal<RpcContext> LOCAL = new ThreadLocal<RpcContext>() {
- @Override
- protected RpcContext initialValue() {
- return new RpcContext();
- }
- };
- private Future<?> future;
- public void setFuture(Future<?> future) {
- this.future = future;
- }
- }
既然它是基于 ThreadLocal 机制实现, 那么我们在获取返回值的时候, 通过 ThreadLocal 获取到上下文信息对象, 再拿到其 Future 对象就好了. 这个时候, 我们客户端应该这样来做
- userService.sayHello("Jack");
- Future<Object> future = RpcContext.getContext().getFuture();
- System.out.println("服务返回消息:"+future.get());
这样做的好处是, 我们不必等待在单一方法上, 可以调用多个方法, 它们会并行的执行. 比如像官网给出的例子那样:
- // 此调用会立即返回 null
- fooService.findFoo(fooId);
- // 拿到调用的 Future 引用, 当结果返回后, 会被通知和设置到此 Future
- Future<Foo> fooFuture = RpcContext.getContext().getFuture();
- // 此调用会立即返回 null
- barService.findBar(barId);
- // 拿到调用的 Future 引用, 当结果返回后, 会被通知和设置到此 Future
- Future<Bar> barFuture = RpcContext.getContext().getFuture();
- // 此时 findFoo 和 findBar 的请求同时在执行, 客户端不需要启动多线程来支持并行, 而是借助 NIO 的非阻塞完成
- // 如果 foo 已返回, 直接拿到返回值, 否则线程 wait 住, 等待 foo 返回后, 线程会被 notify 唤醒
- Foo foo = fooFuture.get();
- // 同理等待 bar 返回
- Bar bar = barFuture.get();
- // 如果 foo 需要 5 秒返回, bar 需要 6 秒返回, 实际只需等 6 秒, 即可获取到 foo 和 bar, 进行接下来的处理.
来源: https://juejin.im/post/5c98cc08e51d45639b76cb98