本文主要基于 Eureka 1.8.X 版本
1. 概述
2. Eureka-Client 发起续租
2.1 初始化定时任务
- 2.2 HeartbeatThread
- 2.3 TimedSupervisorTask
3. Eureka-Server 接收续租
3.1 接收续租请求
3.2 续租应用实例信息
RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
您对于源码的疑问每条留言都将得到认真回复甚至不知道如何读源码也可以请教噢
新的源码解析文章实时收到通知每周更新一篇左右
认真的源码交流微信群
1. 概述
本文主要分享 Eureka-Client 向 Eureka-Server 续租应用实例的过程
FROM 深度剖析服务发现组件 Netflix Eureka 二次编辑
蓝框部分, 为本文重点
非蓝框部分, Eureka-Server 集群间复制注册的应用实例信息, 不在本文内容范畴
推荐 Spring Cloud 书籍:
请支持正版下载盗版, 等于主动编写低级 BUG
程序猿 DD Spring Cloud 微服务实战
周立 Spring Cloud 与 Docker 微服务架构实战
2. Eureka-Client 发起续租
Eureka-Client 向 Eureka-Server 发起注册应用实例成功后获得租约 ( Lease ) Eureka-Client 固定间隔向 Eureka-Server 发起续租( renew ), 避免租约过期
默认情况下, 租约有效期为 90 秒, 续租频率为 30 秒两者比例为 1 : 3 , 保证在网络异常等情况下, 有三次重试的机会
2.1 初始化定时任务
Eureka-Client 在初始化过程中, 创建心跳线程, 固定间隔向 Eureka-Server 发起续租 ( renew ) 实现代码如下:
- // DiscoveryClient.java
- DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
- Provider backupRegistryProvider) {
- // ... 省略无关代码
- scheduler = Executors.newScheduledThreadPool(2,
- new ThreadFactoryBuilder()
- .setNameFormat("DiscoveryClient-%d")
- .setDaemon(true)
- .build());
- heartbeatExecutor = new ThreadPoolExecutor(
- 1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
- new SynchronousQueue(),
- new ThreadFactoryBuilder()
- .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
- .setDaemon(true)
- .build()
- ); // use direct handoff
- // ... 省略无关代码
- // 3.2.14 初始化定时任务
- initScheduledTasks();
- }
- private void initScheduledTasks() {
- // 向 Eureka-Server 心跳 (续租) 执行器
- if (clientConfig.shouldRegisterWithEureka()) {
- int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); // 续租频率
- int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); //
- logger.info("Starting heartbeat executor:" + "renew interval is:" + renewalIntervalInSecs);
- // Heartbeat timer
- scheduler.schedule(
- new TimedSupervisorTask(
- "heartbeat",
- scheduler,
- heartbeatExecutor,
- renewalIntervalInSecs,
- TimeUnit.SECONDS,
- expBackOffBound,
- new HeartbeatThread()
- ),
- renewalIntervalInSecs, TimeUnit.SECONDS);
- // ... 省略无关代码
- }
- // ... 省略无关代码
- }
scheduler, 定时任务服务, 用于定时触发心跳 ( 续租 ) 细心如你, 会发现任务提交的方式是
ScheduledExecutorService#schedule(...)
方法, 只延迟执行一次心跳, 说好的固定频率执行心跳呢!!! 答案在 2.3 TimedSupervisorTask 揭晓
heartbeatExecutor, 心跳任务执行线程池为什么有 scheduler 的情况下, 还有 heartbeatExecutor ??? 答案也在 2.3 TimedSupervisorTask 揭晓
HeartbeatThread, 心跳线程, 在 2.2 TimedSupervisorTask 详细解析
- 2.2 HeartbeatThread
- com.netflix.discovery.DiscoveryClient.HeartbeatThread
, 心跳线程, 实现执行 Eureka-Client 向 Eureka-Server 发起续租 ( renew ) 请求实现代码如下:
- // DiscoveryClient.java
- /**
- * 最后成功向 Eureka-Server 心跳时间戳
- */
- private volatile long lastSuccessfulHeartbeatTimestamp = -1;
- private class HeartbeatThread implements Runnable {
- public void run() {
- if (renew()) {
- lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
- }
- }
- }
调用 #renew 方法, 执行续租逻辑实现代码如下:
- // DiscoveryClient.java
- boolean renew() {
- EurekaHttpResponse httpResponse;
- try {
- httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
- logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
- if (httpResponse.getStatusCode() == 404) {
- REREGISTER_COUNTER.increment();
- logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
- long timestamp = instanceInfo.setIsDirtyWithTime();
- // 发起注册
- boolean success = register();
- if (success) {
- instanceInfo.unsetIsDirty(timestamp);
- }
- return success;
- }
- return httpResponse.getStatusCode() == 200;
- } catch (Throwable e) {
- logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
- return false;
- }
- }
调用
AbstractJerseyEurekaHttpClient#sendHeartBeat(...)
方法, 发起续租请求, 实现代码如下:
- // AbstractJerseyEurekaHttpClient.java
- @Override
- public EurekaHttpResponse sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
- String urlPath = "apps/" + appName + '/' + id;
- ClientResponse response = null;
- try {
- WebResource webResource = jerseyClient.resource(serviceUrl)
- .path(urlPath)
- .queryParam("status", info.getStatus().toString())
- .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
- if (overriddenStatus != null) {
- webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
- }
- Builder requestBuilder = webResource.getRequestBuilder();
- addExtraHeaders(requestBuilder);
- response = requestBuilder.put(ClientResponse.class);
- EurekaHttpResponseBuilder eurekaResponseBuilder = anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response));
- if (response.hasEntity()) {
- eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class));
- }
- return eurekaResponseBuilder.build();
- } finally {
- if (logger.isDebugEnabled()) {
- logger.debug("Jersey HTTP PUT {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
- }
- if (response != null) {
- response.close();
- }
- }
- }
PUT 请求 Eureka-Server 的
apps/${APP_NAME}/${INSTANCE_INFO_ID}
接口, 参数为 statuslastDirtyTimestampoverriddenstatus, 实现续租
调用
AbstractJerseyEurekaHttpClient#register(...)
方法, 当 Eureka-Server 不存在租约时, 重新发起注册, 在 Eureka 源码解析 应用实例注册发现 (一)之注册有详细解析
- 2.3 TimedSupervisorTask
- com.netflix.discovery.TimedSupervisorTask
, 监管定时任务的任务
A supervisor task that schedules subtasks while enforce a timeout.
创建 TimedSupervisorTask 代码如下:
- public class TimedSupervisorTask extends TimerTask {
- private final Counter timeoutCounter;
- private final Counter rejectedCounter;
- private final Counter throwableCounter;
- private final LongGauge threadPoolLevelGauge;
- /**
- * 定时任务服务
- */
- private final ScheduledExecutorService scheduler;
- /**
- * 执行子任务线程池
- */
- private final ThreadPoolExecutor executor;
- /**
- * 子任务执行超时时间
- */
- private final long timeoutMillis;
- /**
- * 子任务
- */
- private final Runnable task;
- /**
- * 当前任子务执行频率
- */
- private final AtomicLong delay;
- /**
- * 最大子任务执行频率
- *
- * 子任务执行超时情况下使用
- */
- private final long maxDelay;
- public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
- int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
- this.scheduler = scheduler;
- this.executor = executor;
- this.timeoutMillis = timeUnit.toMillis(timeout);
- this.task = task;
- this.delay = new AtomicLong(timeoutMillis);
- this.maxDelay = timeoutMillis * expBackOffBound;
- // Initialize the counters and register.
- timeoutCounter = Monitors.newCounter("timeouts");
- rejectedCounter = Monitors.newCounter("rejectedExecutions");
- throwableCounter = Monitors.newCounter("throwables");
- threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build());
- Monitors.registerObject(name, this);
- }
- }
scheduler , 定时任务服务, 用于定时发起子任务
executor , 执行子任务线程池, 用于提交子任务执行
task , 子任务
timeoutMillis , 子任务执行超时时间, 单位: 毫秒
delay , 当前子任务执行频率, 单位: 毫秒值等于 timeout 参数
maxDelay , 最大子任务执行频率, 单位: 毫秒值等于
timeout * expBackOffBound
参数
scheduler 初始化延迟执行 TimedSupervisorTask
TimedSupervisorTask 执行时, 提交 task 到 executor 执行任务
当 task 执行正常, TimedSupervisorTask 再次提交自己到 scheduler 延迟 timeoutMillis 执行
当 task 执行超时, 重新计算延迟时间( 不允许超过 maxDelay ), 再次提交自己到 scheduler 延迟执行
实现代码如下:
- // TimedSupervisorTask.java
- @Override
- public void run() {
- Future future = null;
- try {
- // 提交 任务
- future = executor.submit(task);
- //
- threadPoolLevelGauge.set((long) executor.getActiveCount());
- // 等待任务 执行完成 或 超时
- future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout
- // 设置 下一次任务执行频率
- delay.set(timeoutMillis);
- //
- threadPoolLevelGauge.set((long) executor.getActiveCount());
- } catch (TimeoutException e) {
- logger.error("task supervisor timed out", e);
- timeoutCounter.increment(); //
- // 设置 下一次任务执行频率
- long currentDelay = delay.get();
- long newDelay = Math.min(maxDelay, currentDelay * 2);
- delay.compareAndSet(currentDelay, newDelay);
- } catch (RejectedExecutionException e) {
- if (executor.isShutdown() || scheduler.isShutdown()) {
- logger.warn("task supervisor shutting down, reject the task", e);
- } else {
- logger.error("task supervisor rejected the task", e);
- }
- rejectedCounter.increment(); //
- } catch (Throwable e) {
- if (executor.isShutdown() || scheduler.isShutdown()) {
- logger.warn("task supervisor shutting down, can't accept the task");
- } else {
- logger.error("task supervisor threw an exception", e);
- }
- throwableCounter.increment(); //
- } finally {
- // 取消 未完成的任务
- if (future != null) {
- future.cancel(true);
- }
- // 调度 下次任务
- if (!scheduler.isShutdown()) {
- scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
- }
- }
- }
第 5 至 6 行 : 提交子任务 task 到执行子任务线程池 executor
第 9 至 10 行 : 等待子任务 task 执行完成或执行超时
第 11 至 12 行 : 子任务 task 执行完成, 设置下一次执行延迟 delay
第 19 至 22 行 : 子任务 task 执行超时, 重新计算下一次执行延迟 delay 计算公式为
Math.min(maxDelay, currentDelay * 2)
如果多次超时, 超时时间不断乘以 2 , 不允许超过最大延迟时间( maxDelay )
第 41 至 44 行 : 强制取消未完成的子任务
第 46 至 49 行 : 调度下一次 TimedSupervisorTask
3. Eureka-Server 接收续租
3.1 接收续租请求
com.netflix.eureka.resources.InstanceResource
, 处理单个应用实例信息的请求操作的 Resource ( Controller )
续租应用实例信息的请求, 映射
InstanceResource#renewLease()
方法, 实现代码如下:
- @PUT
- public Response renewLease(
- @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
- @QueryParam("overriddenstatus") String overriddenStatus,
- @QueryParam("status") String status,
- @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
- boolean isFromReplicaNode = "true".equals(isReplication);
- // 续租
- boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
- // 续租失败
- // Not found in the registry, immediately ask for a register
- if (!isSuccess) {
- logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
- return Response.status(Status.NOT_FOUND).build();
- }
- // 比较 InstanceInfo 的 lastDirtyTimestamp 属性
- // Check if we need to sync based on dirty time stamp, the client
- // instance might have changed some value
- Response response = null;
- if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
- response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
- // Store the overridden status since the validation found out the node that replicates wins
- if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
- && (overriddenStatus != null)
- && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
- && isFromReplicaNode) {
- registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
- }
- } else { // 成功
- response = Response.ok().build();
- }
- logger.debug("Found (Renew): {} - {}; reply status={}" + app.getName(), id, response.getStatus());
- return response;
- }
第 8 至 9 行 : 调用
PeerAwareInstanceRegistryImpl#renew(...)
方法, 续租实现代码如下:
- // PeerAwareInstanceRegistryImpl.java
- public boolean renew(final String appName, final String id, final boolean isReplication) {
- if (super.renew(appName, id, isReplication)) { // 续租
- // Eureka-Server 复制
- replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
- return true;
- }
- return false;
- }
调用父类
AbstractInstanceRegistry#renew(...)
方法, 注册应用实例信息
第 11 至 16 行 : 续租失败, 返回 404 响应当 Eureka-Client 收到 404 响应后, 会重新发起 InstanceInfo 的注册
第 18 至 30 行 : 比较请求的 lastDirtyTimestamp 和 Server 的 InstanceInfo 的 lastDirtyTimestamp 属性差异, 需要配置
- eureka.syncWhenTimestampDiffers = true
- ( 默认开启 )
第 23 行 : 调用
#validateDirtyTimestamp(...)
方法, 比较 lastDirtyTimestamp 的差异实现代码如下:
- // InstanceResource.java
- private Response validateDirtyTimestamp(Long lastDirtyTimestamp, boolean isReplication) {
- // 获取 InstanceInfo
- InstanceInfo appInfo = registry.getInstanceByAppAndId(app.getName(), id, false);
- if (appInfo != null) {
- if ((lastDirtyTimestamp != null) && (!lastDirtyTimestamp.equals(appInfo.getLastDirtyTimestamp()))) {
- Object[] args = {
- id,
- appInfo.getLastDirtyTimestamp(),
- lastDirtyTimestamp,
- isReplication
- };
- // 请求 的 较大
- if (lastDirtyTimestamp > appInfo.getLastDirtyTimestamp()) {
- logger.debug("Time to sync, since the last dirty timestamp differs -" + "ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}", args);
- return Response.status(Status.NOT_FOUND).build();
- // Server 的 较大
- } else if (appInfo.getLastDirtyTimestamp() > lastDirtyTimestamp) {
- // In the case of replication, send the current instance info in the registry for the
- // replicating node to sync itself with this one.
- if (isReplication) {
- logger.debug("Time to sync, since the last dirty timestamp differs -" + "ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}", args);
- return Response.status(Status.CONFLICT).entity(appInfo).build();
- } else {
- return Response.ok().build();
- }
- }
- }
- }
- return Response.ok().build();
- }
第 7 至 11 行 : 请求的 lastDirtyTimestamp 较大, 意味着请求方 ( 可能是 Eureka-Client , 也可能是 Eureka-Server 集群内的其他 Server ) 存在 InstanceInfo 和 Eureka-Server 的 InstanceInfo 的数据不一致, 返回 404 响应请求方收到 404 响应后重新发起注册
第 16 至 21 行 :Eureka 源码解析 Eureka-Server 集群同步 有详细解析
第 22 至 24 行 :Server 的 lastDirtyTimestamp 较大, 并且请求方为 Eureka-Client, 续租成功, 返回 200 成功响应
第 29 行 :lastDirtyTimestamp 一致, 返回 200 成功响应
第 24 至 30 行 :Eureka 源码解析 Eureka-Server 集群同步 有详细解析
第 31 至 33 行 : 续租成功, 返回 200 成功响应
3.2 续租应用实例信息
调用
AbstractInstanceRegistry#renew(...)
方法, 续租应用实例信息, 实现代码如下:
- public boolean renew(String appName, String id, boolean isReplication) {
- // 增加 续租次数 到 监控
- RENEW.increment(isReplication);
- // 获得 租约
- Map> gMap = registry.get(appName);
- Lease leaseToRenew = null;
- if (gMap != null) {
- leaseToRenew = gMap.get(id);
- }
- // 租约不存在
- if (leaseToRenew == null) {
- RENEW_NOT_FOUND.increment(isReplication);
- logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
- return false;
- } else {
- InstanceInfo instanceInfo = leaseToRenew.getHolder();
- if (instanceInfo != null) {
- // touchASGCache(instanceInfo.getASGName());
- // override status
- InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
- instanceInfo, leaseToRenew, isReplication);
- if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
- logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
- + "; re-register required", instanceInfo.getId());
- RENEW_NOT_FOUND.increment(isReplication);
- return false;
- }
- if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
- Object[] args = {
- instanceInfo.getStatus().name(),
- instanceInfo.getOverriddenStatus().name(),
- instanceInfo.getId()
- };
- logger.info(
- "The instance status {} is different from overridden instance status {} for instance {}."
- + "Hence setting the status to overridden status", args);
- instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
- }
- }
- // 新增 续租每分钟次数
- renewsLastMin.increment();
- // 设置 租约最后更新时间(续租)
- leaseToRenew.renew();
- return true;
- }
- }
第 2 至 3 行 : 增加续租次数到监控配合 Netflix Servo 实现监控信息采集
第 4 至 9 行 : 获得租约( Lease )
第 10 至 14 行 : 租约不存在, 返回续租失败( false )
第 19 至 21 行 : 获得应用实例的最终状态在应用实例注册发现 (八)之覆盖状态详细解析
第 22 至 27 行 : 应用实例的最终状态为 UNKNOWN, 无法续约, 返回 false 在应用实例注册发现 (八)之覆盖状态详细解析
第 28 至 37 行 : 应用实例的状态与最终状态不相等, 使用最终状态覆盖应用实例的状态在应用实例注册发现 (八)之覆盖状态详细解析
第 40 至 41 行 : 新增续租每分钟次数( renewsLastMin )
com.netflix.eureka.util.MeasuredRate
, 速度测量类, 实现代码如下:
- // AbstractInstanceRegistry.java
- /**
- * 续租每分钟次数
- */
- private final MeasuredRate renewsLastMin;
- // MeasuredRate.java
- public class MeasuredRate {
- /**
- * 上一个间隔次数
- */
- private final AtomicLong lastBucket = new AtomicLong(0);
- /**
- * 当前间隔次数
- */
- private final AtomicLong currentBucket = new AtomicLong(0);
- /**
- * 间隔
- */
- private final long sampleInterval;
- /**
- * 定时器
- */
- private final Timer timer;
- private volatile boolean isActive;
- public MeasuredRate(long sampleInterval) {
- this.sampleInterval = sampleInterval;
- this.timer = new Timer("Eureka-MeasureRateTimer", true);
- this.isActive = false;
- }
- public synchronized void start() {
- if (!isActive) {
- timer.schedule(new TimerTask() {
- @Override
- public void run() {
- try {
- // Zero out the current bucket.
- lastBucket.set(currentBucket.getAndSet(0));
- } catch (Throwable e) {
- logger.error("Cannot reset the Measured Rate", e);
- }
- }
- }, sampleInterval, sampleInterval);
- isActive = true;
- }
- }
- public synchronized void stop() {
- if (isActive) {
- timer.cancel();
- isActive = false;
- }
- }
- /**
- * Returns the count in the last sample interval.
- */
- public long getCount() {
- return lastBucket.get();
- }
- /**
- * Increments the count in the current sample interval.
- */
- public void increment() {
- currentBucket.incrementAndGet();
- }
- }
timer , 定时器, 负责每个 sampleInterval 间隔重置当前次数( currentBucket ), 并将原当前次数设置到上一个次数( lastBucket )
- #increment() 方法, 返回当前次数( currentBucket )
- #getCount() 方法, 返回上一个次数( lastBucket )
renewsLastMin 有如下用途:
配合 Netflix Servo 实现监控信息采集续租每分钟次数
Eureka-Server 运维界面的显示续租每分钟次数
自我保护机制, 在 Eureka 源码解析 应用实例注册发现 (四)之自我保护机制 详细解析
第 42 至 43 行 : 调用 Lease#renew() 方法, 设置租约最后更新时间( 续租 ), 实现代码如下:
- public void renew() {
- lastUpdateTimestamp = System.currentTimeMillis() + duration;
- }
- x
第 44 行 : 返回续租成功( true )
整个过程修改的租约的过期时间, 即使并发请求, 也不会对数据的一致性产生不一致的影响, 因此像注册操作一样加锁
来源: http://www.suo.im/26aViC