本文主要基于 Eureka 1.8.X 版本
1. 概述
2. Eureka-Client 发起注册
2.1 应用实例信息复制器
2.2 刷新应用实例信息
2.3 发起注册应用实例
3. Eureka-Server 接收注册
3.1 接收注册请求
3.2 Lease
3.3 注册应用实例信息
666. 彩蛋
RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
您对于源码的疑问每条留言都将得到认真回复甚至不知道如何读源码也可以请教噢
新的源码解析文章实时收到通知每周更新一篇左右
认真的源码交流微信群
1. 概述
本文主要分享 Eureka-Client 向 Eureka-Server 注册应用实例的过程
FROM 深度剖析服务发现组件 Netflix Eureka 二次编辑
蓝框部分, 为本文重点
非蓝框部分, Eureka-Server 集群间复制注册的应用实例信息, 不在本文内容范畴
推荐 Spring Cloud 书籍:
请支持正版下载盗版, 等于主动编写低级 BUG
程序猿 DD Spring Cloud 微服务实战
周立 Spring Cloud 与 Docker 微服务架构实战
两书齐买, 京东包邮
推荐 Spring Cloud 视频:
Java 微服务实践 - Spring Boot
Java 微服务实践 - Spring Cloud
Java 微服务实践 - Spring Boot / Spring Cloud
2. Eureka-Client 发起注册
Eureka-Client 向 Eureka-Server 发起注册应用实例需要符合如下条件:
配置
eureka.registration.enabled = true
,Eureka-Client 向 Eureka-Server 发起注册应用实例的开关
InstanceInfo 在 Eureka-Client 和 Eureka-Server 数据不一致
每次 InstanceInfo 发生属性变化时, 标记 isInstanceInfoDirty 属性为 true, 表示 InstanceInfo 在 Eureka-Client 和 Eureka-Server 数据不一致, 需要注册另外, InstanceInfo 刚被创建时, 在 Eureka-Server 不存在, 也会被注册
当符合条件时, InstanceInfo 不会立即向 Eureka-Server 注册, 而是后台线程定时注册
当 InstanceInfo 的状态( status ) 属性发生变化时, 并且配置
eureka.shouldOnDemandUpdateStatusChange = true
时, 立即向 Eureka-Server 注册因为状态属性非常重要, 一般情况下建议开启, 当然默认情况也是开启的
Lets Go 让我们看看代码的实现
2.1 应用实例信息复制器
- // DiscoveryClient.java
- public class DiscoveryClient implements EurekaClient {
- /**
- * 应用实例状态变更监听器
- */
- private ApplicationInfoManager.StatusChangeListener statusChangeListener;
- /**
- * 应用实例信息复制器
- */
- private InstanceInfoReplicator instanceInfoReplicator;
- private void initScheduledTasks() {
- // ... 省略无关代码
- if (clientConfig.shouldRegisterWithEureka()) {
- // ... 省略无关代码
- // 创建 应用实例信息复制器
- // InstanceInfo replicator
- instanceInfoReplicator = new InstanceInfoReplicator(
- this,
- instanceInfo,
- clientConfig.getInstanceInfoReplicationIntervalSeconds(),
- 2); // burstSize
- // 创建 应用实例状态变更监听器
- statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
- @Override
- public String getId() {
- return "statusChangeListener";
- }
- @Override
- public void notify(StatusChangeEvent statusChangeEvent) {
- if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
- InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
- // log at warn level if DOWN was involved
- logger.warn("Saw local status change event {}", statusChangeEvent);
- } else {
- logger.info("Saw local status change event {}", statusChangeEvent);
- }
- instanceInfoReplicator.onDemandUpdate();
- }
- };
- // 注册 应用实例状态变更监听器
- if (clientConfig.shouldOnDemandUpdateStatusChange()) {
- applicationInfoManager.registerStatusChangeListener(statusChangeListener);
- }
- // 开启 应用实例信息复制器
- instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
- }
- }
- }
- com.netflix.discovery.InstanceInfoReplicator
, 应用实例信息复制器
调用
InstanceInfoReplicator#start(...)
方法, 开启应用实例信息复制器实现代码如下:
- // InstanceInfoReplicator.java
- class InstanceInfoReplicator implements Runnable {
- private static final Logger logger = LoggerFactory.getLogger(InstanceInfoReplicator.class);
- private final DiscoveryClient discoveryClient;
- /**
- * 应用实例信息
- */
- private final InstanceInfo instanceInfo;
- /**
- * 定时执行频率, 单位: 秒
- */
- private final int replicationIntervalSeconds;
- /**
- * 定时执行器
- */
- private final ScheduledExecutorService scheduler;
- /**
- * 定时执行任务的 Future
- */
- private final AtomicReference scheduledPeriodicRef;
- /**
- * 是否开启调度
- */
- private final AtomicBoolean started;
- private final RateLimiter rateLimiter; // 限流相关, 跳过
- private final int burstSize; // 限流相关, 跳过
- private final int allowedRatePerMinute; // 限流相关, 跳过
- InstanceInfoReplicator(DiscoveryClient discoveryClient, InstanceInfo instanceInfo, int replicationIntervalSeconds, int burstSize) {
- this.discoveryClient = discoveryClient;
- this.instanceInfo = instanceInfo;
- this.scheduler = Executors.newScheduledThreadPool(1,
- new ThreadFactoryBuilder()
- .setNameFormat("DiscoveryClient-InstanceInfoReplicator-%d")
- .setDaemon(true)
- .build());
- this.scheduledPeriodicRef = new AtomicReference();
- this.started = new AtomicBoolean(false);
- this.rateLimiter = new RateLimiter(TimeUnit.MINUTES);
- this.replicationIntervalSeconds = replicationIntervalSeconds;
- this.burstSize = burstSize;
- this.allowedRatePerMinute = 60 * this.burstSize / this.replicationIntervalSeconds;
- logger.info("InstanceInfoReplicator onDemand update allowed rate per min is {}", allowedRatePerMinute);
- }
- public void start(int initialDelayMs) {
- if (started.compareAndSet(false, true)) {
- // 设置 应用实例信息 数据不一致
- instanceInfo.setIsDirty(); // for initial register
- // 提交任务, 并设置该任务的 Future
- Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
- scheduledPeriodicRef.set(next);
- }
- }
- // ... 省略无关方法
- }
- // InstanceInfo.java
- private volatile boolean isInstanceInfoDirty = false;
- private volatile Long lastDirtyTimestamp;
- public synchronized void setIsDirty() {
- isInstanceInfoDirty = true;
- lastDirtyTimestamp = System.currentTimeMillis();
- }
执行
instanceInfo.setIsDirty()
代码块, 因为 InstanceInfo 刚被创建时, 在 Eureka-Server 不存在, 也会被注册
调用
ScheduledExecutorService#schedule(...)
方法, 延迟 initialDelayMs 毫秒执行一次任务为什么此处设置
scheduledPeriodicRef
? 在
InstanceInfoReplicator#onDemandUpdate()
方法会看到具体用途
定时检查 InstanceInfo 的状态( status ) 属性是否发生变化若是, 发起注册实现代码如下:
- // InstanceInfoReplicator.java
- @Override
- public void run() {
- try {
- // 刷新 应用实例信息
- discoveryClient.refreshInstanceInfo();
- // 判断 应用实例信息 是否数据不一致
- Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
- if (dirtyTimestamp != null) {
- // 发起注册
- discoveryClient.register();
- // 设置 应用实例信息 数据一致
- instanceInfo.unsetIsDirty(dirtyTimestamp);
- }
- } catch (Throwable t) {
- logger.warn("There was a problem with the instance info replicator", t);
- } finally {
- // 提交任务, 并设置该任务的 Future
- Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
- scheduledPeriodicRef.set(next);
- }
- }
- // InstanceInfo.java
- public synchronized long setIsDirtyWithTime() {
- setIsDirty();
- return lastDirtyTimestamp;
- }
- public synchronized void unsetIsDirty(long unsetDirtyTimestamp) {
- if (lastDirtyTimestamp <= unsetDirtyTimestamp) {
- isInstanceInfoDirty = false;
- } else {
- }
- }
调用
DiscoveryClient#refreshInstanceInfo()
方法, 刷新应用实例信息此处可能导致应用实例信息数据不一致, 在 2.2 刷新应用实例信息 详细解析
调用
DiscoveryClient#register()
方法, Eureka-Client 向 Eureka-Server 注册应用实例
调用
ScheduledExecutorService#schedule(...)
方法, 再次延迟执行任务, 并设置
scheduledPeriodicRef
通过这样的方式, 不断循环定时执行任务
com.netflix.appinfo.ApplicationInfoManager.StatusChangeListener
内部类, 监听应用实例信息状态的变更
调用
ApplicationInfoManager#registerStatusChangeListener(...)
方法, 注册应用实例状态变更监听器实现代码如下:
- public class ApplicationInfoManager {
- /**
- * 状态变更监听器
- */
- protected final Map listeners;
- public void registerStatusChangeListener(StatusChangeListener listener) {
- listeners.put(listener.getId(), listener);
- }
- }
业务里, 调用
ApplicationInfoManager#setInstanceStatus(...)
方法, 设置应用实例信息的状态, 从而通知
InstanceInfoReplicator#onDemandUpdate()
方法的调用实现代码如下:
- // ApplicationInfoManager.java
- public synchronized void setInstanceStatus(InstanceStatus status) {
- InstanceStatus next = instanceStatusMapper.map(status);
- if (next == null) {
- return;
- }
- InstanceStatus prev = instanceInfo.setStatus(next);
- if (prev != null) {
- for (StatusChangeListener listener : listeners.values()) {
- try {
- listener.notify(new StatusChangeEvent(prev, next));
- } catch (Exception e) {
- logger.warn("failed to notify listener: {}", listener.getId(), e);
- }
- }
- }
- }
- // InstanceInfo.java
- public synchronized InstanceStatus setStatus(InstanceStatus status) {
- if (this.status != status) {
- InstanceStatus prev = this.status;
- this.status = status;
- // 设置 应用实例信息 数据一致
- setIsDirty();
- return prev;
- }
- return null;
- }
- InstanceInfoReplicator#onDemandUpdate()
, 实现代码如下:
- // InstanceInfoReplicator.java
- public boolean onDemandUpdate() {
- if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) { // 限流相关, 跳过
- scheduler.submit(new Runnable() {
- @Override
- public void run() {
- logger.debug("Executing on-demand update of local InstanceInfo");
- // 取消任务
- Future latestPeriodic = scheduledPeriodicRef.get();
- if (latestPeriodic != null && !latestPeriodic.isDone()) {
- logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
- latestPeriodic.cancel(false);
- }
- // 再次调用
- InstanceInfoReplicator.this.run();
- }
- });
- return true;
- } else {
- logger.warn("Ignoring onDemand update due to rate limiter");
- return false;
- }
- }
调用
Future#cancel(false)
方法, 取消定时任务, 避免无用的注册
调用
InstanceInfoReplicator#run()
方法, 发起注册
2.2 刷新应用实例信息
调用
DiscoveryClient#refreshInstanceInfo()
方法, 刷新应用实例信息此处可能导致应用实例信息数据不一致, 实现代码如下:
- void refreshInstanceInfo() {
- // 刷新 数据中心信息
- applicationInfoManager.refreshDataCenterInfoIfRequired();
- // 刷新 租约信息
- applicationInfoManager.refreshLeaseInfoIfRequired();
- // 健康检查
- InstanceStatus status;
- try {
- status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
- } catch (Exception e) {
- logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
- status = InstanceStatus.DOWN;
- }
- if (null != status) {
- applicationInfoManager.setInstanceStatus(status);
- }
- }
调用
ApplicationInfoManager#refreshDataCenterInfoIfRequired()
方法, 刷新数据中心相关信息, 实现代码如下:
- // ApplicationInfoManager.java
- public void refreshDataCenterInfoIfRequired() {
- // hostname
- String existingAddress = instanceInfo.getHostName();
- String newAddress;
- if (config instanceof RefreshableInstanceConfig) {
- // Refresh data center info, and return up to date address
- newAddress = ((RefreshableInstanceConfig) config).resolveDefaultAddress(true);
- } else {
- newAddress = config.getHostName(true);
- }
- // ip
- String newIp = config.getIpAddress();
- if (newAddress != null && !newAddress.equals(existingAddress)) {
- logger.warn("The address changed from : {} => {}", existingAddress, newAddress);
- // :( in the legacy code here the builder is acting as a mutator.
- // This is hard to fix as this same instanceInfo instance is referenced elsewhere.
- // We will most likely re-write the client at sometime so not fixing for now.
- InstanceInfo.Builder builder = new InstanceInfo.Builder(instanceInfo);
- builder.setHostName(newAddress) // hostname
- .setIPAddr(newIp) // ip
- .setDataCenterInfo(config.getDataCenterInfo()); // dataCenterInfo
- instanceInfo.setIsDirty();
- }
- }
- public abstract class AbstractInstanceConfig implements EurekaInstanceConfig {
- private static final Pair hostInfo = getHostInfo();
- @Override
- public String getHostName(boolean refresh) {
- return hostInfo.second();
- }
- @Override
- public String getIpAddress() {
- return hostInfo.first();
- }
- private static Pair getHostInfo() {
- Pair pair;
- try {
- InetAddress localHost = InetAddress.getLocalHost();
- pair = new Pair(localHost.getHostAddress(), localHost.getHostName());
- } catch (UnknownHostException e) {
- logger.error("Cannot get host info", e);
- pair = new Pair("","");
- }
- return pair;
- }
- }
关注应用实例信息的 hostName ipAddr dataCenterInfo 属性的变化
一般情况下, 我们使用的是非 RefreshableInstanceConfig 实现的配置类( 一般是 MyDataCenterInstanceConfig ), 因为
AbstractInstanceConfig.hostInfo
是静态属性, 即使本机修改了 IP 等信息, Eureka-Client 进程也不会感知到 TODO[0022]: 看下 springcloud 的实现
调用
ApplicationInfoManager#refreshLeaseInfoIfRequired()
方法, 刷新租约相关信息, 实现代码如下:
- public void refreshLeaseInfoIfRequired() {
- LeaseInfo leaseInfo = instanceInfo.getLeaseInfo();
- if (leaseInfo == null) {
- return;
- }
- int currentLeaseDuration = config.getLeaseExpirationDurationInSeconds();
- int currentLeaseRenewal = config.getLeaseRenewalIntervalInSeconds();
- if (leaseInfo.getDurationInSecs() != currentLeaseDuration // 租约过期时间 改变
- || leaseInfo.getRenewalIntervalInSecs() != currentLeaseRenewal) { // 租约续约频率 改变
- LeaseInfo newLeaseInfo = LeaseInfo.Builder.newBuilder()
- .setRenewalIntervalInSecs(currentLeaseRenewal)
- .setDurationInSecs(currentLeaseDuration)
- .build();
- instanceInfo.setLeaseInfo(newLeaseInfo);
- instanceInfo.setIsDirty();
- }
- }
关注应用实例信息的
renewalIntervalInSecs
durationInSecs 属性的变化
调用
HealthCheckHandler#getStatus()
方法, 健康检查这里先暂时跳过, 我们在 TODO[0004]: 健康检查 详细解析
2.3 发起注册应用实例
调用
DiscoveryClient#register()
方法, Eureka-Client 向 Eureka-Server 注册应用实例, 实现代码如下:
- // DiscoveryClient.java
- boolean register() throws Throwable {
- logger.info(PREFIX + appPathIdentifier + ": registering service...");
- EurekaHttpResponse httpResponse;
- try {
- httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
- } catch (Exception e) {
- logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
- throw e;
- }
- if (logger.isInfoEnabled()) {
- logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
- }
- return httpResponse.getStatusCode() == 204;
- }
- // AbstractJerseyEurekaHttpClient.java
- @Override
- public EurekaHttpResponse register(InstanceInfo info) {
- String urlPath = "apps/" + info.getAppName();
- ClientResponse response = null;
- try {
- Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
- addExtraHeaders(resourceBuilder);
- response = resourceBuilder
- .header("Accept-Encoding", "gzip")
- .type(MediaType.APPLICATION_JSON_TYPE)
- .accept(MediaType.APPLICATION_JSON)
- .post(ClientResponse.class, info);
- return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
- } finally {
- if (logger.isDebugEnabled()) {
- logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
- response == null ? "N/A" : response.getStatus());
- }
- if (response != null) {
- response.close();
- }
- }
- }
调用
AbstractJerseyEurekaHttpClient#register(...)
方法, POST 请求 Eureka-Server 的 apps/${APP_NAME} 接口, 参数为 InstanceInfo , 实现注册实例信息的注册
3. Eureka-Server 接收注册
3.1 接收注册请求
com.netflix.eureka.resources.ApplicationResource
, 处理单个应用的请求操作的 Resource ( Controller )
注册应用实例信息的请求, 映射
ApplicationResource#addInstance()
方法, 实现代码如下:
- @Produces({
- "application/xml",
- "application/json"
- }) public class ApplicationResource {@POST@Consumes({
- "application/json",
- "application/xml"
- }) public Response addInstance(InstanceInfo info, @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
- // 校验参数是否合法
- logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
- // validate that the instanceinfo contains all the necessary required fields
- if (isBlank(info.getId())) {
- return Response.status(400).entity("Missing instanceId").build();
- } else if (isBlank(info.getHostName())) {
- return Response.status(400).entity("Missing hostname").build();
- } else if (isBlank(info.getIPAddr())) {
- return Response.status(400).entity("Missing ip address").build();
- } else if (isBlank(info.getAppName())) {
- return Response.status(400).entity("Missing appName").build();
- } else if (!appName.equals(info.getAppName())) {
- return Response.status(400).entity("Mismatched appName, expecting" + appName + "but was" + info.getAppName()).build();
- } else if (info.getDataCenterInfo() == null) {
- return Response.status(400).entity("Missing dataCenterInfo").build();
- } else if (info.getDataCenterInfo().getName() == null) {
- return Response.status(400).entity("Missing dataCenterInfo Name").build();
- }
- // AWS 相关, 跳过
- // handle cases where clients may be registering with bad DataCenterInfo with missing data
- DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
- if (dataCenterInfo instanceof UniqueIdentifier) {
- String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
- if (isBlank(dataCenterInfoId)) {
- boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
- if (experimental) {
- String entity = "DataCenterInfo of type" + dataCenterInfo.getClass() + "must contain a valid id";
- return Response.status(400).entity(entity).build();
- } else if (dataCenterInfo instanceof AmazonInfo) {
- AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
- String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
- if (effectiveId == null) {
- amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
- }
- } else {
- logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
- }
- }
- }
- // 注册应用实例信息
- registry.register(info, "true".equals(isReplication));
- // 返回 204 成功
- return Response.status(204).build(); // 204 to be backwards compatible
- }
- }
请求头 isReplication 参数, 和 Eureka-Server 集群复制相关, 暂时跳过
调用
PeerAwareInstanceRegistryImpl#register(...)
方法, 注册应用实例信息实现代码如下:
- @Override
- public void register(final InstanceInfo info, final boolean isReplication) {
- // 租约过期时间
- int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
- if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs()> 0) {
- leaseDuration = info.getLeaseInfo().getDurationInSecs();
- }
- // 注册应用实例信息
- super.register(info, leaseDuration, isReplication);
- // Eureka-Server 复制
- replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
- }
调用父类
AbstractInstanceRegistry#register(...)
方法, 注册应用实例信息
3.2 Lease
在看具体的注册应用实例信息的逻辑之前, 我们先来看下
com.netflix.eureka.lease.Lease
, 租约实现代码如下:
- public class Lease {
- /**
- * 实体
- */
- private T holder;
- /**
- * 注册时间戳
- */
- private long registrationTimestamp;
- /**
- * 开始服务时间戳
- */
- private long serviceUpTimestamp;
- /**
- * 取消注册时间戳
- */
- private long evictionTimestamp;
- /**
- * 最后更新时间戳
- *$/ Make it volatile so that the expiration task would see this quicker
- private volatile long lastUpdateTimestamp;
- /**
- * 租约持续时长, 单位: 毫秒
- */
- private long duration;
- public Lease(T r, int durationInSecs) {
- holder = r;
- registrationTimestamp = System.currentTimeMillis();
- lastUpdateTimestamp = registrationTimestamp;
- duration = (durationInSecs * 1000);
- }
- }
holder 属性, 租约的持有者在 Eureka-Server 里, 暂时只有 InstanceInfo 使用
registrationTimestamp
属性, 注册 ( 创建 ) 租约时间戳在构造方法里可以看租约对象的创建时间戳即为注册租约时间戳
serviceUpTimestamp 属性, 开始服务时间戳注册应用实例信息会使用到它如下两个方法, 实现代码如下:
- public void serviceUp() {
- if (serviceUpTimestamp == 0) { // 第一次有效
- serviceUpTimestamp = System.currentTimeMillis();
- }
- }
- public void setServiceUpTimestamp(long serviceUpTimestamp) {
- this.serviceUpTimestamp = serviceUpTimestamp;
- }
- lastUpdatedTimestamp
属性, 最后更新租约时间戳每次续租时, 更新该时间戳注册应用实例信息会使用到它如下方法, 实现代码如下:
- public void setLastUpdatedTimestamp() {
- this.lastUpdatedTimestamp = System.currentTimeMillis();
- }
duration 属性, 租约持续时间, 单位: 毫秒当租约过久未续租, 即当前时间 -
lastUpdatedTimestamp> duration 时, 租约过期
evictionTimestamp 属性, 租约过期时间戳
3.3 注册应用实例信息
调用
AbstractInstanceRegistry#register(...)
方法, 注册应用实例信息, 实现代码如下:
- public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
- try {
- // 获取读锁
- read.lock();
- Map> gMap = registry.get(registrant.getAppName());
- // 增加 注册次数 到 监控
- REGISTER.increment(isReplication);
- // 获得 应用实例信息 对应的 租约
- if (gMap == null) {
- final ConcurrentHashMap> gNewMap = new ConcurrentHashMap>();
- gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap); // 添加 应用
- if (gMap == null) { // 添加 应用 成功
- gMap = gNewMap;
- }
- }
- Lease existingLease = gMap.get(registrant.getId());
- // Retain the last dirty timestamp without overwriting it, if there is already a lease
- if (existingLease != null && (existingLease.getHolder() != null)) { // 已存在时, 使用数据不一致的时间大的应用注册信息为有效的
- Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); // Server 注册的 InstanceInfo
- Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); // Client 请求的 InstanceInfo
- logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
- // this is a> instead of a>= because if the timestamps are equal, we still take the remote transmitted
- // InstanceInfo instead of the server local copy.
- if (existingLastDirtyTimestamp> registrationLastDirtyTimestamp) {
- logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater"+" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
- logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
- registrant = existingLease.getHolder();
- }
- } else {
- // The lease does not exist and hence it is a new registration
- // 自我保护机制增加 `numberOfRenewsPerMinThreshold` `expectedNumberOfRenewsPerMin`
- synchronized (lock) {
- if (this.expectedNumberOfRenewsPerMin> 0) {
- // Since the client wants to cancel it, reduce the threshold
- // (1
- // for 30 seconds, 2 for a minute)
- this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
- this.numberOfRenewsPerMinThreshold =
- (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
- }
- }
- logger.debug("No previous lease information found; it is new registration");
- }
- // 创建 租约
- Lease lease = new Lease(registrant, leaseDuration);
- if (existingLease != null) { // 若租约已存在, 设置 租约的开始服务的时间戳
- lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
- }
- // 添加到 租约映射
- gMap.put(registrant.getId(), lease);
- // 添加到 最近注册的调试队列
- synchronized (recentRegisteredQueue) {
- recentRegisteredQueue.add(new Pair(
- System.currentTimeMillis(),
- registrant.getAppName() + "(" + registrant.getId() + ")"));
- }
- // 添加到 应用实例覆盖状态映射(Eureka-Server 初始化使用)
- // This is where the initial state transfer of overridden status happens
- if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
- logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the"
- + "overrides", registrant.getOverriddenStatus(), registrant.getId());
- if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
- logger.info("Not found overridden id {} and hence adding it", registrant.getId());
- overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
- }
- }
- InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
- if (overriddenStatusFromMap != null) {
- logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
- registrant.setOverriddenStatus(overriddenStatusFromMap);
- }
- // 获得应用实例最终状态, 并设置应用实例的状态
- // Set the status based on the overridden status rules
- InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
- registrant.setStatusWithoutDirty(overriddenInstanceStatus);
- // 设置 租约的开始服务的时间戳(只有第一次有效)
- // If the lease is registered with UP status, set lease service up timestamp
- if (InstanceStatus.UP.equals(registrant.getStatus())) {
- lease.serviceUp();
- }
- // 设置 应用实例信息的操作类型 为 添加
- registrant.setActionType(ActionType.ADDED);
- // 添加到 最近租约变更记录队列
- recentlyChangedQueue.add(new RecentlyChangedItem(lease));
- // 设置 租约的最后更新时间戳
- registrant.setLastUpdatedTimestamp();
- // 设置 响应缓存 过期
- invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
- logger.info("Registered instance {}/{} with status {} (replication={})",
- registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
- } finally {
- // 释放锁
- read.unlock();
- }
- }
第 3 行 : 添加到应用实例覆盖状态映射, 在 Eureka 源码解析 Eureka-Server 集群同步 详细解析
第 6 至 7 行 : 增加注册次数到监控配合 Netflix Servo 实现监控信息采集
第 5 至 16 行 : 获得应用实例信息对应的租约 registry 实现代码如下:
- /**
- * 租约映射
- * key1 : 应用名 {@link InstanceInfo#appName}
- * key2 : 应用实例信息编号 {@link InstanceInfo#instanceId}
- * value : 租约
- */
- private final ConcurrentHashMap>> registry = new ConcurrentHashMap>>();
第 17 至 30 行 : 当租约已存在, 判断 Server 已存在的 InstanceInfo 的 lastDirtyTimestamp 是否大于( 不包括等于 ) Client 请求的 InstanceInfo , 若是, 使用 Server 的 InstanceInfo 进行替代
第 31 至 44 行 : 增加
- numberOfRenewsPerMinThreshold
- expectedNumberOfRenewsPerMin
, 自我保护机制相关, 在 Eureka 源码解析 应用实例注册发现 (四) 之自我保护机制 有详细解析
第 45 至 52 行 : 创建租约, 并添加到租约映射( registry )
第 53 至 58 行 : 添加到最近注册的调试队列(
recentRegisteredQueue
), 用于 Eureka-Server 运维界面的显示, 无实际业务逻辑使用实现代码如下:
- /**
- * 最近注册的调试队列
- * key : 添加时的时间戳
- * value : 字符串 = 应用名(应用实例信息编号)
- */
- private final CircularQueue> recentRegisteredQueue;
- /**
- * 循环队列
- *
- * @param 泛型
- */
- private class CircularQueue extends ConcurrentLinkedQueue {
- /**
- * 队列大小
- */
- private int size = 0;
- public CircularQueue(int size) {
- this.size = size;
- }
- @Override
- public boolean add(E e) {
- this.makeSpaceIfNotAvailable();
- return super.add(e);
- }
- /**
- * 保证空间足够
- *
- * 当空间不够时, 移除首元素
- */
- private void makeSpaceIfNotAvailable() {
- if (this.size() == size) {
- this.remove();
- }
- }
- public boolean offer(E e) {
- this.makeSpaceIfNotAvailable();
- return super.offer(e);
- }
- }
第 59 至 68 行 : 添加到应用实例覆盖状态映射, 在 Eureka 源码解析 Eureka-Server 集群同步 详细解析
第 69 至 73 行 : 设置应用实例的覆盖状态 ( overridestatus ), 避免注册应用实例后, 丢失覆盖状态在应用实例注册发现 (八) 之覆盖状态详细解析
第 75 至 78 行 : 获得应用实例最终状态, 并设置应用实例的状态在应用实例注册发现 (八)之覆盖状态详细解析
第 80 至 84 行 : 设置租约的开始服务的时间戳( 只有第一次有效 )
第 85 至 88 行 : 设置应用实例信息的操作类型为添加, 并添加到最近租约变更记录队列(
- recentlyChangedQueue
- )
- recentlyChangedQueue
用于注册信息的增量获取, 在应用实例注册发现 (七)之增量获取详细解析实现代码如下:
- /**
- * 最近租约变更记录队列
- */
- private ConcurrentLinkedQueue recentlyChangedQueue = new ConcurrentLinkedQueue();
第 89 至 90 行 : 设置租约的最后更新时间戳
第 91 至 92 行 : 设置响应缓存 ( ResponseCache ) 过期, 在 Eureka 源码解析 应用实例注册发现 (六)之全量获取详细解析
第 96 至 97 行 : 释放锁
666. 彩蛋
嘿嘿, 蛮嗨的, 比起前面几篇写配置相关的文章来说
胖友, 分享我的公众号( 芋道源码 ) 给你的胖友可好?
来源: http://www.suo.im/1AkSut