flume 1.8.0 开发基础

本文由云+社区发表

作者:皮皮熊

概述

Apache Flume是一个用于高效地从大量异构数据源收集、聚合、传输到一个集中式数据存储的分布式、高可靠、高可用的系统。

Apache Flume是Apache基金会的顶级项目。现在有两个代码版本线可以获取:0.9.x和1.x。本文档对应的是1.x版本。

数据流模型

Event是流经flume agent的最小数据单元。一个Event(由Event接口实现)从source流向channel,再到sink。Event包含了一个payload(byte array)和可选的header(string attributes)。一个flume agent就是一个jvm下的进程:控制着Events从一个外部的源头到一个外部的目的地。

Source消费着具有特殊格式的Events(这些Event传递到Source通过像Web server这样外在的数据源)。例如AvroSource可以被用于接收Avro的Events,从本客户端或者其他运行中的flume客户端。当一个Source接收到一个Event,它会把它插入到一个或者多个Channel里。Channel会被动地存储这些Event直到它们被一个Sink消费到。Flume中一种Channel是FileChannel,其使用文件系统来作为后端存储。Sink需要负责任地将一个Event从Channel中移除,并将其放入像hdfs一样的外部存储系统(例如HDFSEventSink),或者转发到传输中下一个节点的source中。Source和Sink在agent中异步地交互Channel中的Event。

可靠性

Event是存储在Flume agent的Channel里。Sink的责任就是传输Event到下一个agent或者最终的存储系统(像hdfs)。Sink只有当Event写入下一个agent的Channel 或者 存储到最终的系统时才会从channel里面删掉Event。这就是Flume如何在单跳消息传输中提供端到端的可靠性。Flume提供了一个事务性的方法来修复可靠传输中的Event。Source和Sink包含了Event的存储和重试(通过由channel提供的事务)。

构建Flume

获取源码

通过git

编译/测试 Flume

Flume使用maven来build。你可以通过标准的maven命令行来编译Flume。

  1. 仅编译:mvn clean compile
  2. 编译且运行单元测试:mvn clean test
  3. 运行独立的测试:mvn clean test -Dtest=,,... -DfailIfNoTests=false
  4. 打包:mvn clean install
  5. 打包(忽略单元测试):mvn clean install -DskipTests

注意:Flume build需要在path中有Google Protocol Buffers编译器。

更新Protocol Buffer版本

File channel依赖Protocol Buffer。当你想更新Protocol Buffer版本时,你需要如下更新使用到Protocol Buffer的data access类:

  1. 本机安装你想要的PB版本
  2. 更新pom.xml中PB的版本
  3. 生成flume中新的PB data access类:cd flume-ng-channels/flume-file-channel; mvn -P compile-proto clean package -DskipTests
  4. 在所有生成文件中加上Apache license(如果缺了的话)
  5. rebuild及测试Flume:cd ../..; mvn clean install

开发自定义部分

client

Client在Event产生时运转,并将他们传递到Flume的agent。Client通常运行在应用消费数据的进程空间中。Flume目前支持Avro, log4j, syslog, 以及 Http POST (with a JSON body)方式从外部数据源传输数据。同时ExecSource支持将本地进程的输出作为Flume的输入。

可能已有的方案是不够的。本案例中你可以使用自定义的方法来向flume发送数据。这里有两种方法来实现。第一:写一个自定义的客户端来和flume已有的source交互,像AvroSource 或者 SyslogTcpSource。此时Client需要将数据转换成这些Source能理解的message。另外一个方案:写一个自定义的Flume Source,通过IPC或者RPC,直接地和已有的client应用通信(需要将client的数据转换成Flume的Event)。注意这些存储在flume agent channel中的事件,必须以Flume Event形式存在。

Client SDK

尽管Flume包含了一系列内置的,用于接收数据的方法(即Source),人们常常想直接地通过flume和自定义的程序进行通信。Flume SDK 就是这样一个lib,它可以通过RPC直接地连接到Flume,并且发送到Flume的数据流。

RPC客户端接口

一个RPC客户端接口的实现,包含了支持Flume的RPC方法。用户的程序可以简单地调用Flume SDK客户端的append(Event)或者appendBatch(List)接口来发送数据,而不用考虑消息交互的细节。用户可以通过使用诸如SimpleEvent类,或者使用EventBuilder的 静态helper方法withBody(),便捷地实现直接提供事件接口所需的事件ARG。

Transaction(事务)接口

Transaction接口是Flume可靠性的基础。所有主要组件(即source,sink和channel)必须使用Flume Transaction。

Transaction在channel的实现中实现。每个source和sink连接到channel时必须要得到一个channnel的对象。Source使用channnelprocessor来管理transaction。sink明确地通过他们配置的channel来管理transaction。存储一个事件(把他们放入channnel中)或者抽取一个事件(从channnel中取出)在一个激活的transaction中完成。例如:

Channel ch = new MemoryChannel();
Transaction txn = ch.getTransaction();
txn.begin();
try {
  // This try clause includes whatever Channel operations you want to do

  Event eventToStage = EventBuilder.withBody("Hello Flume!",
                       Charset.forName("UTF-8"));
  ch.put(eventToStage);
  // Event takenEvent = ch.take();
  // ...
  txn.commit();
} catch (Throwable t) {
  txn.rollback();

  // Log exception, handle individual exceptions as needed

  // re-throw all Errors
  if (t instanceof Error) {
    throw (Error)t;
  }
} finally {
  txn.close();
}

在这里,我们从channel获取transaction。在begin()返回后,Transaction现在处于活动/打开状态,然后将Event放入Channel中。如果put成功,则提交并关闭Transaction。

Sink

Sink的目的就是从Channel中提取事件并将其转发到传输中的下一个Flume Agent或将它们存储在外部存储库中。根据Flume属性文件中的配置,接收器只与一个通道关联。每个已配置的Sink都有一个SinkRunner实例,当Flume框架调用SinkRunner.start()时,会创建一个新线程来驱动Sink(使用SinkRunner.PollingRunner作为线程的Runnable),该线程管理Sink的生命周期。Sink需要实现start()和stop()方法作为LifecycleAware接口的一部分。

  • Sink.start()方法应初始化Sink并将其置于可将事件转发到其下一个目标的状态。
  • Sink.process()应该执行从Channel提取Event并转发它的核心处理过程。
  • Sink.stop()方法应该进行必要的清理(例如释放资源)。

Sink实现还需要实现Configurable接口来处理自己的配置设置。例如:

public class MySink extends AbstractSink implements Configurable {
  private String myProp;

  @Override
  public void configure(Context context) {
    String myProp = context.getString("myProp", "defaultValue");

    // Process the myProp value (e.g. validation)

    // Store myProp for later retrieval by process() method
    this.myProp = myProp;
  }

  @Override
  public void start() {
    // Initialize the connection to the external repository (e.g. HDFS) that
    // this Sink will forward Events to ..
  }

  @Override
  public void stop () {
    // Disconnect from the external respository and do any
    // additional cleanup (e.g. releasing resources or nulling-out
    // field values) ..
  }

  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;

    // Start transaction
    Channel ch = getChannel();
    Transaction txn = ch.getTransaction();
    txn.begin();
    try {
      // This try clause includes whatever Channel operations you want to do

      Event event = ch.take();

      // Send the Event to the external repository.
      // storeSomeData(e);

      txn.commit();
      status = Status.READY;
    } catch (Throwable t) {
      txn.rollback();

      // Log exception, handle individual exceptions as needed

      status = Status.BACKOFF;

      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error)t;
      }
    }
    return status;
  }
}

Source

Source的目的是从外部客户端接收数据并将其存储到已配置的Channels中。Source可以获取其自己的ChannelProcessor的实例来处理在Channel本地事务中提交的串行事件。在exception的情况下,需要Channels传播异常,则所有Channels将回滚其事务,但先前在其他Channel上处理的事件将保持提交。

与SinkRunner.PollingRunner Runnable类似,有一个PollingRunner Runnable,它在Flume框架调用PollableSourceRunner.start()时创建的线程上执行。每个配置的PollableSource都与自己运行PollingRunner的线程相关联。该线程管理PollableSource的生命周期,例如启动和停止。

  • PollableSource必须实现LifecycleAware接口中声明的start()和stop()方法。
  • PollableSource的运行器调用Source的process()方法。 process()方法应检查新数据并将其作为Flume事件存储到Channel中。

注意,实际上有两种类型的Source:已经提到过PollableSource,另一个是EventDrivenSource。与PollableSource不同,EventDrivenSource必须有自己的回调机制,捕获新数据并将其存储到Channel中。EventDrivenSources并不像PollableSources那样由它们自己的线程驱动。下面是一个自定义PollableSource的示例:

public class MySource extends AbstractSource implements Configurable, PollableSource {
  private String myProp;

  @Override
  public void configure(Context context) {
    String myProp = context.getString("myProp", "defaultValue");

    // Process the myProp value (e.g. validation, convert to another type, ...)

    // Store myProp for later retrieval by process() method
    this.myProp = myProp;
  }

  @Override
  public void start() {
    // Initialize the connection to the external client
  }

  @Override
  public void stop () {
    // Disconnect from external client and do any additional cleanup
    // (e.g. releasing resources or nulling-out field values) ..
  }

  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;

    try {
      // This try clause includes whatever Channel/Event operations you want to do

      // Receive new data
      Event e = getSomeData();

      // Store the Event into this Source's associated Channel(s)
      getChannelProcessor().processEvent(e);

      status = Status.READY;
    } catch (Throwable t) {
      // Log exception, handle individual exceptions as needed

      status = Status.BACKOFF;

      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error)t;
      }
    } finally {
      txn.close();
    }
    return status;
  }
}

参考自(Flume 1.8.0 Developer Guide)

flume 1.8.0 文档完整翻译可见 https://blog.csdn.net/u013128262

此文已由腾讯云+社区在各渠道发布

获取更多新鲜技术干货,可以关注我们腾讯云技术社区-云加社区官方号及知乎机构号

原文地址:https://www.cnblogs.com/qcloud1001/p/10455466.html

时间: 2024-08-27 23:53:18

flume 1.8.0 开发基础的相关文章

使用BleLib的轻松搞定Android低功耗蓝牙Ble 4.0开发具体解释

转载请注明来源: http://blog.csdn.net/kjunchen/article/details/50909410 使用BleLib的轻松搞定Android低功耗蓝牙Ble 4.0开发具体解释 演示样例源代码: https://github.com/junkchen/BleLib/tree/master/sample Android ble4.0开发基础篇:http://blog.csdn.net/kjunchen/article/details/50339549 BleLib是An

深入理解iPhone数据持久化(手把手教你iphone开发 – 基础篇)

在所有的移动开发平台数据持久化都是很重要的部分:在j2me中是rms或保存在应用程序的目录中,在symbian中可以保存在相应的磁盘目录中和数据库中.symbian中因为权限认证的原因,在3rd上大多数只能访问应用程序的private目录或其它系统共享目录.在iphone中,apple博采众长,提供了多种数据持久化的方法,下面笔者会逐个进行详细的讲解. iphone提供的数据持久化的方法,从数据保存的方式上讲可以分为三大部分:属性列表.对象归档.嵌入式数据库(SQLite3).其他方法. 一.属

独家全功能USB2.0开发板,最强CY7C68013A-128核心板,超强资料不断提供更新服务

       学习USB开发再也不用买书啦,也不用花费重金去上培训班啦,IFLabs开创USB学习开发新模式,你所需要的知识.技术.范例.代码等等统统都在这里.IFLabs打造全网最丰富.最权威的USB开发平台和资料. 热卖的IFLabs精品USB2.0核心板套件再升级,全网独家推出最强.最全功能的Cypress USB 2.0 CY7C68013A-128AXC核心板开发板,全网最全配件和全网最全开发资料.只需这一次投入,即可实现USB接口开发的从入门到精通!并且有长期的全网最全开发手册更新支

一起学Google Daydream VR开发,快速入门开发基础教程一:Android端开发环境配置一

原文因涉及翻墙信息,被强制删除,此文为补发! 准备工作 进入Google Daydream开发者官网,开启准备工作,官网地址:https://vr.google.com/daydream/developers/ -------------------------------------------------------------------------------------------------------------------- Google Daydream开发者网址: https

20145239 杜文超 实验四 Android开发基础

20145239实验四 Android开发基础 实验内容 基于Android Studio开发简单的Android应用并部署测试 了解Android组件.布局管理器的使用 掌握Android中事件处理机制 Android Studio安装 实验步骤 安装 JDK 并配置 JDK 环境变量 依次使用计算机->系统属性->高级系统设置->高级->环境变量,然后新建一个JAVA_HOME变量,令它为计算机中安装JDK的位置: 安装Andriod Studio并配置 1.双击运行Andri

ASP.Net MVC开发基础学习笔记(3):Razor视图引擎、控制器与路由机制学习

首页 头条 文章 频道                         设计频道 Web前端 Python开发 Java技术 Android应用 iOS应用 资源 小组 相亲 频道 首页 头条 文章 小组 相亲 资源 设计 前端 Python Java 安卓 iOS 登录 注册 首页 最新文章 经典回顾 开发 Web前端 Python Android iOS Java C/C++ PHP .NET Ruby Go 设计 UI设计 网页设计 交互设计 用户体验 设计教程 设计职场 极客 IT技术

Struts2开发基础

Struts2开发基础 struts2采用拦截器的机制来处理用户的请求,使得业务逻辑控制器能够与ServletAPI完全脱离开. 1. Hello World! 配置web.xml <?xml version="1.0" encoding="UTF-8"?> <web-app id="WebApp_9" version="2.4" xmlns="http://java.sun.com/xml/ns/

.NET基础拾遗(5)多线程开发基础

Index : (1)类型语法.内存管理和垃圾回收基础 (2)面向对象的实现和异常的处理基础 (3)字符串.集合与流 (4)委托.事件.反射与特性 (5)多线程开发基础 一.多线程编程的基本概念 下面的一些基本概念可能和.NET的联系并不大,但对于掌握.NET中的多线程开发来说却十分重要.我们在开始尝试多线程开发前,应该对这些基础知识有所掌握,并且能够在操作系统层面理解多线程的运行方式. 1.1 操作系统层面的进程和线程 (1)进程 进程代表了操作系统上运行着的一个应用程序.进程拥有自己的程序块

iOS开发基础知识--碎片32

 iOS开发基础知识--碎片32 1:动画属性UIViewAnimationOptions说明 a:常规动画属性设置(可以同时选择多个进行设置) UIViewAnimationOptionLayoutSubviews:动画过程中保证子视图跟随运动. UIViewAnimationOptionAllowUserInteraction:动画过程中允许用户交互. UIViewAnimationOptionBeginFromCurrentState:所有视图从当前状态开始运行. UIViewAnimat