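Custom Flume Source

Hi everyone.

Our company needed Flume to take messages from MQ and store them in DFS, so I wrote a custom Flume source. I'm new to Flume myself, so please excuse anything I got wrong.

I read through the Flume NG source code. Depending on the scenario, a source is usually written as: extends AbstractSource implements EventDrivenSource, Configurable

The MQSource code: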

import java.nio.charset.Charset;
import java.util.List;

import org.apache.activemq.ActiveMQConnection;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// MQContext, MQConfig, MQFactory, MQReceiver and HandleLineCallBack are the author's own classes (not shown).
public class MQSource extends AbstractSource implements EventDrivenSource, Configurable {

    private static final Logger logger = LoggerFactory.getLogger(MQSource.class);
    private static final Charset GBK = Charset.forName("GBK");

    private long heartbeat;

    private MQReceiver receiver;

    private HandleLineCallBack handle;

    private Thread t;

    @Override
    public void configure(Context context) {

        String mq_url = context.getString(MQContext.MQ_BROKER_URI, ActiveMQConnection.DEFAULT_BROKER_URL);
        String mq_userName = context.getString(MQContext.MQ_USERNAME, ActiveMQConnection.DEFAULT_USER);
        String mq_password = context.getString(MQContext.MQ_PASSWORD, ActiveMQConnection.DEFAULT_PASSWORD);
        String mq_queueKey = context.getString(MQContext.MQ_QUEUEKEY, "NULL");
        String handleClass = context.getString(MQContext.HANDLECLASS, "NULL");

        long mq_reciveTimeout = context.getLong(MQContext.MQ_RECIVETIMEOUT, 3000L);
        this.heartbeat = context.getLong(MQContext.HEARTBEAT, 3000L);

        if ("NULL".equals(mq_queueKey)) {
            // Without a queue there is nothing to consume; fail here instead of
            // hitting a NullPointerException on the receiver in start().
            throw new FlumeException(getName() + ": unable to load MQ_queueKey");
        }
        if ("NULL".equals(handleClass)) {
            logger.warn("{}: handleClass not set, using DefaultHandle", getName());
            handleClass = "com.bidlink.handle.DefaultHandle";
        }

        MQConfig mqconfig = new MQConfig(mq_url, mq_userName, mq_password, mq_queueKey, mq_reciveTimeout);
        logger.info("{} MQ configuration: {}", getName(), mqconfig);
        receiver = MQFactory.MQ.getReceiver(mqconfig);
        logger.info("{} got receiver for queue {}: {}", getName(), mqconfig.getQueueKey(), receiver);

        try {
            // asSubclass() checks the type at runtime, so no unchecked cast is needed
            handle = Class.forName(handleClass)
                    .asSubclass(HandleLineCallBack.class)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new FlumeException(getName() + ": unable to instantiate " + handleClass, e);
        }
    }

    @Override
    public synchronized void start() {
        logger.info("MQSource starting...");
        t = new Thread(() -> {
            // Poll until stop() interrupts this thread
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    List<String> lines = receiver.getText();
                    for (String line : lines) {
                        Event e = new SimpleEvent();
                        String refStr = handle.refactor(line);
                        e.setBody(refStr.getBytes(GBK));
                        getChannelProcessor().processEvent(e);
                    }
                    Thread.sleep(heartbeat);
                } catch (InterruptedException ie) {
                    // Restore the interrupt flag so the loop condition ends the thread
                    Thread.currentThread().interrupt();
                } catch (Exception ex) {
                    logger.error("{}: error while processing MQ messages", getName(), ex);
                }
            }
        }, getName() + "-mq-poller");
        t.start();
        // Let AbstractSource record the STARTED state once, after the thread is running
        super.start();
    }

    @Override
    public synchronized void stop() {
        logger.info("MQSource stopping...");
        if (t != null && t.isAlive()) {
            // Interrupt first: joining an endless loop without interrupting it would block forever
            t.interrupt();
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        super.stop();
    }

}
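The handleClass setting is resolved by reflection in configure(). The post does not show HandleLineCallBack, so this sketch assumes it declares a single method String refactor(String line), which is how start() calls it. PassThroughHandle, UpperCaseHandle and HandleLoader are hypothetical names used only for illustration; the "NULL" fallback mirrors the default in configure().

```java
import java.util.Locale;

// Assumed shape of the author's callback interface (not shown in the post).
interface HandleLineCallBack {
    String refactor(String line);
}

// Hypothetical fallback: returns each line unchanged.
class PassThroughHandle implements HandleLineCallBack {
    @Override
    public String refactor(String line) {
        return line;
    }
}

// Hypothetical custom handler, the kind of class you would name in handleClass.
class UpperCaseHandle implements HandleLineCallBack {
    @Override
    public String refactor(String line) {
        return line.toUpperCase(Locale.ROOT);
    }
}

class HandleLoader {
    // Load a handler by fully qualified class name, falling back when unset.
    static HandleLineCallBack load(String className) {
        if (className == null || "NULL".equals(className)) {
            return new PassThroughHandle();
        }
        try {
            // asSubclass() throws ClassCastException for classes that do not
            // implement the interface, instead of failing later at the call site.
            return Class.forName(className)
                    .asSubclass(HandleLineCallBack.class)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalArgumentException("cannot load handleClass " + className, e);
        }
    }
}
```

The handler class must have a no-argument constructor and its jar must be on the plugin classpath, otherwise loading fails at configure time rather than when the first message arrives.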

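Stripped to its core, the start method builds an event, hands it to the channel processor, and calls super.start() so that AbstractSource records the source as started:

  Event e = new SimpleEvent();
  e.setBody("hello everyone ".getBytes(Charset.forName("GBK")));
  getChannelProcessor().processEvent(e);

  super.start();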
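Because each line goes into the event body as GBK bytes, whatever reads the data downstream (for example the files written to DFS) has to decode it as GBK as well. A small plain-Java check of that round trip; EventBodyEncoding is a hypothetical helper name, and GBK, while present in standard JDK builds, is not one of the charsets every Java runtime must support:

```java
import java.nio.charset.Charset;

class EventBodyEncoding {
    // The charset the source uses for event bodies.
    static final Charset GBK = Charset.forName("GBK");

    // What the source does before processEvent(): String -> GBK bytes.
    static byte[] toBody(String line) {
        return line.getBytes(GBK);
    }

    // What a downstream reader must do to recover the text.
    static String fromBody(byte[] body, Charset charset) {
        return new String(body, charset);
    }
}
```

For example, the two Chinese characters 中文 take 4 bytes in GBK but 6 in UTF-8, and GBK bytes decoded as UTF-8 come out garbled.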
In configure, the Context gives access to any custom settings. For example, with the following in flume.conf:

tier1.sources.testSources.type = org.yunjume.source.MQSource
tier1.sources.testSources.MQ_userName = admin
tier1.sources.testSources.MQ_password = admin123
tier1.sources.testSources.MQ_brokerURL = tcp://localhost:61616
tier1.sources.testSources.MQ_queueKey = FirstQueue
tier1.sources.testSources.MQ_reciveTimeout = 30000
tier1.sources.testSources.heartbeat = 30000
# Class that processes each MQ message line and returns the new line.
tier1.sources.testSources.handleClass = org.yunjume.handle.DefaultHandle
tier1.sources.testSources.channels = testChannels


the MQ_userName value is read with:

String mq_userName = context.getString("MQ_userName", ActiveMQConnection.DEFAULT_USER);
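The source asks for MQ_userName, not tier1.sources.testSources.MQ_userName, because Flume hands each component only its own settings with the agent and component prefix removed. The stand-in below models that scoping with java.util.Properties, plus the getString/getLong-with-default lookups used in configure(); MiniContext is hypothetical and is not Flume's Context class.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical stand-in for org.apache.flume.Context.
class MiniContext {
    private final Map<String, String> params = new HashMap<>();

    // Keep only keys under `prefix` and strip it, e.g.
    // "tier1.sources.testSources.MQ_userName" -> "MQ_userName".
    static MiniContext forComponent(String flumeConf, String prefix) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(flumeConf));
        } catch (IOException e) {
            // StringReader does no real I/O, so this is not expected.
            throw new UncheckedIOException(e);
        }
        MiniContext ctx = new MiniContext();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                ctx.params.put(key.substring(prefix.length()), props.getProperty(key).trim());
            }
        }
        return ctx;
    }

    // Return the configured value, or the default when the key is absent.
    String getString(String key, String defaultValue) {
        String value = params.get(key);
        return value != null ? value : defaultValue;
    }

    long getLong(String key, long defaultValue) {
        String value = params.get(key);
        return value != null ? Long.parseLong(value) : defaultValue;
    }
}
```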

The stop method then stops the polling thread and calls super.stop().
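The polling thread's lifecycle can be tried out without Flume or ActiveMQ. In this sketch, PollingWorker is hypothetical: a BlockingQueue stands in for the MQ receiver and a list for the channel processor. It shows the two points that matter for start/stop: the loop must exit once interrupted, and stop() must interrupt the thread before joining it.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Plain-Java model of the source's polling thread (hypothetical stand-ins:
// `mq` plays the MQ receiver, `channel` plays the channel processor).
class PollingWorker {
    final BlockingQueue<String> mq = new LinkedBlockingQueue<>();
    final List<String> channel = new CopyOnWriteArrayList<>();
    private volatile Thread t;

    synchronized void start() {
        t = new Thread(() -> {
            // Keep polling until stop() sets the interrupt flag.
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    // Wait up to 100 ms for a message, like a receive timeout.
                    String line = mq.poll(100, TimeUnit.MILLISECONDS);
                    if (line != null) {
                        channel.add(line);
                    }
                } catch (InterruptedException e) {
                    // poll() clears the flag when it throws; set it again so the loop exits.
                    Thread.currentThread().interrupt();
                }
            }
        }, "mq-poller");
        t.start();
    }

    synchronized void stop() {
        Thread worker = t;
        if (worker != null && worker.isAlive()) {
            worker.interrupt(); // interrupt first; join() alone would wait on an endless loop
            try {
                worker.join(5000); // bounded wait so a stuck worker cannot hang shutdown
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    boolean isRunning() {
        Thread worker = t;
        return worker != null && worker.isAlive();
    }
}
```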

Package the code as a jar and put it in the plugins directory under the Flume home directory. Mine is /usr/lib/flume-ng/plugins.d

If the jar is named MQSource.jar, create a folder MQSource in plugins.d and put MQSource.jar into MQSource/lib.

Dependency jars go into MQSource/libext. The directory layout is:

/usr/lib/flume-ng/plugins.d/MQSource/lib/MQSource.jar

/usr/lib/flume-ng/plugins.d/MQSource/libext/   dependency jars

/usr/lib/flume-ng/plugins.d/MQSource/native   native .so or .dll files
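If you would rather script the installation than copy files by hand, here is a sketch using java.nio.file that builds the same lib/libext/native layout. PluginLayout and the jar names in it are hypothetical; a few mkdir and cp commands do the same job.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;

// Creates <pluginsDir>/<name>/{lib,libext,native} and copies the jars into place.
class PluginLayout {
    static Path install(Path pluginsDir, String name, Path pluginJar, List<Path> dependencyJars)
            throws IOException {
        Path root = pluginsDir.resolve(name);
        Path lib = Files.createDirectories(root.resolve("lib"));
        Path libext = Files.createDirectories(root.resolve("libext"));
        // native/ only matters for .so/.dll files; it is created empty here.
        Files.createDirectories(root.resolve("native"));

        // The plugin's own jar goes into lib/, following the MQSource/lib/MQSource.jar example.
        Files.copy(pluginJar, lib.resolve(name + ".jar"), StandardCopyOption.REPLACE_EXISTING);
        // Third-party dependencies go into libext/.
        for (Path dep : dependencyJars) {
            Files.copy(dep, libext.resolve(dep.getFileName()), StandardCopyOption.REPLACE_EXISTING);
        }
        return root;
    }
}
```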

Date: 2024-10-11 22:34:41
