【一起学源码-微服务】Nexflix Eureka 源码九:服务续约源码分析

前言

前情回顾

上一讲 我们讲解了服务发现的相关逻辑,所谓服务发现 其实就是注册表抓取,服务实例默认每隔30s去注册中心抓取一下注册表增量数据,然后合并本地注册表数据,最后有个hash对比的操作。

本讲目录

今天主要是看下服务续约的逻辑,服务续约就是client端给server端发送心跳检测,告诉对方我还活着。现在很多分布式系统都会有心跳检查的机制,这里一起来学习下Eureka是怎么做心跳检查的。

目录如下:

  1. client端心跳检查调度任务
  2. server端接收心跳检查,设置最后renew时间

这一讲内容不太多,因为上一篇文章写全量和增量注册表信息内容有点多,所以这里将博客尽量一篇保持一个知识点,后面还会讲服务实例下线、摘除、注册中心自我保护等机制的实现原理。

说明

原创不易,如若转载 请标明来源:一枝花算不算浪漫

源码分析

client端心跳检查调度任务

服务实例续约代码比较简单,这里还是从DiscovertClient.java 开始,很多源码的入口都是在这里,因为client端初始化、注册 都是走的这里,因为前几篇文章对这个类已经分析很多了,这里只截取部分重要代码:

DiscovertClient.java 初始化后 会继续初始化一些调度任务:

private void initScheduledTasks() {
    if (clientConfig.shouldRegisterWithEureka()) {
        //  默认也是30s
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);

        // Heartbeat timer
        // 执行heartbeatExecutor心跳检查,默认是30s
        scheduler.schedule(
                new TimedSupervisorTask(
                        "heartbeat",
                        scheduler,
                        heartbeatExecutor,
                        renewalIntervalInSecs,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new HeartbeatThread()
                ),
                renewalIntervalInSecs, TimeUnit.SECONDS);

        // 执行线程
        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}

private class HeartbeatThread implements Runnable {

    public void run() {
        if (renew()) {
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}

boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
        if (httpResponse.getStatusCode() == 404) {
            REREGISTER_COUNTER.increment();
            logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
            long timestamp = instanceInfo.setIsDirtyWithTime();
            boolean success = register();
            if (success) {
                instanceInfo.unsetIsDirty(timestamp);
            }
            return success;
        }
        return httpResponse.getStatusCode() == 200;
    } catch (Throwable e) {
        logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
        return false;
    }
}

public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
    String urlPath = "apps/" + appName + '/' + id;
    Response response = null;
    try {
        WebTarget webResource = jerseyClient.target(serviceUrl)
                .path(urlPath)
                .queryParam("status", info.getStatus().toString())
                .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
        if (overriddenStatus != null) {
            webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
        }
        Builder requestBuilder = webResource.request();
        addExtraProperties(requestBuilder);
        addExtraHeaders(requestBuilder);
        requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE);
        response = requestBuilder.put(Entity.entity("{}", MediaType.APPLICATION_JSON_TYPE)); // Jersey2 refuses to handle PUT with no body
        EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response));
        if (response.hasEntity()) {
            eurekaResponseBuilder.entity(response.readEntity(InstanceInfo.class));
        }
        return eurekaResponseBuilder.build();
    } finally {
        if (logger.isDebugEnabled()) {
            logger.debug("Jersey2 HTTP PUT {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
        }
        if (response != null) {
            response.close();
        }
    }
}

这里的流程很简单,初始化DiscoveryClient 后会新建一个调度任务,然后执行HeartbeatThread中的run方法,默认是renewalIntervalInSecs 30s执行一次。
具体就是给Server端发送一个http请求,类似于:http://localhost:8080/v2/apps/ServiceA/i-000000-1, 走的是put请求。
最后拿到响应结果,续约成功后会更新lastSuccessfulHeartbeatTimestamp 最近成功心跳检测的时间戳。

server端接收心跳检查请求

前几篇文章已经说过,Server端接收http请求的入口在eureka-core模块下的 resource包里面,这里直接找到ApplicationResource.java中的getInstanceInfo 方法,这里直接请求的InstanceResource 类的构造方法,找到这个方法中的@PUT请求。可以直接看下代码:

InstanceResource.renewLease +AbstractInstanceRegistry.renew 方法:

@PUT
public Response renewLease(
        @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
        @QueryParam("overriddenstatus") String overriddenStatus,
        @QueryParam("status") String status,
        @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
    boolean isFromReplicaNode = "true".equals(isReplication);
    boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);

    // 省略部分代码

    logger.debug("Found (Renew): {} - {}; reply status={}" + app.getName(), id, response.getStatus());
    return response;
}

public boolean renew(String appName, String id, boolean isReplication) {
    RENEW.increment(isReplication);
    Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
    Lease<InstanceInfo> leaseToRenew = null;
    if (gMap != null) {
        leaseToRenew = gMap.get(id);
    }
    if (leaseToRenew == null) {
        RENEW_NOT_FOUND.increment(isReplication);
        logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
        return false;
    } else {
        InstanceInfo instanceInfo = leaseToRenew.getHolder();
        if (instanceInfo != null) {
            // touchASGCache(instanceInfo.getASGName());
            InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                    instanceInfo, leaseToRenew, isReplication);
            if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                        + "; re-register required", instanceInfo.getId());
                RENEW_NOT_FOUND.increment(isReplication);
                return false;
            }
            if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                Object[] args = {
                        instanceInfo.getStatus().name(),
                        instanceInfo.getOverriddenStatus().name(),
                        instanceInfo.getId()
                };
                logger.info(
                        "The instance status {} is different from overridden instance status {} for instance {}. "
                                + "Hence setting the status to overridden status", args);
                instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
            }
        }
        renewsLastMin.increment();
        leaseToRenew.renew();
        return true;
    }
}

这里主要看renew方法, 这里看到registry 是一个注册表,通过appName获取对应的服务注册表信息。

这里主要还是看leaseToRenew.renew() 其实很简单,就是设置当前示例注册表的renew属性的lastUpdateTimestamp 为最新时间+duration。

至于这里的duration 我们下一讲会详细讲解,duration 和服务实例摘除有关。

总结

(1)DiscoveryClient初始化的时候,会去调度一堆定时任务,其中有一个就是HeartbeatThread,心跳线程

(2)在这里可以看到,默认是每隔30秒去发送一次心跳,每隔30秒执行一次HeartbeatTHread线程的逻辑,发送心跳

(3)这边的话就是去发送这个心跳,走的是EurekaHttpClient的sendHeartbeat()方法,http://localhost:8080/v2/apps/ServiceA/i-000000-1,走的是put请求

(4)负责承接服务实例的心跳相关的这些操作的,是ApplicationsResource,服务相关的controller。找到ApplicationResource,再次找到InstanceResource,通过PUT请求,可以找到renewLease方法。

(5)通过注册表的renew()方法,进去完成服务续约,实际进入AbstractInstanceRegistry的renew()方法

(6)从注册表的map中,根据服务名和实例id,获取一个Lease,实际的服务续约的逻辑,其实就是在Lease对象中,更新一下lastUpdateTimestamp这个时间戳,每次续约,就更新一下这个时间戳就ok了。

申明

本文章首发自本人博客:https://www.cnblogs.com/wang-meng 和公众号:壹枝花算不算浪漫,如若转载请标明来源!

感兴趣的小伙伴可关注个人公众号:壹枝花算不算浪漫

原文地址:https://www.cnblogs.com/wang-meng/p/12122968.html

时间: 2024-10-11 23:45:40

【一起学源码-微服务】Nexflix Eureka 源码九:服务续约源码分析的相关文章

【第一章】 服务治理(Eureka)

Spring Cloud是一系列框架的集合,其基于Spring Boot的开发便利性巧妙地简化了分布式系统基础设施的开发,构建了服务治理(发现注册).配置中心.消息总线.负载均衡.断路器.数据监控.分布式会话和集群状态管理等功能,为我们提供一整套企业级分布式云应用的完美解决方案. Spring Cloud包含了多个子项目(针对分布式系统中涉及的多个不同开源产品),比如:Spring Cloud Config.Spring Cloud Netflix.Spring Cloud CloudFound

【一起学源码-微服务】Nexflix Eureka 源码十三:Eureka源码解读完结撒花篇~!

前言 想说的话 [一起学源码-微服务-Netflix Eureka]专栏到这里就已经全部结束了. 实话实说,从最开始Eureka Server和Eureka Client初始化的流程还是一脸闷逼,到现在Eureka各种操作都了然于心了. 本专栏从12.17开始写,一直到今天12.30(文章在平台是延后发布的),这将近半个月的时间确实收获很多.每天都会保持一定的时间学习,只要肯下功夫,没有学不会的东西. 2020年将继续保持学习的节奏,自己定的目标是把spring cloud几个重要的组件都学一遍

Eureka服务续约(Renew)源码分析

主要对Eureka的Renew(服务续约),从服务提供者发起续约请求开始分析,通过阅读源码和画时序图的方式,展示Eureka服务续约的整个生命周期.服务续约主要是把服务续约的信息更新到自身的Eureka Server中,然后再同步到其它Eureka Server中. Renew(服务续约)操作由Service Provider定期调用,类似于heartbeat.目的是隔一段时间Service Provider调用接口,告诉Eureka Server它还活着没挂,不要把它T了.通俗的说就是它们两之

Spring Cloud Eureka 分布式开发之服务注册中心、负载均衡、声明式服务调用实现

介绍 本示例主要介绍 Spring Cloud 系列中的 Eureka,使你能快速上手负载均衡.声明式服务.服务注册中心等 Eureka Server Eureka 是 Netflix 的子模块,它是一个基于 REST 的服务,用于定位服务,以实现云端中间层服务发现和故障转移. 服务注册和发现对于微服务架构而言,是非常重要的.有了服务发现和注册,只需要使用服务的标识符就可以访问到服务,而不需要修改服务调用的配置文件.该功能类似于 Dubbo 的注册中心,比如 Zookeeper. Eureka

【一起学源码-微服务】Nexflix Eureka 源码十:服务下线及实例摘除,一个client下线到底多久才会被其他实例感知?

前言 前情回顾 上一讲我们讲了 client端向server端发送心跳检查,也是默认每30钟发送一次,server端接收后会更新注册表的一个时间戳属性,然后一次心跳(续约)也就完成了. 本讲目录 这一篇有两个知识点及一个疑问,这个疑问是在工作中真真实实遇到过的. 例如我有服务A.服务B,A.B都注册在同一个注册中心,当B下线后,A多久能感知到B已经下线了呢? 不知道大家有没有这个困惑,这篇文章最后会对此问题答疑,如果能够看到文章的结尾,或许你就知道答案了,当然答案也会在结尾揭晓. 目录如下: C

【一起学源码-微服务】Nexflix Eureka 源码十一:EurekaServer自我保护机制竟然有这么多Bug?

前言 前情回顾 上一讲主要讲了服务下线,已经注册中心自动感知宕机的服务. 其实上一讲已经包含了很多EurekaServer自我保护的代码,其中还发现了1.7.x(1.9.x)包含的一些bug,但这些问题在master分支都已修复了. 服务下线会将服务实例从注册表中删除,然后放入到recentQueue中,下次其他EurekaClient来进行注册表抓取的时候就能感知到对应的哪些服务下线了. 自动感知服务实例宕机不会调用下线的逻辑,所以我们还抛出了一个问题,一个client宕机,其他的client

【一起学源码-微服务】Nexflix Eureka 源码三:EurekaServer启动之EurekaServer上下文EurekaClient创建

前言 上篇文章已经介绍了 Eureka Server 环境和上下文初始化的一些代码,其中重点讲解了environment初始化使用的单例模式,以及EurekaServerConfigure基于接口对外暴露配置方法的设计方式.这一讲就是讲解Eureka Server上下文初始化剩下的内容:Eureka Client初始化. 如若转载 请标明来源:一枝花算不算浪漫 EurekaServer上下文构建之Client EurekaClientConfigure创建过程 因为eurekaSever是集群部

springcloud 项目源码 微服务 分布式 Activiti6 工作流 vue.js html 跨域 前后分离

1.代码生成器: [正反双向](单表.主表.明细表.树形表,快速开发利器)freemaker模版技术 ,0个代码不用写,生成完整的一个模块,带页面.建表sql脚本.处理类.service等完整模块2.多数据源:(支持同时连接无数个数据库,可以不同的模块连接不同数的据库)支持N个数据源3.阿里数据库连接池druid,安全权限框架 shiro(菜单权限和按钮权限), 缓存框架 ehcache4.代码编辑器,在线模版编辑,仿开发工具编辑器5.调用摄像头拍照 自定义裁剪编辑头像,头像图片色度调节6.we

springcloud微服务实战:Eureka+Zuul+Ribbon+Hystrix+SpringConfig

原文地址:http://blog.csdn.net/yp090416/article/details/78017552 springcloud微服务实战:Eureka+Zuul+Ribbon+Hystrix+SpringConfig 相信现在已经有很多小伙伴已经或者准备使用springcloud微服务了,接下来为大家搭建一个微服务框架,后期可以自己进行扩展.会提供一个小案例: 服务提供者和服务消费者 ,消费者会调用提供者的服务,新建的项目都是用springboot,附源码下载. coding仓库