更新列表:
- 任务参数可视化.
- 立即中断正在执行的任务.
- 每个任务独立的应用程序域
上一版参见:
基于Quqrtz.NET 做的任务调度管理工具
界面具体变化如下:
任务参数可视化
如上图所示, 在管理任务的界面上就可以知道这个任务需哪些参数/类型 及 参数的说明.
实现方式, 在 Job 上添加 特性 : ParameterTypeAttribute
1 namespace JobA { 2 [ParameterType(typeof(Parameter))] 3 public class Job : IJob { 4 5 public static ILog Log = LogManager.GetLogger(typeof(Job)); 6 public void Execute(IJobExecutionContext context) { 7 var dataMap = context.JobDetail.JobDataMap; 8 //if (dataMap.ContainsKey("int")) { 9 // var pInt = dataMap.GetIntValue("int"); 10 // Console.WriteLine("1 JobA Parameter {0}", pInt); 11 //} else { 12 // Log.Error("缺少参数 int, 未执行"); 13 // throw new JobExecutionException("缺少参数"); 14 //} 15 16 var p = dataMap.Parse<Parameter>(); 17 Console.WriteLine("{0}\t{1}\t{2}\t{3}", p.PDateTime, p.PDecimal, p.PInt, p.PNullableInt); 18 19 20 Thread.Sleep(TimeSpan.FromMinutes(3)); 21 } 22 } 23 }
取参数直接调用 dataMap.Parse<Parameter>() 就行了.
Parse 方法在: QM.Common. DatamapParser 中定义.
相比原始的从 DataMap 中用 key / value 方法取参数, 这种处理方式的好处不言而喻.
但是也有缺点, DataMap 支持任何可序列化的类型,
而用这种方法只支持
string, decimal, long, int, single, double, DateTime, DateTimeOffset, TimeSpan , bool, char 这些类型. (没有做更深一步的处理, 有兴趣的,可以尝试自己去实现.)
每个任务独立的应用程序域
试想一下插件式开发, 如果你做的插件需要N个第三方DLL, 而这些DLL并没有引用到主项目上, 怎么办呢? 一堆的 FileLoadException, FileNotFoundException 等错误, 想想都头疼.
如果你开发的插件想拥有自己的配置文件, 又该怎么办呢? 自己实现一个配置文件读取解析? ini ? xml ? 头疼吧.
针对上面的问题, 在这里的最佳解决办法是 : 独立的应用程序域.
这个要从 IScheduler.JobFactory 说起.
在QM.Server.QMServer 的构造方法中, 指定 Schedule.JobFactory 为 IsolatedJobFactory
IsolatedJobFactory 的定义:
1 public class IsolatedJobFactory : IJobFactory { 2 3 public IJob NewJob(TriggerFiredBundle bundle, IScheduler scheduler) { 4 return NewJob(bundle.JobDetail.JobType); 5 } 6 7 private IJob NewJob(Type jobType) { 8 return new IsolatedJob(jobType); 9 } 10 11 public void ReturnJob(IJob job) { 12 IDisposable disposable = job as IDisposable; 13 if (disposable != null) { 14 disposable.Dispose(); 15 } 16 } 17 }
从 NewJob 方法上可以看出, 实例出来的 Job 并不是最终要执行的 Job, 而是 IsolatedJob 的实例, 它类似中间人的身份.
IsolatedJob 实现了 IInterruptableJob 接口, 为中断执行中的任务埋下伏笔.
在 IsolatedJob 的构造方法中, 通过 IsolateDomainLoader 新建一个应用程序域:
IsolatedDomainLoader 的构造函数:
1 public IsolateDomainLoader(string path, string configFileName = "") { 2 AppDomainSetup setup = new AppDomainSetup(); 3 setup.ApplicationName = "IsolateDomainLoader"; 4 setup.ApplicationBase = path; 5 setup.DynamicBase = path; 6 setup.PrivateBinPath = path; 7 setup.CachePath = setup.ApplicationBase; 8 setup.ShadowCopyFiles = "true"; 9 setup.ShadowCopyDirectories = setup.ApplicationBase; 10 if (!string.IsNullOrWhiteSpace(configFileName)) { 11 setup.ConfigurationFile = configFileName; 12 setup.ConfigurationFile = Path.Combine(path, configFileName); 13 } 14 this.Domain = AppDomain.CreateDomain("ApplicationLoaderDomain", null, setup); 15 }
参数 path 即最终要执行的 job 所在的 dll 的路径.
configFileName 即独立的配置文件名称.
这样一来, 一个 job 一个文件夹, 文件夹内放置这个 job 相关的DLL和配置文件, 和主程序完全隔离开来.
上面说 IsolatedJob 是个中间人, 这里解释一下:
1, IsolatedJobFactory 的 NewJob 方法返回的是 IsolatedJob 的实例, 而不是最终要执行的 Job.
2, 在 IsolatedJob 中, 会通过独立的应用程序域 实例一个最终要执行的 Job 的远程对象(通过 RemoteObject).
3, 当中间人的 Execute 方法被调用时, 会调用远程 Job 对象的 Execute 方法.
4, Interrupt 方法同理.
远程对象续约
因为独立的应用程序域用到了远程对象: MarshalByRefObject, 因此涉及到了远程对象的租约过期及续租的问题.
远程对象的租约默认为 5 分钟, 可以重写 InitializeLifetimeService 方法来修改租约的有效期. 但是一个 Job 不确定要执行多长时间, 修改租约有效期不是很合适, 所以这里是通过续约的方式来处理租约过期的问题.
本人对租约了解不多, 不多嘴.感兴趣的话,可参见源码:
QM.RemoteLoader.RemoteObjectSponsor 类
和 QM.RemoteLoader.IsolateDomainLoader类的 GetObject 方法.
立即中断正在执行的任务
这个命题是有条件的, 即: 任务必须实现: IInterruptableJob 接口.
一般一个任务要执行很长时间, 如果不给个中断的接口, 那就只能关闭服务或等任务执行完毕了.
实现了这个接口,在配合 CancellationToken.ThrowIfCancellationRequested 方法就可以中断当前执行的任务了(别告诉我,你的任务是单线程的).
卸载域
任务执行完成后, 会将关联的 IsolatedJob对象释放, 在 IsolatedJob 的 Dispose 方法中,会把IsolateDomainLoader 对象释放,IsolateDomainLoader 释放的时候, 会把关联的子应用程序域卸载.
所以, 如果如果你的任务是多线程的, 请在线程远行完之前, 进行阻塞.
自定义Job的基类
目前, 如果自定义的 Job 的基类在第三方DLL中, 而且第三方DLL未引用到QM.Server 项目中, 并且不在 QM.Server\Jobs 目录下, 会报:
未能加载文件或程序集 XXX 或它的某一个依赖项。系统找不到指定的文件。
解决办法有两种:
1, 将缺少的DLL放到Jobs 目录下.
2, 将缺少的DLL添加引用到 QM.Server 中.
注意, 该限制只针对 Job 的基类. 除基类使用外的第三方DLL不需要这样做, 在JOB上引用就是了.
放上一段不用的, 可终止的 任务示例代码 给你做参考
1 [ParameterType(typeof(FetcherParameter))] 2 public class ScheduleFetcherJob : IInterruptableJob, IDisposable { 3 4 5 private CancellationTokenSource CTS = new CancellationTokenSource(); 6 7 private long JobID = DateTime.Now.Ticks; 8 public void Execute(IJobExecutionContext context) { 9 10 var par = context.JobDetail.JobDataMap.Parse<FetcherParameter>(); 11 this.CTS.Token.Register(() => { 12 Console.WriteLine("正在尝试终止当前任务"); 13 }); 14 this.Execute(par); 15 } 16 17 private string GetUrl(string org, string dest) { 18 return string.Format("http://www.soushipping.com/shipping/{0}/{1}/{2}", 19 org, dest, 20 DateTime.Now.ToString("yyyy-MM-dd")); 21 } 22 23 private void Execute(FetcherParameter par) { 24 IFetcher<string> cityFetcher = new OrginCityFetcher(); 25 var orgCities = cityFetcher.GetDatasByUrl(cityFetcher.InitUrl); 26 cityFetcher = new DestCityFetcher(); 27 var destCities = cityFetcher.GetDatasByUrl(cityFetcher.InitUrl); 28 29 Console.WriteLine("找到 {0} 条始发地, {1} 条目的地", orgCities.Count(), destCities.Count()); 30 31 var limitScd = new LimitedConcurrencyLevelTaskScheduler(par.MaxThread); 32 33 var factory = new TaskFactory(limitScd); 34 35 List<Task> tasks = new List<Task>(); 36 foreach (var oc in orgCities.ToList()) { 37 foreach (var dc in destCities.ToList()) { 38 //注意下面这句的参数 t, 如果带这个参数, IsCanceled 永远都为 false 39 //var task = Task.Factory.StartNew((t) => { 40 var task = factory.StartNew(() => { 41 this.CTS.Token.ThrowIfCancellationRequested(); 42 43 44 var url = this.GetUrl(oc, dc); 45 46 var fetcher = new ScheduleFetcher(url); 47 fetcher.PageFetchCompleted += fetcher_PageFetchCompleted; 48 fetcher.DownloadCompleted += fetcher_DownloadCompleted; 49 fetcher.Fetch(); 50 fetcher = null; 51 }, this.CTS.Token) 52 .ContinueWith(t => { 53 //var completed = tasks.Where(tt => tt.Status == TaskStatus.RanToCompletion).Count(); 54 //Console.WriteLine("{0}\t已完成:{1}", DateTime.Now.ToString("yyyy/MM/dd"), completed); 55 var arr = tasks.GroupBy(tt => tt.Status).Select(g => string.Format("{0}:{1}", g.Key, g.Count())); 56 Console.WriteLine("{0}\t{1}", DateTime.Now.ToString("MM/dd HH:mm:ss"), string.Join(" ", arr)); 57 t.Dispose(); 58 });// 59 //, TaskContinuationOptions.OnlyOnRanToCompletion) 60 //.ContinueWith(t => { 61 // //Console.WriteLine("正在取消"); 62 // t.Dispose(); 63 //}, TaskContinuationOptions.OnlyOnCanceled).ContinueWith(t => { 64 // Console.WriteLine("发生错误"); 65 // t.Dispose(); 66 //}, TaskContinuationOptions.OnlyOnFaulted); 67 68 tasks.Add(task); 69 } 70 } 71 72 try { 73 Task.WaitAll(tasks.ToArray()); 74 } catch (AggregateException ex) { 75 ex.Handle(er => er is TaskCanceledException); 76 } 77 Console.WriteLine("任务完成"); 78 } 79 80 #region 81 //private void Execute2(FetcherParameter par) { 82 // IFetcher<string> cityFetcher = new OrginCityFetcher(); 83 // var orgCities = cityFetcher.GetDatasByUrl(cityFetcher.InitUrl); 84 // cityFetcher = new DestCityFetcher(); 85 // var destCities = cityFetcher.GetDatasByUrl(cityFetcher.InitUrl); 86 87 // Console.WriteLine("找到 {0} 条始发地, {1} 条目的地", orgCities.Count(), destCities.Count()); 88 89 // var urls = orgCities.SelectMany(o => destCities.Select(d => this.GetUrl(o, d))); 90 91 // var opts = new ParallelOptions() { 92 // MaxDegreeOfParallelism = par.MaxThread 93 // }; 94 95 // var total = urls.Count(); 96 // object lockObj = new object(); 97 98 // //int sum = 0; 99 // Parallel.ForEach(urls, opts, 100 // (url) => { 101 // var fetcher = new ScheduleFetcher(url); 102 // fetcher.PageFetchCompleted += fetcher_PageFetchCompleted; 103 // fetcher.DownloadCompleted += fetcher_DownloadCompleted; 104 // fetcher.Fetch(); 105 // fetcher = null; 106 107 // lock (lockObj) { 108 // total--; 109 // Console.WriteLine(total); 110 // } 111 // } 112 // ); 113 114 //} 115 #endregion 116 117 private void fetcher_DownloadCompleted(object sender, DownloadArgs e) { 118 if (e.ExceptionStatus.HasValue) { 119 Console.WriteLine("{0}\t请求地址: {1} 时,发生异常 {2}, 请检查网络环境.", DateTime.Now.ToString("yyyy/MM/dd HH:mm:ss"), e.Url, e.ExceptionStatus); 120 } 121 } 122 123 124 private List<DIRTY_SCHEDULE> Datas = new List<DIRTY_SCHEDULE>(); 125 private object lockObj = new object(); 126 127 void fetcher_PageFetchCompleted(object sender, FetchArgs<DIRTY_SCHEDULE> e) { 128 var datas = e.Datas.Distinct(d => d.UNQTAG); 129 130 lock (lockObj) { 131 this.Datas.AddRange(datas); 132 if (this.Datas.Count > 100) { 133 var tmp = new DIRTY_SCHEDULE[this.Datas.Count]; 134 this.Datas.CopyTo(tmp); 135 this.Datas = new List<DIRTY_SCHEDULE>(); 136 //不是放入线程池, 而是立即执行的线程 137 var tr = new Thread(new ParameterizedThreadStart(this.SaveDatas)); 138 tr.Start(tmp); 139 } 140 } 141 } 142 143 private void SaveDatas(object state) { 144 IEnumerable<DIRTY_SCHEDULE> datas = (IEnumerable<DIRTY_SCHEDULE>)state; 145 var biz = new Biz.DirtyScheduleBiz(); 146 biz.SaveDirtySchedule(datas, this.JobID); 147 } 148 149 public void Interrupt() { 150 this.CTS.Cancel(); 151 } 152 153 ~ScheduleFetcherJob() { 154 Dispose(false); 155 } 156 157 public void Dispose() { 158 this.Dispose(true); 159 GC.SuppressFinalize(this); 160 } 161 162 protected virtual void Dispose(bool disposing) { 163 if (disposing) { 164 if (this.CTS != null) 165 this.CTS.Dispose(); 166 167 Console.WriteLine("Job Disposed"); 168 } 169 } 170 }
最后, 源码下载
https://github.com/gruan01/QM
谢谢围观, 新年快乐!
----------------------------
题外: 大年初二, 我手一抖, 把断断续续写了快一年的东西给误删了!误删了啊!
用 360 的数据恢复功能, 没错, 是 360, 找出的文件, 我哭了, 数据库(SQLCE) 恢复出来的文件损坏, 用SQLCE的修复工具修复, 是个空库! 也就是说, 恢复出来的文件就是个屁!跟本就没有恢复出来!
EXCEL 文件也一样, 打不开!
更糟糕的是, 我没有验证, 恢复之后就直接盖到原来的位置上了!
泪奔啊, 大过年的, 我就忙着干这个去了!