目录
一, Spring Cloud Eureka Server 自动配置及初始化
@EnableEurekaServer
EurekaServerAutoConfiguration - 注册服务自动配置类
[重点 1] Eureka Server 上下文初始化
- ,PeerEurekaNodes#start(): 初始化对等节点信息
- ,PeerAwareInstanceRegistry#init(peerEurekaNodes): 集群实例注册器初始化
[重点 2] EurekaServerBootstrap 初始化
1,registry.syncUp(): 从相邻 eureka 节点拷贝注册列表信息
2,registry.openForTraffic(): 允许与客户端的数据传输
二, Eureka Server 处理注册请求
- ApplicationResource#addInstance() - 注册单个应用实例
- PeerAwareInstanceRegistryImpl#register() - 注册服务信息并同步到其它 Eureka 节点
- AbstractInstanceRegistry#register(): 注册
- PeerAwareInstanceRegistryImpl#replicateToPeers() : 复制到 Eureka 对等节点
本文使用 Spring Cloud Eureka 分析
Spring Cloud 版本: Dalston.SR5
spring-cloud-starter-eureka 版本: 1.3.6.RELEASE
netflix eureka 版本: 1.6.2
继续 从 Eureka Client 发起注册请求到 Eureka Server 处理的整个服务注册过程(上) 分析
目录:
一, Spring Cloud Eureka Server 自动配置及初始化
@EnableEurekaServer
创建 Spring Cloud Eureka Server 首先要使用 @EnableEurekaServer 注解, 其实质是:
- @EnableDiscoveryClient
- @Target(ElementType.TYPE)
- @Retention(RetentionPolicy.RUNTIME)
- @Documented
- @Import(EurekaServerMarkerConfiguration.class)
- public @interface EnableEurekaServer {
- }
- @EnableDiscoveryClient
: 引入服务发现客户端相关配置(身为 Server 的同时, 在 Server 集群复制时也会作为 Client)
导入
EurekaServerMarkerConfiguration
: 激活
EurekaServerAutoConfiguration
所以,@EnableEurekaServer 注解和上一篇分析的 Client 启动注解都是通过向 Spring 容器注入 Maker 的形式激活 xxAutoConfiguration 配置类, Eureka Client 是 EurekaClientAutoConfiguration,Eureka Server 是 EurekaServerAutoConfiguration
EurekaServerAutoConfiguration - 注册服务自动配置类
以下是对自动注入的各个组件的简单分析:
头部注解
@Import(EurekaServerInitializerConfiguration.class): 导入 Eureka Server 初始化的配置类, 其实现 SmartLifecycle 接口, 会在 Spring 容器基本 refresh 完毕时调用 EurekaServerBootstrap#contextInitialized() Eureka Server 启动分析重点
@EnableConfigurationProperties({ EurekaDashboardProperties.class,InstanceRegistryProperties.class })
EurekaDashboardProperties 是仪表盘相关属性
InstanceRegistryProperties 是实例注册相关属性
- @ConfigurationProperties(PREFIX)
- public class InstanceRegistryProperties {
- public static final String PREFIX = "eureka.instance.registry";
- /* Default number of expected renews per minute, defaults to 1.
- * Setting expectedNumberOfRenewsPerMin to non-zero to ensure that even an isolated
- * server can adjust its eviction policy to the number of registrations (when it's
- * zero, even a successful registration won't reset the rate threshold in
- * InstanceRegistry.register()).
- * 每分钟默认续约数量为 1
- * 将 expectedNumberOfRenewsPerMin 设置为非零
- * 以确保即使是隔离的服务器也可以根据注册数量调整其驱逐策略
- * (当它为零时, 即使成功注册也不会重置 InstanceRegistry.register()中的速率阈值)
- */
- @Value("${eureka.server.expectedNumberOfRenewsPerMin:1}") // for backwards compatibility
- // 为了向后兼容
- private int expectedNumberOfRenewsPerMin = 1;
- /**
- * Value used in determining when leases are cancelled, default to 1 for standalone.
- * Should be set to 0 for peer replicated eurekas
- * 决定租约何时取消的值
- * 单机默认值为 1, 对于同行复制的 eurekas, 应设置为 0
- */
- @Value("${eureka.server.defaultOpenForTrafficCount:1}") // for backwards compatibility
- private int defaultOpenForTrafficCount = 1;
@PropertySource("classpath:/eureka/server.properties") : 在 spring-cloud-netflix-eureka-server-xxx.jar 中, 只包含 spring.http.encoding.force=false
EurekaServerFeature: 访问 / features 端点时会显示启用的 Eureka Server 自动配置类为 EurekaServerAutoConfiguration
EurekaServerConfig: 注入 Eureka Server 配置类, EurekaServerConfig 是 netflix 的接口, 里面有很多记录 eureka 服务器运行所需的配置信息, netflix 的默认实现类是 DefaultEurekaServerConfig,spring cloud 的默认实现类是 EurekaServerConfigBean
- @Configuration
- protected static class EurekaServerConfigBeanConfiguration {
- @Bean
- @ConditionalOnMissingBean
- public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) {
- EurekaServerConfigBean server = new EurekaServerConfigBean(); // 创建 EurekaServerConfigBean
- // 如果当前 Eureka Server 本身也需要作为客户端注册(集群模式必须开启??)
- if (clientConfig.shouldRegisterWithEureka()) {
- // Set a sensible default if we are supposed to replicate
- // 设置 EurekaServer 在启动期间 eureka 节点尝试从对等放获取注册表信息的重试次数
- server.setRegistrySyncRetries(5);
- }
- return server;
- }
- }
EurekaController:Eureka Server Dashborad 对应的 Controller(默认 path: /)
PeerAwareInstanceRegistry: 直译是对等体可见的应用实例注册器, 就是在注册实例时会考虑集群情况下其它 Node 相关操作的注册器
- @Bean
- public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
- ServerCodecs serverCodecs) {
- this.eurekaClient.getApplications(); // force initialization
- // 强制初始化 eurekaClient, 在之前看 RefreshScope 的 bug 时, 也使用到了这种方式强制创建 eurekaClient
- // 创建 InstanceRegistry(是 spring cloud 的实现)
- // 继承了 PeerAwareInstanceRegistryImpl,PeerAwareInstanceRegistry 接口的实现类
- return new InstanceRegistry(
- this.eurekaServerConfig, this.eurekaClientConfig,
- serverCodecs, this.eurekaClient,
- this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),
- this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
- }
PeerEurekaNodes: 用来管理 PeerEurekaNode 的帮助类
start(): 会创建一个 newSingleThreadScheduledExecutor 定时单例线程池, 定时更新 PeerNode 列表, 线程名为 "Eureka-PeerNodesUpdater", 线程执行间隔为
EurekaServerConfigBean.peerEurekaNodesUpdateIntervalMs=10 * MINUTES
, 调用时机是: DefaultEurekaServerContext 在 @PostConstruct 调用 initialize()-->peerEurekaNodes.start()
updatePeerEurekaNodes(resolvePeerUrls()): 定时线程里更新 PeerNode 列表的核心逻辑
resolvePeerUrls() : 是解析其它 Eureka Server Node 节点 URL, 会根据当前 Server 的 zone 和 shouldPreferSameZoneEureka 的配置获取一个经过排序的 replicaUrls 集合, 再判断 replicaUrls 有没有自己, 有就 remove
updatePeerEurekaNodes(): 将内存中的老的 PeerEurekaNodes.peerEurekaNodeUrls 与 上一步获得的新的 newPeerUrls 对比, 删除不可用的, 新增新添加的. 之所以不直接用 newPeerUrls, 是因为在删除不可用时可以做
PeerEurekaNode#shutdown()
, 在添加新的可以
PeerEurekaNodes#createPeerEurekaNode()
EurekaServerContext: Eureka Server 启动分析重点
Eureka Server 上下文接口, 包含 initialize(),shutdown()方法, EurekaServerConfig 配置, PeerEurekaNodes 节点管理帮助类, PeerAwareInstanceRegistry 对等体可见的应用实例注册器, ApplicationInfoManager 当前应用实例 info 信息管理器(是由 Client 配置初始化的)
默认实现类 com.netflix.eureka.DefaultEurekaServerContext
@PostConstruct 方法包含一些初始化逻辑(说明初始化方法是在 DefaultEurekaServerContext 构造后由 @PostConstruct 触发的?)
- @PostConstruct
- @Override
- public void initialize() throws Exception {
- logger.info("Initializing ...");
- // PeerEurekaNode 的帮助类 start
- // 会启动更新 PeerNode 列表的定时线程
- peerEurekaNodes.start();
- // PeerAwareInstanceRegistry 初始化
- // 启动 numberOfReplicationsLastMin 定时线程, initializedResponseCache(),scheduleRenewalThresholdUpdateTask(),initRemoteRegionRegistry(), 还有添加 JMX 监控
- registry.init(peerEurekaNodes);
- logger.info("Initialized");
- }
EurekaServerBootstrap: Eureka Server 启动引导, 会在 Spring 容器基本 refresh()完毕时由 EurekaServerInitializerConfiguration#run()方法真正调用 eurekaServerBootstrap.contextInitialized()初始化, 其中会 initEurekaEnvironment(),initEurekaServerContext() Eureka Server 启动分析重点
注册 Jersey filter: 所有 / eureka 的请求都需要经过 Jersery Filter, 其处理类是 com.sun.jersey.spi.container.servlet.ServletContainer, 其既是 Filter, 也是 Servlet, 包含 Jersey 的处理逻辑. 在构造时已经将 com.netflix.discovery 包 和 com.netflix.eureka 包 下的类作为处理请求的资源导入, 如处理单个应用请求的 com.netflix.eureka.resources.ApplicationResource
经过上面的 EurekaServerAutoConfiguration 自动配置类分析后, 个人感觉有几个重点:
1,DefaultEurekaServerContext(Eureka Server 上下文) 初始化
因为 netflix 设计的 EurekaServerContext 接口本身包含很多成员变量, 如 PeerEurekaNodes 管理对等节点, PeerAwareInstanceRegistry 考虑对等节点的实例注册器等, 在 Eureka Server 上下文初始化时会对这些组件初始化, 还会启动一些定时线程
2,EurekaServerBootstrap 初始化
EurekaServerBootstrap 是 spring cloud 实现的 Eureka Server 的启动引导类, 在 netflix 对应的是 EurekaBootstrap. 而这个启动引导类初始化是在 EurekaServerInitializerConfiguration 这个 Spring 的 SmartLifecycle bean 的生命周期方法中触发的, 在 refresh()几乎完成的时候, 所以会在 Eureka Server 上下文初始化之后
3,jerseyFilter, 用于处理所有到 / eureka 的请求
[重点 1] Eureka Server 上下文初始化
首先看 Netflix 的 EurekaServerContext 接口是如何定义的:
- public interface EurekaServerContext {
- void initialize() throws Exception;
- void shutdown() throws Exception;
- EurekaServerConfig getServerConfig();
- PeerEurekaNodes getPeerEurekaNodes();
- ServerCodecs getServerCodecs();
- PeerAwareInstanceRegistry getRegistry();
- ApplicationInfoManager getApplicationInfoManager();
- }
除了初始化 initialize()方法, shutdown()方法, 还有一些组件 EurekaServerConfig,PeerEurekaNodes,ServerCodecs,PeerAwareInstanceRegistry,ApplicationInfoManager, 而在自动配置构造 DefaultEurekaServerContext 时, 这些组件都已设置好
- @Inject
- public DefaultEurekaServerContext(EurekaServerConfig serverConfig,
- ServerCodecs serverCodecs,
- PeerAwareInstanceRegistry registry,
- PeerEurekaNodes peerEurekaNodes,
- ApplicationInfoManager applicationInfoManager) {
- this.serverConfig = serverConfig;
- this.serverCodecs = serverCodecs;
- this.registry = registry;
- this.peerEurekaNodes = peerEurekaNodes;
- this.applicationInfoManager = applicationInfoManager;
- }
接下来是由 @PostConstruct 触发的初始化方法
- @PostConstruct
- @Override
- public void initialize() throws Exception {
- logger.info("Initializing ...");
- peerEurekaNodes.start();
- registry.init(peerEurekaNodes);
- logger.info("Initialized");
- }
主要调用了 2 个组件的初始化方法: PeerEurekaNodes 和 PeerAwareInstanceRegistry
- 1,PeerEurekaNodes#start(): 初始化对等节点信息
- public void start() {
- // 后台运行的单线程定时任务执行器, 定时线程名: Eureka-PeerNodesUpdater
- taskExecutor = Executors.newSingleThreadScheduledExecutor(
- new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
- thread.setDaemon(true);
- return thread;
- }
- }
- );
- try {
- // 解析 Eureka Server URL, 并更新 PeerEurekaNodes 列表
- updatePeerEurekaNodes(resolvePeerUrls());
- // 启动定时执行任务 peersUpdateTask(定时默认 10min, 由 peerEurekaNodesUpdateIntervalMs 配置)
- Runnable peersUpdateTask = new Runnable() {
- @Override
- public void run() {
- try {
- // 定时任务中仍然是 解析 Eureka Server URL, 并更新 PeerEurekaNodes 列表
- updatePeerEurekaNodes(resolvePeerUrls());
- } catch (Throwable e) {
- logger.error("Cannot update the replica Nodes", e);
- }
- }
- };
- taskExecutor.scheduleWithFixedDelay(
- peersUpdateTask,
- serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
- serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
- TimeUnit.MILLISECONDS
- );
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- // 打印对等体节点(应该没有当前节点自己)
- for (PeerEurekaNode node : peerEurekaNodes) {
- logger.info("Replica node URL:" + node.getServiceUrl());
- }
- }
PeerEurekaNodes 启动主要做了 2 件事:
根据配置信息更新 PeerEurekaNodes 列表
启动定时更新 PeerEurekaNodes 列表的任务 peersUpdateTask, 定时线程名[Eureka-PeerNodesUpdater]
resolvePeerUrls(): 解析配置的对等体 URL
- protected List<String> resolvePeerUrls() {
- // 当前 Eureka Server 自己的 InstanceInfo 信息
- InstanceInfo myInfo = applicationInfoManager.getInfo();
- // 当前 Eureka Server 所在的 zone, 默认是 defaultZone
- String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
- // 获取配置的 service-url
- List<String> replicaUrls = EndpointUtils
- .getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));
- // 遍历 service-url, 排除自己
- int idx = 0;
- while (idx <replicaUrls.size()) {
- if (isThisMyUrl(replicaUrls.get(idx))) {
- replicaUrls.remove(idx);
- } else {
- idx++;
- }
- }
- return replicaUrls;
- }
isThisMyUrl() 是如何判断是自己的 URL, 进而排除的呢?
- public boolean isThisMyUrl(String url) {
- return isInstanceURL(url, applicationInfoManager.getInfo());
- }
- public boolean isInstanceURL(String url, InstanceInfo instance) {
- // 根据配置项的 url 获取 host 主机信息
- String hostName = hostFromUrl(url);
- // 根据当前 Eureka Server 的 Instance 实例信息获取 host 主机信息
- String myInfoComparator = instance.getHostName();
- // 如果 eureka.client.transport.applicationsResolverUseIp==true, 即按照 IP 解析 URL
- // 那么将当前 Eureka Server 的 Instance 实例信息转换为 IP
- if (clientConfig.getTransportConfig().applicationsResolverUseIp()) {
- myInfoComparator = instance.getIPAddr();
- }
- // 比较配置项的 hostName 和 当前 Eureka Server 的 Instance 实例信息
- return hostName != null && hostName.equals(myInfoComparator);
- }
其中配置项中的 hostName 基本上就是 http:// 和 端口号 之间的部分, 而当前 Eureka Server 实例的用于比较的 myInfoComparator 信息是
如果主动配置了 eureka.instance.hostname=xxx, 配置值就是当前 Eureka Server 实例的 host
没有主动配置的话, 会从在
EurekaClientAutoConfiguration
中创建
EurekaInstanceConfigBean
时使用的 InetUtils 中获取, InetUtils 是 spring cloud 网络相关的工具类, 其首先根据第一个非回环网卡获取 IP(注意: docker 容器环境有坑), 再根据 InetAddress 获取与 IP 对应的 hostname, 我已知的是从如 Linux 的 /etc/hosts 配置文件中获取 或者 从 hostname 环境变量获取
如果 eureka.client.transport.applicationsResolverUseIp=true, 那么按照当前 Eureka Server 实例的 IP 来比较
updatePeerEurekaNodes(): 更新 PeerEurekaNodes 列表
- // PeerEurekaNodes#updatePeerEurekaNodes()
- // newPeerUrls 为本次要更新的 Eureka 对等体 URL 列表
- protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
- if (newPeerUrls.isEmpty()) {
- logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
- return;
- }
- // 计算 原 peerEurekaNodeUrls - 新 newPeerUrls 的差集, 就是多余可 shutdown 节点
- Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
- toShutdown.removeAll(newPeerUrls);
- // 计算 新 newPeerUrls - 原 peerEurekaNodeUrls 的差集, 就是需要新增节点
- Set<String> toAdd = new HashSet<>(newPeerUrls);
- toAdd.removeAll(peerEurekaNodeUrls);
- if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change 没有变更
- return;
- }
- // Remove peers no long available
- List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
- // shutDown 多余节点
- if (!toShutdown.isEmpty()) {
- logger.info("Removing no longer available peer nodes {}", toShutdown);
- int i = 0;
- while (i <newNodeList.size()) {
- PeerEurekaNode eurekaNode = newNodeList.get(i);
- if (toShutdown.contains(eurekaNode.getServiceUrl())) {
- newNodeList.remove(i);
- eurekaNode.shutDown();
- } else {
- i++;
- }
- }
- }
- // Add new peers
- // 添加新的 peerEurekaNode - createPeerEurekaNode()
- if (!toAdd.isEmpty()) {
- logger.info("Adding new peer nodes {}", toAdd);
- for (String peerUrl : toAdd) {
- newNodeList.add(createPeerEurekaNode(peerUrl));
- }
- }
- this.peerEurekaNodes = newNodeList;
- this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
- }
- 2,PeerAwareInstanceRegistry#init(peerEurekaNodes): 集群实例注册器初始化
根据上一步初始化好的 peerEurekaNodes, 来初始化 PeerAwareInstanceRegistry, 考虑集群中的对等体的实例注册器
- // PeerAwareInstanceRegistryImpl#init()
- @Override
- public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
- // [重要] 启动用于统计最后 xx 毫秒续约情况的定时线程
- this.numberOfReplicationsLastMin.start();
- this.peerEurekaNodes = peerEurekaNodes;
- // [重要] 初始化 ResponseCache: 对客户端查询服务列表信息的缓存(所有服务列表, 增量修改, 单个应用)
- // 默认 responseCacheUpdateIntervalMs=30s
- initializedResponseCache();
- // [重要] 定期更新续约阀值的任务, 默认 900s 执行一次
- // 调用 PeerAwareInstanceRegistryImpl#updateRenewalThreshold()
- scheduleRenewalThresholdUpdateTask();
- // 初始化 远程区域注册 相关信息(默认没有远程 Region, 都是使用 us-east-1)
- initRemoteRegionRegistry();
- try {
- Monitors.registerObject(this);
- } catch (Throwable e) {
- logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
- }
- }
numberOfReplicationsLastMin: 上一分钟续约数统计
numberOfReplicationsLastMin 是 com.netflix.eureka.util.MeasuredRate 用于统计测量上一分钟的续约数
- // MeasuredRate#start()
- public synchronized void start() {
- if (!isActive) {
- timer.schedule(new TimerTask() {
- @Override
- public void run() {
- try {
- // Zero out the current bucket.
- // 将当前的桶的统计数据放到 lastBucket, 当前桶置为 0
- lastBucket.set(currentBucket.getAndSet(0));
- } catch (Throwable e) {
- logger.error("Cannot reset the Measured Rate", e);
- }
- }
- }, sampleInterval, sampleInterval);
- isActive = true;
- }
- }
- /**
- * Returns the count in the last sample interval.
- * 返回上一分钟的统计数
- */
- public long getCount() {
- return lastBucket.get();
- }
- /**
- * Increments the count in the current sample interval.
- * 增加当前桶的计数, 在以下 2 个场景有调用:
- * AbstractInstanceRegistry#renew() - 续约
- * PeerAwareInstanceRegistryImpl#replicateToPeers() -
- */
- public void increment() {
- currentBucket.incrementAndGet();
- }
初始化 ResponseCache
ResponseCache 主要是缓存服务列表信息, 根据注释可知, 缓存以压缩和非压缩形式维护, 用于三类请求: all applications, 增量更改和单个 application
- // ResponseCacheImpl 构造
- private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
- private final LoadingCache<Key, Value> readWriteCacheMap;
- ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
- this.serverConfig = serverConfig;
- this.serverCodecs = serverCodecs;
- // 根据配置 eureka.server.useReadOnlyResponseCache 判断, 是否使用只读 ResponseCache, 默认 true
- // 由于 ResponseCache 维护这一个可读可写的 readWriteCacheMap, 还有一个只读的 readOnlyCacheMap
- // 此配置控制在 get()应用数据时, 是去只读 Map 读, 还是读写 Map 读, 应该只读 Map 是定期更新的
- this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
- this.registry = registry;
- // eureka.server.responseCacheUpdateIntervalMs 缓存更新频率, 默认 30s
- long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
- // 创建读写 Map,com.google.common.cache.LoadingCache
- // 可以设置初始值, 数据写入过期时间, 删除监听器等
- this.readWriteCacheMap =
- CacheBuilder.newBuilder().initialCapacity(1000)
- .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
- .removalListener(new RemovalListener<Key, Value>() {
- @Override
- public void onRemoval(RemovalNotification<Key, Value> notification) {
- Key removedKey = notification.getKey();
- if (removedKey.hasRegions()) {
- Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
- regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
- }
- }
- })
- .build(new CacheLoader<Key, Value>() {
- @Override
- public Value load(Key key) throws Exception {
- if (key.hasRegions()) {
- Key cloneWithNoRegions = key.cloneWithoutRegions();
- regionSpecificKeys.put(cloneWithNoRegions, key);
- }
- Value value = generatePayload(key);
- return value;
- }
- });
- // 如果启用只读缓存, 那么每隔 responseCacheUpdateIntervalMs=30s, 执行 getCacheUpdateTask()
- if (shouldUseReadOnlyResponseCache) {
- timer.schedule(getCacheUpdateTask(),
- new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
- + responseCacheUpdateIntervalMs),
- responseCacheUpdateIntervalMs);
- }
- try {
- Monitors.registerObject(this);
- } catch (Throwable e) {
- logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e);
- }
- }
可见 ResponseCache 维护了两个 Map, 一个可读可写的 readWriteCacheMap, 应该每个操作都会写入, 一个只读的 readOnlyCacheMap, 默认应该每 30s 更新一次, 下面具体看看 getCacheUpdateTask()
- // ResponseCacheImpl#getCacheUpdateTask()
- private TimerTask getCacheUpdateTask() {
- return new TimerTask() {
- @Override
- public void run() {
- logger.debug("Updating the client cache from response cache");
- // 遍历只读 Map
- for (Key key : readOnlyCacheMap.keySet()) {
- if (logger.isDebugEnabled()) {
- Object[] args = {key.getEntityType(), key.getName(), key.getVersion(), key.getType()};
- logger.debug("Updating the client cache from response cache for key : {} {} {} {}", args);
- }
- try {
- CurrentRequestVersion.set(key.getVersion());
- Value cacheValue = readWriteCacheMap.get(key);
- Value currentCacheValue = readOnlyCacheMap.get(key);
- // 如果只读 Map 中的值 和 读写 Map 中的值不同, 用读写 Map 更新只读 Map
- if (cacheValue != currentCacheValue) {
- readOnlyCacheMap.put(key, cacheValue);
- }
- } catch (Throwable th) {
- logger.error("Error while updating the client cache from response cache", th);
- }
- }
- }
- };
- }
每 30s 会比较只读 Map 和读写 Map 中的值, 以读写 Map 中的为准
scheduleRenewalThresholdUpdateTask: 定期更新续约阀值的任务
- /**
- * Schedule the task that updates <em>renewal threshold</em> periodically.
- * The renewal threshold would be used to determine if the renewals drop
- * dramatically because of network partition and to protect expiring too
- * many instances at a time.
- * 每隔 eureka.server.renewalThresholdUpdateIntervalMs=900 秒 更新一次续约阀值
- */
- private void scheduleRenewalThresholdUpdateTask() {
- timer.schedule(new TimerTask() {
- @Override
- public void run() {
- updateRenewalThreshold();
- }
- }, serverConfig.getRenewalThresholdUpdateIntervalMs(),
- serverConfig.getRenewalThresholdUpdateIntervalMs());
- }
更新续约阀值在 updateRenewalThreshold()方法
- // PeerAwareInstanceRegistryImpl#updateRenewalThreshold()
- /**
- * Updates the <em>renewal threshold</em> based on the current number of
- * renewals. The threshold is a percentage as specified in
- * {@link EurekaServerConfig#getRenewalPercentThreshold()} of renewals
- * received per minute {@link #getNumOfRenewsInLastMin()}.
- */
- private void updateRenewalThreshold() {
- try {
- Applications apps = eurekaClient.getApplications();
- int count = 0;
- // 统计所有 Instance 实例个数
- for (Application App : apps.getRegisteredApplications()) {
- for (InstanceInfo instance : App.getInstances()) {
- if (this.isRegisterable(instance)) {
- ++count;
- }
- }
- }
- synchronized (lock) {
- // Update threshold only if the threshold is greater than the
- // current expected threshold of if the self preservation is disabled.
- // 只有当阀值大于当前预期值时, 才更新 或者 关闭了自我保护模式
- if ((count * 2)> (serverConfig.getRenewalPercentThreshold() * numberOfRenewsPerMinThreshold)
- || (!this.isSelfPreservationModeEnabled())) {
- this.expectedNumberOfRenewsPerMin = count * 2;
- this.numberOfRenewsPerMinThreshold = (int) ((count * 2) * serverConfig.getRenewalPercentThreshold());
- }
- }
- logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);
- } catch (Throwable e) {
- logger.error("Cannot update renewal threshold", e);
- }
- }
其实大体意思是: 先计算所有 Instance 实例个数, 默认每个实例 1 分钟应该续约 2 次(30s 一次)
如果开启自我保护模式, 更新 expectedNumberOfRenewsPerMin 预期每分钟续约数 和 numberOfRenewsPerMinThreshold 每分钟续约阀值
如果没有开启自我保护模式, 只有当本期续约数大于之前的阀值, 即当前不处在自我保护模式中(自我保护模式中, 不能删除服务列表, 阀值自然也不能更新), 才可以更新 expectedNumberOfRenewsPerMin 和 numberOfRenewsPerMinThreshold
但如上代码是有问题的, 无论是注释还是判断逻辑, 当前版本: eureka-core-1.6.2
直到 v1.9.3 版本才修复
https://github.com/Netflix/eureka/commit/a4dd6b22ad447c706234e63fe83cb58413f7618b#diff-4aec7ea96457f5084840fc40f501c320
之后又有两个版本, 修改了这里的计算逻辑和做了方法抽取
- Add possibility to configure expected interval between clients' renews and not break self-preservation
- Extract calculation of renews threshold to separate method
[重点 2] EurekaServerBootstrap 初始化
上面的自动配置过程中已经注册了处理所有 /eureka/** 请求的 Jersey Filter, 这样所有 Client 的注册, 续约等请求都可以处理了. 而还有一些工作是通过 EurekaServerBootstrap#contextInitialized()完成的, 在 Spring 容器基本上 refresh()完毕的时候
EurekaServerBootstrap 是 spring cloud 的实现, 而 netflix 的 Eureka Server 启动引导的实现是 EurekaBootStrap
// EurekaServerBootstrap#contextInitialized()
public void contextInitialized(ServletContext context) {
try {
initEurekaEnvironment(); // 初始化环境
initEurekaServerContext(); // 初始化上下文
context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
}
catch (Throwable e) {
log.error("Cannot bootstrap eureka server :", e);
throw new RuntimeException("Cannot bootstrap eureka server :", e);
}
}
这两个里面我们主要关注上下文的初始化 initEurekaServerContext()
// EurekaServerBootstrap#initEurekaServerContext()
protected void initEurekaServerContext() throws Exception {
// For backward compatibility
JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
XStream.PRIORITY_VERY_HIGH);
XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
XStream.PRIORITY_VERY_HIGH);
// 是否为 AWS 环境
if (isAws(this.applicationInfoManager.getInfo())) {
this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
this.eurekaClientConfig, this.registry, this.applicationInfoManager);
this.awsBinder.start();
}
// 将 serverContext 由 Holder 保管
EurekaServerContextHolder.initialize(this.serverContext);
log.info("Initialized server context");
// Copy registry from neighboring eureka node
// 从相邻的 eureka 节点拷贝注册列表信息
int registryCount = this.registry.syncUp();
this.registry.openForTraffic(this.applicationInfoManager, registryCount);
// Register all monitoring statistics.
EurekaMonitors.registerAllStats();
}
有两个重要环接:
registry.syncUp(): 从相邻 eureka 节点拷贝注册列表信息
registry.openForTraffic(): 允许开始与客户端的数据传输, 即开始作为 Server 服务
1,registry.syncUp(): 从相邻 eureka 节点拷贝注册列表信息
/**
* Populates the registry information from a peer eureka node. This
* operation fails over to other nodes until the list is exhausted if the
* communication fails.
*/
- @Override
- public int syncUp() {
- // Copy entire entry from neighboring DS node
- int count = 0;
- // 循环, 最多重试 RegistrySyncRetries 次(默认 5)
- // eurekaClient 中的逻辑会重试其它的 eureka 节点
- for (int i = 0; ((i <serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
- if (i> 0) {
- try {
- Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs()); //30s
- } catch (InterruptedException e) {
- logger.warn("Interrupted during registry transfer..");
- break;
- }
- }
- // 从 eurekaClient 获取服务列表
- Applications apps = eurekaClient.getApplications();
- // 循环服务列表, 并依次注册
- for (Application App : apps.getRegisteredApplications()) {
- for (InstanceInfo instance : App.getInstances()) {
- try {
- if (isRegisterable(instance)) {
- register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
- count++;
- }
- } catch (Throwable t) {
- logger.error("During DS init copy", t);
- }
- }
- }
- }
- return count;
- }
2,registry.openForTraffic(): 允许与客户端的数据传输
- // InstanceRegistry#openForTraffic()
- /**
- * If
- * {@link PeerAwareInstanceRegistryImpl#openForTraffic(ApplicationInfoManager, int)}
- * is called with a zero argument, it means that leases are not automatically
- * cancelled if the instance hasn't sent any renewals recently. This happens for a
- * standalone server. It seems like a bad default, so we set it to the smallest
- * non-zero value we can, so that any instances that subsequently register can bump up
- * the threshold.
- */
- @Override
- public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
- // 如果 count==0, 即没有从相邻 eureka 节点得到服务列表, 如单机启动模式, defaultOpenForTrafficCount=1
- super.openForTraffic(applicationInfoManager,
- count == 0 ? this.defaultOpenForTrafficCount : count);
- }
- // PeerAwareInstanceRegistryImpl#openForTraffic()
- @Override
- public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
- // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
- // 每分钟期待的续约数(默认 30s 续约, 60s 就是 2 次)
- this.expectedNumberOfRenewsPerMin = count * 2;
- // 每分钟续约的阀值: 85% * expectedNumberOfRenewsPerMin
- this.numberOfRenewsPerMinThreshold =
- (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
- logger.info("Got" + count + "instances from neighboring DS node");
- logger.info("Renew threshold is:" + numberOfRenewsPerMinThreshold);
- this.startupTime = System.currentTimeMillis();
- if (count> 0) { // 可 count 默认值是 1, 那么 peerInstancesTransferEmptyOnStartup 始终不会是 true
- // 在 PeerAwareInstanceRegistryImpl#shouldAllowAccess(boolean)方法有用
- this.peerInstancesTransferEmptyOnStartup = false;
- }
- DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
- boolean isAws = Name.Amazon == selfName;
- if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
- logger.info("Priming AWS connections for all replicas..");
- primeAwsReplicas(applicationInfoManager);
- }
- logger.info("Changing status to UP");
- applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
- // 开启新的[EvictionTask]
- super.postInit();
- }
- // AbstractInstanceRegistry#postInit()
- protected void postInit() {
- renewsLastMin.start(); // 一个统计频率的监控 Timer
- if (evictionTaskRef.get() != null) {
- evictionTaskRef.get().cancel();
- }
- evictionTaskRef.set(new EvictionTask());
- evictionTimer.schedule(evictionTaskRef.get(),
- serverConfig.getEvictionIntervalTimerInMs(), // 默认 60s
- serverConfig.getEvictionIntervalTimerInMs());
- }
如果没有从相邻 eureka 节点获得服务, count 默认为 1
初始化每分钟期待的续约数 expectedNumberOfRenewsPerMin = count * 2
初始化每分钟续约阀值 numberOfRenewsPerMinThreshold= 85% * expectedNumberOfRenewsPerMin
applicationInfoManager 设置状态为 UP
开启新的[EvictionTask] 驱逐任务
二, Eureka Server 处理注册请求
经过上面的 Eureka Server 自动配置及初始化, Eureka Server 已经成功启动并可以通过 Jersey 处理各种请求, 具体的注册请求是由 com.netflix.eureka.resources.ApplicationResource#addInstance()处理的
- ApplicationResource#addInstance() - 注册单个应用实例
- // ApplicationResource#addInstance()
- @POST
- @Consumes({"application/json", "application/xml"})
- public Response addInstance(InstanceInfo info,
- @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
- logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
- // validate that the instanceinfo contains all the necessary required fields
- // 验证 Instance 实例的所有必填字段
- if (isBlank(info.getId())) {
- return Response.status(400).entity("Missing instanceId").build();
- } else if (isBlank(info.getHostName())) {
- return Response.status(400).entity("Missing hostname").build();
- } else if (isBlank(info.getAppName())) {
- return Response.status(400).entity("Missing appName").build();
- } else if (!appName.equals(info.getAppName())) {
- return Response.status(400).entity("Mismatched appName, expecting" + appName + "but was" + info.getAppName()).build();
- } else if (info.getDataCenterInfo() == null) {
- return Response.status(400).entity("Missing dataCenterInfo").build();
- } else if (info.getDataCenterInfo().getName() == null) {
- return Response.status(400).entity("Missing dataCenterInfo Name").build();
- }
- // handle cases where clients may be registering with bad DataCenterInfo with missing data
- // 处理客户端可能正在使用缺少数据的错误 DataCenterInfo 注册的情况
- DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
- if (dataCenterInfo instanceof UniqueIdentifier) {
- String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
- if (isBlank(dataCenterInfoId)) {
- boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
- if (experimental) {
- String entity = "DataCenterInfo of type" + dataCenterInfo.getClass() + "must contain a valid id";
- return Response.status(400).entity(entity).build();
- } else if (dataCenterInfo instanceof AmazonInfo) {
- AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
- String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
- if (effectiveId == null) {
- amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
- }
- } else {
- logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
- }
- }
- }
- // [ 使用 PeerAwareInstanceRegistry 集群实例注册器 register 当前实例 ]
- // isReplication 表示此操作是否是节点间的复制, 此处 isReplication==null
- registry.register(info, "true".equals(isReplication));
- return Response.status(204).build(); // 204 to be backwards compatible
- // 注册成功返回 204 状态码
- }
重点是 registry.register(info, "true".equals(isReplication)), 即使用 PeerAwareInstanceRegistry 集群实例注册器 register 当前实例
- PeerAwareInstanceRegistryImpl#register() - 注册服务信息并同步到其它 Eureka 节点
- // PeerAwareInstanceRegistryImpl#register()
- /**
- * Registers the information about the {@link InstanceInfo} and replicates
- * this information to all peer eureka nodes. If this is replication event
- * from other replica nodes then it is not replicated.
- * 注册有关 InstanceInfo 信息, 并将此信息复制到所有对等的 eureka 节点
- * 如果这是来自其他节点的复制事件, 则不会继续复制它
- *
- * @param info
- * the {@link InstanceInfo} to be registered and replicated.
- * @param isReplication
- * true if this is a replication event from other replica nodes,
- * false otherwise.
- */
- @Override
- public void register(final InstanceInfo info, final boolean isReplication) {
- int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS; // 默认的租约持续时间是 90s
- // 如果当前 Instance 实例的租约信息中有 leaseDuration 持续时间, 使用实例的 leaseDuration
- if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs()> 0) {
- leaseDuration = info.getLeaseInfo().getDurationInSecs();
- }
- // [ 当前 Eureka Server 注册实例信息 ]
- super.register(info, leaseDuration, isReplication);
- // [ 将注册实例信息复制到集群中其它节点 ]
- replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
- }
默认 leaseDuration 租约持续时间为 90s, 如果当前 Instance 实例的租约信息中有 leaseDuration 持续时间, 使用实例的 leaseDuration
[重点] 当前 Eureka Server 注册实例信息
[重点] 将注册实例信息复制到集群中其它节点
- AbstractInstanceRegistry#register(): 注册
- /**
- * Registers a new instance with a given duration.
- *
- * @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
- */
- public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
- try {
- read.lock(); // 读锁
- // registry 是保存所有应用实例信息的 Map:ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>
- // 从 registry 中获取当前 appName 的所有实例信息
- Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
- REGISTER.increment(isReplication); // 注册统计 + 1
- // 如果当前 appName 实例信息为空, 新建 Map
- if (gMap == null) {
- final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
- gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
- if (gMap == null) {
- gMap = gNewMap;
- }
- }
- // 获取实例的 Lease 租约信息
- Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
- // Retain the last dirty timestamp without overwriting it, if there is already a lease
- // 如果已经有租约, 则保留最后一个脏时间戳而不覆盖它
- // (比较当前请求实例租约 和 已有租约 的 LastDirtyTimestamp, 选择靠后的)
- if (existingLease != null && (existingLease.getHolder() != null)) {
- Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
- Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
- logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
- if (existingLastDirtyTimestamp> registrationLastDirtyTimestamp) {
- logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
- "than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
- logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
- registrant = existingLease.getHolder();
- }
- }
- else {
- // The lease does not exist and hence it is a new registration
- // 如果之前不存在实例的租约, 说明是新实例注册
- // expectedNumberOfRenewsPerMin 期待的每分钟续约数 + 2(因为 30s 一个)
- // 并更新 numberOfRenewsPerMinThreshold 每分钟续约阀值(85%)
- synchronized (lock) {
- if (this.expectedNumberOfRenewsPerMin> 0) {
- // Since the client wants to cancel it, reduce the threshold
- // (1
- // for 30 seconds, 2 for a minute)
- this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
- this.numberOfRenewsPerMinThreshold =
- (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
- }
- }
- logger.debug("No previous lease information found; it is new registration");
- }
- Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
- if (existingLease != null) {
- lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
- }
- gMap.put(registrant.getId(), lease); // 当前实例信息放到维护注册信息的 Map
- // 同步维护最近注册队列
- synchronized (recentRegisteredQueue) {
- recentRegisteredQueue.add(new Pair<Long, String>(
- System.currentTimeMillis(),
- registrant.getAppName() + "(" + registrant.getId() + ")"));
- }
- // This is where the initial state transfer of overridden status happens
- // 如果当前实例已经维护了 OverriddenStatus, 将其也放到此 Eureka Server 的 overriddenInstanceStatusMap 中
- if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
- logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the"
- + "overrides", registrant.getOverriddenStatus(), registrant.getId());
- if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
- logger.info("Not found overridden id {} and hence adding it", registrant.getId());
- overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
- }
- }
- InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
- if (overriddenStatusFromMap != null) {
- logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
- registrant.setOverriddenStatus(overriddenStatusFromMap);
- }
- // Set the status based on the overridden status rules
- // 根据 overridden status 规则, 设置状态
- InstanceStatus overriddenInstanceStatus
- = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
- registrant.setStatusWithoutDirty(overriddenInstanceStatus);
- // If the lease is registered with UP status, set lease service up timestamp
- // 如果租约以 UP 状态注册, 设置租赁服务时间戳
- if (InstanceStatus.UP.equals(registrant.getStatus())) {
- lease.serviceUp();
- }
- registrant.setActionType(ActionType.ADDED); //ActionType 为 ADD
- recentlyChangedQueue.add(new RecentlyChangedItem(lease)); // 维护 recentlyChangedQueue
- registrant.setLastUpdatedTimestamp(); // 更新最后更新时间
- // 使当前应用的 ResponseCache 失效
- invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
- logger.info("Registered instance {}/{} with status {} (replication={})",
- registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
- } finally {
- read.unlock(); // 读锁
- }
- }
维护当前 Instance 实例的 Lease 租约信息, 并放到 Eureka Server 维护注册信息的 Map:[ConcurrentHashMap<String, Map<String, Lease>>] , 对应关系是 appName:<Instance 实例 Id,Lease 租约信息>
如果是新注册, expectedNumberOfRenewsPerMin 期待的每分钟续约数 + 2, 并更新 numberOfRenewsPerMinThreshold 每分钟续约阀值
维护 recentRegisteredQueue 最近注册队列, recentlyChangedQueue 最近更改队列
如果本次注册实例已经维护了 OverriddenStatus, 根据一定规则, 维护本 Server 节点当前实例的 OverriddenStatus
设置 Instance 实例的最后更新时间戳
对当前应用对应的 ResponseCache 缓存失效
responseCache 用于缓存查询的应用实例信息
其使用 guava cache 维护了一个可读可写的 LocalLoadingCache 本地缓存[readWriteCacheMap] , 还有一个只读的 ConcurrentMap [readOnlyCacheMap]
在 get(key, useReadOnlyCache)时首先会检查 [readOnlyCacheMap] 只读缓存, 如没有, 再查[readWriteCacheMap] , 而[readWriteCacheMap] 的 get() 其含义实际是 getOrLoad(), 如果获取不到从 CacheLoader 加载, 而 CacheLoader 会到维护应用实例注册信息的 Map 中获取
[readWriteCacheMap] 是直接与维护应用实例注册信息 Map 交互的, 查询时会 Load 加载, 注册新实例时会失效整个应用的
[readOnlyCacheMap] 是在[readWriteCacheMap] 之上的只读缓存, 由配置 eureka.server.useReadOnlyResponseCache 控制, 默认 true, 每隔 eureka.server.responseCacheUpdateIntervalMs=30s 与[readWriteCacheMap] 同步一次
PeerAwareInstanceRegistryImpl#replicateToPeers() : 复制到 Eureka 对等节点 // PeerAwareInstanceRegistryImpl#replicateToPeers() /** * Replicates all eureka actions to peer eureka nodes except for replication * traffic to this node. */ private void replicateToPeers(Action action, String appName, String id, InstanceInfo info /* optional */, InstanceStatus newStatus /* optional */, boolean isReplication) { Stopwatch tracer = action.getTimer().start(); try { // 如果是复制操作(针对当前节点, false) if (isReplication) { numberOfReplicationsLastMin.increment(); } // If it is a replication already, do not replicate again as this will create a poison replication // 如果它已经是复制, 请不要再次复制, 直接 return if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) { return; } // 遍历集群所有节点(除当前节点外) for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) { // If the url represents this host, do not replicate to yourself. if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) { continue; } // 复制 Instance 实例操作到某个 node 节点 replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node); } } finally { tracer.stop(); } }
下面是 replicateInstanceActionsToPeers()复制 Instance 实例操作到其它节点
// PeerAwareInstanceRegistryImpl#replicateInstanceActionsToPeers() /** * Replicates all instance changes to peer eureka nodes except for * replication traffic to this node. * */ private void replicateInstanceActionsToPeers(Action action, String appName, String id, InstanceInfo info, InstanceStatus newStatus, PeerEurekaNode node) { try { InstanceInfo infoFromRegistry = null; CurrentRequestVersion.set(Version.V2); switch (action) { case Cancel: // 取消 node.cancel(appName, id); break; case Heartbeat: // 心跳 InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id); infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false); break; case Register: // 注册 node.register(info); break; case StatusUpdate: // 状态更新 infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.statusUpdate(appName, id, newStatus, infoFromRegistry); break; case DeleteStatusOverride: // 删除 OverrideStatus infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.deleteStatusOverride(appName, id, infoFromRegistry); break; } } catch (Throwable t) { logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t); } }
本次只关心节点的注册操作
// PeerEurekaNode#register() /** * Sends the registration information of {@link InstanceInfo} receiving by * this node to the peer node represented by this class. * * @param info * the instance information {@link InstanceInfo} of any instance * that is send to this instance. * @throws Exception */ public void register(final InstanceInfo info) throws Exception { // 当前时间 + 30s 后 过期 long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info); // 提交相同的操作到批量复制任务处理 batchingDispatcher.process( taskId("register", info), new InstanceReplicationTask(targetHost, Action.Register, info, overriddenStatus:null, replicateInstanceInfo:true) { public EurekaHttpResponse<Void> execute() { return replicationClient.register(info); } }, expiryTime ); }
而之后就和 Eureka Client 发起注册请求的调用差不多 replicationClient.register(info)
至此, Spring Cloud Eureka Server 的整个自动配置及初始化, 以及接收注册请求, 并复制到集群中的对等节点就分析完了
大体时序流程参考:
参考:
Dive into Eureka https://nobodyiam.com/2016/06/25/dive-into-eureka/ : 宋顺
来源: https://www.cnblogs.com/trust-freedom/p/10310606.html