OFBiz中JOB的运行机制

OFBiz执行后台任务的类在org.ofbiz.service.job中。
JobPoller和JobInvoker是主要的两个类,一个负责查询可以执行的Job,另一个执行Job任务。Job类图如下所示。

1.Job轮询
创建JobManager时,会创建JobPoller的一个实例。JobPoller实现了Runnable接口,以此创建线程后 通过JobManager一直轮询是否有Job需要执行,如果有奖将其放入队列中。

1     public synchronized void run() {  2         try {  3             // wait 30 seconds before the first poll  4             java.lang.Thread.sleep(30000);  5         } catch (InterruptedException e) {  6         }  7         while (isRunning) {  8             try {  9                 // grab a list of jobs to run. 10                 List<Job> pollList = jm.poll(); 11                 //Debug.logInfo("Received poll list from JobManager [" + pollList.size() + "]", module); 12  13                 for (Job job : pollList) { 14                     if (job.isValid()) { 15                         queueNow(job); 16                         //Debug.logInfo("Job [" + job.getJobId() + "] is queued", module); 17                     } 18                 } 19                 // NOTE: using sleep instead of wait for stricter locking 20                 java.lang.Thread.sleep(pollWaitTime()); 21             } catch (InterruptedException e) { 22                 Debug.logError(e, module); 23                 stop(); 24             } 25         } 26     } 27

queueNow方法将要执行job放入到队列中,如果队列中的等待执行的job数量很多,那么就创建一定数量的线程执行这些job。

1    public void queueNow(Job job) {  2         synchronized (run) {  6             run.add(job);  7         }  8         if (Debug.verboseOn()) Debug.logVerbose("New run queue size: " + run.size(), module);  9         if (run.size() > pool.size() && pool.size() < maxThreads()) { 10             synchronized (pool) { 11                 if (run.size() > pool.size() && pool.size() < maxThreads()) { 12                     int calcSize = (run.size() / jobsPerThread()) - (pool.size()); 13                     int addSize = calcSize > maxThreads() ? maxThreads() : calcSize; 14  15                     for (int i = 0; i < addSize; i++) { 16                         JobInvoker iv = new JobInvoker(this, invokerWaitTime()); 17                         pool.add(iv); 18                     } 19                 } 20             } 21         } 22     }

JobInvoker就是执行的线程,它从queue中取job并执行。JobInvoker线程不是一直运行下去,运行的时间长度超过一定的值(见 serviceengine.xml中ttl的值)线程就会停止并从pool中删除。JobInvoker的run方法中job.exec()执行具体的 任务。
2.Job执行
Job类都有一个exec方法,用户执行Job的service。如GenericServiceJob中的exec方法如下:

1     public void exec() throws InvalidJobException {  2         init();  3   4         // no transaction is necessary since runSync handles this  5         try {  6             // get the dispatcher and invoke the service via runSync -- will run all ECAs  7             LocalDispatcher dispatcher = dctx.getDispatcher();  8             Map<String, Object> result = dispatcher.runSync(getServiceName(), getContext());  9  10             // check for a failure 11             boolean isError = ModelService.RESPOND_ERROR.equals(result.get(ModelService.RESPONSE_MESSAGE)); 12             if (isError) { 13                  String errorMessage = (String) result.get(ModelService.ERROR_MESSAGE); 14                  this.failed(new Exception(errorMessage)); 15             } 16  17             if (requester != null) { 18                 requester.receiveResult(result); 19             } 20  21         } catch (Throwable t) { 22             // pass the exception back to the requester. 23             if (requester != null) { 24                 requester.receiveThrowable(t); 25             } 26  27             // call the failed method 28             this.failed(t); 29         } 30  31         // call the finish method 32         this.finish(); 33     } 34

在执行service执行,有一个init方法,在PersistedServiceJob类中init方法主要是生成下一个执行的任务,如果有的话。也 即是说每一个job是由当时执行的这个job生成的,根据是什么呢?主要是两个变量:tempExprId和 maxRecurrenceCount,init方法中:

1     TemporalExpression expr = null;  2     ……  3   4     if (expr == null && UtilValidate.isNotEmpty(job.getString("tempExprId"))) {  5             try {  6                 expr = TemporalExpressionWorker.getTemporalExpression(this.delegator, job.getString("tempExprId"));  7             } catch (GenericEntityException e) {  8                 throw new RuntimeException(e.getMessage());  9             } 10         } 11

TemporalExpressionWorker里面有一个makeTemporalExpression方法很重要,从这个方法可以知道怎么配置 TemporalExpression实体数据了,当然要结合TemporalExpressions类,里面定义了各种配置的细节。
tempExprTypeId有如下几种:
DateRange DayInMonth DayOfMonthRange DayOfWeekRange Difference Frequency Intersection MonthRange TimeOfDayRange Union
比如如果希望服务只执行一次,可以如下配置:     <TemporalExpression tempExprId="RUNONCE" tempExprTypeId="FREQUENCY" integer1="1" integer2="1"/>     <JobSandbox jobId="CurrencyRateSynAll" jobName="Currency Rate SynAll" runTime="2010-02-26 09:38:00.000" serviceName="currencyRateSynAll" poolId="pool" runAsUser="system" tempExprId="RUNONCE" maxRecurrenceCount="0"/>
maxRecurrenceCount="0" 表示,不重复。tempExprTypeId="FREQUENCY" integer1="1" integer2="1"表示一年执行一次。所以总共执行一次就结束了。
每天都执行可以这样配置:
<TemporalExpression tempExprId="MIDNIGHT_DAILY" tempExprTypeId="TIME_OF_DAY_RANGE" string1="20:00:00" string2="20:00:00"/>     <JobSandbox jobId="MailNotification" jobName="Mail Notification Job" runTime="2010-02-25 18:00:00.000" serviceName="mailNotificantion" poolId="pool" runAsUser="system" tempExprId="MIDNIGHT_DAILY" maxRecurrenceCount="-1"/>
maxRecurrenceCount="-1"表示无限循环下去。tempExprId="MIDNIGHT_DAILY" tempExprTypeId="TIME_OF_DAY_RANGE" string1="20:00:00" string2="20:00:00"/>表示每天晚上八点执行。
每个月一次任务可以如下配置:
<TemporalExpression tempExprId="ONCEINMONTH" tempExprTypeId="FREQUENCY" date1="2010-02-26 11:05:00.000" integer1="2" integer2="1"/>     <JobSandbox jobId="CurrencyRateSyn" jobName="Currency Rate Syn" runTime="2010-02-26 11:05:00.000" serviceName="currencyRateSyn" poolId="pool" runAsUser="system" tempExprId="ONCEINMONTH" maxRecurrenceCount="-1"/>
tempExprTypeId="FREQUENCY" date1="2010-02-26 11:05:00.000" integer1="2" integer2="1"表示每月一次,时间就是date1定义的时间,如果没用定义date1,那么就是当前时间。
这里的配置相当灵活,好好掌握。

仅供个人收藏 转载自:http://www.blogjava.net/persister/archive/2010/02/26/313974.html

时间: 2024-10-09 13:00:26

OFBiz中JOB的运行机制的相关文章

C#基础:.NET中GC的运行机制

一.什么是GC GC是垃圾回收(Garbage Collector)的缩写.GC可以说是.NET众多机制中最为重要的,对程序员代码书写方式影响最大的机制.在CLR规范制定之初,所有机制都还在斟酌的时候,垃圾回收已经被确定会存在于.NET框架之中. .NET的程序大部分被称为被托管的代码.托管的意义很广泛,其中重要的一点就是代码中对象内存的分配和释放是由.NET内存管理和垃圾回收机制统一管理的.和传统C++的程序员不同,C#或者其他面向.NET框架语言的程序员不再需要时刻警惕内存的泄露,因为托管对

转 OFBiz中services调用机制

OFBiz业务方法里面,当执行一个service的时候,通常采用如下的方式: LocalDispatcher dispatcher = dctx.getDispatcher();     Map<String, Object> result = dispatcher.runSync(getServiceName(), getContext()); LocalDispatcher是本地调度器,实现服务的同步异步调度和定时任务的调度.与服务调度相关的类图如下: LocalDispatcher是一个

OFBiz中services调用机制

OFBiz业务方法里面,当执行一个service的时候,通常采用如下的方式: LocalDispatcher dispatcher = dctx.getDispatcher();     Map<String, Object> result = dispatcher.runSync(getServiceName(), getContext()); LocalDispatcher是本地调度器,实现服务的同步异步调度和定时任务的调度.与服务调度相关的类图如下: LocalDispatcher是一个

深入理解struts的运行机制

在此申明本博文并非原创,原文:http://blog.csdn.net/lenotang/article/details/3336623,本文章是在此文章基础上进行优化.也谈不上优化,只是加上了点自己的想法 jar包准备 为什么会用到这两个jar包呢,因为我需要通过这个jar来解析xml配置文件. 新建项目 流程梳理 struts配置文件 <?xml version="1.0" encoding="UTF-8"?> <struts> <

Spark架构及运行机制

Spark是基于内存计算的大数据并行计算框架.因为其基于内存计算,较Hadoop中MapReduce计算框架具有更高的实时性,同时保证了高效容错性和可伸缩性.从2009年诞生于AMPLab到现在已经成为Apache顶级开源项目,并成功应用于商业集群中.学习Spark就需要了解其架构及运行机制. Spark架构 Spark架构使用了分布式计算中master-slave模型,master是集群中含有master进程的节点,slave是集群中含有worker进程的节点. master作为整个集群的控制

SSL/TLS协议运行机制的概述

转自:SSL/TLS协议运行机制的概述 作者: 阮一峰 日期: 2014年2月 5日 互联网的通信安全,建立在SSL/TLS协议之上. 本文简要介绍SSL/TLS协议的运行机制.文章的重点是设计思想和运行过程,不涉及具体的实现细节.如果想了解这方面的内容,请参阅RFC文档. 一.作用 不使用SSL/TLS的HTTP通信,就是不加密的通信.所有信息明文传播,带来了三大风险. (1) 窃听风险(eavesdropping):第三方可以获知通信内容. (2) 篡改风险(tampering):第三方可以

java运行机制详细

JVM(Java虚拟机)一种用于计算设备的规范,可用不同的方式(软件或硬件)加以实现.编译虚拟机的指令集与编译微处理器的指令集非常类似.Java虚拟机包括一套字节码指令集.一组寄存器.一个栈.一个垃圾回收堆和一个存储方法域. Java虚拟机(JVM)是可运行Java代码的假想计算机.只要根据JVM规格描述将解释器移植到特定的计算机上,就能保证经过编译的任何Java代码能够在该系统上运行. 1.为什么要使用Java虚拟机 Java语言的一个非常重要的特点就是与平台的无关性.而使用Java虚拟机是实

【Spark Core】任务运行机制和Task源代码浅析1

引言 上一小节<TaskScheduler源代码与任务提交原理浅析2>介绍了Driver側将Stage进行划分.依据Executor闲置情况分发任务,终于通过DriverActor向executorActor发送任务消息. 我们要了解Executor的运行机制首先要了解Executor在Driver側的注冊过程.这篇文章先了解一下Application和Executor的注冊过程. 1. Task类及其相关 1.1 Task类 Spark将由Executor运行的Task分为ShuffleMa

JavaWeb三大组件——过滤器的运行机制理解

过滤器Filter 文章前言:本文侧重实用和理解. 一.过滤器的概念. lFilter也称之为过滤器,它是Servlet技术中最实用的技术,WEB开发人员通过Filter技术,对web服务器管理的所有web资源:例如Jsp, Servlet, 静态图片文件或静态 html 文件等进行拦截,从而实现一些特殊的功能.例如实现URL级别的权限访问控制.过滤敏感词汇.压缩响应信息等一些高级功能. 二.过滤器的运行机制. 没有加Filter的web项目运行机制如下: 加上Filter的web运行机制: 由