流程控制:分布式并行任务流程控制

  • 背景:

目前工作中遇到一个比较急,又有点费事的工作任务:

1)目前系统中已经已经包含了一些比较完善的部分模块,但是模块之间没有一个控制流程来管理,就造成程序没有办法自动化;

2)已经完成模块中有几个是采用分布式部署,但各个服务器之间又是采用的并行执行不同的任务(目的最大化利用服务器,节省处理总耗时):这写对流程化控制带来了一些控制繁琐问题。

3)目前并不需要考虑太多稳定行的问题,但是流程控制程序必须考虑到高可用性(就是需要部署为HA)。

  • 目前已经拥有的功能模块:

1)采集及更新工参、KPI、路测、扫频等到数据库:这些数据在ftp上,因此,每次更新需要从ftp上自动采集数据(当然是任务触发时最新的数据)之后更新到数据库中;

2)采集mr数据及解析mr,这里分为了三个功能模块:

2.1)采集mr功能:目前已经包含,但是功能还是不太完善,每次采集的触发时间应该是当任务触发时开启采集(这样避免了未领取到任务时盲目的采集,造成服务器性能下降其他分析慢,IO存储不足问题),采集距离“采集任务”触发时间最近且完整的数据;

2.2)mr大压缩包解压任务:目前系统中包含的mr数据文件格式比较复杂,可能包含大压缩包套文件,或者达压缩吧套小压缩包的问题,最终需要加压为最小压缩包(也就是在解压一次就是.xml文件);

2.3)mr解析:mr解析是一个分布式部署(部署在多台服务器,按照公式:enb%服务器数量,得到的值来分配enb到具体哪台服务器),但是每台服务器与每台服务器之间是没有联系的:每台服务器只负责并行的处理分配到自己节点上的任务。

3)mr栅格化:由于上边mr解析后分别存储到自己节点服务器上的数据,因此这里的mr栅格化数据也是分布式的部署在每台节点服务器上。

  • 解决方案思路:

为此,我写了一个代码逻辑框架:

文件Program.cs是代码核心业务控制:

1)它支持部署HA:可以部署多台服务器,和一台服务器一样工作。

2)核心业务控制思路:

 1         static void Main(string[] args)
 2         {
 3             while (true)
 4             {
 5                 // TaskLock在zookeeper或sqlserver数据中只存在唯一的一条记录。
 6                 TaskLock taskLock = TaskLockBO.Get();
 7
 8                 if (taskLock.Lock == LockStatus.Locked)
 9                 {
10                     // 获取正在执行的任务。。。。
11                     List<Task> taskItems = TaskBO.GetToDoTaskByTaskGroup(taskLock.DoingTaskGroup);
12
13                     if (taskItems == null || taskItems.Count == 0)
14                     {
15                         // 修改taskLock.Lock=UnLock、taskLock.DoingTaskGroup=Guid.Empty
16                     }
17                     else
18                     {
19                         // 按照任务的优先级执行task
20                         // 开始调度每台计算服务器上的任务。。。
21                         // 1)如果第一个待执行(todo)的任务是“工参导入或更新”,修改其任务状态为doing。
22                         //       “工参导入或更新”服务只可能部署在一台服务器上或者就是这里实现,当获取到归属自己的任务状态是doing时,就开始从ftp上采集工参数据,并解析导入,完成修改任务状态为done,失败修改任务状态为fail
23                         // 2)如果第一个待执行(todo)的任务是“采集MR”,修改其任务状态为doing。
24                         //       “采集MR”服务也是只可能部署在一台服务器上(但不可能在这里执行),当获取到归属自己的任务状态是doing时,就开始监控ftp,并采集ftp数据到本地,完成修改任务状态为done,失败修改任务状态为fail
25                         // 3)如果第一个待执行(todo)的任务是“解压超大压缩包”,修改其任务状态为doing.
26                         //       “解压超大压缩包”服务也是只可能部署在一台服务器上(但不可能在这里执行),当获取到归属自己的任务状态是doing时,就开始循环遍历采集的mr,若找到超大压缩包就进行超大压缩包解压,完成修改任务状态为done,失败修改任务状态为fail
27                         // 4)如果第一个待执行(todo)的任务是“解析MR”,修改任务状态为predoing,循环遍历ftp目录的数据按照分发规则把mr问价分发到不同的计算节点服务器指定的位置,并创建“解析mr子任务”给每台解析mr服务器,并修改该任务状态为doing
28                         //       注意:这里是分布式处理的,因此给所有子节点分配任务后统一修改所有“解析MR”任务状态为doing(每个compute包含一个“mr解析”任务).
29                         //       “解析MR”服务部署在多个解析处理服务器上,当获取到归属自己的节点的“解析mr”任务状态是doing时,就开始获取自己的节点下的“解析mr子任务”逐个处理,处理完成后修改归属自己的节点“mr解析”任务状态为done,失败修改任务状态为fail
30                         //       注意:这里是分布式处理的,因此需要考虑到等待所有节点“mr解析”都完成后,才可以进行下一步。
31                         // 5)如果第一个待执行(todo)的任务是“mr栅格化”,修改任务状态为doing.
32                         //       注意:这里是分布式处理的,因此给所有子节点分配任务后统一修改所有“MR栅格化”任务状态为doing(每个compute包含一个“MR栅格化”任务).
33                         //       “MR栅格化”服务部署在多个处理服务器上,当获取到归属自己的节点的“mr栅格化”任务状态是doing时,开始逐个处理自己节点上的“mr栅格化”任务,处理完成后修改归属自己的节点“mr栅格化”任务状态为done,失败修改任务状态为fail
34                         //       注意:这里是分布式处理的,因此需要考虑到等待所有节点“mr栅格化”都完成后,才可以进行下一步。
35                     }
36                 }
37                 else if (taskLock.Lock == LockStatus.UnLock)
38                 {
39                     // 尝试获取新的任务。
40                     // 修改taskLock.lock=PreLock
41
42
43                     // 添加任务成功,则修改taskLock.Lock=Locked、taskLock.DoingTaskGroup赋值;添加任务失败,则修改taskLock.Lock=UnLock
44                 }
45
46                 // 5 分钟轮询一次。
47                 Thread.Sleep(5 * 60 * 1000);
48             }
49         }

3)任务状态&类型&定义包含:

    enum TaskType
    {
        /// <summary>
        /// 导入或更新工参、KPI等数据
        /// </summary>
        ImportSiteCellKpi = 0,
        /// <summary>
        /// 采集MR数据
        /// </summary>
        GatherMR = 1,
        /// <summary>
        /// 尝试解压包含多层压缩包的MR数据
        /// </summary>
        DoUnZipMR = 2,
        /// <summary>
        /// 解析入库MR数据
        /// </summary>
        DoParserMR = 3,
        /// <summary>
        /// MR栅格化
        /// </summary>
        DoMRRaster = 4,
    }
    enum TaskStatus
    {
        Todo = 0,        PreDoing = 1,        Doing = 2,        Done = 3,        Fail = 4
    }
    class Task
    {
        /// <summary>
        /// 如果为同一批次批量处理流程,则TaskGroup为同一个Guid值。
        /// </summary>
        public Guid TaskGroup { get; set; }
        public int TaskId { get; set; }
        public TaskType TaskType { get; set; }
        public TaskStatus TaskStatus { get; set; }
        /// <summary>
        /// 任务优先级
        /// </summary>
        public int Priority { get; set; }
        public string ComputeIP { get; set; }
        public DateTime CreateTime { get; set; }
        public DateTime DoingTime { get; set; }
        public DateTime DoneTime { get; set; }
        public DateTime FailTime { get; set; }
    }

4)任务流程控制锁(全局锁):

    enum LockStatus
    {
        PreLock = 0,
        Locked = 1,
        UnLock = 2
    }
    /// <summary>
    /// 所有任务流程是否被Lock掉
    /// 1)当一批次任务未完成之前就不允许有任何新的一批任务开始执行,
    ///     必须等到一批次任务流程执行完成后才可以执行,否则将会导致数据执行的速度过慢,或者导致数据混乱情况。
    /// 2)整个系统中,要确保正在执行的任务流程只有唯一一个,否则系统将会造成性能底下,或者出现数据错乱情况。
    /// </summary>
    class TaskLock
    {
        public LockStatus Lock { get; set; }
        /// <summary>
        /// 当开始执行新的一批次任务流程是,把该批次任务流程组编号写入此处,同时修改TaskLock.isLock为true.
        /// </summary>
        public Guid DoingTaskGroup { get; set; }
    }

代码下载:

链接:http://pan.baidu.com/s/1pKIYwl1 密码:mbl7

时间: 2024-10-18 04:38:36

流程控制:分布式并行任务流程控制的相关文章

spring mvc控制框架的流程及原理1: 总概及源码分析

主要介绍spring mvc控制框架的流程及原理 Spring Web MVC处理请求的流程 具体执行步骤如下: 首先用户发送请求————>前端控制器,前端控制器根据请求信息(如URL)来决定选择哪一个页面控制器进行处理并把请求委托给它,即以前的控制器的控制逻辑部分:图2-1中的1.2步骤: 页面控制器接收到请求后,进行功能处理,首先需要收集和绑定请求参数到一个对象,这个对象在Spring Web MVC中叫命令对象,并进行验证,然后将命令对象委托给业务对象进行处理:处理完毕后返回一个Model

Cordys BOP 4平台的子流程与嵌入子流程对比以及子流程建模技术

子流程含义是在某个流程流转过程中可以创建一个新的流程并执行,结束后可以在此返回父流程. 关于子流程 子流程 子流程是BPMN构件,是在一个流程中的复合的活动.子流程可以分解为一系列更精细的活动.子流程通过父流程实例化. 子流程可以设计成交易的一部分,下面列出三种可能的交易结果: 成功完成(Successful completion):流程按顺序执行,并且交易执行成功: 取消(Cancel):交易内的活动回滚,并且补充特定活动: 异常(Exception):出现异常活动不需要回滚,将从错误事件继续

BOS项目 第9天(activiti工作流第一天,工作流概念、工作流所需要的23张表、eclipse安装流程设计插件、流程api基本操作)

BOS项目笔记 第9天 今天内容安排: 1.工作流概念 2.安装流程设计器插件(eclipse)----设计流程图 3.创建activiti数据库(23张表) 4.activiti的API操作流程 1. 工作流概念 工作流(Workflow),就是"业务过程的部分或整体在计算机应用环境下的自动化",它主要解决的是"使在多个参与者之间按照某种预定义的规则传递文档.信息或任务的过程自动进行,从而实现某个预期的业务目标,或者促使此目标的实现". 工作流管理系统(Workf

按键精灵 用全局变量控制线程 子线程控制主线

//************************************************用全局变量控制线程 子线程控制主线 Global b b = 1 线程控制ID = BeginThread(线程控制)//启动线程 While b=1 Delay 2000 Call Plugin.Msg.Tips("我是主线程") wend Rem aaa Delay 8000 While b=2 Delay 2000 Call Plugin.Msg.Tips("我是主线程副

音乐播放器(支持本地搜索,字母检索,进度条控制,后台来电控制音乐)

音乐播放器(支持本地搜索,字母检索,进度条控制,后台来电控制音乐) 自己做的一个简单的音乐播放器,ViewPager+ListView布局. 下载地址:http://www.devstore.cn/code/info/541.html 运行截图:   版权声明:本文为博主原创文章,未经博主允许不得转载.

外来人员如何发起流程与参与内部流程?CCFlow中Guest流程说明

定义:分组织结构内的人员参与的流程,称为外部用户参与流程也叫客户参与流程. 应用背景:我们给学校设计一个流程,学校教师是内部用户,是组织结构内的用户,学生就是外部用户.如果我们制作一个学生请假流程,一条流程中有多个节点是学生参与的.这样的节点是学生参与的,我们就称为该流程是客户参与流程,学生参与的节点就是外部用户参与节点. 这种应用场景非常之多,比如我们为税务局设计一个系统税务系统工作人员是内部用户,纳税人就是外部用户. 我们为企业设计一个erp ,企业内部的组织机构人员是内部用户,供应商.客户

流程分析方法与流程分级

现实的企业流程是客观存在的,流程分析是基于企业现实展开:流程分析是流程优化和流程管理的基础和前提.任何企业的流程都是企业战略和企业目标持续作用的结果,也就是说企业战略与目标决定企业的组织架构,组织架构引导企业岗位的设置,每个岗位之间工作衔接的方式就产生了企业的现有流程.这种流程必然受到当时企业内外环境和岗位员工素质的影响,是否适应今天变化了的现实则是需要通过流程梳理和分析来判断和改进.因此,我们可以说流程分析就是对企业做事方式的全景式扫描,也可以说是漫镜头式的检阅.要进行有效地扫描或检阅,就必须

activiti学习3:流程引擎对象和流程引擎配置对象

目录 activiti学习3:流程引擎对象和流程引擎配置对象 一.activiti的简单使用流程 二.流程引擎配置对象ProcessEngineConfiguration的介绍 三.activiti配置文件的介绍 四.获取流程引擎配置对象 4.1 读取默认名称的配置文件来创建配置对象 4.2 读取自定义名称的配置文件来创建流程引擎配置对象 五.流程引擎对象 5.1 buildProcessEngine方法创建 5.2 用ProcessEngines类来获取 六.总结 activiti学习3:流程

php流程控制(新建流程)

流程控制,又是一个广泛应用于办公自动化等内部自定义流程及审核的功能.比如请假流程啊,申请办公用品经费,报销申请啊,等等,需要层层审批通过才能完成的(按规章办事,最好别越级,你说是吧) 不习惯一篇博客太长,看不过来,这篇我们先实现流程的新建 先看看效果图 1.有个下拉选择用户,可以依次添加到流程的节点位置, 2.下方显示流程的每个节点 3.填写流程名称,点击保存即可完成流程的新建 数据库中需要三张表,1.用户表2.流程表 3.流程节点表 选择用户和添加节点按钮 <div> 请选择用户: <