本文主要基于 Eureka 1.8.X 版本
1. 概述
2. Eureka-Client 发起下线
3. Eureka-Server 接收下线
3.1 接收下线请求
3.2 下线应用实例信息
RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
您对于源码的疑问每条留言都将得到认真回复甚至不知道如何读源码也可以请教噢
新的源码解析文章实时收到通知每周更新一篇左右
认真的源码交流微信群
1. 概述
本文主要分享 Eureka-Client 向 Eureka-Server 下线应用实例的过程
FROM 深度剖析服务发现组件 Netflix Eureka 二次编辑
蓝框部分, 为本文重点
非蓝框部分, Eureka-Server 集群间复制注册的应用实例信息, 不在本文内容范畴
推荐 Spring Cloud 书籍:
请支持正版下载盗版, 等于主动编写低级 BUG
程序猿 DD Spring Cloud 微服务实战
周立 Spring Cloud 与 Docker 微服务架构实战
2. Eureka-Client 发起下线
应用实例关闭时, Eureka-Client 向 Eureka-Server 发起下线应用实例需要满足如下条件才可发起:
配置
eureka.registration.enabled = true
, 应用实例开启注册开关默认为 false
配置
eureka.shouldUnregisterOnShutdown = true
, 应用实例开启关闭时下线开关默认为 true
实现代码如下:
- // DiscoveryClient.java
- public synchronized void shutdown() {
- // ... 省略无关代码
- // If APPINFO was registered
- if (applicationInfoManager != null
- && clientConfig.shouldRegisterWithEureka() // eureka.registration.enabled = true
- && clientConfig.shouldUnregisterOnShutdown()) { // eureka.shouldUnregisterOnShutdown = true
- applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
- unregister();
- }
- }
调用
ApplicationInfoManager#setInstanceStatus(...)
方法, 设置应用实例为关闭( DOWN )
调用 #unregister() 方法, 实现代码如下:
- // DiscoveryClient.java
- void unregister() {
- // It can be null if shouldRegisterWithEureka == false
- if (eurekaTransport != null && eurekaTransport.registrationClient != null) {
- try {
- logger.info("Unregistering ...");
- EurekaHttpResponse < Void > httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
- logger.info(PREFIX + appPathIdentifier + "- deregister status:" + httpResponse.getStatusCode());
- } catch(Exception e) {
- logger.error(PREFIX + appPathIdentifier + "- de-registration failed" + e.getMessage(), e);
- }
- }
- }
- // AbstractJerseyEurekaHttpClient.java
- @Override public EurekaHttpResponse < Void > cancel(String appName, String id) {
- String urlPath = "apps/" + appName + '/' + id;
- ClientResponse response = null;
- try {
- Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
- addExtraHeaders(resourceBuilder);
- response = resourceBuilder.delete(ClientResponse.class);
- return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
- } finally {
- if (logger.isDebugEnabled()) {
- logger.debug("Jersey HTTP DELETE {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A": response.getStatus());
- }
- if (response != null) {
- response.close();
- }
- }
- }
调用
AbstractJerseyEurekaHttpClient#cancel(...)
方法, DELETE 请求 Eureka-Server 的
apps/${APP_NAME}/${INSTANCE_INFO_ID}
接口, 实现应用实例信息的下线
3. Eureka-Server 接收下线
3.1 接收下线请求
com.netflix.eureka.resources.InstanceResource
, 处理单个应用实例信息的请求操作的 Resource ( Controller )
下线应用实例信息的请求, 映射
InstanceResource#cancelLease()
方法, 实现代码如下:
- @DELETE
- public Response cancelLease(
- @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
- // 下线
- boolean isSuccess = registry.cancel(app.getName(), id, "true".equals(isReplication));
- if (isSuccess) { // 下线成功
- logger.debug("Found (Cancel):" + app.getName() + "-" + id);
- return Response.ok().build();
- } else { // 下线成功
- logger.info("Not Found (Cancel):" + app.getName() + "-" + id);
- return Response.status(Status.NOT_FOUND).build();
- }
- }
调用
PeerAwareInstanceRegistryImpl#cancel(...)
方法, 下线应用实例实现代码如下:
- @Override
- public boolean cancel(final String appName, final String id,
- final boolean isReplication) {
- if (super.cancel(appName, id, isReplication)) { // 下线
- // Eureka-Server 复制
- replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
- // 减少 `numberOfRenewsPerMinThreshold` `expectedNumberOfRenewsPerMin`
- synchronized (lock) {
- if (this.expectedNumberOfRenewsPerMin > 0) {
- // Since the client wants to cancel it, reduce the threshold (1 for 30 seconds, 2 for a minute)
- this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin - 2;
- this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
- }
- }
- return true;
- }
- return false;
- }
第 4 行 : 调用父类
AbstractInstanceRegistry#cancel(...)
方法, 下线应用实例信息
第 6 行 :Eureka-Server 复制下线操作, 在 Eureka 源码解析 Eureka-Server 集群同步 有详细解析
第 7 至 14 行 : 减少
- numberOfRenewsPerMinThreshold
- expectedNumberOfRenewsPerMin
, 自我保护机制相关, 在 Eureka 源码解析 应用实例注册发现 (四) 之自我保护机制 有详细解析
3.2 下线应用实例信息
调用
AbstractInstanceRegistry#cancel(...)
方法, 下线应用实例信息, 实现代码如下:
- @Override public boolean cancel(String appName, String id, boolean isReplication) {
- return internalCancel(appName, id, isReplication);
- }
- protected boolean internalCancel(String appName, String id, boolean isReplication) {
- try {
- // 获得读锁
- read.lock();
- // 增加 取消注册次数 到 监控
- CANCEL.increment(isReplication);
- // 移除 租约映射
- Map < String,
- Lease < InstanceInfo >> gMap = registry.get(appName);
- Lease < InstanceInfo > leaseToCancel = null;
- if (gMap != null) {
- leaseToCancel = gMap.remove(id);
- }
- // 添加到 最近取消注册的调试队列
- synchronized(recentCanceledQueue) {
- recentCanceledQueue.add(new Pair < Long, String > (System.currentTimeMillis(), appName + "(" + id + ")"));
- }
- // 移除 应用实例覆盖状态映射
- InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
- if (instanceStatus != null) {
- logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
- }
- // 租约不存在
- if (leaseToCancel == null) {
- CANCEL_NOT_FOUND.increment(isReplication); // 添加 取消注册不存在 到 监控
- logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
- return false; // 失败
- } else {
- // 设置 租约的取消注册时间戳
- leaseToCancel.cancel();
- // 添加到 最近租约变更记录队列
- InstanceInfo instanceInfo = leaseToCancel.getHolder();
- String vip = null;
- String svip = null;
- if (instanceInfo != null) {
- instanceInfo.setActionType(ActionType.DELETED);
- recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
- instanceInfo.setLastUpdatedTimestamp();
- vip = instanceInfo.getVIPAddress();
- svip = instanceInfo.getSecureVipAddress();
- }
- // 设置 响应缓存 过期
- invalidateCache(appName, vip, svip);
- logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
- return true; // 成功
- }
- } finally {
- // 释放锁
- read.unlock();
- }
- }
第 9 行 : 获取读锁在 Eureka 源码解析 应用实例注册发现 (九)之岁月是把萌萌的读写锁 详细解析
第 10 至 11 行 : 增加下线次数到监控配合 Netflix Servo 实现监控信息采集
第 12 至 17 行 : 移除租约映射( registry )
第 18 至 21 行 : 添加到最近下线的调试队列(
recentCanceledQueue
), 用于 Eureka-Server 运维界面的显示, 无实际业务逻辑使用实现代码如下:
- /**
- * 最近取消注册的调试队列
- * key : 添加时的时间戳
- * value : 字符串 = 应用名(应用实例信息编号)
- */
- private final CircularQueue<Pair<Long, String>> recentCanceledQueue;
第 22 至 26 行 : 移除应用实例覆盖状态映射在应用实例注册发现 (八)之覆盖状态详细解析
第 27 至 31 行 : 租约不存在, 返回下线失败( false )
第 34 行 : 调用 Lease#cancel() 方法, 取消租约实现代码如下:
- // Lease.java
- public void cancel() {
- if (evictionTimestamp <= 0) {
- evictionTimestamp = System.currentTimeMillis();
- }
- }
第 35 至 45 行 : 设置应用实例信息的操作类型为添加, 并添加到最近租约变更记录队列(
- recentlyChangedQueue
- )
- recentlyChangedQueue
用于注册信息的增量获取, 在应用实例注册发现 (七)之增量获取详细解析实现代码如下:
- /**
- * 最近租约变更记录队列
- */
- private ConcurrentLinkedQueue < RecentlyChangedItem > recentlyChangedQueue = new ConcurrentLinkedQueue < RecentlyChangedItem > ();
第 47 行 : 设置响应缓存 ( ResponseCache ) 过期, 在 Eureka 源码解析 应用实例注册发现 (六)之全量获取详细解析
第 49 行 : 返回下线失败( false )
第 53 行 : 释放锁
来源: https://juejin.im/entry/5a7cc9a26fb9a063543c1f8f