flume sink运行过程简单分析

没有运行,直接看源码得到sink简单运行过程

SinkRunner负责运行sink程序

内部类

PollingRunner implements Runnable

{

   private SinkProcessor policy;

}

负责运行sink

run方法

while (!shouldStop.get()) {
  try {
    if (policy.process().equals(Sink.Status.BACKOFF)) {
      counterGroup.incrementAndGet("runner.backoffs");

      Thread.sleep(Math.min(counterGroup.incrementAndGet("runner.backoffs.consecutive")* backoffSleepIncrement, maxBackoffSleep));
    } else {
      counterGroup.set("runner.backoffs.consecutive", 0L);
    }
  } catch (InterruptedException e) {
      logger.debug("Interrupted while processing an event. Exiting.");
      counterGroup.incrementAndGet("runner.interruptions");
  } catch (Exception e) {
      logger.error("Unable to deliver event. Exception follows.", e);
      if (e instanceof EventDeliveryException) {
        counterGroup.incrementAndGet("runner.deliveryErrors");
      } else {
        counterGroup.incrementAndGet("runner.errors");
      }
  try {
      Thread.sleep(maxBackoffSleep);
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
    }
    }
  }

policy 对应具体的sink处理器,这里以FailoverSinkProcessor举例子

这里面,针对FailoverSinkProcessor可以参照 http://blog.csdn.net/simonchi/article/details/42520193讲解,这里大致说下便可

configure方法

liveSinks = new TreeMap<Integer, Sink>();
failedSinks = new PriorityQueue<FailedSink>();

从配置文件中定义的sinks中遍历每一个sink,获得其优先级,然后放到liveSinks中,无论sink是否可用。

最后,activeSink = liveSinks.get(liveSinks.lastKey());,从liveSinks按照key排序,获得最后一个key(优先级,最大)对应的sink初始化 activeSink

policy.process().equals(Sink.Status.BACKOFF))执行的是FailoverSinkProcessor的process()方法

process()方法

首先一个while循环,遍历所有的failedSinks ,拿出每一个failed的sink,如果拿出来的failed sink能够访问了,则把他付给activeSink ,并return sink.process()的状态。在轮询的过程中,如果failed sink还是不能到达,则重新放入到failedSinks 中并刷新时间,否则,如果能够联通,但是状态不是READY,也放入到failedSinks 中且不刷新。

之后,是对activeSink进行while循环,调用activeSink中的每一个sink.proccess().调用成功,则return状态。否则,出现异常,将当前active的sink移动到failedSinks 中,同时获得下一个active的sink从activeSink中。继续while判断

函数的最后是一个异常,即没有任何一个sink可用。

sink.process()是啥?是从channel中拿出数据的。

这里以NullSink为例

根据事务和batchsize从chanel中拿出数据来,并写入到相应的位置

public Status process() throws EventDeliveryException {
    Status status = Status.READY;

    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();
    Event event = null;
    long eventCounter = counterGroup.get("events.success");

    try {
        transaction.begin();
        int i = 0;
        for (i = 0; i < batchSize; i++) {
          event = channel.take();
          if (++eventCounter % logEveryNEvents == 0) {
            logger.info("Null sink {} successful processed {} events.", getName(), eventCounter);
          }
          if(event == null) {
              status = Status.BACKOFF;
              break;
          }
        }
        transaction.commit();
        counterGroup.addAndGet("events.success", (long) Math.min(batchSize, i));
        counterGroup.incrementAndGet("transaction.success");
     } catch (Exception ex) {
        transaction.rollback();
        counterGroup.incrementAndGet("transaction.failed");
        logger.error("Failed to deliver event. Exception follows.", ex);
        throw new EventDeliveryException("Failed to deliver event: " + event, ex);
    } finally {
        transaction.close();
    }

      return status;
  }

时间: 2024-10-09 16:28:46

flume sink运行过程简单分析的相关文章

简单C程序在IA-32 CPU上运行过程的分析

本文将通过编译器生成的汇编代码分析C程序在IA-32体系PC上的运行流程 实验环境: gcc 4.8.2 C语言程序的内存结构 C代码如下 int g(int x) { return x + 1; } int f(int x) { return g(x); } int main(void) { return f(2) + 3; } 使用编译命令gcc -S -O0 -o main.s main.c -m32编译出汇编文件,如下 g: pushl %ebp movl %esp, %ebp movl

MVC请求过程 简单分析(一)

在服务端判断客户端传过来的文件的类型,如果是静态文件,直接返回,在页面输出显示.如果是动态文件,通过aspnet_isapi.dll转交过.NetFrameWork框架执行. 创建ISAPIruntime对象,执行ISAPIruntime对象中的ProcessRequest()方法 ProcessRequest()方法可以看做是进入Asp.Net的入口点. ProcessRequest()方法: 根据传进来的句柄(请求报文的编号)寻找资源,创建ISAPIWorkerRequest对象wr,将请求

Mybatis 映射器接口代理对象的方式 运行过程debug分析

查询一张表的所有数据. 环境: 使用工具IntelliJ IDEA 2018.2版本. 创建Maven工程不用骨架 1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

Redis sentinel哨兵启动、切换过程简单分析

sentinel是Redis高可用Ha的重要组成部分,在传统Redis master/slave架构下,担任对主从复制的状态监控,并在主节点异常后自动将从节点提升为主节点对外提供服务. 下图展示了一个在哨兵sentinel集群中监控redis主从复制的一个例子,其中: 1. Sentinel集群包括三个sentinel节点sentinel1.sentinel2.seninel3,sentinel集群各节点之间互相监控哨兵运行状态. 2.Sentinel集群各节点分别与Redis主节点进行ping

对于HTML页面中CSS, JS, HTML的加载与执行过程的简单分析

最近在研究HTML页面中JavaScript的执行顺序问题.在JavaScript中,定义一个方法或者函数有很多方式,最常见的有2中,function语句式与函数直接量方式. 对于function语句式,解释器会优先解释.即加载了这个js文件后,会扫描一下所有的js代码,然后把该优先执行的东西先执行了,然后再从上到下按顺序执行.所以,定义的代码可以在执行的代码后边.就跟C#中的方法定义一样.解释器已经记住了这个方法,知道在内存中的哪里,用的时候直接去取就行了. C#语言是,对象中的属性与方法具有

Ffmpeg解析media容器过程/ ffmpeg 源代码简单分析 : av_read_frame()

ffmpeg 源代码简单分析 : av_read_frame() http://blog.csdn.net/leixiaohua1020/article/details/12678577 ffmpeg中的av_read_frame()的作用是读取码流中的音频若干帧或者视频一帧.例如,解码视频的时候,每解码一个视频帧,需要先调 用 av_read_frame()获得一帧视频的压缩数据,然后才能对该数据进行解码(例如H.264中一帧压缩数据通常对应一个NAL). 对该函数源代码的分析是很久之前做的了

从内存中分析程序的运行过程

我觉得图形是最可以直观一种解释方法,所以先把程序运行过程的图形解析流程给大家,通过图形来一步一步的理解才是最让人清楚,直观的: 流程图懂了,好多事情也就懂了!

appium界面运行过程(结合日志截图分析)

appium界面运行过程: 1.启动一个http服务器:127.0.0.1:47232.根据测试代码setUp()进行初始化,在http服务器上建立一个session对象3.开始调用adb,找到连接上的设备,设置设备id4.等待设备准备好响应命令5.开启logcat日志监控6.将生成的apk属性信息文件strings.json存到了设备 /data/local/tmp目录下7.读取apk安装情况8.端口映射,发给appium httpserver的内容,经过httpserver后直接发给设备 f

flume SinkProcessor 相关类实现分析

org.apache.flume.SinkProcessor 扩展了LifecycleAware, Configurable接口的接口类,操作多个sink的抽象层(类似于proxy),用来分配给SinkRunner对象抽象方法:process 和Sink 的process方法类似(内部实现增加了选择Sink的功能)setSinks 设置sinks具体实现类: org.apache.flume.sink.SinkProcessorFactory 设计模式的工厂模式,用于返回SinkProcesso