flume服务管理实现分析

flume可以监控并管理组件的运行状态,在组件关闭的时候可以自动拉起来,原理是通过启动一个计划任务线程池(monitorService ,线程的最大数量为30),运行监控线程(MonitorRunnable线程),每隔3s判断组件(包括Channel,SinkRunner)的状态是否符合要求(可用的状态由两种START和STOP),根据不同的要求调用对应组件不同的方法,START会调用start方法,STOP会调用stop方法,如果想监控一个组件的状态,只需对这个组件调用supervise方法即可,如果想停止监控一个组件,只需对这个组件调用unsupervise方法即可,同时有一个线程每隔两小时移除已经不再监控(调用了unsupervise方法)的组件的检查任务。

这个功能主要是通过org.apache.flume.lifecycle.LifecycleSupervisor实现
在org.apache.flume.node.Application类的构造函数中会初始化LifecycleSupervisor类的对象:

  public Application(List<LifecycleAware> components) {
    this. components = components;
    supervisor = new LifecycleSupervisor();
  }

flume进程启动时调用

org.apache.flume.node.Application.main--->org.apache.flume.node.Application.start
  public synchronized void start() { //start方法会对每一个组件调用LifecycleSupervisor.supervise方法,参数为组件,AlwaysRestartPolicy和START状态(即期望的状态为START)
    for(LifecycleAware component : components) {
      supervisor.supervise(component,
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); 
    }
  }

分析LifecycleSupervisor类:
org.apache.flume.lifecycle.LifecycleSupervisor实现了LifecycleAware接口,本身也有一个生命周期的概念(提供start/stop等方法)

其定义了几个重要的内部类:
1.MonitorRunnable,实现了Runnable接口的线程类
1)3个属性ScheduledExecutorService monitorService,LifecycleAware lifecycleAware,Supervisoree supervisoree;
2)主要的run方法分析

    public void run() {
      long now = System.currentTimeMillis();
      try {
        if (supervisoree.status.firstSeen == null) {
          logger.debug("first time seeing {}", lifecycleAware);
          supervisoree.status.firstSeen = now; //第一次开始运行时,设置firstSeen为System.currentTimeMillis()
        }
        supervisoree.status.lastSeen = now; //设置lastSeen为now
        synchronized (lifecycleAware) {
          if (supervisoree.status.discard) { //如果Status的discard或者error的值为true,会直接退出
...
            return;
          } else if (supervisoree.status.error) {
...
            return;
          }
          supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState(); //设置lastSeenState的值
          if (!lifecycleAware.getLifecycleState().equals(
              supervisoree.status.desiredState)) { //如果获取的lifecycleAware对象状态不是想设置的desiredState状态
...
            switch (supervisoree.status.desiredState) { //根据设置的desiredState状态调用lifecycleAware的不同方法,desiredState的值只有两种START和STOP
              case START:
                try {
                  lifecycleAware.start(); //状态为START时设置运行start方法
                } catch (Throwable e) {
...
                  supervisoree.status.failures++; //start方法异常时failures的值加1
                }
                break;
              case STOP:
                try {
                  lifecycleAware.stop(); //状态为STOP时设置运行stop方法
                } catch (Throwable e) {
...
                  supervisoree.status.failures++; //stop方法异常时failures的值加1
                }
                break;
              default:
...
            }
            if (!supervisoree.policy.isValid(lifecycleAware, supervisoree.status)) {
               //调用SupervisorPolicy的isValid方法,比如OnceOnlyPolicy 的isValid的方法会判断Status.failures 的值,如果为0则返回true,否则返回false
              logger.error(
                  "Policy {} of {} has been violated - supervisor should exit!",
                  supervisoree.policy, lifecycleAware);
            }
          }
        }
      } catch(Throwable t) {
...      
      }
...
    }

2.Purger,实现了Runnable接口的线程类
run方法:

    public void run() { 
      if(needToPurge){  //如果needToPurge设置为true
        monitorService.purge(); //ScheduledThreadPoolExecutor.purge方法用于从工作队列中删除已经cancel的java.util.concurrent.Future对象(释放队列空间)
        needToPurge = false ; //并设置needToPurge为false
      }
    }

3.Status内部类定义了几个状态属性,代表了Supervisoree的状态

    public Long firstSeen;
    public Long lastSeen;
    public LifecycleState lastSeenState;
    public LifecycleState desiredState;
    public int failures ;
    public boolean discard ;
    public volatile boolean error ;

4. SupervisorPolicy 是抽象类,定义了抽象方法isValid(LifecycleAware object, Status status),包含两个扩展类AlwaysRestartPolicy 和OnceOnlyPolicy
AlwaysRestartPolicy 的isValid会一直返回true,OnceOnlyPolicy 的isValid的方法会判断Status.failures 的值,如果为0则返回true,否则返回false

5.Supervisoree包含SupervisorPolicy 和Status属性

主要的方法分析:
在构造方法中初始化几个重要的属性:

  public LifecycleSupervisor() {
    lifecycleState = LifecycleState.IDLE;
    supervisedProcesses = new HashMap<LifecycleAware, Supervisoree>(); // supervisedProcesses 用于存放LifecycleAware和Supervisoree对象的键值对,代表已经管理的组件
    monitorFutures = new HashMap<LifecycleAware, ScheduledFuture<?>>(); //monitorFutures 用于存放LifecycleAware对象和ScheduledFuture对象的键值对
    monitorService = new ScheduledThreadPoolExecutor(10,
        new ThreadFactoryBuilder().setNameFormat(
            "lifecycleSupervisor-" + Thread.currentThread().getId() + "-%d")
            .build()); // monitorService 用于调用Purger线程,定时移除线程池中已经cancel的task
    monitorService.setMaximumPoolSize(20);
    monitorService.setKeepAliveTime(30, TimeUnit.SECONDS);
    purger = new Purger();
    needToPurge = false; // 初始时为false,在有task cancel的时候设置为true
  }

start方法用于启动检测线程池:

  public synchronized void start() {
....
    monitorService.scheduleWithFixedDelay( purger, 2, 2, TimeUnit. HOURS); //在两小时后每隔两小时运行一次Purger,释放线程池的工作队列
    lifecycleState = LifecycleState. START; //设置状态为START
...
  }

stop方法首先关闭线程池,然后关闭各个组件
1)线程池关闭

monitorService.shutdown();

2)各组件关闭

   for ( final Entry<LifecycleAware, Supervisoree> entry : supervisedProcesses
        .entrySet()) { //遍历supervisedProcesses中的各个组件
      if (entry.getKey(). getLifecycleState().equals(LifecycleState.START)) { //如果组件的当前状态是START,则首先设置其需要变成的状态为STOP,并调用组件的stop方法
        entry.getValue(). status. desiredState = LifecycleState.STOP; 
        entry.getKey().stop();
      }
    }

supervise方法用于监控对应的组件,有3个参数LifecycleAware lifecycleAware, SupervisorPolicy policy, LifecycleState desiredState

 public synchronized void supervise(LifecycleAware lifecycleAware,
      SupervisorPolicy policy, LifecycleState desiredState) {
    if(this. monitorService.isShutdown()
        || this.monitorService .isTerminated()
        || this.monitorService .isTerminating()){ //检测监控线程池是否正常
      throw new FlumeException("Supervise called on " + lifecycleAware + " " +
          "after shutdown has been initiated. " + lifecycleAware + " will not" +
          " be started");
    }
    Preconditions.checkState(!supervisedProcesses .containsKey(lifecycleAware),
        "Refusing to supervise " + lifecycleAware + " more than once" ); //检测是否已经管理
.....
    Supervisoree process = new Supervisoree(); //初始化Supervisoree对象
    process.status = new Status(); //并实例化Supervisoree对象的Status属性
    process.policy = policy; //设置Supervisoree的属性
    process.status.desiredState = desiredState;
    process.status.error = false;
    MonitorRunnable monitorRunnable = new MonitorRunnable(); //初始化一个MonitorRunnable 对象(线程),并设置对象的属性
    monitorRunnable.lifecycleAware = lifecycleAware;
    monitorRunnable.supervisoree = process;
    monitorRunnable.monitorService = monitorService;
    supervisedProcesses.put(lifecycleAware, process); //向supervisedProcesses中插入键值对,代表已经开始管理的组件
    ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
        monitorRunnable, 0, 3, TimeUnit. SECONDS); // 设置计划任务线程池,每隔3s之后运行monitorRunnable
    monitorFutures.put(lifecycleAware, future); // 向monitorFutures中插入键值对
  }

unsupervise方法用于停止组件并从监控容器中去除:

    synchronized (lifecycleAware) {
    Supervisoree supervisoree = supervisedProcesses.get(lifecycleAware); //从已经管理的Supervisoree  hashmap中获取Supervisoree对象
    supervisoree.status.discard = true;  //设置Supervisoree对象的Status属性的discard 值为discard
      this.setDesiredState(lifecycleAware, LifecycleState.STOP); 
      //调用setDesiredState方法,设置Supervisoree对象的Status属性的desiredState 值为STOP(supervisoree.status.desiredState = desiredState)
      logger.info("Stopping component: {}", lifecycleAware);
      lifecycleAware.stop(); //调用组件的stop方法
    }
    supervisedProcesses.remove(lifecycleAware); //从supervisedProcesses hashmap中移除这个组件
    monitorFutures.get(lifecycleAware).cancel(false); 
    //调用组件对应的ScheduledFuture的cancel方法取消任务(A Future represents the result of an asynchronous computation.
 ,cancel :Attempts to cancel execution of this task.)
    needToPurge = true; //设置needToPurge 的属性为true,这样就可以在purge中删除已经cancel的ScheduledFuture对象
    monitorFutures.remove(lifecycleAware);
时间: 2024-10-15 16:59:17

flume服务管理实现分析的相关文章

实战篇:如何建设企业的营销管理和分析平台

企业每天都在制造大量的经营数据,这些数据反映了企业生成.销售状况.营销分析是在广泛收集信息资料的基础上,运用各种定性和定量的方法,帮助管理层决策分析,更好的为开展营销工作服务. 一般而言营销管理分析系统包含以下几个基本要求: ①灵活弹性的报表设计,适应各个地区.情况的报表需求,能迎合企业需要快速反应企业状况: ②可视化的数据呈现方式,帮助企业领导层快速解读报表内容: ③多元化的数据分析维度,帮助企业发现数据隐含的意义. 前述讲完了,下面就开始我们的实战: (一)企业介绍: 主角--青岛海立美达股

如何写SysV服务管理脚本

本文目录: 1.1 SysV脚本的特性1.2 SysV脚本要具备的能力1.3 start函数分析1.4 stop函数分析1.5 reload函数分析1.6 status.restart.force-reload等1.7 结束语 SysV服务管理脚本和/etc/rc.d/init.d/functions文件中的几个重要函数(包括daemon,killproc,status以及几个和pid有关的函数)"关系匪浅".本人已对该文件做了极详细的分析和说明,参考functions文件详细分析和说

到底什么是IT服务管理

作者:易仔阿克    时间:2014-08-11 标题:到底什么是IT服务管理 经过近三十年的发展演进,以ITIL(IT基础设施库)为基础发展而来的IT服务管理成为企业主流的参考框架.那么,为什么IT服务管理具有这么重要的地位呢?IT服务管理对于企业信息化有什么重要的作用呢? 从历史的角度看,IT起初主要是作为各个职能部门提高办公效率的一种支撑手段,有了信息系统,企业可以扔掉传统的纸和笔,取而代之的是通过电脑进行记录.查询和统计,这种方式实现了信息共享并提高了工作效率. 但是,满足各个职能部门需

activitymanagerservice服务源码分析

activitymanagerservice服务源码分析 1.ActivityManagerService概述 ActivityManagerService(以下简称AMS)作为android中最核心的服务,主要负责系统的四大组件的启动.切换.调度以及应用进程的管理和调度等工作.它类似于操作系统中的进程管理和调度模块类似,所以要想掌握android,AMS至关重要.AMS属于service的一种,所以它也是由system_server进行启动以及管理.本文将以两条不同的主线来分析AMS:第一条与

数据平台之企业营销管理与分析平台建设

企业每天都在制造大量的经营数据,这些数据反映了企业生成.销售状况.营销分析是在广泛收集信息资料的基础上,运用各种定性和定量的方法,帮助管理层决策分析,更好的为开展营销工作服务. 一般而言营销管理分析系统包含以下几个基本要求: ①灵活弹性的报表设计,适应各个地区.情况的报表需求,能迎合企业需要快速反应企业状况: ②可视化的数据呈现方式,帮助企业领导层快速解读报表内容: ③多元化的数据分析维度,帮助企业发现数据隐含的意义. 建设企业营销管理和分析平台的背景 某公司的信息化建设主要借助SAP系统进行,

第11章 服务管理

本文目录: 11.1 服务的概念 11.2 管理独立守护进程 11.3 管理服务的开机自启动 11.4 管理xinetd及相关瞬时守护进程 11.5 CentOS 7上管理服务 CentOS 7和CentOS 6管理服务的方式完全不同.本文先说明CentOS 6上的管理方式,在最后列出CentOS 7上服务管理方式. 11.1 服务的概念 服务是向外提供服务的进程,一般来说都会放在后台,既然要持续不断的提供外界随时发来的服务请求,服务进程就需要常驻在内存中,且不应该和终端有关,否则终端退出服务程

Linux基础四(服务管理)

目录 一.简介与分类 1.系统的默认运行级别 2.服务的分类 3.服务与端口 二.服务管理 1.RPM包服务管理 2.源码包服务管理 三.服务管理总结 一.简介与分类 1. 系统的运行级别 1.1 默认运行级别 系统运行级别 0 - 关机 1 - 单用户模式,主要用于系统修复,类似于windows的安全模式 2 - 不完全的命令行模式,不含NFS服务(NFS是Linux之间进行文件共享的服务) 3 - 完全的命令行模式,即标准的字符界面 4 - 系统保留 5 - 图形模式 6 - 重启 1.2

提高企业IT服务管理能力的神器-ITSM(IT Service Management,IT服务管理)

一.历史ITSM起源于ITIL(IT Infrastructure Library,IT基础架构标准库),ITIL是CCTA(英国国家电脑局)于1980年开发的一套IT服务管理标准库.它把英国在IT管理方面的方法归纳起来,变成规范,为企业的IT部门提供一套从规划.研发.实施到运维的标准方法.80年代中期,英国政府发现IT服务质量普遍不理想,甚至提供给其的IT服务质量也很差,于是就责成其下属机构--计算机和电信局(CCTA)(后来并入英国政府商务部(OGC)) ,启动一个项目对此进行调查,并开发一

“商务智能”对于IT服务管理来说,非常重要!

企业整体信息化管理水平也随之不断提高,同样,科学决策难度也随之大大增加,商业智能应用需求迫切.而在企业对前端系统数据的商务分析高度关注的同时,后台运维管理的各项数据分析也日益引起企业信息化管理者的密切注视.特别是当IT服务工作量日益庞大,后台运维数据的实时收集和精准分析,更是关系到企业总体IT成本的节约和服务管理的整体效率. 那么后台运维过程中如何让商务智能得到最大限度的展现呢?这一过程中各项数据又将如何支持最佳决策分析呢?针对以上种种情况,本土第一IT运维服务商金道公司结合10多年来服务百余家