Eureka服务续约(Renew)源码分析

主要对Eureka的Renew(服务续约),从服务提供者发起续约请求开始分析,通过阅读源码和画时序图的方式,展示Eureka服务续约的整个生命周期。服务续约主要是把服务续约的信息更新到自身的Eureka Server中,然后再同步到其它Eureka Server中。

Renew(服务续约)操作由Service Provider定期调用,类似于heartbeat。目的是隔一段时间Service Provider调用接口,告诉Eureka Server它还活着没挂,不要把它T了。通俗的说就是它们两之间的心跳检测,避免服务提供者被剔除掉。

Renew源码分析

服务提供者实现细节

服务提供者发发起服务续约的时序图

在com.netflix.discovery.DiscoveryClient.initScheduledTasks()中的1272行,TimedSupervisorTask会定时发起服务续约,代码如下所示:

// Heartbeat timer
  scheduler.schedule(
     new TimedSupervisorTask(
            "heartbeat",
             scheduler,
             heartbeatExecutor,
             renewalIntervalInSecs,
             TimeUnit.SECONDS,
             expBackOffBound,
              new HeartbeatThread()
            ),
  renewalIntervalInSecs, TimeUnit.SECONDS);

2.在com.netflix.discovery.DiscoveryClient中的1393行,有一个HeartbeatThread线程发起续约操作

private class HeartbeatThread implements Runnable {
        public void run() {
            //调用eureka-client中的renew
            if (renew()) {
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
}

renew()调用eureka-client-1.4.11.jarcom.netflix.discovery.DiscoveryClient中829行renew()发起PUT Reset请求,

调用com.netflix.eureka.resources.InstanceResource中的renewLease()续约。

Netflix中的Eureka Core实现细节

NetFlix中Eureka Core中的服务续约时序图,如下图所示。

打开com.netflix.eureka.resources.InstanceResource中的106行的renewLease()方法,代码如下:

private final PeerAwareInstanceRegistry registry
@PUT
public Response renewLease(
        @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
        @QueryParam("overriddenstatus") String overriddenStatus,
        @QueryParam("status") String status,
        @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
    boolean isFromReplicaNode = "true".equals(isReplication);
    //调用
    boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
    //其余省略
}

点开registry.renew(app.getName(), id, isFromReplicaNode);我们可以看到,调用了org.springframework.cloud.netflix.eureka.server.InstanceRegistry中的renew()方法,代码如下:

@Override
    public boolean renew(final String appName, final String serverId,
            boolean isReplication) {
        log("renew " + appName + " serverId " + serverId + ", isReplication {}"
                + isReplication);
        List<Application> applications = getSortedApplications();
        for (Application input : applications) {
            if (input.getName().equals(appName)) {
                InstanceInfo instance = null;
                for (InstanceInfo info : input.getInstances()) {
                    if (info.getHostName().equals(serverId)) {
                        instance = info;
                        break;
                    }
                }
                publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId,
                        instance, isReplication));
                break;
            }
        }
        //调用com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl中的renew方法
        return super.renew(appName, serverId, isReplication);
}

super.renew()看到调用了父类中的com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl420行的renew()方法

public boolean renew(final String appName, final String id, final boolean isReplication) {
        //服务续约成功,
        if (super.renew(appName, id, isReplication)) {
            //然后replicateToPeers同步其它Eureka Server中的数据
            replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
            return true;
        }
        return false;
}

从上面代码中super.renew(appName, id, isReplication)可以看出调用的是com.netflix.eureka.registry.AbstractInstanceRegistry中345行的renew()方法,

    /**
     * Marks the given instance of the given app name as renewed, and also marks whether it originated from
     * replication.
     *
     * @see com.netflix.eureka.lease.LeaseManager#renew(java.lang.String, java.lang.String, boolean)
     */
    public boolean renew(String appName, String id, boolean isReplication) {
        RENEW.increment(isReplication);
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToRenew = null;
        if (gMap != null) {
            leaseToRenew = gMap.get(id);
        }
        if (leaseToRenew == null) {
            RENEW_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: lease doesn‘t exist, registering resource: {} - {}", appName, id);
            return false;
        } else {
            InstanceInfo instanceInfo = leaseToRenew.getHolder();
            if (instanceInfo != null) {
                // touchASGCache(instanceInfo.getASGName());
                InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                        instanceInfo, leaseToRenew, isReplication);
                if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                    logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                            + "; re-register required", instanceInfo.getId());
                    RENEW_NOT_FOUND.increment(isReplication);
                    return false;
                }
                if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                    Object[] args = {
                            instanceInfo.getStatus().name(),
                            instanceInfo.getOverriddenStatus().name(),
                            instanceInfo.getId()
                    };
                    logger.info(
                            "The instance status {} is different from overridden instance status {} for instance {}. "
                                    + "Hence setting the status to overridden status", args);
                    instanceInfo.setStatus(overriddenInstanceStatus);
                }
            }
            renewsLastMin.increment();
            leaseToRenew.renew();
            return true;
        }
    }

其中 leaseToRenew.renew()是调用com.netflix.eureka.lease.Lease中的62行的renew()方法

public void renew() {
    lastUpdateTimestamp = System.currentTimeMillis() + duration;
}

replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);

调用自身的replicateToPeers()方法,在com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl中的618行,主要接口实现方式和register基本一致:首先更新自身Eureka Server中服务的状态,再同步到其它Eureka Server中。

private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info /* optional */,
                                  InstanceStatus newStatus /* optional */, boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();
        try {
            if (isReplication) {
                numberOfReplicationsLastMin.increment();
            }
            // If it is a replication already, do not replicate again as this will create a poison replication
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }
            // 同步把续约信息同步到其它的Eureka Server中
            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                // If the url represents this host, do not replicate to yourself.
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
                //根据action做相应操作的同步
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
            tracer.stop();
        }
 }
时间: 2024-10-13 20:19:36

Eureka服务续约(Renew)源码分析的相关文章

Dubbo 源码分析 - 集群容错之 LoadBalance

1.简介 LoadBalance 中文意思为负载均衡,它的职责是将网络请求,或者其他形式的负载"均摊"到不同的机器上.避免集群中部分服务器压力过大,而另一些服务器比较空闲的情况.通过负载均衡,可以让每台服务器获取到适合自己处理能力的负载.在为高负载的服务器分流的同时,还可以避免资源浪费,一举两得.负载均衡可分为软件负载均衡和硬件负载均衡.在我们日常开发中,一般很难接触到硬件负载均衡.但软件负载均衡还是能够接触到一些的,比如 Nginx.在 Dubbo 中,也有负载均衡的概念和相应的实现

开源中国 OsChina Android 客户端源码分析(9)下载APK功能

源码中用以下载客户端的类为DownloadService,是一个服务.如果你对android服务不够理解的话,建议先查阅下有关服务的知识点.源码分析如下: 1首先我们先来看下该服务中几个重写的方法: 1.1onCreate()中 首先声明了自定义的绑定器对象,并在自定义的绑定器中添加了几个界面可以访问服务的方法,我们发现在这几个方法中,目前实际用到的start()方法用以开始下载APK,其他的没有用到.获取通知管理器.设置服务为 非前台服务.代码注释中,火蚁表明了不确定性. 其实如果将服务设置为

dubbo源码分析6-telnet方式的管理实现

dubbo源码分析1-reference bean创建 dubbo源码分析2-reference bean发起服务方法调用 dubbo源码分析3-service bean的创建与发布 dubbo源码分析4-基于netty的dubbo协议的server dubbo源码分析5-dubbo的扩展点机制 dubbo提供了telnet的方式,直接用命令查看服务信息等.怎么实现的呢. 1. 编解码器 com.alibaba.dubbo.remoting.transport.netty.NettyCodecA

8.源码分析---从设计模式中看SOFARPC中的EventBus?

我们在前面分析客户端引用的时候会看到如下这段代码: // 产生开始调用事件 if (EventBus.isEnable(ClientStartInvokeEvent.class)) { EventBus.post(new ClientStartInvokeEvent(request)); } 这里用EventBus调用了一下post方法之后就什么也没做了,就方法名来看是发送了一个post请求,也不知道发给谁,到底有什么用. 所以这一节我们来分析一下EventBus这个类的作用. 首先我们来看一下

【一起学源码-微服务】Nexflix Eureka 源码九:服务续约源码分析

前言 前情回顾 上一讲 我们讲解了服务发现的相关逻辑,所谓服务发现 其实就是注册表抓取,服务实例默认每隔30s去注册中心抓取一下注册表增量数据,然后合并本地注册表数据,最后有个hash对比的操作. 本讲目录 今天主要是看下服务续约的逻辑,服务续约就是client端给server端发送心跳检测,告诉对方我还活着.现在很多分布式系统都会有心跳检查的机制,这里一起来学习下Eureka是怎么做心跳检查的. 目录如下: client端心跳检查调度任务 server端接收心跳检查,设置最后renew时间 这

Spring Cloud Eureka服务注册源码分析

Eureka是怎么work的 那eureka client如何将本地服务的注册信息发送到远端的注册服务器eureka server上.通过下面的源码分析,看出Eureka Client的定时任务调用Eureka Server的Reset接口,而Eureka接收到调用请求后会处理服务的注册以及Eureka Server中的数据同步的问题. 服务注册 源码分析,看出服务注册可以认为是Eureka client自己完成,不需要服务本身来关心. Eureka Client的定时任务调用Eureka Se

Eureka 系列(02)服务发现源码分析

Eureka 系列(02)服务发现源码分析 [TOC] 在上一篇文章 Eureka 系列(02)客户端源码分析 中对客户端服务发现与 Eureka 一致性协议: Eureka 是 AP 模型 消息广播: Eureka源码解析 https://blog.csdn.net/u012394095/article/category/9279158 https://blog.csdn.net/u011834741/article/details/54694045 Eureka 集群发现 https://w

Spring Cloud Eureka源码分析---服务注册

本篇我们着重分析Eureka服务端的逻辑实现,主要涉及到服务的注册流程分析. 在Eureka的服务治理中,会涉及到下面一些概念: 服务注册:Eureka Client会通过发送REST请求的方式向Eureka Server注册自己的服务,提供自身的元数据,比如 IP 地址.端口.运行状况指标的URL.主页地址等信息.Eureka Server接收到注册请求后,就会把这些元数据信息存储在一个ConcurrentHashMap中. 服务续约:在服务注册后,Eureka Client会维护一个心跳来持

SpringCloud源码分析(一)--Eureka服务基础知识

一.前言 上两节已经搭建了一个简单的Eureka的服务注册中心和服务提供者或者服务消费者,因为有时候服务消费者也是服务提供者,这两者划分没有那么清楚的界限.本节主要介绍一些跟Eureka相关的知识.了解它们到底有什么特点和功能. 二.Eureka基础知识 本节主要将Eureka分为基础架构和服务治理两个方向描述 2.1.基础架构 在基础架构中,Eureka主要分为服务注册中心,服务提供者和服务消费者. 2.2.服务治理机制 先来看一下很基础的服务治理机制的简单示意图: 根据每一个的通信行为做具体