flume与Mosquitto的集成

文章来自:http://www.cnblogs.com/hark0623/p/4173714.html  转发请注明

因业务需求,需要flume收集MQTT(Mosquitto)的数据。  方法就是flume自定义source,source中来订阅(subscribe)MQTT

flume source的java代码如下:

package com.yhx.sensor.flume.source;

import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;

public class MQTTSource extends AbstractSource implements EventDrivenSource,
        Configurable {
    /**
     * The initialization method for the Source. The context contains all the
     * Flume configuration info, and can be used to retrieve any configuration
     * values necessary to set up the Source.
     */
    @Override
    public void configure(Context arg0) {
        // TODO Auto-generated method stub

    }

    SimpleMqttClient client = null;

    /**
     * Start any dependent systems and begin processing events.
     */
    @Override
    public void start() {
        // TODO Auto-generated method stub
        // super.start();
        client = new SimpleMqttClient();
        client.runClient();
    }

    /**
     * Stop processing events and shut any dependent systems down.
     */
    @Override
    public void stop() {
        // TODO Auto-generated method stub
        // super.stop();
        if (client != null) {
            client.closeConn();
        }
    }

    // public static void main(String[] args) {
    // SimpleMqttClient smc = new SimpleMqttClient();
    // smc.runClient();
    // }

    public class SimpleMqttClient implements MqttCallback {

        MqttClient myClient;
        MqttConnectOptions connOpt;

        String BROKER_URL = "tcp://192.168.116.128:1883";
        String M2MIO_DOMAIN = "192.168.116.128";
        String M2MIO_STUFF = "yhx";
        String M2MIO_THING = "yhx_flume";
        // String M2MIO_USERNAME = "<m2m.io username>";
        // String M2MIO_PASSWORD_MD5 =
        // "<m2m.io password (MD5 sum of password)>";

        Boolean subscriber = true;
        Boolean publisher = false;

        /**
         *
         * connectionLost This callback is invoked upon losing the MQTT
         * connection.
         *
         */
        @Override
        public void connectionLost(Throwable t) {
            System.out.println("Connection lost!");
            // code to reconnect to the broker would go here if desired
        }

        public void closeConn() {
            if (myClient != null) {
                if (myClient.isConnected()) {
                    try {
                        myClient.disconnect();
                    } catch (MqttException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            }
        }

        /**
         *
         * deliveryComplete This callback is invoked when a message published by
         * this client is successfully received by the broker.
         *
         */
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            // System.out.println("Pub complete" + new
            // String(token.getMessage().getPayload()));
        }

        /**
         *
         * messageArrived This callback is invoked when a message is received on
         * a subscribed topic.
         *
         */
        @Override
        public void messageArrived(String topic, MqttMessage message)
                throws Exception {
            // System.out
            // .println("-------------------------------------------------");
            // // System.out.println("| Topic:" + topic.getName());
            // System.out.println("| Topic:" + topic);
            // System.out
            // .println("| Message: " + new String(message.getPayload()));
            // System.out
            // .println("-------------------------------------------------");

            Map<String, String> headers = new HashMap<String, String>();
            //headers.put("curDate", df.format(new Date()));

            Event flumeEvent = EventBuilder.withBody(message.getPayload(),
                    headers);
            try {
                getChannelProcessor().processEvent(flumeEvent);
            } catch (Exception e) {
                // TODO: handle exception
                e.printStackTrace();
            }

        }

        /**
         *
         * runClient The main functionality of this simple example. Create a
         * MQTT client, connect to broker, pub/sub, disconnect.
         *
         */
        public void runClient() {
            // setup MQTT Client
            String clientID = M2MIO_THING;
            connOpt = new MqttConnectOptions();

            connOpt.setCleanSession(true);
            connOpt.setKeepAliveInterval(3000);
            // connOpt.setUserName(M2MIO_USERNAME);
            // connOpt.setPassword(M2MIO_PASSWORD_MD5.toCharArray());

            // Connect to Broker
            try {
                myClient = new MqttClient(BROKER_URL, clientID);
                myClient.setCallback(this);
                myClient.connect(connOpt);
            } catch (MqttException e) {
                e.printStackTrace();
                System.exit(-1);
            }

            System.out.println("Connected to " + BROKER_URL);

            // setup topic
            // topics on m2m.io are in the form <domain>/<stuff>/<thing>
            String myTopic = M2MIO_DOMAIN + "/" + M2MIO_STUFF + "/"
                    + M2MIO_THING;
            System.out.println("myTopic:" + myTopic);
            MqttTopic topic = myClient.getTopic(myTopic);

            // subscribe to topic if subscriber
            if (subscriber) {
                try {
                    int subQoS = 0;
                    myClient.subscribe(myTopic, subQoS);

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            // publish messages if publisher
            if (publisher) {
                for (int i = 1; i <= 10; i++) {
                    String pubMsg = "{\"pubmsg\":" + i + "}";
                    int pubQoS = 0;
                    MqttMessage message = new MqttMessage(pubMsg.getBytes());
                    message.setQos(pubQoS);
                    message.setRetained(false);

                    // Publish the message
                    System.out.println("Publishing to topic \"" + topic
                            + "\" qos " + pubQoS);
                    MqttDeliveryToken token = null;
                    try {
                        // publish message to broker
                        token = topic.publish(message);
                        // Wait until the message has been delivered to the
                        // broker
                        token.waitForCompletion();
                        Thread.sleep(100);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }

            // disconnect
            try {
                // wait to ensure subscribed messages are delivered
                if (subscriber) {
                    while (true) {
                        Thread.sleep(5000);
                    }
                }
                // myClient.disconnect();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
            }
        }

    }

}

打JAR包注意要把Class-Path写上,如下:

Manifest-Version: 1.0
Class-Path: flume-ng-configuration-1.5.2.jar flume-ng-core-1.5.2.jar flume-ng-node-1.5.2.jar flume-ng-sdk-1.5.2.jar org.eclipse.paho.client.mqttv3-1.0.0.jar

将打好的JAR包放到flume的lib目录(注意,class-path说明的jar包在lib一定要有。 如果没有,则放上去)

接着修改一下flume的配置文件,如下(主要是sourceMqtt ,看这个。  因为我这块同时还监听了UDP):

a1.sources = sourceMqtt sourceUdp
a1.sinks = sinkMqtt sinkUdp
a1.channels = channelMqtt channelUdp

# Describe/configure the source
a1.sources.sourceMqtt.type = com.yhx.sensor.flume.source.MQTTSource

# Describe the sink
a1.sinks.sinkMqtt.type = logger

# Use a channel which buffers events in memory
a1.channels.channelMqtt.type = memory
a1.channels.channelMqtt.capacity = 1000
a1.channels.channelMqtt.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.sourceMqtt.channels = channelMqtt
a1.sinks.sinkMqtt.channel = channelMqtt

# a2.sources = sourceUdp
# a2.sinks = sinkUdp
# a2.channels = channelUdp

# Describe/configure the source
a1.sources.sourceUdp.type = syslogudp
a1.sources.sourceUdp.host = 0.0.0.0
a1.sources.sourceUdp.port = 12459
a1.sources.sourceUdp.interceptors=interceptorUdp

a1.sources.sourceUdp.interceptors.interceptorUdp.type=com.yhx.sensor.flume.intercepter.UDPIntercepter$Builder

# Describe the sink
a1.sinks.sinkUdp.type = logger

# Use a channel which buffers events in memory
a1.channels.channelUdp.type = memory
a1.channels.channelUdp.capacity = 1000
a1.channels.channelUdp.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.sourceUdp.channels = channelUdp
a1.sinks.sinkUdp.channel = channelUdp

配置文件保存至flume目录下的conf,叫flume.conf

然后flume启动命令如下

bin/flume-ng agent --conf conf --conf-file conf/flume.conf --name a1
时间: 2024-11-17 16:49:14

flume与Mosquitto的集成的相关文章

Spark学习八:spark streaming与flume和kafka集成

Spark学习八:spark streaming与flume和kafka集成 标签(空格分隔): Spark Spark学习八spark streaming与flume和kafka集成 一Kafka 二flume和kafka的集成 三kafka和spark streaming的集成方式一kafka推送 四kafka和spark streaming的集成方式一spark streaam主动获取 五spark stream的高级应用updateStateByKey实现累加功能 六spark stre

SparkStreaming基础

* SparkStreaming基础 打开之前构建好的Maven工程,如何构建?请参看SparkCore基础(二)的最后部分. 在SparkCore中,我们操作的数据都在RDD中,是Spark的一个抽象概念,也是一个抽象类,是由SparkContext对象sc转换得到的. 那么在SparkStreaming中,我们使用的Spark的StreamingContext对象,简称ssc. 我们本节内容以动手为基础,直接开始一些测试案例:具体的框架结构请参看官方文档,写的非常之详细. SparkStre

大数据学习方向,从入门到精通

推荐一个大数据学习群 119599574晚上20:10都有一节[免费的]大数据直播课程,专注大数据分析方法,大数据编程,大数据仓库,大数据案例,人工智能,数据挖掘都是纯干货分享,你愿意来学习吗 很多初学者在萌生向大数据方向发展的想法之后,不免产生一些疑问,应该怎样入门?应该学习哪些技术?学习路线又是什么? 所有萌生入行的想法与想要学习Java的同学的初衷是一样的.岗位非常火,就业薪资比较高,,前景非常可观.基本都是这个原因而向往大数据,但是对大数据却不甚了解. 如果你想学习,那么首先你需要学会编

大数据学习之小白如何学大数据?(详细篇)

大数据这个话题热度一直高居不下,不仅是国家政策的扶持,也是科技顺应时代的发展.想要学习大数据,我们该怎么做呢?大数据学习路线是什么?先带大家了解一下大数据的特征以及发展方向. 大数据的三个发展方向,平台搭建/优化/运维/监控.大数据开发/设计/架构.数据分析/挖掘. 先说一下大数据的4V特征: 数据量大,TB->PB 数据类型繁多,结构化.非结构化文本.日志.视频.图片.地理位置等; 商业价值高,但是这种价值需要在海量数据之上,通过数据分析与机器学习更快速的挖掘出来; 处理时效性高,海量数据的处

大数据开发初学者该怎么做?

经常有初学者在问,自己想往大数据方向发展,该学哪些技术,学习路线是什么样的,觉得大数据很火,就业很好,薪资很高.如果自己很迷茫,为了这些原因想往大数据方向发展,也可以,那么我就想问一下,你的专业是什么,对于计算机/软件,你的兴趣是什么?是计算机专业,对操作系统.硬件.网络.服务器感兴趣?是软件专业,对软件开发.编程.写代码感兴趣?还是数学.统计学专业,对数据和数字特别感兴趣.. 其实这就是想告诉你的大数据的三个发展方向,平台搭建/优化/运维/监控.大数据开发/设计/架构.数据分析/挖掘.请不要问

做了五年大数据开发工程师总结的的大数据学习路线

先扯一下大数据的4V特征: 数据量大,TB->PB 数据类型繁多,结构化.非结构化文本.日志.视频.图片.地理位置等: 商业价值高,但是这种价值需要在海量数据之上,通过数据分析与机器学习更快速的挖掘出来: 处理时效性高,海量数据的处理需求不再局限在离线计算当中. 现如今,正式为了应对大数据的这几个特点,开源的大数据框架越来越多,越来越强,先列举一些常见的: 文件存储:Hadoop HDFS.Tachyon.KFS 离线计算:Hadoop MapReduce.Spark 流式.实时计算:Storm

大数据入门,到底要怎么学习大数据?

很多人都知道大数据很火,就业很好,薪资很高,想往大数据方向发展.但该学哪些技术,学习路线是什么样的呢? 其实就是想告诉你的大数据的三个发展方向,平台搭建/优化/运维/监控.大数据开发/设计/架构.数据分析/挖掘.请不要问我哪个好学,哪个钱多. 先说一下大数据的4V特征: 数据量大,TB->PB 数据类型繁多,结构化.非结构化文本.日志.视频.图片.地理位置等: 商业价值高,但是这种价值需要在海量数据之上,通过数据分析与机器学习更快速的挖掘出来: 处理时效性高,海量数据的处理需求不再局限在离线计算

大数据开发学习路线整理

参考博客:做了五年大数据开发工程师总结的的大数据学习路线 大数据的4V特征: 1.        数据量大,TB->PB 2.        数据类型繁多,结构化.非结构化文本.日志.视频.图片.地理位置等: 3.        商业价值高,但是这种价值需要在海量数据之上,通过数据分析与机器学习更快速的挖掘出来: 4.        处理时效性高,海量数据的处理需求不再局限在离线计算当中. 常见的大数据的开源框架: l  文件存储:Hadoop HDFS.Tachyon.KFS l  离线计算:

大数据开发学习步骤

经常有初学者 问我,自己想往大数据方向发展,该学哪些技术,学习路线是什么样的,觉得大数据很火,就业很好,薪资很高.如果自己很迷茫,为了这些原因想往大数据方向发展,也可以,那么我就想问一下,你的专业是什么,对于计算机/软件,你的兴趣是什么?是计算机专业,对操作系统.硬件.网络.服务器感兴趣?是软件专业,对软件开发.编程.写代码感兴趣?还是数学.统计学专业,对数据和数字特别感兴趣. 其实这就是想告诉你的大数据的三个发展方向,平台搭建/优化/运维/监控.大数据开发/设计/架构.数据分析/挖掘.请不要问