解决方式:使用线程池+队列
项目基于Spring,如果不用spring需要自己把
ThreadPoolManager.java
改成单例模式
1.写一个Controller(Spring mvc)
/** * @author HeyS1 * @date 2016/12/1 * @description */ @Controller public class ThreadPoolController { @Autowired ThreadPoolManager tpm; @RequestMapping("/pool") public @ResponseBody Object test() { for (int i = 0; i < 500; i++) { //模拟并发500条记录 tpm.processOrders(Integer.toString(i)); } return "ok"; } }
2.线程池管理
/** * @author HeyS1 * @date 2016/12/1 * @description threadPool订单线程池, 处理订单 * scheduler 调度线程池 用于处理订单线程池由于超出线程范围和队列容量而不能处理的订单 */ @Component public class ThreadPoolManager implements BeanFactoryAware { private static Logger log = LoggerFactory.getLogger(ThreadPoolManager.class); private BeanFactory factory;//用于从IOC里取对象 // 线程池维护线程的最少数量 private final static int CORE_POOL_SIZE = 2; // 线程池维护线程的最大数量 private final static int MAX_POOL_SIZE = 10; // 线程池维护线程所允许的空闲时间 private final static int KEEP_ALIVE_TIME = 0; // 线程池所使用的缓冲队列大小 private final static int WORK_QUEUE_SIZE = 50; // 消息缓冲队列 Queue<Object> msgQueue = new LinkedList<Object>(); //用于储存在队列中的订单,防止重复提交 Map<String, Object> cacheMap = new ConcurrentHashMap<>(); //由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序 final RejectedExecutionHandler handler = new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { //System.out.println("太忙了,把该订单交给调度线程池逐一处理" + ((DBThread) r).getMsg()); msgQueue.offer(((DBThread) r).getMsg()); } }; // 订单线程池 final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue(WORK_QUEUE_SIZE), this.handler); // 调度线程池。此线程池支持定时以及周期性执行任务的需求。 final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5); // 访问消息缓存的调度线程,每秒执行一次 // 查看是否有待定请求,如果有,则创建一个新的AccessDBThread,并添加到线程池中 final ScheduledFuture taskHandler = scheduler.scheduleAtFixedRate(new Runnable() { @Override public void run() { if (!msgQueue.isEmpty()) { if (threadPool.getQueue().size() < WORK_QUEUE_SIZE) { System.out.print("调度:"); String orderId = (String) msgQueue.poll(); DBThread accessDBThread = (DBThread) factory.getBean("dBThread"); accessDBThread.setMsg(orderId); threadPool.execute(accessDBThread); } // while (msgQueue.peek() != null) { // } } } }, 0, 1, TimeUnit.SECONDS); //终止订单线程池+调度线程池 public void shutdown() { //true表示如果定时任务在执行,立即中止,false则等待任务结束后再停止 System.out.println(taskHandler.cancel(false)); scheduler.shutdown(); threadPool.shutdown(); } public Queue<Object> getMsgQueue() { return msgQueue; } //将任务加入订单线程池 public void processOrders(String orderId) { if (cacheMap.get(orderId) == null) { cacheMap.put(orderId,new Object()); DBThread accessDBThread = (DBThread) factory.getBean("dBThread"); accessDBThread.setMsg(orderId); threadPool.execute(accessDBThread); } } //BeanFactoryAware @Override public void setBeanFactory(BeanFactory beanFactory) throws BeansException { factory = beanFactory; } }
3.线程池中工作的线程
//线程池中工作的线程 @Component @Scope("prototype")//spring 多例 public class DBThread implements Runnable { private String msg; private Logger log = LoggerFactory.getLogger(DBThread.class); @Autowired SystemLogService systemLogService; @Override public void run() { //模拟在数据库插入数据 Systemlog systemlog = new Systemlog(); systemlog.setTime(new Date()); systemlog.setLogdescribe(msg); //systemLogService.insert(systemlog); log.info("insert->" + msg); } public String getMsg() { return msg; } public void setMsg(String msg) { this.msg = msg; } }
浏览器输入地址127.0.0.1/pool
几秒后关闭tomcat。
模拟500条数据,订单线程池处理了117条。调度线程池处理5条
关闭tomcat,后还有378条未处理(这里的实现需要用到spring监听器)。加起来一共500
OK。完毕
spring监听器,监听tomcat关闭事件:
public class MyApplicationListener implements ApplicationListener<ApplicationEvent> { @Autowired ThreadPoolManager threadPoolManager; @Override public void onApplicationEvent(ApplicationEvent event) { if (event instanceof ContextClosedEvent) { XmlWebApplicationContext x = (XmlWebApplicationContext) event.getSource(); //防止执行两次。root application context 没有parent,他就是老大 if (x.getDisplayName().equals("Root WebApplicationContext")) { threadPoolManager.shutdown(); Queue q = threadPoolManager.getMsgQueue(); System.out.println("关闭了服务器,还有未处理的信息条数:" + q.size()); } } else if (event instanceof ContextRefreshedEvent) { // System.out.println(event.getClass().getSimpleName()+" 事件已发生!"); } else if (event instanceof ContextStartedEvent) { // System.out.println(event.getClass().getSimpleName()+" 事件已发生!"); } else if (event instanceof ContextStoppedEvent) { // System.out.println(event.getClass().getSimpleName()+" 事件已发生!"); } else { // System.out.println("有其它事件发生:"+event.getClass().getName()); } } }
spring配置一下
<bean id="springStartListener" class="com.temp.MyApplicationListener"></bean>
原文地址:https://www.cnblogs.com/vianzhang/p/8920445.html
时间: 2024-11-13 15:49:56