What is eureka ?
Eureka is a REST (Representational State Transfer) based service that is primarily used in the AWS cloud for locating services for the purpose of load balancing and failover of middle-tier servers. We call this service, the Eureka Server. Eureka also comes with a Java-based client component,the Eureka Client, which makes interactions with the service much easier. The client also has a built-in load balancer that does basic round-robin load balancing.
eureka 发音 [juˈri:kə] 伊瑞克
翻译后可简单概括为:
Eureka 是一个基于 REST 的服务, 用于定位服务, 以实现云端中间层服务发现和故障转移的中间件. 它有两个重要组成部分, Eureka 服务端和基于 JAVA 的客户端组件.
eureka 核心模块
由上图可以看出, eureka 主要有三种角色:
Eureka Server
eureka 服务端, 主要用来做服务的注册与发现
Service Provider
服务的实际提供方, 会将服务注册到 Eureka Server 上
Service Consumer
服务消费方, 从 Eureka Server 获取服务列表, 向 Service Provider 发起真实调用请求
TIP: 这三个角色是逻辑上的划分, 可能在使用时, 这几个角色可以是同一个实例;
一个 Service Provider 既可以是 Service Consumer, 也可以是 Service Provider
上图进一步展示了 3 个角色之间的交互.
ServiceProvider 会向 Eureka Server 做 Register(服务注册),Renew(服务续约),Cancel(服务下线)等操作.
EurekaServer 之间会做注册服务的同步, 从而保证状态一致
ServiceConsumer 会向 Eureka Server 获取注册服务列表, 并消费服务
eureka 源码阅读入口
eureka 的服务端
A: EurekaBootStrap.java 实现了 ServletContextListener 接口, 当项目启动时会初始化该类, 触发 contextInitialized 方法的执行.
B: contextInitialized 中调用了 initEurekaServerContext 方法.
C:
initEurekaServerContext 方法依次调用了三个方法
PeerAwareInstanceRegistryImpl 初始化
PeerEurekaNodes 初始化
DefaultEurekaServerContext 初始化, 该类有 @Singleton 注解, 并含有 @PostConstruct 注解 (构造方法执行后执行) 的 initialize 方法
D:
initialize 调用了两个方法
PeerEurekaNodes 的 start 方法
PeerAwareInstanceRegistryImpl 的 init 方法
E:
init 方法依次调用三个方法
- initializedResponseCache 方法, 该方法继续调用 ResponseCacheImpl 类, 最后使用的是 guava cache
- scheduleRenewalThresholdUpdateTask 使用 Timer 做定时任务定时更新新注册服务配置参数更新
- initRemoteRegionRegistry 初始化注册中心
eureka 的客户端
DiscoveryClient.java 这个类中含有了 client 侧的很多操作:
- register()
- renew()
- unregister()
fetchRegistry() 等等
register()方法调用过程
最为主要的 register 方法, 是在 DiscoveryClient 初始化的过程中被调用, 如下 DiscoveryClient 构造方法中调用了 initScheduledTasks()
- @Inject
- DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
- Provider<BackupRegistry> backupRegistryProvider) {
- ...
- if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
- fetchRegistryFromBackup();
- }
- // call and execute the pre registration handler before all background tasks (inc registration) is started
- if (this.preRegistrationHandler != null) {
- this.preRegistrationHandler.beforeRegistration();
- }
- initScheduledTasks();
- try {
- Monitors.registerObject(this);
- } catch (Throwable e) {
- logger.warn("Cannot register timers", e);
- }
- // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
- // to work with DI'd DiscoveryClient
- DiscoveryManager.getInstance().setDiscoveryClient(this);
- DiscoveryManager.getInstance().setEurekaClientConfig(config);
- }
initScheduledTasks 方法中启动了 InstanceInfoReplicator 线程,
- instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds())
- private void initScheduledTasks() {
- // Heartbeat timer
- scheduler.schedule(
- new TimedSupervisorTask(
- "heartbeat",
- scheduler,
- heartbeatExecutor,
- renewalIntervalInSecs,
- TimeUnit.SECONDS,
- expBackOffBound,
- new HeartbeatThread()
- ),
- renewalIntervalInSecs, TimeUnit.SECONDS);
- // InstanceInfo replicator
- instanceInfoReplicator = new InstanceInfoReplicator(
- this,
- instanceInfo,
- clientConfig.getInstanceInfoReplicationIntervalSeconds(),
- 2); // burstSize
- statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
- @Override
- public String getId() {
- return "statusChangeListener";
- }
- ...
- instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
- } else {
- logger.info("Not registering with Eureka server per configuration");
- }
- }
InstanceInfoReplicator 实现了 Runnable 接口是一个线程, 其 run 方法逻辑如下, 可以看到其调用了 discoveryClient.register();
- public void run() {
- try {
- discoveryClient.refreshInstanceInfo();
- Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
- if (dirtyTimestamp != null) {
- discoveryClient.register();
- instanceInfo.unsetIsDirty(dirtyTimestamp);
- }
- } catch (Throwable t) {
- logger.warn("There was a problem with the instance info replicator", t);
- } finally {
- Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
- scheduledPeriodicRef.set(next);
- }
- }
再来看看 DiscoveryClient 的 register 方法
- /**
- * Register with the eureka service by making the appropriate REST call.
- */
- boolean register() throws Throwable {
- logger.info(PREFIX + appPathIdentifier + ": registering service...");
- EurekaHttpResponse<Void> httpResponse;
- try {
- httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
- } catch (Exception e) {
- logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
- throw e;
- }
- if (logger.isInfoEnabled()) {
- logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
- }
- return httpResponse.getStatusCode() == 204;
- }
renew ()方法调用过程
Renew(服务续约)操作由 Service Provider 定期调用, 类似于 heartbeat. 主要是用来告诉 Eureka Server Service Provider 还活着, 避免服务被剔除掉
- /**
- * Renew with the eureka service by making the appropriate REST call
- */
- boolean renew() {
- EurekaHttpResponse<InstanceInfo> httpResponse;
- try {
- httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
- logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
- if (httpResponse.getStatusCode() == 404) {
- REREGISTER_COUNTER.increment();
- logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
- return register();
- }
- return httpResponse.getStatusCode() == 200;
- } catch (Throwable e) {
- logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
- return false;
- }
- }
renew 方法在 HeartbeatThread 线程中被调用
- /**
- * The heartbeat task that renews the lease in the given intervals.
- */
- private class HeartbeatThread implements Runnable {
- public void run() {
- if (renew()) {
- lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
- }
- }
- }
HeartbeatThread 线程在 initScheduledTasks 方法中被调用
- private void initScheduledTasks() {
- ...
- // Heartbeat timer
- scheduler.schedule(
- new TimedSupervisorTask(
- "heartbeat",
- scheduler,
- heartbeatExecutor,
- renewalIntervalInSecs,
- TimeUnit.SECONDS,
- expBackOffBound,
- new HeartbeatThread()
- ),
- renewalIntervalInSecs, TimeUnit.SECONDS);
- // InstanceInfo replicator
- instanceInfoReplicator = new InstanceInfoReplicator(
- this,
- instanceInfo,
- clientConfig.getInstanceInfoReplicationIntervalSeconds(),
- 2); // burstSize
- ...
- }
initScheduledTasks 是在 DiscoveryClient 构造函数初始化过程中被调用.
通过 register 方法和 renew 方法的分析, 相信大家已经摸索出 eureka 代码的套路, 其他的方法, 大家可以自己去深入理解了.
参考资料
- http://xujin.org/categories/Spring-Cloud-Eureka/
- http://blog.csdn.net/jenny8080/article/details/52448403
- https://github.com/Netflix/eureka
- https://github.com/spring-cloud/spring-cloud-netflix/tree/v1.2.2.RELEASE
- https://springcloud.cc/
来源: https://blog.csdn.net/mrzhangxl/article/details/75174214