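Hello everyone.
Our company has a requirement: Flume must pull messages from an MQ and store them in a DFS, so I wrote a custom Flume source. I am new to Flume, so please forgive any mistakes.
I looked through the Flume NG source code. Depending on the scenario, a custom source generally extends AbstractSource and implements EventDrivenSource and Configurable.
The MQSource code is as follows: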
import java.util.List;

import org.apache.activemq.ActiveMQConnection;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.ConfigurationException;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// MQContext, MQConfig, MQFactory, MQReceiver and HandleLineCallBack are this project's own classes.
public class MQSource extends AbstractSource implements EventDrivenSource, Configurable {

    private static final Logger logger = LoggerFactory.getLogger(MQSource.class);

    private long heartbeat;
    private MQReceiver receiver;
    private HandleLineCallBack handle;
    private Thread t;

    @Override
    public void configure(Context context) {
        String mq_url = context.getString(MQContext.MQ_BROKER_URI, ActiveMQConnection.DEFAULT_BROKER_URL);
        String mq_userName = context.getString(MQContext.MQ_USERNAME, ActiveMQConnection.DEFAULT_USER);
        String mq_password = context.getString(MQContext.MQ_PASSWORD, ActiveMQConnection.DEFAULT_PASSWORD);
        String mq_queueKey = context.getString(MQContext.MQ_QUEUEKEY, "NULL");
        String handleClass = context.getString(MQContext.HANDLECLASS, "NULL");
        long mq_reciveTimeout = context.getLong(MQContext.MQ_RECIVETIMEOUT, 3000L);
        this.heartbeat = context.getLong(MQContext.HEARTBEAT, 3000L);

        // Fail fast: without a queue key the receiver cannot be created and start() would hit a null receiver.
        if ("NULL".equals(mq_queueKey)) {
            logger.error("{} : Unable to load MQ_queueKey", getName());
            throw new ConfigurationException(getName() + ": MQ_queueKey is not configured");
        }
        if ("NULL".equals(handleClass)) {
            logger.warn("{} : handleClass is not configured, using DefaultHandle", getName());
            handleClass = "com.bidlink.handle.DefaultHandle";
        }

        MQConfig mqconfig = new MQConfig(mq_url, mq_userName, mq_password, mq_queueKey, mq_reciveTimeout);
        logger.info("{} MQ Configuration : {}", getName(), mqconfig);
        receiver = MQFactory.MQ.getReceiver(mqconfig);
        logger.info("{} got receiver for key {} : {}", getName(), mqconfig.getQueueKey(), receiver);

        try {
            // Load the line handler by class name and check that it implements HandleLineCallBack.
            handle = Class.forName(handleClass)
                    .asSubclass(HandleLineCallBack.class)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException e) {
            logger.error("{} unable to load or instantiate class {}", getName(), handleClass, e);
            throw new ConfigurationException("Cannot create handler " + handleClass, e);
        }
    }

    @Override
    public synchronized void start() {
        logger.info("MQSource starting...");
        t = new Thread(new Runnable() {
            @Override
            public void run() {
                // Poll the queue until stop() interrupts this thread.
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        List<String> lines = receiver.getText();
                        for (String line : lines) {
                            Event e = new SimpleEvent();
                            e.setBody(handle.refactor(line).getBytes("GBK"));
                            getChannelProcessor().processEvent(e);
                        }
                        Thread.sleep(heartbeat);
                    } catch (InterruptedException ie) {
                        // Restore the interrupt flag so the loop condition ends the thread.
                        Thread.currentThread().interrupt();
                    } catch (Exception ex) {
                        logger.error("{} failed to process MQ messages", getName(), ex);
                    }
                }
            }
        }, "MQSource-" + getName());
        t.start();
        // Mark the source as started once; calling super.start() inside the anonymous
        // Thread subclass would have invoked Thread.start() again instead.
        super.start();
    }

    @Override
    public synchronized void stop() {
        logger.info("MQSource stopping...");
        if (t != null && t.isAlive()) {
            // Interrupt first, then wait; joining first would block forever on the polling loop.
            t.interrupt();
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        super.stop();
    }
}
The core code in the start method:
Event e = new SimpleEvent();
e.setBody("hello everyone ".getBytes("GBK"));
getChannelProcessor().processEvent(e);
super.start();
In the configure method, the context gives access to all custom configuration values. For example, suppose flume.conf contains the following:
tier1.sources.testSources.type = org.yunjume.source.MQSource
tier1.sources.testSources.MQ_userName = admin
tier1.sources.testSources.MQ_password = admin123
tier1.sources.testSources.MQ_brokerURL = tcp://localhost:61616
tier1.sources.testSources.MQ_queueKey = FirstQueue
tier1.sources.testSources.MQ_reciveTimeout = 30000
tier1.sources.testSources.heartbeat = 30000
# Handler that processes each MQ message line and returns a new line.
tier1.sources.testSources.handleClass = org.yunjume.handle.DefaultHandle
tier1.sources.testSources.channels = testChannels
The code to read the MQ_userName value is:
String mq_userName = context.getString("MQ_userName", ActiveMQConnection.DEFAULT_USER);
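To illustrate how a key such as MQ_userName reaches the source, here is a minimal sketch using only the JDK. It is a stand-in, not Flume's implementation: the class SourceConfigSketch and its methods subProperties and getString are hypothetical names, and the sketch assumes the source sees its properties with the "tier1.sources.testSources." prefix removed, with a default applied when a key is absent.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical stand-in for the configuration lookup; it does not use Flume classes.
public class SourceConfigSketch {

    // A shortened version of the flume.conf fragment from the text.
    static final String CONF = String.join("\n",
            "tier1.sources.testSources.type = org.yunjume.source.MQSource",
            "tier1.sources.testSources.MQ_userName = admin",
            "tier1.sources.testSources.MQ_brokerURL = tcp://localhost:61616",
            "tier1.sources.testSources.heartbeat = 30000");

    // Parses the text as Java properties and keeps the keys under the prefix, with the prefix stripped.
    static Map<String, String> subProperties(String conf, String prefix) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(conf));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> result = new HashMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                result.put(key.substring(prefix.length()), props.getProperty(key));
            }
        }
        return result;
    }

    // Returns the configured value, or the default when the key is missing.
    static String getString(Map<String, String> context, String key, String defaultValue) {
        return context.getOrDefault(key, defaultValue);
    }

    public static void main(String[] args) {
        Map<String, String> context = subProperties(CONF, "tier1.sources.testSources.");
        System.out.println(getString(context, "MQ_userName", "guest"));
        System.out.println(getString(context, "MQ_password", "no-password-set"));
    }
}
```

The second lookup falls back to its default because MQ_password is left out of the shortened fragment, which mirrors how the defaults in configure apply when a key is not set.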
The stop method stops the receiving thread and then calls super.stop(), which ends the source.
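The order of operations in stop matters: a polling thread that loops forever never finishes on its own, so waiting for it before interrupting it would hang. The following is a minimal sketch of the lifecycle using only the JDK. PollingWorkerSketch and its members are hypothetical names, and the counter is a stand-in for receiving messages and handing them to the channel.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a start/stop lifecycle around a polling thread.
public class PollingWorkerSketch {

    private final long heartbeatMillis;
    private final AtomicInteger polls = new AtomicInteger();
    private Thread worker;

    PollingWorkerSketch(long heartbeatMillis) {
        this.heartbeatMillis = heartbeatMillis;
    }

    synchronized void start() {
        worker = new Thread(() -> {
            // Keep polling until stop() interrupts this thread.
            while (!Thread.currentThread().isInterrupted()) {
                polls.incrementAndGet(); // stand-in for "receive messages and forward them"
                try {
                    Thread.sleep(heartbeatMillis);
                } catch (InterruptedException e) {
                    // Restore the flag so the loop condition sees the interrupt and exits.
                    Thread.currentThread().interrupt();
                }
            }
        }, "polling-worker");
        worker.start();
    }

    synchronized void stop() {
        if (worker == null) {
            return;
        }
        // Interrupt first, then wait for the thread to finish.
        worker.interrupt();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    boolean isRunning() {
        return worker != null && worker.isAlive();
    }

    int pollCount() {
        return polls.get();
    }

    public static void main(String[] args) {
        PollingWorkerSketch w = new PollingWorkerSketch(50);
        w.start();
        w.stop();
        System.out.println("running after stop: " + w.isRunning());
    }
}
```

If the receive call itself blocks without reacting to interrupts, stop may still have to wait for the receive timeout to elapse, so a short timeout keeps shutdown responsive.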
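Package the code as a jar and put it in the plugin directory under the Flume home directory. Mine is /usr/lib/flume-ng/plugins.d
If the jar is named MQSource.jar, create a folder named MQSource under plugins.d and put MQSource.jar in MQSource/lib.
Put the jars it depends on in MQSource/libext. The directory structure is as follows: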
/usr/lib/flume-ng/plugins.d/MQSource/lib/MQSource.jar
/usr/lib/flume-ng/plugins.d/MQSource/libext/ dependency jars
/usr/lib/flume-ng/plugins.d/MQSource/native native .so or .dll files
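The layout above can be derived from the plugins.d root and the plugin name. Here is a small sketch that computes the three directories and can create them. PluginLayoutSketch and its methods are hypothetical helper names, not part of Flume, and copying the jars into place is left out.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper that builds the plugins.d layout described in the text.
public class PluginLayoutSketch {

    // Directory for the plugin's own jar, e.g. MQSource.jar.
    static Path libDir(String pluginsRoot, String pluginName) {
        return Paths.get(pluginsRoot, pluginName, "lib");
    }

    // Directory for the jars the plugin depends on.
    static Path libextDir(String pluginsRoot, String pluginName) {
        return Paths.get(pluginsRoot, pluginName, "libext");
    }

    // Directory for native .so or .dll files.
    static Path nativeDir(String pluginsRoot, String pluginName) {
        return Paths.get(pluginsRoot, pluginName, "native");
    }

    // Creates the three directories if they do not exist yet.
    static void createLayout(String pluginsRoot, String pluginName) {
        try {
            Files.createDirectories(libDir(pluginsRoot, pluginName));
            Files.createDirectories(libextDir(pluginsRoot, pluginName));
            Files.createDirectories(nativeDir(pluginsRoot, pluginName));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String root = "/usr/lib/flume-ng/plugins.d";
        System.out.println(libDir(root, "MQSource").resolve("MQSource.jar"));
        System.out.println(libextDir(root, "MQSource"));
        System.out.println(nativeDir(root, "MQSource"));
    }
}
```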
Date: 2024-10-11 22:34:41