javaWeb 使用线程池+队列解决"订单并发"问题

解决方式:使用线程池+队列

项目基于Spring,如果不用spring需要自己把

ThreadPoolManager.java

改成单例模式

1.写一个Controller(Spring mvc)

/**
 * @author HeyS1
 * @date 2016/12/1
 * @description
 */
@Controller
public class ThreadPoolController {
    @Autowired
    ThreadPoolManager tpm;

    @RequestMapping("/pool")
    public
    @ResponseBody
    Object test() {
        for (int i = 0; i < 500; i++) {
            //模拟并发500条记录
            tpm.processOrders(Integer.toString(i));
        }

        return "ok";
    }
}

2.线程池管理

/**
 * @author HeyS1
 * @date 2016/12/1
 * @description threadPool订单线程池, 处理订单
 * scheduler 调度线程池 用于处理订单线程池由于超出线程范围和队列容量而不能处理的订单
 */
@Component
public class ThreadPoolManager implements BeanFactoryAware {
    private static Logger log = LoggerFactory.getLogger(ThreadPoolManager.class);
    private BeanFactory factory;//用于从IOC里取对象
    // 线程池维护线程的最少数量
    private final static int CORE_POOL_SIZE = 2;
    // 线程池维护线程的最大数量
    private final static int MAX_POOL_SIZE = 10;
    // 线程池维护线程所允许的空闲时间
    private final static int KEEP_ALIVE_TIME = 0;
    // 线程池所使用的缓冲队列大小
    private final static int WORK_QUEUE_SIZE = 50;
    // 消息缓冲队列
    Queue<Object> msgQueue = new LinkedList<Object>();

    //用于储存在队列中的订单,防止重复提交
    Map<String, Object> cacheMap = new ConcurrentHashMap<>();

    //由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序
    final RejectedExecutionHandler handler = new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            //System.out.println("太忙了,把该订单交给调度线程池逐一处理" + ((DBThread) r).getMsg());
            msgQueue.offer(((DBThread) r).getMsg());
        }
    };

    // 订单线程池
    final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
            CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME,
            TimeUnit.SECONDS, new ArrayBlockingQueue(WORK_QUEUE_SIZE), this.handler);

    // 调度线程池。此线程池支持定时以及周期性执行任务的需求。
    final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);

    // 访问消息缓存的调度线程,每秒执行一次
    // 查看是否有待定请求,如果有,则创建一个新的AccessDBThread,并添加到线程池中
    final ScheduledFuture taskHandler = scheduler.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            if (!msgQueue.isEmpty()) {
                if (threadPool.getQueue().size() < WORK_QUEUE_SIZE) {
                    System.out.print("调度:");
                    String orderId = (String) msgQueue.poll();
                    DBThread accessDBThread = (DBThread) factory.getBean("dBThread");
                    accessDBThread.setMsg(orderId);
                    threadPool.execute(accessDBThread);
                }
                // while (msgQueue.peek() != null) {
                // }
            }
        }
    }, 0, 1, TimeUnit.SECONDS);

    //终止订单线程池+调度线程池
    public void shutdown() {
        //true表示如果定时任务在执行,立即中止,false则等待任务结束后再停止
        System.out.println(taskHandler.cancel(false));
        scheduler.shutdown();
        threadPool.shutdown();
    }

    public Queue<Object> getMsgQueue() {
        return msgQueue;
    }

    //将任务加入订单线程池
    public void processOrders(String orderId) {
        if (cacheMap.get(orderId) == null) {
            cacheMap.put(orderId,new Object());
            DBThread accessDBThread = (DBThread) factory.getBean("dBThread");
            accessDBThread.setMsg(orderId);
            threadPool.execute(accessDBThread);
        }
    }

    //BeanFactoryAware
    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        factory = beanFactory;
    }
}

3.线程池中工作的线程

//线程池中工作的线程
@Component
@Scope("prototype")//spring 多例
public class DBThread implements Runnable {
    private String msg;
    private Logger log = LoggerFactory.getLogger(DBThread.class);

    @Autowired
    SystemLogService systemLogService;

    @Override
    public void run() {
        //模拟在数据库插入数据
        Systemlog systemlog = new Systemlog();
        systemlog.setTime(new Date());
        systemlog.setLogdescribe(msg);
        //systemLogService.insert(systemlog);
        log.info("insert->" + msg);
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }
}

浏览器输入地址127.0.0.1/pool

几秒后关闭tomcat。

模拟500条数据,订单线程池处理了117条。调度线程池处理5条

关闭tomcat,后还有378条未处理(这里的实现需要用到spring监听器)。加起来一共500

OK。完毕

spring监听器,监听tomcat关闭事件:

public class MyApplicationListener implements ApplicationListener<ApplicationEvent> {

    @Autowired
    ThreadPoolManager threadPoolManager;

    @Override
    public void onApplicationEvent(ApplicationEvent event) {

        if (event instanceof ContextClosedEvent) {
            XmlWebApplicationContext x = (XmlWebApplicationContext) event.getSource();
            //防止执行两次。root application context 没有parent,他就是老大
            if (x.getDisplayName().equals("Root WebApplicationContext")) {
                threadPoolManager.shutdown();
                Queue q = threadPoolManager.getMsgQueue();
                System.out.println("关闭了服务器,还有未处理的信息条数:" + q.size());
            }

        } else if (event instanceof ContextRefreshedEvent) {
//            System.out.println(event.getClass().getSimpleName()+" 事件已发生!");
        } else if (event instanceof ContextStartedEvent) {
//            System.out.println(event.getClass().getSimpleName()+" 事件已发生!");
        } else if (event instanceof ContextStoppedEvent) {
//            System.out.println(event.getClass().getSimpleName()+" 事件已发生!");
        } else {
//            System.out.println("有其它事件发生:"+event.getClass().getName());
        }
    }
}

spring配置一下

<bean id="springStartListener" class="com.temp.MyApplicationListener"></bean>

原文地址:https://www.cnblogs.com/vianzhang/p/8920445.html

时间: 2024-11-13 15:49:56

javaWeb 使用线程池+队列解决"订单并发"问题的相关文章

线程池队列区别

线程池的三种队列区别:SynchronousQueue.LinkedBlockingQueue 和ArrayBlockingQueue https://blog.csdn.net/qq_26881739/article/details/80983495 1.SynchronousQueue(CachedThreadPool) 类似交警只是指挥车辆,并不管理车辆 SynchronousQueue没有容量,是无缓冲等待队列,是一个不存储元素的阻塞队列,会直接将任务交给消费者,必须等队列中的添加元素被

线程池队列饱和策略

1.当一个有限队列充满后,线程池的饱和策略开始起作用. 2.ThreadPoolExecutor的饱和策略通过调用setRejectedExecutionHandler来修改.不同的饱和策略如下: 1)AbortPolicy:中止,executor抛出未检查RejectedExecutionException,调用者捕获这个异常,然后自己编写能满足自己需求的处理代码. 2)DiscardRunsPolicy:遗弃最旧的,选择丢弃的任务,是本应接下来就执行的任务. 3)DiscardPolicy:

Redis分布式队列解决文件并发的问题

1.首先将捕获的异常写到Redis的队列中 1 public class MyExceptionAttribute : HandleErrorAttribute 2 { 3 public static IRedisClientsManager clientManager = new PooledRedisClientManager(new string[] { "127.0.0.1:6379", "192.168.1.2:6379" }); 4 public sta

深入浅出Java并发编程(一):线程池的使用

我们在使用线程的时候就去建立一个线程,这样实现起来非常简便,但是会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间段很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率. 那么如何解决此类问题呢? 在Java中可以通过线程池来解决这样的效果.前面有文章简单提到过线程池的使用.今天我们来详细讲解下Java的线程池,由易而难,循序渐进,步骤如下: 首先我们从最核心的ThreadPoolExecutor类的方法讲起 然后讲述它的实现原理 接着给出了相应的示例 最后我们讨论下如何

并发编程 13—— 线程池 之 整体架构

Java并发编程实践 目录 并发编程 01—— ConcurrentHashMap 并发编程 02—— 阻塞队列和生产者-消费者模式 并发编程 03—— 闭锁CountDownLatch 与 栅栏CyclicBarrier 并发编程 04—— Callable和Future 并发编程 05—— CompletionService : Executor 和 BlockingQueue 并发编程 06—— 任务取消 并发编程 07—— 任务取消 之 中断 并发编程 08—— 任务取消 之 停止基于线

【Java并发编程】21、线程池ThreadPoolExecutor源码解析

一.前言 JUC这部分还有线程池这一块没有分析,需要抓紧时间分析,下面开始ThreadPoolExecutor,其是线程池的基础,分析完了这个类会简化之后的分析,线程池可以解决两个不同问题:由于减少了每个任务调用的开销,它们通常可以在执行大量异步任务时提供增强的性能,并且还可以提供绑定和管理资源(包括执行任务集时使用的线程)的方法.下面开始分析. 二.ThreadPoolExecutor数据结构 在ThreadPoolExecutor的内部,主要由BlockingQueue和AbstractQu

Java并发:一篇搞定线程池

原文地址:https://www.nowcoder.com/discuss/152050?type=0&order=0&pos=6&page=0 本文是在原文的基础+理解,想要系统学习,请看原文地址. 线程池介绍 1.1 线程池的概念 线程池(thread pool): 一种线程使用模式.线程的创建销毁是十分消耗资源的(线程创建消耗内存.线程上下文切换从消耗CPU资源).使用线程池可以更加充分的协调应用CPU.内存.网络.I/O等系统资源.在程序启动首先创建线程,在程序启动后可以将

Java 并发编程之线程池的使用 (三)

线程工厂 每当线程池需要创建一个线程时,都是通过线程工厂方法来完善的.默认的线程工厂方法将创建一个新的.非守护的线程,并且不包含特殊的配置信息,通过指定一个线程工厂方法,可以线程池的配置信息. 需要定制线程工厂方法的情景 : 需要为线程池里面的线程指定 个UncaughtExceptionHandler 实例化一个定制的Thread类执行调试信息的记录 需要修改线程的优先级或者守护线程的状态(这建设使用这两个功能,线程优先级会增加平台依赖性,并且导致活跃性问题,在大多数并发应用程序中,都可以使用

并发编程 15—— 线程池 之 原理二

Java并发编程实践 目录 并发编程 01—— ConcurrentHashMap 并发编程 02—— 阻塞队列和生产者-消费者模式 并发编程 03—— 闭锁CountDownLatch 与 栅栏CyclicBarrier 并发编程 04—— Callable和Future 并发编程 05—— CompletionService : Executor 和 BlockingQueue 并发编程 06—— 任务取消 并发编程 07—— 任务取消 之 中断 并发编程 08—— 任务取消 之 停止基于线