java后端IM消息推送服务开发——规则

这一部分主要是负责智能消息推送,根据不同公司的不同产品的不同页面的不同事件的不同用户推送不同的消息,这也是整个业务逻辑的核心

技术主要涉及到Mysql,文件读取,dbutils,beanutils,mqtt,C3P0连接池 memcache

package net.engyne.mqqt;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;

import net.engyne.common.Constant;
import net.engyne.dao.RealUserDao;
import net.engyne.dao.RuleFilterDao;
import net.engyne.dao.RulePageEventDao;
import net.engyne.dao.TemplateDao;
import net.engyne.domain.Medium;
import net.engyne.domain.Rule;
import net.engyne.domain.Template;
import net.engyne.domain.User;
import net.sf.json.JSONObject;
import org.apache.log4j.Logger;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

 /**
12  *
13  * Title:Server
14  * Description: 服务器向多个客户端推送主题,即不同客户端可向服务器订阅相同主题
15  * @author shibin
16  * 2016年1月6日下午3:29:28
17  */
 public class MqttServer
 {
	 private static Logger logger = Logger.getLogger(MqttServer.class);
     public static final String HOST = "tcp://xx.xx.net:1883";
     private static final String clientid = "yyyy";
     private MqttClient client;
     private MqttConnectOptions options = new MqttConnectOptions();
     //private String userName = "admin";
     //private String passWord = "public";
     public MqttMessage message;
     private PushCallback callback;
     /*这一部分是memcache相关配置。用于提升性能 优化数据库访问,减小数据库压力
      * protected static MemCachedClient mcc = new MemCachedClient();
     	static{
	       // 设置缓存服务器列表,当使用分布式缓存的时,可以指定多个缓存服务器。这里应该设置为多个不同的服务,我这里将两个服务设置为一样的,大家不要向我学习,呵呵。
	        String[] servers =
	                {
	                        "115.28.190.73:11211",
	                        "115.28.190.73:11211",
	                };
	        // 设置服务器权重
	        Integer[] weights = {3, 2};
	        // 创建一个Socked连接池实例
	        SockIOPool pool = SockIOPool.getInstance();
	        // 向连接池设置服务器和权重
	        pool.setServers(servers);
	        pool.setWeights(weights);
	        // set some TCP settings
	        // disable nagle
	        // set the read timeout to 3 secs
	        // and don't set a connect timeout
	        pool.setNagle(false);
	        pool.setSocketTO(3000);
	        pool.setSocketConnectTO(0);
	        // initialize the connection pool
	        pool.initialize();
	    }
      * */
  /**
   * 用于初始化mqttclient客户端,设置回调函数,同时连接mqtt服务器
   * @throws MqttException
   */
     public MqttServer() throws MqttException
     {
         //MemoryPersistence设置clientid的保存形式,默认为以内存保存
         client = new MqttClient(HOST, clientid, new MemoryPersistence());
         callback = new PushCallback();
         client.setCallback(callback);
         options = new MqttConnectOptions();
         options.setCleanSession(false);
         options.setKeepAliveInterval(60);
         connect();
     }
     /**
      * 连接mqtt消息服务器,同时设置了断开重连的功能
      */
     private void connect()
     {
    	 SimpleDateFormat sdf= new SimpleDateFormat(Constant.DATE_FORMAT_MDYHMS);
    	 logger.info(sdf.format(System.currentTimeMillis()));
    	 /**
         options.setCleanSession(false);
         options.setUserName(userName);
         options.setPassword(passWord.toCharArray());
          设置超时时间
         options.setConnectionTimeout(60);
          设置会话心跳时间
         options.setKeepAliveInterval(10);
    	  * */
         boolean tryConnecting = true;
         while (tryConnecting) {
           try {
             client.connect(options);
           } catch (Exception e1) {
        	   logger.info("Connection attempt failed with '"+e1.getCause()+
                  "'. Retrying.");
           }
           if (client.isConnected()) {
        	   logger.info("Connected.");
             tryConnecting = false;
           } else {
             pause();
           }
         }
     }
     private void pause() {
    	    try {
    	      Thread.sleep(1000);
    	    } catch (InterruptedException e) {
    	      // Error handling goes here...
    	    }
    	  }
     /**
      *
      * @param topic
      * @param message
      * @throws MqttPersistenceException
      * @throws MqttException
      * 用于向特定topic发送message
      */
     public  void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException,
             MqttException
     {
         MqttDeliveryToken token = topic.publish(message);
         token.waitForCompletion();
         logger.info("message is published completely! "+ token.isComplete());
     }
     /**
      *
      * @param topic
      * @param qos
      * @throws MqttPersistenceException
      * @throws MqttException
      * 订阅相关主题
      */
     public void subscribe(String topic , int qos) throws MqttPersistenceException,
     		MqttException
     {
    	 client.subscribe(topic, qos);
     }
     /**
      *
      * @throws MqttPersistenceException
      * @throws MqttException
      * 断开连接服务器
      */
     public void disconnect() throws MqttPersistenceException,
		MqttException
	 {
    	 client.disconnect();
     }
     /**
      *
      * @author binshi
      *实现mqttcallback接口,主要用于接收消息后的处理方法
      */
     private class PushCallback implements MqttCallback {
    	 /**
    	  * 断开后 系统会自动调用这个函数,同时在这个函数里进行重连操作
    	  */
 	    public void connectionLost(Throwable cause) {
 	        // 连接丢失后,一般在这里面进行重连
 	    	logger.info("连接断开,可以做重连");
 	        connect();
 	        try {
				subscribe("/engyne_signaling", 2);
			} catch (MqttPersistenceException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} catch (MqttException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
 	    }
 	    /**
 	     * 消息成功传送后,系统会自动调用此函数,表明成功向topic发送消息
 	     */
 		@Override
		public void deliveryComplete(IMqttDeliveryToken arg0) {
			// TODO Auto-generated method stub
 			logger.info("deliveryComplete---------" + arg0.isComplete());
		}

 		/**
 		 *
 		 * @param template
 		 * @param convid
 		 * @return
 		 * 用于根据不同的消息模板,生成不同的json类型的消息体
 		 */
 		public JSONObject createJsonObject(Template template,String convid)
 		{
 				JSONObject jsonObject=new JSONObject();
				if(template.getTemplate_type().equalsIgnoreCase(Constant.FIELD_MQTT_TEXT) )
				{
					JSONObject content=new JSONObject();
					content.put(Constant.FIELD_MQTT_CONTENT, template.getTemplate_desc());
	    			JSONObject extra=new JSONObject();
	    			extra.put(Constant.FIELD_MQTT_HEADIMGURL, "");
	    			extra.put(Constant.FIELD_MQTT_NICKNAME, "");
	    			extra.put(Constant.FIELD_MQTT_ADMIN, 2);
	    			jsonObject.put(Constant.FIELD_MQTT_CONTENT, content);
	    			jsonObject.put(Constant.FIELD_MQTT_EXTRA, extra);
	    			jsonObject.put(Constant.FIELD_MQTT_TYPE, "text");
	    			jsonObject.put(Constant.FIELD_MQTT_FROM, "_robot_rule");
	    			jsonObject.put(Constant.FIELD_MQTT_CONVID, convid);
	    			jsonObject.put(Constant.FIELD_MQTT_TMPINDEX, System.currentTimeMillis());
	    			jsonObject.put(Constant.FIELD_MQTT_TIME, System.currentTimeMillis()/1000);
				}
				else
				{
	    			JSONObject contentInner=new JSONObject();
	    			JSONObject contentOut=new JSONObject();
	    			JSONObject extra=new JSONObject();
	    			extra.put(Constant.FIELD_MQTT_HEADIMGURL, "");
	    			extra.put(Constant.FIELD_MQTT_NICKNAME, "");
	    			extra.put(Constant.FIELD_MQTT_ADMIN, 2);
	    			contentInner.put(Constant.FIELD_MQTT_TITLE, template.getTemplate_title());
	    			contentInner.put(Constant.FIELD_MQTT_DESC, template.getTemplate_desc());
	    			contentInner.put(Constant.FIELD_MQTT_TEMPLATE, template.getTemplate_index());
	    			contentInner.put(Constant.FIELD_MQTT_PHOTO, template.getPhoto());
	    			contentOut.put(Constant.FIELD_MQTT_CONTENT, contentInner);
	    			jsonObject.put(Constant.FIELD_MQTT_CONTENT, contentOut);
	    			jsonObject.put(Constant.FIELD_MQTT_EXTRA, extra);
	    			jsonObject.put(Constant.FIELD_MQTT_TYPE, template.getTemplate_type());
	    			jsonObject.put(Constant.FIELD_MQTT_FROM,"_robot_rule");
	    			jsonObject.put(Constant.FIELD_MQTT_CONVID, convid);
	    			jsonObject.put(Constant.FIELD_MQTT_TMPINDEX, System.currentTimeMillis());
	    			jsonObject.put(Constant.FIELD_MQTT_TIME, System.currentTimeMillis()/1000);
				}
 			return jsonObject;
 		}
 		/**
 		 * 检测mediums是否存在全为空的元素
 		 * @param mediums
 		 * @return
 		 */
 		public List<Medium> checkNullForMedium(List<Medium> mediums)
 		{
 			for(int k=0;k<mediums.size();k++)
	    		{
	    			if(mediums.get(k).getFilter_key()==null||mediums.get(k).getFilter_key().equalsIgnoreCase(""))
	    			{
	    				logger.info("key为空");
	    				mediums.remove(k);
	    				k--;
	    			}
	    		}
 			return mediums;
 		}
 		/**
 		 * 检测templates内部是否存在全为空的元素
 		 * @param templates
 		 * @return
 		 */
 		public List<Template>  checkNullForTemplate(List<Template> templates)
 		{
 			for(int k=0;k<templates.size();k++)
	    		{
	    			if(templates.get(k).getTemplate_index()==null||templates.get(k).getTemplate_index().equalsIgnoreCase(""))
	    			{
	    				logger.info("key为空");
	    				templates.remove(k);
	    				k--;
	    			}
	    		}
 			return templates;
 		}
 		/**
 		 * 生成clientid 的过滤条件
 		 * @param clientid
 		 * @return
 		 */
 		public Medium generateMedium(String clientid)
 		{
 			Medium medium=new Medium();
	    	medium.setFilter_key("clientid");
	    	medium.setFilter_type("String");
	    	medium.setFilter_value(clientid);
	    	return medium;
 		}
 		/**
 		 *
 		 * @param templates
 		 * @param returntopic
 		 * @param convid
 		 * @param j
 		 * @param once
 		 * @param counter
 		 * @throws MqttPersistenceException
 		 * @throws MqttException
 		 * 根据不同的消息模板发送jsonobject类型的消息内容
 		 */
 		public void PublishTemplates(List<Template> templates,String returntopic,String convid,int j,String once,String counter) throws MqttPersistenceException, MqttException
 		{
 			logger.info("template内容"+templates.get(j));
  			//根据此模板构建消息然后转化为json字符串
			//根据消息模板创建json消息
			JSONObject jsonObject=createJsonObject(templates.get(j),convid);
			logger.info("发送topic message");
  			MqttTopic retopic=client.getTopic(returntopic);
  			logger.info("返回消息主题为"+returntopic);
  			//判断是否需要持续发送,获取data counter中的值以及once 如果once为1在判断counter,counter为1发送,否则不发送
  			 if(once.equalsIgnoreCase("1"))
  			 {
  				 if(counter.equalsIgnoreCase("1"))
  				 {
  					retopic.publish(jsonObject.toString().getBytes(), 0, false);
  					logger.info("message 发送成功");
  				 }
  				 else
  				 {
  					logger.info("message 不重复发送");
  				 }
  			 }
  			 else
  			 {
  				retopic.publish(jsonObject.toString().getBytes(), 0, false);
  				logger.info("message 发送成功");
  			 }
 		}
 		@Override
 		/**
 		 *
 		 */
 		public void messageArrived(String topic, MqttMessage message) throws Exception
 				{
 					logger.info(topic);
 			    	SimpleDateFormat sdf= new SimpleDateFormat(Constant.DATE_FORMAT_MDYHMS);
 			    	logger.info(sdf.format(System.currentTimeMillis()));
 			    	logger.info("接收消息主题 : " + topic);
 			    	logger.info("接收消息Qos : " + message.getQos());
 			    	logger.info("接收消息内容 : " + new String(message.getPayload()));
 				     //1 抽取事件信令消息
 				    String messagejudge=new String(message.getPayload());
 				    if(messagejudge.contains("signaling")&&messagejudge.contains("event"))
 				    {
 				    	logger.info("进入信令消息内部处理函数");
 				    	//2将信令消息解析为信令对象
 				    	JSONObject jo=new JSONObject();
						try {
							 jo=JSONObject.fromObject(messagejudge);
						} catch (Exception e) {
							e.printStackTrace();
						}
 				    	//提取clientid、event、pageid , convid,appid
						String returntopic=jo.getString("topic");
						String clientid=jo.getString("from");
						String event=jo.getJSONObject("content").getJSONObject("signaling").getString("event");
						String pageid=jo.getJSONObject("content").getJSONObject("signaling").getString("pageid");
						String appid=jo.getJSONObject("content").getJSONObject("signaling").getString("appid");
						String convid=jo.getString("convid");
						String counter=jo.getJSONObject("content").getJSONObject("signaling").getJSONObject("data").getString("counter");
 				    	//做联合查询,找出满足条件的rule集合
						logger.info("查询满足event和pageid的rule "+"event "+event+ "pageid "+pageid+" counter "+counter);
 				    	List<Rule> rules=new ArrayList<Rule>();
 				    	rules=RulePageEventDao.findRules(event, pageid,appid);
 				    	logger.info("rule查找成功 个数为"+"rulesize"+ rules.size());
 				    	//遍历rules 找到对应每个rule的filters,根据clientid以及过滤条件查询view_user_full如果能查询到值,就给他返回rule对应的模板消息
 				    	Medium medium= generateMedium( clientid);
 				    	if(rules!=null)
 				    	{
 				    		for(int i=0;i<rules.size();i++)
 	 				    	{
 				    			logger.info("ruleindex 为"+rules.get(i).getRule_index());
 	 				    		String once=rules.get(i).getOnce();
 	 				    		logger.info("查找rule对应的filter");
 	 				    		List<Medium> mediums =RuleFilterDao.findFilters(rules.get(i).getRule_index());
 	 				    		logger.info(mediums.size());
 	 				    		//一下部分是为了,避免在rule和filter联合做视图的时候,rule没有对应的filter 视图上会显示为null,这个时候回查询出大量的重复的值为null的filter
 	 				    		//去除为null以及为空的情况
 	 				    		mediums=checkNullForMedium( mediums);
 	 				    		logger.info("过滤条件个数"+mediums.size());
 	 				    		logger.info("过滤条件为如下"+mediums.toString());
	 				    		mediums.add(medium);
	 				    		logger.info("查找满足过滤条件的user");
	 				    		List<User> users=RealUserDao.fuzzyQuery(mediums);
	 				    		logger.info("users的个数大小为"+users.size());
	 				    		if(users.size()>0)
	 				    		{
	 				    			String ruleindex=rules.get(i).getRule_index();
	 				    			//如果能查出结果 就找到这个rule对应的消息模板
	 				    			logger.info("查找rule对应的template");
	 				    			List<Template> templates=TemplateDao.findTemplate(ruleindex);
	 				    			//下面是对template的空处理,避免查询到大量template为空的情况,这种情况要删除掉,不是我们想要的template
	 				    			templates=checkNullForTemplate( templates);
	 				    			logger.info(templates.size()+" templatessize");
	 				    			for(int j=0;j<templates.size();j++)
	 				    			{
	 				    				PublishTemplates( templates, returntopic, convid, j, once, counter);
	 				    			}
	 				    		}
 	 				    	}
 				    	}
 				    }
 				}
 	}
     /**
      *
      * @param args
      * @throws MqttException
      * 整个工程从这里开始执行,生成可执行jar包,这个设置为主类。
      */
     public static void main(String[] args) throws MqttException
     {
    	 MqttServer server = new MqttServer();
         server.message = new MqttMessage();
         /**
         server.message.setQos(2);
         server.message.setRetained(false);
         server.message.setPayload("给客户端124推送的信息".getBytes());
         server.subscribe("/engyne/1/7/169573fcbc96a816281192222", 2);
         */
         server.subscribe(Constant.TOPIC_MQTT_SIGNAL, 2);
         logger.info(server.message.isRetained() + "------ratained状态");
	}

}

下面都是mysql数据库相关操作,其他的省略吧 就列出一个最复杂的模糊查询

package net.engyne.dao;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import net.engyne.domain.Medium;
import net.engyne.domain.User;
import net.engyne.mqqt.MqttServer;

import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.handlers.BeanListHandler;
import org.apache.log4j.Logger;

import cn.itcast.jdbc.TxQueryRunner;
public class RealUserDao {

	private static Logger logger = Logger.getLogger(MqttServer.class);
	/**
	 * 处理range类型的过滤条件,且begin和end都不为空的情况
	 * @param sql
	 * @param mediums
	 * @param i
	 * @return
	 */
	public static String RangeWithBeginEnd(String sql,List<Medium> mediums,int i)
	{
		if (!sql.trim().endsWith("and")&& !sql.trim().endsWith("where"))
		{
			sql = sql + " and " + mediums.get(i).getFilter_key()+ " between " + mediums.get(i).getBegin()+ " and " + mediums.get(i).getEnd() + " ";
		}
		else
		{
			sql = sql + " " + mediums.get(i).getFilter_key()+ " between " + mediums.get(i).getBegin()+ " and " + mediums.get(i).getEnd() + " ";
		}
		return sql;
	}
	/**
	 * 处理range类型的过滤条件,begin不为空,end为空
	 * @param sql
	 * @param mediums
	 * @param i
	 * @return
	 */
	public static String RangeWithBegin(String sql,List<Medium> mediums,int i)
	{
		if (!sql.trim().endsWith("and")&& !sql.trim().endsWith("where"))
		{
			sql = sql + " and  " + mediums.get(i).getFilter_key()+ " > " + mediums.get(i).getBegin() + " ";
		} else
		{
			sql = sql + "   " + mediums.get(i).getFilter_key()+ " > " + mediums.get(i).getBegin() + " ";
		}
		return sql;
	}
	/**
	 * 处理range类型的过滤条件,begin为空,end不为空
	 * @param sql
	 * @param mediums
	 * @param i
	 * @return
	 */
	public static String RangeWithEnd(String sql,List<Medium> mediums,int i)
	{
		if (!sql.trim().endsWith("and")&& !sql.trim().endsWith("where"))
		{
			sql = sql + " and " + mediums.get(i).getFilter_key()+ " <  " + mediums.get(i).getEnd() + " ";
		}
		else
		{
			sql = sql + " " + mediums.get(i).getFilter_key()+ " <  " + mediums.get(i).getEnd() + " ";
		}
		return sql;
	}
	/**
	 * 处理fuzzy类型的过滤条件
	 * @param sql
	 * @param mediums
	 * @param i
	 * @return
	 */
	public static String Fuzzy(String sql,List<Medium> mediums,int i)
	{
		if (!sql.trim().endsWith("and")&& !sql.trim().endsWith("where"))
		{
			sql = sql + " and " + mediums.get(i).getFilter_key() + " "+ "like " + "?" + " and ";
		} else
		{
			sql = sql + " " + mediums.get(i).getFilter_key() + " "+ "like " + "?" + " and ";
		}

		return sql;
	}
	/**
	 * 处理单值类型的过滤条件
	 * @param sql
	 * @param mediums
	 * @param i
	 * @return
	 */
	public static String Value(String sql,List<Medium> mediums,int i)
	{
		if (!sql.trim().endsWith("and")&& !sql.trim().endsWith("where"))
		{
			sql = sql + " and " + mediums.get(i).getFilter_key()+ " =? and ";
		}
		else
		{
			sql = sql + " " + mediums.get(i).getFilter_key()+ " =? and ";
		}

		return sql;
	}
	/**
	 * 处理标签sql查询
	 * @param sql
	 * @param temp
	 * @param tags
	 * @param tagnames
	 * @return
	 */
	public static String Tag(String sql,List<String> temp ,List<String> tags,List<String> tagnames )
	{
		if (!sql.trim().endsWith("and") && !sql.trim().endsWith("where"))
		{
			sql = sql + " and ( ";
		} else {
			sql = sql + "  ( ";
		}
		for (int i = 0; i < tags.size(); i++)
		{
			temp.add(tags.get(i));
			sql = sql + "" + tagnames.get(i) + "=? or ";
		}
		sql = sql.substring(0, sql.length() - 3);
		sql = sql + " )";

		return sql;
	}
	/**
	 * 做模糊查询的主函数
	 * @param mediums
	 * @return
	 */
	public static List<User> fuzzyQuery(List<Medium> mediums) {
		 QueryRunner qr = new TxQueryRunner();
		logger.info(mediums.toString());
		String sql = "SELECT DISTINCT(userid),username,nickname,usersex,userage,usertype,registtime,lastseetime,lastpageindex,provinceid,channelid,terminalid,appindex,email,phonenumber FROM view_user_full  where";
		Object[] params = {};
		// System.out.println("mediums size"+mediums.size());
		List<String> temp = new ArrayList<String>();
		List<String> tags = new ArrayList<String>();
		List<String> tagnames = new ArrayList<String>();
		for (int i = 0; i < mediums.size(); i++)
		{
			if (mediums.get(i).getFilter_type().equalsIgnoreCase("range"))
			{
				logger.info("进入range");
				String begin=mediums.get(i).getBegin();
				String end=mediums.get(i).getEnd();
				if ((begin != null && !begin.equalsIgnoreCase(""))&& (end != null && !end.equalsIgnoreCase("")))
				{
					sql=RangeWithBeginEnd( sql, mediums, i);
				}
				else if ((begin != null && !begin.equalsIgnoreCase(""))&& (end == null || end.equalsIgnoreCase("")))
				{
					sql=RangeWithBegin( sql, mediums, i);
				}
				else if ((begin == null || begin.equalsIgnoreCase(""))&& (end != null && !end.equalsIgnoreCase("")))
				{
					sql=RangeWithEnd( sql, mediums, i);
				}
			}
			else if (mediums.get(i).getFilter_type().equalsIgnoreCase("multi"))
			{
				tags.add(mediums.get(i).getFilter_value());
				tagnames.add(mediums.get(i).getFilter_key());
			}
			else if (mediums.get(i).getFilter_type().equalsIgnoreCase("fuzzy"))
			{
				sql=Fuzzy(sql,mediums,i);
				temp.add("%" + mediums.get(i).getFilter_value().trim() + "%");
			}
			else
			{
				sql=Value(sql,mediums,i);
				temp.add(mediums.get(i).getFilter_value());

			}
		}
		if (sql.trim().endsWith("and"))
		{
			sql = sql.substring(0, sql.length() - 5);
		}
		// 只有标签可以进行复合查询,其他参数只限一个key一个值
		if (tags.size() > 0 && tagnames.size() > 0)
		{
			sql=Tag(sql,temp,tags,tagnames);
		}
		params = temp.toArray();
		logger.info("end of temp to params" + params.length);
		List<User> users = new ArrayList<User>();
		logger.info("last sql " + sql);
		try {
			users = qr.query(sql, new BeanListHandler<User>(User.class), params);
		} catch (SQLException e) {
			logger.error("realUserDao查询异常",e);

		}
		return users;
	}
}

实体类就没必要展示了,都是一些简单的属性

下面是文件的读取主要是配置文件的读取

package net.engyne.util;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Properties;

import org.apache.log4j.Logger;

import net.engyne.mqqt.MqttServer;

public class FileUtil implements java.io.Serializable {
	 /**
	 *
	 */
	private static final long serialVersionUID = 1L;
	private static Logger logger = Logger.getLogger(MqttServer.class);
	 public static String readValue(String filePath, String key) {
		Properties props = new Properties();
		try {
			String url = new File("").getAbsolutePath();
			InputStream in = new BufferedInputStream(new FileInputStream(url+ "/net/engyne/util/" + filePath));
			props.load(in);
			String value = props.getProperty(key);
			logger.error(key + value);
			return value;
		} catch (Exception e) {
			e.printStackTrace();
			return null;
		}
	}
}
时间: 2024-10-25 15:32:08

java后端IM消息推送服务开发——规则的相关文章

java后端IM消息推送服务开发——协议

最近在一家saas企业使用Mqtt开发IM消息推送服务,把开发中的一些问题记录下来,项目仍在商用中,完整的消息服务包括4个模块---协议protocol,信令Signal,规则Rule,状态Status,这个主题主要是协议protocol部分. 主要技术涉及到MongoDB,webservice,httpclient,Mqtt等 protocol分为四个模块类来实现,当然这是为了以后的扩展性比较好 首先看一下我们的主类,主要是mqtt基础方法的一个框架 public class MqttProt

atitit.极光消息推送服务器端开发实现推送&#160;&#160;jpush&#160;v3.&#160;总结o7p

atitit.极光消息推送服务器端开发实现推送  jpush v3. 总结o7p 1. 推送所设计到底功能1 1.1. 内容压缩1 1.2. 多引擎1 2. reg  ,设置appkey and pwdkey1 3. 下载server  sdk   v31 4. push推送样例1 5. Code3 1. 推送所设计到底功能 1.1. 内容压缩 1.2. 多引擎 2. reg  ,设置appkey and pwdkey 3. 下载server  sdk   v3 https://github.c

Android推送服务开发

由于公司之前使用的手机客户端推送服务是极光推送,给公司造成一年几十万的服务费,因此,公司决定开发自己的一套推送服务,初步的技术选型是: 服务端:netty4 关于netty框架在我的下面的博客里面我整理了相关资料,本来还有一些关于mina的由于时间原因暂时没整理出来. 为了便于自己测试,自己动手实现了如何使用netty完成服务端消息推送以及在Android客户端如何将接受到的信息显示在通知栏,整体思路大概是这样的: 服务端使用netty框架开启基于TCP监听服务. 客户端发起TCP连接(不关闭,

Android实战——第三方服务之Bmob后端云的推送服务的集成和使用(三)

第三方服务之Bmob后端云的推送服务的集成和使用(三) 事先说明:这里的一切操作都是在集成了BmobSDK之后实现的,如果对Bmob还不了解的话,请关注我第一篇Bmob文章 步骤一:推送服务的集成 在app的build.gradle中添加compile依赖文件: 在manifests文件中配置权限: 在manifests文件中添加需要的配置: 创建一个Receiver接受推送消息: 在Activity的onCreate方法中初始化BmobPush: 在Bmob后台管理中设置: 步骤二:推送服务的

消息推送服务

消息推送服务 服务器推送目前流行就是私信.发布/订阅等模式,基本上都是基于会话映射,消息对列等技术实现的:高性能.分布式可以如下解决:会话映射可采用redis cluster等技术实现,消息对列可使用kafka等分布式消息队列方案实现. APM.Server基于简单 1 static ConcurrentDictionary<string, Session> _sessionDic = new ConcurrentDictionary<string, Session>(); 和 1

26.app后端怎么架设推送服务

推送服务已经是app的标配了.架设推送服务,除了可以使用第三方服务商外,也有大量的开源技术可以选择. 现在推送主要分两块,android推送和ios推送,在下面分别论述: 1.    Android推送 Android手机由于没有系统的限制,当app进入了后台后,也能运行服务,所以android的推送可以基于长连接,这就注定了android的推送服务器和一般的app后端是不一样,技术细节上,架构上也不一样,幸好,现在有大量的开源软件可以轻松地实现推送. 下面深入研究过的开源推送软件:gopush

基于Qt移动应用的消息推送服务原理与应用

说到移动应用,大家都觉得移动嘛,当然是Java和Object-c来做啦,什么推送啊,各种系统调用啊,其实不然?如果你了解Qt, 你就知道我说的不然,也有所道理. 说道几点 一.目前Android的移动的消息.通知推送 1)轮询(Pull)方式:应用程序应当阶段性的与服务器进行连接并查询是否有新的消息到达,你必须自己实现与服务器之间的通信,例如消息排队等.而且你还要考虑轮询的频率,如果太慢可能导致某些消息的延迟,如果太快,则会大量消耗网络带宽和电池. 2)SMS(Push)方式:在Android平

SignalR Self Host+MVC等多端消息推送服务(2)

一.概述 上次的文章中我们简单的实现了SignalR自托管的服务端,今天我们来实现控制台程序调用SignalR服务端来实现推送信息,由于之前我们是打算做审批消息推送,所以我们的demo方向是做指定人发送信息,至于做聊天消息和全局广播,这里就不在进行演示了. 二.创建控制台客户端 1.在SignalRProject解决方案下新建一个名为Clinet的控制台 2.在程序包管理控制台中输入以下命令 1 Install-Package Microsoft.AspNet.SignalR.Client 3.

SignalR Self Host+MVC等多端消息推送服务(1)

一.概述 由于项目需要,最近公司项目里有个模块功能,需要使用到即时获得审批通知:原本的设计方案是使用ajax对服务器进行定时轮询查询,刚刚开始数据量和使用量不大的时候还好,后来使用量的增加和系统中各种业务的复杂度增加,服务器的压力也越来越大,于是我想使用消息推送的方式替换掉ajax轮询查询,当有审批提交时,调用推送方法,将消息推送到下一审批人那,这样就减低了服务器的压力. Signal 是微软支持的一个运行在.NET平台上的 html websocket 框架.它出现的主要目的是实现服务器主动推