ChannelProcessor的实现分析

org.apache.flume.channel.ChannelProcessor 用于实际的Event到Channel的操作(在Source中用到),可以把它想象成channel的proxy,用于控制把Event put到哪些Channel中,以及怎么put(bacth或者单个),同时在put之前会使用 Interceptor对Event进行处理。
把Event put到哪些Channel中是由ChannelSelector 控制的,根据selector的设置,目前主要有两种:

REPLICATING->org.apache.flume.channel.ReplicatingChannelSelector,
MULTIPLEXING->org.apache.flume.channel.MultiplexingChannelSelector;

REPLICATING 把Event发送到每一个对应的channel上,每个channel都有完整的一份。
MULTIPLEXING 把Event发送到设置的映射的channel上,类似于hash,每个channel包含一部分
org.apache.flume.channel.MultiplexingChannelSelector会根据header(默认为flume.selector.header),mapping,default,optional的设置获取channel。

这里看下org.apache.flume.channel.ReplicatingChannelSelector的实现,可以看出有两个channel列表,optional和require,分布对应getOptionalChannels和getRequiredChannels方法,如果设置了optional,optionalChannels为optional的设置,requiredChannels为getAllChannels的设置减去optionalChannels的设置

  public void configure(Context context) { //通过configure配置requiredChannels 
    String optionalList = context.getString(CONFIG_OPTIONAL); //根据optional的设置
    requiredChannels = new ArrayList<Channel>(getAllChannels()); //初始时requiredChannels 即为getAllChannels
    Map<String, Channel> channelNameMap = getChannelNameMap();
    if(optionalList != null && !optionalList.isEmpty()) { //如果optional的设置不为空
      for(String optional : optionalList.split("\\s+")) { //对optional按空格进行split
        Channel optionalChannel = channelNameMap.get(optional);
        requiredChannels.remove(optionalChannel); //从requiredChannels 数组中去除optionalChannel
        if (!optionalChannels.contains(optionalChannel)) {
          optionalChannels.add(optionalChannel); //添加到optionalChannels
        }
      }
    }
  }

ChannelProcessor的初始调用在SourceRunner中,比如在org.apache.flume.source.EventDrivenSourceRunner的start方法:

  public void start() {
    Source source = getSource(); //通过getSource获取Source对象
    ChannelProcessor cp = source.getChannelProcessor(); //获取ChannelProcessor 对象
    cp.initialize(); //调用ChannelProcessor.initialize方法
    source.start(); //调用Source.start方法
    lifecycleState = LifecycleState. START;
  }

而在org.apache.flume.source.ExecSource.ExecRunnable类中会调用其processEventBatch方法,进行批量插入数据

           while ((line = reader.readLine()) != null) {
            counterGroup.incrementAndGet("exec.lines.read" );
            eventList.add(EventBuilder. withBody(line.getBytes(charset)));
            if(eventList.size() >= bufferCount ) {
              channelProcessor.processEventBatch(eventList);
              eventList.clear();
            }
          }

看下ChannelProcessor的具体实现:
首先两个重要的属性

  private final ChannelSelector selector ;
  private final InterceptorChain interceptorChain ;

initialize方法调用InterceptorChain.initialize方法,初始化interceptorChain

  public void initialize() {
    interceptorChain.initialize();
  }

configure方法调用configureInterceptors方法,用于根据interceptors设置InterceptorChain

private void configureInterceptors(Context context) {
    List<Interceptor> interceptors = Lists.newLinkedList();
    String interceptorListStr = context.getString( "interceptors", "" ); //获取interceptors的设置
    if (interceptorListStr.isEmpty()) {
      return;
    }
    String[] interceptorNames = interceptorListStr.split( "\\s+"); //根据空格分隔
    Context interceptorContexts =
        new Context(context.getSubProperties("interceptors." ));
    // run through and instantiate all the interceptors specified in the Context
    InterceptorBuilderFactory factory = new InterceptorBuilderFactory();
    for (String interceptorName : interceptorNames) {
      Context interceptorContext = new Context(
          interceptorContexts.getSubProperties(interceptorName + "."));
      String type = interceptorContext.getString( "type");
      if (type == null) {
        LOG.error("Type not specified for interceptor " + interceptorName);
        throw new FlumeException("Interceptor.Type not specified for " +
          interceptorName);
      }
      try {
        Interceptor.Builder builder = factory.newInstance(type); //根据type的设置获取Interceptor
        builder.configure(interceptorContext);
        interceptors.add(builder.build());
......
    }
    interceptorChain.setInterceptors(interceptors);
  }

另外提供了两个插入数据的方法,processEventBatch和processEvent,processEventBatch用于插入一批Event(参数是List<Event> events),processEvent用于插入一个Event。
看下processEvent的实现:

  public void processEvent(Event event) {
    event = interceptorChain.intercept(event); //调用InterceptorChain.intercept对Event进行处理
    if (event == null) {
      return;
    }
    // Process required channels
    List<Channel> requiredChannels = selector.getRequiredChannels(event); // 根据ChannelSelector获取requiredChannels 
    for (Channel reqChannel : requiredChannels) { // 对requiredChannels 中的每一个channel执行对应的put操作,每个操作都在一个事务内
      Transaction tx = reqChannel.getTransaction();
      Preconditions.checkNotNull(tx, "Transaction object must not be null");
      try {
        tx.begin();
        reqChannel.put(event);
        tx.commit();
      } catch (Throwable t) {
        tx.rollback();
        if (t instanceof Error) {
          LOG.error("Error while writing to required channel: " +
              reqChannel, t);
          throw (Error) t;
        } else {
          throw new ChannelException("Unable to put event on required " +
              "channel: " + reqChannel, t);
        }
      } finally {
        if (tx != null) {
          tx.close();
        }
      }
    }
    // Process optional channels
    List<Channel> optionalChannels = selector.getOptionalChannels(event); //同样对optionalChannels做相同的操作
    for (Channel optChannel : optionalChannels) {
      Transaction tx = null;
      try {
        tx = optChannel.getTransaction();
        tx.begin();
        optChannel.put(event);
        tx.commit();
      } catch (Throwable t) {
        tx.rollback();
        LOG.error("Unable to put event on optional channel: " + optChannel, t);
        if (t instanceof Error) {
          throw (Error) t;
        }
      } finally {
        if (tx != null) {
          tx.close();
        }
      }
    }
  }
时间: 2024-10-22 11:03:48

ChannelProcessor的实现分析的相关文章

Flume NG源码分析(五)使用ThriftSource通过RPC方式收集日志

上一篇说了利用ExecSource从本地日志文件异步的收集日志,这篇说说采用RPC方式同步收集日志的方式.笔者对Thrift比较熟悉,所以用ThriftSource来介绍RPC的日志收集方式. 整体的结构图如下: 1. ThriftSource包含了一个Thrift Server,以及一个Thrift Service服务的实现.这里的Thrift Service是由ThriftSourceProtocol定义 2. 应用程序调用Thrift Service的客户端,以RPC的方式将日志发送到Th

Flume-NG启动过程源码分析(三)(原创)

上一篇文章分析了Flume如何加载配置文件的,动态加载也只是重复运行getConfiguration(). 本篇分析加载配置文件后各个组件是如何运行的? 加载完配置文件订阅者Application类会收到订阅信息执行: @Subscribe public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) { stopAllComponents(); startAllComponents(conf)

【Java】【Flume】Flume-NG启动过程源码分析(二)

本节分析配置文件的解析,即PollingPropertiesFileConfigurationProvider.FileWatcherRunnable.run中的eventBus.post(getConfiguration()).分析getConfiguration()方法.此方法在AbstractConfigurationProvider类中实现了,并且这个类也初始化了三大组件的工厂类:this.sourceFactory = new DefaultSourceFactory();this.s

【Java】【Flume】Flume-NG启动过程源码分析(三)

本篇分析加载配置文件后各个组件是如何运行的? 加载完配置文件订阅者Application类会收到订阅信息执行: @Subscribe public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) { stopAllComponents(); startAllComponents(conf); } MaterializedConfiguration conf就是getConfiguration()

SourceRunner与ExecSource实现分析

在agent启动时,会启动Channel,SourceRunner,SinkRunner,比如在org.apache.flume.agent.embedded.EmbeddedAgent类的doStart方法中:   private void doStart() {     boolean error = true;     try {       channel.start(); //调用Channel.start启动Channel       sinkRunner.start(); //调用

spring security源码分析之二---web包分析

Spring 是一个非常流行和成功的 Java 应用开发框架.Spring Security 基于 Spring 框架,提供了一套 Web 应用安全性的完整解决方案.一般来说,Web 应用的安全性包括用户认证(Authentication)和用户授权(Authorization)两个部分.用户认证指的是验证某个用户是否为系统中的合法主体,也就是说用户能否访问该系统.用户认证一般要求用户提供用户名和密码.系统通过校验用户名和密码来完成认证过程.用户授权指的是验证某个用户是否有权限执行某个操作.在一

爱奇艺、优酷、腾讯视频竞品分析报告2016(一)

1 背景 1.1 行业背景 1.1.1 移动端网民规模过半,使用时长份额超PC端 2016年1月22日,中国互联网络信息中心 (CNNIC)发布第37次<中国互联网络发展状况统计报告>,报告显示,网民的上网设备正在向手机端集中,手机成为拉动网民规模增长的主要因素.截至2015年12月,我国手机网民规模达6.20亿,有90.1%的网民通过手机上网. 图 1  2013Q1~2015Q3在线视频移动端和PC端有效使用时长份额对比 根据艾瑞网民行为监测系统iUserTracker及mUserTrac

Tomcat启动分析(我们为什么要配置CATALINA_HOME环境变量)

原文:http://www.cnblogs.com/heshan664754022/archive/2013/03/27/2984357.html Tomcat启动分析(我们为什么要配置CATALINA_HOME环境变量) 用文本编辑工具打开用于启动Tomcat的批处理文件startup.bat,仔细阅读.在这个文件中,首先判断CATALINA_HOME环境变量是否为空,如果为空,就将当前目录设为CATALINA_HOME的值.接着判断当前目录下是否存在bin\catalina.bat,如果文件

C# 最佳工具集合: IDE 、分析、自动化工具等

C#是企业中广泛使用的编程语言,特别是那些依赖微软的程序语言.如果您使用C#构建应用程序,则最有可能使用Visual Studio,并且已经寻找了一些扩展来对您的开发进行管理.但是,这个工具列表可能会改变您编写C#代码的方式. C#编程的最佳工具有以下几类: IDE VS扩展 编译器.编辑器和序列化 反编译和代码转换工具 构建自动化和合并工具 版本控制 测试工具和VS扩展 性能分析 APM 部署自动化 容器 使用上面的链接直接跳转到特定工具,或继续阅读以浏览完整列表.