Android Mqtt 消息推送使用

初始化SDK:

/**
     * 初始化SDK
     *
     * @param context context
     */
    public void initSDK(Context context) {
        String clientId = String.valueOf(System.currentTimeMillis()+userId);
        mqttAndroidClient = new MqttAndroidClient(mContext, serverUri, clientId);
        subscriptionTopics = new ArrayList<>();
        mqttAndroidClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {

                if (reconnect) {

                    Log.d(TAG, "Reconnected to : " + serverURI);
                    // Because Clean Session is true, we need to re-subscribe
//                    subscribeToTopic();
                    //publishMessage();
                } else {
                    Log.d(TAG, "Connected to: " + serverURI);

                }
                connectSuccess = true;

                subscribeToTopic();
            }

            @Override
            public void connectionLost(Throwable cause) {
                connectSuccess = false;
                Log.e(TAG, "The Connection was lost." + cause.getLocalizedMessage());

            }

            // THIS DOES NOT WORK!
            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                Log.d(TAG, "Incoming message: " +topic+ new String(message.getPayload()));

            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {

            }
        });

    }

连接远程服务:

/**
     * 连接远程服务
     */
    public void connectServer() {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setAutomaticReconnect(true);
        mqttConnectOptions.setCleanSession(false);

        try {
            //addToHistory("Connecting to " + serverUri);

            mqttAndroidClient.connect(mqttConnectOptions, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    connectSuccess = true;
                    DisconnectedBufferOptions disconnectedBufferOptions = new DisconnectedBufferOptions();
                    disconnectedBufferOptions.setBufferEnabled(true);
                    disconnectedBufferOptions.setBufferSize(100);
                    disconnectedBufferOptions.setPersistBuffer(false);
                    disconnectedBufferOptions.setDeleteOldestMessages(false);
                    mqttAndroidClient.setBufferOpts(disconnectedBufferOptions);
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    Log.e(TAG, "Failed to connect to: " + serverUri);
                    exception.printStackTrace();
                    Log.d(TAG, "onFailure: " + exception.getCause());
                    connectSuccess = false;

                }
            });

        } catch (MqttException ex) {
            ex.printStackTrace();
        }
    }

获取订阅信息:

  /**
     *获取订阅信息   */

public void connectGateway(String gatewayId, String userId) { 
//获取订阅信息
        if (!subscriptionTopics.contains(gatewayId)) {
            subscriptionTopics.add(gatewayId);
        }
        Log.d(TAG, "pre sub topic: connect status=" + connectSuccess);
        Log.d(TAG, "subtopic " + subscriptionTopics);
        subscribeToTopic();
    }

  

订阅mqtt消息:

/**
     * 订阅mqtt消息
     */
    private void subscribeToTopic() {
        try {
            if(subscriptionTopics.size()==0)
                return;
            String[] topics = new String[subscriptionTopics.size()];
            subscriptionTopics.toArray(topics);
            int[] qoc = new int[topics.length];
            IMqttMessageListener[] mqttMessageListeners = new IMqttMessageListener[topics.length];
            for (int i = 0; i < topics.length; i++) {
                IMqttMessageListener mqttMessageListener = new IMqttMessageListener() {
                    @Override
                    public void messageArrived(String topic, MqttMessage message) throws Exception {
                        // message Arrived!消息送达后做出的处理
                        Log.d(TAG, topic + " : " + new String(message.getPayload()));
                        handleReceivedMessage(new String(message.getPayload()), topic);
                    }
                };
                mqttMessageListeners[i] = mqttMessageListener;
                Log.d(TAG, "subscribeToTopic: qoc= " + qoc[i]);
            }
            mqttAndroidClient.subscribe(topics, qoc, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken iMqttToken) {
                    Log.d(TAG, "Subscribed!");
                }

                @Override
                public void onFailure(IMqttToken iMqttToken, Throwable throwable) {
                    Log.d(TAG, "Failed to subscribe");
                }
            }, mqttMessageListeners);

        } catch (MqttException ex) {
            System.err.println("Exception whilst subscribing");
            ex.printStackTrace();
        }

    }

处理收到的消息:

private void handleReceivedMessage(String message, String gatewayId) {
//可以发送一条广播通知程序
}

  

发送mqtt消息:

/**
     * 发送 mqtt 消息
     *
     * @param publishMessage 要发送的信息的 字符串
     */
    private void publishMessage(String publishMessage, String publishTopic) {
            try {
                publishTopic = userId + "/" + publishTopic;
                MqttMessage message = new MqttMessage();
                message.setPayload(publishMessage.getBytes());
                mqttAndroidClient.publish(publishTopic, message);
                Log.d(TAG, "publishMessage:Message Published \n" + publishTopic + ":" + message);
                if (!mqttAndroidClient.isConnected()) {
                    Log.d(TAG, mqttAndroidClient.getBufferedMessageCount() + " messages in buffer.");
                }
            } catch (MqttException e) {
                System.err.println("Error Publishing: " + e.getMessage());
                e.printStackTrace();
            }
    }

  

没有封装的类:

public class SubscribeClient {
    private final static String CONNECTION_STRING = "tcp://mqtt地址:mqtt端口";
    private final static boolean CLEAN_START = true;
    private final static short KEEP_ALIVE = 30;//低耗网络,但是又需要及时获取数据,心跳30s
    private final static String CLIENT_ID = "client1";
    private final static String[] TOPICS = {
            //订阅信息
    };
    private final static int[] QOS_VALUES = {0, 0, 2, 0};

    private MqttClient mqttClient = null;

    public SubscribeClient(String i) {
        try {
            mqttClient = new MqttClient(CONNECTION_STRING);
            SimpleCallbackHandler simpleCallbackHandler = new SimpleCallbackHandler();
            mqttClient.registerSimpleHandler(simpleCallbackHandler);//注册接收消息方法
            mqttClient.connect(CLIENT_ID + i, CLEAN_START, KEEP_ALIVE);
            mqttClient.subscribe(TOPICS, QOS_VALUES);//订阅接主题

            /**
             * 完成订阅后,可以增加心跳,保持网络通畅,也可以发布自己的消息
             */

            mqttClient.publish(PUBLISH_TOPICS, "keepalive".getBytes(), QOS_VALUES[0], true);

        } catch (MqttException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    /**
     * 简单回调函数,处理client接收到的主题消息
     *
     * @author pig
     */
    class SimpleCallbackHandler implements MqttSimpleCallback {

        /**
         * 当客户机和broker意外断开时触发
         * 可以再此处理重新订阅
         */
        @Override
        public void connectionLost() throws Exception {
            // TODO Auto-generated method stub
            System.out.println("客户机和broker已经断开");
        }

        /**
         * 客户端订阅消息后,该方法负责回调接收处理消息
         */
        @Override
        public void publishArrived(String topicName, byte[] payload, int Qos, boolean retained) throws Exception {
            // TODO Auto-generated method stub
            System.out.println("订阅主题: " + topicName);
            System.out.println("消息数据: " + new String(payload));
            System.out.println("消息级别(0,1,2): " + Qos);
            System.out.println("是否是实时发送的消息(false=实时,true=服务器上保留的最后消息): " + retained);
        }

    }

    /**
     * 高级回调
     *
     * @author pig
     */
    class AdvancedCallbackHandler implements MqttSimpleCallback {

        @Override
        public void connectionLost() throws Exception {
            // TODO Auto-generated method stub

        }

        @Override
        public void publishArrived(String arg0, byte[] arg1, int arg2,
                                   boolean arg3) throws Exception {
            // TODO Auto-generated method stub

        }

    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        new SubscribeClient("" + i);

    }

}

  

时间: 2024-11-07 20:20:39

Android Mqtt 消息推送使用的相关文章

android热门消息推送横向测评!

关于这个话题,已经不是什么新鲜事了.对于大多数中小型公司一般都是选择第三方的服务来实现.但是现在已经有很多提供推送服务的公司和产品,如何选择一个适合自己项目的服务呢?它们之间都有什么差别?在此为大家做了一个简单的调研,希望可以帮到大家. 简介: 手机推送服务是指服务器定向将信息实时送达手机的服务.推送服务,主要就是将最新资讯和最近的活动信息及时推送给用户,与用户保持互动,从而提高用户粘性,提升用户体验.例如,微信,新浪微博等APP的通知栏消息. 原理: 1)轮询(Pull)方式:应用程序应当阶段

[android] 安卓消息推送的几种实现方式

消息推送的目的:让服务器端及时的通知客户端 实现方案 轮询:客户端每隔一定的时间向服务器端发起请求,获得最新的消息 特点:如果用在最新新闻通知上,效率就有点低了,技术简单,好实现 应用场景:服务器端以一定的频率更新时,如:股票行情,就比较适用轮询了 当你把手机应用的聊天打开,过了一会发烫,基本就是轮询实现的 发短信:服务端发送短信,客户端对短信进行解析 特点:最及时,费用高 应用场景:土豪公司,没联网也可以收到信息 使用第三方的开源项目: androidPN (android push noti

Android (Notification)消息推送机制

从网上查询资料学习Android消息推送机制,效果图如下: 1.首先是布局文件代码 activity_main.xml <?xml version="1.0" encoding="utf-8"?> <LinearLayout xmlns:android="http://schemas.android.com/apk/res/android" android:layout_width="fill_parent"

Android本地消息推送

项目介绍:cocos2dx跨平台游戏 项目需求:实现本地消息推送,需求①:定点推送:需求②:根据游戏内逻辑实现推送(比如玩家体力满时,需要计算后到点推送):需求③:清理后台程序或重启后依然能够实现本地推送. 功能实现:由于IOS有一套比较成熟的UILocalNotification推送机制,这里主要说明Android下的实现.另外大家感兴趣可以看下第三方的推送:个推.极光.腾讯信鸽.百度云推送等,第三方多是要接入服务端,否则只能自己在第三方申请的应用的后台手动推送,另外第三方也不保证能100%所

Android第三方消息推送

====================问题描述==================== 我看现在第三方的消息推送怎么都是从网页推到手机上的?有没有从手机到手机的,即时聊天那样的?有哪些? ====================解决方案1==================== 终端直接一般依靠服务器实现,如果终端不经过服务器中转,那就不能实现广域网通讯.终端之间可以通过udp广播实现局域网内消息推送. ====================解决方案2====================

关于Android的消息推送以及前后台切换

让Android点击通知栏信息后返回正在运行的程序,并完全退出所有activity 首先,其中一个原因可能是从通知栏打开应用是新开activity并且是存在一个新的task堆里的,这种情况下使用 restartpackage()方法不能关闭应用的所有activity,解决的办法可以强制从通知栏打开应用时回来原activity或者能关闭所有 task里的 activity 方法二试过,不过不知如何取出所有activity,task也不知如何清除.方法一现在想想应该推荐这种方法,找到的方法资料先贴上

Android后台消息推送-android学习之旅(71)

建议使用第三方的sdk,比如极光推送,小米推送,百度推送 版权声明:本文为博主原创文章,未经博主允许不得转载.

Android 基于Netty的消息推送方案之Hello World(一)

消息推送方案(轮询.长连接) 轮询 轮询:比较简单的,最容易理解和实现的就是客户端去服务器上拉信息,信息的及时性要求越高则拉信息的频率越高.客户端拉信息的触发可以是一些事件,也可以是一个定时器,不断地去查询服务器.所以这个方案的弊端也是显而易见的,在轮询的频率较高时,服务器端的压力很大,通讯的流量也很大,并且大部分时间都是做的无用功. 长连接 长连接:客户端和服务端维持一个长连接,服务端在有信息推送的时候,借助这个连接把信息发送到客户端.这个方案的优点是信息推送的及时性很高,基本是实时的,并且除

Android 基于Netty的消息推送方案(一)

消息推送方案(轮询.长连接) 轮询 轮询:比较简单的,最容易理解和实现的就是客户端去服务器上拉信息,信息的及时性要求越高则拉信息的频率越高.客户端拉信息的触发可以是一些事件,也可以是一个定时器,不断地去查询服务器.所以这个方案的弊端也是显而易见的,在轮询的频率较高时,服务器端的压力很大,通讯的流量也很大,并且大部分时间都是做的无用功. 长连接 长连接:客户端和服务端维持一个长连接,服务端在有信息推送的时候,借助这个连接把信息发送到客户端.这个方案的优点是信息推送的及时性很高,基本是实时的,并且除