断断续续看Ribbon的源码差不多也有7-8天了,总算告一段落。本文记录了这些天对源码的阅读过程与一些分析理解,如有不对还请指出。
在之前介绍使用Ribbon进行服务消费的时候,我们用到了
,但是熟悉Spring的同学们是否产生过这样的疑问:
- RestTemplate
不是Spring自己就有的吗?跟Ribbon的客户端负载均衡又有什么关系呢?下面在本文,我们来看
- RestTemplate
和
- RestTemplate
是如何联系起来并实现客户端负载均衡的。
- Ribbon
首先,回顾一下之前的消费者示例:我们是如何实现客户端负载均衡的?仔细观察一下代码之前的代码,我们可以发现在消费者的例子中,可能就是这个注解
是之前没有接触过的,并且从命名上来看也与负载均衡相关。我们不妨以此为线索来看看源码实现的机制。
- @LoadBalanced
从
注解源码的注释中,我们可以知道该注解用来给
- @LoadBalanced
标记,以使用负载均衡的客户端(
- RestTemplate
)来配置它。
- LoadBalancerClient
通过搜索
,我们可以发现这是Spring Cloud中定义的一个接口:
- LoadBalancerClient
- public interface LoadBalancerClient {
-
- ServiceInstance choose(String serviceId);
-
- < T > T execute(String serviceId, LoadBalancerRequest < T > request) throws IOException;
-
- URI reconstructURI(ServiceInstance instance, URI original);
-
- }
从该接口中,我们可以通过定义的抽象方法来了解到客户端负载均衡器中应具备的几种能力:
:根据传入的服务名
- ServiceInstance choose(String serviceId)
,从负载均衡器中挑选一个对应服务的实例。
- serviceId
:使用从负载均衡器中挑选出的服务实例来执行请求内容。
- T execute(String serviceId, LoadBalancerRequest request) throws IOException
:为系统构建一个合适的“host:port”形式的URI。在分布式系统中,我们使用逻辑上的服务名称作为host来构建URI(替代服务实例的“host:port”形式)进行请求,比如:
- URI reconstructURI(ServiceInstance instance, URI original)
。在该操作的定义中,前者
- http://myservice/path/to/service
对象是带有host和port的具体服务实例,而后者URI对象则是使用逻辑服务名定义为host的URI,而返回的URI内容则是通过
- ServiceInstance
的服务实例详情拼接出的具体“host:post”形式的请求地址。
- ServiceInstance
顺着
接口的所属包
- LoadBalancerClient
,我们对其内容进行整理,可以得出如下图的关系:
- org.springframework.cloud.client.loadbalancer
从类的命名上我们初步判断
为实现客户端负载均衡器的自动化配置类。通过查看源码,我们可以验证这一点假设:
- LoadBalancerAutoConfiguration
- @Configuration
- @ConditionalOnClass(RestTemplate.class)
- @ConditionalOnBean(LoadBalancerClient.class)
- public class LoadBalancerAutoConfiguration {
-
- @LoadBalanced
- @Autowired(required = false)
- private List<RestTemplate> restTemplates = Collections.emptyList();
-
- @Bean
- public SmartInitializingSingleton loadBalancedRestTemplateInitializer(
- final List<RestTemplateCustomizer> customizers) {
- return new SmartInitializingSingleton() {
- @Override
- public void afterSingletonsInstantiated() {
- for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
- for (RestTemplateCustomizer customizer : customizers) {
- customizer.customize(restTemplate);
- }
- }
- }
- };
- }
-
- @Bean
- @ConditionalOnMissingBean
- public RestTemplateCustomizer restTemplateCustomizer(
- final LoadBalancerInterceptor loadBalancerInterceptor) {
- return new RestTemplateCustomizer() {
- @Override
- public void customize(RestTemplate restTemplate) {
- List<ClientHttpRequestInterceptor> list = new ArrayList<>(
- restTemplate.getInterceptors());
- list.add(loadBalancerInterceptor);
- restTemplate.setInterceptors(list);
- }
- };
- }
-
- @Bean
- public LoadBalancerInterceptor ribbonInterceptor(
- LoadBalancerClient loadBalancerClient) {
- return new LoadBalancerInterceptor(loadBalancerClient);
- }
-
- }
从
类头上的注解可以知道Ribbon实现的负载均衡自动化配置需要满足下面两个条件:
- LoadBalancerAutoConfiguration
:
- @ConditionalOnClass(RestTemplate.class)
类必须存在于当前工程的环境中。
- RestTemplate
:在Spring的Bean工程中有必须有
- @ConditionalOnBean(LoadBalancerClient.class)
的实现Bean。
- LoadBalancerClient
在该自动化配置类中,主要做了下面三件事:
的Bean,用于实现对客户端发起请求时进行拦截,以实现客户端负载均衡。
- LoadBalancerInterceptor
的Bean,用于给
- RestTemplateCustomizer
增加
- RestTemplate
拦截器。
- LoadBalancerInterceptor
注解修饰的
- @LoadBalanced
对象列表,并在这里进行初始化,通过调用
- RestTemplate
的实例来给需要客户端负载均衡的
- RestTemplateCustomizer
增加
- RestTemplate
拦截器。
- LoadBalancerInterceptor
接下来,我们看看
拦截器是如何将一个普通的
- LoadBalancerInterceptor
变成客户端负载均衡的:
- RestTemplate
- public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
-
- private LoadBalancerClient loadBalancer;
-
- public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
- this.loadBalancer = loadBalancer;
- }
-
- @Override
- public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
- final ClientHttpRequestExecution execution) throws IOException {
- final URI originalUri = request.getURI();
- String serviceName = originalUri.getHost();
- return this.loadBalancer.execute(serviceName,
- new LoadBalancerRequest<ClientHttpResponse>() {
- @Override
- public ClientHttpResponse apply(final ServiceInstance instance)
- throws Exception {
- HttpRequest serviceRequest = new ServiceRequestWrapper(request,
- instance);
- return execution.execute(serviceRequest, body);
- }
- });
- }
-
- private class ServiceRequestWrapper extends HttpRequestWrapper {
-
- private final ServiceInstance instance;
-
- public ServiceRequestWrapper(HttpRequest request, ServiceInstance instance) {
- super(request);
- this.instance = instance;
- }
-
- @Override
- public URI getURI() {
- URI uri = LoadBalancerInterceptor.this.loadBalancer.reconstructURI(
- this.instance, getRequest().getURI());
- return uri;
- }
- }
- }
通过源码以及之前的自动化配置类,我们可以看到在拦截器中注入了
的实现。当一个被
- LoadBalancerClient
注解修饰的
- @LoadBalanced
对象向外发起HTTP请求时,会被
- RestTemplate
类的
- LoadBalancerInterceptor
函数所拦截。由于我们在使用RestTemplate时候采用了服务名作为host,所以直接从
- intercept
的URI对象中通过getHost()就可以拿到服务名,然后调用
- HttpRequest
函数去根据服务名来选择实例并发起实际的请求。
- execute
分析到这里,
还只是一个抽象的负载均衡器接口,所以我们还需要找到它的具体实现类来进一步分析。通过查看ribbon的源码,我们可以很容易的在
- LoadBalancerClient
包下找到对应的实现类:
- org.springframework.cloud.netflix.ribbon
。
- RibbonLoadBalancerClient
- public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
- ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
- Server server = getServer(loadBalancer);
- if (server == null) {
- throw new IllegalStateException("No instances available for " + serviceId);
- }
- RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
- serviceId), serverIntrospector(serviceId).getMetadata(server));
-
- RibbonLoadBalancerContext context = this.clientFactory
- .getLoadBalancerContext(serviceId);
- RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);
-
- try {
- T returnVal = request.apply(ribbonServer);
- statsRecorder.recordStats(returnVal);
- return returnVal;
- }
- catch (IOException ex) {
- statsRecorder.recordStats(ex);
- throw ex;
- }
- catch (Exception ex) {
- statsRecorder.recordStats(ex);
- ReflectionUtils.rethrowRuntimeException(ex);
- }
- return null;
- }
可以看到,在
函数的实现中,第一步做的就是通过
- execute
根据传入的服务名
- getServer
去获得具体的服务实例:
- serviceId
- protected Server getServer(ILoadBalancer loadBalancer) {
- if (loadBalancer == null) {
- return null;
- }
- return loadBalancer.chooseServer("default");
- }
通过
函数的实现源码,我们可以看到这里获取具体服务实例的时候并没有使用
- getServer
接口中的
- LoadBalancerClient
函数,而是使用了ribbon自身的
- choose
接口中定义的
- ILoadBalancer
函数。
- chooseServer
我们先来认识一下
接口:
- ILoadBalancer
- public interface ILoadBalancer {
-
- public void addServers(List < Server > newServers);
-
- public Server chooseServer(Object key);
-
- public void markServerDown(Server server);
-
- public List < Server > getReachableServers();
-
- public List < Server > getAllServers();
- }
可以看到,在该接口中定义了一个软负载均衡器需要的一系列抽象操作(未例举过期函数):
:向负载均衡器中维护的实例列表增加服务实例。
- addServers
:通过某种策略,从负载均衡器中挑选出一个具体的服务实例。
- chooseServer
:用来通知和标识负载均衡器中某个具体实例已经停止服务,不然负载均衡器在下一次获取服务实例清单前都会认为服务实例均是正常服务的。
- markServerDown
:获取当前正常服务的实例列表。
- getReachableServers
:获取所有已知的服务实例列表,包括正常服务和停止服务的实例。
- getAllServers
在该接口定义中涉及到的
对象定义的是一个传统的服务端节点,在该类中存储了服务端节点的一些元数据信息,包括:host、port以及一些部署信息等。
- Server
而对于该接口的实现,我们可以整理出如上图所示的结构。我们可以看到
类实现了基础的负载均衡,而
- BaseLoadBalancer
和
- DynamicServerListLoadBalancer
在负载均衡的策略上做了一些功能的扩展。
- ZoneAwareLoadBalancer
那么在整合Ribbon的时候Spring Cloud默认采用了哪个具体实现呢?我们通过
配置类,可以知道在整合时默认采用了
- RibbonClientConfiguration
来实现负载均衡器。
- ZoneAwareLoadBalancer
- @Bean
- @ConditionalOnMissingBean
- public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
- ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
- IRule rule, IPing ping) {
- ZoneAwareLoadBalancer<Server> balancer = LoadBalancerBuilder.newBuilder()
- .withClientConfig(config).withRule(rule).withPing(ping)
- .withServerListFilter(serverListFilter).withDynamicServerList(serverList)
- .buildDynamicServerListLoadBalancer();
- return balancer;
- }
下面,我们再回到
的
- RibbonLoadBalancerClient
函数逻辑,在通过
- execute
的
- ZoneAwareLoadBalancer
函数获取了负载均衡策略分配到的服务实例对象
- chooseServer
之后,将其内容包装成
- Server
对象(该对象除了存储了服务实例的信息之外,还增加了服务名serviceId、是否需要使用HTTPS等其他信息),然后使用该对象再回调
- RibbonServer
请求拦截器中
- LoadBalancerInterceptor
的
- LoadBalancerRequest
函数,向一个实际的具体服务实例发起请求,从而实现一开始以服务名为host的URI请求,到实际访问host:post形式的具体地址的转换。
- apply(final ServiceInstance instance)
函数中传入的
- apply(final ServiceInstance instance)
接口是对服务实例的抽象定义。在该接口中暴露了服务治理系统中每个服务实例需要提供的一些基本信息,比如:serviceId、host、port等,具体定义如下:
- ServiceInstance
- public interface ServiceInstance {
-
- String getServiceId();
-
- String getHost();
-
- int getPort();
-
- boolean isSecure();
-
- URI getUri();
-
- Map < String,
- String > getMetadata();
- }
而上面提到的具体包装
服务实例的
- Server
对象就是
- RibbonServer
接口的实现,可以看到它除了包含了
- ServiceInstance
对象之外,还存储了服务名、是否使用https标识以及一个Map类型的元数据集合。
- Server
- protected static class RibbonServer implements ServiceInstance {
-
- private final String serviceId;
- private final Server server;
- private final boolean secure;
- private Map < String,
- String > metadata;
-
- protected RibbonServer(String serviceId, Server server) {
- this(serviceId, server, false, Collections. < String, String > emptyMap());
- }
-
- protected RibbonServer(String serviceId, Server server, boolean secure, Map < String, String > metadata) {
- this.serviceId = serviceId;
- this.server = server;
- this.secure = secure;
- this.metadata = metadata;
- }
-
- // 省略实现ServiceInstance的一些获取Server信息的get函数
- ...
- }
那么
函数,在接收到了具体
- apply(final ServiceInstance instance)
实例后,是如何通过
- ServiceInstance
接口中的
- LoadBalancerClient
操作来组织具体请求地址的呢?
- reconstructURI
- @Override
- public ClientHttpResponse apply(final ServiceInstance instance)
- throws Exception {
- HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance);
- return execution.execute(serviceRequest, body);
- }
从
的实现中,我们可以看到它具体执行的时候,还传入了
- apply
对象,该对象继承了
- ServiceRequestWrapper
并重写了
- HttpRequestWrapper
函数,重写后的
- getURI
会通过调用
- getURI
接口的
- LoadBalancerClient
函数来重新构建一个URI来进行访问。
- reconstructURI
- private class ServiceRequestWrapper extends HttpRequestWrapper {
-
- private final ServiceInstance instance;
-
- ...
-
- @Override
- public URI getURI() {
- URI uri = LoadBalancerInterceptor.this.loadBalancer.reconstructURI(
- this.instance, getRequest().getURI());
- return uri;
- }
- }
在
拦截器中,
- LoadBalancerInterceptor
的实例具体执行
- ClientHttpRequestExecution
时,会调用
- execution.execute(serviceRequest, body)
下
- InterceptingClientHttpRequest
类的
- InterceptingRequestExecution
函数,具体实现如下:
- execute
- public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
- if (this.iterator.hasNext()) {
- ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
- return nextInterceptor.intercept(request, body, this);
- } else {
- ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), request.getMethod());
- delegate.getHeaders().putAll(request.getHeaders());
- if (body.length > 0) {
- StreamUtils.copy(body, delegate.getBody());
- }
- return delegate.execute();
- }
- }
可以看到在创建请求的时候
,这里
- requestFactory.createRequest(request.getURI(), request.getMethod());
会调用之前介绍的
- request.getURI()
对象中重写的
- ServiceRequestWrapper
函数。此时,它就会使用
- getURI
中实现的
- RibbonLoadBalancerClient
来组织具体请求的服务实例地址。
- reconstructURI
- public URI reconstructURI(ServiceInstance instance, URI original) {
- Assert.notNull(instance, "instance can not be null");
- String serviceId = instance.getServiceId();
- RibbonLoadBalancerContext context = this.clientFactory.getLoadBalancerContext(serviceId);
- Server server = new Server(instance.getHost(), instance.getPort());
- boolean secure = isSecure(server, serviceId);
- URI uri = original;
- if (secure) {
- uri = UriComponentsBuilder.fromUri(uri).scheme("https").build().toUri();
- }
- return context.reconstructURIWithServer(server, uri);
- }
从
函数中,我们可以看到,它通过
- reconstructURI
实例对象的
- ServiceInstance
,从
- serviceId
类的
- SpringClientFactory
对象中获取对应
- clientFactory
的负载均衡器的上下文
- serviceId
对象。然后根据
- RibbonLoadBalancerContext
中的信息来构建具体服务实例信息的
- ServiceInstance
对象,并使用
- Server
对象的
- RibbonLoadBalancerContext
函数来构建服务实例的URI。
- reconstructURIWithServer
为了帮助理解,简单介绍一下上面提到的
和
- SpringClientFactory
:
- RibbonLoadBalancerContext
类是一个用来创建客户端负载均衡器的工厂类,该工厂会为每一个不同名的ribbon客户端生成不同的Spring上下文。
- SpringClientFactory
类是
- RibbonLoadBalancerContext
的子类,该类用于存储一些被负载均衡器使用的上下文内容和Api操作(
- LoadBalancerContext
就是其中之一)。
- reconstructURIWithServer
从
的实现中我们可以看到,它同
- reconstructURIWithServer
的定义类似。只是
- reconstructURI
的第一个保存具体服务实例的参数使用了Spring Cloud定义的
- reconstructURI
,而
- ServiceInstance
中使用了Netflix中定义的
- reconstructURIWithServer
,所以在
- Server
实现
- RibbonLoadBalancerClient
时候,做了一次转换,使用
- reconstructURI
的host和port信息来构建了一个
- ServiceInstance
对象来给
- Server
使用。从
- reconstructURIWithServer
的实现逻辑中,我们可以看到,它从
- reconstructURIWithServer
对象中获取host和port信息,然后根据以服务名为host的
- Server
对象original中获取其他请求信息,将两者内容进行拼接整合,形成最终要访问的服务实例的具体地址。
- URI
- public class LoadBalancerContext implements IClientConfigAware {
-
- ...
-
- public URI reconstructURIWithServer(Server server, URI original) {
- String host = server.getHost();
- int port = server.getPort();
- if (host.equals(original.getHost()) && port == original.getPort()) {
- return original;
- }
- String scheme = original.getScheme();
- if (scheme == null) {
- scheme = deriveSchemeAndPortFromPartialUri(original).first();
- }
-
- try {
- StringBuilder sb = new StringBuilder();
- sb.append(scheme).append("://");
- if (!Strings.isNullOrEmpty(original.getRawUserInfo())) {
- sb.append(original.getRawUserInfo()).append("@");
- }
- sb.append(host);
- if (port >= 0) {
- sb.append(":").append(port);
- }
- sb.append(original.getRawPath());
- if (!Strings.isNullOrEmpty(original.getRawQuery())) {
- sb.append("?").append(original.getRawQuery());
- }
- if (!Strings.isNullOrEmpty(original.getRawFragment())) {
- sb.append("#").append(original.getRawFragment());
- }
- URI newURI = new URI(sb.toString());
- return newURI;
- } catch(URISyntaxException e) {
- throw new RuntimeException(e);
- }
- }
-
- ...
- }
另外,从
的
- RibbonLoadBalancerClient
的函数逻辑中,我们还能看到在回调拦截器中,执行具体的请求之后,ribbon还通过
- execute
对象对服务的请求还进行了跟踪记录,这里不再展开说明,有兴趣的读者可以继续研究。
- RibbonStatsRecorder
分析到这里,我们已经可以大致理清Spring Cloud中使用Ribbon实现客户端负载均衡的基本脉络。了解了它是如何通过
拦截器对
- LoadBalancerInterceptor
的请求进行拦截,并利用Spring Cloud的负载均衡器
- RestTemplate
将以逻辑服务名为host的URI转换成具体的服务实例的过程。同时通过分析
- LoadBalancerClient
的Ribbon实现
- LoadBalancerClient
,可以知道在使用Ribbon实现负载均衡器的时候,实际使用的还是Ribbon中定义的
- RibbonLoadBalancerClient
接口的实现,自动化配置会采用
- ILoadBalancer
的实例来进行客户端负载均衡实现。
- ZoneAwareLoadBalancer
通过之前的分析,我们已经对Spring Cloud如何使用Ribbon有了基本的了解。虽然Spring Cloud中定义了
为负载均衡器的接口,并且针对Ribbon实现了
- LoadBalancerClient
,但是它在具体实现客户端负载均衡时,则是通过Ribbon的
- RibbonLoadBalancerClient
接口实现。在上一节分析时候,我们对该接口的实现结构已经做了一些简单的介绍,下面我们根据
- ILoadBalancer
接口的实现类逐个看看它都是如何实现客户端负载均衡的。
- ILoadBalancer
是
- AbstractLoadBalancer
接口的抽象实现。在该抽象类中定义了一个关于服务实例的分组枚举类
- ILoadBalancer
,它包含了三种不同类型:ALL-所有服务实例、STATUS_UP-正常服务的实例、STATUS_NOT_UP-停止服务的实例;实现了一个
- ServerGroup
函数,该函数通过调用接口中的
- chooseServer()
实现,其中参数
- chooseServer(Object key)
为null,表示在选择具体服务实例时忽略
- key
的条件判断;最后还定义了两个抽象函数,
- key
定义了根据分组类型来获取不同的服务实例列表,
- getServerList(ServerGroup serverGroup)
定义了获取
- getLoadBalancerStats()
对象的方法,
- LoadBalancerStats
对象被用来存储负载均衡器中各个服务实例当前的属性和统计信息,这些信息非常有用,我们可以利用这些信息来观察负载均衡器的运行情况,同时这些信息也是用来制定负载均衡策略的重要依据。
- LoadBalancerStats
- public abstract class AbstractLoadBalancer implements ILoadBalancer {
-
- public enum ServerGroup {
- ALL,
- STATUS_UP,
- STATUS_NOT_UP
- }
-
- public Server chooseServer() {
- return chooseServer(null);
- }
-
- public abstract List < Server > getServerList(ServerGroup serverGroup);
-
- public abstract LoadBalancerStats getLoadBalancerStats();
- }
类是Ribbon负载均衡器的基础实现类,在该类中定义很多关于均衡负载器相关的基础内容:
- BaseLoadBalancer
对象的列表。一个用于存储所有服务实例的清单,一个用于存储正常服务的实例清单。
- Server
- @Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
- protected volatile List<Server> allServerList = Collections
- .synchronizedList(new ArrayList<Server>());
- @Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
- protected volatile List<Server> upServerList = Collections
- .synchronizedList(new ArrayList<Server>());
对象。
- LoadBalancerStats
对象,在
- IPing
中默认为null,需要在构造时注入它的具体实现。
- BaseLoadBalancer
,在
- IPingStrategy
中默认使用了该类中定义的静态内部类
- BaseLoadBalancer
实现。根据源码,我们可以看到该策略采用线性遍历ping服务实例的方式实现检查。该策略在当我们实现的
- SerialPingStrategy
速度不理想,或是
- IPing
列表过大时,可能变的不是很为理想,这时候我们需要通过实现
- Server
接口并实现
- IPingStrategy
函数去扩展ping的执行策略。
- pingServers(IPing ping, Server[] servers)
- private static class SerialPingStrategy implements IPingStrategy {
- @Override
- public boolean[] pingServers(IPing ping, Server[] servers) {
- int numCandidates = servers.length;
- boolean[] results = new boolean[numCandidates];
-
- if (logger.isDebugEnabled()) {
- logger.debug("LoadBalancer: PingTask executing ["
- + numCandidates + "] servers configured");
- }
-
- for (int i = 0; i < numCandidates; i++) {
- results[i] = false;
- try {
- if (ping != null) {
- results[i] = ping.isAlive(servers[i]);
- }
- } catch (Throwable t) {
- logger.error("Exception while pinging Server:"
- + servers[i], t);
- }
- }
- return results;
- }
- }
对象,从
- IRule
中
- BaseLoadBalancer
的实现源码,我们可以知道负载均衡器实际进行服务实例选择任务是委托给了
- chooseServer(Object key)
实例中的
- IRule
函数来实现。而在这里,默认初始化了
- choose
为
- RoundRobinRule
的实现对象。
- IRule
实现了最基本且常用的线性负载均衡规则。
- RoundRobinRule
- public Server chooseServer(Object key) {
- if (counter == null) {
- counter = createCounter();
- }
- counter.increment();
- if (rule == null) {
- return null;
- } else {
- try {
- return rule.choose(key);
- } catch(Throwable t) {
- return null;
- }
- }
- }
的默认构造函数中,会直接启动一个用于定时检查
- BaseLoadBalancer
是否健康的任务。该任务默认的执行间隔为:10秒。
- Server
接口定义的负载均衡器应具备的一系列基本操作:
- ILoadBalancer
:向负载均衡器中增加新的服务实例列表,该实现将原本已经维护着的所有服务实例清单
- addServers(List newServers)
和新传入的服务实例清单
- allServerList
都加入到
- newServers
中,然后通过调用
- newList
函数对
- setServersList
进行处理,在
- newList
中实现的时候会使用新的列表覆盖旧的列表。而之后介绍的几个扩展实现类对于服务实例清单更新的优化都是对
- BaseLoadBalancer
函数的重写来实现的。
- setServersList
- public void addServers(List < Server > newServers) {
- if (newServers != null && newServers.size() > 0) {
- try {
- ArrayList < Server > newList = new ArrayList < Server > ();
- newList.addAll(allServerList);
- newList.addAll(newServers);
- setServersList(newList);
- } catch(Exception e) {
- logger.error("Exception while adding Servers", e);
- }
- }
- }
:挑选一个具体的服务实例,在上面介绍
- chooseServer(Object key)
的时候,已经做了说明,这里不再赘述。
- IRule
:标记某个服务实例暂停服务。
- markServerDown(Server server)
- public void markServerDown(Server server) {
- if (server == null) {
- return;
- }
- if (!server.isAlive()) {
- return;
- }
- logger.error("LoadBalancer: markServerDown called on [" + server.getId() + "]");
- server.setAlive(false);
- notifyServerStatusChangeListener(singleton(server));
- }
:获取可用的服务实例列表。由于
- getReachableServers()
中单独维护了一个正常服务的实例清单,所以直接返回即可。
- BaseLoadBalancer
- public List < Server > getReachableServers() {
- return Collections.unmodifiableList(upServerList);
- }
:获取所有的服务实例列表。由于
- getAllServers()
中单独维护了一个所有服务的实例清单,所以也直接返回它即可。
- BaseLoadBalancer
- public List < Server > getAllServers() {
- return Collections.unmodifiableList(allServerList);
- }
类继承于
- DynamicServerListLoadBalancer
类,它是对基础负载均衡器的扩展。在该负载均衡器中,实现了服务实例清单的在运行期的动态更新能力;同时,它还具备了对服务实例清单的过滤功能,也就是说我们可以通过过滤器来选择性的获取一批服务实例清单。下面我们具体来看看在该类中增加了一些什么内容:
- BaseLoadBalancer
从
的成员定义中,我们马上可以发现新增了一个关于服务列表的操作对象:
- DynamicServerListLoadBalancer
。其中泛型
- ServerList<T> serverListImpl
从类名中对于T的限定
- T
可以获知它是一个
- DynamicServerListLoadBalancer<T extends Server>
的子类,即代表了一个具体的服务实例的扩展类。而
- Server
接口定义如下所示:
- ServerList
- public interface ServerList < T extends Server > {
-
- public List < T > getInitialListOfServers();
-
- public List < T > getUpdatedListOfServers();
- }
它定义了两个抽象方法:
用于获取初始化的服务实例清单,而
- getInitialListOfServers
用于获取更新的服务实例清单。那么该接口的实现有哪些呢?通过搜索源码,我们可以整出如下图的结构:
- getUpdatedListOfServers
从图中我们可以看到有很多个
的实现类,那么在
- ServerList
中的
- DynamicServerListLoadBalancer
默认配置到底使用了哪个具体实现呢?既然在该负载均衡器中需要实现服务实例的动态更新,那么势必需要ribbon具备访问eureka来获取服务实例的能力,所以我们从Spring Cloud整合ribbon与eureka的包
- ServerList
下探索,可以找到配置类
- org.springframework.cloud.netflix.ribbon.eureka
,在该类中可以找到看到下面创建
- EurekaRibbonClientConfiguration
实例的内容:
- ServerList
- @Bean
- @ConditionalOnMissingBean
- public ServerList<?> ribbonServerList(IClientConfig config) {
- DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
- config);
- DomainExtractingServerList serverList = new DomainExtractingServerList(
- discoveryServerList, config, this.approximateZoneFromHostname);
- return serverList;
- }
可以看到,这里创建的是一个
实例,从下面它的源码中我们可以看到在它内部还定义了一个
- DomainExtractingServerList
。同时,
- ServerList list
类中对
- DomainExtractingServerList
和
- getInitialListOfServers
的具体实现,其实委托给了内部定义的
- getUpdatedListOfServers
对象,而该对象是通过创建
- ServerList list
时候,由构造函数传入的
- DomainExtractingServerList
实现的。
- DiscoveryEnabledNIWSServerList
- public class DomainExtractingServerList implements ServerList<DiscoveryEnabledServer> {
-
- private ServerList<DiscoveryEnabledServer> list;
- private IClientConfig clientConfig;
- private boolean approximateZoneFromHostname;
-
- public DomainExtractingServerList(ServerList<DiscoveryEnabledServer> list,
- IClientConfig clientConfig, boolean approximateZoneFromHostname) {
- this.list = list;
- this.clientConfig = clientConfig;
- this.approximateZoneFromHostname = approximateZoneFromHostname;
- }
-
- @Override
- public List<DiscoveryEnabledServer> getInitialListOfServers() {
- List<DiscoveryEnabledServer> servers = setZones(this.list
- .getInitialListOfServers());
- return servers;
- }
-
- @Override
- public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
- List<DiscoveryEnabledServer> servers = setZones(this.list
- .getUpdatedListOfServers());
- return servers;
- }
- ...
- }
那么
是如何实现这两个服务实例的获取的呢?我们从源码中可以看到这两个方法都是通过该类中的一个私有函数
- DiscoveryEnabledNIWSServerList
来通过服务发现机制来实现服务实例的获取。
- obtainServersViaDiscovery
- @Override
- public List<DiscoveryEnabledServer> getInitialListOfServers(){
- return obtainServersViaDiscovery();
- }
-
- @Override
- public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
- return obtainServersViaDiscovery();
- }
而
的实现逻辑如下,主要依靠
- obtainServersViaDiscovery
从服务注册中心中获取到具体的服务实例
- EurekaClient
列表(
- InstanceInfo
的具体实现,我们在分析Eureka的源码时已经做了详细的介绍,这里传入的
- EurekaClient
可以理解为逻辑上的服务名,比如“USER-SERVICE”)。接着,对这些服务实例进行遍历,将状态为“UP”(正常服务)的实例转换成
- vipAddress
对象,最后将这些实例组织成列表返回。
- DiscoveryEnabledServer
- private List < DiscoveryEnabledServer > obtainServersViaDiscovery() {
- List < DiscoveryEnabledServer > serverList = new ArrayList < DiscoveryEnabledServer > ();
-
- if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
- logger.warn("EurekaClient has not been initialized yet, returning an empty list");
- return new ArrayList < DiscoveryEnabledServer > ();
- }
-
- EurekaClient eurekaClient = eurekaClientProvider.get();
- if (vipAddresses != null) {
- for (String vipAddress: vipAddresses.split(",")) {
- List < InstanceInfo > listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
- for (InstanceInfo ii: listOfInstanceInfo) {
- if (ii.getStatus().equals(InstanceStatus.UP)) {
- // 省略了一些实例信息的加工逻辑
- DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);
- des.setZone(DiscoveryClient.getZone(ii));
- serverList.add(des);
- }
- }
- if (serverList.size() > 0 && prioritizeVipAddressBasedServers) {
- break;
- }
- }
- }
- return serverList;
- }
来源: https://zhuanlan.zhihu.com/p/31750966