Spring Cloud进阶篇之Eureka原理分析

前言

之前写了几篇Spring Cloud的小白教程,相信看过的朋友对Spring Cloud中的一些应用有了简单的了解,写小白篇的目的就是为初学者建立一个基本概念,让初学者在学习的道路上建立一定的基础。

从今天开始,我会持续更新几篇Spring Cloud的进阶教程。

Eureka简介

Eureka是Netflix开发的服务发现框架,本身就是一个基于REST的服务。Spring Cloud将它集成在其子项目spring-cloud-netflix中,用来实现服务的注册与发现功能。

Eureka总体架构图

Eureka组件介绍

  • 服务注册中心集群

分别部署在IDC1、IDC2、IDC3中心

  • 服务提供者

服务提供者一个部署在IDC1,一个部署在IDC3

  • 服务消费者

服务消费者一个部署在IDC1,一个部署在IDC2

组件之间的调用关系

服务提供者

  • 启动服务:服务提供者会向服务注册中心发起Register请求,注册服务。
  • 运行过程中:服务提供者会定时向注册中心发送Renew心跳,告诉它“我还活着”。
  • 停止服务提供:服务提供者会向服务注册中心发送Cancel请求,告诉它清空当前服务注册信息。

服务消费者

  • 启动后:从服务注册中心拉取服务注册信息。
  • 运行过程中:定时更新服务注册信息。
  • 发起远程调用
  • - 服务消费者会从服务注册中心选择同机房的服务提供者,然后发起远程调用,只有同机房的服务提供者宕机才会去选择其他机房的服务提供者。
  • 如果服务消费者发现同机房没有服务提供者,则会按照负载均衡算法 选择其他机房的服务提供者,然后发起远程调用。

注册中心

  • 启动后:从其他节点拉取服务注册信息
  • 运行过程中:
  • - 定时运行Evict任务,定时清理没有按时发送Renew的服务提供者,这里的清理会将非常正常停止、网络异常等其他因素引起的所有服务。
  • 接收到的Register、Renew、Cancel请求,都会同步到其他的注册中心节点。

Eureka Server会通过Register、Renew、Get Registry等接口提供服务的注册、发现和心跳检测等。

Eureka Client是一个java客户端,用于简化与Eureka Server的交互,客户端本身也内置了负载均衡器(默认使用round-robin方式),在启动后会向Eureka Server发送心跳检测,默认周期为30s,Eureka Server如果在多个心跳周期内没有接收到Eureka client的某一个节点的心跳请求,Eureka Server会从服务注册中心清理到对应的Eureka Client的服务节点(默认90s)。

数据结构

服务存储的数据结构可以简单的理解为是一个两层的HashMap结构(为了保证线程安全使用的ConcurrentHashMap),具体的我们可以查看源码中的AbstractInstanceRegistry类:

private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();

第一层ConcurrentHashMap的key=spring.application.name,也就是应用名称,value为ConcurrentHashMap。

第二层ConcurrentHashMap的key=instanceId,也就是服务的唯一实例id,value为Lease对象,也就是具体的服务。Lease其实就是对InstanceInfo的包装,里面保存了实例信息、服务注册的时间等。具体的我们可以查看InstanceInfo源码。

数据存储过程

Eureka是通过REST接口对外提供服务的。

这里我以注册为例(ApplicationResource),首先将PeerAwareInstanceRegistry的实例注入到ApplicationResource的成员变量的registry里。

  • ApplicationResource接收到请求后,对调用registry.register()方法。
@POST    @Consumes({"application/json", "application/xml"})    public Response addInstance(InstanceInfo info,                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);        // validate that the instanceinfo contains all the necessary required fields        if (isBlank(info.getId())) {            return Response.status(400).entity("Missing instanceId").build();        } else if (isBlank(info.getHostName())) {            return Response.status(400).entity("Missing hostname").build();        } else if (isBlank(info.getIPAddr())) {            return Response.status(400).entity("Missing ip address").build();        } else if (isBlank(info.getAppName())) {            return Response.status(400).entity("Missing appName").build();        } else if (!appName.equals(info.getAppName())) {            return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();        } else if (info.getDataCenterInfo() == null) {            return Response.status(400).entity("Missing dataCenterInfo").build();        } else if (info.getDataCenterInfo().getName() == null) {            return Response.status(400).entity("Missing dataCenterInfo Name").build();        }

        // handle cases where clients may be registering with bad DataCenterInfo with missing data        DataCenterInfo dataCenterInfo = info.getDataCenterInfo();        if (dataCenterInfo instanceof UniqueIdentifier) {            String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();            if (isBlank(dataCenterInfoId)) {                boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));                if (experimental) {                    String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";                    return Response.status(400).entity(entity).build();                } else if (dataCenterInfo instanceof AmazonInfo) {                    AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;                    String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);                    if (effectiveId == null) {                        amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());                    }                } else {                    logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());                }            }        }

        registry.register(info, "true".equals(isReplication));        return Response.status(204).build();  // 204 to be backwards compatible    }
  • AbstractInstanceRegistry在register方法里完成对服务信息的存储。
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {        try {            read.lock();            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());            REGISTER.increment(isReplication);            if (gMap == null) {                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);                if (gMap == null) {                    gMap = gNewMap;                }            }            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());            // Retain the last dirty timestamp without overwriting it, if there is already a lease            if (existingLease != null && (existingLease.getHolder() != null)) {                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

                // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted                // InstanceInfo instead of the server local copy.                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {                    logger.warn("There is an existing lease and the existing lease‘s dirty timestamp {} is greater" +                            " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);                    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");                    registrant = existingLease.getHolder();                }            } else {                // The lease does not exist and hence it is a new registration                synchronized (lock) {                    if (this.expectedNumberOfClientsSendingRenews > 0) {                        // Since the client wants to register it, increase the number of clients sending renews                        this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;                        updateRenewsPerMinThreshold();                    }                }                logger.debug("No previous lease information found; it is new registration");            }            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);            if (existingLease != null) {                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());            }            gMap.put(registrant.getId(), lease);            synchronized (recentRegisteredQueue) {                recentRegisteredQueue.add(new Pair<Long, String>(                        System.currentTimeMillis(),                        registrant.getAppName() + "(" + registrant.getId() + ")"));            }            // This is where the initial state transfer of overridden status happens            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "                                + "overrides", registrant.getOverriddenStatus(), registrant.getId());                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {                    logger.info("Not found overridden id {} and hence adding it", registrant.getId());                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());                }            }            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());            if (overriddenStatusFromMap != null) {                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);                registrant.setOverriddenStatus(overriddenStatusFromMap);            }

            // Set the status based on the overridden status rules            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);            registrant.setStatusWithoutDirty(overriddenInstanceStatus);

            // If the lease is registered with UP status, set lease service up timestamp            if (InstanceStatus.UP.equals(registrant.getStatus())) {                lease.serviceUp();            }            registrant.setActionType(ActionType.ADDED);            recentlyChangedQueue.add(new RecentlyChangedItem(lease));            registrant.setLastUpdatedTimestamp();            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());            logger.info("Registered instance {}/{} with status {} (replication={})",                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);        } finally {            read.unlock();        }    }

从源码中不难看出存储的数据结构是双层的HashMap。

Eureka还实现了二级缓存来保证即将对外传输的服务信息,

  • 一级缓存:本质还是HashMap,没有过期时间,保存服务信息的对外输出的数据结构。

    private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
  • 二级缓存:是guava的缓存,包含失效机制,保存服务信息的对外输出的数据结构。
    private final LoadingCache<Key, Value> readWriteCacheMap;
  • 缓存的更新:
  • 删除二级缓存:
  • - client端发送register、renew、cancel请求并更新register注册表之后会删除二级缓存;
  • server端自身的Evict任务剔除服务后会删除二级缓存;
  • 二级缓存本事设置的失效机制(指的是guava实现的readWriteCacheMap),
  • 加载二级缓存:
  • - client发送Get registry请求后,如果二级缓存中没有,就会触发guava的load机制,从registry中获取原始的服务信息后进行加工处理,然后放入二级缓存中;
  • server端更新一级缓存的时候,如果二级缓存没有数据也会触发guava的load机制;
  • 更新一级缓存:
  • - server端内置了一个time task会定时将二级缓存中的数据同步到一级缓存中,这其中包括了删除和更新。

缓存的机制可以查看ResponseCacheImpl源码。

Eureka的数据结构简单总结为:

服务注册机制

服务注册中心、服务提供者、服务消费者在启动后都会向服务注册中心发起注册服务的请求(前提是配置了注册服务)。

注册中心接到register请求后:

  • 将服务信息保存到registry中;
  • 更新队列,将该事件添加到更新队列中,给Eureka client增量同步服务信息使用;
  • 清空二级缓存,用于保证数据的一致性;(即清空的是:readWriteCacheMap
  • 更新阈值;
  • 同步服务信息;

服务续约

服务注册后,要定时发送续约请求(心跳检查),证明我还活着,不要清空我的服务信息,定时时间默认30s,可以通过配置:eureka.instance.lease-renewal-interval-in-seconds来修改。

注册中心接收到续约请求后(renew):

  • 更新服务对象的最近续约时间(lastUpdateTimestamp);
  • 将信息同步给其他的节点;

服务注销

正常的服务停止之前会发送注销服务请求,通知注册中心我要下线了。

注册中心接收到注销请求后(cancel):

  • 将服务信息从registry中删除;
  • 更新队列;
  • 清空二级缓存;
  • 更新阈值;
  • 同步信息给其他节点;

说明:只有服务正常停止才会发送cancel请求,非正常停止的会通过Eureka Server的主动剔除机制进行删除。

服务剔除

服务剔除其实是一个兜底的方案,目的就是解决非正常情况下的服务宕机或其他因素导致不能发送cancel请求的服务信息清理的策略。

服务剔除分为:

  • 判断剔除条件
  • 找出过期服务
  • 清理过期服务

剔除条件:

  • 关闭自我保护
  • 自我保护如果开启,会先判断是server还是client出现问题,如果是client的问题就会进行删除;

自我保护机制:Eureka的自我保护机制是为了防止误杀服务提供的一种保护机制。Eureka的自我保护机制认为如果有大量的服务都续约失败,则认为自己出现了问题(例如:自己断网了),也就不剔除了。反之,则是它人的问题,就进行剔除。

自我保护的阈值分为server和client,如果超出阈值就是表示大量服务可用,部分服务不可用,这判定为client端出现问题。如果未超出阈值就是表示大量服务不可用,则判定是自己出现了问题。

阈值的计算:

  • 自我保护阈值 = 服务总数 * 每分钟续约数 * 自我保护阈值因子;
  • 每分钟续约数 = (60s / 客户端续约时间);

过期服务:

找出过期服务会遍历所有的服务,判断上次续约时间距离当前时间大于阈值就标记为过期,同时会将这些过期的服务保存的过期的服务集合中。

剔除服务:

剔除服务之前会先计算要是剔除的服务数量,然后遍历过期服务,通过洗牌算法确保每次都公平的选择出要剔除的服务,然后进行剔除。

执行剔除服务后:

  • 从register中删除服务信息;
  • 更新队列;
  • 清空二级缓存,保证数据的一致性;

服务获取

Eureka Client服务的获取都是从缓存中获取,如果缓存中没有,就加载数据到缓存中,然后在从缓存中取。服务的获取方式分为全量同步和增量同步两种。

registry中只保存数据结构,缓存中存ready的服务信息

  • 先读取一级缓存
  • 先判断是否开启一级缓存
  • 如果开启一级缓存,就从一级缓存中取,如果一级缓存中没有,则从二级缓存中取;
  • 如果没有开启一级缓存,则直接从二级缓存中取;
  • 再读取二级缓存
  • 如果二级缓存中存在,则直接返回;
  • 如果二级缓存中不存在,则先将数据加载到二级缓存中,然后再读取二级缓存中的数据。

注意:加载二级缓存的时候需要判断是全量还是增量,如果是增量的话,就从recentlyChangedQueue中加载,如果是全量的话就从registry中加载。

服务同步

服务同步是Server节点之间的数据同步。分为启动时同步,运行时同步。

  • 启动同步

启动同步时,会先遍历Applications中获取的服务信息,并将服务信息注册到registry中。可以参考PeerAwareInstanceRegistryImpl类中的syncUp方法:

public int syncUp() {       // Copy entire entry from neighboring DS node       int count = 0;

       for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {           if (i > 0) {               try {                   Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());               } catch (InterruptedException e) {                   logger.warn("Interrupted during registry transfer..");                   break;               }           }           Applications apps = eurekaClient.getApplications();           for (Application app : apps.getRegisteredApplications()) {               for (InstanceInfo instance : app.getInstances()) {                   try {                       if (isRegisterable(instance)) {                           register(instance, instance.getLeaseInfo().getDurationInSecs(), true);                           count++;                       }                   } catch (Throwable t) {                       logger.error("During DS init copy", t);                   }               }           }       }       return count;   }

注意这个方法使用类两层for循环,第一次循环时保证自己已经拉取到服务信息,第二层循环是遍历拉取到服务注册信息。

  • 运行时同步

server端当有reigster、renew、cancel请求进来时,会将这些请求封装到一个task中,然后放到一个队列当中,然后经过一系列的处理后,在放到另一个队列中。 可以查看PeerAwareInstanceRegistryImpl类中的BatchWorkerRunnable类,这里就不再贴源码了。

总结

Eureka的原理接介绍到这里,从整体上看似简单,但实现细节相关复杂。得多看几遍源码才能猜透他们的设计思路。

Eureka作为服务的注册与发现,它实际的设计原则是遵循AP原则,也就是“数据的最终一致性”。现在还有好多公司使用zk、nacos来作为服务的注册中心,后续会简单更新一篇关于服务注册中心的对比,这里就不过多阐述。


  • 写作不易,转载请注明出处,喜欢的小伙伴可以关注公众号查看更多喜欢的文章。
  • 联系方式:[email protected] QQ:95472323

原文地址:https://www.cnblogs.com/fengfujie/p/12037895.html

时间: 2024-10-30 21:41:52

Spring Cloud进阶篇之Eureka原理分析的相关文章

Spring Cloud第二篇 | 使用并认识Eureka注册中心

? 本文是Spring Cloud专栏的第二篇文章,了解前一篇文章内容有助于更好的理解本文: Spring Cloud第一篇 | Spring Cloud前言及其常用组件介绍概览 ?? 一.SpringCloud快速开发入门 SpringCloud是构建在SpringBoot基础之上的 1.创键一个服务提供者(springcloud-service-provider) 1-1.创键提供者类 @RestController @RequestMapping("/provider") pub

spring cloud云服务架构 -eureka 项目

上一篇我们回顾了关于 spring cloud eureka的相关基础知识,现在我们针对于HongHu cloud的eureka项目做以下构建,整个构建的过程很简单,我会将每一步都构建过程记录下来,希望可以帮助到大家: 创建一个名为particle-common-eureka的maven项目,继承particle-commonservice,具体的pom.xml配置文件如下: <?xml version="1.0" encoding="UTF-8"?>

Spring cloud 微服务架构 Eureka篇

1 服务发现 ## 关于服务发现 在微服务架构中,服务发现(Service Discovery)是关键原则之一.手动配置每个客户端或某种形式的约定是很难做的,并且很脆弱.Spring Cloud提供了多种服务发现的实现方式,例如:Eureka.Consul.Zookeeper. Spring Cloud支持得最好的是Eureka,其次是Consul,最次是Zookeeper. 2.创建一个Maven工程(microservice-discovery-eureka),并在pom.xml中加入如下内

Spring Cloud|高可用的Eureka集群服务

Eureka,作为spring cloud的服务发现与注册中心,在整个的微服务体系中,处于核心位置.单一的eureka服务,显然不能满足高可用的实际生产环境,这就要求我们配置一个能够应对各种突发情况,具有较强容灾能力的eureka服务.下面我将以一个较为简单的例子,来描述这种高可用服务的实现原理. 一.服务搭建 New->Project-> 选择spring initialir 如下图: 下一步->选择cloud discovery->eureka server,然后一直下一步就行

整合spring cloud云服务架构 - eureka 基础

在构建项目之前,我们先学习一下eureka,这是官方的讲解,我这边再重新帮大家回顾一下: 服务发现:Eureka客户端服务发现是基于微服务架构的关键原则之一.尝试配置每个客户端或某种形式的约定可能非常困难,可以非常脆弱.Netflix服务发现服务器和客户端是Eureka.可以将服务器配置和部署为高可用性,每个服务器将注册服务的状态复制到其他服务器. 如何包含Eureka客户端要在您的项目中包含Eureka客户端,请使用组org.springframework.cloud和工件ID spring-

从0开始构建你的api网关--Spring Cloud Gateway网关实战及原理解析

API 网关 API 网关出现的原因是微服务架构的出现,不同的微服务一般会有不同的网络地址,而外部客户端可能需要调用多个服务的接口才能完成一个业务需求,如果让客户端直接与各个微服务通信,会有以下的问题: 客户端会多次请求不同的微服务,增加了客户端的复杂性. 存在跨域请求,在一定场景下处理相对复杂. 认证复杂,每个服务都需要独立认证. 难以重构,随着项目的迭代,可能需要重新划分微服务.例如,可能将多个服务合并成一个或者将一个服务拆分成多个.如果客户端直接与微服务通信,那么重构将会很难实施. 某些微

没使用Spring Cloud的版本管理导致Eureka服务无法注册到Eureka服务注册中心

创建了一个Eureka Server的服务注册集群(两个Eureka服务),都能相互注册,写了一个Eureka客户端服务无法注册到服务发现注册中心 注册中心1: 注册中心2: 服务正常: pom依赖文件: <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://

Spring Cloud开发实践 - 02 - Eureka服务和接口定义

服务注册 EurekaServer Eureka服务模块只有三个文件, 分别是pom.xml, application.yml 和 EurekaServerApplication.java, 内容如下 pom.xml spring-boot-maven-plugin: 使用 goal=repackage 可以打包出一个包含所有依赖的fat jarmaven-deploy-plugin: skip=true 表示当执行deploy时, 这个模块不会被提交到maven的repository <?xm

Spring Cloud进阶之路 | 四:服务消费者(feign)

转载请注明作者及出处: 作者:银河架构师 原文链接:https://www.cnblogs.com/luas/p/12111916.html ?feign简介 github说明 Feign is a Java to Http client binder inspired by Retrofit, JAXRS-2.0, and WebSocket. Feign's first goal was reducing the complexity of binding Denominator unifo