TaskUtil多线程与定时任务

package com.taoban.util;

/**

* 执行单次任务或定时任务工具类(用于减少new Thread()和new Timer()的使用)

*/

public class TaskUtil {

private static Log log = LogFactory.getLog(TaskUtil.class);

private static ExecutorService cachedExecutor = null;

private static ScheduledExecutorService scheduledExecutor = null;

private static Map<Runnable, Future<?>> keepRunningTasks = null;

private static Map<Future<?>, Callback> callbackdTasks = null;

static {

cachedExecutor = Executors.newCachedThreadPool(new TaskUtilThreadFactory("cached"));

scheduledExecutor = Executors.newScheduledThreadPool(5, new TaskUtilThreadFactory("scheduled"));

Runtime.getRuntime().addShutdownHook(new Thread() {//线程池自动退出

@Override

public void run() {

cachedExecutor.shutdown();

scheduledExecutor.shutdown();

log.info("TaskUtil executors shutdown.");

}

});

}

/**

* 立即执行任务

*/

public static Future<?> submit(Runnable task) {

return cachedExecutor.submit(task);

}

/**

* 自动保持任务持续运行,每分钟监视一次

*/

public static Future<?> submitKeepRunning(Runnable task){

Future<?> future = submit(task);

checkInitCachedTasks();

synchronized (keepRunningTasks) {

keepRunningTasks.put(task, future);

}

return future;

}

/**

* 延迟执行任务,例如延迟5秒:schedule(task,5,TimeUnit.SECONDS)

*/

public static void schedule(Runnable task, long delay, TimeUnit unit) {

scheduledExecutor.schedule(task, delay, unit);

}

/**

* 定时执行任务一次,比如下午两点:scheduleAt(task, DateUtils.setHours(new Date(), 13))

*/

public static void scheduleAt(Runnable task, Date time) {

long mills = time.getTime() - System.currentTimeMillis();

scheduledExecutor.schedule(task, mills>0 ? mills : 3, TimeUnit.MILLISECONDS);

}

/**

* 定时重复执行任务,比如延迟5秒,每10分钟执行一次:scheduleAtFixRate(task, 5, TimeUnit.MINUTES.toSeconds(10), TimeUnit.SECONDS)

*/

public static void scheduleAtFixtRate(Runnable task, long initialDelay, long delay, TimeUnit unit) {

scheduledExecutor.scheduleWithFixedDelay(task, initialDelay, delay, unit);

}

/**

* 定时重复执行任务,比如下午两点开始,每小时执行一次:scheduleAtFixRate(task, DateUtils.setHours(new Date(), 13), 1, TimeUnit.HOURS)

*/

public static void scheduleAtFixtRate(Runnable task, Date time, long delay, TimeUnit unit) {

long mills = time.getTime() - System.currentTimeMillis();

scheduledExecutor.scheduleWithFixedDelay(task, mills>0 ? mills : 3, unit.toMillis(delay), TimeUnit.MILLISECONDS);

}

/**

* 提交带返回值的任务,支持后续处理(调用者手动处理)

*/

public static <T> Future<T> submit(Callable<T> task) {

return cachedExecutor.submit(task);

}

/**

* 提交带返回值的任务,支持后续处理(自动调用Callback接口)

*/

public static <T> Future<T> submit(Callable<T> task, Callback callback) {

Future<T> future = submit(task);

checkInitCachedTasks();

if(callback != null) {

synchronized (callbackdTasks) {

callbackdTasks.put(future, callback);

}

}

return future;

}

/**

* 提交任务,等待返回值(阻塞调用者)

*/

public static <T> T wait(Callable<T> task) {

Future<T> future = cachedExecutor.submit(task);

try {

return future.get();

} catch (Exception e) {

log.warn(e);

return null;

}

}

private static void checkInitCachedTasks() {

if(keepRunningTasks != null) return;

keepRunningTasks = new HashMap<Runnable, Future<?>>();

callbackdTasks = new HashMap<Future<?>, Callback>();

scheduleAtFixtRate(new CachedTasksMonitor(), 1, 1, TimeUnit.MINUTES);

}

/**

* 监视需要保持运行的任务

*/

static class CachedTasksMonitor implements Runnable {

@Override

public void run() {

if(keepRunningTasks.size() > 0) {

synchronized (keepRunningTasks) {

Map<Runnable, Future<?>> tempTasks = null;

for(Runnable task : keepRunningTasks.keySet()) {

Future<?> future = keepRunningTasks.get(task);

if(future.isDone()) {

future = submit(task);//恢复运行异常结束任务

if(tempTasks == null) tempTasks = new HashMap<Runnable, Future<?>>();

tempTasks.put(task, future);

}

}

if(tempTasks != null && tempTasks.size() > 0) keepRunningTasks.putAll(tempTasks);

}

}

if(callbackdTasks.size() > 0) {

synchronized (callbackdTasks) {

List<Future<?>> callbackedFutures = null;

for(Future<?> future : callbackdTasks.keySet()) {

final Callback callback = callbackdTasks.get(future);

if(future.isDone()) {

try{

final Object result = future.get(5, TimeUnit.SECONDS);

submit(new Runnable() {

@Override

public void run() {//callback可能耗时所以作为独立运行任务,而本监视器需尽快完成工作

callback.handle(result);

}

});

if(callbackedFutures == null) callbackedFutures = new LinkedList<Future<?>>();

callbackedFutures.add(future);

}catch (Exception e) {

log.warn("TaskUtil callbackedTasks warn: ", e);

}

}

}

if(callbackedFutures != null && callbackedFutures.size() > 0) {

for(Future<?> future : callbackedFutures) {

callbackdTasks.remove(future);

}

}

}

}

}

}

/**

* 自定义线程名称Task-idx-name-idx2

*/

static class TaskUtilThreadFactory implements ThreadFactory {

private final static AtomicInteger taskutilThreadNumber = new AtomicInteger(1);

private final AtomicInteger threadNumber = new AtomicInteger(1);

private final String threadNamePrefix;

TaskUtilThreadFactory(String threadNamePrefix){

this.threadNamePrefix = threadNamePrefix;

}

@Override

public Thread newThread(Runnable r) {

Thread t = new Thread(r, String.format("TaskUtil-%d-%s-%d", taskutilThreadNumber.getAndIncrement(), this.threadNamePrefix, threadNumber.getAndIncrement()));

t.setDaemon(false);

t.setPriority(Thread.NORM_PRIORITY);

return t;

}

}

/**

* 等待结果回调接口

*/

public static interface Callback {

void handle(Object result);

}

}

来自为知笔记(Wiz)

TaskUtil多线程与定时任务

时间: 2024-08-10 15:08:54

TaskUtil多线程与定时任务的相关文章

使用spring-boot创建定时任务。同时创建多线程执行定时任务。

1,下载spring-boot的maven工程:http://start.spring.io/  直接自定义工程名称. 2 , 启动类增加注解:@EnableScheduling 具体的业务代码: package com.huike.ftp.main; import java.util.Date;import java.util.concurrent.Executor; import org.apache.logging.log4j.LogManager;import org.apache.lo

Spring Boot 定时任务单线程和多线程

Spring Boot 的定时任务: 第一种:把参数配置到.properties文件中: 代码: package com.accord.task; import java.text.SimpleDateFormat; import java.util.Date; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; /** * 从配置

python定时任务-sched模块

通过sched模块可以实现通过自定义时间,自定义函数,自定义优先级来执行函数. 范例一 1 import time 2 import sched 3 4 schedule = sched.scheduler( time.time,time.sleep) 5 6 def func(string1): 7 print "now excuted func is %s"%string1 8 9 print "start" 10 schedule.enter(2,0,func

python版 定时任务机制

定时任务的原理 服务器执行一个python脚本 这个脚本,循环执行配置的定时任务地址 Python请求地址, 该地址应该返回, 下次再来执行的秒数. 也就是任务的频率 比如任务希望每3秒执行一次, 那么任务结束后,应该返回一个3的数字 python脚本拿到任务返回的数字, 算出下次执行任务的时间. 当时间条件满足是, python脚本会继续访问该任务 不同的任务, 直接修改 init里面的配置就可以了 python脚本如下: #!/usr/bin/env python3 # -*- coding

Spring Boot定时任务运行一段时间后自动关闭的解决办法

用Spring Boot默认支持的 Scheduler来运行定时任务,有时在服务器运行一段时间后会自动关闭.原因:Schedule默认是单线程运行定时任务的,即使是多个不同的定时任务,默认也是单线程运行.当线程挂掉时,定时任务也随之终止. 解决方法: 一.改为多线程执行定时任务: 加一个配置类,实现SchedulingConfigurer接口,重写configureTasks方法即可: import org.springframework.context.annotation.Configura

SpringBoot多线程执行task任务

一.问题描述 Task定时任务默认都是使用单线程执行的,如果定时任务有很多的话,那么可能会导致很多任务无法按时准确执行,示例如下: import java.text.SimpleDateFormat; import java.util.Date; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Scheduled; import or

java之高并发与多线程

进程和线程的区别和联系 从资源占用,切换效率,通信方式等方面解答 线程具有许多传统进程所具有的特征,故又称为轻型进程(Light—Weight Process)或进程元:而把传统的进程称为重型进程(Heavy—Weight Process),它相当于只有一个线程的任务.在引入了线程的操作系统中,通常一个进程都有若干个线程,至少需要一个线程.下面,我们从调度.并发性. 系统开销.拥有资源等方面,来比较线程与进程. 1.调度 在传统的操作系统中,拥有资源的基本单位和独立调度.分派的基本单位都是进程.

SpringBoot:实现定时任务

一.定时任务实现的几种方式: Timer 这是java自带的java.util.Timer类,这个类允许你调度一个java.util.TimerTask任务.使用这种方式可以让你的程序按照某一个频度执行,但不能在指定时间运行.一般用的较少. ScheduledExecutorService 也jdk自带的一个类:是基于线程池设计的定时任务类,每个调度任务都会分配到线程池中的一个线程去执行,也就是说,任务是并发执行,互不影响. Spring Task Spring3.0以后自带的task,可以将它

一文搞懂springboot定时任务

Introduction 在springboot中自带了一个轻量级的调度系统.如果我们希望在特定的时间或者以特定的时间间隔完成某些任务,那么它完全能够满足需求,不需要再额外引入像Quartz这种略显沉重的调度框架.下面我们就来介绍springboot中@scheduled 注解的用法. 环境:springboot 2.2.2 常用简单定时任务 首先,为了使用springboot中的定时任务,需要在springboot应用中加入 @EnableScheduling 注解.该注解开启对定时任务的支持