java后端IM消息推送服务开发——协议

最近在一家saas企业使用Mqtt开发IM消息推送服务,把开发中的一些问题记录下来,项目仍在商用中,完整的消息服务包括4个模块---协议protocol,信令Signal,规则Rule,状态Status,这个主题主要是协议protocol部分。

主要技术涉及到MongoDB,webservice,httpclient,Mqtt等

protocol分为四个模块类来实现,当然这是为了以后的扩展性比较好

首先看一下我们的主类,主要是mqtt基础方法的一个框架

public class MqttProtocol
{
	private static Logger logger = Logger.getLogger(MqttProtocol.class);
    public static final String HOST = "tcp://xx.xx.xx.xx:1883";
    private static final String CLIENTID = "yyyy";
    private MqttClient client;
    private MqttConnectOptions options = new MqttConnectOptions();
    //private String userName = "admin";
    //private String passWord = "public";
    public MqttMessage message;
    private PushCallback callback;
    /**
     * 用于初始化mqttclient客户端,设置回调函数,同时连接mqtt服务器
     * @throws MqttException
     */
       public MqttProtocol() throws MqttException
       {
           //MemoryPersistence设置clientid的保存形式,默认为以内存保存
           client = new MqttClient(HOST, CLIENTID, new MemoryPersistence());
           callback = new PushCallback();
           client.setCallback(callback);
           options = new MqttConnectOptions();
           options.setCleanSession(false);
           options.setKeepAliveInterval(60);
           connect();
       }
       /**
        * 连接mqtt消息服务器,同时设置了断开重连的功能,主要是为了高可用性考虑,在断网服务器崩溃时候我们的程序仍然不会终止
        */
       private void connect()
       {
      	 SimpleDateFormat sdf= new SimpleDateFormat(Constant.DATE_FORMAT_MDYHMS);
      	 System.out.println(sdf.format(System.currentTimeMillis()));
           boolean tryConnecting = true;
           while (tryConnecting) {
             try {
               client.connect(options);
             } catch (Exception e1) {
          	   System.out.println("Connection attempt failed with '"+e1.getCause()+
                    "'. Retrying.");
             }
             if (client.isConnected()) {
          	   System.out.println("Connected.");
               tryConnecting = false;
             } else {
               pause();
             }
           }
       }
       private void pause() {
   	    try {
   	      Thread.sleep(1000);
   	    } catch (InterruptedException e) {
   	      // Error handling goes here...
   	    }
   	  }
       /**
        *
        * @param topic
        * @param qos
        * @throws MqttPersistenceException
        * @throws MqttException
        * 订阅相关主题
        */
       public void subscribe(String topic , int qos) throws MqttPersistenceException,
       		MqttException
       {
      	 client.subscribe(topic, qos);
       }
       /**
        *
        * @throws MqttPersistenceException
        * @throws MqttException
        * 断开连接服务器
        */
       public void disconnect() throws MqttPersistenceException,
  		MqttException
  	 {
      	 client.disconnect();
       }
       /**
        *
        * @author binshi
        *实现mqttcallback接口,主要用于接收消息后的处理方法
        */
       private class PushCallback implements MqttCallback {
      	 /**
      	  * 断开后 系统会自动调用这个函数,同时在这个函数里进行重连操作
      	  */
   	    public void connectionLost(Throwable cause) {
   	        // 连接丢失后,一般在这里面进行重连
   	    	System.out.println("连接断开,可以做重连");
   	        connect();
   	        try {
				subscribe(Constant.TOPIC_MQTT_PROTOCOL, 2);
			} catch (MqttPersistenceException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} catch (MqttException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
   	    }
   	    /**
   	     * 消息成功传送后,系统会自动调用此函数,表明成功向topic发送消息
   	     */
   		@Override
  		public void deliveryComplete(IMqttDeliveryToken arg0) {
  			// TODO Auto-generated method stub
   			System.out.println("deliveryComplete---------" + arg0.isComplete());
  		}
   		/**
   		 * 连接mongo数据库,返回关于具体collection的Mongocollection
   		 * @param collectionname
   		 * @return
   		 */

   		public void messageArrived(String topic, MqttMessage message) throws Exception
   		{
			System.out.println(topic);
	    	SimpleDateFormat sdf= new SimpleDateFormat(Constant.DATE_FORMAT_MDYHMS);
	    	System.out.println(sdf.format(System.currentTimeMillis()));
	    	System.out.println("接收消息主题 : " + topic);
	    	System.out.println("接收消息Qos : " + message.getQos());
	    	System.out.println("接收消息内容 : " + new String(message.getPayload()));
	    	//1 抽取事件信令消息
		    String messagejudge=new String(message.getPayload());
		    System.out.println("忽略所有robot消息以及offline离线消息");
		    JSONObject jo=new JSONObject();
			try {
				 jo=JSONObject.fromObject(messagejudge);
			} catch (Exception e) {
				e.printStackTrace();
			}
			String from=jo.getString("from");
			System.out.println("获得from"+from);
			System.out.println("确定消息是否包含offline,如果包含取得offline,为1就不处理");
			String offline=null;
			if(messagejudge.contains("offline"))
			{
				offline=jo.getString("offline");
			}
			if((offline==null)&&(!from.contains("robot")))
			{
				System.out.println("处理非系统消息和非离线消息");
				String type=jo.getString("type");
				System.out.println("获得type"+type);
				if(type.equals("shakehand"))
				{
					System.out.println("处理shakehand消息");
					String admin="doyounkowwhy";
					if(jo.toString().contains("admin"))
					{
						admin=jo.getString("admin");
					}
					System.out.println("取得admin 如果为1定义为客服,否则为普通用户 admin为"+admin);
					if(admin.equals("1"))
					{
						System.out.println("处理客服握手消息");
						System.out.println("发送握手成功消息");
						MqttTopic retopic=client.getTopic(topic);
						MsgOperation.sendSysMsgToClient(from,"0", "1005", "握手成功", null,retopic);
						System.out.println("向客户端发送离线未接收的消息");
						String convid=jo.getString("convid");
						String database="dolina";
						String collection="messages";
						MongoDBDao.getMongoDBDaoInstance().sendOfflineMsgToClient(from, convid,retopic,database,collection);
					}
					else
					{
						System.out.println("处理普通用户的握手消息");
						String appid=jo.getString("appid");
						String pageid=jo.getString("pageid");
						String convid=jo.getString("convid");
						MqttTopic retopic=client.getTopic(topic);
						MsgOperation.sendShakeHandInfo(from,convid,appid,pageid,retopic);
					}
				}
				else if(type.equals("text")||type.equals("image"))
				{
					System.out.println("处理图片和文字消息");
					String tmpindex=jo.getString("tmpindex");
					String convid=jo.getString("convid");
					MqttTopic retopic=client.getTopic(topic);
					MsgOperation.getTextMsg( tmpindex, from, convid, retopic);
					System.out.println("保存图片文字消息");
					String database="dolina";
					String collection="messages";
					MongoDBDao.getMongoDBDaoInstance().saveTextMsg(database,collection,jo);
				}
				else if(type.equals("ack"))
				{
					System.out.println("处理ack消息");
					String tmpindex=jo.getString("tmpindex");
					String convid=jo.getString("convid");
					String database="dolina";
					String collection="messages";
					MongoDBDao.getMongoDBDaoInstance().getAck(tmpindex,convid,from,database,collection);
				}
			}

   		}
   	}
       /**
        *
        * @param args
        * @throws MqttException
        * 整个工程从这里开始执行,生成可执行jar包,这个设置为主类。
        */
       public static void main(String[] args) throws MqttException
       {
    	   MqttProtocol signal = new MqttProtocol();
           signal.message = new MqttMessage();
           /**
           server.message.setQos(2);
           server.message.setRetained(false);
           server.message.setPayload("给客户端124推送的信息".getBytes());
           server.subscribe("/engyne/1/7/169573fcbc96a816281192222", 2);
           */
           signal.subscribe(Constant.TOPIC_MQTT_PROTOCOL, 2);
           System.out.println(signal.message.isRetained() + "------ratained状态");
  	}
}

接下来使我们的远程连接模块,主要是通过给定的url调用远程接口

public class RemoteOperation
{
	private static Logger logger = Logger.getLogger(MqttProtocol.class);
	public static JSONObject remoteCall(String url) throws HttpException, IOException
		{
			HttpClient httpClient = new HttpClient();
	    	GetMethod method =null ;
	    	method=new GetMethod(url);
	    	int retcode = httpClient.executeMethod(method);
	    	if (retcode != HttpStatus.SC_OK)
	    	{// 发送不成功
	          logger.info("远程调用出错");
	          return null;
	        }
	    	else
	        {
	        	String body = method.getResponseBodyAsString();
	        	logger.info(body+"远程调用php成功");
	        	JSONObject jsonObject=new JSONObject();
				try {
					 jsonObject=JSONObject.fromObject(body);
				} catch (Exception e) {
					e.printStackTrace();
				}
				if (method != null)
			    {
			        method.releaseConnection();
			    }
				return jsonObject;
	       } 

		}
}

下面是Mongo数据库的相关操作的一个封装,设计为单例模式,相当于每次都使用同一个client打开连接,类似于连接池的概念,当然业务逻辑部分可以更换

public class MongoDBDao
{
	private static Logger logger = Logger.getLogger(MongoDBDao.class);
	/**
     * MongoClient的实例代表数据库连接池,是线程安全的,可以被多线程共享,客户端在多线程条件下仅维持一个实例即可
     * Mongo是非线程安全的,目前mongodb API中已经建议用MongoClient替代Mongo
     */
    private MongoClient mongoClient = null;
    /**
     *
     * 私有的构造函数
     * 作者:shibin
     */
    private MongoDBDao(){
        if(mongoClient == null){
        	String url = Constant.MONGO_MQTT_URL;
    	    String user = Constant.MONGO_MQTT_USER;
    	    String password = Constant.MONGO_MQTT_PASSWORD;
    	    String database = Constant.MONGO_MQTT_DATABASE;
    	    int port = 27017;
    	    ServerAddress serverAddress = new ServerAddress(url, port);
            List<ServerAddress> serverAddresses = new ArrayList<ServerAddress>();
            serverAddresses.add(serverAddress);
            MongoCredential credential = MongoCredential.createCredential(user, database, password.toCharArray());
            List<MongoCredential> credentials = new ArrayList<MongoCredential>();
            credentials.add(credential);
            mongoClient = new MongoClient(serverAddresses, credentials);
            System.out.println(mongoClient);
            System.out.println("初始化client完成");
        }
    }  

    /********单例模式声明开始,采用饿汉式方式生成,保证线程安全********************/  

    //类初始化时,自行实例化,饿汉式单例模式
    private static final MongoDBDao mongoDBDao = new MongoDBDao();
    /**
     *
     * 方法名:getMongoDBDaoImplInstance
     * 作者:shibin
     *
     * 描述:单例的静态工厂方法
     * @return
     */
    public static MongoDBDao getMongoDBDaoInstance(){
        return mongoDBDao;
    }
    public  void sendOfflineMsgToClient(String from, String convid,MqttTopic retopic,String database,String collection) throws MqttPersistenceException, MqttException
	{
		System.out.println("获得message的连接");
		MongoDatabase mongoDatabase = mongoClient.getDatabase(database);
        MongoCollection mongoCollection = mongoDatabase.getCollection(collection);
		System.out.println("取得convid所对应的msg列表");
		BasicDBObject query = new BasicDBObject();
		query.put("_id", convid);
		FindIterable<Document> iterable=null;
		iterable = mongoCollection.find(query);
		if(iterable.first()!=null)
		{
			System.out.println(iterable.first());
			String res= iterable.first().toJson();
			 JSONObject jo=new JSONObject();
			try {
				 jo=JSONObject.fromObject(res);
			} catch (Exception e) {
				e.printStackTrace();
			}
			JSONArray jsonArray=jo.getJSONArray("msg");
			for(int i=0;i<jsonArray.length();i++)
			{
				String read=jsonArray.getJSONObject(i).getString("read");
				System.out.println("获得msg对应的第"+i+"条记录的read信息"+read);
				System.out.println("判断read是否包含from的信息,如果不包含且这条消息不是他自己发的就给她发送这条消息");
				if(!read.contains(from)&&!jsonArray.getJSONObject(i).getString("from").equals(from))
				{
					System.out.println("获得这条消息的原型,然后加上offline=1并发送消息");
					JSONObject msg=jsonArray.getJSONObject(i);
					msg.put("offline", "1");
					retopic.publish(msg.toString().getBytes(), 0, false);
				}
				else
				{
					System.out.println("no  offline message for "+from);
				}
			}
		}
	}
    public  void saveTextMsg(String database,String collection,JSONObject jo)
	{
    	MongoDatabase mongoDatabase = mongoClient.getDatabase(database);
        MongoCollection mongoCollection = mongoDatabase.getCollection(collection);
		BasicDBObject query = new BasicDBObject();
		String convid=jo.getString("convid");
    query.put("_id", convid);
    FindIterable iterable;
    iterable = mongoCollection.find(query);
    System.out.println("更新message之前的值"+iterable.first());
	Bson filter = Filters.eq("_id", convid);
	Document content = new Document();

	String type=jo.getString("type");
	if(type.equals("text"))
	{
		String contentMsg=jo.getJSONObject("content").getString("content");
		content.put("content", contentMsg);
	}
	else
	{
		String url=jo.getJSONObject("content").getString("url");
		content.put("url", url);
	}
	String admin=jo.getJSONObject("extra").getString("admin");
	String headimgurl=jo.getJSONObject("extra").getString("headimgurl");
	String nickname=jo.getJSONObject("extra").getString("nickname");
	String from=jo.getString("from");
	String tmpindex=jo.getString("tmpindex");
	Document extra = new Document();
	extra.put("nickname", nickname);
	Document doc = new Document();
	doc.put("from",from );
	ArrayList<String> read=new ArrayList<String>();
	doc.put("read", read);
	Document tdoc = new Document();
	tdoc.put("msg", doc);
	UpdateOptions updateOptions=new UpdateOptions();
	updateOptions.upsert(true);
	mongoCollection.updateOne(filter, new Document("$addToSet", tdoc), updateOptions);
    iterable = mongoCollection.find(query);
    System.out.println("更新message之后的值"+iterable.first());
	}
public  void getAck(String tmpindex,String convid,String from,String database,String collection)
	{
		System.out.println("接收到ack消息后更新message中的read字段");
		MongoDatabase mongoDatabase = mongoClient.getDatabase(database);
        MongoCollection mongoCollection = mongoDatabase.getCollection(collection);
   	BasicDBObject query = new BasicDBObject();
    query.put("_id", convid);
    query.put("msg.tmpindex", tmpindex);
    BasicDBObject query1 = new BasicDBObject();
   	query1.put("_id", convid);
   	FindIterable iterable;
	FindIterable iterable2;
   	iterable = mongoCollection.find(query1);
	iterable2 = mongoCollection.find(query);
   	System.out.println("更新message满足id过滤条件之前的值"+iterable.first());
	System.out.println("更新message满足id和tmpindex过滤条件之前的值"+iterable2.first());
   	if(iterable2.first()!=null)
   	{
   		Document doc = new Document();
	   	doc.put("msg.$.read", from);
	   	UpdateOptions updateOptions=new UpdateOptions();
	   	updateOptions.upsert(true);
	   	mongoCollection.updateOne(query, new Document("$addToSet", doc), updateOptions);
   	}

   	iterable = mongoCollection.find(query1);
   	System.out.println("更新messages之后的值"+iterable.first());

	}
}

剩下的关于业务逻辑方面的就不多说了,主要是关于mqtt高可用性断开重连的功能以及mongo相关的操作

时间: 2024-08-15 13:37:46

java后端IM消息推送服务开发——协议的相关文章

java后端IM消息推送服务开发——规则

这一部分主要是负责智能消息推送,根据不同公司的不同产品的不同页面的不同事件的不同用户推送不同的消息,这也是整个业务逻辑的核心 技术主要涉及到Mysql,文件读取,dbutils,beanutils,mqtt,C3P0连接池 memcache package net.engyne.mqqt; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.List; import net.engyne.

atitit.极光消息推送服务器端开发实现推送&#160;&#160;jpush&#160;v3.&#160;总结o7p

atitit.极光消息推送服务器端开发实现推送  jpush v3. 总结o7p 1. 推送所设计到底功能1 1.1. 内容压缩1 1.2. 多引擎1 2. reg  ,设置appkey and pwdkey1 3. 下载server  sdk   v31 4. push推送样例1 5. Code3 1. 推送所设计到底功能 1.1. 内容压缩 1.2. 多引擎 2. reg  ,设置appkey and pwdkey 3. 下载server  sdk   v3 https://github.c

Android推送服务开发

由于公司之前使用的手机客户端推送服务是极光推送,给公司造成一年几十万的服务费,因此,公司决定开发自己的一套推送服务,初步的技术选型是: 服务端:netty4 关于netty框架在我的下面的博客里面我整理了相关资料,本来还有一些关于mina的由于时间原因暂时没整理出来. 为了便于自己测试,自己动手实现了如何使用netty完成服务端消息推送以及在Android客户端如何将接受到的信息显示在通知栏,整体思路大概是这样的: 服务端使用netty框架开启基于TCP监听服务. 客户端发起TCP连接(不关闭,

Android实战——第三方服务之Bmob后端云的推送服务的集成和使用(三)

第三方服务之Bmob后端云的推送服务的集成和使用(三) 事先说明:这里的一切操作都是在集成了BmobSDK之后实现的,如果对Bmob还不了解的话,请关注我第一篇Bmob文章 步骤一:推送服务的集成 在app的build.gradle中添加compile依赖文件: 在manifests文件中配置权限: 在manifests文件中添加需要的配置: 创建一个Receiver接受推送消息: 在Activity的onCreate方法中初始化BmobPush: 在Bmob后台管理中设置: 步骤二:推送服务的

消息推送服务

消息推送服务 服务器推送目前流行就是私信.发布/订阅等模式,基本上都是基于会话映射,消息对列等技术实现的:高性能.分布式可以如下解决:会话映射可采用redis cluster等技术实现,消息对列可使用kafka等分布式消息队列方案实现. APM.Server基于简单 1 static ConcurrentDictionary<string, Session> _sessionDic = new ConcurrentDictionary<string, Session>(); 和 1

26.app后端怎么架设推送服务

推送服务已经是app的标配了.架设推送服务,除了可以使用第三方服务商外,也有大量的开源技术可以选择. 现在推送主要分两块,android推送和ios推送,在下面分别论述: 1.    Android推送 Android手机由于没有系统的限制,当app进入了后台后,也能运行服务,所以android的推送可以基于长连接,这就注定了android的推送服务器和一般的app后端是不一样,技术细节上,架构上也不一样,幸好,现在有大量的开源软件可以轻松地实现推送. 下面深入研究过的开源推送软件:gopush

基于Qt移动应用的消息推送服务原理与应用

说到移动应用,大家都觉得移动嘛,当然是Java和Object-c来做啦,什么推送啊,各种系统调用啊,其实不然?如果你了解Qt, 你就知道我说的不然,也有所道理. 说道几点 一.目前Android的移动的消息.通知推送 1)轮询(Pull)方式:应用程序应当阶段性的与服务器进行连接并查询是否有新的消息到达,你必须自己实现与服务器之间的通信,例如消息排队等.而且你还要考虑轮询的频率,如果太慢可能导致某些消息的延迟,如果太快,则会大量消耗网络带宽和电池. 2)SMS(Push)方式:在Android平

SignalR Self Host+MVC等多端消息推送服务(2)

一.概述 上次的文章中我们简单的实现了SignalR自托管的服务端,今天我们来实现控制台程序调用SignalR服务端来实现推送信息,由于之前我们是打算做审批消息推送,所以我们的demo方向是做指定人发送信息,至于做聊天消息和全局广播,这里就不在进行演示了. 二.创建控制台客户端 1.在SignalRProject解决方案下新建一个名为Clinet的控制台 2.在程序包管理控制台中输入以下命令 1 Install-Package Microsoft.AspNet.SignalR.Client 3.

SignalR Self Host+MVC等多端消息推送服务(1)

一.概述 由于项目需要,最近公司项目里有个模块功能,需要使用到即时获得审批通知:原本的设计方案是使用ajax对服务器进行定时轮询查询,刚刚开始数据量和使用量不大的时候还好,后来使用量的增加和系统中各种业务的复杂度增加,服务器的压力也越来越大,于是我想使用消息推送的方式替换掉ajax轮询查询,当有审批提交时,调用推送方法,将消息推送到下一审批人那,这样就减低了服务器的压力. Signal 是微软支持的一个运行在.NET平台上的 html websocket 框架.它出现的主要目的是实现服务器主动推