本文主要基于 Eureka 1.8.X 版本
1. 概述
2. Eureka-Client 发起全量获取
2.1 初始化全量获取
2.2 定时获取
2.3 刷新注册信息缓存
2.4 发起获取注册信息
3. Eureka-Server 接收全量获取
3.1 接收全量获取请求
3.2 响应缓存 ResponseCache
3.3 缓存读取
3.4 主动过期读写缓存
3.5 被动过期读写缓存
3.6 定时刷新只读缓存
666. 彩蛋
RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
您对于源码的疑问每条留言都将得到认真回复. 甚至不知道如何读源码也可以请教噢.
新的源码解析文章实时收到通知. 每周更新一篇左右.
认真的源码交流微信群.
1. 概述
本文主要分享 Eureka-Client 向 Eureka-Server 获取全量注册信息的过程.
FROM 《深度剖析服务发现组件 Netflix Eureka》
Eureka-Client 获取注册信息, 分成全量获取和增量获取. 默认配置下, Eureka-Client 启动时, 首先执行一次全量获取进行本地缓存注册信息, 而后每 30 秒增量获取刷新本地缓存( 非 "正常" 情况下会是全量获取 ).
本文重点在于全量获取.
推荐 Spring Cloud 书籍:
请支持正版. 下载盗版, 等于主动编写低级 BUG .
程序猿 DD -- 《Spring Cloud 微服务实战》 https://union-click.jd.com/jdc?d=505Twi
周立 -- 《Spring Cloud 与 Docker 微服务架构实战》 https://union-click.jd.com/jdc?d=k3sAaK
两书齐买, 京东包邮.
2. Eureka-Client 发起全量获取
本小节调用关系如下:
2.1 初始化全量获取
Eureka-Client 启动时, 首先执行一次全量获取进行本地缓存注册信息, 首先代码如下:
- // DiscoveryClient.java
- /**
- * Applications 在本地的缓存
- */
- private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();
- DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
- Provider<BackupRegistry> backupRegistryProvider) {
- // ... 省略无关代码
- // [3.2.5] 初始化应用集合在本地的缓存
- localRegionApps.set(new Applications());
- // ... 省略无关代码
- // [3.2.12] 从 Eureka-Server 拉取注册信息
- if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
- fetchRegistryFromBackup();
- }
- // ... 省略无关代码
- }
com.netflix.discovery.shared.Applications, 注册的应用集合. 较为容易理解, 点击 链接 链接查看带中文注释的类, 这里就不啰嗦了. Applications 与 InstanceInfo 类关系如下:
配置 eureka.shouldFetchRegistry = true, 开启从 Eureka-Server 获取注册信息. 默认值: true .
调用
#fetchRegistry(false)
方法, 从 Eureka-Server 全量获取注册信息, 在 「2.4 发起获取注册信息」 详细解析.
2.2 定时获取
Eureka-Client 在初始化过程中, 创建获取注册信息线程, 固定间隔向 Eureka-Server 发起获取注册信息( fetch ), 刷新本地注册信息缓存. 实现代码如下:
- // DiscoveryClient.java
- DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
- Provider<BackupRegistry> backupRegistryProvider) {
- // ... 省略无关代码
- // [3.2.9] 初始化线程池
- // default size of 2 - 1 each for heartbeat and cacheRefresh
- scheduler = Executors.newScheduledThreadPool(2,
- new ThreadFactoryBuilder()
- .setNameFormat("DiscoveryClient-%d")
- .setDaemon(true)
- .build());
- cacheRefreshExecutor = new ThreadPoolExecutor(
- 1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>(),
- new ThreadFactoryBuilder()
- .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
- .setDaemon(true)
- .build()
- ); // use direct handoff
- // ... 省略无关代码
- // [3.2.14] 初始化定时任务
- initScheduledTasks();
- // ... 省略无关代码
- }
- private void initScheduledTasks() {
- // 向 Eureka-Server 心跳 (续租) 执行器
- if (clientConfig.shouldFetchRegistry()) {
- // registry cache refresh timer
- int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
- int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
- scheduler.schedule(
- new TimedSupervisorTask(
- "cacheRefresh",
- scheduler,
- cacheRefreshExecutor,
- registryFetchIntervalSeconds,
- TimeUnit.SECONDS,
- expBackOffBound,
- new CacheRefreshThread()
- ),
- registryFetchIntervalSeconds, TimeUnit.SECONDS);
- }
- // ... 省略无关代码
- }
初始化定时任务代码, 和续租的定时任务代码类似, 在 《Eureka 源码解析 -- 应用实例注册发现 (二) 之续租
》 有详细解析, 这里不重复分享.
com.netflix.discovery.DiscoveryClient.CacheRefreshThread, 注册信息缓存刷新任务, 实现代码如下:
- class CacheRefreshThread implements Runnable {
- public void run() {
- refreshRegistry();
- }
- }
调用
#refreshRegistry(false)
方法, 刷新注册信息缓存, 在 「2.3 刷新注册信息缓存」 详细解析.
2.3 刷新注册信息缓存
调用 #refreshRegistry(false) 方法, 刷新注册信息缓存, 实现代码如下:
- // DiscoveryClient.java
- void refreshRegistry() {
- try {
- // TODO 芋艿: TODO[0009]:RemoteRegionRegistry
- boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();
- boolean remoteRegionsModified = false;
- // This makes sure that a dynamic change to remote regions to fetch is honored.
- String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
- if (null != latestRemoteRegions) {
- String currentRemoteRegions = remoteRegionsToFetch.get();
- if (!latestRemoteRegions.equals(currentRemoteRegions)) {
- // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
- synchronized (instanceRegionChecker.getAzToRegionMapper()) {
- if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
- String[] remoteRegions = latestRemoteRegions.split(",");
- remoteRegionsRef.set(remoteRegions);
- instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
- remoteRegionsModified = true;
- } else {
- logger.info("Remote regions to fetch modified concurrently," +
- "ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
- }
- }
- } else {
- // Just refresh mapping to reflect any DNS/Property change
- instanceRegionChecker.getAzToRegionMapper().refreshMapping();
- }
- }
- boolean success = fetchRegistry(remoteRegionsModified);
- if (success) {
- // 设置 注册信息的应用实例数
- registrySize = localRegionApps.get().size();
- // 设置 最后获取注册信息时间
- lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
- }
- // 打印日志
- if (logger.isDebugEnabled()) {
- StringBuilder allAppsHashCodes = new StringBuilder();
- allAppsHashCodes.append("Local region apps hashcode:");
- allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
- allAppsHashCodes.append(", is fetching remote regions?");
- allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
- for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {
- allAppsHashCodes.append(", Remote region:");
- allAppsHashCodes.append(entry.getKey());
- allAppsHashCodes.append(", apps hashcode:");
- allAppsHashCodes.append(entry.getValue().getAppsHashCode());
- }
- logger.debug("Completed cache refresh task for discovery. All Apps hash code is {}",
- allAppsHashCodes.toString());
- }
- } catch (Throwable e) {
- logger.error("Cannot fetch registry from server", e);
- }
- }
第 3 至 28 行 :TODO[0009]:RemoteRegionRegistry
第 30 行 : 调用
#fetchRegistry(false)
方法, 从 Eureka-Server 获取注册信息, 在 「2.4 发起获取注册信息」 详细解析.
第 31 至 36 行 : 获取注册信息成功, 设置注册信息的应用实例数, 最后获取注册信息时间. 变量代码如下:
- /**
- * 注册信息的应用实例数
- */
- private volatile int registrySize = 0;
- /**
- * 最后成功从 Eureka-Server 拉取注册信息时间戳
- */
- private volatile long lastSuccessfulRegistryFetchTimestamp = -1;
第 38 至 53 行 : 打印调试日志.
第 54 至 56 行 : 打印异常日志.
2.4 发起获取注册信息
调用 #fetchRegistry(false) 方法, 从 Eureka-Server 获取注册信息( 根据条件判断, 可能是全量, 也可能是增量 ), 实现代码如下:
- private boolean fetchRegistry(boolean forceFullRegistryFetch) {
- Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
- try {
- // 获取 本地缓存的注册的应用实例集合
- // If the delta is disabled or if it is the first time, get all
- // applications
- Applications applications = getApplications();
- // 全量获取
- if (clientConfig.shouldDisableDelta() // 禁用增量获取
- || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
- || forceFullRegistryFetch
- || (applications == null) // 空
- || (applications.getRegisteredApplications().size() == 0) // 空
- || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
- {
- logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
- logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
- logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
- logger.info("Application is null : {}", (applications == null));
- logger.info("Registered Applications size is zero : {}",
- (applications.getRegisteredApplications().size() == 0));
- logger.info("Application version is -1: {}", (applications.getVersion() == -1));
- // 执行 全量获取
- getAndStoreFullRegistry();
- } else {
- // 执行 增量获取
- getAndUpdateDelta(applications);
- }
- // 设置 应用集合 hashcode
- applications.setAppsHashCode(applications.getReconcileHashCode());
- // 打印 本地缓存的注册的应用实例数量
- logTotalInstances();
- } catch (Throwable e) {
- logger.error(PREFIX + appPathIdentifier + "- was unable to refresh its cache! status =" + e.getMessage(), e);
- return false;
- } finally {
- if (tracer != null) {
- tracer.stop();
- }
- }
- // Notify about cache refresh before updating the instance remote status
- onCacheRefreshed();
- // Update remote status based on refreshed data held in the cache
- updateInstanceRemoteStatus();
- // registry was fetched successfully, so return true
- return true;
- }
第 5 至 8 行 : 获取本地缓存的注册的应用实例集合, 实现代码如下:
- public Applications getApplications() {
- return localRegionApps.get();
- }
第 10 至 26 行 : 全量获取注册信息.
第 11 行 : 配置
eureka.disableDelta = true
, 禁用增量获取注册信息. 默认值: false .
第 12 行 : 只获得一个 vipAddress 对应的应用实例们的注册信息.
第 13 行 : 方法参数
forceFullRegistryFetch
强制全量获取注册信息.
第 14 至 15 行 : 本地缓存为空.
第 25 至 26 行 : 调用
#getAndStoreFullRegistry()
方法, 全量获取注册信息, 并设置到本地缓存. 下文详细解析.
第 27 至 30 行 : 增量获取注册信息, 并刷新本地缓存, 在 《Eureka 源码解析 -- 应用实例注册发现 (七)之增量获取》 详细解析.
第 31 至 32 行 : 计算应用集合 hashcode . 该变量用于校验增量获取的注册信息和 Eureka-Server 全量的注册信息是否一致 ( 完整 ), 在 《Eureka 源码解析 -- 应用实例注册发现 (七) 之增量获取》 详细解析.
第 33 至 34 行 : 打印调试日志, 输出本地缓存的注册的应用实例数量. 实现代码如下:
- private void logTotalInstances() {
- if (logger.isDebugEnabled()) {
- int totInstances = 0;
- for (Application application : getApplications().getRegisteredApplications()) {
- totInstances += application.getInstancesAsIsFromEureka().size();
- }
- logger.debug("The total number of all instances in the client now is {}", totInstances);
- }
- }
第 44 至 45 行 : 触发 CacheRefreshedEvent 事件, 事件监听器执行. 目前 Eureka 未提供默认的该事件监听器.
- #onCacheRefreshed() 方法, 实现代码如下:
- /**
- * Eureka 事件监听器
- */
- private final CopyOnWriteArraySet<EurekaEventListener> eventListeners = new CopyOnWriteArraySet<>();
- protected void onCacheRefreshed() {
- fireEvent(new CacheRefreshedEvent());
- }
- protected void fireEvent(final EurekaEvent event) {
- for (EurekaEventListener listener : eventListeners) {
- listener.onEvent(event);
- }
- }
- x
笔者的 YY : 你可以实现自定义的事件监听器监听 CacheRefreshedEvent 事件, 以达到持久化最新的注册信息到存储器( 例如, 本地文件 ), 通过这样的方式, 配合实现 BackupRegistry 接口读取存储器. BackupRegistry 接口调用如下:
- // [3.2.12] 从 Eureka-Server 拉取注册信息
- if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
- fetchRegistryFromBackup();
- }
第 47 至 48 行 : 更新本地缓存的当前应用实例在 Eureka-Server 的状态.
- private volatile InstanceInfo.InstanceStatus lastRemoteInstanceStatus = InstanceInfo.InstanceStatus.UNKNOWN;
- private synchronized void updateInstanceRemoteStatus() {
- // Determine this instance's status for this App and set to UNKNOWN if not found
- InstanceInfo.InstanceStatus currentRemoteInstanceStatus = null;
- if (instanceInfo.getAppName() != null) {
- Application App = getApplication(instanceInfo.getAppName());
- if (App != null) {
- InstanceInfo remoteInstanceInfo = App.getByInstanceId(instanceInfo.getId());
- if (remoteInstanceInfo != null) {
- currentRemoteInstanceStatus = remoteInstanceInfo.getStatus();
- }
- }
- }
- if (currentRemoteInstanceStatus == null) {
- currentRemoteInstanceStatus = InstanceInfo.InstanceStatus.UNKNOWN;
- }
- // Notify if status changed
- if (lastRemoteInstanceStatus != currentRemoteInstanceStatus) {
- onRemoteStatusChanged(lastRemoteInstanceStatus, currentRemoteInstanceStatus);
- lastRemoteInstanceStatus = currentRemoteInstanceStatus;
- }
- }
第 4 至 14 行 : 从注册信息中获取当前应用在 Eureka-Server 的状态.
第 19 至 23 行 : 对比本地缓存和最新的的当前应用实例在 Eureka-Server 的状态, 若不同, 更新本地缓存( 注意, 只更新该缓存变量, 不更新本地当前应用实例的状态( instanceInfo.status ) ), 触发 StatusChangeEvent 事件, 事件监听器执行. 目前 Eureka 未提供默认的该事件监听器.#onRemoteStatusChanged(...) 实现代码如下:
- protected void onRemoteStatusChanged(InstanceInfo.InstanceStatus oldStatus, InstanceInfo.InstanceStatus newStatus) {
- fireEvent(new StatusChangeEvent(oldStatus, newStatus));
- }
Eureka-Client 本地应用实例与 Eureka-Server 的该应用实例状态不同的原因, 因为应用实例的覆盖状态, 在 《Eureka 源码解析 -- 应用实例注册发现 (八)之覆盖状态》 有详细解析.
2.4.1 全量获取注册信息, 并设置到本地缓存
调用 #getAndStoreFullRegistry() 方法, 全量获取注册信息, 并设置到本地缓存. 下实现代码如下:
- private void getAndStoreFullRegistry() throws Throwable {
- long currentUpdateGeneration = fetchRegistryGeneration.get();
- logger.info("Getting all instance registry info from the eureka server");
- // 全量获取注册信息
- Applications apps = null;
- EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
- ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
- : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
- if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
- apps = httpResponse.getEntity();
- }
- logger.info("The response status is {}", httpResponse.getStatusCode());
- // 设置到本地缓存
- if (apps == null) {
- logger.error("The application is null for some reason. Not storing this information");
- } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
- localRegionApps.set(this.filterAndShuffle(apps));
- logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
- } else {
- logger.warn("Not updating applications as another thread is updating it already");
- }
- }
第 6 至 14 行 : 全量获取注册信息, 实现代码如下:
- // AbstractJerseyEurekaHttpClient.java
- @Override
- public EurekaHttpResponse<Applications> getApplications(String... regions) {
- return getApplicationsInternal("apps/", regions);
- }
- private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {
- ClientResponse response = null;
- String regionsParamValue = null;
- try {
- WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
- if (regions != null && regions.length> 0) {
- regionsParamValue = StringUtil.join(regions);
- webResource = webResource.queryParam("regions", regionsParamValue);
- }
- Builder requestBuilder = webResource.getRequestBuilder();
- addExtraHeaders(requestBuilder);
- response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class); // JSON
- Applications applications = null;
- if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
- applications = response.getEntity(Applications.class);
- }
- return anEurekaHttpResponse(response.getStatus(), Applications.class)
- .headers(headersOf(response))
- .entity(applications)
- .build();
- } finally {
- if (logger.isDebugEnabled()) {
- logger.debug("Jersey HTTP GET {}/{}?{}; statusCode={}",
- serviceUrl, urlPath,
- regionsParamValue == null ? "":"regions=" + regionsParamValue,
- response == null ? "N/A" : response.getStatus()
- );
- }
- if (response != null) {
- response.close();
- }
- }
- }
调用
AbstractJerseyEurekaHttpClient#getApplications(...)
方法, GET 请求 Eureka-Server 的 apps/ 接口, 参数为 regions , 返回格式为 JSON , 实现全量获取注册信息.
第 16 至 24 行 : 设置到本地注册信息缓存.
第 19 行 :TODO[0025] : 并发更新的情况???
第 20 行 : 调用
#filterAndShuffle(...)
方法, 根据配置
eureka.shouldFilterOnlyUpInstances = true
( 默认值 :true ) 过滤只保留状态为开启 ( UP ) 的应用实例, 并随机打乱应用实例顺序. 打乱后, 实现调用应用服务的随机性. 代码比较易懂, 点击链接查看方法实现.
3. Eureka-Server 接收全量获取
3.1 接收全量获取请求
com.netflix.eureka.resources.ApplicationsResource, 处理所有应用的请求操作的 Resource ( Controller ).
接收全量获取请求, 映射 ApplicationsResource#getContainers() 方法, 实现代码如下:
- @GET
- public Response getContainers(@PathParam("version") String version,
- @HeaderParam(HEADER_ACCEPT) String acceptHeader,
- @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
- @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
- @Context UriInfo uriInfo,
- @Nullable @QueryParam("regions") String regionsStr) {
- // TODO[0009]:RemoteRegionRegistry
- boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
- String[] regions = null;
- if (!isRemoteRegionRequested) {
- EurekaMonitors.GET_ALL.increment();
- } else {
- regions = regionsStr.toLowerCase().split(",");
- Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
- EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
- }
- // 判断是否可以访问
- // Check if the server allows the access to the registry. The server can
- // restrict access if it is not
- // ready to serve traffic depending on various reasons.
- if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
- return Response.status(Status.FORBIDDEN).build();
- }
- // API 版本
- CurrentRequestVersion.set(Version.toEnum(version));
- // 返回数据格式
- KeyType keyType = Key.KeyType.JSON;
- String returnMediaType = MediaType.APPLICATION_JSON;
- if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
- keyType = Key.KeyType.xml;
- returnMediaType = MediaType.APPLICATION_XML;
- }
- // 响应缓存键( KEY )
- Key cacheKey = new Key(Key.EntityType.Application,
- ResponseCacheImpl.ALL_APPS,
- keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
- );
- //
- Response response;
- if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
- response = Response.ok(responseCache.getGZIP(cacheKey))
- .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
- .header(HEADER_CONTENT_TYPE, returnMediaType)
- .build();
- } else {
- response = Response.ok(responseCache.get(cacheKey))
- .build();
- }
- return response;
- }
第 8 至 17 行 :TODO[0009]:RemoteRegionRegistry
第 19 至 25 行 :Eureka-Server 启动完成, 但是未处于就绪 ( Ready ) 状态, 不接受请求全量应用注册信息的请求, 例如, Eureka-Server 启动时, 未能从其他 Eureka-Server 集群的节点获取到应用注册信息.
第 27 至 28 行 : 设置 API 版本号. 默认最新 API 版本为 V2. 实现代码如下:
- public enum Version {
- V1, V2;
- public static Version toEnum(String v) {
- for (Version version : Version.values()) {
- if (version.name().equalsIgnoreCase(v)) {
- return version;
- }
- }
- //Defaults to v2
- return V2;
- }
- }
第 30 至 36 行 : 设置返回数据格式, 默认 JSON .
第 38 至 42 行 : 创建响应缓存( ResponseCache ) 的键( KEY ), 在 「3.2.1 缓存键」详细解析.
第 44 至 55 行 : 从响应缓存读取全量注册信息, 在 「3.3 缓存读取」详细解析.
3.2 响应缓存 ResponseCache
com.netflix.eureka.registry.ResponseCache, 响应缓存接口, 接口代码如下:
- public interface ResponseCache {
- String get(Key key);
- byte[] getGZIP(Key key);
- void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress);
- AtomicLong getVersionDelta();
- AtomicLong getVersionDeltaWithRegions();
- }
其中,#getVersionDelta() 和 #getVersionDeltaWithRegions() 已经废弃. 这里保留的原因主要是考虑兼容性. 判断依据来自如下代码:
- // Applications.java
- @Deprecated
- public void setVersion(Long version) {
- this.versionDelta = version;
- }
- // AbstractInstanceRegistry.java
- public Applications getApplicationDeltas() {
- // ... 省略其它无关代码
- apps.setVersion(responseCache.getVersionDelta().get()); // 唯一调用到 ResponseCache#getVersionDelta() 方法的地方
- // ... 省略其它无关代码
- }
- #get() : 获得缓存.
- #getGZIP() : 获得缓存, 并 GZIP .
- #invalidate() : 过期缓存.
3.2.1 缓存键
com.netflix.eureka.registry.Key, 缓存键. 实现代码如下:
- public class Key {
- public enum KeyType {
- JSON, xml
- }
- /**
- * An enum to define the entity that is stored in this cache for this key.
- */
- public enum EntityType {
- Application, VIP, SVIP
- }
- /**
- * 实体名
- */
- private final String entityName;
- /**
- * TODO[0009]:RemoteRegionRegistry
- */
- private final String[] regions;
- /**
- * 请求参数类型
- */
- private final KeyType requestType;
- /**
- * 请求 API 版本号
- */
- private final Version requestVersion;
- /**
- * hashKey
- */
- private final String hashKey;
- /**
- * 实体类型
- *
- * {@link EntityType}
- */
- private final EntityType entityType;
- /**
- * {@link EurekaAccept}
- */
- private final EurekaAccept eurekaAccept;
- public Key(EntityType entityType, String entityName, KeyType type, Version v, EurekaAccept eurekaAccept, @Nullable String[] regions) {
- this.regions = regions;
- this.entityType = entityType;
- this.entityName = entityName;
- this.requestType = type;
- this.requestVersion = v;
- this.eurekaAccept = eurekaAccept;
- hashKey = this.entityType + this.entityName + (null != this.regions ? Arrays.toString(this.regions) : "")
- + requestType.name() + requestVersion.name() + this.eurekaAccept.name();
- }
- public Key(EntityType entityType, String entityName, KeyType type, Version v, EurekaAccept eurekaAccept, @Nullable String[] regions) {
- this.regions = regions;
- this.entityType = entityType;
- this.entityName = entityName;
- this.requestType = type;
- this.requestVersion = v;
- this.eurekaAccept = eurekaAccept;
- hashKey = this.entityType + this.entityName + (null != this.regions ? Arrays.toString(this.regions) : "")
- + requestType.name() + requestVersion.name() + this.eurekaAccept.name();
- }
- @Override
- public int hashCode() {
- String hashKey = getHashKey();
- return hashKey.hashCode();
- }
- @Override
- public boolean equals(Object other) {
- if (other instanceof Key) {
- return getHashKey().equals(((Key) other).getHashKey());
- } else {
- return false;
- }
- }
- }
3.2.2 响应缓存实现类
com.netflix.eureka.registry.ResponseCacheImpl, 响应缓存实现类.
在 ResponseCacheImpl 里, 将缓存拆分成两层 :
只读缓存( readOnlyCacheMap )
固定过期 + 固定大小的读写缓存( readWriteCacheMap )
默认配置下, 缓存读取策略如下:
缓存过期策略如下:
应用实例注册, 下线, 过期时, 只只只过期 readWriteCacheMap .
readWriteCacheMap 写入一段时间 ( 可配置 ) 后自动过期.
定时任务对比 readWriteCacheMap 和 readOnlyCacheMap 的缓存值, 若不一致, 以前者为主. 通过这样的方式, 实现了 readOnlyCacheMap 的定时过期.
注意: 应用实例注册, 下线, 过期时, 不会很快刷新到 readWriteCacheMap 缓存里. 默认配置下, 最大延迟在 30 秒.
为什么可以使用缓存?
在 的选择上, Eureka 选择了 AP , 不同于 Zookeeper 选择了 CP .
- private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
- private final LoadingCache<Key, Value> readWriteCacheMap;
- public String get(final Key key) {
- return get(key, shouldUseReadOnlyResponseCache);
- }
- String get(final Key key, boolean useReadOnlyCache) {
- Value payload = getValue(key, useReadOnlyCache);
- if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) {
- return null;
- } else {
- return payload.getPayload();
- }
- }
- Value getValue(final Key key, boolean useReadOnlyCache) {
- Value payload = null;
- try {
- if (useReadOnlyCache) {
- final Value currentPayload = readOnlyCacheMap.get(key);
- if (currentPayload != null) {
- payload = currentPayload;
- } else {
- payload = readWriteCacheMap.get(key);
- readOnlyCacheMap.put(key, payload);
- }
- } else {
- payload = readWriteCacheMap.get(key);
- }
- } catch (Throwable t) {
- logger.error("Cannot get value for key :" + key, t);
- }
- return payload;
- }
- public class Value {
- /**
- * 原始值
- */
- private final String payload;
- /**
- * GZIP 压缩后的值
- */
- private byte[] gzipped;
- public Value(String payload) {
- this.payload = payload;
- if (!EMPTY_PAYLOAD.equals(payload)) {
- // ... 省略 GZIP 压缩代码
- gzipped = bos.toByteArray();
- } else {
- gzipped = null;
- }
- }
- public String getPayload() {
- return payload;
- }
- public byte[] getGzipped() {
- return gzipped;
- }
- }
- this.readWriteCacheMap =
- CacheBuilder.newBuilder().initialCapacity(1000)
- .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
- .removalListener(new RemovalListener<Key, Value>() {
- @Override
- public void onRemoval(RemovalNotification<Key, Value> notification) {
- // TODO[0009]:RemoteRegionRegistry
- Key removedKey = notification.getKey();
- if (removedKey.hasRegions()) {
- Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
- regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
- }
- }
- })
- .build(new CacheLoader<Key, Value>() {
- @Override
- public Value load(Key key) throws Exception {
- // // TODO[0009]:RemoteRegionRegistry
- if (key.hasRegions()) {
- Key cloneWithNoRegions = key.cloneWithoutRegions();
- regionSpecificKeys.put(cloneWithNoRegions, key);
- }
- Value value = generatePayload(key);
- return value;
- }
- });
- #generatePayload(key) 方法, 实现代码如下:
- private Value generatePayload(Key key) {
- Stopwatch tracer = null;
- try {
- String payload;
- switch (key.getEntityType()) {
- case Application:
- boolean isRemoteRegionRequested = key.hasRegions();
- if (ALL_APPS.equals(key.getName())) {
- if (isRemoteRegionRequested) { // TODO[0009]:RemoteRegionRegistry
- tracer = serializeAllAppsWithRemoteRegionTimer.start();
- payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
- } else {
- tracer = serializeAllAppsTimer.start();
- payload = getPayLoad(key, registry.getApplications());
- }
- } else if (ALL_APPS_DELTA.equals(key.getName())) {
- // ... 省略增量获取相关的代码
- } else {
- tracer = serializeOneApptimer.start();
- payload = getPayLoad(key, registry.getApplication(key.getName()));
- }
- break;
- // ... 省略部分代码
- }
- return new Value(payload);
- } finally {
- if (tracer != null) {
- tracer.stop();
- }
- }
- }
- // AbstractInstanceRegistry.java
- private static final String[] EMPTY_STR_ARRAY = new String[0];
- public Applications getApplications() {
- boolean disableTransparentFallback = serverConfig.disableTransparentFallbackToOtherRegion();
- if (disableTransparentFallback) { // TODO[0009]:RemoteRegionRegistry
- return getApplicationsFromLocalRegionOnly();
- } else {
- return getApplicationsFromAllRemoteRegions(); // Behavior of falling back to remote region can be disabled.
- }
- }
- public Applications getApplicationsFromLocalRegionOnly() {
- return getApplicationsFromMultipleRegions(EMPTY_STR_ARRAY);
- }
- public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) {
- // TODO[0009]:RemoteRegionRegistry
- boolean includeRemoteRegion = null != remoteRegions && remoteRegions.length != 0;
- logger.debug("Fetching applications registry with remote regions: {}, Regions argument {}",
- includeRemoteRegion, Arrays.toString(remoteRegions));
- if (includeRemoteRegion) {
- GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS.increment();
- } else {
- GET_ALL_CACHE_MISS.increment();
- }
- // 获得获得注册的应用集合
- Applications apps = new Applications();
- apps.setVersion(1L);
- for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) {
- Application App = null;
- if (entry.getValue() != null) {
- for (Entry<String, Lease<InstanceInfo>> stringLeaseEntry : entry.getValue().entrySet()) {
- Lease<InstanceInfo> lease = stringLeaseEntry.getValue();
- if (App == null) {
- App = new Application(lease.getHolder().getAppName());
- }
- App.addInstance(decorateInstanceInfo(lease));
- }
- }
- if (App != null) {
- apps.addApplication(App);
- }
- }
- // TODO[0009]:RemoteRegionRegistry
- if (includeRemoteRegion) {
- for (String remoteRegion : remoteRegions) {
- RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
- if (null != remoteRegistry) {
- Applications remoteApps = remoteRegistry.getApplications();
- for (Application application : remoteApps.getRegisteredApplications()) {
- if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
- logger.info("Application {} fetched from the remote region {}",
- application.getName(), remoteRegion);
- Application appInstanceTillNow = apps.getRegisteredApplications(application.getName());
- if (appInstanceTillNow == null) {
- appInstanceTillNow = new Application(application.getName());
- apps.addApplication(appInstanceTillNow);
- }
- for (InstanceInfo instanceInfo : application.getInstances()) {
- appInstanceTillNow.addInstance(instanceInfo);
- }
- } else {
- logger.debug("Application {} not fetched from the remote region {} as there exists a"
- + "whitelist and this app is not in the whitelist.",
- application.getName(), remoteRegion);
- }
- }
- } else {
- logger.warn("No remote registry available for the remote region {}", remoteRegion);
- }
- }
- }
- // 设置 应用集合 hashcode
- apps.setAppsHashCode(apps.getReconcileHashCode());
- return apps;
- }
- /**
- * Generate pay load with both JSON and xml formats for all applications.
- */
- private String getPayLoad(Key key, Applications apps) {
- // 获得编码器
- EncoderWrapper encoderWrapper = serverCodecs.getEncoder(key.getType(), key.getEurekaAccept());
- String result;
- try {
- // 编码
- result = encoderWrapper.encode(apps);
- } catch (Exception e) {
- logger.error("Failed to encode the payload for all apps", e);
- return "";
- }
- if(logger.isDebugEnabled()) {
- logger.debug("New application cache entry {} with apps hashcode {}", key.toStringCompact(), apps.getAppsHashCode());
- }
- return result;
- }
- public void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {
- for (Key.KeyType type : Key.KeyType.values()) {
- for (Version v : Version.values()) {
- invalidate(
- new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.full),
- new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.compact),
- new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.full),
- new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.compact),
- new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.full),
- new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.compact)
- );
- if (null != vipAddress) {
- invalidate(new Key(Key.EntityType.VIP, vipAddress, type, v, EurekaAccept.full));
- }
- if (null != secureVipAddress) {
- invalidate(new Key(Key.EntityType.SVIP, secureVipAddress, type, v, EurekaAccept.full));
- }
- }
- }
- }
- public void invalidate(Key... keys) {
- for (Key key : keys) {
- logger.debug("Invalidating the response cache key : {} {} {} {}, {}", key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());
- // 过期读写缓存
- readWriteCacheMap.invalidate(key);
- // TODO[0009]:RemoteRegionRegistry
- Collection<Key> keysWithRegions = regionSpecificKeys.get(key);
- if (null != keysWithRegions && !keysWithRegions.isEmpty()) {
- for (Key keysWithRegion : keysWithRegions) {
- logger.debug("Invalidating the response cache key : {} {} {} {} {}",
- key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());
- readWriteCacheMap.invalidate(keysWithRegion);
- }
- }
- }
- }
- ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
- // ... 省略无关代码
- long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
- // ... 省略无关代码
- if (shouldUseReadOnlyResponseCache) {
- timer.schedule(getCacheUpdateTask(),
- new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
- + responseCacheUpdateIntervalMs),
- responseCacheUpdateIntervalMs);
- }
- // ... 省略无关代码
- }
- private TimerTask getCacheUpdateTask() {
- return new TimerTask() {
- @Override
- public void run() {
- logger.debug("Updating the client cache from response cache");
- for (Key key : readOnlyCacheMap.keySet()) { // 循环 readOnlyCacheMap 的缓存键
- if (logger.isDebugEnabled()) {
- Object[] args = {key.getEntityType(), key.getName(), key.getVersion(), key.getType()};
- logger.debug("Updating the client cache from response cache for key : {} {} {} {}", args);
- }
- try {
- CurrentRequestVersion.set(key.getVersion());
- Value cacheValue = readWriteCacheMap.get(key);
- Value currentCacheValue = readOnlyCacheMap.get(key);
- if (cacheValue != currentCacheValue) { // 不一致时, 进行替换
- readOnlyCacheMap.put(key, cacheValue);
- }
- } catch (Throwable th) {
- logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
- }
- }
- }
- };
- }
来源: https://juejin.im/entry/5c994f85e51d454075334273