前言
之前写了几篇 Spring Cloud 的小白教程, 相信看过的朋友对 Spring Cloud 中的一些应用有了简单的了解, 写小白篇的目的就是为初学者建立一个基本概念, 让初学者在学习的道路上建立一定的基础.
从今天开始, 我会持续更新几篇 Spring Cloud 的进阶教程.
Eureka 简介
Eureka 是 Netflix 开发的服务发现框架, 本身就是一个基于 REST 的服务. Spring Cloud 将它集成在其子项目 spring-cloud-netflix 中, 用来实现服务的注册与发现功能.
Eureka 总体架构图
Eureka 组件介绍
服务注册中心集群
分别部署在 IDC1,IDC2,IDC3 中心
服务提供者
服务提供者一个部署在 IDC1, 一个部署在 IDC3
服务消费者
服务消费者一个部署在 IDC1, 一个部署在 IDC2
组件之间的调用关系
服务提供者
启动服务: 服务提供者会向服务注册中心发起 Register 请求, 注册服务.
运行过程中: 服务提供者会定时向注册中心发送 Renew 心跳, 告诉它 "我还活着".
停止服务提供: 服务提供者会向服务注册中心发送 Cancel 请求, 告诉它清空当前服务注册信息.
服务消费者
启动后: 从服务注册中心拉取服务注册信息.
运行过程中: 定时更新服务注册信息.
发起远程调用:
- 服务消费者会从服务注册中心选择同机房的服务提供者, 然后发起远程调用, 只有同机房的服务提供者宕机才会去选择其他机房的服务提供者.
如果服务消费者发现同机房没有服务提供者, 则会按照负载均衡算法 选择其他机房的服务提供者, 然后发起远程调用.
注册中心
启动后: 从其他节点拉取服务注册信息
运行过程中:
- 定时运行 Evict 任务, 定时清理没有按时发送 Renew 的服务提供者, 这里的清理会将非常正常停止, 网络异常等其他因素引起的所有服务.
接收到的 Register,Renew,Cancel 请求, 都会同步到其他的注册中心节点.
Eureka Server 会通过 Register,Renew,Get Registry 等接口提供服务的注册, 发现和心跳检测等.
Eureka Client 是一个 java 客户端, 用于简化与 Eureka Server 的交互, 客户端本身也内置了负载均衡器 (默认使用 round-robin 方式), 在启动后会向 Eureka Server 发送心跳检测, 默认周期为 30s,Eureka Server 如果在多个心跳周期内没有接收到 Eureka client 的某一个节点的心跳请求, Eureka Server 会从服务注册中心清理到对应的 Eureka Client 的服务节点 (默认 90s).
数据结构
服务存储的数据结构可以简单的理解为是一个两层的 HashMap 结构 (为了保证线程安全使用的 ConcurrentHashMap), 具体的我们可以查看源码中的 AbstractInstanceRegistry 类:
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
第一层 ConcurrentHashMap 的 key=spring.application.name, 也就是应用名称, value 为 ConcurrentHashMap.
第二层 ConcurrentHashMap 的 key=instanceId, 也就是服务的唯一实例 id,value 为 Lease 对象, 也就是具体的服务. Lease 其实就是对 InstanceInfo 的包装, 里面保存了实例信息, 服务注册的时间等. 具体的我们可以查看 InstanceInfo 源码.
数据存储过程
Eureka 是通过 REST 接口对外提供服务的.
这里我以注册为例 (ApplicationResource), 首先将 PeerAwareInstanceRegistry 的实例注入到 ApplicationResource 的成员变量的 registry 里.
ApplicationResource 接收到请求后, 对调用 registry.register() 方法.
- @POST
- @Consumes({"application/json", "application/xml"})
- public Response addInstance(InstanceInfo info,
- @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
- logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
- // validate that the instanceinfo contains all the necessary required fields
- if (isBlank(info.getId())) {
- return Response.status(400).entity("Missing instanceId").build();
- } else if (isBlank(info.getHostName())) {
- return Response.status(400).entity("Missing hostname").build();
- } else if (isBlank(info.getIPAddr())) {
- return Response.status(400).entity("Missing ip address").build();
- } else if (isBlank(info.getAppName())) {
- return Response.status(400).entity("Missing appName").build();
- } else if (!appName.equals(info.getAppName())) {
- return Response.status(400).entity("Mismatched appName, expecting" + appName + "but was" + info.getAppName()).build();
- } else if (info.getDataCenterInfo() == null) {
- return Response.status(400).entity("Missing dataCenterInfo").build();
- } else if (info.getDataCenterInfo().getName() == null) {
- return Response.status(400).entity("Missing dataCenterInfo Name").build();
- }
- // handle cases where clients may be registering with bad DataCenterInfo with missing data
- DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
- if (dataCenterInfo instanceof UniqueIdentifier) {
- String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
- if (isBlank(dataCenterInfoId)) {
- boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
- if (experimental) {
- String entity = "DataCenterInfo of type" + dataCenterInfo.getClass() + "must contain a valid id";
- return Response.status(400).entity(entity).build();
- } else if (dataCenterInfo instanceof AmazonInfo) {
- AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
- String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
- if (effectiveId == null) {
- amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
- }
- } else {
- logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
- }
- }
- }
- registry.register(info, "true".equals(isReplication));
- return Response.status(204).build(); // 204 to be backwards compatible
- }
AbstractInstanceRegistry 在 register 方法里完成对服务信息的存储.
- public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
- try {
- read.lock();
- Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
- REGISTER.increment(isReplication);
- if (gMap == null) {
- final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
- gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
- if (gMap == null) {
- gMap = gNewMap;
- }
- }
- Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
- // Retain the last dirty timestamp without overwriting it, if there is already a lease
- if (existingLease != null && (existingLease.getHolder() != null)) {
- Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
- Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
- logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
- // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
- // InstanceInfo instead of the server local copy.
- if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
- logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
- "than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
- logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
- registrant = existingLease.getHolder();
- }
- } else {
- // The lease does not exist and hence it is a new registration
- synchronized (lock) {
- if (this.expectedNumberOfClientsSendingRenews > 0) {
- // Since the client wants to register it, increase the number of clients sending renews
- this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
- updateRenewsPerMinThreshold();
- }
- }
- logger.debug("No previous lease information found; it is new registration");
- }
- Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
- if (existingLease != null) {
- lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
- }
- gMap.put(registrant.getId(), lease);
- synchronized (recentRegisteredQueue) {
- recentRegisteredQueue.add(new Pair<Long, String>(
- System.currentTimeMillis(),
- registrant.getAppName() + "(" + registrant.getId() + ")"));
- }
- // This is where the initial state transfer of overridden status happens
- if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
- logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the"
- + "overrides", registrant.getOverriddenStatus(), registrant.getId());
- if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
- logger.info("Not found overridden id {} and hence adding it", registrant.getId());
- overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
- }
- }
- InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
- if (overriddenStatusFromMap != null) {
- logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
- registrant.setOverriddenStatus(overriddenStatusFromMap);
- }
- // Set the status based on the overridden status rules
- InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
- registrant.setStatusWithoutDirty(overriddenInstanceStatus);
- // If the lease is registered with UP status, set lease service up timestamp
- if (InstanceStatus.UP.equals(registrant.getStatus())) {
- lease.serviceUp();
- }
- registrant.setActionType(ActionType.ADDED);
- recentlyChangedQueue.add(new RecentlyChangedItem(lease));
- registrant.setLastUpdatedTimestamp();
- invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
- logger.info("Registered instance {}/{} with status {} (replication={})",
- registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
- } finally {
- read.unlock();
- }
- }
从源码中不难看出存储的数据结构是双层的 HashMap.
Eureka 还实现了二级缓存来保证即将对外传输的服务信息,
一级缓存: 本质还是 HashMap, 没有过期时间, 保存服务信息的对外输出的数据结构.
private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
二级缓存: 是 guava 的缓存, 包含失效机制, 保存服务信息的对外输出的数据结构.
private final LoadingCache<Key, Value> readWriteCacheMap;
缓存的更新:
删除二级缓存:
- client 端发送 register,renew,cancel 请求并更新 register 注册表之后会删除二级缓存;
server 端自身的 Evict 任务剔除服务后会删除二级缓存;
二级缓存本事设置的失效机制 (指的是 guava 实现的 readWriteCacheMap),
加载二级缓存:
- client 发送 Get registry 请求后, 如果二级缓存中没有, 就会触发 guava 的 load 机制, 从 registry 中获取原始的服务信息后进行加工处理, 然后放入二级缓存中;
server 端更新一级缓存的时候, 如果二级缓存没有数据也会触发 guava 的 load 机制;
更新一级缓存:
- server 端内置了一个 time task 会定时将二级缓存中的数据同步到一级缓存中, 这其中包括了删除和更新.
缓存的机制可以查看 ResponseCacheImpl 源码.
Eureka 的数据结构简单总结为:
服务注册机制
服务注册中心, 服务提供者, 服务消费者在启动后都会向服务注册中心发起注册服务的请求 (前提是配置了注册服务).
注册中心接到 register 请求后:
将服务信息保存到 registry 中;
更新队列, 将该事件添加到更新队列中, 给 Eureka client 增量同步服务信息使用;
清空二级缓存, 用于保证数据的一致性;(即清空的是: readWriteCacheMap)
更新阈值;
同步服务信息;
服务续约
服务注册后, 要定时发送续约请求 (心跳检查), 证明我还活着, 不要清空我的服务信息, 定时时间默认 30s, 可以通过配置: eureka.instance.lease-renewal-interval-in-seconds 来修改.
注册中心接收到续约请求后 (renew):
更新服务对象的最近续约时间 (lastUpdateTimestamp);
将信息同步给其他的节点;
服务注销
正常的服务停止之前会发送注销服务请求, 通知注册中心我要下线了.
注册中心接收到注销请求后 (cancel):
将服务信息从 registry 中删除;
更新队列;
清空二级缓存;
更新阈值;
同步信息给其他节点;
说明: 只有服务正常停止才会发送 cancel 请求, 非正常停止的会通过 Eureka Server 的主动剔除机制进行删除.
服务剔除
服务剔除其实是一个兜底的方案, 目的就是解决非正常情况下的服务宕机或其他因素导致不能发送 cancel 请求的服务信息清理的策略.
服务剔除分为:
判断剔除条件
找出过期服务
清理过期服务
剔除条件:
关闭自我保护
自我保护如果开启, 会先判断是 server 还是 client 出现问题, 如果是 client 的问题就会进行删除;
自我保护机制: Eureka 的自我保护机制是为了防止误杀服务提供的一种保护机制. Eureka 的自我保护机制认为如果有大量的服务都续约失败, 则认为自己出现了问题 (例如: 自己断网了), 也就不剔除了. 反之, 则是它人的问题, 就进行剔除.
自我保护的阈值分为 server 和 client, 如果超出阈值就是表示大量服务可用, 部分服务不可用, 这判定为 client 端出现问题. 如果未超出阈值就是表示大量服务不可用, 则判定是自己出现了问题.
阈值的计算:
自我保护阈值 = 服务总数 * 每分钟续约数 * 自我保护阈值因子;
每分钟续约数 = (60s / 客户端续约时间);
过期服务:
找出过期服务会遍历所有的服务, 判断上次续约时间距离当前时间大于阈值就标记为过期, 同时会将这些过期的服务保存的过期的服务集合中.
剔除服务:
剔除服务之前会先计算要是剔除的服务数量, 然后遍历过期服务, 通过洗牌算法确保每次都公平的选择出要剔除的服务, 然后进行剔除.
执行剔除服务后:
从 register 中删除服务信息;
更新队列;
清空二级缓存, 保证数据的一致性;
服务获取
Eureka Client 服务的获取都是从缓存中获取, 如果缓存中没有, 就加载数据到缓存中, 然后在从缓存中取. 服务的获取方式分为全量同步和增量同步两种.
registry 中只保存数据结构, 缓存中存 ready 的服务信息
先读取一级缓存
先判断是否开启一级缓存
如果开启一级缓存, 就从一级缓存中取, 如果一级缓存中没有, 则从二级缓存中取;
如果没有开启一级缓存, 则直接从二级缓存中取;
再读取二级缓存
如果二级缓存中存在, 则直接返回;
如果二级缓存中不存在, 则先将数据加载到二级缓存中, 然后再读取二级缓存中的数据.
注意: 加载二级缓存的时候需要判断是全量还是增量, 如果是增量的话, 就从 recentlyChangedQueue 中加载, 如果是全量的话就从 registry 中加载.
服务同步
服务同步是 Server 节点之间的数据同步. 分为启动时同步, 运行时同步.
启动同步
启动同步时, 会先遍历 Applications 中获取的服务信息, 并将服务信息注册到 registry 中. 可以参考 PeerAwareInstanceRegistryImpl 类中的 syncUp 方法:
- public int syncUp() {
- // Copy entire entry from neighboring DS node
- int count = 0;
- for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
- if (i > 0) {
- try {
- Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
- } catch (InterruptedException e) {
- logger.warn("Interrupted during registry transfer..");
- break;
- }
- }
- Applications apps = eurekaClient.getApplications();
- for (Application App : apps.getRegisteredApplications()) {
- for (InstanceInfo instance : App.getInstances()) {
- try {
- if (isRegisterable(instance)) {
- register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
- count++;
- }
- } catch (Throwable t) {
- logger.error("During DS init copy", t);
- }
- }
- }
- }
- return count;
- }
注意这个方法使用类两层 for 循环, 第一次循环时保证自己已经拉取到服务信息, 第二层循环是遍历拉取到服务注册信息.
运行时同步
server 端当有 reigster,renew,cancel 请求进来时, 会将这些请求封装到一个 task 中, 然后放到一个队列当中, 然后经过一系列的处理后, 在放到另一个队列中. 可以查看 PeerAwareInstanceRegistryImpl 类中的 BatchWorkerRunnable 类, 这里就不再贴源码了.
总结
Eureka 的原理接介绍到这里, 从整体上看似简单, 但实现细节相关复杂. 得多看几遍源码才能猜透他们的设计思路.
Eureka 作为服务的注册与发现, 它实际的设计原则是遵循 AP 原则, 也就是 "数据的最终一致性". 现在还有好多公司使用 zk,nacos 来作为服务的注册中心, 后续会简单更新一篇关于服务注册中心的对比, 这里就不过多阐述.
来源: https://www.cnblogs.com/fengfujie/p/12037895.html