This article is based on Eureka 1.8.X.
This article walks through how Eureka-Client renews an application instance's lease with Eureka-Server.
FROM 《深度剖析服务发现组件 Netflix Eureka》 (lightly edited):
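After Eureka-Client successfully registers an application instance with Eureka-Server, it obtains a lease (Lease).
Eureka-Client then renews the lease (renew) with Eureka-Server at a fixed interval so that the lease does not expire.
By default the lease is valid for 90 seconds and the renewal interval is 30 seconds. The 1 : 3 ratio between the two means that, even with network failures and the like, the client gets three attempts to renew before the lease expires.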
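The 1 : 3 ratio is plain integer arithmetic. The sketch below only illustrates it; LeaseTiming and heartbeatsPerLease are hypothetical names and are not part of Eureka.

```java
// Hypothetical helper, not part of Eureka: counts how many heartbeats
// fit into one lease period for a given renewal interval.
final class LeaseTiming {

    static int heartbeatsPerLease(int leaseDurationSecs, int renewalIntervalSecs) {
        if (renewalIntervalSecs <= 0) {
            throw new IllegalArgumentException("renewalIntervalSecs must be positive");
        }
        return leaseDurationSecs / renewalIntervalSecs;
    }

    public static void main(String[] args) {
        // Defaults quoted above: 90 second lease, 30 second renewal interval.
        System.out.println(heartbeatsPerLease(90, 30)); // prints 3
    }
}
```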
During initialization, Eureka-Client creates a heartbeat thread that renews the lease (renew) with Eureka-Server at a fixed interval. The code is as follows:
- // DiscoveryClient.java
- DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
- Provider<BackupRegistry> backupRegistryProvider) {
- // ... unrelated code omitted
- scheduler = Executors.newScheduledThreadPool(2,
- new ThreadFactoryBuilder()
- .setNameFormat("DiscoveryClient-%d")
- .setDaemon(true)
- .build());
- heartbeatExecutor = new ThreadPoolExecutor(
- 1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>(),
- new ThreadFactoryBuilder()
- .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
- .setDaemon(true)
- .build()
- ); // use direct handoff
- // ... unrelated code omitted
- // [3.2.14] Initialize the scheduled tasks
- initScheduledTasks();
- }
- private void initScheduledTasks() {
- // Heartbeat (lease renewal) executor for Eureka-Server
- if (clientConfig.shouldRegisterWithEureka()) {
- int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); // renewal interval
- int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); // upper bound of the back-off multiplier
- logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);
- // Heartbeat timer
- scheduler.schedule(
- new TimedSupervisorTask(
- "heartbeat",
- scheduler,
- heartbeatExecutor,
- renewalIntervalInSecs,
- TimeUnit.SECONDS,
- expBackOffBound,
- new HeartbeatThread()
- ),
- renewalIntervalInSecs, TimeUnit.SECONDS);
- // ... unrelated code omitted
- }
- // ... unrelated code omitted
- }
The call to ScheduledExecutorService#schedule(...) runs the heartbeat only once, after a delay. So where is the promised fixed-rate heartbeat? The answer is revealed in "2.3 TimedSupervisorTask".
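A one-shot schedule(...) call can still produce periodic execution if the task schedules itself again when it finishes. The following is a minimal sketch of that pattern, not Eureka's code; SelfReschedulingTask and runsRepeatedly are hypothetical names used only for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a task that re-submits itself after every run,
// turning a one-shot schedule(...) call into repeated execution.
final class SelfReschedulingTask implements Runnable {
    private final ScheduledExecutorService scheduler;
    private final Runnable task;
    private final long delayMillis;

    SelfReschedulingTask(ScheduledExecutorService scheduler, Runnable task, long delayMillis) {
        this.scheduler = scheduler;
        this.task = task;
        this.delayMillis = delayMillis;
    }

    @Override
    public void run() {
        try {
            task.run();
        } finally {
            // Schedule the next run, whether or not this run succeeded.
            if (!scheduler.isShutdown()) {
                scheduler.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
            }
        }
    }

    // Returns true if the task ran at least `times` times within 5 seconds.
    static boolean runsRepeatedly(int times, long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch done = new CountDownLatch(times);
        Runnable task = done::countDown;
        scheduler.schedule(new SelfReschedulingTask(scheduler, task, delayMillis),
                delayMillis, TimeUnit.MILLISECONDS);
        try {
            return done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

Compared with scheduleAtFixedRate(...), this style lets the task choose a different delay for every run, which is what makes a back-off after failures possible.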
com.netflix.discovery.DiscoveryClient.HeartbeatThread is the heartbeat thread. It sends the renew request from Eureka-Client to Eureka-Server. The code is as follows:
- // DiscoveryClient.java
- /**
- * Timestamp of the last successful heartbeat to Eureka-Server
- */
- private volatile long lastSuccessfulHeartbeatTimestamp = -1;
- private class HeartbeatThread implements Runnable {
- public void run() {
- if (renew()) {
- lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
- }
- }
- }
- // DiscoveryClient.java
- boolean renew() {
- EurekaHttpResponse < InstanceInfo > httpResponse;
- try {
- httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
- logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
- if (httpResponse.getStatusCode() == 404) {
- REREGISTER_COUNTER.increment();
- logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
- long timestamp = instanceInfo.setIsDirtyWithTime();
- // Register again
- boolean success = register();
- if (success) {
- instanceInfo.unsetIsDirty(timestamp);
- }
- return success;
- }
- return httpResponse.getStatusCode() == 200;
- } catch(Throwable e) {
- logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
- return false;
- }
- }
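The decision logic in renew() can be isolated from the HTTP transport. The sketch below assumes a hypothetical RegistryClient interface (not an Eureka type) that returns the HTTP status of the heartbeat; it only mirrors the branching shown above: 404 triggers a re-registration, 200 counts as success, and any exception counts as failure.

```java
// Hypothetical transport abstraction, not an Eureka type.
interface RegistryClient {
    int sendHeartbeat();   // HTTP status code of the heartbeat request
    boolean register();    // true if the registration succeeded
}

// Hypothetical sketch of the heartbeat decision logic.
final class HeartbeatLogic {

    static boolean renew(RegistryClient client) {
        try {
            int status = client.sendHeartbeat();
            if (status == 404) {
                // The server has no lease for this instance: register again.
                return client.register();
            }
            return status == 200;
        } catch (RuntimeException e) {
            // A failed heartbeat is reported, never propagated to the scheduler.
            return false;
        }
    }
}
```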
The call to AbstractJerseyEurekaHttpClient#sendHeartBeat(...) sends the renew request. The code is as follows:
- // AbstractJerseyEurekaHttpClient.java
- @Override
- public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
- String urlPath = "apps/" + appName + '/' + id;
- ClientResponse response = null;
- try {
- WebResource webResource = jerseyClient.resource(serviceUrl)
- .path(urlPath)
- .queryParam("status", info.getStatus().toString())
- .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
- if (overriddenStatus != null) {
- webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
- }
- Builder requestBuilder = webResource.getRequestBuilder();
- addExtraHeaders(requestBuilder);
- response = requestBuilder.put(ClientResponse.class);
- EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response));
- if (response.hasEntity()) {
- eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class));
- }
- return eurekaResponseBuilder.build();
- } finally {
- if (logger.isDebugEnabled()) {
- logger.debug("Jersey HTTP PUT {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
- }
- if (response != null) {
- response.close();
- }
- }
- }
It sends a PUT request to the Eureka-Server endpoint apps/${APP_NAME}/${INSTANCE_INFO_ID} with the parameters status, lastDirtyTimestamp and overriddenstatus to renew the lease.
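To make the shape of that request concrete, here is a small sketch that assembles the heartbeat URL by hand. HeartbeatUrl is a hypothetical helper, the service URL http://localhost:8761/eureka/ is an assumed example value, and URL encoding is skipped for brevity (the real client delegates this to Jersey).

```java
// Hypothetical helper, not part of Eureka: builds the URL of the PUT heartbeat request.
final class HeartbeatUrl {

    // serviceUrl is assumed to end with '/'; no URL encoding is applied in this sketch.
    static String build(String serviceUrl, String appName, String id,
                        String status, long lastDirtyTimestamp, String overriddenStatus) {
        StringBuilder url = new StringBuilder(serviceUrl)
                .append("apps/").append(appName).append('/').append(id)
                .append("?status=").append(status)
                .append("&lastDirtyTimestamp=").append(lastDirtyTimestamp);
        if (overriddenStatus != null) {
            // Only sent when an overridden status is supplied.
            url.append("&overriddenstatus=").append(overriddenStatus);
        }
        return url.toString();
    }

    public static void main(String[] args) {
        // Assumed example values.
        System.out.println(build("http://localhost:8761/eureka/", "MY-APP", "host-1",
                "UP", 1500000000000L, null));
    }
}
```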
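When Eureka-Server has no lease for the instance, the client calls AbstractJerseyEurekaHttpClient#register(...) to register again. This is covered in detail in the article "Eureka Source Code Analysis: Application Instance Registration and Discovery (1) Registration".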
com.netflix.discovery.TimedSupervisorTask is a task that supervises a scheduled task.
A supervisor task that schedules subtasks while enforce a timeout.
The code that constructs a TimedSupervisorTask is as follows:
- public class TimedSupervisorTask extends TimerTask {
- private final Counter timeoutCounter;
- private final Counter rejectedCounter;
- private final Counter throwableCounter;
- private final LongGauge threadPoolLevelGauge;
- /**
- * Scheduled executor service
- */
- private final ScheduledExecutorService scheduler;
- /**
- * Thread pool that executes the subtask
- */
- private final ThreadPoolExecutor executor;
- /**
- * Timeout for one subtask run
- */
- private final long timeoutMillis;
- /**
- * The subtask
- */
- private final Runnable task;
- /**
- * Current delay between subtask runs
- */
- private final AtomicLong delay;
- /**
- * Maximum delay between subtask runs
- *
- * Used when the subtask times out
- */
- private final long maxDelay;
- public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor, int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
- this.scheduler = scheduler;
- this.executor = executor;
- this.timeoutMillis = timeUnit.toMillis(timeout);
- this.task = task;
- this.delay = new AtomicLong(timeoutMillis);
- this.maxDelay = timeoutMillis * expBackOffBound;
- // Initialize the counters and register.
- timeoutCounter = Monitors.newCounter("timeouts");
- rejectedCounter = Monitors.newCounter("rejectedExecutions");
- throwableCounter = Monitors.newCounter("throwables");
- threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build());
- Monitors.registerObject(name, this);
- }
- }
The maxDelay field is computed from the constructor parameters as timeout * expBackOffBound.
TimedSupervisorTask#run() is implemented as follows:
- // TimedSupervisorTask.java
- @Override
- public void run() {
- Future<?> future = null;
- try {
- // Submit the subtask
- future = executor.submit(task);
- // Record the number of active threads in the pool
- threadPoolLevelGauge.set((long) executor.getActiveCount());
- // Wait for the subtask to finish or time out
- future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout
- // Reset the delay before the next run
- delay.set(timeoutMillis);
- // Record the number of active threads in the pool
- threadPoolLevelGauge.set((long) executor.getActiveCount());
- } catch (TimeoutException e) {
- logger.error("task supervisor timed out", e);
- timeoutCounter.increment(); // count the timeout
- // Double the delay before the next run, capped at maxDelay
- long currentDelay = delay.get();
- long newDelay = Math.min(maxDelay, currentDelay * 2);
- delay.compareAndSet(currentDelay, newDelay);
- } catch (RejectedExecutionException e) {
- if (executor.isShutdown() || scheduler.isShutdown()) {
- logger.warn("task supervisor shutting down, reject the task", e);
- } else {
- logger.error("task supervisor rejected the task", e);
- }
- rejectedCounter.increment(); // count the rejection
- } catch (Throwable e) {
- if (executor.isShutdown() || scheduler.isShutdown()) {
- logger.warn("task supervisor shutting down, can't accept the task");
- } else {
- logger.error("task supervisor threw an exception", e);
- }
- throwableCounter.increment(); // count the exception
- } finally {
- // Cancel the subtask if it has not finished
- if (future != null) {
- future.cancel(true);
- }
- // Schedule the next run
- if (!scheduler.isShutdown()) {
- scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
- }
- }
- }
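On a timeout, the new delay is Math.min(maxDelay, currentDelay * 2). If the subtask times out repeatedly, the delay keeps doubling, but it is never allowed to exceed the maximum delay (maxDelay).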
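The doubling rule is easy to trace by hand. The sketch below reuses the formula above in a hypothetical BackoffDelay helper (not an Eureka class). The 30 second timeout matches the default renewal interval mentioned earlier; the bound of 10 is only an assumed example value for expBackOffBound.

```java
// Hypothetical helper, not part of Eureka: traces the delay after consecutive timeouts.
final class BackoffDelay {

    // Same formula as in TimedSupervisorTask#run(): double the delay, capped at maxDelay.
    static long nextDelayAfterTimeout(long currentDelayMillis, long maxDelayMillis) {
        return Math.min(maxDelayMillis, currentDelayMillis * 2);
    }

    // Delays used after 1, 2, ..., `timeouts` consecutive timeouts.
    static long[] delaysAfterConsecutiveTimeouts(long timeoutMillis, int expBackOffBound, int timeouts) {
        long maxDelay = timeoutMillis * expBackOffBound;
        long delay = timeoutMillis; // initial delay equals the timeout
        long[] result = new long[timeouts];
        for (int i = 0; i < timeouts; i++) {
            delay = nextDelayAfterTimeout(delay, maxDelay);
            result[i] = delay;
        }
        return result;
    }

    public static void main(String[] args) {
        // 30 s timeout, assumed bound of 10, which gives maxDelay = 300 s.
        for (long d : delaysAfterConsecutiveTimeouts(30_000L, 10, 5)) {
            System.out.println(d); // prints 60000, 120000, 240000, 300000, 300000
        }
    }
}
```

A single successful run resets the delay to the original timeout, so the back-off only lasts while the timeouts continue.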
com.netflix.eureka.resources.InstanceResource is the Resource (Controller) that handles requests operating on a single application instance.
Requests to renew an application instance's lease are mapped to the InstanceResource#renewLease() method. The code is as follows:
- // InstanceResource.java
- @PUT
- public Response renewLease(
- @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
- @QueryParam("overriddenstatus") String overriddenStatus,
- @QueryParam("status") String status,
- @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
- boolean isFromReplicaNode = "true".equals(isReplication);
- // Renew the lease
- boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
- // Renewal failed
- // Not found in the registry, immediately ask for a register
- if (!isSuccess) {
- logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
- return Response.status(Status.NOT_FOUND).build();
- }
- // Compare the lastDirtyTimestamp of the InstanceInfo
- // Check if we need to sync based on dirty time stamp, the client
- // instance might have changed some value
- Response response = null;
- if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
- response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
- // Store the overridden status since the validation found out the node that replicates wins
- if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
- && (overriddenStatus != null)
- && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
- && isFromReplicaNode) {
- registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
- }
- } else { // Success
- response = Response.ok().build();
- }
- logger.debug("Found (Renew): {} - {}; reply status={}" + app.getName(), id, response.getStatus());
- return response;
- }
The call to PeerAwareInstanceRegistryImpl#renew(...) performs the renewal. The code is as follows:
- // PeerAwareInstanceRegistryImpl.java
- public boolean renew(final String appName, final String id, final boolean isReplication) {
- if (super.renew(appName, id, isReplication)) { // Renew the lease
- // Replicate to the other Eureka-Server nodes
- replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
- return true;
- }
- return false;
- }
It calls the parent method AbstractInstanceRegistry#renew(...) to renew the lease of the application instance.
The lastDirtyTimestamp comparison runs when eureka.syncWhenTimestampDiffers = true (enabled by default).
The call to #validateDirtyTimestamp(...) compares the lastDirtyTimestamp values. The code is as follows:
- // InstanceResource.java
- private Response validateDirtyTimestamp(Long lastDirtyTimestamp, boolean isReplication) {
- // Look up the InstanceInfo
- InstanceInfo appInfo = registry.getInstanceByAppAndId(app.getName(), id, false);
- if (appInfo != null) {
- if ((lastDirtyTimestamp != null) && (!lastDirtyTimestamp.equals(appInfo.getLastDirtyTimestamp()))) {
- Object[] args = {id, appInfo.getLastDirtyTimestamp(), lastDirtyTimestamp, isReplication};
- // The timestamp in the request is newer
- if (lastDirtyTimestamp > appInfo.getLastDirtyTimestamp()) {
- logger.debug("Time to sync, since the last dirty timestamp differs -"
- + " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}", args);
- return Response.status(Status.NOT_FOUND).build();
- // The timestamp held by the server is newer
- } else if (appInfo.getLastDirtyTimestamp() > lastDirtyTimestamp) {
- // In the case of replication, send the current instance info in the registry for the
- // replicating node to sync itself with this one.
- if (isReplication) {
- logger.debug(
- "Time to sync, since the last dirty timestamp differs -"
- + " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}",
- args);
- return Response.status(Status.CONFLICT).entity(appInfo).build();
- } else {
- return Response.ok().build();
- }
- }
- }
- }
- return Response.ok().build();
- }
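The branches of validateDirtyTimestamp(...) reduce to a small decision table. The sketch below restates it as a pure function returning HTTP status codes; DirtyTimestampCheck is a hypothetical name, and the null checks of the original (missing instance or missing timestamp, both answered with 200) are left out.

```java
// Hypothetical restatement of the decision table, not Eureka code.
final class DirtyTimestampCheck {

    static int statusFor(long requestTimestamp, long registryTimestamp, boolean isReplication) {
        if (requestTimestamp > registryTimestamp) {
            // The caller holds newer data: 404 asks it to register again.
            return 404;
        }
        if (registryTimestamp > requestTimestamp) {
            // The server holds newer data: a replicating peer gets 409 together
            // with the current instance info, a plain client gets 200.
            return isReplication ? 409 : 200;
        }
        return 200; // timestamps are equal
    }

    public static void main(String[] args) {
        System.out.println(statusFor(2L, 1L, false)); // prints 404
        System.out.println(statusFor(1L, 2L, true));  // prints 409
        System.out.println(statusFor(1L, 2L, false)); // prints 200
    }
}
```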
AbstractInstanceRegistry#renew(...) renews the lease of the application instance. The code is as follows:
- // AbstractInstanceRegistry.java
- public boolean renew(String appName, String id, boolean isReplication) {
- // Add the renewal to the monitoring counters
- RENEW.increment(isReplication);
- // Look up the lease
- Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
- Lease<InstanceInfo> leaseToRenew = null;
- if (gMap != null) {
- leaseToRenew = gMap.get(id);
- }
- // The lease does not exist
- if (leaseToRenew == null) {
- RENEW_NOT_FOUND.increment(isReplication);
- logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
- return false;
- } else {
- InstanceInfo instanceInfo = leaseToRenew.getHolder();
- if (instanceInfo != null) {
- // touchASGCache(instanceInfo.getASGName());
- // override status
- InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
- instanceInfo, leaseToRenew, isReplication);
- if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
- logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
- + "; re-register required", instanceInfo.getId());
- RENEW_NOT_FOUND.increment(isReplication);
- return false;
- }
- if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
- Object[] args = {
- instanceInfo.getStatus().name(),
- instanceInfo.getOverriddenStatus().name(),
- instanceInfo.getId()
- };
- logger.info(
- "The instance status {} is different from overridden instance status {} for instance {}. "
- + "Hence setting the status to overridden status", args);
- instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
- }
- }
- // Increment the renewals-per-minute counter
- renewsLastMin.increment();
- // Update the lease's last update time (renew)
- leaseToRenew.renew();
- return true;
- }
- }
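The core of the method is a two-level map lookup: application name first, then instance id. A renewal for an unknown lease returns false, which the resource layer turns into a 404. The sketch below shows only that part; SimpleLeaseRegistry is a hypothetical class that stores a bare timestamp where Eureka stores a Lease<InstanceInfo>, and it ignores overridden statuses and metrics.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, heavily simplified registry: appName -> (instanceId -> last renewal time).
final class SimpleLeaseRegistry {
    private final Map<String, Map<String, Long>> registry = new ConcurrentHashMap<>();

    void register(String appName, String id, long nowMillis) {
        registry.computeIfAbsent(appName, k -> new ConcurrentHashMap<>()).put(id, nowMillis);
    }

    // Returns false when no lease exists, so the caller can ask the client to register.
    boolean renew(String appName, String id, long nowMillis) {
        Map<String, Long> leases = registry.get(appName);
        if (leases == null) {
            return false;
        }
        // computeIfPresent updates the timestamp only for an existing lease.
        return leases.computeIfPresent(id, (k, v) -> nowMillis) != null;
    }

    // Last renewal time, or null if the lease is unknown.
    Long lastRenewal(String appName, String id) {
        Map<String, Long> leases = registry.get(appName);
        return leases == null ? null : leases.get(id);
    }
}
```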
com.netflix.eureka.util.MeasuredRate is a rate-measurement class. The code is as follows:
- // AbstractInstanceRegistry.java
- /**
- * Number of renewals in the last minute
- */
- private final MeasuredRate renewsLastMin;
- // MeasuredRate.java
- public class MeasuredRate {
- /**
- * Count for the previous interval
- */
- private final AtomicLong lastBucket = new AtomicLong(0);
- /**
- * Count for the current interval
- */
- private final AtomicLong currentBucket = new AtomicLong(0);
- /**
- * Sample interval
- */
- private final long sampleInterval;
- /**
- * Timer
- */
- private final Timer timer;
- private volatile boolean isActive;
- public MeasuredRate(long sampleInterval) {
- this.sampleInterval = sampleInterval;
- this.timer = new Timer("Eureka-MeasureRateTimer", true);
- this.isActive = false;
- }
- public synchronized void start() {
- if (!isActive) {
- timer.schedule(new TimerTask() {@Override public void run() {
- try {
- // Zero out the current bucket.
- lastBucket.set(currentBucket.getAndSet(0));
- } catch(Throwable e) {
- logger.error("Cannot reset the Measured Rate", e);
- }
- }
- },
- sampleInterval, sampleInterval);
- isActive = true;
- }
- }
- public synchronized void stop() {
- if (isActive) {
- timer.cancel();
- isActive = false;
- }
- }
- /**
- * Returns the count in the last sample interval.
- */
- public long getCount() {
- return lastBucket.get();
- }
- /**
- * Increments the count in the current sample interval.
- */
- public void increment() {
- currentBucket.incrementAndGet();
- }
- }
- // Lease.java
- public void renew() {
- lastUpdateTimestamp = System.currentTimeMillis() + duration;
- }
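The two-bucket counting used by MeasuredRate is easier to follow without the timer. The sketch below is a hypothetical TwoBucketRate class (not Eureka code) in which rotate() is called by hand instead of by a TimerTask every sampleInterval.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the two-bucket idea: increments go into the current bucket,
// reads come from the bucket of the previous, completed interval.
final class TwoBucketRate {
    private final AtomicLong lastBucket = new AtomicLong(0);
    private final AtomicLong currentBucket = new AtomicLong(0);

    void increment() {
        currentBucket.incrementAndGet();
    }

    // Stands in for the TimerTask: closes the current interval and starts a new one.
    void rotate() {
        lastBucket.set(currentBucket.getAndSet(0));
    }

    long getCount() {
        return lastBucket.get();
    }

    public static void main(String[] args) {
        TwoBucketRate rate = new TwoBucketRate();
        rate.increment();
        rate.increment();
        System.out.println(rate.getCount()); // prints 0, the interval is not finished yet
        rate.rotate();
        System.out.println(rate.getCount()); // prints 2
    }
}
```

Reading only the completed interval means getCount() never reports a partially filled bucket, at the cost of lagging by up to one interval.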
Progress is a little slower than I expected. Onward to the next article.
Source: https://juejin.im/entry/5a383a8b51882574d23c7361