Tigase开发笔记6:packet流转机制 -> 一条消息(packet)的请求和响应过程解析

初看Tigase的packet内部流转机制一开始不是太明白。里面用到了较多的线程,代码不太看得懂。慢慢的通过一条消息的请求和响应的代码跟踪分析,搞清楚了消息流转的过程。

前言

本文使用Tigase Server version:7.0.2 进行的代码跟踪和分析。

使用工具:IntelliJ IDEA14.1.4

Tigase通过tigase.io包当中的代码读取网络中的字节数组,然后通过tigase.net包当中的类把字节数组转换为字符,最后通过tigase.xml包当中的XML解析器把这些字符转换成XML DOM对象。

图片流程说明

看tigase源码你会发现所有的tigase处理都是基于多线程,每个component都有自己的in和out处理线程,线程间的数据传输通过queue

总的流程大致就是:

文字流程说明

下面是请求和响应的步骤说明(只列出了关键的步骤)。

A. 从client到server的过程(请求-Request)
ClientConnectionManager和MessageRouter都间接或直接继承了AbstractMessageReceiver。
tigase.server.AbstractMessageReceiver – 它已经实现了四个接口:ServerComponent,MessageReceiver,Configurable和StatisticsContainer。

它通过自己的多个线程来管理内部数据队列,且能避免死锁。

它使用事件驱动的方式来处理数据,当packet被发送到AbstractMessageReceiver实例的abstract void processPacket(Packet packet)方法时,就立即启动了packet的处理工作。当然你还是需要实现抽象类当中的抽象方法,如果你还希望输出packet数据(例如当它收到请求时还需要发送响应),可以调用boolean addOutPacket(Packet packet)方法。
a. XMPPIOService负责接收客户端报文并转换为对应的packet放入队列receivedPackets(相当于in_queues)中,而SocketThread负责创建ReadThread和WriteThread线程,
ResultsListener则是SocketThread的一个内部类。ResultsListener负责调度socketReadThread()和socketWriteThread(), Client2Server读取和写入数据包的IO操作主要由XMPPIOService来完成,XMPPIOService实例则在read和write线程中被使用。

b. ClientConnectionManager则负责从XMPPIOService获取receivedPackets,并对收到的报文进行处理,并将处理后的报文放入MessageRouter的out_queues队列中。

c. MessageRouter重写了AbstractMessageReceiver的processPacket方法,在该方法中MessageRouter通过packet.getTo()得到组件的名称并转发packet到该组件的in_queues队列中,该目标服务组件(如ses-man,即SessionManager)可能在本机服务器上,也可能在该域(domain)集群的其他服务器上运行着,MessageRouter的查找顺序是先查本机服务器,找不到的话再去集群中查找;如果找到则将packet转发给目标组件处理,如果没找到则返回没有找到目标组件的错误消息。

d. 我们的组件一般都有自己的in_queues和out_queues. 当前组件的in_queues的数据来自于上一组件的out_queues。
int queueIdx = Math.abs(hashCodeForPacket(packet) % in_queues_size);
boolean result = in_queues.get(queueIdx).offer(packet, packet.getPriority().ordinal());//#这里的packet数据来自于packetFrom="xxx",即传给当前组件的上一组件。
B.从Service到Client的过程(响应-Response)

服务端响应的数据也是放到out_queues中的,各组件的对应的线程会对out_queues中的packet的to属性做解析,并将消息转发到指定目标。

其实消息的流转传递机制实现的核心就是packet的from(包含packetFrom和stanzaFrom)
和 to(packetTo和stanzaTo)属性,路由的路径会默认先取packetFrom或packetTo,其次再去取stanzaFrom和stanzaTo。

客户端发送一条ping命令

<iq type=‘get‘ id=‘purplee4ad721‘>

<ping xmlns=‘urn:xmpp:ping‘/>

</iq>

IOService.java 

public abstract class IOService<RefObject>            implements Callable<IOService<?>>, TLSEventHandler,            IOListener {

/**接收报文后call->处理客户端报文数据信息*/
@Overridepublic IOService<?> call() throws IOException {   writeData(null);

boolean readLock = true;

if (stopping) {      stop();   } else {      readLock = readInProgress.tryLock();      if (readLock) {         try {            processSocketData();            if ((receivedPackets() > 0) && (serviceListener != null)) {               serviceListener.packetsReady(this);            }    // end of if (receivedPackets.size() > 0)         } finally {            readInProgress.unlock();            if (!isConnected()) {               // added to sooner detect disconnection of peer - ie. client               if (log.isLoggable(Level.FINEST)) {                  log.log(Level.FINEST, "{0}, stopping connection due to the fact that it was disconnected, forceStop()", toString());               }               forceStop();            }         }      }   }

return readLock         ? this         : null;}


/** * Describe * <code>writeData</code> method here. * 这里是最后响应消息给客户端的方法,写入TCP连接中的Socket * @param data a * <code>String</code> value */protected void writeData(final String data) {
     ...


     ...

}

}

返回消息前还调用了下面两个类的对应write方法

SocketIO.java
@Overridepublic int write(final ByteBuffer buff) throws IOException {

}

TLSIO.java
public class TLSIO implements IOInterface {
         private int writeBuff(ByteBuffer buff) throws IOException {

}

}

IOService打包好了消息之后形成响应的packet包,->然后再执行到ConnectionManager的writePacketToSocket()方法->再执行到ClientConnectionManager的processPacket方法。

ConnectionManager



ClientConnectionManager

至此本次消息的请求和响应结束!server会继续通过AbstractMessageReceiver获取下一条消息进行处理,如此循环。。。


下面的资料希望对你也有所帮助
网络
  • connectionManager同时协调ConnectionOpenThread与SocketThread。
  • ConnectionOpenThread脱离上述组件,属于网络层实现,操作selector。它负责Selector.open。
  • IOService提供线程安全的call方法,XMPPIOService继承它,保存了连接信息,每个连接一个IOService。
  • SocketThread在实例化时,会启动多个线程,同时盯住selector。负责将每个确定的IOService进行数据处理。
  • 实现ConnectionOpenListener接口accept方法接收SocketChannel,组装IOService,交由SocketThread处理。
  • ConnectionManager用ConcurrentHashMap记录了所有的连接。
零碎
  • AbstractMessageReceiver.addPacket 往自己的in_queue里加数据,是阻塞的,如果满了会出事。
  • AbstractMessageReceiver.addPacketNB 往自己的in_queue里加数据,非阻塞的,和上一个的区别在于,一个是put一个是offer到queue。
  • AbstractMessageReceiver.addPackets 来一堆数据。
  • 所有in_queue里的数据,会被processPacket方法所处理。
  • 对应有addOutPacket。
  • 所有out_queue里的数据,都默认扔给parent的in_queue,没有parent就扔到自己的in_queue。
  • 所有in_queue的数据,都由processPacket具体的实现来处理。

tigase.server.ServerComponent – 这是一个非常基本的component接口。所有的component都必须实现接口中定义的方法。
tigase.server.MessageReceiver – 这个接口extends ServerComponent,所有希望接收数据packets的Component都需要实现接口中定义的方法,比如session manager和c2s connection manager。
tigase.conf.Configurable – 如果希望components可以被配置,则需要实现这个接口,所有默认定义基本都在这。在运行时,配置信息会被推送到这种类型的对象。components必须能够在运行时对变更的配置项进行处理,这一点在实现时要留神。
tigase.disco.XMPPService – 实现了这个对象的类可以对“ServiceDiscovery”请求做出响应。
tigase.stats.StatisticsContainer – 实现了这个对象的类可以返回运行时的统计信息。任何一个对象都可以实现这个接口用来收集统计信息

tigase.server.AbstractMessageReceiver –
它已经实现了四个接口:ServerComponent,MessageReceiver,Configurable和StatisticsContainer。它通过自己的多个线程来管理内部数据队列,且能避免死锁。

它使用事件驱动的方式来处理数据,当packet被发送到AbstractMessageReceiver实例的abstract
void processPacket(Packet packet)方法时,就立即启动了packet的处理工作。当然你还是需要实现抽象类当中的抽象方法,如果你还希望输出packet数据(例如当它收到请求时还需要发送响应),可以调用boolean
addOutPacket(Packet packet)方法。

tigase.server.ConnectionManager –
这是一个extend AbstractMessageReceiver的抽象类。正如其名,这个类专注于对连接进行管理工作。如果你的组件需要通过网络直接发送或接受数据(比如c2s connection,s2s connection 或者 连接到外部第三方jabber服务),你可以把它作为基类进行扩展。它会帮你把所有和网络有关的工作都打理好(例如io,重连,socket监听和连接握手等工作)。

如果你extend这个类,你需要知道数据来源于哪里:如果来源于MessageRouter,那么abstract
void processPacket(Packet packet)方法会被调用; 如果来源于网络连接,那么abstract
Queue processSocketData(XMPPIOService serv)方法会被调用。


版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-11-08 19:05:57

Tigase开发笔记6:packet流转机制 -> 一条消息(packet)的请求和响应过程解析的相关文章

Tigase开发笔记1:深入认识XMPP协议

要学习基于XMPP协议的IM开发,首先要熟悉XMPP协议本身. XMPP协议的组成 主要的XMPP 协议范本及当今应用很广的XMPP 扩展: RFC 3920 XMPP:核心.定义了XMPP 协议框架下应用的网络架构,引入了XML Stream(XML 流)与XML Stanza(XML 节),并规定XMPP 协议在通信过程中使用的XML 标签.使用XML 标签从根本上说是协议开放性与扩展性的需要.此外,在通信的安全方面,把TLS 安全传输机制与SASL 认证机制引入到内核,与XMPP 进行无缝

Tigase开发笔记4:如何自定义插件 Plugin

其他博客比较好的介绍推荐:http://my.oschina.net/greki/blog/209726 1. 定义一个插件 四种处理器插件接口: 第一步 – 预处理 – XMPPPreprocessorIfc:这是预处理器插件需要实现的接口 第二步 –  处理 –    XMPPProcessorIfc:这是处理器插件需要实现的接口 第三步 –  投递 –    XMPPPostProcessorIfc:这是投递处理器插件需要实现的接口 第四步 –  过滤 –    XMPPPacketFil

项目开发笔记-传单下发 名片替换 文件复制上传/html静态内容替换/json解析/html解析

//////////////////////////// 注意: 此博客是个人工作笔记 非独立demo////////////////////////////////// ....................................................................................................................................................................

web前端开发笔记(2)

web前端开发笔记(1) 一.HTML标签书写有哪些规范? 页面编码. 文档声明. 关键字与描述. 行内元素不能包含块级元素. a标签不能嵌套a标签. 标签名和属性必须用小写字母书写,属性必须加引号,标签必须闭合,单标签页必须闭合. 页面中不要用 进行缩进,如需缩进,用css控制. html标签使用必须语义化. 要为img标签填写alt和title属性. 二.HTML静态页面出现中文乱码如何解决? 引入<meta charset="UTF-8"> 三.通常情况下块属性标签和

【Kinect开发笔记之(一)】初识Kinect

一.Kinect简介 Kinect是微软在2010年6月14日对XBOX360体感周边外设正式发布的名字.它是一种3D体感摄影机(开发代号"Project Natal"),同时它导入了即时动态捕捉.影像辨识.麦克风输入.语音辨识.社群互动等功能. 二.Kinect分类 Kinect for Xbox 360:该版本设计之初就是为了Xbox 360定制的,并未考虑其他的平台.从微软授权角度而言,它无法用于商业开发. Kinect for Windows : 固件上做了升级,支持"

安卓开发笔记——丰富多彩的TextView

随手笔记,记录一些东西~ 记得之前写过一篇文章<安卓开发笔记——个性化TextView(新浪微博)>:http://www.cnblogs.com/lichenwei/p/4411607.html 文章里实现个性化TextView的主要方法是通过替换的方式,对关键字进行一些个性化处理,晚上再来补充一种实现方式. 老规矩,先看下效果图: 晚上带来的这种实现方式是通过Android官方给我们提供的Html类下面的fromHtml方法,这个方法可以对字符串进行HTML格式化,让TextView等一些

安卓开发笔记——关于Handler的一些总结(上)

接上篇文章<安卓开发笔记——关于AsyncTask的使用>,今天来讲下在安卓开发里"重中之重"的另一个异步操作类Handler. 今天打算先讲下关于Handler的一些基本定义和使用方式 还是以一个下载图片为例,先看下实例效果: 好了,先来看下关于Handler的定义: 以上是官方对于Hanler类的描述,大致意思是说:Handler主要用于异步消息的处理:当发出一个消息之后,首先进入一个消息队列,发送消息的函数即刻返回,而另外一个部分在消息队列中逐一将消息取出,然后对消息

Android开发笔记(八十八)同步与加锁

同步synchronized 同步方法 synchronized可用来给方法或者代码块加锁,当它修饰一个方法或者一个代码块的时候,同一时刻最多只有一个线程执行这段代码.这就意味着,当两个并发线程同时访问synchronized代码块时,两个线程只能是排队做串行处理,另一个线程要等待前一个线程执行完该代码块后,才能再次执行synchronized代码块. 使用synchronized修饰某个方法,该方法便成为一个同步方法,在同一时刻只能有一个线程执行该方法.可是,synchronized的锁机制太

flask web开发笔记 -- 快速入门

flask web开发笔记 -- 快速入门 初始化 Flask应用需要创建应用实例. Web服务器通过Web Server Gateway Interface (WSGI)协议把从客户端接收到的请求传递给该对象.应用程序实例是Flask类对象,通常创建如下: from flask import Flask app = Flask(__name__) Flask类的构造函数唯一的参数是应用的主模块名或包名,用于确定应用的根目录.对于大多数应用程序,使用Python的__name__变量即可. 路由