【Flume】【源码分析】flume中事件Event的数据结构分析以及Event分流

前言

首先来看一下flume官网中对Event的定义

一行文本内容会被反序列化成一个event【序列化是将对象状态转换为可保持或传输的格式的过程。与序列化相对的是反序列化,它将流转换为对象。这两个过程结合起来,可以轻松地存储和传输数据】,event的最大定义为2048字节,超过,则会切割,剩下的会被放到下一个event中,默认编码是UTF-8,这都是统一的。

但是这个解释是针对Avro反序列化系统中的Event的定义,而flume ng中很多event用的不是这个,所以你只要记住event的数据结构即可,上面这个解释可以忽略。

一、Event定义

public interface Event {

  /**
   * Returns a map of name-value pairs describing the data stored in the body.
   */
  public Map<String, String> getHeaders();

  /**
   * Set the event headers
   * @param headers Map of headers to replace the current headers.
   */
  public void setHeaders(Map<String, String> headers);

  /**
   * Returns the raw byte array of the data contained in this event.
   */
  public byte[] getBody();

  /**
   * Sets the raw byte array of the data contained in this event.
   * @param body The data.
   */
  public void setBody(byte[] body);

}

很简单的数据结构

header是一个map,body是一个字节数组,body才是我们实际使用中真正传输的数据,header传输的数据,我们是不会是sink出去的。

二、Event如何产出以及如何分流

while ((line = reader.readLine()) != null) {
            synchronized (eventList) {
              sourceCounter.incrementEventReceivedCount();
              eventList.add(EventBuilder.withBody(line.getBytes(charset)));
              if(eventList.size() >= bufferCount || timeout()) {
                flushEventBatch(eventList);
              }
            }
          }
 public static Event withBody(byte[] body, Map<String, String> headers) {
    Event event = new SimpleEvent();

    if(body == null) {
      body = new byte[0];
    }
    event.setBody(body);

    if (headers != null) {
      event.setHeaders(new HashMap<String, String>(headers));
    }

    return event;
  }

这里是单纯的包装了event的body内容,line即是我们真正的数据内容,将其转换成UTF-8编码的字节内容分装到event的body中,它的header是null。

用的是SimpleEvent类。

header的话,就是在分装Event对象的时候,我们可以自定义的设置一些key-value对,这样做的目的,是为了后续的通道多路复用做准备的

在source端产出event的时候,通过header去区别对待不同的event,然后在sink端的时候,我们就可以通过header中的key来将不同的event输出到对应的sink下游去,这样就将event分流出去了,但是这里有一个前提:不建议通过对event的body解析来设置header,因为flume就是一个水槽,水槽是不会在中间对水进行加工的,要加工,等水流出去了再加工

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host
a1.sources.r1.interceptors.i1.hostHeader = hostname

如上,host是你自定义的一个拦截器,hostHeader都是自定义的key,这样你就在event产出的时候,给各个event定义了不同的header,然后再通过多路复用通道的模式进行分流

a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4

这样你就可以根据event的header中的key将其放入不同的channel中,紧接着,再通过配置多个sink去不同的channel取出event,将其分流到不同的输出端

每个sink配置的通道区别开就行了。

时间: 2024-10-13 08:46:26

【Flume】【源码分析】flume中事件Event的数据结构分析以及Event分流的相关文章

jQuery源码分析-jQuery中的循环技巧

Js代码   作者:nuysoft/JS攻城师/高云 QQ:47214707 EMail:[email protected] 声明:本文为原创文章,如需转载,请注明来源并保留原文链接. 前记:本文收集了jQuery中出现的各种遍历技巧和场景 Js代码   // 简单的for-in(事件) for ( type in events ) { } Js代码   // 缓存length属性,避免每次都去查找length属性,稍微提升遍历速度 // 但是如果遍历HTMLCollection时,性能提升非常

UiAutomator源码分析之注入事件

上一篇文章<UiAutomator源码分析之UiAutomatorBridge框架>中我们把UiAutomatorBridge以及它相关的类进行的描述,往下我们会尝试根据两个实例将这些类给串联起来,我准备做的是用如下两个很有代表性的实例: 注入事件 获取控件 这一篇文章我们会通过分析UiDevice的pressHome这个方法来分析UiAutomator是如何注入事件的,下一篇文章会描述如何获取控件,敬请期待. 1. UiObject.pressHome顺序图 首先我们看一下我手画的非规范的顺

vlc源码分析之调用live555接收RTSP数据

首先了解RTSP/RTP/RTCP相关概念,尤其是了解RTP协议:RTP与RTCP协议介绍(转载). vlc使用模块加载机制调用live555,调用live555的文件是live555.cpp. 一.几个重要的类 以下向左箭头("<-")为继承关系. 1. RTPInterface RTPInterface是RTPSource的成员变量,其成员函数handleRead会读取网络数据存入BufferedPacket内,该类最终会调到UDP的发送接收函数. Boolean RTPIn

libevent源码分析一--io事件响应

这篇文章将分析libevent如何组织io事件,如何捕捉事件的发生并进行相应的操作.这里不会详细分析event与event_base的细节,仅描述io事件如何存储与如何响应. 1.  select libevent的实现io事件的backend实际上使用的是io复用接口,如select, poll, epoll等,这里以最简单的select为例进行说明.首先简单介绍一下select接口: int select(int nfds, fd_set *readfds, fd_set *writefds

[Abp 源码分析]九、事件总线

0.简介 事件总线就是订阅/发布模式的一种实现,本质上事件总线的存在是为了降低耦合而存在的. 从上图可以看到事件由发布者发布到事件总线处理器当中,然后经由事件总线处理器调用订阅者的处理方法,而发布者和订阅者之间并没有耦合关系. 像 Windows 本身的设计也是基于事件驱动,当用户点击了某个按钮,那么就会触发相应的按钮点击事件,而程序只需要监听这个按钮点击事件即可进行相应的处理,而事件被触发的时候往往都会附带相应的事件源,事件所产生的数据等. 还是以按钮被点击为例,该事件被触发的时候会装填上触发

[Tomcat源码分析] Eclipse中搭建Apache Tomcat源码调试环境

网上很多文章都推荐使用Ant下载编译,但本地实践中屡屡失败,无法下载. 后来参考 https://blog.csdn.net/xiongyouqiang/article/details/78941077 总算把调试环境搭建完成. 以下文章几乎完全copy上述网址,但稍作延展. 下载源码 官网直接下载源码 http://tomcat.apache.org/download-70.cgi 源码导入到Eclipse中 第1步:Eclipse中新建一个Java Project,例如名称可以是Tomcat

redis源码分析(1)--makefile和目录结构分析

一.redis源码编译 redis可以直接在官网下载(本文使用版本 3.0.7):https://redis.io/download 安装: $ tar xzf redis-3.0.7.tar.gz $ cd redis-3.0.7 $ make make执行以后主要编译产物在src/redis-server src/redis-cli 如果想把redis-server直接install到可执行目录/usr/local/bin,还需要执行: $ make install Run Redis wi

zookeeper源码分析三LEADER与FOLLOWER同步数据流程

根据二)中的分析,如果一台zookeeper服务器成为集群中的leader,那么一定是当前所有服务器中保存数据最多的服务器,所以在这台服务器成为leader之后,首先要做的事情就是与集群中的其它服务器(现在是follower)同步数据,保证大家的数据一致,这个过程完毕了才开始正式处理来自客户端的连接请求. 首先来看Leader做的工作:二)中提到的同步数据时使用的逻辑时钟,它的初始值是0,每次选举过程都会递增的,在leader正式上任之后做的第一件事情,就是根据当前保存的数据id值,设置最新的逻

5. SOFAJRaft源码分析— RheaKV中如何存放数据?

概述 上一篇讲了RheaKV是如何进行初始化的,因为RheaKV主要是用来做KV存储的,RheaKV读写的是相当的复杂,一起写会篇幅太长,所以这一篇主要来讲一下RheaKV中如何存放数据. 我们这里使用一个客户端的例子来开始本次的讲解: public static void main(final String[] args) throws Exception { final Client client = new Client(); client.init(); //get(client.get