这一部分主要是负责智能消息推送,根据不同公司的不同产品的不同页面的不同事件的不同用户推送不同的消息,这也是整个业务逻辑的核心
技术主要涉及到Mysql,文件读取,dbutils,beanutils,mqtt,C3P0连接池 memcache
package net.engyne.mqqt; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.List; import net.engyne.common.Constant; import net.engyne.dao.RealUserDao; import net.engyne.dao.RuleFilterDao; import net.engyne.dao.RulePageEventDao; import net.engyne.dao.TemplateDao; import net.engyne.domain.Medium; import net.engyne.domain.Rule; import net.engyne.domain.Template; import net.engyne.domain.User; import net.sf.json.JSONObject; import org.apache.log4j.Logger; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttPersistenceException; import org.eclipse.paho.client.mqttv3.MqttTopic; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; /** 12 * 13 * Title:Server 14 * Description: 服务器向多个客户端推送主题,即不同客户端可向服务器订阅相同主题 15 * @author shibin 16 * 2016年1月6日下午3:29:28 17 */ public class MqttServer { private static Logger logger = Logger.getLogger(MqttServer.class); public static final String HOST = "tcp://xx.xx.net:1883"; private static final String clientid = "yyyy"; private MqttClient client; private MqttConnectOptions options = new MqttConnectOptions(); //private String userName = "admin"; //private String passWord = "public"; public MqttMessage message; private PushCallback callback; /*这一部分是memcache相关配置。用于提升性能 优化数据库访问,减小数据库压力 * protected static MemCachedClient mcc = new MemCachedClient(); static{ // 设置缓存服务器列表,当使用分布式缓存的时,可以指定多个缓存服务器。这里应该设置为多个不同的服务,我这里将两个服务设置为一样的,大家不要向我学习,呵呵。 String[] servers = { "115.28.190.73:11211", "115.28.190.73:11211", }; // 设置服务器权重 Integer[] weights = {3, 2}; // 创建一个Socked连接池实例 SockIOPool pool = SockIOPool.getInstance(); // 向连接池设置服务器和权重 pool.setServers(servers); pool.setWeights(weights); // set some TCP settings // disable nagle // set the read timeout to 3 secs // and don't set a connect timeout pool.setNagle(false); pool.setSocketTO(3000); pool.setSocketConnectTO(0); // initialize the connection pool pool.initialize(); } * */ /** * 用于初始化mqttclient客户端,设置回调函数,同时连接mqtt服务器 * @throws MqttException */ public MqttServer() throws MqttException { //MemoryPersistence设置clientid的保存形式,默认为以内存保存 client = new MqttClient(HOST, clientid, new MemoryPersistence()); callback = new PushCallback(); client.setCallback(callback); options = new MqttConnectOptions(); options.setCleanSession(false); options.setKeepAliveInterval(60); connect(); } /** * 连接mqtt消息服务器,同时设置了断开重连的功能 */ private void connect() { SimpleDateFormat sdf= new SimpleDateFormat(Constant.DATE_FORMAT_MDYHMS); logger.info(sdf.format(System.currentTimeMillis())); /** options.setCleanSession(false); options.setUserName(userName); options.setPassword(passWord.toCharArray()); 设置超时时间 options.setConnectionTimeout(60); 设置会话心跳时间 options.setKeepAliveInterval(10); * */ boolean tryConnecting = true; while (tryConnecting) { try { client.connect(options); } catch (Exception e1) { logger.info("Connection attempt failed with '"+e1.getCause()+ "'. Retrying."); } if (client.isConnected()) { logger.info("Connected."); tryConnecting = false; } else { pause(); } } } private void pause() { try { Thread.sleep(1000); } catch (InterruptedException e) { // Error handling goes here... } } /** * * @param topic * @param message * @throws MqttPersistenceException * @throws MqttException * 用于向特定topic发送message */ public void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException, MqttException { MqttDeliveryToken token = topic.publish(message); token.waitForCompletion(); logger.info("message is published completely! "+ token.isComplete()); } /** * * @param topic * @param qos * @throws MqttPersistenceException * @throws MqttException * 订阅相关主题 */ public void subscribe(String topic , int qos) throws MqttPersistenceException, MqttException { client.subscribe(topic, qos); } /** * * @throws MqttPersistenceException * @throws MqttException * 断开连接服务器 */ public void disconnect() throws MqttPersistenceException, MqttException { client.disconnect(); } /** * * @author binshi *实现mqttcallback接口,主要用于接收消息后的处理方法 */ private class PushCallback implements MqttCallback { /** * 断开后 系统会自动调用这个函数,同时在这个函数里进行重连操作 */ public void connectionLost(Throwable cause) { // 连接丢失后,一般在这里面进行重连 logger.info("连接断开,可以做重连"); connect(); try { subscribe("/engyne_signaling", 2); } catch (MqttPersistenceException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * 消息成功传送后,系统会自动调用此函数,表明成功向topic发送消息 */ @Override public void deliveryComplete(IMqttDeliveryToken arg0) { // TODO Auto-generated method stub logger.info("deliveryComplete---------" + arg0.isComplete()); } /** * * @param template * @param convid * @return * 用于根据不同的消息模板,生成不同的json类型的消息体 */ public JSONObject createJsonObject(Template template,String convid) { JSONObject jsonObject=new JSONObject(); if(template.getTemplate_type().equalsIgnoreCase(Constant.FIELD_MQTT_TEXT) ) { JSONObject content=new JSONObject(); content.put(Constant.FIELD_MQTT_CONTENT, template.getTemplate_desc()); JSONObject extra=new JSONObject(); extra.put(Constant.FIELD_MQTT_HEADIMGURL, ""); extra.put(Constant.FIELD_MQTT_NICKNAME, ""); extra.put(Constant.FIELD_MQTT_ADMIN, 2); jsonObject.put(Constant.FIELD_MQTT_CONTENT, content); jsonObject.put(Constant.FIELD_MQTT_EXTRA, extra); jsonObject.put(Constant.FIELD_MQTT_TYPE, "text"); jsonObject.put(Constant.FIELD_MQTT_FROM, "_robot_rule"); jsonObject.put(Constant.FIELD_MQTT_CONVID, convid); jsonObject.put(Constant.FIELD_MQTT_TMPINDEX, System.currentTimeMillis()); jsonObject.put(Constant.FIELD_MQTT_TIME, System.currentTimeMillis()/1000); } else { JSONObject contentInner=new JSONObject(); JSONObject contentOut=new JSONObject(); JSONObject extra=new JSONObject(); extra.put(Constant.FIELD_MQTT_HEADIMGURL, ""); extra.put(Constant.FIELD_MQTT_NICKNAME, ""); extra.put(Constant.FIELD_MQTT_ADMIN, 2); contentInner.put(Constant.FIELD_MQTT_TITLE, template.getTemplate_title()); contentInner.put(Constant.FIELD_MQTT_DESC, template.getTemplate_desc()); contentInner.put(Constant.FIELD_MQTT_TEMPLATE, template.getTemplate_index()); contentInner.put(Constant.FIELD_MQTT_PHOTO, template.getPhoto()); contentOut.put(Constant.FIELD_MQTT_CONTENT, contentInner); jsonObject.put(Constant.FIELD_MQTT_CONTENT, contentOut); jsonObject.put(Constant.FIELD_MQTT_EXTRA, extra); jsonObject.put(Constant.FIELD_MQTT_TYPE, template.getTemplate_type()); jsonObject.put(Constant.FIELD_MQTT_FROM,"_robot_rule"); jsonObject.put(Constant.FIELD_MQTT_CONVID, convid); jsonObject.put(Constant.FIELD_MQTT_TMPINDEX, System.currentTimeMillis()); jsonObject.put(Constant.FIELD_MQTT_TIME, System.currentTimeMillis()/1000); } return jsonObject; } /** * 检测mediums是否存在全为空的元素 * @param mediums * @return */ public List<Medium> checkNullForMedium(List<Medium> mediums) { for(int k=0;k<mediums.size();k++) { if(mediums.get(k).getFilter_key()==null||mediums.get(k).getFilter_key().equalsIgnoreCase("")) { logger.info("key为空"); mediums.remove(k); k--; } } return mediums; } /** * 检测templates内部是否存在全为空的元素 * @param templates * @return */ public List<Template> checkNullForTemplate(List<Template> templates) { for(int k=0;k<templates.size();k++) { if(templates.get(k).getTemplate_index()==null||templates.get(k).getTemplate_index().equalsIgnoreCase("")) { logger.info("key为空"); templates.remove(k); k--; } } return templates; } /** * 生成clientid 的过滤条件 * @param clientid * @return */ public Medium generateMedium(String clientid) { Medium medium=new Medium(); medium.setFilter_key("clientid"); medium.setFilter_type("String"); medium.setFilter_value(clientid); return medium; } /** * * @param templates * @param returntopic * @param convid * @param j * @param once * @param counter * @throws MqttPersistenceException * @throws MqttException * 根据不同的消息模板发送jsonobject类型的消息内容 */ public void PublishTemplates(List<Template> templates,String returntopic,String convid,int j,String once,String counter) throws MqttPersistenceException, MqttException { logger.info("template内容"+templates.get(j)); //根据此模板构建消息然后转化为json字符串 //根据消息模板创建json消息 JSONObject jsonObject=createJsonObject(templates.get(j),convid); logger.info("发送topic message"); MqttTopic retopic=client.getTopic(returntopic); logger.info("返回消息主题为"+returntopic); //判断是否需要持续发送,获取data counter中的值以及once 如果once为1在判断counter,counter为1发送,否则不发送 if(once.equalsIgnoreCase("1")) { if(counter.equalsIgnoreCase("1")) { retopic.publish(jsonObject.toString().getBytes(), 0, false); logger.info("message 发送成功"); } else { logger.info("message 不重复发送"); } } else { retopic.publish(jsonObject.toString().getBytes(), 0, false); logger.info("message 发送成功"); } } @Override /** * */ public void messageArrived(String topic, MqttMessage message) throws Exception { logger.info(topic); SimpleDateFormat sdf= new SimpleDateFormat(Constant.DATE_FORMAT_MDYHMS); logger.info(sdf.format(System.currentTimeMillis())); logger.info("接收消息主题 : " + topic); logger.info("接收消息Qos : " + message.getQos()); logger.info("接收消息内容 : " + new String(message.getPayload())); //1 抽取事件信令消息 String messagejudge=new String(message.getPayload()); if(messagejudge.contains("signaling")&&messagejudge.contains("event")) { logger.info("进入信令消息内部处理函数"); //2将信令消息解析为信令对象 JSONObject jo=new JSONObject(); try { jo=JSONObject.fromObject(messagejudge); } catch (Exception e) { e.printStackTrace(); } //提取clientid、event、pageid , convid,appid String returntopic=jo.getString("topic"); String clientid=jo.getString("from"); String event=jo.getJSONObject("content").getJSONObject("signaling").getString("event"); String pageid=jo.getJSONObject("content").getJSONObject("signaling").getString("pageid"); String appid=jo.getJSONObject("content").getJSONObject("signaling").getString("appid"); String convid=jo.getString("convid"); String counter=jo.getJSONObject("content").getJSONObject("signaling").getJSONObject("data").getString("counter"); //做联合查询,找出满足条件的rule集合 logger.info("查询满足event和pageid的rule "+"event "+event+ "pageid "+pageid+" counter "+counter); List<Rule> rules=new ArrayList<Rule>(); rules=RulePageEventDao.findRules(event, pageid,appid); logger.info("rule查找成功 个数为"+"rulesize"+ rules.size()); //遍历rules 找到对应每个rule的filters,根据clientid以及过滤条件查询view_user_full如果能查询到值,就给他返回rule对应的模板消息 Medium medium= generateMedium( clientid); if(rules!=null) { for(int i=0;i<rules.size();i++) { logger.info("ruleindex 为"+rules.get(i).getRule_index()); String once=rules.get(i).getOnce(); logger.info("查找rule对应的filter"); List<Medium> mediums =RuleFilterDao.findFilters(rules.get(i).getRule_index()); logger.info(mediums.size()); //一下部分是为了,避免在rule和filter联合做视图的时候,rule没有对应的filter 视图上会显示为null,这个时候回查询出大量的重复的值为null的filter //去除为null以及为空的情况 mediums=checkNullForMedium( mediums); logger.info("过滤条件个数"+mediums.size()); logger.info("过滤条件为如下"+mediums.toString()); mediums.add(medium); logger.info("查找满足过滤条件的user"); List<User> users=RealUserDao.fuzzyQuery(mediums); logger.info("users的个数大小为"+users.size()); if(users.size()>0) { String ruleindex=rules.get(i).getRule_index(); //如果能查出结果 就找到这个rule对应的消息模板 logger.info("查找rule对应的template"); List<Template> templates=TemplateDao.findTemplate(ruleindex); //下面是对template的空处理,避免查询到大量template为空的情况,这种情况要删除掉,不是我们想要的template templates=checkNullForTemplate( templates); logger.info(templates.size()+" templatessize"); for(int j=0;j<templates.size();j++) { PublishTemplates( templates, returntopic, convid, j, once, counter); } } } } } } } /** * * @param args * @throws MqttException * 整个工程从这里开始执行,生成可执行jar包,这个设置为主类。 */ public static void main(String[] args) throws MqttException { MqttServer server = new MqttServer(); server.message = new MqttMessage(); /** server.message.setQos(2); server.message.setRetained(false); server.message.setPayload("给客户端124推送的信息".getBytes()); server.subscribe("/engyne/1/7/169573fcbc96a816281192222", 2); */ server.subscribe(Constant.TOPIC_MQTT_SIGNAL, 2); logger.info(server.message.isRetained() + "------ratained状态"); } }
下面都是mysql数据库相关操作,其他的省略吧 就列出一个最复杂的模糊查询
package net.engyne.dao; import java.sql.SQLException; import java.util.ArrayList; import java.util.List; import net.engyne.domain.Medium; import net.engyne.domain.User; import net.engyne.mqqt.MqttServer; import org.apache.commons.dbutils.QueryRunner; import org.apache.commons.dbutils.handlers.BeanListHandler; import org.apache.log4j.Logger; import cn.itcast.jdbc.TxQueryRunner; public class RealUserDao { private static Logger logger = Logger.getLogger(MqttServer.class); /** * 处理range类型的过滤条件,且begin和end都不为空的情况 * @param sql * @param mediums * @param i * @return */ public static String RangeWithBeginEnd(String sql,List<Medium> mediums,int i) { if (!sql.trim().endsWith("and")&& !sql.trim().endsWith("where")) { sql = sql + " and " + mediums.get(i).getFilter_key()+ " between " + mediums.get(i).getBegin()+ " and " + mediums.get(i).getEnd() + " "; } else { sql = sql + " " + mediums.get(i).getFilter_key()+ " between " + mediums.get(i).getBegin()+ " and " + mediums.get(i).getEnd() + " "; } return sql; } /** * 处理range类型的过滤条件,begin不为空,end为空 * @param sql * @param mediums * @param i * @return */ public static String RangeWithBegin(String sql,List<Medium> mediums,int i) { if (!sql.trim().endsWith("and")&& !sql.trim().endsWith("where")) { sql = sql + " and " + mediums.get(i).getFilter_key()+ " > " + mediums.get(i).getBegin() + " "; } else { sql = sql + " " + mediums.get(i).getFilter_key()+ " > " + mediums.get(i).getBegin() + " "; } return sql; } /** * 处理range类型的过滤条件,begin为空,end不为空 * @param sql * @param mediums * @param i * @return */ public static String RangeWithEnd(String sql,List<Medium> mediums,int i) { if (!sql.trim().endsWith("and")&& !sql.trim().endsWith("where")) { sql = sql + " and " + mediums.get(i).getFilter_key()+ " < " + mediums.get(i).getEnd() + " "; } else { sql = sql + " " + mediums.get(i).getFilter_key()+ " < " + mediums.get(i).getEnd() + " "; } return sql; } /** * 处理fuzzy类型的过滤条件 * @param sql * @param mediums * @param i * @return */ public static String Fuzzy(String sql,List<Medium> mediums,int i) { if (!sql.trim().endsWith("and")&& !sql.trim().endsWith("where")) { sql = sql + " and " + mediums.get(i).getFilter_key() + " "+ "like " + "?" + " and "; } else { sql = sql + " " + mediums.get(i).getFilter_key() + " "+ "like " + "?" + " and "; } return sql; } /** * 处理单值类型的过滤条件 * @param sql * @param mediums * @param i * @return */ public static String Value(String sql,List<Medium> mediums,int i) { if (!sql.trim().endsWith("and")&& !sql.trim().endsWith("where")) { sql = sql + " and " + mediums.get(i).getFilter_key()+ " =? and "; } else { sql = sql + " " + mediums.get(i).getFilter_key()+ " =? and "; } return sql; } /** * 处理标签sql查询 * @param sql * @param temp * @param tags * @param tagnames * @return */ public static String Tag(String sql,List<String> temp ,List<String> tags,List<String> tagnames ) { if (!sql.trim().endsWith("and") && !sql.trim().endsWith("where")) { sql = sql + " and ( "; } else { sql = sql + " ( "; } for (int i = 0; i < tags.size(); i++) { temp.add(tags.get(i)); sql = sql + "" + tagnames.get(i) + "=? or "; } sql = sql.substring(0, sql.length() - 3); sql = sql + " )"; return sql; } /** * 做模糊查询的主函数 * @param mediums * @return */ public static List<User> fuzzyQuery(List<Medium> mediums) { QueryRunner qr = new TxQueryRunner(); logger.info(mediums.toString()); String sql = "SELECT DISTINCT(userid),username,nickname,usersex,userage,usertype,registtime,lastseetime,lastpageindex,provinceid,channelid,terminalid,appindex,email,phonenumber FROM view_user_full where"; Object[] params = {}; // System.out.println("mediums size"+mediums.size()); List<String> temp = new ArrayList<String>(); List<String> tags = new ArrayList<String>(); List<String> tagnames = new ArrayList<String>(); for (int i = 0; i < mediums.size(); i++) { if (mediums.get(i).getFilter_type().equalsIgnoreCase("range")) { logger.info("进入range"); String begin=mediums.get(i).getBegin(); String end=mediums.get(i).getEnd(); if ((begin != null && !begin.equalsIgnoreCase(""))&& (end != null && !end.equalsIgnoreCase(""))) { sql=RangeWithBeginEnd( sql, mediums, i); } else if ((begin != null && !begin.equalsIgnoreCase(""))&& (end == null || end.equalsIgnoreCase(""))) { sql=RangeWithBegin( sql, mediums, i); } else if ((begin == null || begin.equalsIgnoreCase(""))&& (end != null && !end.equalsIgnoreCase(""))) { sql=RangeWithEnd( sql, mediums, i); } } else if (mediums.get(i).getFilter_type().equalsIgnoreCase("multi")) { tags.add(mediums.get(i).getFilter_value()); tagnames.add(mediums.get(i).getFilter_key()); } else if (mediums.get(i).getFilter_type().equalsIgnoreCase("fuzzy")) { sql=Fuzzy(sql,mediums,i); temp.add("%" + mediums.get(i).getFilter_value().trim() + "%"); } else { sql=Value(sql,mediums,i); temp.add(mediums.get(i).getFilter_value()); } } if (sql.trim().endsWith("and")) { sql = sql.substring(0, sql.length() - 5); } // 只有标签可以进行复合查询,其他参数只限一个key一个值 if (tags.size() > 0 && tagnames.size() > 0) { sql=Tag(sql,temp,tags,tagnames); } params = temp.toArray(); logger.info("end of temp to params" + params.length); List<User> users = new ArrayList<User>(); logger.info("last sql " + sql); try { users = qr.query(sql, new BeanListHandler<User>(User.class), params); } catch (SQLException e) { logger.error("realUserDao查询异常",e); } return users; } }
实体类就没必要展示了,都是一些简单的属性
下面是文件的读取主要是配置文件的读取
package net.engyne.util; import java.io.BufferedInputStream; import java.io.File; import java.io.FileInputStream; import java.io.InputStream; import java.util.Properties; import org.apache.log4j.Logger; import net.engyne.mqqt.MqttServer; public class FileUtil implements java.io.Serializable { /** * */ private static final long serialVersionUID = 1L; private static Logger logger = Logger.getLogger(MqttServer.class); public static String readValue(String filePath, String key) { Properties props = new Properties(); try { String url = new File("").getAbsolutePath(); InputStream in = new BufferedInputStream(new FileInputStream(url+ "/net/engyne/util/" + filePath)); props.load(in); String value = props.getProperty(key); logger.error(key + value); return value; } catch (Exception e) { e.printStackTrace(); return null; } } }
时间: 2024-10-25 15:32:08