异步线程池的实现(一)-------具体实现方法

本篇是这个内容的第一篇,主要是写:遇到的问题,和自己摸索实现的方法。后面还会有一篇是总结性地写线程池的相关内容(偏理论的)。

一、背景介绍

朋友的项目开发到一定程度之后,又遇到了一些问题:在某些流程中的一些节点,由于是串联执行的。上一步要等下一步执行完毕;或者提交数据之后要等待后台其他系统处理完成之后,才能返回结果。这样就会导致,请求发起方不得不一直等待结果,用户体验很不好;从项目优化来说,模块与模块之间构成了强耦合,这也是不利于以后扩展的,更不用说访问量上来之后,肯定会抓瞎的问题。所以,我就着手开始,利用异步线程池来解决这个问题。

刚开始的时候,我准备只是在节点处另外起线程去执行异步操作。但是,考虑到以后的扩展,同时利用“池化”技术,更加高效地重复利用线程,节省资源。在这里就选定了,使用线程池的方法。

二、实现步骤

实现总共分为四步:

第一步,在启动服务的时候初始化线程池;

第二步,建立有队列的线程池;

第三步,将业务逻辑方法与线程池联系起来;

第四步,调整原有代码逻辑结构,将可以异步的操作放入第三步的业务逻辑方法,并将请求放入线程池的队列中,等待执行。

三、具体实现

首先,第一步我们在web项目的起源之处web.xml中加入这么一行

1 <listener>
2
3                    <listener-class>com.jptec.kevin.thread.listener.InitThreadPoolListener</listener-class>
4
5 </listener>

这里的路径实际上就是,在启动项目之后,会加载的初始化函数。这个函数主要的作用就是:将线程池启动起来。实现代码如下:

 1 public class InitThreadPoolListener implements ServletContextListener {
 2
 3     @Override
 4     public void contextInitialized(ServletContextEvent sce) {
 5
 6         new TestThreadPool().runThread();
 7     }
 8
 9     @Override
10     public void contextDestroyed(ServletContextEvent sce) {
11     }
12
13 }

好了,第一步就算完工了。

然后,我们开始第二步,建立有队列的线程池(这里有很多,理论上的内容,会放在第二篇中详细说)。在这里主要是,定义了一个ArrayBlockingQueue队列(先进先出,有限阻塞),使用Executor定义了一个线程池。具体代码如下:

 1 public class TestThreadPool {
 2
 3     protected final static Logger log = LoggerFactory.getLogger(TestThreadPool.class);
 4
 5     // 线程休眠时间(秒)
 6     // 存放需要发送的信息
 7     public static BlockingQueue<Runnable> addressBqueue = new ArrayBlockingQueue<Runnable>(
 8                 10000);
 9
10     public static final ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 200, 25, TimeUnit.SECONDS,
11             addressBqueue);
12
13     public TestThreadPool() {
14     }
15
16
17     public void runThread() {
18
19         try {
20             executor.prestartCoreThread();
21             log.info("队列大小:" + executor.getQueue().size());
22
23         } catch (Exception e) {
24             log.error("启动子线程异常", e);
25         }
26
27     }
28
29 }

完成第二步之后,我们继续第三步。我们有了线程池,那么实际代码如何将请求放入其中,并等待执行呢。于是,这里分为两个类,一个是负责业务代码中调用的,负责向队列中插入请求,一个是单个线程的实现类。具体实现如下:

插入请求实现:

 1 /**
 2  * 线程队列
 3  */
 4 public class TestQueue {
 5     protected final static Logger log = LoggerFactory.getLogger(TestQueue.class);
 6
 7     public static boolean put(String userId,String tradeId, String amount, String flag, String term) {
 8             log.debug("添加入队列开始... 额度申请用户tradeId=[{}]", tradeId);
 9             try {
10                 TestThreadPool.executor.execute(new TestThread( tradeId, amount,  flag, term, userId));
11             log.debug("添加入队列结束...");
12         } catch (Exception e) {
13             log.error("添加入队列异常...", e);
14             return false;
15         }
16         return true;
17     }
18
19 }

单个线程实现(第八行的引用在下文细说):

 1 /**
 2  * 发送信息线程处理类
 3  */
 4 public class TestThread implements Runnable {
 5
 6     protected final static Logger log = LoggerFactory.getLogger(TestThread.class);
 7
 8     private TradeService tradeService = (TradeService) SpringHandle.getBean("tradeService");
 9
10     String tradeId;
11     String amount;
12     String flag;
13     String term;
14     String userId;
15
16
17
18     /**
19      * <p>Title: </p>
20      * <p>Description: </p>
21      * @param tradeId
22      * @param amount
23      * @param flag
24      * @param term
25      * @param userId
26      */
27
28     public TestThread(String tradeId, String amount, String flag, String term,
29             String userId) {
30         super();
31         this.tradeId = tradeId;
32         this.amount = amount;
33         this.flag = flag;
34         this.term = term;
35         this.userId = userId;
36     }
37
38     @Override
39     public void run() {
40         log.info("线程开始tradeId={}", tradeId);
41         log.info("线程名:={}", Thread.currentThread().getId());
42         log.info("队列大小:" + TestThreadPool.executor.getPoolSize() + ","
43                 + TestThreadPool.executor.getCompletedTaskCount());
44         putTradeConfirm(userId,tradeId, amount, flag, term);
45         try {
46             Thread.sleep(1000L);
47         } catch (InterruptedException e) {
48             e.printStackTrace();
49         }
50     }
51
52     private void putTradeConfirm(String userId,String tradeId, String amount, String flag, String term) {
53
54         tradeService.getMatchFundInfo(userId,amount, tradeId, flag, term);
55
56     }
57
58 }

这里需要注意的是,我需要获得一个Service的实例来调用具体的方法。但是,注释的方法不起作用,于是在朋友的帮助下,使用了辅助类。具体实现如下:

 1 @Component
 2 public final class SpringHandle implements BeanFactoryPostProcessor {
 3
 4     private static ConfigurableListableBeanFactory beanFactory; // Spring应用上下文环境
 5
 6     public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
 7         SpringHandle.beanFactory = beanFactory;
 8     }
 9
10     /**
11      * 获取对象
12      *
13      * @param name
14      * @return Object 一个以所给名字注册的bean的实例
15      * @throws org.springframework.beans.BeansException
16      *
17      */
18     @SuppressWarnings("unchecked")
19     public static <T> T getBean(String name) throws BeansException {
20         return (T) beanFactory.getBean(name);
21     }
22
23     /**
24      * 获取类型为requiredType的对象
25      *
26      * @param clz
27      * @return
28      * @throws org.springframework.beans.BeansException
29      *
30      */
31     public static <T> T getBean(Class<T> clz) throws BeansException {
32         T result = (T) beanFactory.getBean(clz);
33         return result;
34     }
35
36     /**
37      * 如果BeanFactory包含一个与所给名称匹配的bean定义,则返回true
38      *
39      * @param name
40      * @return boolean
41      */
42     public static boolean containsBean(String name) {
43         return beanFactory.containsBean(name);
44     }
45
46     /**
47      * 判断以给定名字注册的bean定义是一个singleton还是一个prototype。
48      * 如果与给定名字相应的bean定义没有被找到,将会抛出一个异常(NoSuchBeanDefinitionException)
49      *
50      * @param name
51      * @return boolean
52      * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
53      *
54      */
55     public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException {
56         return beanFactory.isSingleton(name);
57     }
58
59     /**
60      * @param name
61      * @return Class 注册对象的类型
62      * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
63      *
64      */
65     public static Class<?> getType(String name) throws NoSuchBeanDefinitionException {
66         return beanFactory.getType(name);
67     }
68
69     /**
70      * 如果给定的bean名字在bean定义中有别名,则返回这些别名
71      *
72      * @param name
73      * @return
74      * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
75      *
76      */
77     public static String[] getAliases(String name) throws NoSuchBeanDefinitionException {
78         return beanFactory.getAliases(name);
79     }
80
81 }

最后,在具体业务逻辑中,调用插入请求的方法,即可。

TradeGetFundInfoQueue.put(userId, tradeId, quota, repaymentType, timeLimit);

四、测试函数

由于在项目中,所以我写了另外一个测试函数(这个测试函数,会在下一篇文章中再次遇到),放在这里。供大家参考:

 1 public class TestThreadPool {
 2
 3     public static BlockingQueue<Runnable> queue = new  ArrayBlockingQueue<Runnable>(
 4             10000);
 5
 6     public static void main(String[] args) {
 7         for (int i = 0; i < 2; i++) {
 8             queue.add(new TestThread("初始化"));
 9         }
10
11         final ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 3, 15, TimeUnit.SECONDS, queue);
12
13         executor.prestartCoreThread();
14
15
16         new Thread(new Runnable() {
17             @Override
18             public void run() {
19                 while (true) {
20                     System.out.println("getActiveCount=" + executor.getActiveCount()
21                                     + ";getKeepAliveTime=" + executor.getKeepAliveTime(TimeUnit.SECONDS)
22                                     + ";getCompletedTaskCount=" + executor.getCompletedTaskCount()
23                                     + ";getCorePoolSize=" + executor.getCorePoolSize()
24                                     + ";getLargestPoolSize=" + executor.getLargestPoolSize()
25                                     + ";getMaximumPoolSize=" + executor.getMaximumPoolSize()
26                                     + ";getPoolSize=" + executor.getPoolSize()
27                                     + ";getTaskCount=" + executor.getTaskCount()
28                                     + ";getQueue().size()=" + executor.getQueue().size()
29                     );
30                     try {
31                         Thread.currentThread().sleep(200L);
32                     } catch (InterruptedException e) {
33                         e.printStackTrace();
34                     }
35                 }
36             }
37         }).start();
38
39         new Thread(new Runnable() {
40             @Override
41             public void run() {
42                 int i = 0;
43                 while (true) {
44                     queue.add(new TestThread("生产者"));
45                     try {
46                         Thread.currentThread().sleep(100L);
47                     } catch (InterruptedException e) {
48                         e.printStackTrace();
49                     }
50                     i++;
51                     if (i > 100) break;
52                 }
53             }
54         }).start();
55     }
56 }
57
58 class TestThread implements Runnable {
59     public static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
60     private String name;        //创建者
61     private Date addDate;       //添加到队列的日期
62
63     TestThread(String name) {
64         this.name = name;
65         this.addDate = new Date();
66     }
67
68     @Override
69     public void run() {
70         System.out.println(Thread.currentThread().getName() +
71                 ":创建者=" + name + ",创建时间=" + sdf.format(addDate) + ",执行时间=" + sdf.format(new Date()) + ",当前队列大小=" + TestThreadPool.queue.size());
72
73         System.out.println(TestThreadPool.queue.peek());
74         try {
75             Thread.currentThread().sleep(1000L);
76         } catch (InterruptedException e) {
77             e.printStackTrace();
78         }
79     }
80 }

测试的结果大致是这个样子的:

最后,希望这篇文章对你有帮助,感谢朋友的帮助!

时间: 2024-10-03 13:04:01

异步线程池的实现(一)-------具体实现方法的相关文章

使用C++11 开发一个半同步半异步线程池

摘自:<深入应用C++11>第九章 实际中,主要有两种方法处理大量的并发任务,一种是一个请求由系统产生一个相应的处理请求的线程(一对一) 另外一种是系统预先生成一些用于处理请求的进程,当请求的任务来临时,先放入同步队列中,分配一个处理请求的进程去处理任务, 线程处理完任务后还可以重用,不会销毁,而是等待下次任务的到来.(一对多的线程池技术) 线程池技术,能避免大量线程的创建和销毁动作,节省资源,对于多核处理器,由于线程被分派配到多个cpu,会提高并行处理的效率. 线程池技术分为半同步半异步线程

c++11 实现半同步半异步线程池

感受: 随着深入学习,现代c++给我带来越来越多的惊喜- c++真的变强大了. 半同步半异步线程池: 其实很好理解,分为三层 同步层:通过IO复用或者其他多线程多进程等不断的将待处理事件添加到队列中,这个过程是同步进行的. 队列层:所有待处理事件都会放到这里.上一层事件放到这里,下一层从这里获取事件 异步层:事先创建好线程,让瞎猜呢和嗯不断的去处理队列层的任务,上层不关心这些,它只负责把任务放到队列里,所以对上层来说这里是异步的. 看张图: 如果你不熟悉c++11的内容 以下文章仅供参考 c++

异步线程池的使用

合理使用异步线程开发项目能提高一个项目的并发量,减少响应时间.下面就简单介绍一下异步线程池的使用,参考博客:https://blog.csdn.net/hry2015/article/details/67640534 spring 对@Async定义异步任务的方法有3种: 1.最简单的异步调用,返回值为void: 2.带参数的异步调用,异步方法可以传入参数: 3.异常调用返回Future 代码如下: package com.hry.spring.async.annotation; import

(原创)C++半同步半异步线程池2

(原创)C++半同步半异步线程池 c++11 boost技术交流群:296561497,欢迎大家来交流技术. 线程池可以高效的处理任务,线程池中开启多个线程,等待同步队列中的任务到来,任务到来多个线程会抢着执行任务,当到来的任务太多,达到上限时需要等待片刻,任务上限保证内存不会溢出.线程池的效率和cpu核数相关,多核的话效率更高,线程数一般取cpu数量+2比较合适,否则线程过多,线程切换频繁反而会导致效率降低. 线程池有两个活动过程:1.外面不停的往线程池添加任务:2.线程池内部不停的取任务执行

SimpleThreadPool给线程池增加拒绝策略和停止方法

给线程池增加拒绝策略和停止方法 package com.dwz.concurrency.chapter13; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; public class SimpleThreadPool3 { private final int size; private final int queueSize; private final static int DEFA

半同步半异步线程池的实现(C++11)

简介 处理大量并发任务时,一个请求对应一个线程来处理任务,线程的创建和销毁将消耗过多的系统资源,并增加上下文切换代价.线程池技术通过在系统中预先创建一定数量的线程(通常和cpu核数相同),当任务到达时,从线程池中分配一个线程进行处理,线程在处理完任务之后不用销毁,等待重用. 线程池包括半同步半异步和领导者追随者两种实现方式.线程池包括三部分,第一层是同步服务层,它处理来自上层的任务请求.第二层是同步队列层,同步服务层中的任务将添加到队列中.第三层是异步服务层,多个线程同时处理队列中的任务. 先贴

异步线程池

1.创建类实现AsyncTaskExecutor, InitializingBean, DisposableBean接口,重写方法. import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.DisposableBean;import org.springframework.beans.factory.InitializingBean;import org.springframework.core.task

Springboot的异步线程池图鉴

1:定义线程池 @EnableAsync @Configuration class TaskPoolConfig { @Bean("taskExecutor") public Executor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setMaxPoolSize(20); executor

[email&#160;protected]异步线程池的配置及应用

示例: 1. 配置 @EnableAsync @Configuration public class TaskExecutorConfiguration { @Autowired private TaskExecutorProperties taskExecutorProperties; @Bean public Executor routeGen() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); execut