一个任务调度

最近把以前项目中用的任务调度提了出来,做了一个Demo。

任务调度用到的组件是quartz.net。关于quartz.net的文章网上有很多了,这里再简单介绍下。

首先是创建一个作业明细

 /// <summary>
        /// 根据作业计划来创建作业明细
        /// </summary>
        /// <param name="task"></param>
        /// <param name="taskData"></param>
        /// <returns></returns>
        public static IJobDetail CreateJobDetail(ScheduleTask task, IDictionary<string, object> taskData = null)
        {
            if (task == null) throw new ArgumentNullException("『CreateJobDetail』的task参数为空!");
            //反射加载程序集
            var path = System.IO.Path.Combine(AppDomain.CurrentDomain.BaseDirectory, string.Format("bin\\TaskManageDemo.Task.dll"));
            Assembly asmb = Assembly.LoadFrom(path);
            Type taskType = asmb.GetType(string.Format("TaskManageDemo.Task.{0}", task.MethodName));
            if (taskType == null)
                throw new NotImplementedException(string.Format("『{0}』调用的类型未实现", task.MethodName));

            //作业执行上下文携带数据
            IDictionary<string, object> map = new Dictionary<string, object>() { { "task", task } };

            IJobDetail job =
                 JobBuilder.Create(taskType)
                .WithDescription(task.Remark)
                .WithIdentity(GetJobKey(task))
                .UsingJobData(new JobDataMap(map))
                .Build();

            return job;
        }

然后是创建一个触发器

 /// <summary>
        /// 根据作业计划来创建作业触发器
        /// </summary>
        /// <param name="task"></param>
        /// <param name="forJob"></param>
        /// <returns></returns>
        public static ITrigger CreateTrigger(ScheduleTask task, IJobDetail forJob = null)
        {
            if (task == null) throw new ArgumentNullException("『CreateTrigger』的task参数为空!");

            TriggerBuilder trigger = TriggerBuilder.Create();
            trigger.WithDescription(task.Remark)
             .WithIdentity(GetTriggerKey(task));

            trigger = string.IsNullOrEmpty(task.CronExpression) ?
                trigger.WithSimpleSchedule(x => x.WithIntervalInMinutes(30).WithRepeatCount(10)) :
                trigger.WithCronSchedule(task.CronExpression);

            if (forJob != null) trigger.ForJob(forJob);

            return trigger.Build();

        }

最后把创建的两样组合起来就新增了一个作业了

/// <summary>
        /// 使用作业计划来创建作业
        /// </summary>
        /// <param name="quartzScheduler">调度器</param>
        /// <param name="task">作业任务</param>
        /// <returns>二元组</returns>
        public static Tuple<IJobDetail, ITrigger> CreatScheduleJob(IScheduler quartzScheduler, ScheduleTask task)
        {
            Tuple<IJobDetail, ITrigger> tuple = null;
            IJobDetail ij = CreateJobDetail(task);
            ITrigger it = CreateTrigger(task);

            quartzScheduler.ScheduleJob(ij, it);
            tuple = new Tuple<IJobDetail, ITrigger>(ij, it);

            return tuple;
        }

作业必须实现IJob接口

这里用了一个基类来实现IJob接口,基类里有一个抽象方法,其他作业子类只要继承这个基类并实现这个抽象方法就行了。

public void Execute(IJobExecutionContext context)
        {
            //Quartz.Collection.ISet<JobKey> jobKeys = context.Scheduler.GetJobKeys(
            //      Quartz.Impl.Matchers.GroupMatcher<JobKey>.GroupEquals(JobHelp.jobGroupName));    //取所有运行的作业

            var task = context.MergedJobDataMap["task"] as ScheduleTask;
            string message = string.Format("{0}的『Execute』从『IJobExecutionContext』读取不到作业计划!", ""); // this.FullDevName
            if (task == null) throw new Exception(message);

            //刷新作业计划信息,防止作业计划配置发生改变
            string sql = "select * from ScheduleTask where [email protected]";
            var taskNew = _scheduleTask.GetFirst<ScheduleTask>(sql, new { id = task.Id });

            if (taskNew == null)
            {
                //计划已经被删除,则删除此作业
                context.Scheduler.DeleteJob(context.JobDetail.Key);
                Log.Logger.InfoFormat(string.Format("{0}作业计划为空,该记录可能已经被删除。", taskNew.MethodName));
                return; //退出
            }

            //作业不允许使用
            if (!taskNew.Allowused)
            {
                //不从调度计划中删除本作业,因为有可能又启用该作业计划
                Log.Logger.InfoFormat(string.Format("{0}作业计划不允许使用,跳过此次执行。", taskNew.MethodName));
                return; //退出
            }
            if (taskNew != task)
            {

                //脏数据,删除此作业,然后重新创建一个
                Log.Logger.InfoFormat("{0}的作业计划属性已更改,将删除该计划的实现作业,然后重新创建一个作业,并尝试调度它...", taskNew.MethodName);
                //作业计划属性发生变更,重新启动作业
                Tuple<IJobDetail, ITrigger> tuple = JobHelp.RestartJob(context.Scheduler, task, taskNew);
                Log.Logger.InfoFormat("{0}重新创建并调度作业完成,『IJOB.Execute』退出。作业计划:{1},作业:{2},触发器:{3},表达式:{4}。", taskNew.MethodName, taskNew.MethodName, tuple.Item1.Key.Name, tuple.Item2.Key.Name, taskNew.CronExpression);
                return; //退出
            }

            //执行具体作业的业务逻辑
            ExecuteJobImpl(context);

            //更新执行时间
            taskNew.LastTime=DateTime.Now.ToString();
            taskNew.NextTime = string.Format("{0:G}", context.NextFireTimeUtc.Value.AddHours(8));
            UpdateTime(taskNew);
        }

新增作业

怎样新增一个作业呢?

新增一个作业也很简单,只需在TaskManageDemo.Task项目下添加一个类,并继承作业父类实现抽象方法就可以了。

那怎样来配置呢?

实现了作业后,点击新增按钮来增加一个作业。

新增的作业是怎样被触发的呢?

这里我用了一个系统作业来唤醒新增的作业和删除不需要的作业。

protected override void ExecuteJobImpl(Quartz.IJobExecutionContext context)
        {
            //取所有运行的作业
            Quartz.Collection.ISet<JobKey> jobKeys = context.Scheduler.GetJobKeys(
                   Quartz.Impl.Matchers.GroupMatcher<JobKey>.GroupEquals(JobHelp.jobGroupName));

            string sql = "select * from ScheduleTask where [email protected]";
            var taskRuning = new List<ScheduleTask>();//正在运行的作业
            var taskInDb = _scheduleTask.Query<ScheduleTask>(sql, new { ClassName = "SysJob" }); //存在于数据库的作业,不包括系统作业
            foreach (var jk in jobKeys)
            {
                IJobDetail job = context.Scheduler.GetJobDetail(jk);
                ScheduleTask task = job.JobDataMap["task"] as ScheduleTask;
                if (task == null || task.ClassName == "SysJob") continue;  //不检查系统作业  

                //在数据库检测一次
                var taskInDb2 = taskInDb.ToList().FirstOrDefault(a => a.Id == task.Id);
                if (taskInDb2 == null)
                {
                    context.Scheduler.DeleteJob(jk); //删除该作业
                    Log.Logger.InfoFormat("作业计划『{0}』已经不存在于数据库。", task.ClassName);
                    continue;
                }
                taskRuning.Add(taskInDb2);

                if (taskInDb2 != task)
                {

                    //脏数据,删除此作业,然后重新创建一个
                    Log.Logger.InfoFormat("{0}的作业计划属性已更改,将删除该计划的实现作业,然后重新创建一个作业,并尝试调度它...", taskInDb2.ClassName);
                    //作业计划属性发生变更,重新启动作业
                    Tuple<IJobDetail, ITrigger> tuple = JobHelp.RestartJob(context.Scheduler, task, taskInDb2);
                    Log.Logger.InfoFormat("{0}重新创建并调度作业完成,『IJOB.Execute』退出。作业计划:{1},作业:{2},触发器:{3},表达式:{4}。", taskInDb2.ClassName, taskInDb2.ClassName, tuple.Item1.Key.Name, tuple.Item2.Key.Name, taskInDb2.CronExpression);
                    return; //退出
                }
            }
            //过滤出新增的作业
            var newTask = taskInDb.Except(taskRuning);
            if (newTask.Count() > 0)
            {
                //动态增加作业
                Log.Logger.InfoFormat("系统作业检测到有{0}个新增作业计划,开始创建这些作业...", newTask.Count());
                foreach (var task in newTask)
                    TaskManageDemo.Utility.Common.Execute(ScheduleJobByPlan, context.Scheduler, task, "创建作业失败,作业计划名称:{0}", task.ClassName); 

                Log.Logger.InfoFormat("系统作业创建作业完毕,共创建{0}个作业。", context.Scheduler.GetJobKeys(Quartz.Impl.Matchers.GroupMatcher<JobKey>.GroupEquals(JobHelp.jobGroupName)).Count - 1);
            }
            //系统作业执行完毕
            Log.Logger.InfoFormat("系统轮询作业执行完毕,目前共{0}个作业正在运行。", context.Scheduler.GetJobKeys(Quartz.Impl.Matchers.GroupMatcher<JobKey>.GroupEquals(JobHelp.jobGroupName)).Count);
            context.Put("ExecResult", "系统作业执行完成。");

            //Log.Logger.ErrorFormat("本次运行时间{0},下次运行时间{1}", DateTime.Now.ToString(), context.NextFireTimeUtc.Value.DateTime.AddHours(8).ToString());
        }

主要的应该也就这些,当然这是个Demo,有很多地方不是很完善。

源码下载

https://git.oschina.net/bin88123/TaskManageDemo

时间: 2024-10-04 10:17:09

一个任务调度的相关文章

ucos2.86的任务调度漏洞

Ucos2.86版本有一个任务调度的漏洞,该漏洞在2.88之后的版本已经修改过来了,今天我们来看看这个漏洞, 漏洞在官方2.88的文档中如下 这两个函数都是调度器函数,也就是说调度器有漏洞,但是看官方文档的说明,只有cortex-m3有这个bug,那我们就将2.88的代码和2.91的代码对比看看改变了哪些 2.86中的代码是这样的: void  OS_Sched (void) { #if OS_CRITICAL_METHOD == 3 OS_CPU_SR  cpu_sr = 0; #endif

任务调度

一个任务调度 最近把以前项目中用的任务调度提了出来,做了一个Demo. 任务调度用到的组件是quartz.net.关于quartz.net的文章网上有很多了,这里再简单介绍下. 首先是创建一个作业明细   然后是创建一个触发器   最后把创建的两样组合起来就新增了一个作业了   作业必须实现IJob接口 这里用了一个基类来实现IJob接口,基类里有一个抽象方法,其他作业子类只要继承这个基类并实现这个抽象方法就行了. public void Execute(IJobExecutionContext

转 linux任务调度之crontab命令

crontab命令常见于Unix和Linux的操作系统之中,用于设置周期性被执行的指令.该命令从标准输入设备读取指令,并将其存放于"crontab"文件中,以供之后读取和执行. 在Linux系统中,Linux任务调度的工作主要分为以下两类: 1.系统执行的工作:系统周期性所要执行的工作,如备份系统数据.清理缓存 2.个人执行的工作:某个用户定期要做的工作,例如每隔10分钟检查邮件服务器是否有新信,这些工作可由每个用户自行设置 一./etc/crontab./etc/cron.deny

在51上写一个“OS”原型

自己在51单片机上实现任务调度器的记录过程,下面的文本内容,完整的图文文档传送到了文库.传送门 闲来无聊,便有了想写操作系统的念头.之前也用过ucso.rtt.raw-os,虽然没怎么深入应用,但对操作系统也有些认识.好奇心的驱使,终于在国庆这段时间里实现了这个“OS”.于是,便有了本文,用来记录自己实现一个OS的过程.当然,这个OS,可不像上面说的几个rtos那样,这个OS只是实现了任务调度功能,还不能算真正意义的OS,甚至编码上看起来很丑陋.由于51单片机相对简单,尽管资源上比较有限,但还是

任务调度思考与开源软件opencron的使用经验

本文是我在一个技术分享群里面,对于任务调度和开源软件opencron的分享和使用体会,现在整理成文字,供大家参考. 大家好,下面我来分享一下 部署和使用 opencron 任务调度工具的一些经验和体会        我计划从五个方面来进行分享 1 任务调度需求与要素分析        任务调度,平时在大家的工作中应该会遇到比较多的,对于运维来说,操作系统的重要文件的每天定时备份,操作系统时间同步,各种数据库的备份,日志和监控信息的抓取等,都需要配置各种定时任务        对于开发和业务来说,

java实现任务调度

最近的一个小项目是做一个简单的数据仓库,需要将其他数据库的数据抽取出来,并通过而出抽取成页面需要的数据,以空间换时间的方式,让后端报表查询更快. 因为在抽取的过程中,有一定的先后顺序,需要做一个任务调度器,某一优先级的会先执行,然后会进入下一个优先级的队列任务中. 先定义了一个Map的集合,key是优先级,value是任务的集合,某一个优先级内的任务是并发执行的,而不同优先级是串行执行的,前一个优先级执行完之后,后面的才会执行. ConcurrentHashMap<Integer/* 优先级.

schedule任务调度及其用法(多线程并发)

如果需要执行更复杂的任务调度,则可使用 Python 提供的 sched 模块.该模块提供了 sched.scheduler 类,该类代表一个任务调度器. sched.scheduler(timefunc=time.monotonic, delayfunc=time.sleep) 构造器支持两个参数: timefunc:该参数指定生成时间戳的时间函数,默认使用 time.monotonic 来生成时间戳. delayfunc:该参数指定阻塞程序的函数,默认使用 time.sleep 函数来阻塞程

宜信微服务任务调度平台建设实践|分享实录

本文主要围绕SIA平台展开,包括研发背景设计思路和技术架构,以及如何支持业务方. 内容来源:宜信技术学院第4期技术沙龙-线上直播|宜信微服务任务调度平台建设实践 主讲人:宜信高级架构师&开发平台负责人 梁鑫 导读:如今,无论是互联网应用还是企业级应用,都充斥着大量的批处理任务,常常需要一些任务调度系统帮助我们解决问题.随着微服务化架构的逐步演进,单体架构逐渐演变为分布式.微服务架构. 在此背景下,很多之前的任务调度平台已经不能满足业务系统的需求,于是出现了一些基于分布式的任务调度平台.这些平台各

【转】Singularity:基于Apache Mesos构建的服务部署和作业调度平台

Singularity是一个在云基础设施中部署和运行服务和计划作业的平台,同时也是HubSpot PaaS的核心组件.它能够高效地管理底层进程的生命周期,并有效地利用集群资源.它可以作为持续部署基础设施的基本组成部分,而且是微服务部署的理想选择.它不仅能够管理数以百计的服务器上运行着的数以千计的进程,而且还提供了如下开箱即用的特性: 丰富的REST API,既有用于部署的,也有用于获取活动部署和历史部署信息的: Web应用客户端(Singularity UI)使用上述API向用户提供所有可获得信