android smack源码分析——接收消息以及如何解析消息

在android里面用的smack包其实叫做asmack,该包提供了两种不同的连接方式:socket和httpclient。该并且提供了很多操作xmpp协议的API,也方便各种不同自定义协议的扩展。我们不需要自己重新去定义一套接收机制来扩展新的协议,只需继承然后在类里处理自己的协议就可以了。而本文今天主要说两点,一点就是消息是如何接收的,另一点就是消息是如何通知事件的。

总的思路

1.使用socket连接服务器

2.将XmlPullParser的数据源关联到socket的InputStream

3.启动线程不断循环处理消息

4.将接收到的消息解析xml处理封装好成一个Packet包

5.将包广播给所有注册事件监听的类

逐步击破

(声明在看下面的文章时,最好先理解一下smack的使用,这样才能达到深入的理解)

(谨记:上图只显示本文章解释所要用到的类和方法,减缩了一些跟本文主题无关的代码,只留一条贯穿着从建立连接到接收消息的线。)

解析这块东西打算从最初的调用开始作为入口,抽丝剥茧,逐步揭开。

1.

PacketListener packetListener = new PacketListener() {

@Override
        public void processPacket(Packet packet) {
            System.out
                    .println("Activity----processPacket" + packet.toXML());
        }
    };

    PacketFilter packetFilter = new PacketFilter() {

        @Override
        public boolean accept(Packet packet) {
            System.out.println("Activity----accept" + packet.toXML());
            return true;
        }
    };

解释:创建包的监听以及包的过滤,当有消息到时就会广播到所有注册的监听,当然前提是要通过packetFilter的过滤。

2.

connection = new XMPPConnection();

XMPPConnection在这构造函数里面主要配置ip地址和端口(super(new ConnectionConfiguration(“169.254.141.109”, 9991));)

3.

connection.addPacketListener(packetListener, packetFilter);
connection.connect();

注册监听,开始初始化连接。

4.
public void connect() {

// Stablishes the connection, readers and writers
    connectUsingConfiguration(config);

}
5.

private void connectUsingConfiguration(ConnectionConfiguration config) {

String host = config.getHost();
    int port = config.getPort();
    try {
        this.socket = new Socket(host, port);
    } catch (UnknownHostException e) {
        e.printStackTrace();
    } catch (IOException e) {
        e.printStackTrace();
    }
    initConnection();
}

通过之前设置的ip和端口,建立socket对象

6.

protected void initDebugger() {

Class<?> debuggerClass = null;
    try {
        debuggerClass = Class.forName("com.simualteSmack.ConsoleDebugger");

        Constructor<?> constructor = debuggerClass.getConstructor(
                Connection.class, Writer.class, Reader.class);
        debugger = (SmackDebugger) constructor.newInstance(this, writer,
                reader);
        reader = debugger.getReader();
    } catch (ClassNotFoundException e1) {
        // TODO Auto-generated catch block
        e1.printStackTrace();
    } catch (Exception e) {
        throw new IllegalArgumentException(
                "Can‘t initialize the configured debugger!", e);
    }
}

private void initReaderAndWriter() {

try {
    reader = new BufferedReader(new InputStreamReader(socket
            .getInputStream(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
} catch (IOException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
}
initDebugger();

}
private void initConnection() {

// Set the reader and writer instance variables
initReaderAndWriter();

packetReader = new PacketReader(this);

addPacketListener(debugger.getReaderListener(), null);
// Start the packet reader. The startup() method will block until we
// get an opening stream packet back from server.
packetReader.startup();

}
从三个方法可以看出,建立reader和writer的对象关联到socket的InputStream,实例化ConsoleDebugger,该类主要是打印出接收到的消息,给reader设置了一个消息的监听。接着建立PacketReader对象,并启动。PacketReader主要负责消息的处理和通知

7.

public class PacketReader {

private ExecutorService listenerExecutor;
private boolean done;
private XMPPConnection connection;
private XmlPullParser parser;
private Thread readerThread;

protected PacketReader(final XMPPConnection connection) {
    this.connection = connection;
    this.init();
}

/**
 * Initializes the reader in order to be used. The reader is initialized
 * during the first connection and when reconnecting due to an abruptly
 * disconnection.
 */
protected void init() {
    done = false;

    readerThread = new Thread() {
        public void run() {
            parsePackets(this);
        }
    };

    readerThread.setName("Smack Packet Reader ");
    readerThread.setDaemon(true);

    // create an executor to deliver incoming packets to listeners.
    // we will use a single thread with an unbounded queue.
    listenerExecutor = Executors
            .newSingleThreadExecutor(new ThreadFactory() {

                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r,
                            "smack listener processor");
                    thread.setDaemon(true);
                    return thread;
                }
            });
    resetParser();
}

/**
 * Starts the packet reader thread and returns once a connection to the
 * server has been established. A connection will be attempted for a maximum
 * of five seconds. An XMPPException will be thrown if the connection fails.
 *
 */
public void startup() {
    readerThread.start();
}

/**
 * Shuts the packet reader down.
 */
public void shutdown() {
    done = true;
    // Shut down the listener executor.
    listenerExecutor.shutdown();
}

private void resetParser() {
    try {
        parser = XmlPullParserFactory.newInstance().newPullParser();
        parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true);
        parser.setInput(connection.reader);
    } catch (XmlPullParserException xppe) {
        xppe.printStackTrace();
    }
}

/**
 * Parse top-level packets in order to process them further.
 *
 * @param thread
 *            the thread that is being used by the reader to parse incoming
 *            packets.
 */
private void parsePackets(Thread thread) {
    try {
        int eventType = parser.getEventType();
        do {
            if (eventType == XmlPullParser.START_TAG) {
                if (parser.getName().equals("message")) {
                    processPacket(PacketParserUtils.parseMessage(parser));
                }
                System.out.println("START_TAG");
            } else if (eventType == XmlPullParser.END_TAG) {
                System.out.println("END_TAG");
            }
            eventType = parser.next();
        } while (!done && eventType != XmlPullParser.END_DOCUMENT
                && thread == readerThread);
    } catch (Exception e) {
        e.printStackTrace();
        if (!done) {
        }
    }
}

private void processPacket(Packet packet) {
    if (packet == null) {
        return;
    }

    // Loop through all collectors and notify the appropriate ones.
    for (PacketCollector collector : connection.getPacketCollectors()) {
        collector.processPacket(packet);
    }

    // Deliver the incoming packet to listeners.
    listenerExecutor.submit(new ListenerNotification(packet));
}

/**
 * A runnable to notify all listeners of a packet.
 */
private class ListenerNotification implements Runnable {

    private Packet packet;

    public ListenerNotification(Packet packet) {
        this.packet = packet;
    }

    public void run() {
        for (ListenerWrapper listenerWrapper : connection.recvListeners
                .values()) {
            listenerWrapper.notifyListener(packet);
        }
    }
}

}
创建该类时就初始化线程和ExecutorService ,接着调用resetParser() 方法为parser设置输入源(这里是重点,parser的数据都是通过这里获取),调用startup启动线程,循环监听parser,如果接收到消息根据消息协议的不同将调用PacketParserUtils类里的不同方法,这里调用parseMessage()该方法主要处理message的消息,在该方法里分析message消息并返回packet包。返回的包将调用processPacket方法,先通知所有注册了PacketCollector的监听,接着消息(listenerExecutor.submit(new ListenerNotification(packet)); )传递给所有注册了PacketListener的监听。这样在activity开始之前注册的那个监听事件就会触发,从而完成了整个流程。

7以上.

剩下的就是一些辅助包,很简单。比如PacketCollector 这个类,它的用处主要用来处理一些需要在发送后需要等待一个答复这样的请求。

protected synchronized void processPacket(Packet packet) {

System.out.println("PacketCollector---processPacket");
    if (packet == null) {
        return;
    }
    if (packetFilter == null || packetFilter.accept(packet)) {
        while (!resultQueue.offer(packet)) {
            resultQueue.poll();
        }
    }
}

public Packet nextResult(long timeout) {

long endTime = System.currentTimeMillis() + timeout;
    System.out.println("nextResult");
    do {
        try {
            return resultQueue.poll(timeout, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) { /* ignore */
        }
    } while (System.currentTimeMillis() < endTime);
    return null;
}

该方法就是将获取到的包,先过滤然后放到队列里,最后通过nextResult来获取包,这样就完成一个请求收一个答复。

这样整个流程就完成了,最后总结一下,如图(就这么简单0):

项目下载(只有客户端的,服务端的就是一个简单的socket接受,为了锻炼一下大家的编写代码的能力,服务器那个只能自己写咯0,其实是懒得上传了,代码很简单的)

http://files.cnblogs.com/not-code/simualteSmack.zip

本文为原创,如需转载,请注明作者和出处,谢谢!

出处:http://www.cnblogs.com/not-code/archive/2011/08/01/2124340.html

时间: 2025-01-02 15:39:56

android smack源码分析——接收消息以及如何解析消息的相关文章

Android IntentService 源码分析

IntentService简介: IntentService是一个通过Context.startService(Intent)启动可以处理异步请求的Service,使用时你只需要继承IntentService和重写其中的onHandleIntent(Intent)方法接收一个Intent对象,该服务会在异步任务完成时自动停止服务. 所有的请求的处理都在IntentService内部工作线程中完成,它们会顺序执行任务(但不会阻塞主线程的执行),某一时刻只能执行一个异步请求. IntnetServi

Android debuggerd 源码分析

debuggerd 简介 Android系统自带一个实用的程序异常退出的诊断daemon debuggerd.此进程可以侦测到程序崩溃,并将崩溃时的进程状态信息输出到文件和串口中,以供开发人员分析调试使用.Debuggerd的数据被保存在/data/tombstone/目录下,共可保存10个文件,当超过10个时,会覆盖重写最早生产的文件.串口中,则直接用DEBUG的tag,输出logcat信息. Linux kernel有自己的一套signal机制,在应用程序崩溃时,通常系统内核都会发送sign

[Android]Volley源码分析(二)Cache

Cache作为Volley最为核心的一部分,Volley花了重彩来实现它.本章我们顺着Volley的源码思路往下,来看下Volley对Cache的处理逻辑. 我们回想一下昨天的简单代码,我们的入口是从构造一个Request队列开始的,而我们并不直接调用new来构造,而是将控制权反转给Volley这个静态工厂来构造. com.android.volley.toolbox.Volley: public static RequestQueue newRequestQueue(Context conte

Android 消息处理源码分析(2)

Android 消息处理源码分析(1)点击打开链接 继续接着分析剩下的类文件 Looper.java public final class Looper { final MessageQueue mQueue; //消息队列 final Thread mThread; //Looper联系的线程 public static void prepare() { prepare(true); } private static void prepare(boolean quitAllowed) { //

[Android]Volley源码分析(五)

前面几篇通过源码分析了Volley是怎样进行请求调度及请求是如何被实际执行的,这篇最后来看下请求结果是如何交付给请求者的(一般是Android的UI主线程). 类图: 请求结果的交付是通过ResponseDelivery接口完成的,它有一个实现类ExecutorDelivery, 主要有postResponse()与postError()两个方法,分别在请求成功或失败时将结果提交给请求发起者. 1. 首先,在NetworkDispatcher的run()方法中,当服务器返回响应并解析完后,会调用

[Android]Volley源码分析(二)

上一篇介绍了Volley的使用,主要接触了Request与RequestQueue这两个类,这篇就来了解一下这两个类的具体实现. Request类图: Request类: Request是一个抽象类,其中的主要属性: mMethod: 请求方法,目前支持GET, POST, PUT, DELETE, HEAD, OPTIONS,TRACE, PATCH方法 mUrl: 请求Url mErrorListener: 错误处理监听器,请求出错时调用 mSequence: 请求的序号,相同优先级的请求在

Android系统源码阅读(13):Input消息的分发过程

Android系统源码阅读(13):Input消息的分发过程 请对照AOSP版本:6.0.1_r50.学校电脑好渣,看源码时卡半天 先回顾一下前两篇文章.在设备没有事件输入的时候,InputReader和InputDispatcher都处于睡眠状态.当输入事件发生,InputReader首先被激活,然后发送读取消息,激活Dispatcher.Dispatcher被激活以后,将消息发送给当前激活窗口的主线程,然后睡眠等待主线程处理完这个事件.主线程被激活后,会处理相应的消息,处理完毕后反馈给Dis

Android HandlerThread 源码分析

HandlerThread 简介: 我们知道Thread线程是一次性消费品,当Thread线程执行完一个耗时的任务之后,线程就会被自动销毁了.如果此时我又有一 个耗时任务需要执行,我们不得不重新创建线程去执行该耗时任务.然而,这样就存在一个性能问题:多次创建和销毁线程是很耗 系统资源的.为了解这种问题,我们可以自己构建一个循环线程Looper Thread,当有耗时任务投放到该循环线程中时,线程执行耗 时任务,执行完之后循环线程处于等待状态,直到下一个新的耗时任务被投放进来.这样一来就避免了多次

[Android]Fragment源码分析(肆) Fragment栈管理

Fragment的栈是Fragment管理颇为出彩的一部分,它跟Activity栈的本质差异除了在数据结构上和逻辑上的不同之外,主要区别还在于: 1.Fragment管理是在进程空间内的 2.Fragment的管理一般情况下是一个Window下进行的. Fragment的管理在一个进程空间内是比较好理解的,因为我们知道Activity的管理其实相对复杂,它的管理是通过IPC调用,IPC的一端是我们的Client,而作为Server的是Ams服务.Activity的管理是基于Window的,而Fr