深入浅出高性能服务发现、配置框架Nacos系列 3: 服务发现:Nacos客户端初始化流程

上一章节,我们从全局了解了一下Nacos项目的模块架构,做到了心中有数,现在,我们去逐步去挖掘里面的代码细节,很多人在学习开源的时候,无从下手,代码那么多,从哪个地方开始看呢?我们可以从一个接口开始入手,这个接口是你使用过的,知道它大概做什么事,有体感的,大家还记得第一章时,我们写的HelloWorld吗,对,就从里面的接口开始剥洋葱。

这个是Nacos的github代码地址,开始之前先start关注一下,加上watch,后续Nacos的邮件列表也会通知到你,可以关注到Nacos的最新实时消息,及各大牛之间的精彩讨论。

下面这段代码,是第一章节发布一个服务的代码:

public static void main(String[] args) throws NacosException, InterruptedException {
    //发布的服务名
    String serviceName = "helloworld.services";
    //构造一个Nacos实例,入参是Nacos server的ip和服务端口
    NamingService naming = NacosFactory.createNamingService("100.81.0.34:8080");
    //发布一个服务,该服务对外提供的ip为127.0.0.1,端口为8888
    naming.registerInstance(serviceName, "100.81.0.35", 8080);
    Thread.sleep(Integer.MAX_VALUE);
}

其中,第一步,是构造一个Nacos服务实例,构造实例的入参,是一个String,值的规范为ip:port,这个ip,就是我们任意一台Nacos server的地址,我们点进去看这个方法:

public static NamingService createNamingService(String serverAddr) throws NacosException {
        return NamingFactory.createNamingService(serverAddr);
    }

同时我们看下创建配置服务实例的代码:

public static ConfigService createConfigService(String serverAddr) throws NacosException {
   return ConfigFactory.createConfigService(serverAddr);
}

我们可以看到,NacosFactory实际上是一个服务发现和配置管理接口的统一入口,再由它不通的方法,创建不同服务的实例,我们可以直接使用NamingFactory,或者ConfigFactory直接创建Nacos的服务实例,也能work

接下来,看一下,是如何构造出这个Nacos naming实例的:

public static NamingService createNamingService(String serverList) throws NacosException {
   try {
      Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
      Constructor constructor = driverImplClass.getConstructor(String.class);
      NamingService vendorImpl = (NamingService) constructor.newInstance(serverList);
      return vendorImpl;
   } catch (Throwable e) {
      throw new NacosException(-400, e.getMessage());
   }
}

通过反射实例化出了一个NamingService的实例NacosNamingService,构造器是一个带String入参的,我们顺着往下看,构造函数里面做了哪些事情:

public NacosNamingService(String serverList) {

    this.serverList = serverList;
    init();
    eventDispatcher = new EventDispatcher();
    serverProxy = new NamingProxy(namespace, endpoint, serverList);
    beatReactor = new BeatReactor(serverProxy);
    hostReactor = new HostReactor(eventDispatcher, serverProxy, cacheDir);
}

入参serverList就是我们刚才传入的服务端地址,值赋给了实例的serverList字段,接下来调用了一个init方法,这个方法里面如下:

private void init() {
    namespace = System.getProperty(PropertyKeyConst.NAMESPACE);
    if (StringUtils.isEmpty(namespace)) {
        namespace = UtilAndComs.DEFAULT_NAMESPACE_ID;
    }
    logName = System.getProperty(UtilAndComs.NACOS_NAMING_LOG_NAME);
    if (StringUtils.isEmpty(logName)) {
        logName = "naming.log";
    }
    cacheDir = System.getProperty("com.alibaba.nacos.naming.cache.dir");
    if (StringUtils.isEmpty(cacheDir)) {
        cacheDir = System.getProperty("user.home") + "/nacos/naming/" + namespace;
    }
}

这面做了3件事,给namespace,logName,cacheDir赋值,namespace我们么有传入,默认是default,namespace在Nacos里面的作用,是用来进行本地缓存隔离的,一台机器上,启动一个Nacos的客户端进程,默认的本地缓存路径是default,如果再启动一个,需要重新设置一个namespace,否则就会复用之前的缓存,造成冲突;logName和cacheDir,这2个字段就不解释了,字面理解。这里多说一句,这些值的设置,可以在java启动时,通过系统参数的形式传入,并且是第一优先级的。

init方法执行完之后,接下来是实例化一些框架组件,EventDispatcher,这个是一个经典的事件分发组件,它的工作模式如下:

会有一个单独线程从blockQueue中获取事件,这个事件在Nacos这里, 就是服务端推送下来的数据,listener在我们订阅一条数据时,会从生成一个listener实例,在事件到了队列中,找到对应的listener,去执行里面listener的回调函数onEvent。如果对这个模式不熟悉的同学,可以再翻看下EventDispatcher的代码,这个属于基础知识了,和业务没有关系,这里就不过多详细讲解,篇幅太长。

接下来,实例化了一个NameProxy的组件,这个东西是干嘛的呢?我们看下里面代码:

public NamingProxy(String namespace, String endpoint, String serverList) {
    this.namespace = namespace;
    this.endpoint = endpoint;
    if (StringUtils.isNotEmpty(serverList)) {
        this.serverList = Arrays.asList(serverList.split(","));
        if (this.serverList.size() == 1) {
            this.nacosDomain = serverList;
        }
    }
    executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("com.taobao.vipserver.serverlist.updater");
            t.setDaemon(true);
            return t;
        }
    });
    executorService.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            refreshSrvIfNeed();
        }
    }, 0, vipSrvRefInterMillis, TimeUnit.MILLISECONDS);
    refreshSrvIfNeed();
}

这里面逻辑有些多,我总结下,主要是启动了一个线程,每隔30s,去执行refreshSrvIfNeed()这个方法,
refreshSrvIfNeed()这个方法里面,做的事情,是通过一个http请求,去Nacos server获取一串Nacos server集群的地址列表,具体代码如下:

  private void refreshSrvIfNeed() {
        try {
            if (!CollectionUtils.isEmpty(serverList)) {
                LogUtils.LOG.info("server list provided by user: " + serverList);
                return;
            }
            if (System.currentTimeMillis() - lastSrvRefTime < vipSrvRefInterMillis) {
                return;
            }
            List<String> list = getServerListFromEndpoint();
            if (list.isEmpty()) {
                throw new Exception("Can not acquire vipserver list");
            }
            if (!CollectionUtils.isEqualCollection(list, serversFromEndpoint)) {
                LogUtils.LOG.info("SERVER-LIST", "server list is updated: " + list);
            }
            serversFromEndpoint = list;
            lastSrvRefTime = System.currentTimeMillis();
        } catch (Throwable e) {
            LogUtils.LOG.warn("failed to update server list", e);
        }
    }

获取完地址列表后,赋值给serversFromEndpoint,并且记录当前更新时间,在下一次更新时,小于30s,就不更新,避免频繁更新,总的来说,NameProxy的目的就是定时在客户端维护Nacos服务端的最新地址列表。

我们继续往下看,接下来初始化了BeatReactor这个组件,从名字可以猜测,应该是和心跳相关的事情,它初始化的代码如下:

public BeatReactor(NamingProxy serverProxy) {
    this.serverProxy = serverProxy;
    executorService.scheduleAtFixedRate(new BeatProcessor(), 0, clientBeatInterval, TimeUnit.MILLISECONDS);
}

起了一个定时间隔为10s的任务,去执行BeatProcessor里面的逻辑,BeatProcessor的代码里面,是循环的去取当前客户端注册好的实例,然后向服务端发送一个http的心跳通知请求,告诉客户端,这个服务的健康状态,具体代码如下:

class BeatTask implements Runnable {
    BeatInfo beatInfo;
    public BeatTask(BeatInfo beatInfo) {
        this.beatInfo = beatInfo;
    }
    @Override
    public void run() {
        Map<String, String> params = new HashMap<String, String>(2);
        params.put("beat", JSON.toJSONString(beatInfo));
        params.put("dom", beatInfo.getDom());
        try {
            String result = serverProxy.callAllServers(UtilAndComs.NACOS_URL_BASE + "/api/clientBeat", params);
            JSONObject jsonObject = JSON.parseObject(result);

            if (jsonObject != null) {
                clientBeatInterval = jsonObject.getLong("clientBeatInterval");
            }
        } catch (Exception e) {
            LogUtils.LOG.error("CLIENT-BEAT", "failed to send beat: " + JSON.toJSONString(beatInfo), e);
        }
    }
}

这里就是naocs的客户端主动上报服务健康状况的逻辑了,是服务发现功能,比较重要的一个概念,服务健康检查机制,常用的还有服务端主动去探测客户端的接口返回。

最后一步,就是初始化了一个叫HostReactor的实例,我们来看下,它干了些啥:

public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, String cacheDir) {
    this.eventDispatcher = eventDispatcher;
    this.serverProxy = serverProxy;
    this.cacheDir = cacheDir;
    this.serviceInfoMap = new ConcurrentHashMap<>(DiskCache.read(this.cacheDir));
    this.failoverReactor = new FailoverReactor(this, cacheDir);
    this.pushRecver = new PushRecver(this);
}

第五行,是从缓存文件中加载数据到serviceInfoMap的内存map中,接下来,初始化了一个FailoverReactor的组件,这个是Nacos客户端缓存容灾相关的,它里面的初始化代码如下:

public void init() {
    executorService.scheduleWithFixedDelay(new SwitchRefresher(), 0L, 5000L, TimeUnit.MILLISECONDS);
    executorService.scheduleWithFixedDelay(new DiskFileWriter(), 30, DAY_PERIOD_MINUTES, TimeUnit.MINUTES);
    // backup file on startup if failover directory is empty.
    executorService.schedule(new Runnable() {
        @Override
        public void run() {
            try {
                File cacheDir = new File(failoverDir);

                if (!cacheDir.exists() && !cacheDir.mkdirs()) {
                    throw new IllegalStateException("failed to create cache dir: " + failoverDir);
                }

                File[] files = cacheDir.listFiles();
                if (files == null || files.length <= 0) {
                    new DiskFileWriter().run();
                }
            } catch (Throwable e) {
                LogUtils.LOG.error("NA", "failed to backup file on startup.", e);
            }

        }
    }, 10000L, TimeUnit.MILLISECONDS);
}

初始化了3个定时任务,第一个任务的代码如下:

class SwitchRefresher implements Runnable {
    long lastModifiedMillis = 0L;
    @Override
    public void run() {
        try {
            File switchFile = new File(failoverDir + UtilAndComs.FAILOVER_SWITCH);
            if (!switchFile.exists()) {
                switchParams.put("failover-mode", "false");
                LogUtils.LOG.debug("failover switch is not found, " + switchFile.getName());
                return;
            }
            long modified = switchFile.lastModified();
            if (lastModifiedMillis < modified) {
                lastModifiedMillis = modified;
                String failover = ConcurrentDiskUtil.getFileContent(failoverDir + UtilAndComs.FAILOVER_SWITCH, Charset.defaultCharset().toString());
                if (!StringUtils.isEmpty(failover)) {
                    List<String> lines = Arrays.asList(failover.split(DiskCache.getLineSeperator()));
                    for (String line : lines) {
                        String line1 = line.trim();
                        if ("1".equals(line1)) {
                            switchParams.put("failover-mode", "true");
                            LogUtils.LOG.info("failover-mode is on");
                            new FailoverFileReader().run();
                        } else if ("0".equals(line1)) {
                            switchParams.put("failover-mode", "false");
                            LogUtils.LOG.info("failover-mode is off");
                        }
                    }
                } else {
                    switchParams.put("failover-mode", "false");
                }
            }
        } catch (Throwable e) {
            LogUtils.LOG.error("NA", "failed to read failover switch.", e);
        }
    }
}

首先判定下容灾开关是否有,容灾开关是一个磁盘文件的形式存在,通过容灾开关文件名字,判定容灾开关是否打开,1表示打开,0为关闭,读取到容灾开关后,将值更新到内存中,后续解析地址列表时,首先会判定一下容灾开关是否打开,如果打开了,就读缓存的数据,否则从服务端获取最新数据。

第二个定时任务,做的事情如下:

class DiskFileWriter extends TimerTask {
    public void run() {
        Map<String, ServiceInfo> map = hostReactor.getServiceInfoMap();
        for (Map.Entry<String, ServiceInfo> entry : map.entrySet()) {
            ServiceInfo serviceInfo = entry.getValue();
            if (StringUtils.equals(serviceInfo.getKey(), UtilAndComs.ALL_IPS) || StringUtils.equals(serviceInfo.getName(), UtilAndComs.ENV_LIST_KEY)
                    || StringUtils.equals(serviceInfo.getName(), "00-00---000-ENV_CONFIGS-000---00-00")
                    || StringUtils.equals(serviceInfo.getName(), "vipclient.properties")
                    || StringUtils.equals(serviceInfo.getName(), "00-00---000-ALL_HOSTS-000---00-00")) {
                continue;
            }
            DiskCache.write(serviceInfo, failoverDir);
        }
    }
}

每隔24小时,把内存中所有的服务数据,写一遍到磁盘中,其中需要过滤掉一些非域名数据的特殊数据,具体可看代码中的描述。最后一个定时任务,是每隔10s,是检查缓存目录是否存在,同时如果缓存里面值没有的话,主动触发一次缓存写磁盘的操作。

以上就是客户端构造一个Nacos实例的初始化全部流程,大部分都是在初始化多个线程池或者定时任务,各司其职,这个也是我们写后端程序的一些基本套路,提高系统的并发能力,同时在对任务的分发和执行,引入一些常用的异步编程模型如队列模型的事件分发,这些都是异步和并发的很好学习素材,这2点也是写高性能程序的基本要求。

总结

这一章节,我们通过Nacos的NacosFactory构造一个nacos服务实例作为切入点,把客户端的初始化流程给串了一遍,概述下客户端初始化流程做的几件事:

  • 初始化事件分发组件,用于处理服务端主动通知下来的变更数据
  • 初始化Nacos服务集群地址列表更新组件,用于客户端维护Nacos服务端的最新地址列表
  • 初始化服务健康检查模块,主动给服务端上报服务的健康情况
  • 初始化客户端的缓存,10s检查一次,如果没有,则创建
  • 24小时备份一次客户端的缓存文件
  • 5s检查一次容灾开关,更新到内存中,容灾模式情况下,服务地址读的都是缓存

以上就是Nacos客户端实例初始化的整体流程,是不是感觉做的事情挺多的,还有一些代码的细节点,大家自己多精读一下,如果有什么不明白的,可以留言,或者在社区找@超哥帮你解答,如果能发现bug或者其他建议,可以在社区提issue。

转载请联系:微信(zjjxg2018)

原文地址:http://blog.51cto.com/13995002/2287545

时间: 2024-08-24 02:35:56

深入浅出高性能服务发现、配置框架Nacos系列 3: 服务发现:Nacos客户端初始化流程的相关文章

【开源】OSharp3.3框架解说系列(7.1):初始化流程概述

本文已同步到系列目录:OSharp快速开发框架解说系列 框架初始化 相对于OSharp 3.0,3.3版本最大的更新,就是从框架级别定义了初始化流程,对初始化功能进行了抽象与封装,不依赖于第三方实现,第三方实现仅作为可替换的服务实现方案存在. 例如,依赖注入功能中,接口与其实现类的映射配置,对象容器的构建,对象的解析获取,都将通过框架定义的API来完成,而Autofac,仅作为这些功能的实现方存在,如果不想使用Autofac,则可以很方便的切换成别的IoC组件. 具体的初始化功能是怎样抽象与定义

Nacos系列:Nacos的Java SDK使用

Maven依赖 Nacos提供完整的Java SDK,便于配置管理和服务发现及管理,以 Nacos-0.8.0 版本为例 添加Maven依赖: <dependency> <groupId>com.alibaba.nacos</groupId> <artifactId>nacos-client</artifactId> <version>0.8.0</version> </dependency> 仅仅引入naco

Nacos系列:Nacos的三种部署模式

三种部署模式 Nacos支持三种部署模式 1.单机模式:可用于测试和单机使用,生产环境切忌使用单机模式(满足不了高可用) 2.集群模式:可用于生产环境,确保高可用 3.多集群模式:可用于多数据中心场景 单机模式 启动 Nacos Server Linux:sh startup.sh -m standalone Windows:cmd startup.cmd -m standalone 或 双击 startup.cmd 启动 关闭 Nacos Server Linux:sh shutdown.sh

LINUX服务部分配置【带图的可以在csdn上找 hanbim520账号下载,我做成的是PDF】

REDHAT 5 常用服务配置实例 适用于初学者的朋友 测试时候注意防火墙和selinux哦!! 生活其实很精彩--本文版权保留,任何人不得利用本文获取利益!!! [email protected],欢迎交流,交流才能进步!! 一.YUM服务的配置 1.为什么要首先配置YUM服务器 答:我们都知道,安装软件在装机的时候会加长装机时间,也可能造成物理存储空间的浪费,有些软件在后来 中不一定会用到.然后配置YUM服务器后会非常的方便,一旦需要什么软件(必须是光盘里面自带或者您自 行下载后放入软件库)

深入浅出高性能服务发现、配置框架Nacos系列 1: HelloWorld

Nacos是什么? 引用官方的介绍,他主要提供以下几个功能点: 动态配置服务 服务发现及管理 动态DNS服务 动态配置服务 就是通过一个系统,管理系统中的配置项,在配置项需要更新的时候,可以通过管理系统去操作更新,更新完了之后,会主动推送到订阅了这个配置的客户端 具体的使用场景,例如,在系统中,会有数据库的链接串,账号密码等配置信息,常规的做法是写在配置文件里面,如果需要修改更新,需要重新打包编译,如果你是分布式集群,那成本太大了,通常我们都会将它抽取出来,存放到db,或者一个管理系统,Naco

SpringCloud系列四:Eureka 服务发现框架(定义 Eureka 服务端、Eureka 服务信息、Eureka 发现管理、Eureka 安全配置、Eureka-HA(高可用) 机制、Eureka 服务打包部署)

1.概念:Eureka 服务发现框架 2.具体内容 对于服务发现框架可以简单的理解为服务的注册以及使用操作步骤,例如:在 ZooKeeper 组件,这个组件里面已经明确的描述了一个服务的注册以及发现操作流程,在整个 Rest 架构里面,会存在有大量的微服务的信息. 在 SpringCloud 之中使用了大量的 Netflix 的开源项目,而其中 Eureka 就属于 Netflix 提供的发现服务组件,所有的微服务在使用之中全部向 Eureka 之中进行注册,而后客户端直接利用 Eureka 进

springcloud微服务系列之服务注册与发现组件Eureka

一.Eurake的简介二.使用Eureka进行服务的注册消费1.创建一个服务注册中心2.创建服务的提供者3.创建服务的消费者总结 一.Eurake的简介 今天我们来介绍下springcloud的核心组件Eureka,Eurake是负责微服务架构中服务治理的功能,负责各个服务实例的注册与发现. Eureka包含了服务器端和客户端组件.服务器端,也被称作是服务注册中心,用于提供服务的注册与发现. 客户端组件包含服务消费者与服务生产者.在应用程序运行时,服务生产者向注册中心注册自己的服务实例,当消费者

微服务配置中心实战:Spring + MyBatis + Druid + Nacos

在结合场景谈服务发现和配置中我们讲述了 Nacos 配置中心的三个典型的应用场景,包括如何在 Spring Boot 中使用 Nacos 配置中心将数据库连接信息管控起来,而在"原生"的 Spring 中可以怎么使用 Nacos 配置中心呢?很多基于 Spring MVC 框架的 Web 开发中,Spring + MyBatis + Druid 是一个黄金组合,在此基础上融入 Nacos 配置中心,将会发生什么特别的变化呢? 本文将通过一个用户信息查询示例,演示在 Spring Web

简单RPC框架-基于Consul的服务注册与发现

*:first-child { margin-top: 0 !important; } body>*:last-child { margin-bottom: 0 !important; } /* BLOCKS =============================================================================*/ p, blockquote, ul, ol, dl, table, pre { margin: 15px 0; } /* HEAD