turbine源码分析

turbine源码分析

1、turbine架构设计

一切从InstanceDiscovery模块开始,该模块提供所有的主机信息。它会定期的发送更新,ConnectionManager负责创建连接到主机。一旦建立起连接,数据流将源源不断的发送给Aggregator既聚合器。聚合器将数据汇聚后的数据输出到客户端或者下游监听者。

汇聚示例:

{type:‘weather-data-temp‘, name:‘New York‘, temp:74}
{type:‘weather-data-temp‘, name:‘Los Angeles‘, temp:85}
{type:‘weather-data-temp‘, name:‘New York‘, temp:76}

{type:‘weather-data-wind-velocity‘, name:‘New York‘, temp:12}
{type:‘weather-data-wind-velocity‘, name:‘Los Angeles‘, temp:10}

比如有一组这样的数据,经过汇聚后得到下面的结果

{type:‘weather-data-temp‘, name:‘Los Angeles‘, temp:85}
{type:‘weather-data-temp‘, name:‘New York‘, temp:75}
{type:‘weather-data-wind-velocity‘, name:‘New York‘, temp:12}
{type:‘weather-data-wind-velocity‘, name:‘Los Angeles‘, temp:10}

说明:它是使用type和name做为关键字,进行数据汇聚的

2、实例发现

我们先来看看的架构图

turbine通过配置文件,配置需要收集指标的应用。InstanceObservable定期的拉取实例信息,通知InstanceObserver进行主机信息的更新。turbine只有在主机是UP的状态的时候,才会去连接主机获取指标。这么做的原因是很多复杂的系统在他们将自己标记为可用之前,需要通过长时间的启动引导做一些预处理,比如查询缓存、建立主机连接、运行预计算逻辑等。我们通过分析代码的结构也可以看到

turbine启动的时候会调用TurbineInitinit()方法进行初始化

public static void init() {

        ClusterMonitorFactory clusterMonitorFactory = PluginsFactory.getClusterMonitorFactory();
        if(clusterMonitorFactory == null) {
            PluginsFactory.setClusterMonitorFactory(new DefaultAggregatorFactory());
        }

        PluginsFactory.getClusterMonitorFactory().initClusterMonitors();

        InstanceDiscovery instanceDiscovery = PluginsFactory.getInstanceDiscovery();
        if (instanceDiscovery == null) {
            PluginsFactory.setInstanceDiscovery(getInstanceDiscoveryImpl());
        }
        //调用start()方式,初始化实例发现
        InstanceObservable.getInstance().start(PluginsFactory.getInstanceDiscovery());
    }

start()方法如下:

public void start(InstanceDiscovery iDiscovery) {
        if (started.get()) {
            throw new RuntimeException("InstanceDiscovery already started");
        }
        if (iDiscovery == null) {
            throw new RuntimeException("InstanceDiscovery is null");
        }
        instanceDiscovery = iDiscovery;
        logger.info("Starting InstanceObservable at frequency: " + pollDelayMillis.get() + " millis");
        //每隔pollDelayMillis去拉取一次主机的信息
        //通过producer
        timer.schedule(producer, 0, pollDelayMillis.get());
        started.set(true);
    }

producer的定义如下:

private final TimerTask producer = new TimerTask() {

        @Override
        public void run() {
                //主要代码...
                for(InstanceObserver watcher: observers.values()) {
                    if(currentState.get().hostsUp.size() > 0) {
                        try {
                            //通知InstanceObserver主机是UP状态
                            //watcher是个InstanceObserver接口的实例化对象
                            watcher.hostsUp(currentState.get().hostsUp);
                        } catch(Throwable t) {
                            logger.error("Could not call hostUp on watcher: " + watcher.getName(), t);
                        }
                    }
                }
        }
    };

通过IDEA工具,找到InstanceObserver接口只有一个实现类ClusterMonitorInstanceManager,该类中实现了hostsUp()方法,而hostsUp()方法又循环去调用hostUp方法,hostUp()方法如下:

    public void hostUp(Instance host) {

            if (!(getObservationCriteria().observeHost(host))) {
                return;
            }

            TurbineDataMonitor<DataFromSingleInstance> monitor = getMonitor(host);
            try {
                if(hostDispatcher.findHandlerForHost(host, getEventHandler().getName()) == null) {
                    // this handler is not already present for this host, then add it
                    hostDispatcher.registerEventHandler(host, getEventHandler());
                }
                //启动实例的监听
                monitor.startMonitor();
            } catch(Throwable t) {
                logger.info("Failed to start monitor: " + monitor.getName() + ", ex message: ", t);
                monitor.stopMonitor();
                logger.info("Removing monitor from stats event console");
                TurbineDataMonitor<DataFromSingleInstance> oldMonitor = hostConsole.removeMonitor(monitor.getName());
                if (oldMonitor != null) {
                    hostCount.decrementAndGet();
                }
            }
        }

monitorInstanceMonitor,既实例监控对象,如下图:

实例监听,主要工作就是从实例获取指标信息,下面会分析到

3、数据聚合

数据聚合的架构图如下

TurbineDataMonitor:数据监听,从实例处获取指标

TurbineDataDispatcher:派发器,将数据聚合后输出到客户端或者下游的数据监听

TurbineDataHandler:数据处理,其实就是客户端或者下游的数据监听

该架构有它的好处,可以实现数据的生产和消费的解耦,隔离客户端之间的处理。TurbineDataMonitor收到指标数据后,发送给TurbineDataDispatcher进行处理,它将指标信息聚合后写到一个队列中,TurbineDataHandler负责从队列取消息,如果TurbineDataHandler消费来不及,队列中的指标信息会增长,如果增长指定的大小的时候,只有消息被消费了,才会继续填充新的消息进去,否则消息将被丢弃。

4、TurbineDataMonitor

通过源码我们可以整理出TurbineDataMonitor类的上下结构图,如下:

TurbineDataMonitor是一个抽象类,里面的主要方法都是抽象的,其功能实现还是依赖它的子类。

在文中第二部分的最后,我们说到,InstanceObserver会启动实例的监听,我们继续看实例的监听到底做了什么。InstanceMonitor实例监听类中的startMonitor()方法如下

public void startMonitor() throws Exception {
        // This is the only state that we allow startMonitor to proceed in
        if (monitorState.get() != State.NotStarted) {
            return;
        }

        taskFuture = ThreadPool.submit(new Callable<Void>() {

            @Override
            public Void call() throws Exception {

                try {
                    //初始化
                    init();
                    monitorState.set(State.Running);
                    while(monitorState.get() == State.Running) {
                        //真正干活的地方
                        doWork();
                    }
                } catch(Throwable t) {
                    logger.warn("Stopping InstanceMonitor for: " + getStatsInstance().getHostname() + " " + getStatsInstance().getCluster(), t);
                } finally {
                    if (monitorState.get() == State.Running) {
                        monitorState.set(State.StopRequested);
                    }
                    cleanup();
                    monitorState.set(State.CleanedUp);
                }
                return null;
            }
        });
    }

先看看init()方法,初始化做了什么

通过调试我们发现,init()方法会根据实例的指标地址http://sheng:8088/manage/hystrix.stream去获取指标信息,这个正是指标真正的来源。

继续看doWork()方法

private void doWork() throws Exception {

        DataFromSingleInstance instanceData = null;
        //获取实例的指标信息
        instanceData = getNextStatsData();
        if(instanceData == null) {
            return;
        } else {
            lastEventUpdateTime.set(System.currentTimeMillis());
        }

        List<DataFromSingleInstance> list = new ArrayList<DataFromSingleInstance>();
        list.add(instanceData);

        /* send to all handlers */
        //向派发器中发送数据
        boolean continueRunning = dispatcher.pushData(getStatsInstance(), list);
        if(!continueRunning) {
            logger.info("No more listeners to the host monitor, stopping monitor for: " + host.getHostname() + " " + host.getCluster());
            monitorState.set(State.StopRequested);
            return;
        }
    }

派发器的说明见第5部分

5、TurbineDataDispatcher

通过查看派发器TurbineDataDispatcher中的源码可以找到pushData()方法如下:

public boolean pushData(final Instance host, final Collection<K> statsData) {

        if(stopped) {
            return false;
        }

        // get a copy of the list so we don‘t have ConcurrentModification errors when it changes while we‘re iterating
        Map<String, HandlerQueueTuple<K>> eventHandlers = eventHandlersForHosts.get(host);
        if (eventHandlers == null) {
            return false;  // stop the monitor, this should generally not happen, since we generally manage a set of static listeners for all hosts in discovery
        }

        for (final HandlerQueueTuple<K> tuple : eventHandlers.values()) {
            //HandlerQueueTuple管道中添加数据
            tuple.pushData(statsData);
        }

        // keep track of listeners registered, and if there are none, then notify the publisher of the events
        AtomicInteger count = getIterationWithoutHandlerCount(host);
        if (eventHandlers.size() == 0) {
            count.incrementAndGet();
            if (count.get() > 5) {
                logger.info("We no longer have handlers to dispatch to");
                return false;
            }
        } else {
            count.set(0);
        }
        return true;
    }

HandlerQueueTuple管道中的方法pushData()方法如下

 public void pushData(K data) {
        if (stopped) {
            return;
        }
        //往队列中写数据
        boolean success = queue.writeEvent(data);
        if (isCritical()) {
            // track stats
            if (success) {
                counter.increment(Type.EVENT_PROCESSED);
            } else {
                counter.increment(Type.EVENT_DISCARDED);
            }
        }
    }

HandlerQueueTuple管道中除了writeEvent()写事件外,还有一个readEvent()读事件的操作。将在第6部分分析

6、TurbineDataHandler

我们在HandlerQueueTuple中找到doWork()方法如下

public void doWork() throws Exception {

            List<K> statsData = new ArrayList<K>();

            int numMisses = 0;
            boolean stopPolling = false;

            do {
                //从队列中读取事件
                K data = queue.readEvent();
                if (data == null) {
                    numMisses++;
                    if (numMisses > 100) {
                        Thread.sleep(100);
                        numMisses = 0; // reset count so we can try polling again.
                    }
                } else {
                    statsData.add(data);
                    numMisses = 0;
                    stopPolling = true;
                }
            }
            while(!stopPolling);

            try {
                //通过事件处理器将数据输出到客户端
                eventHandler.handleData(statsData);
            } catch (Exception e) {
                if(eventHandler.getCriteria().isCritical()) {
                    logger.warn("Could not publish event to event handler for " + eventHandler.getName(), e);
                }
            }
        }

通过事件处理器将数据输出到客户端,这个操作主要是由TurbineDataHandler完成的,我们通过源码的整理,可以得到如下的代码类的图。

AggregateClusterMonitor的内部类AggStatsEventHandler实现数据的汇聚,TurbineStreamingConnection类是浏览器客户端连接时,数据的输出。

AggStatsEventHandlerhandleData()方法如下:

public void handleData(Collection<DataFromSingleInstance> statsData) {
            //整理出关键代码

            for (DataFromSingleInstance data : statsData) {
                TurbineData.Key dataKey = data.getKey();
                // 汇聚数据
                AggDataFromCluster clusterData = monitor.aggregateData.get(dataKey);
                if (clusterData == null) {
                    monitor.aggregateData.putIfAbsent(dataKey, new AggDataFromCluster(monitor, data.getType(), data.getName()));
                }
                clusterData.addStatsDataFromSingleServer(data);
                AggDataFromCluster dataToSend = monitor.aggregateData.get(dataKey);
                if (dataToSend != null && (!throttleCheck.throttle(data))) {
                    dataToSend.performPostProcessing();
                    //将数据添加到集群派发器的队列中
                    monitor.clusterDispatcher.pushData(monitor.getStatsInstance(), dataToSend);
                }
            }

        }

TurbineStreamingConnectionhandleData()方法很简单,就是将数据直接响应给浏览器。如下:

    public void handleData(Collection<T> data) {

        if (stopMonitoring) {
            // we have been stopped so don‘t try handling data
            return;
        }
        //写到stream中
        writeToStream(data);
    }

writeToStream()方法最终将指标数据响应给浏览器

原文地址:https://www.cnblogs.com/liangzs/p/8575079.html

时间: 2024-08-26 18:13:37

turbine源码分析的相关文章

TeamTalk源码分析之login_server

login_server是TeamTalk的登录服务器,负责分配一个负载较小的MsgServer给客户端使用,按照新版TeamTalk完整部署教程来配置的话,login_server的服务端口就是8080,客户端登录服务器地址配置如下(这里是win版本客户端): 1.login_server启动流程 login_server的启动是从login_server.cpp中的main函数开始的,login_server.cpp所在工程路径为server\src\login_server.下表是logi

Android触摸屏事件派发机制详解与源码分析二(ViewGroup篇)

1 背景 还记得前一篇<Android触摸屏事件派发机制详解与源码分析一(View篇)>中关于透过源码继续进阶实例验证模块中存在的点击Button却触发了LinearLayout的事件疑惑吗?当时说了,在那一篇咱们只讨论View的触摸事件派发机制,这个疑惑留在了这一篇解释,也就是ViewGroup的事件派发机制. PS:阅读本篇前建议先查看前一篇<Android触摸屏事件派发机制详解与源码分析一(View篇)>,这一篇承接上一篇. 关于View与ViewGroup的区别在前一篇的A

HashMap与TreeMap源码分析

1. 引言     在红黑树--算法导论(15)中学习了红黑树的原理.本来打算自己来试着实现一下,然而在看了JDK(1.8.0)TreeMap的源码后恍然发现原来它就是利用红黑树实现的(很惭愧学了Java这么久,也写过一些小项目,也使用过TreeMap无数次,但到现在才明白它的实现原理).因此本着"不要重复造轮子"的思想,就用这篇博客来记录分析TreeMap源码的过程,也顺便瞅一瞅HashMap. 2. 继承结构 (1) 继承结构 下面是HashMap与TreeMap的继承结构: pu

Linux内核源码分析--内核启动之(5)Image内核启动(rest_init函数)(Linux-3.0 ARMv7)【转】

原文地址:Linux内核源码分析--内核启动之(5)Image内核启动(rest_init函数)(Linux-3.0 ARMv7) 作者:tekkamanninja 转自:http://blog.chinaunix.net/uid-25909619-id-4938395.html 前面粗略分析start_kernel函数,此函数中基本上是对内存管理和各子系统的数据结构初始化.在内核初始化函数start_kernel执行到最后,就是调用rest_init函数,这个函数的主要使命就是创建并启动内核线

Spark的Master和Worker集群启动的源码分析

基于spark1.3.1的源码进行分析 spark master启动源码分析 1.在start-master.sh调用master的main方法,main方法调用 def main(argStrings: Array[String]) { SignalLogger.register(log) val conf = new SparkConf val args = new MasterArguments(argStrings, conf) val (actorSystem, _, _, _) =

Solr4.8.0源码分析(22)之 SolrCloud的Recovery策略(三)

Solr4.8.0源码分析(22)之 SolrCloud的Recovery策略(三) 本文是SolrCloud的Recovery策略系列的第三篇文章,前面两篇主要介绍了Recovery的总体流程,以及PeerSync策略.本文以及后续的文章将重点介绍Replication策略.Replication策略不但可以在SolrCloud中起到leader到replica的数据同步,也可以在用多个单独的Solr来实现主从同步.本文先介绍在SolrCloud的leader到replica的数据同步,下一篇

zg手册 之 python2.7.7源码分析(4)-- pyc字节码文件

什么是字节码 python解释器在执行python脚本文件时,对文件中的python源代码进行编译,编译的结果就是byte code(字节码) python虚拟机执行编译好的字节码,完成程序的运行 python会为导入的模块创建字节码文件 字节码文件的创建过程 当a.py依赖b.py时,如在a.py中import b python先检查是否有b.pyc文件(字节码文件),如果有,并且修改时间比b.py晚,就直接调用b.pyc 否则编译b.py生成b.pyc,然后加载新生成的字节码文件 字节码对象

LevelDB源码分析--Iterator

我们先来参考来至使用Iterator简化代码2-TwoLevelIterator的例子,略微修改希望能帮助更加容易立即,如果有不理解请各位看客阅读原文. 下面我们再来看一个例子,我们为一个书店写程序,书店里有许多书Book,每个书架(BookShelf)上有多本书. 类结构如下所示 class Book { private: string book_name_; }; class Shelf { private: vector<Book> books_; }; 如何遍历书架上所有的书呢?一种实

【Heritrix源码分析】Heritrix基本内容介绍

1.版本说明 (1)最新版本:3.3.0 (2)最新release版本:3.2.0 (3)重要历史版本:1.14.4 3.1.0及之前的版本:http://sourceforge.net/projects/archive-crawler/files/ 3.2.0及之后的版本:http://archive.org/ 由于国情需要,后者无法访问,因此本blog研究的是1.14.4版本. 2.官方材料 source:http://sourceforge.net/projects/archive-cra