跟踪MQTT源码(一)

Spring整合MQTT

pom.xml

<!-- MQTT消息队列 -->
        <dependency>
            <groupId>org.eclipse.paho</groupId>
             <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.0.2</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.1</version>
        </dependency>
        <!-- 消息推送
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>4.1.0</version>
        </dependency>
         -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
            <version>4.3.5.RELEASE</version>
            <exclusions>
                <exclusion>
                    <groupId>org.eclipse.paho</groupId>
                    <artifactId>mqtt-client</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-messaging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!--
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
            <version>5.0.0.M5</version>
        </dependency>
         -->

        <dependency>
            <groupId>org.fusesource.mqtt-client</groupId>
            <artifactId>mqtt-client</artifactId>
            <version>1.14</version>
        </dependency>

spring-mqtt.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:mqtt="http://www.springframework.org/schema/integration/mqtt"
    xmlns:int-mqtt="http://www.springframework.org/schema/integration/mqtt"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/integration/mqtt http://www.springframework.org/schema/integration/mqtt/spring-integration-mqtt-4.3.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.3.xsd">

    <!-- 引入配置文件:classpath等同于src目录,两种配置方式 -->
    <context:property-placeholder location="classpath:mqtt.properties"  ignore-unresolvable="true" />

    <!-- mqtt客户端订阅消息 -->
    <bean id="clientFactory" class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
        <property name="userName" value="${broker.userName}"/>
        <property name="password" value="${broker.password}"/>
    </bean>

    <!--
    消息适配器 org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter
    org.springframework.messaging.MessageChannel
    org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler
    DefaultPahoMessageConverter
    -->
    <!-- 消息转换器
    <bean id="myConverter" class="org.springframework.integration.mqtt.support.DefaultPahoMessageConverter"></bean>
     -->

    <int-mqtt:message-driven-channel-adapter
        id="mqttInbound"
        client-id="client"
        url="${broker.host}"
        topics="activate"
        qos="1"
        client-factory="clientFactory"
        auto-startup="true"
        send-timeout="12"
        recovery-interval="10000"
        channel="startCase" />

    <!--
    <int:channel id="startCase" />
     -->

    <!-- 对接收消息进行过滤 @tstamp + ‘ ‘ + headers.get(‘mqtt_topic‘) + ‘: ‘ + payload.toString() + @newline
    <int:transformer id="convert"
        input-channel="startCase"
        expression="payload.toString() + headers.get(‘mqtt_topic‘)"
        output-channel="toMqttService" />
     -->

    <!-- 方案一 -->
    <int:service-activator id="startCaseService"
        input-channel="startCase" ref="mqttCaseService" method="startCase" />

    <!-- 方案二   id="toMqttService" channel="toMqttService"
    <int:outbound-channel-adapter id="toMqttService"
        ref="mqttCaseService"
        method="startCase" />
    -->

    <bean id="mqttCaseService" class="com.vguang.service.impl.MqttService2"></bean>

</beans>

实现消息处理类

public class MqttService2{
    private static final Logger log = LoggerFactory.getLogger(MqttService2.class);

    private MqttPahoMessageHandler mqtt;
    private volatile Integer serialno = 0;

    public void startCase(Message<String> recmsg){
                //mqtt5.0
//        String topic = (String) recmsg.getHeaders().get("mqtt_receivedTopic");
        String topic = (String) recmsg.getHeaders().get("mqtt_topic");
        String payload = recmsg.getPayload();

        log.info("消息解析headers结果:{},{}", topic, payload);
    }
}

startCase()方法中的参数目前我知道的有三种:

时间: 2024-10-05 05:02:20

跟踪MQTT源码(一)的相关文章

跟踪MQTT源码(二)

对于spring-mqtt.xml中的标签: <int-mqtt:message-driven-channel-adapter> <int-mqtt:outbound-channel-adapter> <int:channel> <int:transformer> <int:service-activator> <int:outbound-channel-adapter> 了解Spring自定义标签的应该都知道标签的构建过程: 1.首

使用Eclipse跟踪JDK源码

首先我们要学会的是将JDK源码加载Eclipse中. 1.点"窗口"-->"首选项",选择左边的"Java"-->"已安装的JRE",然后选择我们安装的JRE,并单击它,然后选择右边的"编辑". 点"编辑"将出现如下的界面: 2.跟踪阅读源码 如上图,在我自己写的代码中包含了StringTokenizer类,我们要看它的具体定义,就只要按住"Ctrl"键,

ESA2GJK1DH1K基础篇: STM32+Wi-Fi(AT指令版)实现MQTT源码讲解

前言 该程序需要的基础知识:  https://www.cnblogs.com/yangfengwu/category/1566194.html   所有源码开源,请自行学习 打开第一节的源码 为了方便修改,我用数组存储了些参数 然后看链接MQTT部分 说个地方 然后看 判断连接状态 接着看订阅 判断订阅是否成功 发送一条上线消息 连接MQTT部分就结束了,然后就到了主循环了 一,配网 二,处理接收的数据 三,每隔一段时间采集发送温湿度数据 看下我的心跳包处理 说下我的处理思路 首先,如果到了发

Qemu+gdb跟踪内核源码

1.编译安装Qemu Qemu源码下载地址:http://wiki.qemu.org/Download linux下可以直接用wget下载: wget http://wiki.qemu.org/download/qemu-2.2.0.tar.bz 解压缩bz2文件: tar -jxvf qemu-2.2.0.tar.bz 由于我只有模拟x86的需求,所以在编译之前先配置Qemu: cd qemu-2.2.2 ./configure --atrget-list=i386-softmmu make

Thread.interrupt()源码跟踪

1 JDK源码跟踪 // java.lang.Thread public void interrupt() { if (this != Thread.currentThread()) checkAccess(); synchronized (blockerLock) { Interruptible b = blocker; if (b != null) { interrupt0(); // Just to set the interrupt flag b.interrupt(this); ret

Android应用程序启动过程——Launcher源码分析

当我们在Launcher界面单击一个应用程序图标时就会启动一个程序,那这一个过程究竟发生了些哪样呢?让我们跟踪Launcher源码来分析一下吧. 先上流程图: step1.追踪Launcher  从源码中我们可以发现Launcher其实也是一个程序,它继承于Activity.找到该文件中的onCreate()方法,代码片段如下: protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceSta

源码跟读,Spring是如何解析和加载xml中配置的beans

Spring版本基于: 跟踪代码源码基于: https://github.com/deng-cc/KeepLearning commit id:c009ce47bd19e1faf9e07f12086cd440b7799a63 1.配置启动Spring所需的监听器 web.xml中配置监听器 <listener> <listener-class>org.springframework.web.context.ContextLoaderListener</listener-cla

java之XMemcached使用及源码详解

转载请注明出处:http://blog.csdn.net/tang9140/article/details/43445511 前言 本文主要讲述如何使用XMemcached客户端与Memcached服务端进行交互.通过XMemcached的API调用与Memcached的set/get命令对比及跟踪XMemcached源码,使大家对XMemcached的API有更深层次的理解,能够从底层上去了解其工作原理,从而能在项目中进行一些针对性的接口封闭及优化工作. 是叫Memcache还是Memcach

Android Toast源码分析

前言 这周去杭州参加了百阿培训,见到了传说中的牛人多隆大神.从多隆大神身上看到了做技术人的纯粹,单纯.除了见到多隆大神,这次培训并没有太多的收获,反而培训过程中遇到了好多产品上的Bug,远程办公快累到死.总结一下跟Toast相关的问题,首先从深入学习Toast的源码实现开始. Toast源码实现 Toast入口 我们在应用中使用Toast提示的时候,一般都是一行简单的代码调用,如下所示: Toast.makeText(context, msg, Toast.LENGTH_SHORT).show(