OpenFire源码学习之二十五:消息回执与离线消息(下)

这一篇紧接着上面继续了。

方案二

基于redis的消息回执。主要流程分为下面几个步骤:

1)将消息暂存储与redis中,设置好消息的过期时间

2)客户端回执消息id来消灭暂存的消息

3)开通单独线程论坛在第1)步中的消息。根据消息的时间重新发送消息。如果消息第一次存放的时间大雨有效期(自定义10秒),解析消息中的to查找用户是否还在线。如果在则T掉(因为它长时间不理会服务的重要命令),如果不在线则将消息放置离线表。

OK,先来看看消息的存储格式吧。

1.MESSAGE消息 用户集合

SADD  SOGU:[username]  [VALUE(messageID)] [VALUE(messageID)] ...

2.已读消息设备集合

SADD  RT:[terminalid]  [VALUE(messageID)] [VALUE(messageID)] ...

3.消息内容

HMSET  OGM:[messageID]  CREATIONDATE [VALUE]  UPDATEDATE [VALUE] STANZA [VALUE]

4.用户、设备关联

SADD URT:[USERNAME]  [VALUE(terminalid)] .......

(先根据消息id查找时间,在java中排序后 查找stanza)

MESSAGE

--离线表

ZADD OFOFFLINE:[username]  [INDEX(时间戳)] [VALUE(messageID)] 、[VALUE]、[VALUE]......              [VALUE]

HMSET OFOFFLINE:[messageID] STANZA[VALUE]

CREATIONDATE [VALUE]  MESSAGESIZ[VALUE]

将消息暂时消息存储:

    public void storeMessage(String username, Packet packet) {

		Jedis jedis = XMPPServer.getInstance().getGroupRedisManager().getJedis();
    	String packetID = "";
    	if (packet instanceof Message)
    		packetID = ((Message)packet).getID();
   		else if (packet instanceof IQ)
   			 packetID = ((IQ)packet).getID();
   		else
   			return;

    	try {
    		jedis.sadd("SOGU:" + username, packetID);
        	Map<String, String> hash = new HashMap<String, String>();
        	hash.put("STANZA", packet.toXML());
        	hash.put("CREATIONDATE", StringUtils.dateToMillis(new Date()));
        	jedis.hmset("OGM:" + packetID, hash);
		} finally {
			XMPPServer.getInstance().getGroupRedisManager().returnRes(jedis);
		}

    	htp.execute(addMessagesToDB(packet));
    }

    private Runnable addMessagesToDB(final Packet packet) {
    	return new Runnable() {
			@Override
			public void run() {
				MyDBopt.insertMessage(packet);
			}

客户端收到消息来回执服务端的操作

    private void handle(IQ packet) {
    	JID recipientJID = packet.getTo();
    	if (IQ.Type.crs != packet.getType()) {
            // Check if the packet was sent to the server hostname
            if (recipientJID != null && recipientJID.getNode() == null &&
                    recipientJID.getResource() == null && serverName.equals(recipientJID.getDomain())) {
                Element childElement = packet.getChildElement();
                if (childElement != null && childElement.element("addresses") != null) {
                    // to route this packet
                    multicastRouter.route(packet);
                    return;
                }
            }
    	}

        if (IQ.Type.crs == packet.getType()) {
        	String username = packet.getFrom().getNode();
        	String terminal = packet.getFrom().getTerminal();
        	String msgId = packet.getID();
        	if (username == null || msgId == null || "".equals(msgId)) {
        		return ;
        	}
        	if (terminal == null) {terminal = username + "_" + System.currentTimeMillis()%1000000; }
        	Jedis jedis = XMPPServer.getInstance().getGroupRedisManager().getJedis();

        	try {
        		jedis.sadd("URT:" + username, terminal);
            	jedis.sadd("RT:" + terminal, packet.getID());
			} finally {
				XMPPServer.getInstance().getGroupRedisManager().returnRes(jedis);
			}

    		threadPool.execute(createTask(msgId, username, terminal));
        	return;
        }
        if (packet.getID() != null && (IQ.Type.result == packet.getType() || IQ.Type.error == packet.getType())) {
            // The server got an answer to an IQ packet that was sent from the server
            IQResultListener iqResultListener = resultListeners.remove(packet.getID());
            if (iqResultListener != null) {
                resultTimeout.remove(packet.getID());
                if (iqResultListener != null) {
                    try {
                        iqResultListener.receivedAnswer(packet);
                    }
                    catch (Exception e) {
                        Log.error(
                                "Error processing answer of remote entity. Answer: "
                                        + packet.toXML(), e);
                    }
                    return;
                }
            }
        }
        try {
            // Check for registered components, services or remote servers
            if (recipientJID != null &&
                    (routingTable.hasComponentRoute(recipientJID) || routingTable.hasServerRoute(recipientJID))) {
                // A component/service/remote server was found that can handle the Packet
                routingTable.routePacket(recipientJID, packet, false);
                return;
            }
            if (isLocalServer(recipientJID)) {
                // Let the server handle the Packet
                Element childElement = packet.getChildElement();
                String namespace = null;
                if (childElement != null) {
                    namespace = childElement.getNamespaceURI();
                }
                if (namespace == null) {
                    if (packet.getType() != IQ.Type.result && packet.getType() != IQ.Type.error) {
                        // Do nothing. We can't handle queries outside of a valid namespace
                        Log.warn("Unknown packet " + packet.toXML());
                    }
                }
                else {
                    // Check if communication to local users is allowed
                    if (recipientJID != null && userManager.isRegisteredUser(recipientJID.getNode())) {
                        PrivacyList list =
                                PrivacyListManager.getInstance().getDefaultPrivacyList(recipientJID.getNode());
                        if (list != null && list.shouldBlockPacket(packet)) {
                            // Communication is blocked
                            if (IQ.Type.set == packet.getType() || IQ.Type.get == packet.getType()) {
                                // Answer that the service is unavailable
                                sendErrorPacket(packet, PacketError.Condition.service_unavailable);
                            }
                            return;
                        }
                    }
                    IQHandler handler = getHandler(namespace);
                    if (handler == null) {
                        if (recipientJID == null) {
                            // Answer an error since the server can't handle the requested namespace
                            sendErrorPacket(packet, PacketError.Condition.service_unavailable);
                        }
                        else if (recipientJID.getNode() == null ||
                                "".equals(recipientJID.getNode())) {
                            // Answer an error if JID is of the form <domain>
                            sendErrorPacket(packet, PacketError.Condition.feature_not_implemented);
                        }
                        else {
                            // JID is of the form <[email protected]>
                            // Answer an error since the server can't handle packets sent to a node
                            sendErrorPacket(packet, PacketError.Condition.service_unavailable);
                        }
                    }
                    else {
                        handler.process(packet);
                    }
                }
            }
            else {
                // JID is of the form <[email protected]/resource> or belongs to a remote server
                // or to an uninstalled component
                routingTable.routePacket(recipientJID, packet, false);
            }
        }
        catch (Exception e) {
        ......
        }
    }

离线消息

离线消息的优化。

同样可以拓展XMPP。比如

客户端获取离线消息,可以这么通讯。

1)先向服务器询问,我总的离线消息的基本状况(有多大,有多少条)

<iq id="BfI3V-47" to="8ntmorv1ep4wgcy" type="get" from="[email protected]">
  <query xmlns="http://jabber.org/protocol/offmsg#bif"/>
</iq>

2)服务端返回

<iq type="result" id="BfI3V-47" from="8ntmorv1ep4wgcy" to="[email protected]">
  <query xmlns="http://jabber.org/protocol/offmsg#bifs">
     <size>1024b</>
     <count>128</>
     <idset>1001,1002...</>
  </query>
</iq>

3)客户端发送分批获取命令,一次给我发10条发完为止。

<iq id="BfI3V-47" to="8ntmorv1ep4wgcy" type="get" from="[email protected]">
  <query xmlns="http://jabber.org/protocol/offmsg#start"/>
   <pagesize>10</>
</iq>

4)服务端开始发送消息

<iq type="result" id="BfI3V-47" from="8ntmorv1ep4wgcy" to="[email protected]">
  ......
</iq>
<iq type="result" id="BfI3V-47" from="8ntmorv1ep4wgcy" to="[email protected]">
  ......
</iq>
.....

5)告诉客户端我都发完了

<iq type="result" id="BfI3V-47" from="8ntmorv1ep4wgcy" to="[email protected]">
  <query xmlns="http://jabber.org/protocol/offmsg#end"/>
</iq>

6)客户端本地校验,回执已经接收到的消息

<message id="BfI3V-47" to="8ntmorv1ep4wgcy"
 from="[email protected]/Spark 2.6.3" type="crs"/>

这里本人只是做了一个简单的示意想法。如果需要更加精准的不妨在仔细想想消息处理与格式。

离线消息存储。

将消息存储到redis中:

public void addMessageToRedis(Message message) {
    	if (message == null) {
            return;
        }
        JID recipient = message.getTo();
        String username = recipient.getNode();
        // If the username is null (such as when an anonymous user), don't store.
        if (username == null || !UserManager.getInstance().isRegisteredUser(recipient)) {
            return;
        }
        else
        if (!XMPPServer.getInstance().getServerInfo().getXMPPDomain().equals(recipient.getDomain())) {
            // Do not store messages sent to users of remote servers
            return;
        }
        String msgXML = message.getElement().asXML();

    	Jedis jedis = XMPPServer.getInstance().getChatMessageJedisPoolManager().getJedis();

    	try {
    		String newDate = StringUtils.dateToMillis(new java.util.Date());
        	String id = MessageIdTactics.mid(username);
        	jedis.zadd("OFOFFLINE:" + username, Long.valueOf(newDate), id
        	Map<String, String> hash = new HashMap<String, String>();
        	hash.put("STANZA", msgXML);
        	hash.put("MESSAGESIZ", String.valueOf(msgXML.length()));
        	hash.put("CREATIONDATE", newDate);
        	jedis.hmset("OFOFFLINE:" + id, hash);
		} finally {
			XMPPServer.getInstance().getChatMessageJedisPoolManager().returnRes(jedis);
		}

    	if (sizeCache.containsKey(username)) {
            int size = sizeCache.get(username);
            size += msgXML.length();
            sizeCache.put(username, size);
        }
    	htp.execute(addMessageToDB(message));
    }

Redis优化这块就到这啦。主要要做的就是:

第一:存储用户或者MUC、Group等这些都需要设置消息存储的生命周期。当用户不处于活跃状态或者长时间不登陆的。要从redis中提出。免得浪费资源。当用户重新加载的时候再将他放置redis中

第二:将需要回执消息和离线消息分开。需要回执的消息需要设置他的生命周期。离线表最好做个定时器。轮询消息。将超时出现范围内的消息(比如周期为一周)的消息同步至关系表中。这里的离线消息需要将用户的设备分开来。

这里要考虑不同的设备终端等很多不同场景,问题会比较绕口。欢迎大家和我邮件交流。

时间: 2024-10-24 02:00:45

OpenFire源码学习之二十五:消息回执与离线消息(下)的相关文章

OpenFire源码学习之二十九:集群

集群 Openfire的给集群提供了多种方案.一种是基于Hazelcast插件,还有基于Oracle的coherence插件. Linux集群配置 一.修改配置文件 /etc/hosts文件 openfire1 192.168.2.104 openfire2192.168.2.240 每台主机都需要配置 二.添加jar包 将coherence.jar.coherence-work.jar.tangosol.jar添加到lib目录 将clustering.jar放到plugins下面 Hazelc

OpenFire源码学习之二十:在openfire中使用redis插件(下)

Redis插件实现 首先来看下插件目录: RedisServicePlugin 源码清单: import java.io.File; import java.sql.Connection; import java.sql.PreparedStatement; import java.util.Collection; import java.util.HashMap; import java.util.Map; import org.jivesoftware.database.DbConnecti

OpenFire源码学习之二十二:openfie对用户的优化(下)

用户名片 在预初始化中,贴出来用户名片的程序.这里也一样不在重复. 首先同样先修改系统属性: provider.vcard.className org.jivesoftware.util.redis.expand.RedisVCardProvider 然后需要修改VCardManager名片管理这个类. RedisVCardProvider: import redis.clients.jedis.Jedis; public class RedisVCardProvider implements

OpenFire源码学习之二十八:与其他系统的用户整合

与三方系统整合.Openfire提供了一套不错的解决方案.其实openfire的用户插件也做http方式的用户数据同步.关于openfire的用户整合,本人也不是十分赞同(这仅仅只是在我们自己的项目下).我们的办法是做数据共享池.这个解决方案可以用到redis. 下面是opnfire的 解决方案 openfire可以非常方便的整合现有系统用户. 进入openfire管理控制台-服务器-服务管理器-系统属性 可以发现如下配置 : 以下Provider是openfire默认自己管理用户组 属性名 属

OpenFire源码学习之二十一:openfie对用户的优化(上)

用户类 优化用户主要是要解决用户的连接量.已经对用户的访问速度和吞吐量. 预初始化 在前面的带面中提出来了用户的预初始化.这里就不在贴出来了.下面将redis用户库连接池处理贴出来UserJedisPoolManager public class UserJedisPoolManager extends BasicModule{ private static final Logger log = LoggerFactory.getLogger(UserJedisPoolManager.class

OpenFire源码学习之二十三:关于消息的优化处理

消息处理 之前有说过,openfire的消息处理策略本人并不是很喜欢.先看下openfire上脱机消息策略. 个人认为消息关于会话的消息,用户的存储量应该无限大.服务器不应该被消息吃撑了.所谓聊天通讯,这一关很重要. Openfire的消息是什么流程呢. 1.当用户登陆连接的时候.握手.认证.绑定资源.获取花名册.获取离线消息. 2.服务端会查找关系型数据库.经本人测试离线消息在数据库表中达到100万条以上的时候,查询速度非常慢,甚至会导致openfire奔溃. ..... 那么openfire

three.js 源码注释(二十五)Core/Geometry.js

商域无疆 (http://blog.csdn.net/omni360/) 本文遵循"署名-非商业用途-保持一致"创作公用协议 转载请保留此句:商域无疆 -  本博客专注于 敏捷开发及移动和物联设备研究:数据可视化.GOLANG.Html5.WEBGL.THREE.JS,否则,出自本博客的文章拒绝转载或再转载,谢谢合作. 俺也是刚开始学,好多地儿肯定不对还请见谅. 以下代码是THREE.JS 源码文件中Core/Geometry.js文件的注释. 更多更新在 : https://gith

OpenFire源码学习之二十七:Smack源码解析

Smack Smack是一个用于和XMPP服务器通信的类库,由此可以实现即时通讯和聊天.Android中开发通讯APP也可以使用这个包.关于smack的中文开发文档,目前网上也有很多. 下面本,将从源码中分析smack的几个案例. 连接 关于smack的Connection是连接XMPP服务器的默认实现.他有两个构造函数,一个是XMPPConecttion(String) 接收服务器地址名的参数.这个是默认的比较常用的构造方法. l 查找一个DNS的SRC,首先需要找到它精确的服务器地址和端口默

android源码解析(二十五)--&gt;onLowMemory执行流程

上篇文章中我们分析了Activity的onSaveInstanceState方法执行时机,知道了Activity在一般情况下,若只是执行onPause方法则不会执行onSaveInstanceState方法,而一旦执行了onStop方法就会执行onSaveInstanceState方法,具体的信息,可以参见onSaveInstanceState方法执行时机:http://blog.csdn.net/qq_23547831/article/details/51464535 这篇文章中同样的我们分析