本文主要基于 Eureka 1.8.X 版本
1. 概述
2. Eureka-Client 发起注册
2.1 应用实例信息复制器
2.2 刷新应用实例信息
2.3 发起注册应用实例
3. Eureka-Server 接收注册
3.1 接收注册请求
3.2 Lease
3.3 注册应用实例信息
666. 彩蛋
RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
您对于源码的疑问每条留言都将得到认真回复.甚至不知道如何读源码也可以请教噢.
新的源码解析文章实时收到通知.每周更新一篇左右.
认真的源码交流微信群.
1. 概述
本文主要分享 Eureka-Client 向 Eureka-Server 注册应用实例的过程.
FROM 《深度剖析服务发现组件 Netflix Eureka》 二次编辑
蓝框部分,为本文重点.
非蓝框部分,Eureka-Server 集群间复制注册的应用实例信息,不在本文内容范畴.
推荐 Spring Cloud 书籍:
请支持正版.下载盗版,等于主动编写低级 BUG .
程序猿 DD —— 《Spring Cloud 微服务实战》
周立 —— 《Spring Cloud 与 Docker 微服务架构实战》
两书齐买,京东包邮.
2. Eureka-Client 发起注册
Eureka-Client 向 Eureka-Server 发起注册应用实例需要符合如下条件:
配置
eureka.registration.enabled = true
,Eureka-Client 向 Eureka-Server 发起注册应用实例的开关.
InstanceInfo 在 Eureka-Client 和 Eureka-Server 数据不一致.
每次 InstanceInfo 发生属性变化时,标记 isInstanceInfoDirty 属性为 true,表示 InstanceInfo 在 Eureka-Client 和 Eureka-Server 数据不一致,需要注册.另外,InstanceInfo 刚被创建时,在 Eureka-Server 不存在,也会被注册.
当符合条件时,InstanceInfo 不会立即向 Eureka-Server 注册,而是后台线程定时注册.
当 InstanceInfo 的状态 (status) 属性发生变化时,并且配置
eureka.shouldOnDemandUpdateStatusChange = true
时,立即向 Eureka-Server 注册.因为状态属性非常重要,一般情况下建议开启,当然默认情况也是开启的.
Let's Go.让我们看看代码的实现.
2.1 应用实例信息复制器
* 应用实例状态变更监听器
// DiscoveryClient.java
public
class
DiscoveryClient
implements
EurekaClient
{
/**
* 应用实例信息复制器
* /
private ApplicationInfoManager.StatusChangeListener statusChangeListener;
/ * *
,应用实例信息复制器.
*/
private InstanceInfoReplicator instanceInfoReplicator;
private
void
initScheduledTasks
()
{
// ... 省略无关代码
if (clientConfig.shouldRegisterWithEureka()) {
// ... 省略无关代码
// 创建 应用实例信息复制器
// InstanceInfo replicator
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
// 创建 应用实例状态变更监听器
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId()
{
return "statusChangeListener";
}
@Override
public
void
notify
(StatusChangeEvent statusChangeEvent)
{
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
// 注册 应用实例状态变更监听器
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
// 开启 应用实例信息复制器
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
}
}
}
com.netflix.discovery.InstanceInfoReplicator
调用
InstanceInfoReplicator#start(...)
方法,开启应用实例信息复制器.实现代码如下:
* 应用实例信息
// InstanceInfoReplicator.java
class
InstanceInfoReplicator
implements
Runnable
{
private static final Logger logger = LoggerFactory.getLogger(InstanceInfoReplicator.class);
private final DiscoveryClient discoveryClient;
/**
* 定时执行频率,单位:秒
* /
private final InstanceInfo instanceInfo;
/ * *
* 定时执行器
* /
private final int replicationIntervalSeconds;
/ * *
* 定时执行任务的 Future
* /
private final ScheduledExecutorService scheduler;
/ * *
* 是否开启调度
* /
private final AtomicReference scheduledPeriodicRef;
/ * *
执行
*/
private final AtomicBoolean started;
private final RateLimiter rateLimiter; // 限流相关,跳过
private
final
int
burstSize;
// 限流相关,跳过
private
final
int
allowedRatePerMinute;
// 限流相关,跳过
InstanceInfoReplicator(DiscoveryClient discoveryClient, InstanceInfo instanceInfo, int replicationIntervalSeconds, int burstSize) {
this.discoveryClient = discoveryClient;
this.instanceInfo = instanceInfo;
this.scheduler = Executors.newScheduledThreadPool(1,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-InstanceInfoReplicator-%d")
.setDaemon(true)
.build());
this.scheduledPeriodicRef = new AtomicReference();
this.started = new AtomicBoolean(false);
this.rateLimiter = new RateLimiter(TimeUnit.MINUTES);
this.replicationIntervalSeconds = replicationIntervalSeconds;
this.burstSize = burstSize;
this
.allowedRatePerMinute =
60
*
this
.burstSize /
this
.replicationIntervalSeconds;
logger.info("InstanceInfoReplicator onDemand update allowed rate per min is {}", allowedRatePerMinute);
}
public
void
start
(int initialDelayMs)
{
if (started.compareAndSet(false, true)) {
// 设置 应用实例信息 数据不一致
instanceInfo.setIsDirty(); // for initial register
// 提交任务,并设置该任务的 Future
Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
// ... 省略无关方法
}
// InstanceInfo.java
private
volatile
boolean
isInstanceInfoDirty =
false
;
private volatile Long lastDirtyTimestamp;
public
synchronized
void
setIsDirty
()
{
isInstanceInfoDirty = true;
lastDirtyTimestamp = System.currentTimeMillis();
}
instanceInfo.setIsDirty()
代码块,因为 InstanceInfo 刚被创建时,在 Eureka-Server 不存在,也会被注册.
调用
ScheduledExecutorService#schedule(...)
方法,延迟 initialDelayMs 毫秒执行一次任务.为什么此处设置
scheduledPeriodicRef
?在
InstanceInfoReplicator#onDemandUpdate()
方法会看到具体用途.
定时检查 InstanceInfo 的状态 (status) 属性是否发生变化.若是,发起注册.实现代码如下:
调用
// InstanceInfoReplicator.java
@Override
public
void
run
()
{
try {
// 刷新 应用实例信息
discoveryClient.refreshInstanceInfo();
// 判断 应用实例信息 是否数据不一致
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
// 发起注册
discoveryClient.register();
// 设置 应用实例信息 数据一致
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
// 提交任务,并设置该任务的 Future
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
// InstanceInfo.java
public
synchronized
long
setIsDirtyWithTime
()
{
setIsDirty();
return lastDirtyTimestamp;
}
public
synchronized
void
unsetIsDirty
(long unsetDirtyTimestamp)
{
if (lastDirtyTimestamp <= unsetDirtyTimestamp) {
isInstanceInfoDirty = false;
} else {
}
}
DiscoveryClient#refreshInstanceInfo()
方法,刷新应用实例信息.此处可能导致应用实例信息数据不一致,在 「2.2」刷新应用实例信息 详细解析.
调用
DiscoveryClient#register()
方法,Eureka-Client 向 Eureka-Server 注册应用实例.
调用
ScheduledExecutorService#schedule(...)
方法,再次延迟执行任务,并设置
scheduledPeriodicRef
.通过这样的方式,不断循环定时执行任务.
com.netflix.appinfo.ApplicationInfoManager.StatusChangeListener
内部类,监听应用实例信息状态的变更.
调用
ApplicationInfoManager#registerStatusChangeListener(...)
方法,注册应用实例状态变更监听器.实现代码如下:
* 状态变更监听器
public
class ApplicationInfoManager
{
/**
业务里,调用
* /
protected final Map listeners;
public
void
registerStatusChangeListener
(StatusChangeListener listener)
{
listeners.put(listener.getId(), listener);
}
}/
ApplicationInfoManager#setInstanceStatus(...)
方法,设置应用实例信息的状态,从而通知
InstanceInfoReplicator#onDemandUpdate()
方法的调用.实现代码如下:
,实现代码如下:
// ApplicationInfoManager.java
public
synchronized
void
setInstanceStatus
(InstanceStatus status)
{
InstanceStatus next = instanceStatusMapper.map(status);
if (next == null) {
return;
}
InstanceStatus prev = instanceInfo.setStatus(next);
if (prev != null) {
for (StatusChangeListener listener : listeners.values()) {
try {
listener.notify(new StatusChangeEvent(prev, next));
} catch (Exception e) {
logger.warn("failed to notify listener: {}", listener.getId(), e);
}
}
}
}
// InstanceInfo.java
public
synchronized
InstanceStatus
setStatus
(InstanceStatus status)
{
if (this.status != status) {
InstanceStatus prev = this.status;
this.status = status;
// 设置 应用实例信息 数据一致
setIsDirty();
return prev;
}
return null;
}
InstanceInfoReplicator#onDemandUpdate()
调用
// InstanceInfoReplicator.java
public
boolean
onDemandUpdate
()
{
if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) { // 限流相关,跳过
scheduler.submit(new Runnable() {
@Override
public
void
run
()
{
logger.debug("Executing on-demand update of local InstanceInfo");
// 取消任务
Future latestPeriodic = scheduledPeriodicRef.get();
if (latestPeriodic != null && !latestPeriodic.isDone()) {
logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
latestPeriodic.cancel(false);
}
// 再次调用
InstanceInfoReplicator.this.run();
}
});
return true;
} else {
logger.warn("Ignoring onDemand update due to rate limiter");
return false;
}
}
Future#cancel(false)
方法,取消定时任务,避免无用的注册.
调用
InstanceInfoReplicator#run()
方法,发起注册.
2.2 刷新应用实例信息
调用
DiscoveryClient#refreshInstanceInfo()
方法,刷新应用实例信息.此处可能导致应用实例信息数据不一致,实现代码如下:
调用
void refreshInstanceInfo() {
// 刷新 数据中心信息
applicationInfoManager.refreshDataCenterInfoIfRequired();
// 刷新 租约信息
applicationInfoManager.refreshLeaseInfoIfRequired();
// 健康检查
InstanceStatus status;
try {
status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
} catch(Exception e) {
logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
status = InstanceStatus.DOWN;
}
if (null != status) {
applicationInfoManager.setInstanceStatus(status);
}
}
ApplicationInfoManager#refreshDataCenterInfoIfRequired()
方法,刷新数据中心相关信息,实现代码如下:
关注应用实例信息的 hostName , ipAddr , dataCenterInfo 属性的变化.
// ApplicationInfoManager.java
public
void
refreshDataCenterInfoIfRequired
()
{
// hostname
String existingAddress = instanceInfo.getHostName();
String newAddress;
if (config instanceof RefreshableInstanceConfig) {
// Refresh data center info, and return up to date address
newAddress = ((RefreshableInstanceConfig) config).resolveDefaultAddress(true);
} else {
newAddress = config.getHostName(true);
}
// ip
String newIp = config.getIpAddress();
if (newAddress != null && !newAddress.equals(existingAddress)) {
logger.warn("The address changed from : {} => {}", existingAddress, newAddress);
// :( in the legacy code here the builder is acting as a mutator.
// This is hard to fix as this same instanceInfo instance is referenced elsewhere.
// We will most likely re-write the client at sometime so not fixing for now.
InstanceInfo.Builder builder = new InstanceInfo.Builder(instanceInfo);
builder.setHostName(newAddress) // hostname
.setIPAddr(newIp) // ip
.setDataCenterInfo(config.getDataCenterInfo()); // dataCenterInfo
instanceInfo.setIsDirty();
}
}
public
abstract
class
AbstractInstanceConfig
implements
EurekaInstanceConfig
{
private static final Pair hostInfo = getHostInfo();
@Override
public
String
getHostName
(boolean refresh)
{
return hostInfo.second();
}
@Override
public String getIpAddress()
{
return hostInfo.first();
}
private
static
Pair
getHostInfo
()
{
Pair pair;
try {
InetAddress localHost = InetAddress.getLocalHost();
pair = new Pair(localHost.getHostAddress(), localHost.getHostName());
} catch (UnknownHostException e) {
logger.error("Cannot get host info", e);
pair = new Pair("", "");
}
return pair;
}
}
一般情况下,我们使用的是非 RefreshableInstanceConfig 实现的配置类 (一般是 MyDataCenterInstanceConfig),因为
AbstractInstanceConfig.hostInfo
是静态属性,即使本机修改了 IP 等信息,Eureka-Client 进程也不会感知到.TODO[0022]:看下 springcloud 的实现
调用
ApplicationInfoManager#refreshLeaseInfoIfRequired()
方法,刷新租约相关信息,实现代码如下:
关注应用实例信息的
public
void
refreshLeaseInfoIfRequired
()
{
LeaseInfo leaseInfo = instanceInfo.getLeaseInfo();
if (leaseInfo == null) {
return;
}
int currentLeaseDuration = config.getLeaseExpirationDurationInSeconds();
int currentLeaseRenewal = config.getLeaseRenewalIntervalInSeconds();
if (leaseInfo.getDurationInSecs() != currentLeaseDuration // 租约过期时间 改变
|| leaseInfo.getRenewalIntervalInSecs() != currentLeaseRenewal) { // 租约续约频率 改变
LeaseInfo newLeaseInfo = LeaseInfo.Builder.newBuilder()
.setRenewalIntervalInSecs(currentLeaseRenewal)
.setDurationInSecs(currentLeaseDuration)
.build();
instanceInfo.setLeaseInfo(newLeaseInfo);
instanceInfo.setIsDirty();
}
}
renewalIntervalInSecs
, durationInSecs 属性的变化.
调用
HealthCheckHandler#getStatus()
方法,健康检查.这里先暂时跳过,我们在 TODO[0004]:健康检查 详细解析.
2.3 发起注册应用实例
调用
DiscoveryClient#register()
方法,Eureka-Client 向 Eureka-Server 注册应用实例,实现代码如下:
调用
// DiscoveryClient.java
boolean
register
()
throws
Throwable
{
logger.info(PREFIX + appPathIdentifier + ": registering service...");
EurekaHttpResponse httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == 204;
}
// AbstractJerseyEurekaHttpClient.java
@Override
public EurekaHttpResponse register(InstanceInfo info)
{
String urlPath = "apps/" + info.getAppName();
ClientResponse response = null;
try {
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(resourceBuilder);
response = resourceBuilder
.header("Accept-Encoding", "gzip")
.type(MediaType.APPLICATION_JSON_TYPE)
.accept(MediaType.APPLICATION_JSON)
.post(ClientResponse.class, info);
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
AbstractJerseyEurekaHttpClient#register(...)
方法,POST 请求 Eureka-Server 的 apps/${APP_NAME} 接口,参数为 InstanceInfo ,实现注册实例信息的注册.
3. Eureka-Server 接收注册
3.1 接收注册请求
com.netflix.eureka.resources.ApplicationResource
,处理单个应用的请求操作的 Resource (Controller).
注册应用实例信息的请求,映射
ApplicationResource#addInstance()
方法,实现代码如下:
请求头 isReplication 参数,和 Eureka-Server 集群复制相关,暂时跳过.
@Produces({"application/xml", "application/json")
public
class ApplicationResource
{
@POST
@Consumes({"application/json", "application/xml")
public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
// 校验参数是否合法
logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
// validate that the instanceinfo contains all the necessary required fields
if (isBlank(info.getId())) {
return Response.status(400).entity("Missing instanceId").build();
} else if (isBlank(info.getHostName())) {
return Response.status(400).entity("Missing hostname").build();
} else if (isBlank(info.getIPAddr())) {
return Response.status(400).entity("Missing ip address").build();
} else if (isBlank(info.getAppName())) {
return Response.status(400).entity("Missing appName").build();
} else if (!appName.equals(info.getAppName())) {
return
Response.status(
400
).entity(
"Mismatched appName, expecting "
+ appName +
" but was "
+ info.getAppName()).build();
} else if (info.getDataCenterInfo() == null) {
return Response.status(400).entity("Missing dataCenterInfo").build();
} else if (info.getDataCenterInfo().getName() == null) {
return Response.status(400).entity("Missing dataCenterInfo Name").build();
}
// AWS 相关,跳过
// handle cases where clients may be registering with bad DataCenterInfo with missing data
DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
if (dataCenterInfo instanceof UniqueIdentifier) {
String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
if (isBlank(dataCenterInfoId)) {
boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
if (experimental) {
String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
return Response.status(400).entity(entity).build();
} else if (dataCenterInfo instanceof AmazonInfo) {
AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
if (effectiveId == null) {
amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
}
} else {
logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
}
}
}
// 注册应用实例信息
registry.register(info, "true".equals(isReplication));
// 返回 204 成功
return Response.status(204).build(); // 204 to be backwards compatible
}
}
调用
PeerAwareInstanceRegistryImpl#register(...)
方法,注册应用实例信息.实现代码如下:
调用父类
@Override
public
void
register
(final InstanceInfo info, final boolean isReplication)
{
// 租约过期时间
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
// 注册应用实例信息
super.register(info, leaseDuration, isReplication);
// Eureka-Server 复制
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
AbstractInstanceRegistry#register(...)
方法,注册应用实例信息.
3.2 Lease
在看具体的注册应用实例信息的逻辑之前,我们先来看下
com.netflix.eureka.lease.Lease
,租约.实现代码如下:
* 实体
public
class Lease
{
/**
* 注册时间戳
* /
private T holder;
/ * *
* 开始服务时间戳
* /
private long registrationTimestamp;
/ * *
* 取消注册时间戳
* /
private long serviceUpTimestamp;
/ * *
* 最后更新时间戳
* /
private long evictionTimestamp;
/ * *
* 租约持续时长,单位:毫秒
* /
/ / Make it volatile so that the expiration task would see this quicker private volatile long lastUpdateTimestamp;
/***/
holder 属性,租约的持有者.在 Eureka-Server 里,暂时只有 InstanceInfo 使用.
* /
private long duration;
public
Lease
(T r, int durationInSecs)
{
holder = r;
registrationTimestamp = System.currentTimeMillis();
lastUpdateTimestamp = registrationTimestamp;
duration = (durationInSecs * 1000);
}
}/
registrationTimestamp
属性,注册 (创建) 租约时间戳.在构造方法里可以看租约对象的创建时间戳即为注册租约时间戳.
serviceUpTimestamp 属性,开始服务时间戳.注册应用实例信息会使用到它如下两个方法,实现代码如下:
属性,最后更新租约时间戳.每次续租时,更新该时间戳.注册应用实例信息会使用到它如下方法,实现代码如下:
public
void
serviceUp
()
{
if (serviceUpTimestamp == 0) { // 第一次有效
serviceUpTimestamp = System.currentTimeMillis();
}
}
public
void
setServiceUpTimestamp
(long serviceUpTimestamp)
{
this.serviceUpTimestamp = serviceUpTimestamp;
}
lastUpdatedTimestamp
duration 属性,租约持续时间,单位:毫秒.当租约过久未续租,即当前时间 -
public
void
setLastUpdatedTimestamp
()
{
this.lastUpdatedTimestamp = System.currentTimeMillis();
}
lastUpdatedTimestamp
> duration 时,租约过期.
evictionTimestamp 属性,租约过期时间戳.
3.3 注册应用实例信息
调用
AbstractInstanceRegistry#register(...)
方法,注册应用实例信息,实现代码如下:
第 3 行 :添加到应用实例覆盖状态映射,在 《Eureka 源码解析 —— Eureka-Server 集群同步》 详细解析.
1
:
public
void
register
(InstanceInfo registrant, int leaseDuration, boolean isReplication)
{
2: try {
3: // 获取读锁
4: read.lock();
5: Map> gMap = registry.get(registrant.getAppName());
6: // 增加 注册次数 到 监控
7: REGISTER.increment(isReplication);
8: // 获得 应用实例信息 对应的 租约
9: if (gMap == null) {
10: final ConcurrentHashMap> gNewMap = new ConcurrentHashMap>();
11: gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap); // 添加 应用
12
:
if
(gMap ==
null
) {
// 添加 应用 成功
13: gMap = gNewMap;
14: }
15: }
16: Lease existingLease = gMap.get(registrant.getId());
17: // Retain the last dirty timestamp without overwriting it, if there is already a lease
18
:
if
(existingLease !=
null
&& (existingLease.getHolder() !=
null
)) {
// 已存在时,使用数据不一致的时间大的应用注册信息为有效的
19: Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); // Server 注册的 InstanceInfo
20: Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); // Client 请求的 InstanceInfo
21: logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
22:
23: // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
24: // InstanceInfo instead of the server local copy.
25: if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
26: logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
27: " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
28: logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
29: registrant = existingLease.getHolder();
30: }
31: } else {
32: // The lease does not exist and hence it is a new registration
33: // 【自我保护机制】增加 `numberOfRenewsPerMinThreshold` ,`expectedNumberOfRenewsPerMin`
34: synchronized (lock) {
35
:
if
(
this
.expectedNumberOfRenewsPerMin >
0
) {
36: // Since the client wants to cancel it, reduce the threshold
37: // (1
38: // for 30 seconds, 2 for a minute)
39
:
this
.expectedNumberOfRenewsPerMin =
this
.expectedNumberOfRenewsPerMin +
2
;
40: this.numberOfRenewsPerMinThreshold =
41: (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
42: }
43: }
44: logger.debug("No previous lease information found; it is new registration");
45: }
46: // 创建 租约
47: Lease lease = new Lease(registrant, leaseDuration);
48
:
if
(existingLease !=
null
) {
// 若租约已存在,设置 租约的开始服务的时间戳
49: lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
50: }
51: // 添加到 租约映射
52: gMap.put(registrant.getId(), lease);
53: // 添加到 最近注册的调试队列
54: synchronized (recentRegisteredQueue) {
55: recentRegisteredQueue.add(new Pair(
56: System.currentTimeMillis(),
57: registrant.getAppName() + "(" + registrant.getId() + ")"));
58: }
59: // 添加到 应用实例覆盖状态映射(Eureka-Server 初始化使用)
60: // This is where the initial state transfer of overridden status happens
61: if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
62: logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
63: + "overrides", registrant.getOverriddenStatus(), registrant.getId());
64: if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
65: logger.info("Not found overridden id {} and hence adding it", registrant.getId());
66: overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
67: }
68: }
69: InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
70: if (overriddenStatusFromMap != null) {
71: logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
72: registrant.setOverriddenStatus(overriddenStatusFromMap);
73: }
74:
75: // 获得应用实例最终状态,并设置应用实例的状态
76: // Set the status based on the overridden status rules
77: InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
78: registrant.setStatusWithoutDirty(overriddenInstanceStatus);
79:
80: // 设置 租约的开始服务的时间戳(只有第一次有效)
81: // If the lease is registered with UP status, set lease service up timestamp
82: if (InstanceStatus.UP.equals(registrant.getStatus())) {
83: lease.serviceUp();
84: }
85: // 设置 应用实例信息的操作类型 为 添加
86: registrant.setActionType(ActionType.ADDED);
87: // 添加到 最近租约变更记录队列
88: recentlyChangedQueue.add(new RecentlyChangedItem(lease));
89: // 设置 租约的最后更新时间戳
90: registrant.setLastUpdatedTimestamp();
91: // 设置 响应缓存 过期
92: invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
93: logger.info("Registered instance {}/{} with status {} (replication={})",
94: registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
95: } finally {
96: // 释放锁
97: read.unlock();
98: }
99: }
第 6 至 7 行 :增加注册次数到监控.配合 Netflix Servo 实现监控信息采集.
第 5 至 16 行 :获得应用实例信息对应的租约.registry 实现代码如下:
/**
* 租约映射
* key1 :应用名 {@link InstanceInfo#appName}
* key2 :应用实例信息编号 {@link InstanceInfo#instanceId}
* value :租约
第 17 至 30 行 :当租约已存在,判断 Server 已存在的 InstanceInfo 的 lastDirtyTimestamp 是否大于 (不包括等于) Client 请求的 InstanceInfo ,若是,使用 Server 的 InstanceInfo 进行替代.
* /
private final ConcurrentHashMap>> registry = new ConcurrentHashMap>>();/
第 31 至 44 行 :增加
,自我保护机制相关,在 《Eureka 源码解析 —— 应用实例注册发现(四)之自我保护机制》 有详细解析.
numberOfRenewsPerMinThreshold
,
expectedNumberOfRenewsPerMin
第 45 至 52 行 :创建租约,并添加到租约映射 (registry).
第 53 至 58 行 :添加到最近注册的调试队列 (
recentRegisteredQueue
),用于 Eureka-Server 运维界面的显示,无实际业务逻辑使用.实现代码如下:
/**
* 最近注册的调试队列
* key :添加时的时间戳
* value :字符串 = 应用名(应用实例信息编号)
* 循环队列
* /
private final CircularQueue> recentRegisteredQueue;
/ * *
*
* @param 泛型
* 队列大小
* /
private
class
CircularQueue
<
E
>
extends
ConcurrentLinkedQueue
<
E
>
{
/ * *
* 保证空间足够
* /
private int size = 0;
public
CircularQueue
(int size)
{
this.size = size;
}
@Override
public
boolean
add
(E e)
{
this.makeSpaceIfNotAvailable();
return super.add(e);
}
/ * *
*
* 当空间不够时,移除首元素
第 59 至 68 行 :添加到应用实例覆盖状态映射,在 《Eureka 源码解析 —— Eureka-Server 集群同步》 详细解析.
* /
private
void
makeSpaceIfNotAvailable
()
{
if (this.size() == size) {
this.remove();
}
}
public
boolean
offer
(E e)
{
this.makeSpaceIfNotAvailable();
return super.offer(e);
}
}/
第 69 至 73 行 :设置应用实例的覆盖状态 (overridestatus),避免注册应用实例后,丢失覆盖状态.在 《应用实例注册发现 (八)之覆盖状态》 详细解析.
第 75 至 78 行 : 获得应用实例最终状态,并设置应用实例的状态.在 《应用实例注册发现 (八)之覆盖状态》 详细解析.
第 80 至 84 行 :设置租约的开始服务的时间戳 ( 只有第一次有效 ).
第 85 至 88 行 :设置应用实例信息的操作类型为添加,并添加到最近租约变更记录队列 (
用于注册信息的增量获取,在 《应用实例注册发现 (七)之增量获取》 详细解析.实现代码如下:
recentlyChangedQueue
).
recentlyChangedQueue
/**
* 最近租约变更记录队列
第 89 至 90 行 :设置租约的最后更新时间戳.
* /
private ConcurrentLinkedQueue recentlyChangedQueue = new ConcurrentLinkedQueue();/
第 91 至 92 行 :设置响应缓存 (ResponseCache) 过期,在 《Eureka 源码解析 —— 应用实例注册发现 (六)之全量获取》 详细解析.
第 96 至 97 行 :释放锁.
666. 彩蛋
嘿嘿,蛮嗨的,比起前面几篇写配置相关的文章来说.
胖友,分享我的公众号 (芋道源码) 给你的胖友可好?
来源: http://www.suo.im/4C7DZx