本文主要基于 Eureka 1.8.X 版本
1. 概述
2. Eureka-Client 发起全量获取
2.1 初始化全量获取
2.2 定时获取
2.3 刷新注册信息缓存
2.4 发起获取注册信息
3. Eureka-Server 接收全量获取
3.1 接收全量获取请求
3.2 响应缓存 ResponseCache
3.3 缓存读取
3.4 主动过期读写缓存
3.5 被动过期读写缓存
3.6 定时刷新只读缓存
RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
您对于源码的疑问每条留言都将得到认真回复甚至不知道如何读源码也可以请教噢
新的源码解析文章实时收到通知每周更新一篇左右
认真的源码交流微信群
1. 概述
本文主要分享 Eureka-Client 向 Eureka-Server 获取全量注册信息的过程
FROM 深度剖析服务发现组件 Netflix Eureka
Eureka-Client 获取注册信息, 分成全量获取和增量获取默认配置下, Eureka-Client 启动时, 首先执行一次全量获取进行本地缓存注册信息, 而后每 30 秒增量获取刷新本地缓存( 非正常情况下会是全量获取 )
本文重点在于全量获取
推荐 Spring Cloud 书籍:
请支持正版下载盗版, 等于主动编写低级 BUG
程序猿 DD Spring Cloud 微服务实战
周立 Spring Cloud 与 Docker 微服务架构实战
两书齐买, 京东包邮
2. Eureka-Client 发起全量获取
本小节调用关系如下:
2.1 初始化全量获取
Eureka-Client 启动时, 首先执行一次全量获取进行本地缓存注册信息, 首先代码如下:
// DiscoveryClient.java
/**
* Applications 在本地的缓存
*/
private final AtomicReference localRegionApps = new AtomicReference();
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider backupRegistryProvider) {
// ... 省略无关代码
// [3.2.5] 初始化应用集合在本地的缓存
localRegionApps.set(new Applications());
// ... 省略无关代码
// [3.2.12] 从 Eureka-Server 拉取注册信息
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup();
}
// ... 省略无关代码
}
com.netflix.discovery.shared.Applications, 注册的应用集合较为容易理解, 点击 链接 链接查看带中文注释的类, 这里就不啰嗦了 Applications 与 InstanceInfo 类关系如下:
配置 eureka.shouldFetchRegistry = true, 开启从 Eureka-Server 获取注册信息默认值: true
调用 #fetchRegistry(false) 方法, 从 Eureka-Server 全量获取注册信息, 在 2.4 发起获取注册信息 详细解析
2.2 定时获取
Eureka-Client 在初始化过程中, 创建获取注册信息线程, 固定间隔向 Eureka-Server 发起获取注册信息( fetch ), 刷新本地注册信息缓存实现代码如下:
// DiscoveryClient.java
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider backupRegistryProvider) {
// ... 省略无关代码
// [3.2.9] 初始化线程池
// default size of 2 - 1 each for heartbeat and cacheRefresh
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
// ... 省略无关代码
// [3.2.14] 初始化定时任务
initScheduledTasks();
// ... 省略无关代码
}
private void initScheduledTasks() {
// 向 Eureka-Server 心跳(续租)执行器
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
// ... 省略无关代码
}
初始化定时任务代码, 和续租的定时任务代码类似, 在 Eureka 源码解析 应用实例注册发现 (二) 之续租 有详细解析, 这里不重复分享
com.netflix.discovery.DiscoveryClient.CacheRefreshThread, 注册信息缓存刷新任务, 实现代码如下:
class CacheRefreshThread implements Runnable {
public void run() {
refreshRegistry();
}
}
调用 #refreshRegistry(false) 方法, 刷新注册信息缓存, 在 2.3 刷新注册信息缓存 详细解析
2.3 刷新注册信息缓存
调用 #refreshRegistry(false) 方法, 刷新注册信息缓存, 实现代码如下:
// DiscoveryClient.java
void refreshRegistry() {
try {
// TODO 芋艿:TODO[0009]:RemoteRegionRegistry
boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();
boolean remoteRegionsModified = false;
// This makes sure that a dynamic change to remote regions to fetch is honored.
String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
if (null != latestRemoteRegions) {
String currentRemoteRegions = remoteRegionsToFetch.get();
if (!latestRemoteRegions.equals(currentRemoteRegions)) {
// Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
synchronized(instanceRegionChecker.getAzToRegionMapper()) {
if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
String[] remoteRegions = latestRemoteRegions.split(",");
remoteRegionsRef.set(remoteRegions);
instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
remoteRegionsModified = true;
} else {
logger.info("Remote regions to fetch modified concurrently," + "ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
}
}
} else {
// Just refresh mapping to reflect any DNS/Property change
instanceRegionChecker.getAzToRegionMapper().refreshMapping();
}
}
boolean success = fetchRegistry(remoteRegionsModified);
if (success) {
// 设置 注册信息的应用实例数
registrySize = localRegionApps.get().size();
// 设置 最后获取注册信息时间
lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
}
// 打印日志
if (logger.isDebugEnabled()) {
StringBuilder allAppsHashCodes = new StringBuilder();
allAppsHashCodes.append("Local region apps hashcode:");
allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
allAppsHashCodes.append(", is fetching remote regions?");
allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
for (Map.Entry entry: remoteRegionVsApps.entrySet()) {
allAppsHashCodes.append(", Remote region:");
allAppsHashCodes.append(entry.getKey());
allAppsHashCodes.append(", apps hashcode:");
allAppsHashCodes.append(entry.getValue().getAppsHashCode());
}
logger.debug("Completed cache refresh task for discovery. All Apps hash code is {}", allAppsHashCodes.toString());
}
} catch(Throwable e) {
logger.error("Cannot fetch registry from server", e);
}
}
第 3 至 28 行 :TODO[0009]:RemoteRegionRegistry
第 30 行 : 调用 #fetchRegistry(false) 方法, 从 Eureka-Server 获取注册信息, 在 2.4 发起获取注册信息 详细解析
第 31 至 36 行 : 获取注册信息成功, 设置注册信息的应用实例数, 最后获取注册信息时间变量代码如下:
/**
* 注册信息的应用实例数
*/
private volatile int registrySize = 0;
/**
* 最后成功从 Eureka-Server 拉取注册信息时间戳
*/
private volatile long lastSuccessfulRegistryFetchTimestamp = -1;
第 38 至 53 行 : 打印调试日志
第 54 至 56 行 : 打印异常日志
2.4 发起获取注册信息
调用 #fetchRegistry(false) 方法, 从 Eureka-Server 获取注册信息( 根据条件判断, 可能是全量, 也可能是增量 ), 实现代码如下:
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// 获取 本地缓存的注册的应用实例集合
// If the delta is disabled or if it is the first time, get all
// applications
Applications applications = getApplications();
// 全量获取
if (clientConfig.shouldDisableDelta() // 禁用增量获取
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) || forceFullRegistryFetch || (applications == null) // 空
|| (applications.getRegisteredApplications().size() == 0) // 空
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
logger.info("Application is null : {}", (applications == null));
logger.info("Registered Applications size is zero : {}", (applications.getRegisteredApplications().size() == 0));
logger.info("Application version is -1: {}", (applications.getVersion() == -1));
// 执行 全量获取
getAndStoreFullRegistry();
} else {
// 执行 增量获取
getAndUpdateDelta(applications);
}
// 设置 应用集合 hashcode
applications.setAppsHashCode(applications.getReconcileHashCode());
// 打印 本地缓存的注册的应用实例数量
logTotalInstances();
} catch(Throwable e) {
logger.error(PREFIX + appPathIdentifier + "- was unable to refresh its cache! status =" + e.getMessage(), e);
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
// Notify about cache refresh before updating the instance remote status
onCacheRefreshed();
// Update remote status based on refreshed data held in the cache
updateInstanceRemoteStatus();
// registry was fetched successfully, so return true
return true;
}
第 5 至 8 行 : 获取本地缓存的注册的应用实例集合, 实现代码如下:
public Applications getApplications() {
return localRegionApps.get();
}
第 10 至 26 行 : 全量获取注册信息
第 11 行 : 配置 eureka.disableDelta = true , 禁用增量获取注册信息默认值: false
第 12 行 : 只获得一个 vipAddress 对应的应用实例们的注册信息
第 13 行 : 方法参数 forceFullRegistryFetch 强制全量获取注册信息
第 14 至 15 行 : 本地缓存为空
第 25 至 26 行 : 调用 #getAndStoreFullRegistry() 方法, 全量获取注册信息, 并设置到本地缓存下文详细解析
第 27 至 30 行 : 增量获取注册信息, 并刷新本地缓存, 在 Eureka 源码解析 应用实例注册发现 (七)之增量获取 详细解析
第 31 至 32 行 : 计算应用集合 hashcode 该变量用于校验增量获取的注册信息和 Eureka-Server 全量的注册信息是否一致 ( 完整 ), 在 Eureka 源码解析 应用实例注册发现 (七) 之增量获取 详细解析
第 33 至 34 行 : 打印调试日志, 输出本地缓存的注册的应用实例数量实现代码如下:
private void logTotalInstances() {
if (logger.isDebugEnabled()) {
int totInstances = 0;
for (Application application: getApplications().getRegisteredApplications()) {
totInstances += application.getInstancesAsIsFromEureka().size();
}
logger.debug("The total number of all instances in the client now is {}", totInstances);
}
}
第 44 至 45 行 : 触发 CacheRefreshedEvent 事件, 事件监听器执行目前 Eureka 未提供默认的该事件监听器
#onCacheRefreshed() 方法, 实现代码如下:
/**
* Eureka 事件监听器
*/
private final CopyOnWriteArraySet eventListeners = new CopyOnWriteArraySet < >();
protected void onCacheRefreshed() {
fireEvent(new CacheRefreshedEvent());
}
protected void fireEvent(final EurekaEvent event) {
for (EurekaEventListener listener: eventListeners) {
listener.onEvent(event);
}
}
x
笔者的 YY : 你可以实现自定义的事件监听器监听 CacheRefreshedEvent 事件, 以达到持久化最新的注册信息到存储器( 例如, 本地文件 ), 通过这样的方式, 配合实现 BackupRegistry 接口读取存储器 BackupRegistry 接口调用如下:
// [3.2.12] 从 Eureka-Server 拉取注册信息
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup();
}
第 47 至 48 行 : 更新本地缓存的当前应用实例在 Eureka-Server 的状态
private volatile InstanceInfo.InstanceStatus lastRemoteInstanceStatus = InstanceInfo.InstanceStatus.UNKNOWN;
private synchronized void updateInstanceRemoteStatus() {
// Determine this instance's status for this app and set to UNKNOWN if not found
InstanceInfo.InstanceStatus currentRemoteInstanceStatus = null;
if (instanceInfo.getAppName() != null) {
Application app = getApplication(instanceInfo.getAppName());
if (app != null) {
InstanceInfo remoteInstanceInfo = app.getByInstanceId(instanceInfo.getId());
if (remoteInstanceInfo != null) {
currentRemoteInstanceStatus = remoteInstanceInfo.getStatus();
}
}
}
if (currentRemoteInstanceStatus == null) {
currentRemoteInstanceStatus = InstanceInfo.InstanceStatus.UNKNOWN;
}
// Notify if status changed
if (lastRemoteInstanceStatus != currentRemoteInstanceStatus) {
onRemoteStatusChanged(lastRemoteInstanceStatus, currentRemoteInstanceStatus);
lastRemoteInstanceStatus = currentRemoteInstanceStatus;
}
}
第 4 至 14 行 : 从注册信息中获取当前应用在 Eureka-Server 的状态
第 19 至 23 行 : 对比本地缓存和最新的的当前应用实例在 Eureka-Server 的状态, 若不同, 更新本地缓存( 注意, 只更新该缓存变量, 不更新本地当前应用实例的状态( instanceInfo.status ) ), 触发 StatusChangeEvent 事件, 事件监听器执行目前 Eureka 未提供默认的该事件监听器 #onRemoteStatusChanged(...) 实现代码如下:
protected void onRemoteStatusChanged(InstanceInfo.InstanceStatus oldStatus, InstanceInfo.InstanceStatus newStatus) {
fireEvent(new StatusChangeEvent(oldStatus, newStatus));
}
Eureka-Client 本地应用实例与 Eureka-Server 的该应用实例状态不同的原因, 因为应用实例的覆盖状态, 在 Eureka 源码解析 应用实例注册发现 (八)之覆盖状态 有详细解析
2.4.1 全量获取注册信息, 并设置到本地缓存
调用 #getAndStoreFullRegistry() 方法, 全量获取注册信息, 并设置到本地缓存下实现代码如下:
private void getAndStoreFullRegistry() throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
logger.info("Getting all instance registry info from the eureka server");
// 全量获取注册信息
Applications apps = null;
EurekaHttpResponse httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
apps = httpResponse.getEntity();
}
logger.info("The response status is {}", httpResponse.getStatusCode());
// 设置到本地缓存
if (apps == null) {
logger.error("The application is null for some reason. Not storing this information");
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
localRegionApps.set(this.filterAndShuffle(apps));
logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
} else {
logger.warn("Not updating applications as another thread is updating it already");
}
}
第 6 至 14 行 : 全量获取注册信息, 实现代码如下:
// AbstractJerseyEurekaHttpClient.java
@Override
public EurekaHttpResponse getApplications(String... regions) {
return getApplicationsInternal("apps/", regions);
}
private EurekaHttpResponse getApplicationsInternal(String urlPath, String[] regions) {
ClientResponse response = null;
String regionsParamValue = null;
try {
WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
if (regions != null && regions.length > 0) {
regionsParamValue = StringUtil.join(regions);
webResource = webResource.queryParam("regions", regionsParamValue);
}
Builder requestBuilder = webResource.getRequestBuilder();
addExtraHeaders(requestBuilder);
response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class); // JSON
Applications applications = null;
if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
applications = response.getEntity(Applications.class);
}
return anEurekaHttpResponse(response.getStatus(), Applications.class)
.headers(headersOf(response))
.entity(applications)
.build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP GET {}/{}?{}; statusCode={}",
serviceUrl, urlPath,
regionsParamValue == null ? "":"regions=" + regionsParamValue,
response == null ? "N/A" : response.getStatus()
);
}
if (response != null) {
response.close();
}
}
}
调用 AbstractJerseyEurekaHttpClient#getApplications(...) 方法, GET 请求 Eureka-Server 的 apps/ 接口, 参数为 regions , 返回格式为 JSON , 实现全量获取注册信息
第 16 至 24 行 : 设置到本地注册信息缓存
第 19 行 :TODO[0025] : 并发更新的情况???
第 20 行 : 调用 #filterAndShuffle(...) 方法, 根据配置 eureka.shouldFilterOnlyUpInstances = true ( 默认值 :true ) 过滤只保留状态为开启 ( UP ) 的应用实例, 并随机打乱应用实例顺序打乱后, 实现调用应用服务的随机性代码比较易懂, 点击链接查看方法实现
3. Eureka-Server 接收全量获取
3.1 接收全量获取请求
com.netflix.eureka.resources.ApplicationsResource, 处理所有应用的请求操作的 Resource ( Controller )
接收全量获取请求, 映射 ApplicationsResource#getContainers() 方法, 实现代码如下:
@GET
public Response getContainers(@PathParam("version") String version,
@HeaderParam(HEADER_ACCEPT) String acceptHeader,
@HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
@HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
@Context UriInfo uriInfo,
@Nullable @QueryParam("regions") String regionsStr) {
// TODO[0009]:RemoteRegionRegistry
boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
String[] regions = null;
if (!isRemoteRegionRequested) {
EurekaMonitors.GET_ALL.increment();
} else {
regions = regionsStr.toLowerCase().split(",");
Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
}
// 判断是否可以访问
// Check if the server allows the access to the registry. The server can
// restrict access if it is not
// ready to serve traffic depending on various reasons.
if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
return Response.status(Status.FORBIDDEN).build();
}
// API 版本
CurrentRequestVersion.set(Version.toEnum(version));
// 返回数据格式
KeyType keyType = Key.KeyType.JSON;
String returnMediaType = MediaType.APPLICATION_JSON;
if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
keyType = Key.KeyType.XML;
returnMediaType = MediaType.APPLICATION_XML;
}
// 响应缓存键 ( KEY )
Key cacheKey = new Key(Key.EntityType.Application,
ResponseCacheImpl.ALL_APPS,
keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
);
//
Response response;
if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
response = Response.ok(responseCache.getGZIP(cacheKey))
.header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
.header(HEADER_CONTENT_TYPE, returnMediaType)
.build();
} else {
response = Response.ok(responseCache.get(cacheKey))
.build();
}
return response;
}
第 8 至 17 行 :TODO[0009]:RemoteRegionRegistry
第 19 至 25 行 :Eureka-Server 启动完成, 但是未处于就绪 ( Ready ) 状态, 不接受请求全量应用注册信息的请求, 例如, Eureka-Server 启动时, 未能从其他 Eureka-Server 集群的节点获取到应用注册信息
第 27 至 28 行 : 设置 API 版本号默认最新 API 版本为 V2 实现代码如下:
public enum Version {
V1,
V2;
public static Version toEnum(String v) {
for (Version version: Version.values()) {
if (version.name().equalsIgnoreCase(v)) {
return version;
}
}
//Defaults to v2
return V2;
}
}
第 30 至 36 行 : 设置返回数据格式, 默认 JSON
第 38 至 42 行 : 创建响应缓存( ResponseCache ) 的键( KEY ), 在 3.2.1 缓存键详细解析
第 44 至 55 行 : 从响应缓存读取全量注册信息, 在 3.3 缓存读取详细解析
3.2 响应缓存 ResponseCache
com.netflix.eureka.registry.ResponseCache, 响应缓存接口, 接口代码如下:
public interface ResponseCache {
String get(Key key);
byte[] getGZIP(Key key);
void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress);
AtomicLong getVersionDelta();
AtomicLong getVersionDeltaWithRegions();
}
其中,#getVersionDelta() 和 #getVersionDeltaWithRegions() 已经废弃这里保留的原因主要是考虑兼容性判断依据来自如下代码:
// Applications.java
@Deprecated
public void setVersion(Long version) {
this.versionDelta = version;
}
// AbstractInstanceRegistry.java
public Applications getApplicationDeltas() {
// ... 省略其它无关代码
apps.setVersion(responseCache.getVersionDelta().get()); // 唯一调用到 ResponseCache#getVersionDelta() 方法的地方
// ... 省略其它无关代码
}
#get() : 获得缓存
#getGZIP() : 获得缓存, 并 GZIP
#invalidate() : 过期缓存
3.2.1 缓存键
com.netflix.eureka.registry.Key, 缓存键实现代码如下:
public class Key {
public enum KeyType {
JSON,
XML
}
/**
* An enum to define the entity that is stored in this cache for this key.
*/
public enum EntityType {
Application,
VIP,
SVIP
}
/**
* 实体名
*/
private final String entityName;
/**
* TODO[0009]:RemoteRegionRegistry
*/
private final String[] regions;
/**
* 请求参数类型
*/
private final KeyType requestType;
/**
* 请求 API 版本号
*/
private final Version requestVersion;
/**
* hashKey
*/
private final String hashKey;
/**
* 实体类型
*
* {@link EntityType}
*/
private final EntityType entityType;
/**
* {@link EurekaAccept}
*/
private final EurekaAccept eurekaAccept;
public Key(EntityType entityType, String entityName, KeyType type, Version v, EurekaAccept eurekaAccept, @Nullable String[] regions) {
this.regions = regions;
this.entityType = entityType;
this.entityName = entityName;
this.requestType = type;
this.requestVersion = v;
this.eurekaAccept = eurekaAccept;
hashKey = this.entityType + this.entityName + (null != this.regions ? Arrays.toString(this.regions) : "") + requestType.name() + requestVersion.name() + this.eurekaAccept.name();
}
public Key(EntityType entityType, String entityName, KeyType type, Version v, EurekaAccept eurekaAccept, @Nullable String[] regions) {
this.regions = regions;
this.entityType = entityType;
this.entityName = entityName;
this.requestType = type;
this.requestVersion = v;
this.eurekaAccept = eurekaAccept;
hashKey = this.entityType + this.entityName + (null != this.regions ? Arrays.toString(this.regions) : "") + requestType.name() + requestVersion.name() + this.eurekaAccept.name();
}@Override public int hashCode() {
String hashKey = getHashKey();
return hashKey.hashCode();
}@Override public boolean equals(Object other) {
if (other instanceof Key) {
return getHashKey().equals(((Key) other).getHashKey());
} else {
return false;
}
}
}
3.2.2 响应缓存实现类
com.netflix.eureka.registry.ResponseCacheImpl, 响应缓存实现类
在 ResponseCacheImpl 里, 将缓存拆分成两层 :
只读缓存( readOnlyCacheMap )
固定过期 + 固定大小的读写缓存( readWriteCacheMap )
默认配置下, 缓存读取策略如下:
缓存过期策略如下:
应用实例注册下线过期时, 只只只过期 readWriteCacheMap
readWriteCacheMap 写入一段时间 ( 可配置 ) 后自动过期
定时任务对比 readWriteCacheMap 和 readOnlyCacheMap 的缓存值, 若不一致, 以前者为主通过这样的方式, 实现了 readOnlyCacheMap 的定时过期
注意: 应用实例注册下线过期时, 不会很快刷新到 readWriteCacheMap 缓存里默认配置下, 最大延迟在 30 秒
为什么可以使用缓存?
在 CAP 的选择上, Eureka 选择了 AP , 不同于 Zookeeper 选择了 CP
推荐阅读:
为什么不应该使用 ZooKeeper 做服务发现
Spring Cloud Netflix Eureka 源码导读与原理分析 4. 作为服务注册中心, Eureka 比 Zookeeper 好在哪里
3.3 缓存读取
调用 ResponseCacheImpl#get(...) 方法( #getGzip(...) 类似 ), 读取缓存, 实现代码如下:
private final ConcurrentMap readOnlyCacheMap = new ConcurrentHashMap();
private final LoadingCache readWriteCacheMap;
public String get(final Key key) {
return get(key, shouldUseReadOnlyResponseCache);
}
String get(final Key key, boolean useReadOnlyCache) {
Value payload = getValue(key, useReadOnlyCache);
if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) {
return null;
} else {
return payload.getPayload();
}
}
Value getValue(final Key key, boolean useReadOnlyCache) {
Value payload = null;
try {
if (useReadOnlyCache) {
final Value currentPayload = readOnlyCacheMap.get(key);
if (currentPayload != null) {
payload = currentPayload;
} else {
payload = readWriteCacheMap.get(key);
readOnlyCacheMap.put(key, payload);
}
} else {
payload = readWriteCacheMap.get(key);
}
} catch(Throwable t) {
logger.error("Cannot get value for key :" + key, t);
}
return payload;
}
第 5 至 7 行 : 调用 #get(key, useReadOnlyCache) 方法, 读取缓存其中 shouldUseReadOnlyResponseCache 通过配置 eureka.shouldUseReadOnlyResponseCache = true (默认值 :true ) 开启只读缓存如果你对数据的一致性有相对高的要求, 可以关闭这个开关, 当然因为少了 readOnlyCacheMap , 性能会有一定的下降
第 9 至 16 行 : 调用 getValue(key, useReadOnlyCache) 方法, 读取缓存从 readOnlyCacheMap 和 readWriteCacheMap 变量可以看到缓存值的类为 com.netflix.eureka.registry.ResponseCacheImpl.Value , 实现代码如下:
public class Value {
/**
* 原始值
*/
private final String payload;
/**
* GZIP 压缩后的值
*/
private byte[] gzipped;
public Value(String payload) {
this.payload = payload;
if (!EMPTY_PAYLOAD.equals(payload)) {
// ... 省略 GZIP 压缩代码
gzipped = bos.toByteArray();
} else {
gzipped = null;
}
}
public String getPayload() {
return payload;
}
public byte[] getGzipped() {
return gzipped;
}
}
第 21 至 31 行 : 读取缓存
第 21 至 28 行 : 先读取 readOnlyCacheMap 读取不到, 读取 readWriteCacheMap , 并设置到 readOnlyCacheMap
第 29 至 31 行 : 读取 readWriteCacheMap
readWriteCacheMap 实现代码如下:
this.readWriteCacheMap =
CacheBuilder.newBuilder().initialCapacity(1000)
.expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
.removalListener(new RemovalListener() {
@Override
public void onRemoval(RemovalNotification notification) {
// TODO[0009]:RemoteRegionRegistry
Key removedKey = notification.getKey();
if (removedKey.hasRegions()) {
Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
}
}
})
.build(new CacheLoader() {
@Override
public Value load(Key key) throws Exception {
// // TODO[0009]:RemoteRegionRegistry
if (key.hasRegions()) {
Key cloneWithNoRegions = key.cloneWithoutRegions();
regionSpecificKeys.put(cloneWithNoRegions, key);
}
Value value = generatePayload(key);
return value;
}
});
readWriteCacheMap 最大缓存数量为 1000
调用 #generatePayload(key) 方法, 生成缓存值
#generatePayload(key)方法,
实现代码如下: private Value generatePayload(Key key) {
Stopwatch tracer = null;
try {
String payload;
switch (key.getEntityType()) {
case Application:
boolean isRemoteRegionRequested = key.hasRegions();
if (ALL_APPS.equals(key.getName())) {
if (isRemoteRegionRequested) { // TODO[0009]:RemoteRegionRegistry
tracer = serializeAllAppsWithRemoteRegionTimer.start();
payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
} else {
tracer = serializeAllAppsTimer.start();
payload = getPayLoad(key, registry.getApplications());
}
} else if (ALL_APPS_DELTA.equals(key.getName())) {
// ... 省略增量获取相关的代码
} else {
tracer = serializeOneApptimer.start();
payload = getPayLoad(key, registry.getApplication(key.getName()));
}
break;
// ... 省略部分代码
}
return new Value(payload);
} finally {
if (tracer != null) {
tracer.stop();
}
}
}
第 10 至 12 行 :TODO[0009]:RemoteRegionRegistry
第 13 至 16 行 : 调用 AbstractInstanceRegistry#getApplications() 方法, 获得注册的应用集合后调用 #getPayLoad() 方法, 将注册的应用集合转换成缓存值 这两个方法代码较多, 下面详细解析
第 17 至 18 行 : 获取增量注册信息的缓存值, 在 Eureka 源码解析 应用实例注册发现 (七)之增量获取 详细解析
3.3.1 获得注册的应用集合
调用 AbstractInstanceRegistry#getApplications() 方法, 获得注册的应用集合, 实现代码如下:
// AbstractInstanceRegistry.java
private static final String[] EMPTY_STR_ARRAY = new String[0];
public Applications getApplications() {
boolean disableTransparentFallback = serverConfig.disableTransparentFallbackToOtherRegion();
if (disableTransparentFallback) { // TODO[0009]:RemoteRegionRegistry
return getApplicationsFromLocalRegionOnly();
} else {
return getApplicationsFromAllRemoteRegions(); // Behavior of falling back to remote region can be disabled.
}
}
public Applications getApplicationsFromLocalRegionOnly() {
return getApplicationsFromMultipleRegions(EMPTY_STR_ARRAY);
}
第 6 至 8 行 :TODO[0009]:RemoteRegionRegistry
第 9 至 16 行 : 调用 #getApplicationsFromMultipleRegions(...) 方法, 获得注册的应用集合, 实现代码如下:
public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) {
// TODO[0009]:RemoteRegionRegistry
boolean includeRemoteRegion = null != remoteRegions && remoteRegions.length != 0;
logger.debug("Fetching applications registry with remote regions: {}, Regions argument {}", includeRemoteRegion, Arrays.toString(remoteRegions));
if (includeRemoteRegion) {
GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS.increment();
} else {
GET_ALL_CACHE_MISS.increment();
}
// 获得获得注册的应用集合
Applications apps = new Applications();
apps.setVersion(1L);
for (Entry >> entry: registry.entrySet()) {
Application app = null;
if (entry.getValue() != null) {
for (Entry > stringLeaseEntry: entry.getValue().entrySet()) {
Lease lease = stringLeaseEntry.getValue();
if (app == null) {
app = new Application(lease.getHolder().getAppName());
}
app.addInstance(decorateInstanceInfo(lease));
}
}
if (app != null) {
apps.addApplication(app);
}
}
// TODO[0009]:RemoteRegionRegistry
if (includeRemoteRegion) {
for (String remoteRegion: remoteRegions) {
RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
if (null != remoteRegistry) {
Applications remoteApps = remoteRegistry.getApplications();
for (Application application: remoteApps.getRegisteredApplications()) {
if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
logger.info("Application {} fetched from the remote region {}", application.getName(), remoteRegion);
Application appInstanceTillNow = apps.getRegisteredApplications(application.getName());
if (appInstanceTillNow == null) {
appInstanceTillNow = new Application(application.getName());
apps.addApplication(appInstanceTillNow);
}
for (InstanceInfo instanceInfo: application.getInstances()) {
appInstanceTillNow.addInstance(instanceInfo);
}
} else {
logger.debug("Application {} not fetched from the remote region {} as there exists a" + "whitelist and this app is not in the whitelist.", application.getName(), remoteRegion);
}
}
} else {
logger.warn("No remote registry available for the remote region {}", remoteRegion);
}
}
}
// 设置 应用集合 hashcode
apps.setAppsHashCode(apps.getReconcileHashCode());
return apps;
}
第 2 至 第 10 行 :TODO[0009]:RemoteRegionRegistry
第 11 至 29 行 : 获得获得注册的应用集合
第 30 至 59 行 :TODO[0009]:RemoteRegionRegistry
第 61 行 : 计算应用集合 hashcode 该变量用于校验增量获取的注册信息和 Eureka-Server 全量的注册信息是否一致 ( 完整 ), 在 Eureka 源码解析 应用实例注册发现 (七) 之增量获取 详细解析
3.3.2 转换成缓存值
调用 #getPayLoad() 方法, 将注册的应用集合转换成缓存值, 实现代码如下:
/**
* Generate pay load with both JSON and XML formats for all applications.
*/
private String getPayLoad(Key key, Applications apps) {
// 获得编码器
EncoderWrapper encoderWrapper = serverCodecs.getEncoder(key.getType(), key.getEurekaAccept());
String result;
try {
// 编码
result = encoderWrapper.encode(apps);
} catch(Exception e) {
logger.error("Failed to encode the payload for all apps", e);
return "";
}
if (logger.isDebugEnabled()) {
logger.debug("New application cache entry {} with apps hashcode {}", key.toStringCompact(), apps.getAppsHashCode());
}
return result;
}
3.4 主动过期读写缓存
应用实例注册下线过期时, 调用 ResponseCacheImpl#invalidate() 方法, 主动过期读写缓存( readWriteCacheMap ), 实现代码如下:
public void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {
for (Key.KeyType type : Key.KeyType.values()) {
for (Version v : Version.values()) {
invalidate(
new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.full),
new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.compact),
new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.full),
new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.compact),
new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.full),
new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.compact)
);
if (null != vipAddress) {
invalidate(new Key(Key.EntityType.VIP, vipAddress, type, v, EurekaAccept.full));
}
if (null != secureVipAddress) {
invalidate(new Key(Key.EntityType.SVIP, secureVipAddress, type, v, EurekaAccept.full));
}
}
}
}
调用 #invalidate(keys) 方法, 逐个过期每个缓存键值, 实现代码如下:
public void invalidate(Key...keys) {
for (Key key: keys) {
logger.debug("Invalidating the response cache key : {} {} {} {}, {}", key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());
// 过期读写缓存
readWriteCacheMap.invalidate(key);
// TODO[0009]:RemoteRegionRegistry
Collection keysWithRegions = regionSpecificKeys.get(key);
if (null != keysWithRegions && !keysWithRegions.isEmpty()) {
for (Key keysWithRegion: keysWithRegions) {
logger.debug("Invalidating the response cache key : {} {} {} {} {}", key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());
readWriteCacheMap.invalidate(keysWithRegion);
}
}
}
}
3.5 被动过期读写缓存
读写缓存( readWriteCacheMap ) 写入后, 一段时间自动过期, 实现代码如下:
expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds())
配置 eureka.responseCacheAutoExpirationInSeconds , 设置写入过期时长默认值 :180 秒
3.6 定时刷新只读缓存
定时任务对比 readWriteCacheMap 和 readOnlyCacheMap 的缓存值, 若不一致, 以前者为主通过这样的方式, 实现了 readOnlyCacheMap 的定时过期实现代码如下:
ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
// ... 省略无关代码
long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
// ... 省略无关代码
if (shouldUseReadOnlyResponseCache) {
timer.schedule(getCacheUpdateTask(), new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs) + responseCacheUpdateIntervalMs), responseCacheUpdateIntervalMs);
}
// ... 省略无关代码
}
private TimerTask getCacheUpdateTask() {
return new TimerTask() {@Override public void run() {
logger.debug("Updating the client cache from response cache");
for (Key key: readOnlyCacheMap.keySet()) { // 循环 readOnlyCacheMap 的缓存键
if (logger.isDebugEnabled()) {
Object[] args = {
key.getEntityType(),
key.getName(),
key.getVersion(),
key.getType()
};
logger.debug("Updating the client cache from response cache for key : {} {} {} {}", args);
}
try {
CurrentRequestVersion.set(key.getVersion());
Value cacheValue = readWriteCacheMap.get(key);
Value currentCacheValue = readOnlyCacheMap.get(key);
if (cacheValue != currentCacheValue) { // 不一致时,进行替换
readOnlyCacheMap.put(key, cacheValue);
}
} catch(Throwable th) {
logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
}
}
}
};
}
第 7 至 12 行 : 初始化定时任务配置 eureka.responseCacheUpdateIntervalMs, 设置任务执行频率, 默认值 :30 * 1000 毫秒
第 17 至 39 行 : 创建定时任务
第 22 行 : 循环 readOnlyCacheMap 的缓存键为什么不循环 readWriteCacheMap 呢? readOnlyCacheMap 的缓存过期依赖 readWriteCacheMap, 因此缓存键会更多
第 28 行 至 33 行 : 对比 readWriteCacheMap 和 readOnlyCacheMap 的缓存值, 若不一致, 以前者为主通过这样的方式, 实现了 readOnlyCacheMap 的定时过期
来源: http://t.cn/R8x4xRY