Flume NG 学习笔记(十) Transaction、Sink、Source和Channel开发

版权声明:本文为博主原创文章,未经博主允许不得转载。

目录(?)[+]

一、Transaction interface

Transaction接口是基于flume的稳定性考虑的。所有主要的组件(sources、sinks、channels)都必须使用Flume Transaction。我们也可以理解Transaction接口就是flume的事务,sources和sinks的发送数据与接受数据都是在一个Transaction里完成的。

从上图中可以看出,一个Transaction在Channel实现内实现。每一个连接到channel的source和sink都要获取一个Transaction对象。这Sources实际上使用了一个ChannelSelector接口来封装Transaction。存放事件到channel和从channel中提取事件的操作是在一个活跃的Transaction内执行的。

下面是官网例子

[java] view plain copy

  1. Channel ch = new MemoryChannel();
  2. Transaction txn = ch.getTransaction();
  3. txn.begin();
  4. try {
  5. // This try clause includes whatever Channel operations you want to do
  6. Event eventToStage = EventBuilder.withBody("Hello Flume!",
  7. Charset.forName("UTF-8"));
  8. ch.put(eventToStage);
  9. // Event takenEvent = ch.take();
  10. // ...
  11. txn.commit();
  12. } catch (Throwable t) {
  13. txn.rollback();
  14. // Log exception, handle individual exceptions as needed
  15. // re-throw all Errors
  16. if (t instanceof Error) {
  17. throw (Error)t;
  18. }
  19. } finally {
  20. txn.close();
  21. }

上面的代码是一个很简单的Transaction示例,在自定义Source与自定义Sink中都要使用。

二、自定义Sink开发

Sink提取event数据从channel中,然后直接将数据发送到下一个flume agent中或者存储到外部库中。

Sink和channel的关联关系可以在配置文件中配置。有一个SinkRunner实例与每一个已配置的Sink关联,当Flume框架调用SinkRunner.start()方法时候,将创建一个新的线程来驱动这Sink。

这个线程将管理这个Sink的生命周期。Sink需要实现LifecycleAware接口的start()和stop()方法。start()方法用于初始化数据;stop()用于释放资源;process()是从channel中提取event数据和转发数据的核心方法。

这Sink需要实现Configurable接口以便操作配置文件。

下面是官网例子:

[java] view plain copy

  1. public class MySink extends AbstractSink implements Configurable {
  2. private String myProp;
  3. @Override
  4. public void configure(Context context) {
  5. String myProp = context.getString("myProp", "defaultValue");
  6. // Process the myProp value (e.g. validation)
  7. // Store myProp for later retrieval by process() method
  8. this.myProp = myProp;
  9. }
  10. @Override
  11. public void start() {
  12. // Initialize the connection to the external repository (e.g. HDFS) that
  13. // this Sink will forward Events to ..
  14. }
  15. @Override
  16. public void stop () {
  17. // Disconnect from the external respository and do any
  18. // additional cleanup (e.g. releasing resources or nulling-out
  19. // field values) ..
  20. }
  21. @Override
  22. public Status process() throws EventDeliveryException {
  23. Status status = null;
  24. // Start transaction
  25. Channel ch = getChannel();
  26. Transaction txn = ch.getTransaction();
  27. txn.begin();
  28. try {
  29. // This try clause includes whatever Channel operations you want to do
  30. Event event = ch.take();
  31. // Send the Event to the external repository.
  32. // storeSomeData(e);
  33. txn.commit();
  34. status = Status.READY;
  35. } catch (Throwable t) {
  36. txn.rollback();
  37. // Log exception, handle individual exceptions as needed
  38. status = Status.BACKOFF;
  39. // re-throw all Errors
  40. if (t instanceof Error) {
  41. throw (Error)t;
  42. }
  43. } finally {
  44. txn.close();
  45. }
  46. return status;
  47. }
  48. }

下面是测试例子:

[java] view plain copy

  1. import org.apache.flume.Channel;
  2. import org.apache.flume.Context;
  3. import org.apache.flume.Event;
  4. import org.apache.flume.EventDeliveryException;
  5. import org.apache.flume.Transaction;
  6. import org.apache.flume.conf.Configurable;
  7. import org.apache.flume.sink.AbstractSink;
  8. public class Custom_Sink extends AbstractSink implements Configurable {
  9. private String myProp;
  10. @Override
  11. public void configure(Context context) {
  12. String myProp = context.getString("myProp", "defaultValue");
  13. // Process the myProp value (e.g. validation)
  14. // Store myProp for later retrieval by process() method
  15. this.myProp = myProp;
  16. }
  17. @Override
  18. public void start() {
  19. // Initialize the connection to the external repository (e.g. HDFS) that
  20. // this Sink will forward Events to ..
  21. }
  22. @Override
  23. public void stop () {
  24. // Disconnect from the external respository and do any
  25. // additional cleanup (e.g. releasing resources or nulling-out
  26. // field values) ..
  27. }
  28. @Override
  29. public Status process() throws EventDeliveryException {
  30. Status status = null;
  31. // Start transaction
  32. Channel ch = getChannel();
  33. Transaction txn = ch.getTransaction();
  34. txn.begin();
  35. try {
  36. // This try clause includes whatever Channel operations you want to do
  37. Event event = ch.take();
  38. String out = new String(event.getBody());
  39. // Send the Event to the external repository.
  40. // storeSomeData(e);
  41. System.out.println(out);
  42. txn.commit();
  43. status = Status.READY;
  44. } catch (Throwable t) {
  45. txn.rollback();
  46. // Log exception, handle individual exceptions as needed
  47. status = Status.BACKOFF;
  48. // re-throw all Errors
  49. if (t instanceof Error) {
  50. throw (Error)t;
  51. }
  52. } finally {
  53. txn.close();
  54. }
  55. return status;
  56. }
  57. }

上面的测试例子只输出事件的BODY信息,这里说明下直接用代码event.getBody().tostring() 输出是乱码。因为所有sink都是在Transaction里完成的,因此自定义开发sink是需要加上Transaction相关设置。

然后是测试配置,这里是自定义的jar 包是flumedev.Custom_Sink。注意,打包之后请放在目录$FLUME_HOME/lib下

[html] view plain copy

  1. #配置文件:custom_sink_case23.conf
  2. # Name the components on this agent
  3. a1.sources = r1
  4. a1.sinks = k1
  5. a1.channels = c1
  6. # Describe/configure the source
  7. a1.sources.r1.type = syslogtcp
  8. a1.sources.r1.port = 50000
  9. a1.sources.r1.bind = 192.168.233.128
  10. a1.sources.r1.channels = c1
  11. # Describe the sink
  12. a1.sinks.k1.channel = c1
  13. a1.sinks.k1.type = flumedev.Custom_Sink
  14. #a1.sinks.k1.type =logger
  15. # Use a channel which buffers events in memory
  16. a1.channels.c1.type = memory
  17. a1.channels.c1.capacity = 1000
  18. a1.channels.c1.transactionCapacity = 100

#敲命令

flume-ng agent -cconf -f conf/custom_sink_case23.conf -n a1 -Dflume.root.logger=INFO,console

启动成功后

打开另一个终端输入,往侦听端口送数据

echo "testcustom_sink" | nc 192.168.233.128 50000

#在启动的终端查看console输出

可以看到数据正常输出。

三、自定义Source开发

Source从外面接收数据并把数据存入Channel中。很少有人用。

下面是官网的例子

[java] view plain copy

  1. public class MySource extends AbstractSource implements Configurable, PollableSource {
  2. private String myProp;
  3. @Override
  4. public void configure(Context context) {
  5. String myProp = context.getString("myProp", "defaultValue");
  6. // Process the myProp value (e.g. validation, convert to another type, ...)
  7. // Store myProp for later retrieval by process() method
  8. this.myProp = myProp;
  9. }
  10. @Override
  11. public void start() {
  12. // Initialize the connection to the external client
  13. }
  14. @Override
  15. public void stop () {
  16. // Disconnect from external client and do any additional cleanup
  17. // (e.g. releasing resources or nulling-out field values) ..
  18. }
  19. @Override
  20. public Status process() throws EventDeliveryException {
  21. Status status = null;
  22. // Start transaction
  23. Channel ch = getChannel();
  24. Transaction txn = ch.getTransaction();
  25. txn.begin();
  26. try {
  27. // This try clause includes whatever Channel operations you want to do
  28. // Receive new data
  29. Event e = getSomeData();
  30. // Store the Event into this Source‘s associated Channel(s)
  31. getChannelProcessor().processEvent(e)
  32. txn.commit();
  33. status = Status.READY;
  34. } catch (Throwable t) {
  35. txn.rollback();
  36. // Log exception, handle individual exceptions as needed
  37. status = Status.BACKOFF;
  38. // re-throw all Errors
  39. if (t instanceof Error) {
  40. throw (Error)t;
  41. }
  42. } finally {
  43. txn.close();
  44. }
  45. return status;
  46. }
  47. }

测试的话,主要针对Event e 这里进行传输数据,这里就不测试了。

四、自定义Channel开发

官网说待定。

下面是美团网的自定义Channel 开发,下面是链接

http://tech.meituan.com/mt-log-system-optimization.html

……

Flume本身提供了MemoryChannel和FileChannel。MemoryChannel处理速度快,但缓存大小有限,且没有持久化;FileChannel则刚好相反。我们希望利用两者的优势,在Sink处理速度够快,Channel没有缓存过多日志的时候,就使用MemoryChannel,当Sink处理速度跟不上,又需要Channel能够缓存下应用端发送过来的日志时,就使用FileChannel,由此我们开发了DualChannel,能够智能的在两个Channel之间切换。

其具体的逻辑如下:

[java] view plain copy

  1. /***
  2. * putToMemChannel indicate put event to memChannel or fileChannel
  3. * takeFromMemChannel indicate take event from memChannel or fileChannel
  4. * */
  5. private AtomicBoolean putToMemChannel = new AtomicBoolean(true);
  6. private AtomicBoolean takeFromMemChannel = new AtomicBoolean(true);
  7. void doPut(Event event) {
  8. if (switchon && putToMemChannel.get()) {
  9. //往memChannel中写数据
  10. memTransaction.put(event);
  11. if ( memChannel.isFull() || fileChannel.getQueueSize() > 100) {
  12. putToMemChannel.set(false);
  13. }
  14. } else {
  15. //往fileChannel中写数据
  16. fileTransaction.put(event);
  17. }
  18. }
  19. Event doTake() {
  20. Event event = null;
  21. if ( takeFromMemChannel.get() ) {
  22. //从memChannel中取数据
  23. event = memTransaction.take();
  24. if (event == null) {
  25. takeFromMemChannel.set(false);
  26. }
  27. } else {
  28. //从fileChannel中取数据
  29. event = fileTransaction.take();
  30. if (event == null) {
  31. takeFromMemChannel.set(true);
  32. putToMemChannel.set(true);
  33. }
  34. }
  35. return event;
  36. }

这里要说明下,官网是建议使用file channel,虽然它的效率比较低,但是它能保证数据完整性,而memory channel效率高,但是只能对数据丢失和重复不太敏感的业务使用

时间: 2024-12-30 15:16:46

Flume NG 学习笔记(十) Transaction、Sink、Source和Channel开发的相关文章

Flume NG 学习笔记(四)Source配置

首先.这节水的东西就比较少了,大部分是例子. 一.Avro Source与Thrift Source Avro端口监听并接收来自外部的Avro客户流的事件.当内置Avro 去Sinks另一个配对Flume代理,它就可以创建分层采集的拓扑结构.官网说的比较绕,当然我的翻译也很弱,其实就是flume可以多级代理,然后代理与代理之间用Avro去连接 下面是官网给出的source的配置,加粗的参数是必选,描述就不解释了. Property Name Default Description channel

Flume NG 学习笔记(五)Sinks和Channel配置

一.HDFS Sink Flume Sink是将事件写入到Hadoop分布式文件系统(HDFS)中.主要是Flume在Hadoop环境中的应用,即Flume采集数据输出到HDFS,适用大数据日志场景. 目前,它支持HDFS的文本和序列文件格式,以及支持两个文件类型的压缩.支持将所用的时间.数据大小.事件的数量为操作参数,对HDFS文件进行关闭(关闭当前文件,并创建一个新的).它还可以对事源的机器名(hostname)及时间属性分离数据,即通过时间戳将数据分布到对应的文件路径. HDFS目录路径可

Flume NG 学习笔记(一)简介

一.简介 Flume是一个分布式.可靠.高可用的海量日志聚合系统,支持在系统中定制各类数据发送方,用于收集数据:同时,Flume提供对数据的简单处理,并写到各种数据接收方的能力. Flume在0.9.x and 1.x之间有较大的架构调整,1.x版本之后的改称Flume NG(next generation),0.9.x的称为Flume OG(originalgeneration). 对于OG版本, Flume NG (1.x.x)的主要变化如下: 1.sources和sinks 使用chann

Flume NG 学习笔记(二)单机与集群Flume 配置

下面的内容基本来自官网:http://flume.apache.org/FlumeUserGuide.html 本文使用的是最新版本的apache flume 1.5,安装完Flume然后测试下Flume是否可以用,在Flume目录下用以下语句测试: bin/flume-ng agent -n$agent_name -c conf -f conf/flume-conf.properties.template 结果如图显示: Ok,我们接下去看下面常用架构.功能配置示例 一.最简单的单一代理Flu

Flume NG 学习笔记(八)Interceptors(拦截器)测试

版权声明:本文为博主原创文章,未经博主允许不得转载. 目录(?)[+] 拦截器主要是对事件的header信息信息操作,要么直接忽略他,要么修改他的数据 一.Event Serializers file_roll sink 和hdfs sink 都支持EventSerializer接口 1.1.Body Text Serializer Body TextSerializer,别名:text.这个拦截器将把事件的body部分写入到输出流中而不需要任何转换或者修改.事件的header将直接被忽略. 下

Flume NG 学习笔记(三)流配置

版权声明:本文为博主原创文章,未经博主允许不得转载. 目录(?)[+] 在通过flume采集日志数据的时候,一般都是通过flume 代理从日志源或者日志客户端采集数据到flume代理中,然后再由flume代理送到目标存储.上图中就是每个一级flume代理负责从webserv采集数据,然后再由一个二级flume代理进行日志汇总. Flume支持从一个源发送事件到多个通道中,这被称为事件流的复用.这里需要在配置中定义事件流的复制/复用,选择1个或者多个通道进行数据流向. 下面的内容主要介绍flume

Swift学习笔记十二:下标脚本(subscript)

下标脚本就是对一个东西通过索引,快速取值的一种语法,例如数组的a[0].这就是一个下标脚本.通过索引0来快速取值.在Swift中,我们可以对类(Class).结构体(structure)和枚举(enumeration)中自己定义下标脚本的语法 一.常规定义 class Student{ var scores:Int[] = Array(count:5,repeatedValue:0) subscript(index:Int) -> Int{ get{ return scores[index];

第十七篇:实例分析(3)--初探WDDM驱动学习笔记(十)

续: 还是记录一下, BltFuncs.cpp中的函数作用: CONVERT_32BPP_TO_16BPP 是将32bit的pixel转换成16bit的形式. 输入是DWORD 32位中, BYTE 0,1,2分别是RGB分量, 而BYTE3则是不用的 为了不减少color的范围, 所以,都是取RGB8,8,8的高RGB5, 6, 5位, 然后将这16位构成一个pixel. CONVERT_16BPP_TO_32BPP是将16bit的pixel转换成32bit的形式 输入是WORD 16BIT中

初探swift语言的学习笔记十(block)

作者:fengsh998 原文地址:http://blog.csdn.net/fengsh998/article/details/35783341 转载请注明出处 如果觉得文章对你有所帮助,请通过留言或关注微信公众帐号fengsh998来支持我,谢谢! 在前面一些学习中,原本把闭包给理解成了block尽管有很多相似之处,但block还是有他自己的独特之外.近日,在写oc/swift混合编码时,有时候需要swift回调oc,oc回调swift . 因此我把swift中的 block 常见的声明和写