java实现任务调度

最近的一个小项目是做一个简单的数据仓库,需要将其他数据库的数据抽取出来,并通过而出抽取成页面需要的数据,以空间换时间的方式,让后端报表查询更快。

因为在抽取的过程中,有一定的先后顺序,需要做一个任务调度器,某一优先级的会先执行,然后会进入下一个优先级的队列任务中。

先定义了一个Map的集合,key是优先级,value是任务的集合,某一个优先级内的任务是并发执行的,而不同优先级是串行执行的,前一个优先级执行完之后,后面的才会执行。

ConcurrentHashMap<Integer/* 优先级. */, List<BaseTask>/* 任务集合. */> tasks = new ConcurrentHashMap<>();

这个调度管理有一个演进的过程,我先说第一个,这个是比较好理解的。

第一个版本:

首先对tasks集合中的key进行一个排序,我定义的是数字越小就有限执行,则进行遍历key值,并取出某个优先级的任务队列,执行任务队列的任务。任务的执行交给线程池去执行,在遍历内部,需要不断的检查这个队列中的任务是否都执行了,没有则一直等待否则进入到下个队列,任务执行的时候可能会抛出异常,但是不管任务是否异常,都将任务状态设置已执行。

下面是其核心代码:

public void run() {
    //对key值进行排序
    Enumeration<Integer> keys = tasks.keys();
    List<Integer> prioritys = new ArrayList<>();
    while (keys.hasMoreElements()) {
      prioritys.add(keys.nextElement());
    }
    Collections.sort(prioritys);//升序
    //对key进行遍历,执行某个某个优先级的任务队列
    for (Integer priority : prioritys) {
      List<BaseTask> taskList = tasks.get(priority);
      if (taskList.isEmpty()) {
        continue;
      }
      logger.info("execute priority {} task ", taskList.get(0).priority);
      for (BaseTask task : taskList) {
        executor.execute(() -> {
          try {
            task.doTask();
          } catch (Exception e) {
            e.printStackTrace();
          }
        });//线程中执行任务
      }
      while (true) {//等待所有线程都执行完成之后执行下一个任务队列
        boolean finish = true;
        for (BaseTask t : taskList) {
          if (!t.finish) {
            finish = false;
          }
        }
        if (finish) {//当前任务都执行完毕
          break;
        }
        Misc.sleep(1000);//Thread.sleep(1000)
      }
      Misc.sleep(1000);
    }
  }

关键代码很好理解,在任务执行之前,需要对所有任务都初始化,初始化的时候给出每个任务的优先级和任务名称,任务抽象类如下:

public abstract class BaseTask {
  public String taskName;//任务名称
  public Integer priority; //优先级
  public boolean finish; //任务完成?
  /**
   * 执行的任务
   */
  public abstract void doTask(Date date) throws Exception;

第一个版本的思路很简单。

第二个版本稍微有一点点复杂。这里主要介绍该版本的内容,后续将代码的链接附上。

程序是由SpringBoot搭建起来的,定时器是Spring内置的轻量级的Quartz,使用Aop方式拦截异常,使用注解的方式在任务初始化时设置任务的初始变量。使用EventBus解耦程序,其中程序简单实现邮件发送功能(该功能还需要自己配置参数),以上这些至少需要简单的了解一下。

程序的思路:在整个队列执行过程中会有多个管道,某个队列上的管道任务执行完成,可以直接进行到下一个队列中执行,也设置了等待某一个队列上的所有任务都执行完成才执行当前任务。在某个队列任务中会标识某些任务是一队的,其他的为另一队,当这一队任务执行完成,就可以到下一个队列中去,不需要等待另一队。

这里会先初始化每个队列的每个队的条件,这个条件就是每个队的任务数,执行完成减1,当为0时,就进入下一个队列中。

分四个步骤进行完成:

1.bean的初始化

2.条件的设置

3.任务的执行

4.任务异常和任务执行完成之后通知检查是否执行下一个队列的任务

1.bean的初始化

1.创建注解类

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
public @interface TaskAnnotation {

  int priority() default 0;//优先级
  String taskName() default "";//任务名称
  TaskQueueEnum[] queueName() default {};//队列名称
}

2.实现BeanPostProcessor,该接口是中有两个方法postProcessBeforeInitialization和postProcessAfterInitialization,分别是bean初始化之前和bean初始化之后做的事情。

    Annotation[] annotations = bean.getClass().getAnnotations();//获取类上的注解
    if (ArrayUtils.isEmpty(annotations)) {//注解为空时直接返回(不能返回空,否则bean不会被加载)
        return bean;
    }
    for (Annotation annotation : annotations) {
        if (annotation.annotationType().equals(TaskAnnotation.class)) {
          TaskAnnotation taskAnnotation = (TaskAnnotation) annotation;//强转
           try {
            Field[] fields = target.getClass().getFields();//需要通过反射将值进行修改,下面的操作仅仅是对象的引用
            if (!ArrayUtils.isEmpty(fields)) {
              for (Field f : fields) {
                f.setAccessible(true);
                if (f.getName().equals("priority")) {
                  f.set(target, taskAnnotation.priority());
                }
            }
          }
     }
  }

上面需要注意的一点是需要通过反射的机制给bean设置值,不能直接调用bean的方式set值,否则bean的值是空的。

上面的代码通过实现BeanPostProcessor后置处理器,处理任务上的注解,完成对任务的初始化的。

2.条件的初始化

创建条件类,提供初始化的方法。

public abstract class BaseTask {

  public int nextPriority;//子级节点的优先级

  public String taskName;//任务名称

  public Integer priority; //优先级

  public String queueName;//队列名称

  public boolean finish; //任务完成?

  public boolean allExecute;

  /**
   * 执行的任务
   */
  public abstract void doTask(Date date) throws Exception;

    //任务完成之后,通过eventBus发送通知,是否需要执行下一个队列
  public void notifyExecuteTaskMsg(EventBus eventBus, Date date) {
    EventNotifyExecuteTaskMsg msg = new EventNotifyExecuteTaskMsg();
    msg.setDate(date);
    msg.setNextPriority(nextPriority);
    msg.setQueueName(queueName);
    msg.setPriority(priority);
    msg.setTaskName(taskName);
    eventBus.post(msg);
  }
}

public class TaskExecuteCondition {

  private ConcurrentHashMap<String, AtomicInteger> executeMap = new ConcurrentHashMap<>();

  /**
   * 初始化,每个队列进行分组,每个组的任务数量放入map集合中.
   */
  public void init(ConcurrentHashMap<Integer, List<BaseTask>> tasks) {
    Enumeration<Integer> keys = tasks.keys();
    List<Integer> prioritys = new ArrayList<>();
    while (keys.hasMoreElements()) {
      prioritys.add(keys.nextElement());
    }
    Collections.sort(prioritys);//升序
    for (Integer priority : prioritys) {
      List<BaseTask> list = tasks.get(priority);
      if (list.isEmpty()) {
        continue;
      }
      //对每个队列进行分组
      Map<String, List<BaseTask>> collect = list.stream()
          .collect(Collectors.groupingBy(x -> x.queueName, Collectors.toList()));
      for (Entry<String, List<BaseTask>> entry : collect.entrySet()) {
        for (BaseTask task : entry.getValue()) {
          addCondition(task.priority, task.queueName);
        }
      }
    }
  }

  /**
   * 执行任务完成,条件减1
   */
  public boolean executeTask(Integer priority, String queueName) {
    String name = this.getQueue(priority, queueName);
    AtomicInteger count = executeMap.get(name);
    int sum = count.decrementAndGet();
    if (sum == 0) {
      return true;
    }
    return false;
  }

  /**
   * 对个某个队列的条件
   */
  public int getCondition(Integer priority, String queueName) {
    String name = this.getQueue(priority, queueName);
    return executeMap.get(name).get();
  }

  private void addCondition(Integer priority, String queueName) {
    String name = this.getQueue(priority, queueName);
    AtomicInteger count = executeMap.get(name);
    if (count == null) {
      count = new AtomicInteger(0);
      executeMap.put(name, count);
    }
    count.incrementAndGet();
  }

  private void addCondition(Integer priority, String queueName, int sum) {
    String name = this.getQueue(priority, queueName);
    AtomicInteger count = executeMap.get(name);
    if (count == null) {
      count = new AtomicInteger(sum);
      executeMap.put(name, count);
    } else {
      count.set(sum);
    }
  }

  private String getQueue(Integer priority, String queueName) {
    return priority + queueName;
  }

  /**
   * 清除队列
   */
  public void clear() {
    this.executeMap.clear();
  }
}

3.任务的执行

任务执行类提供run方法,执行第一个队列,并提供获取下一个队列优先级方法,执行某个队列某个组的方法。

public class ScheduleTask {
  private static final Logger logger = LoggerFactory.getLogger(ScheduleTask.class);

  public ConcurrentHashMap<Integer/* 优先级. */, List<BaseTask>/* 任务集合. */> tasks = new ConcurrentHashMap<>();

  @Autowired
  private ThreadPoolTaskExecutor executor;//线程池
    //任务会先执行第一队列的任务.
  public void run(Date date) {
    Enumeration<Integer> keys = tasks.keys();
    List<Integer> prioritys = new ArrayList<>();
    while (keys.hasMoreElements()) {
      prioritys.add(keys.nextElement());
    }
    Collections.sort(prioritys);//升序
    Integer priority = prioritys.get(0);
    executeTask(priority, date);//执行第一行的任务.
  }
    //获取下一个队列的优先级
  public Integer nextPriority(Integer priority) {
    Enumeration<Integer> keys = tasks.keys();
    List<Integer> prioritys = new ArrayList<>();
    while (keys.hasMoreElements()) {
      prioritys.add(keys.nextElement());
    }
    Collections.sort(prioritys);//升序
    for (Integer pri : prioritys) {
      if (priority < pri) {
        return pri;
      }
    }
    return null;//没有下一个队列
  }

  public void executeTask(Integer priority) {
    List<BaseTask> list = tasks.get(priority);
    if (list.isEmpty()) {
      return;
    }
    for (BaseTask task : list) {
      execute(task);
    }
  }
 //执行某个队列的某个组
  public void executeTask(Integer priority, String queueName) {
    List<BaseTask> list = this.tasks.get(priority);
    list = list.stream().filter(task -> queueName.equals(task.queueName))
        .collect(Collectors.toList());
    if (list.isEmpty()) {
      return;
    }
    for (BaseTask task : list) {
      execute(task);
    }
  }

  public void execute(BaseTask task) {
    executor.execute(() -> {
      try {
        task.doTask(date);//
      } catch (Exception e) {//异常处理已经Aop拦截处理
      }
    });//线程中执行任务
  }

  /**
   * 增加任务
   */
  public void addTask(BaseTask task) {
    List<BaseTask> baseTasks = tasks.get(task.priority);
    if (baseTasks == null) {
      baseTasks = new ArrayList<>();
      List<BaseTask> putIfAbsent = tasks.putIfAbsent(task.priority, baseTasks);
      if (putIfAbsent != null) {
        baseTasks = putIfAbsent;
      }
    }
    baseTasks.add(task);
  }

  /**
   * 将任务结束标识重新设置
   */
  public void finishTask() {
    tasks.forEach((key, value) -> {
      for (BaseTask task : value) {
        task.finish = false;
      }
    });
  }
}

4.任务异常和任务执行完成之后通知检查是否执行下一个队列的任务

public class EventNotifyExecuteTaskListener {
  private static final Logger logger = LoggerFactory .getLogger(EventNotifyExecuteTaskListener.class);
  @Autowired
  private ScheduleTask scheduleTask;

  @Autowired
  private TaskExecuteCondition condition;

  @Subscribe
  public void executeTask(EventNotifyExecuteTaskMsg msg) {
  //当前队列的某组内容是否都执行完成
    boolean success = condition.executeTask(msg.getPriority(), msg.getQueueName());
    if (success) {
      Integer nextPriority = scheduleTask.nextPriority(msg.getPriority());
      if (nextPriority != null) {
        scheduleTask.executeTask(nextPriority, msg.getQueueName(), msg.getDate());//执行下一个队列
      } else {//执行完成,重置任务标识
        scheduleTask.finishTask();
        logger.info("CoreTask end!");
      }
    }
  }
}

整个思路介绍到这里,那么接下来是整个项目中出现的一些问题

1.BeanPostProcessor与Aop一起使用时,postProcessAfterInitialization调用之后获取的bean分为不同的了,一个是jdk原生实体对象,一种是Aop注解下的类会被cglib代理,生成带有后缀的对象,如果通过这个对象时反射获取类的注解,字段和方法,就获取不到,在代码中,需要将其转化一下,将cgLib代理之后的类转化为不带后缀的对象。

2.postProcessAfterInitialization的参数bean不能直接设置值,就是如下:

 TaskAnnotation taskAnnotation = (TaskAnnotation) annotation;//强转
 BaseTask baseTask = (BaseTask) bean;//强转
 baseTask.priority = taskAnnotation.priority();

在使用对象时,其中对象的字段时为空的,并需要通过反射的方式去设置字段的值。

上面仅仅只是个人的想法,如果有更好的方式,或者有某些地方可以进行改进的,我们可以共同探讨一下。

链接地址:https://github.com/wangice/task-scheduler

程序中使用了一个公共包:https://github.com/wangice/misc

原文地址:https://www.cnblogs.com/skyice/p/9691736.html

时间: 2024-10-21 16:37:19

java实现任务调度的相关文章

java计划任务调度框架quartz结合spring实现调度的配置实例代码分享

点击链接加入群[JavaEE(SSH+IntelliJIDE+Maven)]:http://jq.qq.com/?_wv=1027&k=L2rbHv 一:quartz简介 OpenSymphony 的Quartz提供了一个比较完美的任务调度解决方案. Quartz 是个开源的作业调度框架,定时调度器,为在 Java 应用程序中进行作业调度提供了简单却强大的机制. Quartz中有两个基本概念:作业和触发器.作业是能够调度的可执行任务,触发器提供了对作业的调度 二:quartz spring配置详

JAVA定时任务调度之Timer入门详解(二)

在上篇的JAVA定时任务调度之Timer入门详解(一)文章中,主要介绍了下Timer,接下来我们一起来看看Timer的一些常用方法. 1.schedule()的4种用法. 第一种:schedule(TimerTask task, Date time); task:安排的任务,time:具体执行时间.这个函数表达的意义是:在时间等于或超过time的时候执行且执行一次task.测试内容如下 MyTimerTask.java的代码跟第一篇一样,MyTimer.java的部分代码截图如下: 运行后,控制

Java定时任务调度详解

前言 在实际项目开发中,除了Web应用.SOA服务外,还有一类不可缺少的,那就是定时任务调度.定时任务的场景可以说非常广泛,比如某些视频网站,购买会员后,每天会给会员送成长值,每月会给会员送一些电影券:比如在保证最终一致性的场景中,往往利用定时任务调度进行一些比对工作:比如一些定时需要生成的报表.邮件:比如一些需要定时清理数据的任务等.本篇博客将系统的介绍定时任务调度,会涵盖Timer.ScheduledExecutorService.开源工具包Quartz,以及Spring和Quartz的结合

java定时任务调度工具

一.什么是定时任务调度 基于给定的时间点,给定的时间间隔或者给定的时间执行次数自动执行的任务. 二.java中常用的定时任务调度工具: Timer Quartz 2.1两者区别: Timer源自jdk,Quartz需要额外引入jar包. Timer功能少,使用方便,能解决许多常见问题.Quartz功能强大,使用麻烦,能解决几乎所有问题. Timer底层通过线程执行定时任务.Quartz底层通过多个线程池执行定时任务. 2.2Timer简介 定义:有且仅有一个后台线程对多个业务线程,进行定时定频率

Java的任务调度,使用Cron表达式实现

通过定时任务调度框架Quartz可以实现某些定时功能,定时执行某些方法等功能.下面提供一个简单的例子,实现Quartz框架的Hello World. import org.quartz.CronExpression; import org.quartz.CronTrigger; import org.quartz.Job; import org.quartz.JobDetail; import org.quartz.JobExecutionContext; import org.quartz.J

java定时任务调度工具Timer与Quartz的区别

Timer与Quartz的区别有三点: 1.出身不同:Timer由jdk直接提供,调用方式简单粗暴,不需要其它jar包支持.Quartz并非jdk自带,需要引入相应的jar包 2.能力区别:主要体现在对时间的控制上.某个具体时间执行具什么任务的话Timer可以轻松搞定,而比如每个星期天早上八点提醒做某事的功能就需要Quartz,因此Quartz对时间的控制远比Timer强大,完善 3.底层机制:

Java定时任务工具详解之Timer篇

Java定时任务调度工具详解 什么是定时任务调度? ◆ 基于给定的时间点,给定的时间间隔或者给定的执行次数自动执行的任务. 在Java中的定时调度工具? ◆ Timer       ◆Quartz Timer和Quarzt的区别? ◆ 出身不同(Timer由JDK直接提供,调用方式简单粗暴,不需要别的jar支持) ◆ 能力区别(TImer简单的定时任务,如需要每个星期天的8点,则需要Quarzt) ◆ 底层机制 原文地址:https://www.cnblogs.com/caifenglin/p/

Android 学习笔记之AndBase框架学习(四) 使用封装好的函数实现单,多线程任务

PS:Force Is Meaningless Without Skill 学习内容: 1.使用AndBase实现单线程任务... 2.使用AndBase实现多线程任务...   AndBase内部封装了多种方法,我们可以使用这些方法构建单线程任务和多线程任务..一般线程任务用于执行耗时的操作...比如说下载什么安装包,文件等数据量较大的东西,我们是必须要开启一个异步线程或者是同步线程来执行操作...如果任务过多,那么我们就可以使用任务请求队列或者是线程池来处理多个任务...这样可以节省很多的时

hadoop进阶系列之海量web日志KPI指标提取

转载请注明出处: 转载自  Thinkgamer的CSDN博客:blog.csdn.net/gamer_gyt 代码下载地址:点击查看 1:Web日志分析系统概述 2:需求分析:日志提取预处理,KPI指标设计,存储与展现 3:算法模型:Hadoop并行算法 4:架构设计:构建hadoop项目 5:程序实现:MR2V程序实现 6:结果可视化 一:Web日志分析系统概述 Web日志由Web]服务器产生,可能是Nginx,Apache,Tomcat等,从Web日志中我们可以提取到很多有用的信息,比如说