我在服务端引用那篇文章里面分析到, 服务端在引用的时候会去获取服务端可用的服务, 并进行心跳, 维护一个可用的集合.
所以我们从客户端初始化这部分说起.
服务连接的维护
客户端初始化的时候会调用 cluster#init 方法, 这里的 cluster 是继承了 AbstractCLuster 抽象类, 调用的是抽象类里面的 init 方法.
- public synchronized void init() {
- if (initialized) { // 已初始化
- return;
- }
- // 构造 Router 链
- routerChain = RouterChain.buildConsumerChain(consumerBootstrap);
- // 负载均衡策略 考虑是否可动态替换?
- loadBalancer = LoadBalancerFactory.getLoadBalancer(consumerBootstrap);
- // 地址管理器
- addressHolder = AddressHolderFactory.getAddressHolder(consumerBootstrap);
- // 连接管理器
- connectionHolder = ConnectionHolderFactory.getConnectionHolder(consumerBootstrap);
- // 构造 Filter 链, 最底层是调用过滤器
- this.filterChain = FilterChain.buildConsumerChain(this.consumerConfig,
- new ConsumerInvoker(consumerBootstrap));
- if (consumerConfig.isLazy()) { // 延迟连接
- if (LOGGER.isInfoEnabled(consumerConfig.getAppName())) {
- LOGGER.infoWithApp(consumerConfig.getAppName(), "Connection will be initialized when first invoke.");
- }
- }
- // 启动重连线程
- connectionHolder.init();
- try {
- // 得到服务端列表
- List<ProviderGroup> all = consumerBootstrap.subscribe();
- if (CommonUtils.isNotEmpty(all)) {
- // 初始化服务端连接 (建立长连接)
- updateAllProviders(all);
- }
- } catch (SofaRpcRuntimeException e) {
- throw e;
- } catch (Throwable e) {
- throw new SofaRpcRuntimeException("Init provider's transport error!", e);
- }
- // 启动成功
- initialized = true;
- // 如果 check=true 表示强依赖
- if (consumerConfig.isCheck() && !isAvailable()) {
- throw new SofaRpcRuntimeException("The consumer is depend on alive provider" +
- "and there is no alive provider, you can ignore it" +
- "by ConsumerConfig.setCheck(boolean) (default is false)");
- }
- }
这上面在服务连接的维护上面主要分为三步:
设置心跳线程, 每 10 秒进行一次心跳
获取服务端列表
初始化服务端连接
1.SOFARPC 的心跳线程
AllConnectConnectionHolder#init
这里 connectionHolder 是 AllConnectConnectionHolder 的实现类, 我们进入到这个类里面看. 这里面实际上实现了 SOFARPC 的心跳检测.
- /**
- * 重连线程
- */
- private volatile ScheduledService reconThread;
- public void init() {
- // 如果 reconThread 没有初始化过, 调用 startReconnectThread 进行初始化
- if (reconThread == null) {
- startReconnectThread();
- }
- }
- protected void startReconnectThread() {
- final String interfaceId = consumerConfig.getInterfaceId();
- // 启动线程池
- // 默认每隔 10 秒重连
- int reconnect = consumerConfig.getReconnectPeriod();
- if (reconnect> 0) {
- reconnect = Math.max(reconnect, 2000); // 最小 2000
- reconThread = new ScheduledService("CLI-RC-" + interfaceId, ScheduledService.MODE_FIXEDDELAY, new
- Runnable() {
- @Override
- public void run() {
- try {
- doReconnect();
- } catch (Throwable e) {
- LOGGER.errorWithApp(consumerConfig.getAppName(),
- "Exception when retry connect to provider", e);
- }
- }
- }, reconnect, reconnect, TimeUnit.MILLISECONDS).start();
- }
- }
在 startReconnectThread 方法中, 客户端会调用 reconnectPeriod 变量, 如果没有设置则为 10 秒, 如果设置小于 10 秒则取 2 秒. 也就是说客户端开启的心跳是默认 10 秒一次, 最快也是只能 2 秒一次.
然后创建了一个 ScheduledService 实例, 并调用其 start 方法.
我们看一下 ScheduledService 类是怎么样的结构
- ScheduledService
- public class ScheduledService {
- private volatile ScheduledExecutorService scheduledExecutorService;
- public ScheduledService(String threadName,
- int mode,
- Runnable runnable,
- long initialDelay,
- long period,
- TimeUnit unit) {
- this.threadName = threadName;
- this.runnable = runnable;
- this.initialDelay = initialDelay;
- this.period = period;
- this.unit = unit;
- this.mode = mode;
- }
- // 开始执行定时任务
- public synchronized ScheduledService start() {
- if (started) {
- return this;
- }
- if (scheduledExecutorService == null) {
- scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
- new NamedThreadFactory(threadName, true));
- }
- ScheduledFuture future = null;
- // 传进来的是 MODE_FIXEDDELAY
- switch (mode) {
- case MODE_FIXEDRATE:
- future = scheduledExecutorService.scheduleAtFixedRate(runnable, initialDelay,
- period,
- unit);
- break;
- case MODE_FIXEDDELAY:
- // 创建一个固定延迟的定时任务
- future = scheduledExecutorService.scheduleWithFixedDelay(runnable, initialDelay, period,
- unit);
- break;
- default:
- break;
- }
- if (future != null) {
- this.future = future;
- // 缓存一下
- SCHEDULED_SERVICE_MAP.put(this, System.currentTimeMillis());
- started = true;
- } else {
- started = false;
- }
- return this;
- }
- }
ScheduledService 的作用就是创建一个固定延迟的线程, 以固定的时间定时执行一下任务.
然后会默认每 10 秒钟执行一次 AllConnectConnectionHolder 的 doReconnect 方法.
- AllConnectConnectionHolder#doReconnect
- /**
- * 存活的客户端列表 (保持了长连接, 且一切正常的)
- */
- protected ConcurrentMap<ProviderInfo, ClientTransport> aliveConnections = new ConcurrentHashMap<ProviderInfo, ClientTransport>();
- /**
- * 失败待重试的客户端列表 (连上后断开的)
- */
- protected ConcurrentMap<ProviderInfo, ClientTransport> retryConnections = new ConcurrentHashMap<ProviderInfo, ClientTransport>();
- private void doReconnect() {
- // 获取配置的接口
- String interfaceId = consumerConfig.getInterfaceId();
- // 获取应用名
- String appName = consumerConfig.getAppName();
- int thisTime = reconnectFlag.incrementAndGet();
- boolean print = thisTime % 6 == 0; // 是否打印 error, 每 6 次打印一次
- // 可用的连接集合是否为空
- boolean isAliveEmptyFirst = isAvailableEmpty();
- // 检查可用连接 todo subHealth
- for (Map.Entry<ProviderInfo, ClientTransport> alive : aliveConnections.entrySet()) {
- ClientTransport connection = alive.getValue();
- // 如果该连接不可用, 那么就将该连接从可用连接集合里剔除放入到重试集合里面
- if (connection != null && !connection.isAvailable()) {
- aliveToRetry(alive.getKey(), connection);
- }
- }
- // 遍历所有待重试集合
- for (Map.Entry<ProviderInfo, ClientTransport> entry : getRetryConnections()
- .entrySet()) {
- ProviderInfo providerInfo = entry.getKey();
- int providerPeriodCoefficient = CommonUtils.parseNum((Integer)
- providerInfo.getDynamicAttr(ProviderInfoAttrs.ATTR_RC_PERIOD_COEFFICIENT), 1);
- if (thisTime % providerPeriodCoefficient != 0) {
- continue; // 如果命中重连周期, 则进行重连
- }
- ClientTransport transport = entry.getValue();
- if (LOGGER.isDebugEnabled(appName)) {
- LOGGER.debugWithApp(appName, "Retry connect to {} provider:{} ...", interfaceId, providerInfo);
- }
- try {
- // 重连
- transport.connect();
- // 重连完检查一下该连接是否可用
- if (doubleCheck(interfaceId, providerInfo, transport)) {
- providerInfo.setDynamicAttr(ProviderInfoAttrs.ATTR_RC_PERIOD_COEFFICIENT, 1);
- // 如果该连接可用, 则把该连接从重试集合里移除, 加入到可用集合里
- retryToAlive(providerInfo, transport);
- }
- } catch (Exception e) {
- if (print) {
- if (LOGGER.isWarnEnabled(appName)) {
- LOGGER.warnWithApp(appName, "Retry connect to {} provider:{} error ! The exception is" + e
- .getMessage(), interfaceId, providerInfo);
- }
- } else {
- if (LOGGER.isDebugEnabled(appName)) {
- LOGGER.debugWithApp(appName, "Retry connect to {} provider:{} error ! The exception is" + e
- .getMessage(), interfaceId, providerInfo);
- }
- }
- }
- }
- if (isAliveEmptyFirst && !isAvailableEmpty()) { // 原来空, 变成不空
- notifyStateChangeToAvailable();
- }
- }
这个 doReconnect 方法里面主要做了以下几件事:
检查可用连接集合, 如果该连接不可用, 那么就将该连接从可用连接集合里剔除放入到重试集合里面.
遍历所有待重试集合, 如果该 thisTime 和 providerPeriodCoefficient 取模为零, 那么就进行重连.
设置监听器.
这里有个细节, 在 aliveToRetry 方法里面是加锁的, 尽管 aliveConnections 和 retryConnections 都是安全的集合, 但是这里有一个 if 判断, 这两步操作并不是线程安全的.
- protected void aliveToRetry(ProviderInfo providerInfo, ClientTransport transport) {
- providerLock.lock();
- try {
- // 这里两步操作并不是原子性的, 所以需要加锁
- if (aliveConnections.remove(providerInfo) != null) {
- retryConnections.put(providerInfo, transport);
- }
- } finally {
- providerLock.unlock();
- }
- }
由于我们这里并不分析网络是怎么传输和连接的, 所以暂时不分析 transport#connect, 大家只要知道这里是保持一个长连接的就可以了.
接下来我们再看一下 doubleCheck 方法:
- protected boolean doubleCheck(String interfaceId, ProviderInfo providerInfo, ClientTransport transport) {
- if (transport.isAvailable()) {
- try { // 睡一下下 防止被连上又被服务端踢下线
- Thread.sleep(100);
- } catch (InterruptedException e) {
- // ignore
- }
- if (transport.isAvailable()) { // double check
- return true;
- } else { // 可能在黑名单里, 刚连上就断开了
- if (LOGGER.isWarnEnabled(consumerConfig.getAppName())) {
- LOGGER.warnWithApp(consumerConfig.getAppName(),
- "Connection has been closed after connected (in last 100ms)!" +
- "Maybe connectionNum of provider has been reached limit," +
- "or your host is in the blacklist of provider {}/{}",
- interfaceId, transport.getConfig().getProviderInfo());
- }
- providerInfo.setDynamicAttr(ProviderInfoAttrs.ATTR_RC_PERIOD_COEFFICIENT, 5);
- return false;
- }
- } else {
- return false;
- }
- }
这里面主要是检查一下连接的稳定性, 如果一开始连接成功, 在 100ms 内又断开连接, 那么就打出警告日志, 当看到这个日志在后台的时候需要我们查看一下网络连接的情况.
然后再把 reconnectCoefficient 属性设置为 5, 当 thisTime 与 providerPeriodCoefficient 取模为 0 的时候再次尝试连接, 其中如果按默认设置的话, 需要 50 秒才会进行重连.
2. 获取服务列表
调用 consumerBootstrap#subscribe 方法进行获取服务列表, 会进入到抽象类 DefaultConsumerBootstrap 的 subscribe 方法中.
- DefaultConsumerBootstrap#subscribe
- public List<ProviderGroup> subscribe() {
- List<ProviderGroup> result = null;
- String directUrl = consumerConfig.getDirectUrl();
- if (StringUtils.isNotEmpty(directUrl)) {
- // 如果走直连
- result = subscribeFromDirectUrl(directUrl);
- } else {
- // 没有配置 url 直连
- List<RegistryConfig> registryConfigs = consumerConfig.getRegistry();
- if (CommonUtils.isNotEmpty(registryConfigs)) {
- // 从多个注册中心订阅服务列表
- result = subscribeFromRegistries();
- }
- }
- return result;
- }
这里分成两步:
如果在客户端设置了直连地址的话则调用 subscribeFromDirectUrl 方法.
如果没有配置直接地址则获取注册中心后调用 subscribeFromRegistries 方法.
DefaultConsumerBootstrap#subscribeFromDirectUrl
这个方法里面主要是将直连地址用 ";" 拆分, 然后封装成 provider 放入直连分组集合中.
- protected List<ProviderGroup> subscribeFromDirectUrl(String directUrl) {
- List<ProviderGroup> result = new ArrayList<ProviderGroup>();
- List<ProviderInfo> tmpProviderInfoList = new ArrayList<ProviderInfo>();
- // 拆分 url, 多个 url 可以用 ";" 分割
- String[] providerStrs = StringUtils.splitWithCommaOrSemicolon(directUrl);
- for (String providerStr : providerStrs) {
- ProviderInfo providerInfo = convertToProviderInfo(providerStr);
- if (providerInfo.getStaticAttr(ProviderInfoAttrs.ATTR_SOURCE) == null) {
- providerInfo.setStaticAttr(ProviderInfoAttrs.ATTR_SOURCE, "direct");
- }
- tmpProviderInfoList.add(providerInfo);
- }
- // 加入直连分组
- result.add(new ProviderGroup(RpcConstants.ADDRESS_DIRECT_GROUP, tmpProviderInfoList));
- return result;
- }
- DefaultConsumerBootstrap#subscribeFromRegistries
- protected List<ProviderGroup> subscribeFromRegistries() {
- List<ProviderGroup> result = new ArrayList<ProviderGroup>();
- List<RegistryConfig> registryConfigs = consumerConfig.getRegistry();
- // 没有配置注册中心, 直接返回
- if (CommonUtils.isEmpty(registryConfigs)) {
- return result;
- }
- // 是否等待结果
- int addressWaitTime = consumerConfig.getAddressWait();
- int maxAddressWaitTime = SofaConfigs.getIntegerValue(consumerConfig.getAppName(),
- SofaOptions.CONFIG_MAX_ADDRESS_WAIT_TIME, SofaOptions.MAX_ADDRESS_WAIT_TIME);
- addressWaitTime = addressWaitTime <0 ? maxAddressWaitTime : Math.min(addressWaitTime, maxAddressWaitTime);
- ProviderInfoListener listener = consumerConfig.getProviderInfoListener();
- // 设置 CountDownLatch 用来等待
- respondRegistries = addressWaitTime == 0 ? null : new CountDownLatch(registryConfigs.size());
- // 从注册中心订阅 {groupName: ProviderGroup}
- Map<String, ProviderGroup> tmpProviderInfoList = new HashMap<String, ProviderGroup>();
- for (RegistryConfig registryConfig : registryConfigs) {
- Registry registry = RegistryFactory.getRegistry(registryConfig);
- registry.init();
- registry.start();
- try {
- List<ProviderGroup> current;
- try {
- if (respondRegistries != null) {
- consumerConfig.setProviderInfoListener(new WrapperClusterProviderInfoListener(listener,
- respondRegistries));
- }
- current = registry.subscribe(consumerConfig);
- } finally {
- if (respondRegistries != null) {
- consumerConfig.setProviderInfoListener(listener);
- }
- }
- if (current == null) {
- continue; // 未同步返回结果
- } else {
- if (respondRegistries != null) {
- respondRegistries.countDown();
- }
- }
- for (ProviderGroup group : current) { // 当前注册中心的
- String groupName = group.getName();
- if (!group.isEmpty()) {
- ProviderGroup oldGroup = tmpProviderInfoList.get(groupName);
- if (oldGroup != null) {
- oldGroup.addAll(group.getProviderInfos());
- } else {
- tmpProviderInfoList.put(groupName, group);
- }
- }
- }
- } catch (SofaRpcRuntimeException e) {
- throw e;
- } catch (Throwable e) {
- String appName = consumerConfig.getAppName();
- if (LOGGER.isWarnEnabled(appName)) {
- LOGGER.warnWithApp(appName,
- "Catch exception when subscribe from registry:" + registryConfig.getId()
- + ", but you can ignore if it's called by JVM shutdown hook", e);
- }
- }
- }
- if (respondRegistries != null) {
- try {
- respondRegistries.await(addressWaitTime, TimeUnit.MILLISECONDS);
- } catch (Exception ignore) { // NOPMD
- }
- }
- return new ArrayList<ProviderGroup>(tmpProviderInfoList.values());
- }
这个这么长的方法实际上做了那么几件事:
遍历注册中心
初始化注册中心, 然后订阅注册中心, 以异步的方式拉去 provider
如果设置了等待, 那么就等待一段时间后返回
3. 初始化服务端连接
如果在调用 consumerBootstrap#subscribe() 后不是异步获取, 返回的就不是 null, 那么就会进入到 updateAllProviders, 所以我们来看一下这个方法里面做了什么.
- DefaultConsumerBootstrap#updateAllProviders
- public void updateAllProviders(List<ProviderGroup> providerGroups) {
- List<ProviderGroup> oldProviderGroups = new ArrayList<ProviderGroup>(addressHolder.getProviderGroups());
- int count = 0;
- if (providerGroups != null) {
- for (ProviderGroup providerGroup : providerGroups) {
- // 校验检查 providerGroup 里面的元素是不是为空
- // 消费者的配置的 protocol 是不是和 provider 的 protocol 相同
- checkProviderInfo(providerGroup);
- count += providerGroup.size();
- }
- }
- // 走到这里说明没有 provider
- if (count == 0) {
- Collection<ProviderInfo> currentProviderList = currentProviderList();
- addressHolder.updateAllProviders(providerGroups);
- if (CommonUtils.isNotEmpty(currentProviderList)) {
- if (LOGGER.isWarnEnabled(consumerConfig.getAppName())) {
- LOGGER.warnWithApp(consumerConfig.getAppName(), "Provider list is emptied, may be all" +
- "providers has been closed, or this consumer has been add to blacklist");
- closeTransports();
- }
- }
- } else {
- addressHolder.updateAllProviders(providerGroups);
- connectionHolder.updateAllProviders(providerGroups);
- }
- if (EventBus.isEnable(ProviderInfoUpdateAllEvent.class)) {
- ProviderInfoUpdateAllEvent event = new ProviderInfoUpdateAllEvent(consumerConfig, oldProviderGroups,
- providerGroups);
- EventBus.post(event);
- }
- }
其实这个方法里面就做了一件事情, 那就是把 provider 放入到 addressHolder 和 connectionHolder 中.
故障剔除
客户端在引用的时候会调用 FailoverCluster#doInvoke 方法, 然后调用父类的 select 进行路由和负载均衡选用合适的 provider.
- AbstractCluster#doInvoke
- public SofaResponse doInvoke(SofaRequest request) throws SofaRpcException {
- String methodName = request.getMethodName();
- int retries = consumerConfig.getMethodRetries(methodName);
- int time = 0;
- SofaRpcException throwable = null;// 异常日志
- List<ProviderInfo> invokedProviderInfos = new ArrayList<ProviderInfo>(retries + 1);
- do {
- // 负载均衡
- ProviderInfo providerInfo = select(request, invokedProviderInfos);
- try {
- // 调用过滤器链
- SofaResponse response = filterChain(providerInfo, request);
- if (response != null) {
- if (throwable != null) {
- if (LOGGER.isWarnEnabled(consumerConfig.getAppName())) {
- LOGGER.warnWithApp(consumerConfig.getAppName(),
- LogCodes.getLog(LogCodes.WARN_SUCCESS_BY_RETRY,
- throwable.getClass() + ":" + throwable.getMessage(),
- invokedProviderInfos));
- }
- }
- return response;
- } else {
- throwable = new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR,
- "Failed to call" + request.getInterfaceName() + "." + methodName
- + "on remote server" + providerInfo + ", return null");
- time++;
- }
- } catch (SofaRpcException e) { // 服务端异常 + 超时异常 才发起 rpc 异常重试
- if (e.getErrorType() == RpcErrorType.SERVER_BUSY
- || e.getErrorType() == RpcErrorType.CLIENT_TIMEOUT) {
- throwable = e;
- time++;
- } else {
- throw e;
- }
- } catch (Exception e) { // 其它异常不重试
- throw new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR,
- "Failed to call" + request.getInterfaceName() + "." + request.getMethodName()
- + "on remote server:" + providerInfo + ", cause by unknown exception:"
- + e.getClass().getName() + ", message is:" + e.getMessage(), e);
- } finally {
- if (RpcInternalContext.isAttachmentEnable()) {
- RpcInternalContext.getContext().setAttachment(RpcConstants.INTERNAL_KEY_INVOKE_TIMES,
- time + 1); // 重试次数
- }
- }
- invokedProviderInfos.add(providerInfo);
- } while (time <= retries);
- throw throwable;
- }
这个方法需要注意的是, 一开始 invokedProviderInfos 集合是空的, 如果调用完后没有返回 response, 而是抛出异常了, 那么就会把这个抛出异常的 provider 实例加入到 invokedProviderInfos 集合. 这个集合会在 select 方法里面用到.
AbstractCluster#select
客户端在引用服务端的时候会通过路由找到所有的 provider, 然后进行剔除. 路由是在调用 AbstractCluster#select 的时候做的. 所以我们先看看这个方法.
- protected ProviderInfo select(SofaRequest message, List<ProviderInfo> invokedProviderInfos)
- throws SofaRpcException {
- // 粘滞连接, 当前连接可用
- if (consumerConfig.isSticky()) {
- // 这个变量会在下面的 selectByProvider 方法为其赋值
- if (lastProviderInfo != null) {
- ProviderInfo providerInfo = lastProviderInfo;
- ClientTransport lastTransport = connectionHolder.getAvailableClientTransport(providerInfo);
- if (lastTransport != null && lastTransport.isAvailable()) {
- checkAlias(providerInfo, message);
- return providerInfo;
- }
- }
- }
- // 原始服务列表数据 --> 路由结果
- List<ProviderInfo> providerInfos = routerChain.route(message, null);
- // 保存一下原始地址, 为了打印
- List<ProviderInfo> orginalProviderInfos = new ArrayList<ProviderInfo>(providerInfos);
- if (CommonUtils.isEmpty(providerInfos)) {
- throw noAvailableProviderException(message.getTargetServiceUniqueName());
- }
- //invokedProviderInfos 保存的是重试的 provider, 说明该 provider 已经调用过, 并且失败了
- // 所以在这里排除
- if (CommonUtils.isNotEmpty(invokedProviderInfos) && providerInfos.size()> invokedProviderInfos.size()) { // 总数大于已调用数
- providerInfos.removeAll(invokedProviderInfos);// 已经调用异常的本次不再重试
- }
- String targetIP = null;
- ProviderInfo providerInfo;
- RpcInternalContext context = RpcInternalContext.peekContext();
- if (context != null) {
- targetIP = (String) RpcInternalContext.getContext().getAttachment(RpcConstants.HIDDEN_KEY_PINPOINT);
- }
- if (StringUtils.isNotBlank(targetIP)) {
- // 如果指定了调用地址
- providerInfo = selectPinpointProvider(targetIP, providerInfos);
- if (providerInfo == null) {
- // 指定的不存在
- throw unavailableProviderException(message.getTargetServiceUniqueName(), targetIP);
- }
- ClientTransport clientTransport = selectByProvider(message, providerInfo);
- if (clientTransport == null) {
- // 指定的不存在或已死, 抛出异常
- throw unavailableProviderException(message.getTargetServiceUniqueName(), targetIP);
- }
- return providerInfo;
- } else {
- do {
- // 再进行负载均衡筛选, 默认使用 RandomLoadBalancer
- providerInfo = loadBalancer.select(message, providerInfos);
- ClientTransport transport = selectByProvider(message, providerInfo);
- if (transport != null) {
- return providerInfo;
- }
- providerInfos.remove(providerInfo);
- } while (!providerInfos.isEmpty());
- }
- throw unavailableProviderException(message.getTargetServiceUniqueName(),
- convertProviders2Urls(orginalProviderInfos));
- }
这个方法主要做了如下几件事:
如果设置了粘滞连接, 那么会继续调用上一次使用过的 provider
调用 router 获取原始服务列表数据
如果 invokedProviderInfos 不为空的话, 原始服务列表里面需要剔除掉这些 provider
如果设置了直连, 那么调用 selectPinpointProvider 获取选定的 provider, 不存在故障剔除
没有设置直连, 则循环调用筛选
路由筛选 porvider
- RouterChain#route
- public List<ProviderInfo> route(SofaRequest request, List<ProviderInfo> providerInfos) {
- for (Router router : routers) {
- providerInfos = router.route(request, providerInfos);
- }
- return providerInfos;
- }
- //RegistryRouter#route
- public List<ProviderInfo> route(SofaRequest request, List<ProviderInfo> providerInfos) {
- //has address. FIXME
- if (CommonUtils.isNotEmpty(providerInfos)) {
- return providerInfos;
- }
- AddressHolder addressHolder = consumerBootstrap.getCluster().getAddressHolder();
- if (addressHolder != null) {
- List<ProviderInfo> current = addressHolder.getProviderInfos(RpcConstants.ADDRESS_DEFAULT_GROUP);
- if (providerInfos != null) {
- providerInfos.addAll(current);
- } else {
- providerInfos = current;
- }
- }
- recordRouterWay(RPC_REGISTRY_ROUTER);
- return providerInfos;
- }
我们这里考虑 RegistryRouter 进行路由选择的情况.
在 RegistryRouter#route 里面首先获取 addressHolder, 调用其实现类 SingleGroupAddressHolder
- SingleGroupAddressHolder#getProviderInfos
- /**
- * 配置的直连地址列表
- */
- protected ProviderGroup directUrlGroup;
- /**
- * 注册中心来的地址列表
- */
- protected ProviderGroup registryGroup;
- private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- private Lock rLock = lock.readLock();
- public List<ProviderInfo> getProviderInfos(String groupName) {
- rLock.lock();
- try {
- // 复制一份
- return new ArrayList<ProviderInfo>(getProviderGroup(groupName).getProviderInfos());
- } finally {
- rLock.unlock();
- }
- }
- public ProviderGroup getProviderGroup(String groupName) {
- rLock.lock();
- try {
- return RpcConstants.ADDRESS_DIRECT_GROUP.equals(groupName) ? directUrlGroup
- : registryGroup;
- } finally {
- rLock.unlock();
- }
- }
这里用的是读写锁, 也就是说, 在读的时候可以并发读, 但是不允许读的时候有写的操作. 然后根据 groupName 获取到相应的直连集合.
实现故障剔除, 筛选合适的 provider
- do {
- // 再进行负载均衡筛选, 默认使用 RandomLoadBalancer
- providerInfo = loadBalancer.select(message, providerInfos);
- ClientTransport transport = selectByProvider(message, providerInfo);
- if (transport != null) {
- return providerInfo;
- }
- providerInfos.remove(providerInfo);
- } while (!providerInfos.isEmpty());
这里是真正实现了故障剔除的方法, 负载均衡我已经在上一篇已经分析过了, 这里不再赘述, 所以我们直接看到 selectByProvider 方法中
- protected ClientTransport selectByProvider(SofaRequest message, ProviderInfo providerInfo) {
- ClientTransport transport = connectionHolder.getAvailableClientTransport(providerInfo);
- if (transport != null) {
- if (transport.isAvailable()) {
- lastProviderInfo = providerInfo;
- checkAlias(providerInfo, message); // 检查分组
- return transport;
- } else {
- connectionHolder.setUnavailable(providerInfo, transport);
- }
- }
- return null;
- }
- //AllConnectConnectionHolder#getAvailableClientTransport
- public ClientTransport getAvailableClientTransport(ProviderInfo providerInfo) {
- // 先去存活列表
- ClientTransport transport = aliveConnections.get(providerInfo);
- if (transport != null) {
- return transport;
- }
- // 再去亚健康列表 这个列表暂时没有实现的地方
- transport = subHealthConnections.get(providerInfo);
- if (transport != null) {
- return transport;
- }
- // 最后看看是否第一次调用未初始化
- transport = uninitializedConnections.get(providerInfo);
- if (transport != null) {
- // 未初始化则初始化, 这里是 lazy 为 ture 的情况, 延迟初始化
- synchronized (this) {
- transport = uninitializedConnections.get(providerInfo);
- if (transport != null) {
- initClientTransport(consumerConfig.getInterfaceId(), providerInfo, transport);
- uninitializedConnections.remove(providerInfo);
- }
- return getAvailableClientTransport(providerInfo);
- }
- }
- return null;
- }
当我们进入到 getAvailableClientTransport 这个方法的看到存货列表和未初始化列表的时候有没有似曾相识的感觉? 没错, 这几个参数就是我们上面讲到的客户端会初始化一个心跳线程, 在心跳线程里面维护这几个参数.
所以这个方法主要做了以下几件事:
去存活列表里面找 transport
去亚健康列表里面找 transport, 当然目前的版本并没有维护亚健康列表, 所以永远找不到
如果设置了延迟加载, 那么会去 uninitializedConnections 里面找到 transport, 然后再调用 initClientTransport 方法进行初始化
如果找不到那么就返回 null
如果返回 null, 那么会回到上面的 do-while 循环进行再次的筛选
好了, 那么 SOFARPC 是如何实现故障剔除的就已经分析完了, 如果这篇文章对你有所帮助, 不妨点个赞, 谢谢.
SOFARPC 源码解析系列:
1. 源码分析 ---SOFARPC 可扩展的机制 SPI
2. 源码分析 ---SOFARPC 客户端服务引用
3. 源码分析 ---SOFARPC 客户端服务调用
4. 源码分析 ---SOFARPC 服务端暴露
5. 源码分析 ---SOFARPC 调用服务
6. 源码分析 --- 和 dubbo 相比 SOFARPC 是如何实现负载均衡的?
来源: https://www.cnblogs.com/luozhiyun/p/11315156.html