<dependency> <groupId>org.drools</groupId> <artifactId>drools-core</artifactId> <version>6.3.0.Final</version> </dependency> <dependency> <groupId>org.drools</groupId> <artifactId>drools-compiler</artifactId> <version>6.3.0.Final</version> </dependency> <dependency> <groupId>org.eclipse.jdt.core.compiler</groupId> <artifactId>ecj</artifactId> <version>4.5.1</version> </dependency> <dependency> <groupId>org.drools</groupId> <artifactId>knowledge-api</artifactId> <version>6.3.0.Final</version> </dependency> <dependency> <groupId>org.mvel</groupId> <artifactId>mvel2</artifactId> <version>2.2.8.Final</version> </dependency> <dependency> <groupId>commons-codec</groupId> <artifactId>commons-codec</artifactId> <version>1.10</version> </dependency> <dependency> <groupId>commons-collections</groupId> <artifactId>commons-collections</artifactId> <version>3.2.2</version> </dependency> <dependency> <groupId>org.antlr</groupId> <artifactId>antlr4</artifactId> <version>4.5.3</version> <scope>test</scope> </dependency> <dependency> <groupId>org.antlr</groupId> <artifactId>antlr4-runtime</artifactId> <version>4.5.3</version> <scope>test</scope> </dependency>
package com.pccc.ucs.brms import com.pccc.brms.service.ILoggingService import com.pccc.brms.rulemodel.RuleMessage import com.pccc.brms.rulemodel.ScheduleMessage import com.pccc.brms.service.IRuleService import com.pccc.brms.rulemodel.ScheduleAction import com.pccc.brms.service.IRuleSession import com.pccc.brms.service.IService import com.pccc.brms.service.IThreadPoolService import com.pccc.brms.service.IScheduleService import com.pccc.brms.service.ILoggingService import com.pccc.brms.rulemodel.FreqFilterParam import com.pccc.brms.rulemodel.FreqFilterRuleList import java.util.List import com.pccc.brms.rulemodel.BatchFilterScheduleAction import com.pccc.brms.rulemodel.FreqFilterScheduleAction import com.pccc.brms.rulemodel.BatchFilterStatus import com.pccc.brms.service.IBatchFilterStatusService import com.pccc.brms.rulemodel.UrgentTaskScheduleAction import com.pccc.brms.rulemodel.UrgentCommTask import com.pccc.brms.rulemodel.AppMessage import com.pccc.brms.rulemodel.BatchImportStatus import com.pccc.brms.rulemodel.TaskScheduleAction import com.pccc.brms.rulemodel.UrgentTaskStatus import com.pccc.brms.service.IBatchImportStatusService //1-->java程序发来的消息 rule "RouteAppMessage" dialect "mvel" when appMsg : AppMessage( ) then System.out.println("触发规则引擎appMsg : AppMessage( ) ,appMsg.id=" + appMsg.id); //retract( appMsg ); RuleMessage fact0 = new RuleMessage(); fact0.setAppName( appMsg.appName ); fact0.setId( appMsg.id ); fact0.setData( appMsg.data ); fact0.setParamData( appMsg.paramData ); insertLogical( fact0 ); end //调用java程序 rule "DoScheduleAction" dialect "mvel" when action : ScheduleAction( ) scheduleSvc : IScheduleService( ) then System.out.println("触发规则引擎scheduleSvc.scheduleAction( action ) = " + action); scheduleSvc.scheduleAction( action ); end //2--> rule "010-OnLoadMainApp" dialect "mvel" when ruleMsg : RuleMessage( id == "app_load" , appName == "main" ) poolService : IThreadPoolService( ) then System.out.println("触发规则引擎appName=main, id=app_load"); poolService.initPool("EXPORT",4); poolService.initPool("IMPORT",4); poolService.initPool("IMPORT_ZNX",1); poolService.initPool("FILTER_ZNX",1); poolService.initPool("TASK_FILTER",1); poolService.initPool("TASK_IMPORT",1); RuleMessage fact0 = new RuleMessage(); fact0.setAppName( "main" ); fact0.setId( "batch_import" ); insertLogical( fact0 ); end //from row number: 6 //3-->批量导入客户名单,分发任务 rule "Row 6 020-ScheduleActionList" dialect "mvel" when ruleMsg : RuleMessage( appName == "main" , id == "batch_import" ) then System.out.println("触发规则引擎appName=main , id=batch_import - 操作ActionImportZnx"); ScheduleAction action = new ScheduleAction(); action.setActionType( "com.pccc.brms.bcl.input.action.ActionImportZnx" ); action.setActionPropertyFile( "./conf/import.properties,./conf/global.properties" ); action.setPoolName( "IMPORT" ); insertLogical( action ); end //from row number: 6 //4-->导入站内信渠道 rule "Row 6 031-ImportCustTask" dialect "mvel" when m : RuleMessage( id == "app_start_import_cust_task" , data matches "10" ) status : BatchImportStatus( this != null ) then System.out.println("触发规则引擎id=app_start_import_cust_task , data matches 10 - 操作ActionImportCustTaskZnx"); status.addZnxTaskCount(); TaskScheduleAction t = new TaskScheduleAction(); t.setActionType( "com.pccc.brms.bcl.input.impl.ActionImportCustTaskZnx" ); t.setActionPropertyFile( "./conf/import.properties" ); t.setPoolName( "IMPORT_ZNX" ); t.setNextMessageId( "import_one_cust_task_over" ); t.setNextMessageData( m.data ); t.setTask( m.paramData ); insertLogical( t ); end //from row number: 6 //5-->导入ZNX渠道单个名单完成 rule "Row 7 032-OnImportCustFinish" dialect "mvel" when m : RuleMessage( id == "import_one_cust_task_over" , data matches "10" ) status : BatchImportStatus( this != null ) s : IBatchImportStatusService( this != null ) then System.out.println("触发规则引擎id=import_one_cust_task_over , data matches 10"); status.addZnxTaskFinishCount(); s.isZnxChannelImportFinished(); end //from row number: 4 //6-->过滤ZNX rule "Row 2 051-BatchFilter" dialect "mvel" when f1 : RuleMessage( id == "app_import_completely_over" , data == "ZNX" ) then System.out.println("触发规则引擎id=app_import_completely_over , data == ZNX"); BatchFilterScheduleAction action = new BatchFilterScheduleAction(); action.setActionType( "com.pccc.brms.bcl.filter.impl.ActionBatchFilterZNX" ); action.setActionPropertyFile( "./conf/filter.properties" ); action.setPoolName( "FILTER_ZNX" ); action.setChannelType( f1.data ); insertLogical( action ); end //from row number: 9 //7-->站内信渠道过滤完成,立即输出 rule "Row 9 020-ScheduleActionList" dialect "mvel" when ruleMsg : RuleMessage( appName == "main" , id == "app_batch_filter_channel_ok" , data == "ZNX" ) then System.out.println("触发规则引擎appName=main , id=app_batch_filter_channel_ok , data=ZNX - 操作ActionDistributeExportZnx"); ScheduleAction action = new ScheduleAction(); action.setActionType( "com.pccc.brms.bcl.export.action.ActionDistributeExportZnx" ); action.setActionPropertyFile( "./conf/export.properties,./conf/import.properties" ); action.setPoolName( "EXPORT" ); insertLogical( action ); end //8-->站内信2次调度(逐个输出文件) rule "Row 5 080-ExportCustTask" dialect "mvel" when m : RuleMessage( id == "app_export_cust_task" , data matches "10" ) then System.out.println("触发规则引擎id=app_export_cust_task , data matches 10"); TaskScheduleAction t = new TaskScheduleAction(); t.setActionType( "com.pccc.brms.bcl.export.impl.ActionExportZnx" ); t.setActionPropertyFile( "./conf/export.properties,./conf/import.properties" ); t.setPoolName( "EXPORT" ); t.setNextMessageData( m.data ); t.setTask( m.paramData ); insertLogical( t ); end
package com.pccc.brms.rulemodel;/* * Copyright (c) 2012 Hex */ /** * Created with IntelliJ IDEA. * User: Hex * Date: 12-11-7 * Time: 上午10:51 * 专用在规则引擎中传递 */ public class RuleMessage { /** * 应用名称 */ private String appName=""; /** * 消息ID */ private String id=""; /** * 消息数据 */ private String data=""; /** * 参数对象 */ private Object paramData;
package com.pccc.brms.rulemodel; /** * User: Hex * Date: 12-11-8 * Time: 下午12:28 * Desc: */ public class ScheduleMessage { /* cron4j调度时间格式 */ private String schedulePattern; private String messageId;
/* * Copyright (c) 2012 Hex */ package com.pccc.brms.rulemodel; import java.util.Properties; /** * User: Hex * Date: 12-11-7 * Time: 下午4:59 * Desc: 用来调度Action 的配置类 */ public class ScheduleAction { /** * Action的Class Name * 例如ActionInit */ private String actionType; /** * 线程池名称 */ private String poolName=""; /** * 执行完成后通知到规则引擎的消息 */ private String nextMessageId=""; /** * 执行完成后通知到规则引擎的消息数据 */ private String nextMessageData=""; /** * 在线程池中执行时,是否等待线程执行完成 */ private boolean join = false; /** * Action需要的参数 */ private Properties properties=new Properties(); /** * Action Property 配置文件 */ private String actionPropertyFile=""; /** * cron4j调度时间格式 */ private String schedulePattern=""; public ScheduleAction() { //System.out.println("+------+ ScheduleAction.ScheduleAction()"); } public String getNextMessageId() { return nextMessageId; } public void setNextMessageId(String nextMessageId) { this.nextMessageId = nextMessageId; } public String getNextMessageData() { return nextMessageData; } public void setNextMessageData(String nextMessageData) { this.nextMessageData = nextMessageData; } public boolean isJoin() { return join; } public void setJoin(boolean join) { this.join = join; } public String getPoolName() { return poolName; } public void setPoolName(String poolName) { this.poolName = poolName; this.putString("poolName",poolName); } public String getActionPropertyFile() { return actionPropertyFile; } public void setActionPropertyFile(String actionPropertyFile) { this.actionPropertyFile = actionPropertyFile; } public String getSchedulePattern() { return schedulePattern; } public void setSchedulePattern(String schedulePattern) { this.schedulePattern = schedulePattern; } public String getActionType() { return actionType; } public void setActionType(String actionType) { this.actionType = actionType; } public Properties getProperties() { return properties; } public void setProperties(Properties properties) { this.properties = properties; } /** * 设置配置值到Properties * @param key * @param value */ public void putString(String key,String value){ this.properties.setProperty(key, value); } /** * 从Properties读取配置值 * @param key * @return */ public String fetchString(String key){ return this.properties.getProperty(key,""); } /** * 设置对象到Properties * @param key * @param value */ public void put(String key,Object value){ this.properties.put(key,value); } /** * 从Properties获得对象 * @param key * @return */ public Object fetch(String key){ return this.properties.get(key); } }
/* * Copyright (c) 2012 Hex */ package com.pccc.brms.rulemodel; /** * User: Hex * Date: 12-11-9 * Time: 下午7:25 * Desc: 频次过滤规则 */ public class FreqFilterParam { /** * 条件优先级 */ private String freqPriority; /** * 沟通业务类型 */ private int commBussGroup; /** * 渠道类型 */ private String channelSort; /** * 日频次限制 */ private int dayLimit; /** * 周频次限制 */ private int weekLimit; /************************ gets and sets *****************************/ public String getFreqPriority() { return freqPriority; } public void setFreqPriority(String freqPriority) { this.freqPriority = freqPriority; } public int getCommBussGroup() { return commBussGroup; } public void setCommBussGroup(int commBussGroup) { this.commBussGroup = commBussGroup; } public String getChannelSort() { return channelSort; } public void setChannelSort(String channelSort) { this.channelSort = channelSort; } public int getDayLimit() { return dayLimit; } public void setDayLimit(int dayLimit) { this.dayLimit = dayLimit; } public int getWeekLimit() { return weekLimit; } public void setWeekLimit(int weekLimit) { this.weekLimit = weekLimit; } }
/* * Copyright (c) 2012 Hex */ package com.pccc.brms.rulemodel; import java.util.ArrayList; import java.util.List; /** * User: Hex * Date: 12-11-10 * Time: 下午4:53 * Desc: */ public class FreqFilterRuleList { private List<FreqFilterParam> freqFilterRules=new ArrayList<FreqFilterParam>(); public List<FreqFilterParam> getFreqFilterRules() { return freqFilterRules; } public void setFreqFilterRules(List<FreqFilterParam> freqFilterRules) { this.freqFilterRules = freqFilterRules; } }
/* * Copyright (c) 2012 Hex */ package com.pccc.brms.rulemodel; /** * User: Hex * Date: 12-11-10 * Time: 下午6:12 * Desc: */ public class BatchFilterScheduleAction extends ScheduleAction { /** * 沟通性质:生产/非生产 */ private String commProperty =""; /** * 渠道类型 */ private String channelType=""; public BatchFilterScheduleAction() { //System.out.println("+------+ BatchFilterScheduleActionz()"); } public String getChannelType() { return channelType; } public void setChannelType(String channelType) { this.channelType = channelType; this.putString("channelType",channelType); } public String getCommProperty() { return commProperty; } public void setCommProperty(String commProperty) { this.commProperty = commProperty; this.putString("commProperty",commProperty); } }
/* * Copyright (c) 2012 Hex */ package com.pccc.brms.rulemodel; import com.pccc.brms.service.IRuleSession; /** * User: Hex * Date: 12-11-10 * Time: 下午10:43 * Desc: 批量过滤状态数据 */ public class BatchFilterStatus { private int smsFilterCount = 1; private int emailFilterCount = 1; private int postFilterCount = 1; private int telFilterCount = 1; private int bxtelFilterCount = 1; private int znxFilterCount = 1; private int wcFilterCount = 1; private int smsFilterFinishCount = 0; private int emailFilterFinishCount = 0; private int postFilterFinishCount = 0; private int telFilterFinishCount = 0; private int bxtelFilterFinishCount = 0; private int znxFilterFinishCount = 0; private int wcFilterFinishCount = 0; public void setWcFilterFinishCount(int wcFilterFinishCount) { this.wcFilterFinishCount = wcFilterFinishCount; } /** * 频次优先级过滤规则配置 */ private FreqFilterRuleList freqFilterRuleList = new FreqFilterRuleList(); /** * 是否需要过滤频次 */ private boolean needFilterFreq = true; /** * 是否需要过滤频次 * * @return */ public boolean isNeedFilterFreq() { return needFilterFreq; } public void setNeedFilterFreq(boolean needFilterFreq) { this.needFilterFreq = needFilterFreq; } public FreqFilterRuleList getFreqFilterRuleList() { return freqFilterRuleList; } public void insertToRuleSession(IRuleSession ruleSession) { for (FreqFilterParam rule : freqFilterRuleList.getFreqFilterRules()) { ruleSession.insert(rule); } this.needFilterFreq = this.freqFilterRuleList.getFreqFilterRules().size() > 0; ruleSession.insert(this.freqFilterRuleList); ruleSession.insert(this); } public int getSmsFilterCount() { return smsFilterCount; } public void setSmsFilterCount(int smsFilterCount) { this.smsFilterCount = smsFilterCount; } public int getEmailFilterCount() { return emailFilterCount; } public void setEmailFilterCount(int emailFilterCount) { this.emailFilterCount = emailFilterCount; } public int getPostFilterCount() { return postFilterCount; } public void setPostFilterCount(int postFilterCount) { this.postFilterCount = postFilterCount; } public int getTelFilterCount() { return telFilterCount; } public void setTelFilterCount(int telFilterCount) { this.telFilterCount = telFilterCount; } public int getZnxFilterCount() { return znxFilterCount; } public void setZnxFilterCount(int znxFilterCount) { this.znxFilterCount = znxFilterCount; } public int getSmsFilterFinishCount() { return smsFilterFinishCount; } public synchronized void addSmsFilterFinishCount() { this.smsFilterFinishCount++; } public int getEmailFilterFinishCount() { return emailFilterFinishCount; } public synchronized void addEmailFilterFinishCount() { this.emailFilterFinishCount++; } public int getPostFilterFinishCount() { return postFilterFinishCount; } public synchronized void addPostFilterFinishCount() { this.postFilterFinishCount++; } public int getTelFilterFinishCount() { return telFilterFinishCount; } public synchronized void addTelFilterFinishCount() { this.telFilterFinishCount++; } public int getBxtelFilterCount() { return bxtelFilterCount; } public void setBxtelFilterCount(int bxtelFilterCount) { this.bxtelFilterCount = bxtelFilterCount; } public int getBxtelFilterFinishCount() { return bxtelFilterFinishCount; } public synchronized void addBxtelFilterFinishCount() { this.bxtelFilterFinishCount++; } public int getZnxFilterFinishCount() { return znxFilterFinishCount; } public synchronized void addZnxFilterFinishCount() { this.znxFilterFinishCount++; } public int getWcFilterFinishCount() { return wcFilterFinishCount; } public synchronized void addWcFilterFinishCount() { this.wcFilterFinishCount++; } public int getWcFilterCount() { return wcFilterCount; } public void setWcFilterCount(int wcFilterCount) { this.wcFilterCount = wcFilterCount; } }
/* * Copyright (c) 2012 Hex */ package com.pccc.brms.rulemodel; /** * User: Hex Date: 12-11-19 Time: 下午10:35 Desc: 批量导入状态 */ public class BatchImportStatus { /** * 任务总数(非外呼渠道) */ private int totalTaskCount = 0; /** * 屏蔽名单是否导入完成 */ private boolean maskPolicyImportFinished; /** * 电话外呼渠道是否导入完成 */ private boolean telImportFinished; /** * 保险外呼渠道是否导入完成 */ private boolean bxtelImportFinished; /** * WC渠道是否导入完成 */ private boolean wcImportFinished; /** * SMS类任务数量 */ private int smsTaskCount; /** * EMAIL类任务数量 */ private int emailTaskCount; /** * POST类任务数量 */ private int postTaskCount; /** * ZNX类任务数量 */ private int znxTaskCount; /** * WC类任务数量 */ private int wcTaskCount; /** * SMS类任务已导入完成数量 */ private int smsTaskFinishCount; /** * EMAIL类任务已导入完成数量 */ private int emailTaskFinishCount; /** * POST类任务已导入完成数量 */ private int postTaskFinishCount; /** * ZNX类任务已导入完成数量 */ private int znxTaskFinishCount; /** * WC类任务已导入完成数量 */ private int wcTaskFinishCount; /** * 已记录待导入的任务数 * * @return */ public int getTransTaskCount() { return this.getSmsTaskCount() + this.getEmailTaskCount() + this.getPostTaskCount(); } public synchronized void addTransTaskCount() { this.totalTaskCount++; } public int getTotalTaskCount() { return totalTaskCount; } public int getWcTaskCount() { return wcTaskCount; } public void setWcTaskCount(int wcTaskCount) { this.wcTaskCount = wcTaskCount; }
/* * Copyright (c) 2012 Hex */ package com.pccc.brms.rulemodel; /** * User: Hex Date: 12-11-11 Time: 下午2:55 Desc: 用来调度单个任务 */ public class TaskScheduleAction extends ScheduleAction { /** * 执行任务的类 * 例如:com.pccc.brms.bcl.export.impl.ActionExportSms */ private Object task; public Object getTask() { return task; } public void setTask(Object task) { this.task = task; this.put("task", task); } }
/* * Copyright (c) 2012 Hex */ package com.pccc.brms.rulemodel; /** * User: Hex * Date: 12-11-20 * Time: 下午6:48 * Desc: 即时任务状态数据 */ public class UrgentTaskStatus { /** * 即时任务数量 */ private int taskCount; /** * 已导入完成的数量 */ private int importFinishCount; /** * 已过滤完成的数量 */ private int filterFinishCount; public int getTaskCount() { return taskCount; } public void setTaskCount(int taskCount) { this.taskCount = taskCount; } public int getImportFinishCount() { return importFinishCount; } public void setImportFinishCount(int importFinishCount) { this.importFinishCount = importFinishCount; } public int getFilterFinishCount() { return filterFinishCount; } public void setFilterFinishCount(int filterFinishCount) { this.filterFinishCount = filterFinishCount; } public synchronized void addImportFinish(){ this.importFinishCount ++; } public synchronized void addFilterFinish(){ this.filterFinishCount++; } }
/* * Copyright (c) 2012 Hex */ package com.pccc.brms.service; import com.pccc.brms.rulemodel.BatchImportStatus; /** * User: Hex * Date: 12-11-11 * Time: 上午12:17 * Desc: 批量导入状态服务 */ public interface IBatchImportStatusService extends IService { /** * 获得批量导入状态数据 **/ public BatchImportStatus getBatchImportStatus(); /** * 设置数据库用户名 * * @param dbUserName */ public void setDbUserName(String dbUserName); /** * 检查各渠道批量导入是否完成 * * 如果完成则通知完成消息 */ public void isAllChannelImportFinished(); /** * 检查保险外呼渠道批量导入是否完成 * * 如果完成则通知完成消息 */ public void isBxTelChannelImportFinished(); /** * 检查站内信渠道批量导入是否完成 * * 如果完成则通知完成消息 */ public void isZnxChannelImportFinished(); /** * 检查微信渠道批量导入是否完成 * * 如果完成则通知完成消息 */ public void isWcChannelImportFinished(); }
/* * Copyright (c) 2012 Hex */ package com.pccc.brms.service; import com.pccc.brms.rulemodel.BatchFilterStatus; /** * User: Hex * Date: 12-11-11 * Time: 上午12:17 * Desc: 批量过滤状态服务 */ public interface IBatchFilterStatusService extends IService { /** * 获得批量过滤状态数据 **/ public BatchFilterStatus getBatchFilterStatus(); /** * 判断所有渠道类型是否过滤完成 **/ public boolean isAllChannelFilterFinished(); /** * 判断保险外呼渠道类是否过滤完成 **/ public boolean isBxTelChannelFilterFinished(); /** * 判断站内信渠道类是否过滤完成 **/ public boolean isZnxChannelFilterFinished(); /** * 判断微信渠道类是否过滤完成 **/ public boolean isWcChannelFilterFinished(); }
/* * Copyright (c) 2012 Hex */ package com.pccc.brms.service; /** * User: Hex * Date: 12-11-7 * Time: 下午1:39 * Desc: 日志服务接口 */ public interface ILoggingService extends IService { public void debug(java.lang.Object message); public void debug(java.lang.Object message, java.lang.Throwable t); public void error(java.lang.Object message); public void error(java.lang.Object message, java.lang.Throwable t); public void fatal(java.lang.Object message) ; public void fatal(java.lang.Object message, java.lang.Throwable t); public void info(java.lang.Object message) ; public void info(java.lang.Object message, java.lang.Throwable t) ; public void warn(java.lang.Object message); public void warn(java.lang.Object message, java.lang.Throwable t); }
/* * Copyright (c) 2012 Hex */ package com.pccc.brms.service; import java.io.File; import com.pccc.brms.service.IService; /** * User: Hex * Date: 12-11-7 * Time: 上午10:52 * Desc:规则引擎服务接口 */ public interface IRuleService extends IService { /** * 载入规则文件,初始化规则库 * @param ruleFile */ public void initRuleEngine(File ruleFile); /** * 构造一个规则运行时Session,用来计算规则 * @return */ public IRuleSession newRuleSession(); }
package com.pccc.brms.service;/* * Copyright (c) 2012 Hex */ /** * User: Hex * Date: 12-11-7 * Time: 上午10:54 * Desc: 规则运行时Session接口 */ public interface IRuleSession { /** * 插入一个对象到Session中,以便规则引擎进行匹配计算 * 每次insert一次,规则引擎自动匹配规则一次 * * @param model 在规则中成为模型 */ public void insert(Object model); /** * 触发所有规则,直到所有规则匹配完成后才返回 */ public void fireAll(); /** * 触发所有规则,所有规则匹配完成后不返回,还继续等待insert,可继续触发规则,直接显示调用halt方法后才返回 */ public void fireUnitHalt(); /** * 终止规则计算,将导致 fireUnitHalt 方法退出 */ public void halt(); /** * 释放资源 */ public void dispose(); /** * 发送消息给规则引擎 * * @param appName 程序名 - main * @param msgId 消息ID - 例如 load “app_export_cust_task” * @param data 传递数据 - 例如 渠道 * @param paramData 参数 - 例如 task */ public void sendAppMessage(String appName,String msgId, String data, Object paramData); }
package com.pccc.brms.service;/* * Copyright (c) 2012 Hex */ import com.pccc.brms.rulemodel.ScheduleAction; import com.pccc.brms.rulemodel.ScheduleMessage; /** * User: Hex * Date: 12-11-7 * Time: 下午3:40 * Desc: */ public interface IScheduleService extends IService { /* 日初/日间 */ public String getWorkMode(); public void setWorkMode(String mode); /** * 规则引擎调用该方法,实现对具体的Action的启动 * * @param scheduleAction */ public void scheduleAction(ScheduleAction scheduleAction); /** * 调度发送规则消息 * @param scheduleMessage */ public void scheduleMessage(ScheduleMessage scheduleMessage); }
/* * Copyright (c) 2012 Hex */ package com.pccc.brms.service; /** * Created with IntelliJ IDEA. * User: Hex * Date: 12-11-7 * Time: 下午12:52 * Desc:服务基础接口. */ public interface IService { /** * 获取服务名称 * @return */ public String getName(); /** * 关闭服务 */ public void shutdown(); }
package com.pccc.brms.service; /* * Copyright (c) 2012 Hex */ import com.pccc.brms.rulemodel.ScheduleAction; /** * User: Hex * Date: 12-11-8 * Time: 上午11:06 * Desc: 线程池服务 */ public interface IThreadPoolService extends IService { /** * 初始化一个线程池,并设置名单和大小 * 由rule调用 */ public void initPool(String name, int maxCount); /** * * 根据Action的Class名称,在指定的线程池中执行Action * @join:是否等待Action执行完成 * * @param action */ public void queueAction(ScheduleAction action); }
<?xml version="1.0" encoding="UTF-8"?> <module classpath="eclipse" classpath-dir="$MODULE_DIR$" type="JAVA_MODULE" version="4"> <component name="NewModuleRootManager" inherit-compiler-output="true"> <exclude-output /> <content url="file://$MODULE_DIR$"> <sourceFolder url="file://$MODULE_DIR$/src" isTestSource="false" /> </content> <orderEntry type="inheritedJdk" /> <orderEntry type="sourceFolder" forTests="false" /> </component> </module>
<?xml version="1.0" encoding="UTF-8"?> <component inherit-compiler-output="true"> <output-test url="file://$MODULE_DIR$/../out/test/ucs.brms.rulebase"/> <exclude-output/> <contentEntry url="file://$MODULE_DIR$"/> </component>
/* * Copyright (c) 2012 Hex */ package com.pccc.brms.rulemodel; /** * User: Hex * Date: 12-11-11 * Time: 下午2:55 * Desc: 用来调度即时沟通任务 */ public class UrgentTaskScheduleAction extends ScheduleAction { private Object urgentCommTask; public Object getUrgentCommTask() { return urgentCommTask; } public void setUrgentCommTask(Object urgentCommTask) { this.urgentCommTask = urgentCommTask; this.put("task",urgentCommTask); } }
/* * Copyright (c) 2012 Hex */ package com.pccc.brms.rulemodel; /** * User: Hex * Date: 12-11-11 * Time: 下午2:52 * Desc: 即时沟通任务 */ public class UrgentCommTask { /** * 任务ID */ private String taskId; /** * 沟通性质 */ private String commProperty=""; /** * 渠道类型 */ private String channelType=""; /** * 即时任务状态数据 */ private UrgentTaskStatus urgentTaskStatus; /** * 当前任务数据(TaskToday) */ private Object currTask;
/* * Copyright (c) 2012 Hex */ package com.pccc.brms.rulemodel; /** * User: Hex * Date: 12-11-10 * Time: 下午8:08 * Desc: 用来调度批量过滤频次 */ public class FreqFilterScheduleAction extends BatchFilterScheduleAction { private FreqFilterParam freqFilterRule; public FreqFilterScheduleAction() { //System.out.println("+------+ FreqFilterScheduleAction()"); } public FreqFilterParam getFreqFilterRule() { return freqFilterRule; } public void setFreqFilterRule(FreqFilterParam freqFilterRule) { this.freqFilterRule = freqFilterRule; this.put("freqRule",this.freqFilterRule); } }
/* * Copyright (c) 2012 Hex */ package com.pccc.brms.service; import com.pccc.brms.action.Action; import jsr166y.ForkJoinPool; import jsr166y.RecursiveAction; /** * User: Hex * Date: 12-11-16 * Time: 下午9:57 * Desc: 线程池基础接口 */ public interface IBaseThreadPoolService extends IService { /** * 获取指定名称的线程池 * @param name * @return */ public ForkJoinPool getPool(String name); /** * 在指定的线程池中执行Action * @param action * @param poolName * @param join * @return */ public RecursiveAction queueAction(Action action, String poolName, boolean join); }
/* * Copyright (c) 2012 Hex */ package com.pccc.brms.service; /** * Created with IntelliJ IDEA. * User: Hex * Date: 12-11-7 * Time: 下午12:31 * 服务提供器接口 */ public interface IServiceProvider { /** * 获得容器内所有服务对象 * @return */ public IService[] getServices(); /** * 添加一个服务到容器内 * @param svc */ public void add(IService svc); /** * 获得指定名称的服务对象 * @param name * @return */ public IService getService(String name); /** * 获得指定类型的服务对象 * @param clazz * @param <T> * @return */ public <T extends IService> T getService(Class<T> clazz); /** * 清空容器 */ public void clear(); }
/* * Copyright (c) 2012 Hex */ package com.pccc.brms.service; /** * User: Hex * Date: 12-11-16 * Time: 下午3:58 * Desc: 系统模块日志服务 */ public interface ISysLogService extends IService { /** * 记录模块日志 * @param module * @param statusCode * @param statusDesc * @param switchDate */ public void log(String module,String statusCode,String statusDesc,String switchDate); }
/* * Copyright (c) 2012 Hex */ package com.pccc.brms.service; import org.drools.KnowledgeBase; import org.drools.builder.KnowledgeBuilder; import org.drools.builder.KnowledgeBuilderFactory; import org.drools.builder.ResourceType; import org.drools.io.ResourceFactory; import org.drools.runtime.StatefulKnowledgeSession; import java.io.*; /** * User: Hex * Date: 12-11-7 * Time: 上午10:57 * 使用Drools实现的规则引擎服务类 */ public class RuleService extends Service implements IRuleService { /** * 规则引擎知识库 */ private KnowledgeBase knowledgeBase; @Override public void initRuleEngine(File ruleFile) { //hex 20121114 修复读取utf-8格式的drl文件发生错误的bug String content = readStringFromFile(ruleFile); KnowledgeBuilder knowledgeBuilder = KnowledgeBuilderFactory.newKnowledgeBuilder(); knowledgeBuilder.add(ResourceFactory.newReaderResource(new StringReader(content)), ResourceType.DRL); if (knowledgeBuilder.hasErrors()) { this.getLogger().fatal(knowledgeBuilder.getErrors()); return; } knowledgeBase = knowledgeBuilder.newKnowledgeBase(); } /** * 根据指定编码从文件中读取所有字符串 * * @param ruleFile * @param encoding * @return */ private String readStringFromFile(File ruleFile) { StringBuffer fileContent = new StringBuffer(); BufferedReader br = null; FileInputStream fis=null; try { fis=new FileInputStream(ruleFile); br = new BufferedReader(new InputStreamReader(fis, "UTF-8")); String line = null; while ((line = br.readLine()) != null) { fileContent.append(line); fileContent.append("\n"); } } catch (Exception e) { e.printStackTrace(); } finally{ if(br!=null) try { br.close(); } catch (IOException e) { e.printStackTrace(); } if(fis!=null) try { fis.close(); } catch (IOException e) { e.printStackTrace(); } } return fileContent.toString(); } @Override public IRuleSession newRuleSession() { if (this.knowledgeBase == null) { return null; } StatefulKnowledgeSession session = this.knowledgeBase.newStatefulKnowledgeSession(); return new RuleSession(session); } @Override public String getName() { return ServiceName.RULE_SERVICE; } }
/* * Copyright (c) 2012 Hex */ package com.pccc.brms.service; import org.drools.runtime.StatefulKnowledgeSession; import com.pccc.brms.rulemodel.AppMessage; import com.pccc.brms.utils.LoggerFactory; /** * User: Hex Date: 12-11-7 Time: 上午11:20 Desc: 由 StatefulKnowledgeSession * 实现IRuleSession的类 */ class RuleSession implements IRuleSession { private ILoggingService logger = LoggerFactory.getLoggingService(this.getClass()); /** * 规则引擎会话 */ private StatefulKnowledgeSession session; /** * 构造方法 * * @param session */ public RuleSession(StatefulKnowledgeSession session) { this.session = session; } @Override public void insert(Object model) { synchronized (this) { this.session.insert(model); } } @Override public void sendAppMessage(String appName, String msgId, String data, Object paramData) { logger.debug("Java发送规则消息,appName=" + appName + ",msgId=" + msgId + ",data=" + data + ",paramData=" + paramData); AppMessage appMsg = new AppMessage(); appMsg.setAppName(appName); appMsg.setId(msgId); appMsg.setData(data); appMsg.setParamData(paramData); this.insert(appMsg); } @Override public void fireUnitHalt() { this.session.fireUntilHalt(); } @Override public void fireAll() { this.session.fireAllRules(); } @Override public void halt() { this.session.halt(); } @Override public void dispose() { this.session.dispose(); } }
/* * Copyright (c) 2012 Hex */ package com.pccc.brms.service; import com.pccc.brms.App; import com.pccc.brms.action.Action; import com.pccc.brms.action.ActionProxy; import com.pccc.brms.rulemodel.AppMessage; import com.pccc.brms.rulemodel.ScheduleAction; import com.pccc.brms.rulemodel.ScheduleMessage; import it.sauronsoftware.cron4j.Scheduler; import it.sauronsoftware.cron4j.Task; import it.sauronsoftware.cron4j.TaskExecutionContext; /** * User: Hex * Date: 12-11-7 * Time: 下午4:57 * Desc: 使用Cron4j实现的调度服务类 */ public class ScheduleService extends Service implements IScheduleService { /** * cron4j定时器 */ private Scheduler scheduler; /** * 目前没用 */ private String workMode="0"; /** * 构造方法 */ public ScheduleService() { this.scheduler = new Scheduler(); this.scheduler.start(); } @Override public void scheduleAction(ScheduleAction scheduleAction) { this.getLogger().debug("scheduleAction被规则引擎调度启动......"); Task task = this.buildCron4jTask(scheduleAction);//是否要线程池//NonePoolTask or InPoolTask if (scheduleAction.getSchedulePattern().isEmpty()){ this.getLogger().debug("Task立刻执行:" + task); scheduler.launch(task);//启动cron4j的Task } else { this.getLogger().debug("Task定时("+scheduleAction.getSchedulePattern()+")执行:" + task); scheduler.schedule(scheduleAction.getSchedulePattern(),task); } } /** * 根据具体的模块action是否需要使用多线程的情况 * * @param scheduleAction 模块action * @return Cron4j的Task对象 * @throws Exception */ private Task buildCron4jTask(ScheduleAction scheduleAction) { Task task = null; if(scheduleAction.getPoolName().isEmpty()) { //规则引擎传过来的action不需要用线程池的情况 task = new NonePoolTask(new ActionProxy(scheduleAction)); this.getLogger().debug("不使用线程池的模块Action:"+ scheduleAction.getActionType() +",Cron4j的Task构建成功:" + task); } else { //需要线程池线程池,则返回ActionPoolTask IThreadPoolService threadPoolService = this.getApp().getServiceProvider().getService(IThreadPoolService.class); task = new InPoolTask(threadPoolService, scheduleAction); this.getLogger().debug("使用线程池的模块Action:"+ scheduleAction.getActionType() +",Cron4j的Task构建成功:" + task); } return task; } @Override public void scheduleMessage(ScheduleMessage scheduleMessage) { try{ IRuleSession ruleSession = App.getCurrent().getRuleSession(); if (null == ruleSession){ return; } Task task = new SendRuleMessageTask(scheduleMessage.getMessageId(), ruleSession); if (scheduleMessage.getSchedulePattern().isEmpty()){ //立即执行 scheduler.launch(task); return; } scheduler.schedule(scheduleMessage.getSchedulePattern(),task); } catch (Throwable ex){ this.getLogger().error(ex.toString()); } } /** * **************************************** * 内部类 * **************************************** * * Cron4j Task的子类 * 用来调度在线程池中执行Action的 * * @author caikang * */ class InPoolTask extends Task { private ScheduleAction scheduleAction; private IThreadPoolService threadPoolService; /** * 构造方法 * @param actionProxy */ public InPoolTask(IThreadPoolService poolService, ScheduleAction scheduleAction) { this.threadPoolService = poolService; this.scheduleAction = scheduleAction; } @Override public void execute(TaskExecutionContext taskExecutionContext) throws RuntimeException { this.threadPoolService.queueAction(this.scheduleAction); } } /** * **************************************** * 内部类 * **************************************** * * Cron4j Task的子类 * 用来调度执行Action的 * * @author caikang * */ class NonePoolTask extends Task { /** * ActionProxy */ private Action actionProxy; /** * 构造方法 * @param action */ public NonePoolTask(Action action) { this.actionProxy = action; } @Override public void execute(TaskExecutionContext taskExecutionContext) throws RuntimeException { try { this.actionProxy.executeAction(); } catch (Exception e) { e.printStackTrace(); } } } /** * **************************************** * 内部类 * **************************************** * * 用来调度发送消息的Cron4j Task * */ class SendRuleMessageTask extends Task { private String messageId; private IRuleSession ruleSession; public SendRuleMessageTask(String messageId, IRuleSession ruleSession) { this.messageId = messageId; this.ruleSession = ruleSession; } @Override public void execute(TaskExecutionContext taskExecutionContext) throws RuntimeException { AppMessage ruleMessage = new AppMessage(); ruleMessage.setAppName(App.getCurrent().getName()); ruleMessage.setId(this.messageId); this.ruleSession.insert(ruleMessage); } } @Override public String getName() { return ServiceName.SCHEDULE_SERVICE; } @Override public void shutdown() { this.scheduler.stop(); } @Override public String getWorkMode() { return workMode; } @Override public void setWorkMode(String mode) { workMode=mode; } }
/* * Copyright (c) 2012 Hex */ package com.pccc.brms.service; import com.pccc.brms.App; import com.pccc.brms.IApp; import com.pccc.brms.utils.LoggerFactory; /** * User: Hex * Date: 12-11-7 * Time: 下午3:47 * Desc: 基础服务抽象类 */ public abstract class Service implements IService { protected ILoggingService logger; /** * 构造方法 */ protected Service() { // this.logger= LoggerFactory.getLoggingService(this.getName());//会造成死循环 } protected ILoggingService getLogger() { synchronized (this) { if (null == this.logger) { this.logger = LoggerFactory.getLoggingService(this.getName()); } return this.logger; } } public IApp getApp() { return App.getCurrent(); } public void shutdown() { } public String getName() { return this.getClass().getSimpleName(); } }
package com.pccc.brms.service; public class ServiceName { public static final String RULE_SERVICE = "rule service"; public static final String THREAD_POOL_SERVICE = "thread pool service"; public static final String SCHEDULE_SERVICE = "schedule service"; }
/* * Copyright (c) 2012 Hex */ package com.pccc.brms.service; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; /** * User: Hex * Date: 12-11-7 * Time: 下午12:52 * 实现IServiceProvider 的服务提供器 */ public class ServiceProvider implements IServiceProvider { /** * 服务名称的Map集合 */ private Map<String, IService> services; /** * 服务类型的Map集合 * key 接口名或者父类名 * value 继承这个接口或者父类的所有类 */ private Map<Class<?>, List<Object>> serviceMap; /** * 构造方法 */ public ServiceProvider() { services = new HashMap<String, IService>(); serviceMap = new HashMap<Class<?>, List<Object>>(); } @Override public void add(IService svc) { services.put(svc.getName(), svc); //添加到服务类型Map // RuleService的结果: // class java.lang.Object=[[email protected]], // interface com.p[email protected]1e0a283], // class com.pccc.brms.service.RuleService=[[email protected]], // class com.pccc.brms.service.Service=[[email protected]] synchronized (this) { Class<?> key = svc.getClass(); for (Class<?> oneInterface : svc.getClass().getInterfaces()) {//返回这个类继承的所有接口 List<Object> list = null; if (serviceMap.containsKey(oneInterface)) { list = serviceMap.get(oneInterface); } else { list = new ArrayList<Object>(); serviceMap.put(oneInterface, list); } list.add(svc); } while (key != null) { List<Object> list = null; if (this.serviceMap.containsKey(key)) { list = this.serviceMap.get(key); } else { list = new ArrayList<Object>(); this.serviceMap.put(key, list); } list.add(svc); key = key.getSuperclass();//获取它的父类的名称 } } } @Override public IService[] getServices() { return services.values().toArray(new IService[0]); } @Override public IService getService(String name) { return services.get(name); } public Object GetServiceByClass(Class<?> serviceType) { if (serviceType == null) { return null; } synchronized (this) { Object obj2 = null; if (this.serviceMap.containsKey(serviceType)) { List<Object> list = this.serviceMap.get(serviceType); if (list.size() > 0) { obj2 = list.get(0); } } return obj2; } } public <T extends IService> T getService(Class<T> clazz) { Object o = this.GetServiceByClass(clazz); if (o instanceof IService){ return clazz.cast(o); } return null; } @Override public void clear() { this.services.clear(); this.serviceMap.clear(); } }
/* * Copyright (c) 2012 Hex */ package com.pccc.brms.service; import com.pccc.brms.action.Action; import com.pccc.brms.action.ActionProxy; import com.pccc.brms.rulemodel.ScheduleAction; import jsr166y.ForkJoinPool; import jsr166y.RecursiveAction; import java.util.HashMap; import java.util.Map; /** * User: Hex * Date: 12-11-8 * Time: 上午11:11 * Desc: 使用ForkJoinPool实现的线程池服务类 */ public class ThreadPoolService extends Service implements IThreadPoolService, IBaseThreadPoolService { /** * 线程池集合 * key - 线程池名称 * value - 线程池实例 */ private Map<String, ForkJoinPool> pools; /** * 默认的线程池 */ private ForkJoinPool defaultPool; /** * 构造方法 */ public ThreadPoolService() { pools = new HashMap<String, ForkJoinPool>(); defaultPool = new ForkJoinPool(); } @Override public void initPool(String name, int maxCount) { ForkJoinPool pool = new ForkJoinPool(maxCount); pools.put(name, pool); } /** * 在指定的线程池中执行Action * * @param action * @param poolName * @param join */ @Override public RecursiveAction queueAction(Action action, String poolName, boolean join) { ForkJoinPool pool = this.getPool(poolName); ActionAdapter actionWrapper = new ActionAdapter(action); pool.submit(actionWrapper);//submit无返回, invoke有返回 if (join) { actionWrapper.join(); } return actionWrapper; } @Override public ForkJoinPool getPool(String name) { if (pools.containsKey(name)) { return pools.get(name); } return defaultPool; } @Override public void queueAction(ScheduleAction scheduleAction) { try { ActionProxy actionProxy = new ActionProxy(scheduleAction); this.queueAction(actionProxy, scheduleAction.getPoolName(), scheduleAction.isJoin()); } catch (Exception ex) { this.getLogger().fatal(ex); } } /** * Action适配器,用来在线程池中执行 */ private class ActionAdapter extends RecursiveAction { private static final long serialVersionUID = 7769517269350767917L; private Action action; private ActionAdapter(Action action) { this.action = action; } @Override protected void compute() { try { action.executeAction(); } catch (Exception e) { e.printStackTrace(); } } } @Override public String getName() { return ServiceName.THREAD_POOL_SERVICE; } @Override public void shutdown() { for (ForkJoinPool pool : this.pools.values()) { pool.shutdown(); } defaultPool.shutdown(); } }
/* * Copyright (c) 2012 Hex */ package com.pccc.brms.utils; public class RuleMessages { /** * 程序启动时触发的规则消息 */ public static final String LOAD = "app_load"; /** * 开始导入客户任务 */ public static final String START_IMPORT_CUST_TASK = "app_start_import_cust_task"; /** * 开始导入即时客户任务 */ public static final String START_IMPORT_URGENT_CUST_TASK = "app_start_import_urgent_cust_task"; /** * */ public static final String FILTER_URGENT_TASK = "filter_urgent_cust_task"; /** * 某个渠道批量过滤完成消息 */ public static final String FILTER_BATCH_CHANNEL_FINISH = "app_batch_filter_channel_ok"; /** * 所有渠道批量过滤完成消息 */ public static final String FILTER_BATCH_DONE = "app_batch_filter_done"; /** * 当前所有即时任务过滤完成消息 */ public static final String FILTER_URGENT_TASKS_DONE = "app_urgent_tasks_filter_done"; /** * 某个渠道类型数据导入完成 */ public static final String BATCH_IMPORT_TASK_CHANNEL_FINISH = "app_import_completely_over"; /** * 导出客户任务触发的消息 */ public static final String EXPORT_CUST_TASK = "app_export_cust_task"; }
/* * Copyright (c) 2012 Hex */ package com.pccc.brms.config; import java.io.*; /** * User: Hex * Date: 12-11-15 * Time: 下午9:59 * Desc: 应用程序配置类 */ public class AppConfig implements IAppConfig { /** * 配置文件 */ private String propFilePath; /** * 配置对象 */ private PropertyConfig config; /** * 构造方法,初始化config * * @param configFile * @throws IOException */ public AppConfig(String propFilePath) throws IOException { this.propFilePath = propFilePath; this.config = new PropertyConfig(); this.config.load(this.propFilePath); } /********************** gets and sets **************************/ @Override public String getDbDriverClass() { return this.config.getString("db.driverClass"); } @Override public String getJdbcUrl() { return this.config.getString("db.jdbcUrl"); } @Override public String getDbUser() { return this.config.getString("db.user"); } @Override public String getDbPassword() { return this.config.getString("db.password"); } @Override public String getPropFilePath() { return propFilePath; } public void setPropFilePath(String propFilePath) { this.propFilePath = propFilePath; } @Override public IConfig getConfig(){ return this.config; } public void setConfig(PropertyConfig config) { this.config = config; } }
/* * Copyright (c) 2012 Hex */ package com.pccc.brms.config; /** * User: Hex * Date: 12-11-15 * Time: 下午9:58 * Desc: */ public interface IAppConfig { /** * 获得配置文件路径 * @return */ public String getPropFilePath(); /** * 获得所有配置的IConfig对象,通过IConfig对象可访问其它配置项 * @return */ public IConfig getConfig(); /** * 获得数据库驱动类名 * @return */ public String getDbDriverClass(); /** * 获得数据库连接地址 * @return */ public String getJdbcUrl(); /** * 获得数据库访问用户 * @return */ public String getDbUser(); /** * 获得数据库访问密码 * @return */ public String getDbPassword(); }
/* * Copyright (c) 2012 Hex */ package com.pccc.brms.config; /** * User: Hex * Date: 12-11-15 * Time: 下午10:14 * Desc: 配置访问接口 */ public interface IConfig { /** * 获取一个字符串配置值,不存在时则返回指定的默认值 * @param key * @param defaultValue * @return */ public String getString(String key, String defaultValue); /** * 获取一个字符串配置值 * @param key * @return */ public String getString(String key); /** * 获取一个32位整数的配置值 * @param key * @param defaultValue * @return */ public int getInt32(String key, int defaultValue); /** * 获取一个双精度浮点数的配置值 * @param key * @param defaultValue * @return */ public double getDouble(String key, double defaultValue); }
/* * Copyright (c) 2012 Hex */ package com.pccc.brms.config; import java.io.*; import java.util.Properties; /** * User: Hex * Date: 12-11-15 * Time: 下午10:16 * Desc: 使用Properties实现IConfig的类 */ public class PropertyConfig implements IConfig { /** * 存放 key - value属性 */ private Properties properties; /** * 构造方法 */ public PropertyConfig() { this.properties = new Properties(); } /** * 构造方法 */ public PropertyConfig(Properties properties) { this.properties = properties; } /** * 加载 * * @param propFilePath * @throws IOException */ public void load(String propFilePath) throws IOException { //先清理 this.properties.clear(); //实例化文件 File file = new File(propFilePath); if (!file.exists()) { throw new IOException("没有找到配置文件:" + propFilePath); } //加载 InputStream is = null; try { is = new BufferedInputStream(new FileInputStream(file)); properties.load(is); } catch (IOException e) { throw new IOException("加载配置文件时出错:" + propFilePath); } finally { is.close(); } } public int size() { return this.properties.size(); } public void save(String propertyFile) throws IOException { FileOutputStream outputStream = null; try { File file = new File(propertyFile); if (!file.exists()) { file.createNewFile(); } outputStream = new FileOutputStream(propertyFile); this.properties.store(outputStream, null); } finally { if (null != outputStream) { outputStream.close(); } } } @Override public String getString(String key, String defaultValue) { return this.properties.getProperty(key, defaultValue); } @Override public String getString(String key) { return this.properties.getProperty(key); } @Override public int getInt32(String key, int defaultValue) { if (!this.properties.containsKey(key)) { return defaultValue; } String value = this.getString(key); return Integer.parseInt(value); } public void setInt32(String key, int value) { this.properties.setProperty(key, String.valueOf(value)); } public void setString(String key, String value) { this.properties.setProperty(key, value); } @Override public double getDouble(String key, double defaultValue) { if (!this.properties.containsKey(key)) { return defaultValue; } String value = this.getString(key); return Double.parseDouble(value); } /************************ gets and sets *****************************/ public Properties getProperties() { return this.properties; } }
/* * Copyright (c) 2012 Hex */ package com.pccc.brms; import com.pccc.brms.config.AppConfig; import com.pccc.brms.config.IAppConfig; import com.pccc.brms.service.*; import com.pccc.brms.utils.LoggerFactory; import com.pccc.brms.utils.RuleMessages; import com.pccc.brms.utils.StringUtils; import jsr166y.ForkJoinPool; import jsr166y.RecursiveAction; import java.io.File; import java.io.IOException; import java.util.Scanner; import org.apache.log4j.Logger; /** * User: Hex Date: 12-11-7 Time: 下午3:09 Desc: 实现基本功能的应用抽象类 */ public abstract class App implements IApp { /** * 日志记录类 */ private Logger logger = LoggerFactory.getLogger(); /** * 配置文件管理类 */ private IAppConfig appConfig; /** * 命令行action */ private CommandLineLoopAction cmdLoopAction; /** * 线程池类 */ private ForkJoinPool pool; /** * 服务提供器 */ protected IServiceProvider serviceProvider; /** * 单例类 */ private static App CURRENT_APP; /** * 规则会话 */ protected IRuleSession ruleSession; /** * 运行模块名称 */ private String model = ""; /** * 构造方法 */ public App() { App.CURRENT_APP = this; this.pool = new ForkJoinPool(1); this.serviceProvider = new ServiceProvider(); } /** * 开始运行服务 * * @param args * 模块名 */ public void start(String[] args) { // 读取模块名 if (args != null) { if (args.length > 0) { this.model = args[0]; if("".equals(this.model)){ //ExportStatusService.RESEND_EXPORT_TASK_PARMS=args; } } } logger.info("统一沟通规则管理子系统启动......"); logger.info("args:" + StringUtils.join(args, "\t")); // 系统运行目录 String userDir = System.getProperty("user.dir"); logger.info("user.dir:" + userDir); // 加载全局配置 try { this.appConfig = new AppConfig(this.getGlobalPropertyFile()); logger.info("加载全局配置文件完成:" + this.appConfig.getPropFilePath()); } catch (IOException e) { logger.error("加载配置文件出错:" + this.getGlobalPropertyFile(), e); return; } // 初始化服务 this.initSrvs(); // 命令行循环,在独立的线程中执行 cmdLoopAction = new CommandLineLoopAction(this); pool.submit(cmdLoopAction); // 启动之后的操作 this.afterAppStart(); } protected abstract void initSrvs(); /** * 全局配置文件名 * * @return */ protected String getGlobalPropertyFile() { return "./conf/global.properties"; } /** * 初始化规则引擎 * * @param ruleService */ protected void initRuleSrv(IRuleService ruleService) { String drlFileName = this.getAppConfig().getConfig().getString("ruleFileName", "");// ruleFileName=./conf/rules/rule_$model.drl if (drlFileName == null || "".equals(drlFileName)) { this.dumpAppError("错误信息:没有找到规则文件相关的配置,程序退出!"); } drlFileName = drlFileName.replace("$model", this.getModel()); File ruleFile = new File(drlFileName); if (!ruleFile.exists()) { this.dumpAppError("错误信息:没有找到规则文件配置,程序退出!"); } ruleService.initRuleEngine(ruleFile); this.ruleSession = ruleService.newRuleSession(); } protected void dumpAppError(String msg) { this.logger.error(msg); System.exit(-1); } protected void dumpAppInfo(String msg) { this.logger.info(msg); } /** * 规则程序启动之后的操作,开始与规则引擎会话 */ protected void afterAppStart() { if (this.ruleSession != null) { startRuleSession(); } } /** * 启动规则会话 * */ private void startRuleSession() { for (IService svc : this.serviceProvider.getServices()) { this.ruleSession.insert(svc); } this.ruleSession.sendAppMessage(this.getName(), RuleMessages.LOAD, "", null); this.setBatchImportStatusModel(); try { this.ruleSession.fireUnitHalt(); } catch (Exception ex) { LoggerFactory.getLoggingService().fatal("错误信息:规则引擎发生重要错误,", ex); this.shutdown(); } } /** * 设置批量导入状态 */ private void setBatchImportStatusModel() { this.ruleSession.insert(this.serviceProvider.getService(IBatchImportStatusService.class).getBatchImportStatus()); } /** * ****************************************** * 内部类,用于接收命令行 * ****************************************** * * @author caikang * */ private class CommandLineLoopAction extends RecursiveAction { private static final long serialVersionUID = -50122496131577152L; private App app; CommandLineLoopAction(App app) { this.app = app; } @Override protected void compute() { app.runMessageLoop(); } } /** * 命令行消息循环 */ protected void runMessageLoop() { Scanner scanner = new Scanner(System.in); String line = scanner.nextLine(); while(true) { line = scanner.nextLine();//挂起 if("".equalsIgnoreCase(line)) { continue; } if("exit".equalsIgnoreCase(line)) { this.shutdown(); logger.info("收到退出命令,规则管理子系统进程退出......"); break; } this.processCommandLine(line); } } @Override public void shutdown() { if (null != this.cmdLoopAction) { this.cmdLoopAction.cancel(true); this.cmdLoopAction = null; } this.onShutdown(); for (IService svc : this.serviceProvider.getServices()) { svc.shutdown(); } this.serviceProvider.clear(); } protected void onShutdown() { if (null != this.ruleSession) { ruleSession.halt(); ruleSession.dispose(); ruleSession = null; } } /** * 处理命令行 * * @param line */ protected void processCommandLine(String line) { this.logger.info("无法解析未知的命令:" + line); } /** ******************** gets and sets *********************** */ @Override public IAppConfig getAppConfig() { return this.appConfig; } public static IApp getCurrent() { return App.CURRENT_APP; } @Override public IRuleSession getRuleSession() { return ruleSession; } @Override public IServiceProvider getServiceProvider() { return serviceProvider; } public String getModel() { return model; } public void setModel(String model) { this.model = model; } }
package com.pccc.brms; import com.pccc.brms.config.IAppConfig; import com.pccc.brms.service.IRuleSession; import com.pccc.brms.service.IServiceProvider; /** * User: Hex * Date: 12-11-16 * Time: 上午12:01 * Desc: 应用程序基础接口 */ public interface IApp { /** * 获得应用配置 * @return */ IAppConfig getAppConfig(); /** * 获得当前应用默认的RuleSession * @return */ IRuleSession getRuleSession(); /** * 获得当前应用名称 * @return */ String getName(); /** * 终止应用程序 */ void shutdown(); /** * 获得当前应用的服务提供器 * @return */ IServiceProvider getServiceProvider(); }
package com.pccc.brms; import com.pccc.brms.config.IAppConfig; import com.pccc.brms.service.IRuleSession; import com.pccc.brms.service.IServiceProvider; /** * User: Hex * Date: 12-11-16 * Time: 上午12:01 * Desc: 应用程序基础接口 */ public interface IApp { /** * 获得应用配置 * @return */ IAppConfig getAppConfig(); /** * 获得当前应用默认的RuleSession * @return */ IRuleSession getRuleSession(); /** * 获得当前应用名称 * @return */ String getName(); /** * 终止应用程序 */ void shutdown(); /** * 获得当前应用的服务提供器 * @return */ IServiceProvider getServiceProvider(); }
/* * Copyright (c) 2012 Hex */ package com.pccc.brms.action; import com.pccc.brms.App; import com.pccc.brms.config.PropertyConfig; import java.io.*; import java.util.Properties; /** * User: Hex * Date: 12-11-8 * Time: 上午11:33 * Desc: 用来构造模块Action实例的类 */ public final class ActionBuilder { /** * 根据 Action的Class Name 及配置参数(例如配置文件)构造Action的实例 * @param actionType * @param propertyFile * @param properties * @return * @throws ClassNotFoundException * @throws IllegalAccessException * @throws InstantiationException * @throws IOException */ public static Action buildAction(String actionType, String propertyFile, Properties properties) throws ClassNotFoundException, IllegalAccessException, InstantiationException, IOException { //实例化模块的Action Action action = ActionBuilder.newInstance(actionType); action.setApp(App.getCurrent()); Properties props = new Properties(); if (null != propertyFile && !propertyFile.trim().isEmpty()) { //设置了配置文件时,从配置文件读取配置 String[] propFiles=propertyFile.trim().split(","); for (int i= 0;i< propFiles.length; i++){ PropertyConfig config=new PropertyConfig(); config.load(propFiles[i]); Properties p = config.getProperties(); //合并配置 props.putAll(p); } } props.putAll(properties); action.setProperties(props); return action; } /** * 根据Action的Class Name构造Action的实例newInstance() * * @param actionType * @return * @throws ClassNotFoundException * @throws IllegalAccessException * @throws InstantiationException */ public static Action newInstance(String actionType) throws ClassNotFoundException, IllegalAccessException, InstantiationException { Object o = Class.forName(actionType).newInstance(); Action action = Action.class.cast(o); return action; } }
/* * Copyright (c) 2012 Hex */ package com.pccc.brms.bootstrap; import com.pccc.brms.bcl.export.client.ESBClient; import com.pccc.brms.App; import com.pccc.brms.bcl.filter.service.BatchFilterStatusService; import com.pccc.brms.bcl.input.service.ImportStatusService; import com.pccc.brms.bcl.util.SysLogService; import com.pccc.brms.service.IRuleService; import com.pccc.brms.service.RuleService; import com.pccc.brms.service.ScheduleService; import com.pccc.brms.service.ThreadPoolService; /** * User: Hex * Date: 12-11-7 * Time: 下午3:08 * Desc: */ public class MainApp extends App { // protected static ILoggingService logger = LoggerFactory.getLoggingService(); public static void main(String[] args){ //启动服务 MainApp app = new MainApp(); app.start(args); } /** * 初始化所有服务 */ protected void initSrvs() { //初始化rule服务 IRuleService ruleSrv = new RuleService(); this.initRuleSrv(ruleSrv); this.serviceProvider.add(ruleSrv); this.dumpAppInfo("规则服务加载完成......"); //初始化线程池服务 this.serviceProvider.add(new ThreadPoolService()); this.dumpAppInfo("线程池服务加载完成......"); //初始化定时器服务 this.serviceProvider.add(new ScheduleService()); this.dumpAppInfo("定时器服务加载完成......"); //初始化导入状态服务 this.serviceProvider.add(new ImportStatusService()); this.dumpAppInfo("数据导入状态控制服务加载完成......"); //初始化批量过滤服务 this.serviceProvider.add(new BatchFilterStatusService()); this.dumpAppInfo("数据过滤状态控制服务加载完成......"); //初始化系统日志服务 this.serviceProvider.add(new SysLogService());//TODO 目前不使用这个service,直接使用SysLogDao this.dumpAppInfo("数据库日志记录服务加载完成......"); //初始化ESB CLIENT上下文 ESBClient.initEsbContext(); this.dumpAppInfo("ESB客户端服务加载完成......"); } @Override public String getName() { return "main"; } } //测试ctrl-m sh脚本会不会挂起 //for(int i=0; i<300; i++) { //System.out.println("sleep " + i + " second."); //logger.info("sleep " + i + " second."); //try { // Thread.sleep(1*60*1000); //} catch (InterruptedException e) { // e.printStackTrace(); //} //} //System.exit(0);
时间: 2024-10-11 06:02:45