在使用 dubbo 时, 通常会遇到 timeout 这个属性, timeout 属性的作用是: 给某个服务调用设置超时时间, 如果服务在设置的时间内未返回结果, 则会抛出调用超时异常: TimeoutException, 在使用的过程中, 我们有时会对 provider 和 consumer 两个配置都会设置 timeout 值, 那么服务调用过程中会以哪个为准? 橘子同学今天主要针对这个问题进行分析和扩展.
三种设置方式
以 provider 配置为例:
- #### 方法级别
- <dubbo:service interface="orangecsong.test.service.TestService" ref="testServiceImpl">
- <dubbo:method name="test" timeout="10000"/>
- </dubbo:service>
- #### 接口级别
- <dubbo:service interface="orangecsong.test.service.TestService" ref="testServiceImpl" timeout="10000"/>
- #### 全局级别
- <dubbo:service ="10000"/>
优先级选择
在 dubbo 中如果 provider 和 consumer 都配置了相同的一个属性, 比如本文分析的 timeout, 其实它们是有优先级的, consumer 方法配置> provider 方法配置> consumer 接口配置> provider 接口配置> consumer 全局配置> provider 全局配置. 所以对于小橘开始的提出的问题就有了结果, 会以消费者配置的为准, 接下结合源码来进行解析, 其实源码很简单, 在 RegistryDirectory 类中将服务列表转换为 DubboInvlker 方法中进行了处理:
- private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
- Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
- if (urls == null || urls.isEmpty()) {
- return newUrlInvokerMap;
- }
- Set<String> keys = new HashSet<String>();
- String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
- for (URL providerUrl : urls) {
- // If protocol is configured at the reference side, only the matching protocol is selected
- if (queryProtocols != null && queryProtocols.length()> 0) {
- boolean accept = false;
- String[] acceptProtocols = queryProtocols.split(",");
- for (String acceptProtocol : acceptProtocols) {
- if (providerUrl.getProtocol().equals(acceptProtocol)) {
- accept = true;
- break;
- }
- }
- if (!accept) {
- continue;
- }
- }
- if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
- continue;
- }
- if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
- logger.error(new IllegalStateException("Unsupported protocol" + providerUrl.getProtocol() +
- "in notified url:" + providerUrl + "from registry" + getUrl().getAddress() +
- "to consumer" + NetUtils.getLocalHost() + ", supported protocol:" +
- ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
- continue;
- }
- // 重点就是下面这个方法
- URL url = mergeUrl(providerUrl);
- String key = url.toFullString(); // The parameter urls are sorted
- if (keys.contains(key)) { // Repeated url
- continue;
- }
- keys.add(key);
- // Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
- Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
- Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
- if (invoker == null) { // Not in the cache, refer again
- try {
- boolean enabled = true;
- if (url.hasParameter(Constants.DISABLED_KEY)) {
- enabled = !url.getParameter(Constants.DISABLED_KEY, false);
- } else {
- enabled = url.getParameter(Constants.ENABLED_KEY, true);
- }
- if (enabled) {
- invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
- }
- } catch (Throwable t) {
- logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
- }
- if (invoker != null) { // Put new invoker in cache
- newUrlInvokerMap.put(key, invoker);
- }
- } else {
- newUrlInvokerMap.put(key, invoker);
- }
- }
- keys.clear();
- return newUrlInvokerMap;
- }
重点就是上面 mergeUrl 方法, 将 provider 和 comsumer 的 url 参数进行了整合, 在 mergeUrl 方法有会调用 ClusterUtils.mergeUrl 方法进行整合, 因为这个方法比较简单, 就是对一些参数进行了整合了, 会用 consumer 参数进行覆盖, 这里就不分析了, 如果感兴趣的同学可以去研究一下.
超时处理
在配置设置了超时 timeout, 那么代码中是如何处理的, 这里咱们在进行一下扩展, 分析一下 dubbo 中是如何处理超时的, 在调用服务方法, 最后都会调用 DubboInvoker.doInvoke 方法, 咱们就从这个方法开始分析:
- @Override
- protected Result doInvoke(final Invocation invocation) throws Throwable {
- RpcInvocation inv = (RpcInvocation) invocation;
- final String methodName = RpcUtils.getMethodName(invocation);
- inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
- inv.setAttachment(Constants.VERSION_KEY, version);
- ExchangeClient currentClient;
- if (clients.length == 1) {
- currentClient = clients[0];
- } else {
- currentClient = clients[index.getAndIncrement() % clients.length];
- }
- try {
- boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
- boolean isAsyncFuture = RpcUtils.isReturnTypeFuture(inv);
- boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
- int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
- if (isOneway) {
- boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
- currentClient.send(inv, isSent);
- RpcContext.getContext().setFuture(null);
- return new RpcResult();
- } else if (isAsync) {
- ResponseFuture future = currentClient.request(inv, timeout);
- // For compatibility
- FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future);
- RpcContext.getContext().setFuture(futureAdapter);
- Result result;
- // 异步处理
- if (isAsyncFuture) {
- // register resultCallback, sometimes we need the async result being processed by the filter chain.
- result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
- } else {
- result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
- }
- return result;
- } else {
- // 同步处理
- RpcContext.getContext().setFuture(null);
- return (Result) currentClient.request(inv, timeout).get();
- }
- } catch (TimeoutException e) {
- throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method:" + invocation.getMethodName() + ", provider:" + getUrl() + ", cause:" + e.getMessage(), e);
- } catch (RemotingException e) {
- throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method:" + invocation.getMethodName() + ", provider:" + getUrl() + ", cause:" + e.getMessage(), e);
- }
- }
在这个方法中, 就以同步模式进行分析, 看 request 方法, request()方法会返回一个 DefaultFuture 类, 在去调用 DefaultFuture.get()方法, 这里其实涉及到一个在异步中实现同步的技巧, 咱们这里不做分析, 所以重点就在 get()方法里:
- @Override
- public Object get() throws RemotingException {
- return get(timeout);
- }
- @Override
- public Object get(int timeout) throws RemotingException {
- if (timeout <= 0) {
- timeout = Constants.DEFAULT_TIMEOUT;
- }
- if (!isDone()) {
- long start = System.currentTimeMillis();
- lock.lock();
- try {
- while (!isDone()) {
- done.await(timeout, TimeUnit.MILLISECONDS);
- if (isDone() || System.currentTimeMillis() - start> timeout) {
- break;
- }
- }
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- } finally {
- lock.unlock();
- }
- if (!isDone()) {
- throw new TimeoutException(sent> 0, channel, getTimeoutMessage(false));
- }
- }
- return returnFromResponse();
- }
在调用 get()方法时, 会去调用 get(timeout)这个方法, 在这个方法中会传一个 timeout 字段, 在和 timeout 就是咱们配置的那个参数, 在这个方法中咱们要关注下面一个代码块:
- if (!isDone()) {
- long start = System.currentTimeMillis();
- lock.lock();
- try {
- while (!isDone()) {
- // 线程阻塞
- done.await(timeout, TimeUnit.MILLISECONDS);
- if (isDone() || System.currentTimeMillis() - start> timeout) {
- break;
- }
- }
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- } finally {
- lock.unlock();
- }
- // 在超时时间里, 还没有结果, 则抛出超时异常
- if (!isDone()) {
- throw new TimeoutException(sent> 0, channel, getTimeoutMessage(false));
- }
- }
重点看 await()方法, 会进行阻塞 timeout 时间, 如果阻塞时间到了, 则会唤醒往下执行, 超时跳出 while 循环中, 判断是否有结果返回, 如果没有 (这个地方要注意: 只有有结果返回, 或超时才跳出循环中), 则抛出超时异常. 讲到这里, 超时原理基本上其实差不多了, DefaultFuture 这个类还有个地方需要注意, 在初始化 DefaultFuture 对象时, 会去创建一个超时的延迟任务, 延迟时间就是 timeout 值, 在这个延迟任务中也会调用 signal() 方法唤醒阻塞.
分批调用
不过在调用 rpc 远程接口, 如果对方的接口不能一次承载返回请求结果能力, 我们一般做法是分批调用, 将调用一次分成调用多次, 然后对每次结果进行汇聚, 当然也可以做用利用多线程的能力去执行. 后面文章小橘将会介绍这种模式, 敬请关注哦!
- /**
- * Description: 通用 分批调用工具类
- * 场景:
- * <pre>
- * 比如 List 参数的 size 可能为 几十个甚至上百个
- * 如果 invoke 接口比较慢, 传入 50 个以上会超时, 那么可以每次传入 20 个, 分批执行.
- * </pre>
- * Author: OrangeCsong
- */
- public class ParallelInvokeUtil {
- private ParallelInvokeUtil() {}
- /**
- * @param sourceList 源数据
- * @param size 分批大小
- * @param buildParam 构建函数
- * @param processFunction 处理函数
- * @param <R> 返回值
- * @param <T> 入参 \
- * @param <P> 构建参数
- * @return
- */
- public static <R, T, P> List<R> partitionInvokeWithRes(List<T> sourceList, Integer size,
- Function<List<T>, P> buildParam,
- Function<P, List<R>> processFunction) {
- if (CollectionUtils.isEmpty(sourceList)) {
- return new ArrayList<>(0);
- }
- Preconditions.checkArgument(size> 0, "size 大小必须大于 0");
- return Lists.partition(sourceList, size).stream()
- .map(buildParam)
- .map(processFunction)
- .filter(Objects::nonNull)
- .reduce(new ArrayList<>(),
- (resultList1, resultList2) -> {
- resultList1.addAll(resultList2);
- return resultList1;
- });
- }
- }
来源: https://www.cnblogs.com/csong7876/p/13236955.html