SpringBoot中并发定时任务的实现、动态定时任务的实现(看这一篇就够了)

# 一、在JAVA开发领域,目前可以通过以下几种方式进行定时任务

1、单机部署模式

  • Timer:jdk中自带的一个定时调度类,可以简单的实现按某一频度进行任务执行。提供的功能比较单一,无法实现复杂的调度任务。
  • ScheduledExecutorService:也是jdk自带的一个基于线程池设计的定时任务类。其每个调度任务都会分配到线程池中的一个线程执行,所以其任务是并发执行的,互不影响。
  • Spring Task:Spring提供的一个任务调度工具,支持注解和配置文件形式,支持Cron表达式,使用简单但功能强大。
  • Quartz:一款功能强大的任务调度器,可以实现较为复杂的调度功能,如每月一号执行、每天凌晨执行、每周五执行等等,还支持分布式调度,就是配置稍显复杂。

    2、分布式集群模式(不多介绍,简单提一下)

    问题:

    I、如何解决定时任务的多次执行?
    II、如何解决任务的单点问题,实现任务的故障转移?

    问题I的简单思考:

    1、固定执行定时任务的机器(可以有效避免多次执行的情况 ,缺点就是单点故障问题)。
    2、借助Redis的过期机制和分布式锁。
    3、借助mysql的锁机制等。

    成熟的解决方案:

    1、Quartz:可以去看看这篇文章Quartz分布式
    2、elastic-job:(https://github.com/elasticjob/elastic-job-lite)当当开发的弹性分布式任务调度系统,采用zookeeper实现分布式协调,实现任务高可用以及分片。
    3、xxl-job:(https://github.com/xuxueli/xxl-job)是大众点评员发布的分布式任务调度平台,是一个轻量级分布式任务调度框架。
    4、saturn:(https://github.com/vipshop/Saturn) 是唯品会提供一个分布式、容错和高可用的作业调度服务框架。

二、SpringTask实现定时任务(这里是基于springboot)

1、简单的定时任务实现

使用方式:

使用@EnableScheduling注解开启对定时任务的支持。
使用@Scheduled 注解即可,基于corn、fixedRate、fixedDelay等一些定时策略来实现定时任务。

使用缺点:

1、多个定时任务使用的是同一个调度线程,所以任务是阻塞执行的,执行效率不高。
2、其次如果出现任务阻塞,导致一些场景的定时计算没有实际意义,比如每天12点的一个计算任务被阻塞到1点去执行,会导致结果并非我们想要的。

使用优点:

1、配置简单
2、适用于单个后台线程执行周期任务,并且保证顺序一致执行的场景    

源码分析:

//默认使用的调度器
if(this.taskScheduler == null) {
    this.localExecutor = Executors.newSingleThreadScheduledExecutor();
    this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
}
//可以看到SingleThreadScheduledExecutor指定的核心线程为1,说白了就是单线程执行
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1));
}
//利用了DelayedWorkQueue延时队列作为任务的存放队列,这样便可以实现任务延迟执行或者定时执行
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

  

2、实现并发的定时任务

使用方式:

  • 方式一:由1中我们知道之所以定时任务是阻塞执行,是配置的线程池决定的,那就好办了,换一个不就行了!直接上代码:

      @Configuration
      public class ScheduledConfig implements SchedulingConfigurer {
    
          @Autowired
          private TaskScheduler myThreadPoolTaskScheduler;
    
          @Override
          public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
              //简单粗暴的方式直接指定
              //scheduledTaskRegistrar.setScheduler(Executors.newScheduledThreadPool(5));
              //也可以自定义的线程池,方便线程的使用与维护,这里不多说了
              scheduledTaskRegistrar.setTaskScheduler(myThreadPoolTaskScheduler);
          }
      }
    
      @Bean(name = "myThreadPoolTaskScheduler")
      public TaskScheduler getMyThreadPoolTaskScheduler() {
          ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
          taskScheduler.setPoolSize(10);
          taskScheduler.setThreadNamePrefix("Haina-Scheduled-");
          taskScheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
          //调度器shutdown被调用时等待当前被调度的任务完成
          taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
          //等待时长
          taskScheduler.setAwaitTerminationSeconds(60);
          return taskScheduler;
      }       
  • 方式二:方式一的本质改变了任务调度器默认使用的线程池,接下来这种是不改变调度器的默认线程池,而是把当前任务交给一个异步线程池去执行
    • 首先使用@EnableAsync 启用异步任务
    • 然后在定时任务的方法加上@Async即可,默认使用的线程池为SimpleAsyncTaskExecutor(该线程池默认来一个任务创建一个线程,就会不断创建大量线程,极有可能压爆服务器内存。当然它有自己的限流机制,这里就不多说了,有兴趣的自己翻翻源码~)
    • 项目中为了更好的控制线程的使用,我们可以自定义我们自己的线程池,使用方式@Async("myThreadPool")

    废话太多,直接上代码:

      @Scheduled(fixedRate = 1000*10,initialDelay = 1000*20)
      @Async("myThreadPoolTaskExecutor")
      //@Async
      public void scheduledTest02(){
          System.out.println(Thread.currentThread().getName()+"--->xxxxx--->"+Thread.currentThread().getId());
      }
    
      //自定义线程池
      @Bean(name = "myThreadPoolTaskExecutor")
      public TaskExecutor  getMyThreadPoolTaskExecutor() {
          ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
          taskExecutor.setCorePoolSize(20);
          taskExecutor.setMaxPoolSize(200);
          taskExecutor.setQueueCapacity(25);
          taskExecutor.setKeepAliveSeconds(200);
          taskExecutor.setThreadNamePrefix("Haina-ThreadPool-");
          // 线程池对拒绝任务(无线程可用)的处理策略,目前只支持AbortPolicy、CallerRunsPolicy;默认为后者
          taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
          //调度器shutdown被调用时等待当前被调度的任务完成
          taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
          //等待时长
          taskExecutor.setAwaitTerminationSeconds(60);
          taskExecutor.initialize();
          return taskExecutor;
      }
  • 线程池的使用心得(后续有专门文章来探讨)
    • java中提供了ThreadPoolExecutor和ScheduledThreadPoolExecutor,对应与spring中的ThreadPoolTaskExecutor和ThreadPoolTaskScheduler,但是在原有的基础上增加了新的特性,在spring环境下更容易使用和控制。
    • 使用自定义的线程池能够避免一些默认线程池造成的内存溢出、阻塞等等问题,更贴合自己的服务特性
    • 使用自定义的线程池便于对项目中线程的管理、维护以及监控。
    • 即便在非spring环境下也不要使用java默认提供的那几种线程池,坑很多,阿里代码规约不说了吗,得相信大厂!!!

三、动态定时任务的实现

问题:

  • 使用@Scheduled注解来完成设置定时任务,但是有时候我们往往需要对周期性的时间的设置会做一些改变,或者要动态的启停一个定时任务,那么这个时候使用此注解就不太方便了,原因在于这个注解中配置的cron表达式必须是常量,那么当我们修改定时参数的时候,就需要停止服务,重新部署。

    解决办法:

  • 方式一:实现SchedulingConfigurer接口,重写configureTasks方法,重新制定Trigger,核心方法就是addTriggerTask(Runnable task, Trigger trigger) ,不过需要注意的是,此种方式修改了配置值后,需要在下一次调度结束后,才会更新调度器,并不会在修改配置值时实时更新,实时更新需要在修改配置值时额外增加相关逻辑处理。
      @Configuration
      public class ScheduledConfig implements SchedulingConfigurer {
    
      @Autowired
      private TaskScheduler myThreadPoolTaskScheduler;
    
      @Override
      public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
          //scheduledTaskRegistrar.setScheduler(Executors.newScheduledThreadPool(5));
          scheduledTaskRegistrar.setTaskScheduler(myThreadPoolTaskScheduler);
          //可以实现动态调整定时任务的执行频率
          scheduledTaskRegistrar.addTriggerTask(
                  //1.添加任务内容(Runnable)
                  () -> System.out.println("cccccccccccccccc--->" + Thread.currentThread().getId()),
                  //2.设置执行周期(Trigger)
                  triggerContext -> {
                      //2.1 从数据库动态获取执行周期
                      String cron = "0/2 * * * * ? ";
                      //2.2 合法性校验.
      //                    if (StringUtils.isEmpty(cron)) {
      //                        // Omitted Code ..
      //                    }
                          //2.3 返回执行周期(Date)
                          return new CronTrigger(cron).nextExecutionTime(triggerContext);
                      }
              );
      }
      }
  • 方式二:使用threadPoolTaskScheduler类可实现动态添加删除功能,当然也可实现执行频率的调整
      首先,我们要认识下这个调度类,它其实是对java中ScheduledThreadPoolExecutor的一个封装改进后的产物,主要改进有以下几点:
          1、提供默认配置,因为是ScheduledThreadPoolExecutor,所以只有poolSize这一个默认参数。
          2、支持自定义任务,通过传入Trigger参数。
          3、对任务出错处理进行优化,如果是重复性的任务,不抛出异常,通过日志记录下来,不影响下次运行,如果是只执行一次的任务,将异常往上抛。
      顺便说下ThreadPoolTaskExecutor相对于ThreadPoolExecutor的改进点:
          1、提供默认配置,原生的ThreadPoolExecutor的除了ThreadFactory和RejectedExecutionHandler其他没有默认配置
          2、实现AsyncListenableTaskExecutor接口,支持对FutureTask添加success和fail的回调,任务成功或失败的时候回执行对应回调方法。
          3、因为是spring的工具类,所以抛出的RejectedExecutionException也会被转换为spring框架的TaskRejectedException异常(这个无所谓)
          4、提供默认ThreadFactory实现,直接通过参数重载配置

    扯了这么多,还是直接上代码:

      @Component
      public class DynamicTimedTask {
    
          private static final Logger logger = LoggerFactory.getLogger(DynamicTimedTask.class);
    
          //利用创建好的调度类统一管理
          //@Autowired
          //@Qualifier("myThreadPoolTaskScheduler")
          //private ThreadPoolTaskScheduler myThreadPoolTaskScheduler;
    
          //接受任务的返回结果
          private ScheduledFuture<?> future;
    
          @Autowired
          private ThreadPoolTaskScheduler threadPoolTaskScheduler;
    
          //实例化一个线程池任务调度类,可以使用自定义的ThreadPoolTaskScheduler
          @Bean
          public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
              ThreadPoolTaskScheduler executor = new ThreadPoolTaskScheduler();
              return new ThreadPoolTaskScheduler();
          }
    
          /**
           * 启动定时任务
           * @return
           */
          public boolean startCron() {
              boolean flag = false;
              //从数据库动态获取执行周期
              String cron = "0/2 * * * * ? ";
              future = threadPoolTaskScheduler.schedule(new CheckModelFile(),cron);
              if (future!=null){
                  flag = true;
                  logger.info("定时check训练模型文件,任务启动成功!!!");
              }else {
                  logger.info("定时check训练模型文件,任务启动失败!!!");
              }
              return flag;
          }
    
          /**
           * 停止定时任务
           * @return
           */
          public boolean stopCron() {
              boolean flag = false;
              if (future != null) {
                  boolean cancel = future.cancel(true);
                  if (cancel){
                      flag = true;
                      logger.info("定时check训练模型文件,任务停止成功!!!");
                  }else {
                      logger.info("定时check训练模型文件,任务停止失败!!!");
                  }
              }else {
                  flag = true;
                  logger.info("定时check训练模型文件,任务已经停止!!!");
              }
              return flag;
          }
    
          class CheckModelFile implements Runnable{
    
              @Override
              public void run() {
                  //编写你自己的业务逻辑
                  System.out.print("模型文件检查完毕!!!")
              }
          }
    
      }

四、总结

  • 到此基于springtask下的定时任务的简单使用算是差不多了,其中不免有些错误的地方,或者理解有偏颇的地方欢迎大家提出来!
  • 基于分布式集群下的定时任务使用,后续有时间再继续!!!

个人博客地址:

csdn:https://blog.csdn.net/tiantuo6513
cnblogs:https://www.cnblogs.com/baixianlong
segmentfault:https://segmentfault.com/u/baixianlong
github:https://github.com/xianlongbai

原文地址:https://www.cnblogs.com/baixianlong/p/10659045.html

时间: 2024-10-04 07:34:30

SpringBoot中并发定时任务的实现、动态定时任务的实现(看这一篇就够了)的相关文章

Java中的多线程你只要看这一篇就够了

Java中的多线程你只要看这一篇就够了 引 如果对什么是线程.什么是进程仍存有疑惑,请先Google之,因为这两个概念不在本文的范围之内. 用多线程只有一个目的,那就是更好的利用cpu的资源,因为所有的多线程代码都可以用单线程来实现.说这个话其实只有一半对,因为反应"多角色"的程序代码,最起码每个角色要给他一个线程吧,否则连实际场景都无法模拟,当然也没法说能用单线程来实现:比如最常见的"生产者,消费者模型". 很多人都对其中的一些概念不够明确,如同步.并发等等,让我

Java中的多线程=你只要看这一篇就够了

如果对什么是线程.什么是进程仍存有疑惑,请先Google之,因为这两个概念不在本文的范围之内. 用多线程只有一个目的,那就是更好的利用cpu的资源,因为所有的多线程代码都可以用单线程来实现.说这个话其实只有一半对,因为反应“多角色”的程序代码,最起码每个角色要给他一个线程吧,否则连实际场景都无法模拟,当然也没法说能用单线程来实现:比如最常见的“生产者,消费者模型”. 很多人都对其中的一些概念不够明确,如同步.并发等等,让我们先建立一个数据字典,以免产生误会. 多线程:指的是这个程序(一个进程)运

中后台产品的表格设计,看这一篇就够了(原型规范下载)

中后台产品的表格设计,看这一篇就够了(原型规范下载) 2018年4月16日luodonggan 中后台产品的表格设计,看这一篇就够了(原型规范下载) 经过了将近一年的后台产品经历,踩了很多坑,试了很多错,也学习到了很多东西,目前也形成了自己的一套规范.本文将其中的部分收获汇总成文,希望能够对大家有所帮助. 后台产品有一个很重要.常见的元素,就是表格.表格承担着详情入口.数据展示的功能,看似简单,其实里面的细节特别多.在以效率为最重要的需求的后台产品中,如何设计一个能够高效率地进行查看和编辑的表格

SpringBoot中异步请求和异步调用(看这一篇就够了)

一.SpringBoot中异步请求的使用 1.异步请求与同步请求 特点: 可以先释放容器分配给请求的线程与相关资源,减轻系统负担,释放了容器所分配线程的请求,其响应将被延后,可以在耗时处理完成(例如长时间的运算)时再对客户端进行响应.一句话:增加了服务器对客户端请求的吞吐量(实际生产上我们用的比较少,如果并发请求量很大的情况下,我们会通过nginx把请求负载到集群服务的各个节点上来分摊请求压力,当然还可以通过消息队列来做请求的缓冲). 2.异步请求的实现 方式一:Servlet方式实现异步请求

[转]Java中的多线程你只要看这一篇就够了

如果对什么是线程.什么是进程仍存有疑惑,请先Google之,因为这两个概念不在本文的范围之内. 用多线程只有一个目的,那就是更好的利用cpu的资源,因为所有的多线程代码都可以用单线程来实现.说这个话其实只有一半对,因为反应“多角色”的程序代码,最起码每个角色要给他一个线程吧,否则连实际场景都无法模拟,当然也没法说能用单线程来实现:比如最常见的“生产者,消费者模型”. 很多人都对其中的一些概念不够明确,如同步.并发等等,让我们先建立一个数据字典,以免产生误会. 多线程:指的是这个程序(一个进程)运

Java 中的多线程你只要看这一篇就够了

引 如果对什么是线程.什么是进程仍存有疑惑,请先Google之,因为这两个概念不在本文的范围之内. 用多线程只有一个目的,那就是更好的利用cpu的资源,因为所有的多线程代码都可以用单线程来实现.说这个话其实只有一半对,因为反应"多角色"的程序代码,最起码每个角色要给他一个线程吧,否则连实际场景都无法模拟,当然也没法说能用单线程来实现:比如最常见的"生产者,消费者模型". 很多人都对其中的一些概念不够明确,如同步.并发等等,让我们先建立一个数据字典,以免产生误会. 多

【转】Java中的多线程你只要看这一篇就够了

转自:https://www.cnblogs.com/wxd0108/p/5479442.html 引 如果对什么是线程.什么是进程仍存有疑惑,请先Google之,因为这两个概念不在本文的范围之内. 用多线程只有一个目的,那就是更好的利用cpu的资源,因为所有的多线程代码都可以用单线程来实现.说这个话其实只有一半对,因为反应"多角色"的程序代码,最起码每个角色要给他一个线程吧,否则连实际场景都无法模拟,当然也没法说能用单线程来实现:比如最常见的"生产者,消费者模型"

web开发中的长度单位(px,em,ex,rem),如何运用,看完这篇就够了!

原创 2017-03-08 web小二 web前端开发 作为一名前端开发人员,css中的长度单位,都是我们在工作中非常熟悉的名词,因为没有它们,我们就不能声明某个字符应该多大,或者某些图像周围应该留白多少,甚至有时候能导致css不能进行正常工作,所以在很多css属性中,它们都是依赖于长度单位来显示各种页面元素. 1.长度单位包括哪些? 长度单位,其实在我们的生活中,也非常常见,例如,厘米.毫米.英寸,还有经常接触到的像素(px),元素的字体高度(em).字母x的高度(ex).百分比(%)等等这些

Java 中的 override 和 overload,看这一篇就够

问题出现: 即使对于一个经验丰富的开发人员来说,方法重载和方法覆盖的区别都能让他犹豫一下, 对于新手来说,经常容易弄混淆. 有没有比较深入浅出的理解方式,能让人过目不忘,用起来还能有条件反射般的速度呢? 其他人是怎么做的: 写类似比较的博客非常之多,无非也就是分开介绍,然后比较区别. 好像也有效果,前提是要理解,完了还要不时拿来复习,保持记忆不被遗忘. 可以这样理解: override 最准确的翻译应该是推翻,重写. overload 最准确的翻译应该是过载. 如果你不是新手,看到这个解释的时候