7.Sentinel源码分析—Sentinel是怎么和控制台通信的?

这里会介绍:

  1. Sentinel会使用多线程的方式实现一个类Reactor的IO模型
  2. Sentinel会使用心跳检测来观察控制台是否正常

Sentinel源码解析系列:

1.Sentinel源码分析—FlowRuleManager加载规则做了什么?

2. Sentinel源码分析—Sentinel是如何进行流量统计的?

3. Sentinel源码分析— QPS流量控制是如何实现的?

4.Sentinel源码分析— Sentinel是如何做到降级的?

5.Sentinel源码分析—Sentinel如何实现自适应限流?

6.Sentinel源码分析—Sentinel是如何动态加载配置限流的?



在看我的这篇文章之前大家可以先看一下官方的这篇文章:https://github.com/alibaba/Sentinel/wiki/%E6%8E%A7%E5%88%B6%E5%8F%B0。介绍了控制台怎么使用,以及客户端要怎么设置才能被收集数据。

客户端会在InitExecutor调用doInit方法中与控制台建立通信,所以我们直接看doInit方法:

InitExecutor#doInit

public static void doInit() {
    //InitExecutor只会初始化一次,并且初始化失败会退出
    if (!initialized.compareAndSet(false, true)) {
        return;
    }
    try {
        //通过spi加载InitFunc子类
        ServiceLoader<InitFunc> loader = ServiceLoader.load(InitFunc.class);
        List<OrderWrapper> initList = new ArrayList<OrderWrapper>();
        for (InitFunc initFunc : loader) {
            RecordLog.info("[InitExecutor] Found init func: " + initFunc.getClass().getCanonicalName());
            //给所有的initFunc排序,按@InitOrder从小到大进行排序
            //然后封装成OrderWrapper对象
            insertSorted(initList, initFunc);
        }
        for (OrderWrapper w : initList) {
            w.func.init();
            RecordLog.info(String.format("[InitExecutor] Executing %s with order %d",
                w.func.getClass().getCanonicalName(), w.order));
        }
    } catch (Exception ex) {
        RecordLog.warn("[InitExecutor] WARN: Initialization failed", ex);
        ex.printStackTrace();
    } catch (Error error) {
        RecordLog.warn("[InitExecutor] ERROR: Initialization failed with fatal error", error);
        error.printStackTrace();
    }
}

因为这里我们引入了sentinel-transport-simple-http模块,所以使用spi加载InitFunc的子类的时候会加载三个子类实例,分别是:CommandCenterInitFunc、HeartbeatSenderInitFunc、MetricCallbackInit。
然后会遍历loader,根据@InitOrder的大小进行排序,并封装成OrderWrapper放入到initList中。
所以initList里面的对象顺序是:

  1. CommandCenterInitFunc
  2. HeartbeatSenderInitFunc
  3. MetricCallbackInit
    然后遍历initList依次调用init方法。

所以下面我们来看一下这三个实现类的init方法做了什么:

CommandCenterInitFunc

CommandCenterInitFunc#init

public void init() throws Exception {
    //获取commandCenter对象
    CommandCenter commandCenter = CommandCenterProvider.getCommandCenter();

    if (commandCenter == null) {
        RecordLog.warn("[CommandCenterInitFunc] Cannot resolve CommandCenter");
        return;
    }
    //调用SimpleHttpCommandCenter的beforeStart方法
    //用来设置CommandHandler的实现类
    commandCenter.beforeStart();
    commandCenter.start();
    RecordLog.info("[CommandCenterInit] Starting command center: "
            + commandCenter.getClass().getCanonicalName());
}

这个方法里面的所有操作都是针对CommandCenter来进行的,所以我们先来看看CommandCenterProvider这个类。

CommandCenterProvider

static {
    //初始化commandCenter对象
    resolveInstance();
}

private static void resolveInstance() {
    //获取SpiOrder更大的子类实现类
    CommandCenter resolveCommandCenter = SpiLoader.loadHighestPriorityInstance(CommandCenter.class);

    if (resolveCommandCenter == null) {
        RecordLog.warn("[CommandCenterProvider] WARN: No existing CommandCenter found");
    } else {
        commandCenter = resolveCommandCenter;
        RecordLog.info("[CommandCenterProvider] CommandCenter resolved: " + resolveCommandCenter.getClass()
            .getCanonicalName());
    }
}

CommandCenterProvider会在首次初始化的时候调用resolveInstance方法。在resolveInstance方法里面会调用SpiLoader.loadHighestPriorityInstance来获取CommandCenter,这里获取的是SimpleHttpCommandCenter这个实例,loadHighestPriorityInstance方法具体的实现非常简单,我就不去分析了。
然后将commandCenter赋值SimpleHttpCommandCenter实例。

所以CommandCenterProvider.getCommandCenter()方法返回的是SimpleHttpCommandCenter实例。

然后调用SimpleHttpCommandCenter的beforeStart方法。

SimpleHttpCommandCenter#beforeStart

public void beforeStart() throws Exception {
    // Register handlers
    //调用CommandHandlerProvider的namedHandlers方法
    //获取CommandHandler的spi中设置的实现类
    Map<String, CommandHandler> handlers = CommandHandlerProvider.getInstance().namedHandlers();
    //将handlers中的数据设置到handlerMap中
    registerCommands(handlers);
}

这个方法首先会调用CommandHandlerProvider的namedHandlers中获取所有的CommandHandler实现类。

CommandHandlerProvider#namedHandlers

private final ServiceLoader<CommandHandler> serviceLoader = ServiceLoader.load(CommandHandler.class);

public Map<String, CommandHandler> namedHandlers() {
    Map<String, CommandHandler> map = new HashMap<String, CommandHandler>();
    for (CommandHandler handler : serviceLoader) {
        //获取实现类CommandMapping注解的name属性
        String name = parseCommandName(handler);
        if (!StringUtil.isEmpty(name)) {
            map.put(name, handler);
        }
    }
    return map;
}

这个类会通过spi先加载CommandHandler的实现类,然后将实现类按注解上面的name属性放入到map里面去。
CommandHandler的实现类是用来和控制台进行交互的处理类,负责处理。
这也是策略模式的一种应用,根据map里面的不同策略来做不同的处理,例如SendMetricCommandHandler是用来统计调用信息然后发送给控制台用的,ModifyRulesCommandHandler是用来做实时修改限流策略的处理的等等。

然后我们再回到CommandCenterInitFunc中,继续往下走,调用commandCenter.start()方法。

SimpleHttpCommandCenter#start

public void start() throws Exception {
    //获取当前机器的cpu线程数
    int nThreads = Runtime.getRuntime().availableProcessors();
    //创建一个cpu线程数大小的固定线程池,用来做业务线程池用
    this.bizExecutor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<Runnable>(10),
        new NamedThreadFactory("sentinel-command-center-service-executor"),
        new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                CommandCenterLog.info("EventTask rejected");
                throw new RejectedExecutionException();
            }
        });

    Runnable serverInitTask = new Runnable() {
        int port;

        {
            try {
                //获取port
                port = Integer.parseInt(TransportConfig.getPort());
            } catch (Exception e) {
                port = DEFAULT_PORT;
            }
        }

        @Override
        public void run() {
            boolean success = false;
            //创建一个ServerSocket
            ServerSocket serverSocket = getServerSocketFromBasePort(port);

            if (serverSocket != null) {
                CommandCenterLog.info("[CommandCenter] Begin listening at port " + serverSocket.getLocalPort());
                socketReference = serverSocket;
                executor.submit(new ServerThread(serverSocket));
                success = true;
                port = serverSocket.getLocalPort();
            } else {
                CommandCenterLog.info("[CommandCenter] chooses port fail, http command center will not work");
            }

            if (!success) {
                port = PORT_UNINITIALIZED;
            }

            TransportConfig.setRuntimePort(port);
            //关闭线程池
            executor.shutdown();
        }

    };

    new Thread(serverInitTask).start();
}
  1. 这个方法会创建一个固定大小的业务线程池
  2. 创建一个serverInitTask,里面负责建立serverSocket然后用executor去创建一个ServerThread异步执行serverSocket
  3. executor用完之后会在serverInitTask里面调用executor的shutdown方法去关闭线程池

其中executor是一个单线程的线程池:

private ExecutorService executor = Executors.newSingleThreadExecutor(
    new NamedThreadFactory("sentinel-command-center-executor"));

ServerThread是SimpleHttpCommandCenter的内部类:

public void run() {
    while (true) {
        Socket socket = null;
        try {
              //建立连接
            socket = this.serverSocket.accept();
              //默认的超时时间是3s
            setSocketSoTimeout(socket);
            HttpEventTask eventTask = new HttpEventTask(socket);
            //使用业务线程异步处理
            bizExecutor.submit(eventTask);
        } catch (Exception e) {
            CommandCenterLog.info("Server error", e);
            if (socket != null) {
                try {
                    socket.close();
                } catch (Exception e1) {
                    CommandCenterLog.info("Error when closing an opened socket", e1);
                }
            }
            try {
                // In case of infinite log.
                Thread.sleep(10);
            } catch (InterruptedException e1) {
                // Indicates the task should stop.
                break;
            }
        }
    }
}

run方法会使用构造器传入的serverSocket建立连接后设置超时时间,封装成HttpEventTask类,然后使用上面创建的bizExecutor异步执行任务。

HttpEventTask是Runnable的实现类,所以调用bizExecutor的submit的时候会调用其中的run方法使用socket与控制台进行交互。

HttpEventTask#run

public void run() {
          ....
        // Validate the target command.
        //获取commandName
        String commandName = HttpCommandUtils.getTarget(request);
        if (StringUtil.isBlank(commandName)) {
            badRequest(printWriter, "Invalid command");
            return;
        }
        // Find the matching command handler.
        //根据commandName获取处理器名字
        CommandHandler<?> commandHandler = SimpleHttpCommandCenter.getHandler(commandName);
        if (commandHandler != null) {
            //调用处理器结果,然后返回给控制台
            CommandResponse<?> response = commandHandler.handle(request);
            handleResponse(response, printWriter, outputStream);
        }
          ....
    } catch (Throwable e) {
        ....
    } finally {
        ....
    }
}

HttpEventTask的run方法很长,但是很多都是有关输入输出流的,我们不关心,所以省略。只需要知道会把request请求最后转换成一个控制台发过来的指令,然后通过SimpleHttpCommandCenter调用getHandler得到处理器,然后处理数据就行了。

所以这个整个的处理流程就是:

通过这样的一个处理流程,然后实现了类似reactor的一个处理流程。

SimpleHttpCommandCenter#getHandler

public static CommandHandler getHandler(String commandName) {
    return handlerMap.get(commandName);
}

handlerMap里面的数据是通过前面我们分析的调用beforeStart方法设置进来的。

然后通过commandName获取对应的控制台,例如:控制台发送过来metric指令,那么就会对应的调用SendMetricCommandHandler的handle方法来处理控制台的指令。

我们来看看SendMetricCommandHandler是怎么处理返回统计数据的:

SendMetricCommandHandler#handle

public CommandResponse<String> handle(CommandRequest request) {
    // Note: not thread-safe.
    if (searcher == null) {
        synchronized (lock) {
            //获取应用名
            String appName = SentinelConfig.getAppName();
            if (appName == null) {
                appName = "";
            }
            if (searcher == null) {
                //用来找metric文件,
                searcher = new MetricSearcher(MetricWriter.METRIC_BASE_DIR,
                    MetricWriter.formMetricFileName(appName, PidUtil.getPid()));
            }
        }
    }
    //获取请求的开始结束时间和最大的行数
    String startTimeStr = request.getParam("startTime");
    String endTimeStr = request.getParam("endTime");
    String maxLinesStr = request.getParam("maxLines");
    //用来确定资源
    String identity = request.getParam("identity");
    long startTime = -1;
    int maxLines = 6000;
    if (StringUtil.isNotBlank(startTimeStr)) {
        startTime = Long.parseLong(startTimeStr);
    } else {
        return CommandResponse.ofSuccess("");
    }
    List<MetricNode> list;
    try {
        // Find by end time if set.
        if (StringUtil.isNotBlank(endTimeStr)) {
            long endTime = Long.parseLong(endTimeStr);
            //根据开始结束时间找到统计数据
            list = searcher.findByTimeAndResource(startTime, endTime, identity);
        } else {
            if (StringUtil.isNotBlank(maxLinesStr)) {
                maxLines = Integer.parseInt(maxLinesStr);
            }
            maxLines = Math.min(maxLines, 12000);
            list = searcher.find(startTime, maxLines);
        }
    } catch (Exception ex) {
        return CommandResponse.ofFailure(new RuntimeException("Error when retrieving metrics", ex));
    }
    if (list == null) {
        list = new ArrayList<>();
    }
    //如果identity为空就加入CPU负载和系统负载
    if (StringUtil.isBlank(identity)) {
        addCpuUsageAndLoad(list);
    }
    StringBuilder sb = new StringBuilder();
    for (MetricNode node : list) {
        sb.append(node.toThinString()).append("\n");
    }
    return CommandResponse.ofSuccess(sb.toString());
}

我们在1.Sentinel源码分析—FlowRuleManager加载规则做了什么?里介绍了Metric统计信息会在MetricTimerListener的run方法中定时写入文件中去。

所以handle方法里面主要是如何根据请求的开始结束时间,资源名来获取磁盘的文件,然后返回磁盘的统计信息,并记录一下当前的统计信息,防止重复发送统计数据到控制台。

HeartbeatSenderInitFunc

HeartbeatSenderInitFunc主要是用来做心跳线程使用的,定期的和控制台进行心跳连接。

HeartbeatSenderInitFunc#init

public void init() {
    //获取HeartbeatSender的实现类
    HeartbeatSender sender = HeartbeatSenderProvider.getHeartbeatSender();
    if (sender == null) {
        RecordLog.warn("[HeartbeatSenderInitFunc] WARN: No HeartbeatSender loaded");
        return;
    }
    //创建一个corepoolsize为2,maximumPoolSize为最大的线程池
    initSchedulerIfNeeded();
    //获取心跳间隔时间,默认10s
    long interval = retrieveInterval(sender);
    //设置间隔心跳时间
    setIntervalIfNotExists(interval);
    //开启一个定时任务,每隔interval时间发送一个心跳
    scheduleHeartbeatTask(sender, interval);
}
  1. 首先会调用HeartbeatSenderProvider.getHeartbeatSender方法,里面会根据spi创建实例,返回一个SimpleHttpHeartbeatSender实例。
  2. 调用initSchedulerIfNeeded方法创建一个corepoolsize为2的线程池
  3. 获取心跳间隔时间,如果没有设置,那么是10s
  4. 调用scheduleHeartbeatTask方法开启一个定时线程调用。

我们来看看scheduleHeartbeatTask方法:
HeartbeatSenderInitFunc#scheduleHeartbeatTask

private void scheduleHeartbeatTask(/*@NonNull*/ final HeartbeatSender sender, /*@Valid*/ long interval) {
    pool.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                sender.sendHeartbeat();
            } catch (Throwable e) {
                RecordLog.warn("[HeartbeatSender] Send heartbeat error", e);
            }
        }
    }, 5000, interval, TimeUnit.MILLISECONDS);
    RecordLog.info("[HeartbeatSenderInit] HeartbeatSender started: "
        + sender.getClass().getCanonicalName());
}

默认的情况,创建的这个定时任务会每隔10s调用一次SimpleHttpHeartbeatSender的sendHeartbeat方法。

SimpleHttpHeartbeatSender#sendHeartbeat

public boolean sendHeartbeat() throws Exception {
    if (TransportConfig.getRuntimePort() <= 0) {
        RecordLog.info("[SimpleHttpHeartbeatSender] Runtime port not initialized, won't send heartbeat");
        return false;
    }
    //获取控制台的ip和端口等信息
    InetSocketAddress addr = getAvailableAddress();
    if (addr == null) {
        return false;
    }
    //设置http调用的ip和端口,还有访问的url
    SimpleHttpRequest request = new SimpleHttpRequest(addr, HEARTBEAT_PATH);
    //获取版本号,端口等信息
    request.setParams(heartBeat.generateCurrentMessage());
    try {
        //发送post请求
        SimpleHttpResponse response = httpClient.post(request);
        if (response.getStatusCode() == OK_STATUS) {
            return true;
        }
    } catch (Exception e) {
        RecordLog.warn("[SimpleHttpHeartbeatSender] Failed to send heartbeat to " + addr + " : ", e);
    }
    return false;
}

这个心跳检测的方法就写的很简单了,通过Dcsp.sentinel.dashboard.server预先设置好的ip和端口号发送post请求到控制台,然后检测是否返回200,如果是则说明控制台正常,否则进行异常处理。

原文地址:https://www.cnblogs.com/luozhiyun/p/11601129.html

时间: 2024-08-29 18:55:15

7.Sentinel源码分析—Sentinel是怎么和控制台通信的?的相关文章

4.Sentinel源码分析— Sentinel是如何做到降级的?

各位中秋节快乐啊,我觉得在这个月圆之夜有必要写一篇源码解析,以表示我内心的高兴~ Sentinel源码解析系列: 1.Sentinel源码分析-FlowRuleManager加载规则做了什么? 2. Sentinel源码分析-Sentinel是如何进行流量统计的? 3. Sentinel源码分析- QPS流量控制是如何实现的? 在我的第二篇文章里面2. Sentinel源码分析-Sentinel是如何进行流量统计的?里面介绍了整个Sentinel的主流程是怎样的.所以降级的大致流程可以概述为:

5.Sentinel源码分析—Sentinel如何实现自适应限流?

Sentinel源码解析系列: 1.Sentinel源码分析-FlowRuleManager加载规则做了什么? 2. Sentinel源码分析-Sentinel是如何进行流量统计的? 3. Sentinel源码分析- QPS流量控制是如何实现的? 4.Sentinel源码分析- Sentinel是如何做到降级的? 这篇文章主要学习一下Sentinel如何实现自适应限流的. 为什么要做自适应限流,官方给了两个理由: 保证系统不被拖垮 在系统稳定的前提下,保持系统的吞吐量 我再贴一下官方的原理: 能

6.Sentinel源码分析—Sentinel是如何动态加载配置限流的?

Sentinel源码解析系列: 1.Sentinel源码分析-FlowRuleManager加载规则做了什么? 2. Sentinel源码分析-Sentinel是如何进行流量统计的? 3. Sentinel源码分析- QPS流量控制是如何实现的? 4.Sentinel源码分析- Sentinel是如何做到降级的? 5.Sentinel源码分析-Sentinel如何实现自适应限流? 有时候我们做限流的时候并不想直接写死在代码里面,然后每次要改规则,或者增加规则的时候只能去重启应用来解决.而是希望能

源码分析 Sentinel 之 Dubbo 适配原理

在Alibaba Sentinel 限流与熔断初探(技巧篇) 的示例中我选择了 sentinel-demo-apache-dubbo 作为突破点,故本文就从该项目入手,看看 Sentinel 是如何对 Dubbo 做的适配,让项目使用方无感知,只需要引入对应的依即可. sentinel-apache-dubbo-adapter 比较简单,展开如下: 上面的代码应该比较简单,在正式进入源码研究之前,我先抛出如下二个问题: 1.限流.熔断相关的功能是在 Dubbo 的客户端实现还是服务端实现?为什么

Nginx源码分析 - Nginx启动以及IOCP模型

Nginx 源码分析 - Nginx启动以及IOCP模型 版本及平台信息 本文档针对Nginx1.11.7版本,分析Windows下的相关代码,虽然服务器可能用linux更多,但是windows平台下的代码也基本相似 ,另外windows的IOCP完成端口,异步IO模型非常优秀,很值得一看. Nginx启动 曾经有朋友问我,面对一个大项目的源代码,应该从何读起呢?我给他举了一个例子,我们学校大一大二是在紫金港校区,到了 大三搬到玉泉校区,但是大一的时候也会有时候有事情要去玉泉办.偶尔会去玉泉,但

【JUC】JDK1.8源码分析之SynchronousQueue(九)

一.前言 本篇是在分析Executors源码时,发现JUC集合框架中的一个重要类没有分析,SynchronousQueue,该类在线程池中的作用是非常明显的,所以很有必要单独拿出来分析一番,这对于之后理解线程池有很有好处,SynchronousQueue是一种阻塞队列,其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然.同步队列没有任何内部容量,甚至连一个队列的容量都没有. 二.SynchronousQueue数据结构 由于SynchronousQueue的支持公平策略和非公平策略,所

jQuery 源码分析(十一) 队列模块 Queue详解

队列是常用的数据结构之一,只允许在表的前端(队头)进行删除操作(出队),在表的后端(队尾)进行插入操作(入队).特点是先进先出,最先插入的元素最先被删除. 在jQuery内部,队列模块为动画模块提供基础功能,负责存储动画函数.自动出队并执行动画函数,同时还要确保动画函数的顺序执行. jQuery的静态方法含有如下API: $.queue(elem,type,data) ;返回或修改匹配元素关联的队列,返回最新的队列,参数如下:   elem ;DOM元素或JavaScript对象 type  ;

TeamTalk源码分析之login_server

login_server是TeamTalk的登录服务器,负责分配一个负载较小的MsgServer给客户端使用,按照新版TeamTalk完整部署教程来配置的话,login_server的服务端口就是8080,客户端登录服务器地址配置如下(这里是win版本客户端): 1.login_server启动流程 login_server的启动是从login_server.cpp中的main函数开始的,login_server.cpp所在工程路径为server\src\login_server.下表是logi

Android触摸屏事件派发机制详解与源码分析二(ViewGroup篇)

1 背景 还记得前一篇<Android触摸屏事件派发机制详解与源码分析一(View篇)>中关于透过源码继续进阶实例验证模块中存在的点击Button却触发了LinearLayout的事件疑惑吗?当时说了,在那一篇咱们只讨论View的触摸事件派发机制,这个疑惑留在了这一篇解释,也就是ViewGroup的事件派发机制. PS:阅读本篇前建议先查看前一篇<Android触摸屏事件派发机制详解与源码分析一(View篇)>,这一篇承接上一篇. 关于View与ViewGroup的区别在前一篇的A