详解应对平台高并发的分布式调度框架TBSchedule

tbschedule是一款非常优秀的高性能分布式调度框架,非常高兴能分享给大家。这篇文章是我结合多年tbschedule使用经验和研读三遍源码的基础上完成的,期间和阿里空玄有过不少技术交流,非常感谢空玄给予的大力支持。我写这篇文章的目的一是出于对tbschedule的一种热爱,二是现在是一个资源共享、技术共享的时代,希望把它展现给大家(送人玫瑰,手留余香),能给大家的工作带来帮助。

一、tbschedule初识

时下互联网和电商领域,各个平台都存在大数据、高并发的特点,对数据处理的要求越来越高,既要保证高效性,又要保证安全性、准确性。tbschedule的使命就是将调度作业从业务系统中分离出来,降低或者是消除和业务系统的耦合度,进行高效异步任务处理。其实在互联网和电商领域tbschedule的使用非常广泛,目前被应用于阿里巴巴、淘宝、支付宝、京东、聚美、汽车之家、国美等很多互联网企业的流程调度系统。 
        在深入了解tbschedule之前我们先从内部和外部形态对它有个初步认识,如图1.1、图1.2。 

 
        从tbschedule的内部形态来说,与他有关的关键词包括批量任务、动态扩展、多主机、多线程、并发、分片……,这些词看起来非常的高大上,都是时下互联网技术比较流行的词汇。从tbschedule的外部架构来看,一目了然,宿主在调度应用中与zookeeper进行通信。一个框架结构是否是优秀的,从美感的角度就可以看出来,一个好的架构一定是隐藏了内部复杂的原理,外部视觉上美好的,让用户使用起来简单易懂。

二、tbschedule原理 
        为什么TBSchedule值得推广呢?

传统的调度框架spring task、quartz也是可以进行集群调度作业的,一个节点挂了可以将任务漂移给其他节点执行从而避免单点故障,但是不支持分布式作业,一旦达到单机处理极限也会存在问题。 
       elastic-job支持分布式,是一个很好的调度框架,但是开源时间较短,还没有经历大范围市场考验。 
       Beanstalkd基于C语言开发,使用范围较小,无法引入到php、java系统平台。

tbschedule到底有多强大呢?我对tbschedule的优势特点进行了如下总结: 
        1、支持集群、分布式 
        2、灵活的任务分片 
        3、动态的服务扩容和资源回收 
        4、任务监控支持 
        tbschedule支持cluster,可以宿主在多台服务器多个线程组并行进行任务调度,或者说可以将一个大的任务拆成多个小任务分配到不同的服务器。tbschedule的分布式机制是通过灵活的sharding方式实现的,比如可以按所有数据的ID按10取模分片(分片规则如图2.1)、按月份分片等等,根据不同的需求,不同的场景由客户端配置分片规则。我们知道spring task、quarz也是可以进行集群调度作业的,一个节点挂了可以将任务漂移给其他节点执行从而避免单点故障,但是不支持分布式作业,一旦达到单机处理极限也会存在问题,这个就是tbschedule的优势。然后就是tbschedule的宿主服务器可以进行动态扩容和资源回收,这个特点主要是因为它后端依赖的zookeeper,这里的zookeeper对于tbschedule来说是一个nosql,用于存储数据,它的数据结构类似文件系统的目录结构,它的节点有临时节点、持久节点之分。调度引擎上线后,随着业务量数据量的增多,当前cluster可能不能满足目前的处理需求,那么就需要增加服务器数量,一个新的服务器上线后会在zk中创建一个代表当前服务器的一个唯一性路径(临时节点),并且新上线的服务器会和zk有长连接,当通信断开后,节点会自动摘除。tbschedule会定时扫描当前服务器的数量,重新分配任务。tbschedule不仅提供了服务端的高性能调度服务,还提供了一个scheduleConsole war随着宿主应用的部署直接部署到服务器,可以通过web的方式对调度的任务、策略进行监控,实时更新。

是不是已经对tbschedule稍微了有些好感呢?我们接着往下看。 
        tbschedule提供了两个核心组件ScheduleServer、TbscheduleManagerFactory和两类核心接口IScheduleTaskDeal、IScheduleTaskDealSingle、IScheduleTaskDealMuti,这两部分是客户端研发的关键部分,是使用tbschedule必须要了解的。 
        ScheduleServer即任务处理器,的主要作用是任务和策略的管理、任务采集和执行,由一组工作线程组成,这组工作线程是基于队列实现的,进行任务抓取和任务处理(two mode)。每个任务处理器和zookeeper有一个心跳通信连接,用于检测server的状态和进行任务动态分配,举个例子,比如3服务器的worker集群执行出票消息生成任务,这对这个任务类型每台服务器可以配置一个ScheduleSever(即一个线程组),也可以配置两个线程组,相当于6台服务器在执行这个任务。当某台服务器宕机或者其他原因与zk通信断开时,他的任务将被其他服务器接管。ScheduleServer参数定义如图2.2

在这些参数中taskItems是一个非常重要的属性,是客户单可以自由发挥的地方,是任务分片的基础,比如我们处理一个任务可以根据ID按10取模,那么任务项就是0-9,3台服务器分别拿到4、 3、 3个任务项,服务器的上下线都会对任务项进行重新分配。任务项是进行任务分配的最小单位。一个任务项只能由一个ScheduleServer来进行处理,但一个Server可以处理任意数量的任务项。这就是刚才我们说的分片特性。 
        调度服务器TbscheduleManagerFactory的主要工作zookeeper连接参数配置和zookeeper的初始化、调度管理。 
        两类核心接口是需要被我们定义的目标任务实现的,根据自己的需要进行任务采集(重写selectTasks方法)和任务执行(重写execute方法),这类接口是客户端研发自由发挥的地方。 
        接下来我们深入了解下tbschedule,看看它的内部是如何实现的。下面流程图是我花了很多心血通过一周时间画出来的,基本是清晰的展现了tbschedule内部的执行流程以及每个步骤zookeer节点路径和数据的变化。因为图中的注释已经描述的很详细了,每个节点右侧是zk的信息(数据结构见图2.3),这里就不再做过多的文字描述了,有任何建议或者不明白的地方可以找我交流。

Tbschedule还有个强大之处是它提供了两种处理器模式模式: 
        1、SLEEP模式 
        当某一个线程任务处理完毕,从任务池中取不到任务的时候,检查其它线程是否处于活动状态。如果是,则自己休眠;如果其它线程都已经因为没有任务进入休眠,当前线程是最后一个活动线程的时候,就调用业务接口,获取需要处理的任务,放入任务池中,同时唤醒其它休眠线程开始工作。 
        2、NOTSLEEP模式 
        当一个线程任务处理完毕,从任务池中取不到任务的时候,立即调用业务接口获取需要处理的任务,放入任务池中。 
        SLEEP模式内部逻辑相对较简单,如果遇到大任务需要处理较长时间,可能会造成其他线程被动阻塞的情况。但其实生产环境一般都是小而快的任务,即使出现阻塞的情况ScheduleConsole也会及时的监控到。NOTSLEEP模式减少了线程休眠的时间,避免了因大任务造成阻塞的情况,但为了避免数据被重复处理,增加了CPU在数据比较上的开销。TBSchedule默认是SLEEP模式。

到目前为止我相信大家对tbschedule有了一个深刻的了解,心中的疑雾逐渐散开了。理论是实践的基础,实践才是最终的目的,下一节我们将结合理论知识进行tbschedule实战.

三、tbschedule实战

在项目中使用tbschedule需要依赖zookeeper、tbschedule 
        zookeeper依赖:

Java代码  

  1. <dependency>
  2. <groupId>org.apache.zookeeper</groupId
  3. <artifactId>zookeeper</artifactId>
  4. <version>3.4.6</version>
  5. </dependency>

tbschedule依赖:

Java代码  

  1. <dependency>
  2. <groupId>com.taobao.pamirs.schedule</groupId
  3. <artifactId>tbschedule</artifactId>
  4. <version>3.3.3.2</version>
  5. </dependency>

tbschedule有三种引入方式: 
        1、通过ScheduleConsole引入 
        Tbschedule随着宿主调度应用部署到服务器后,可以通过web浏览器的方式访问其提供监控平台。 
        第一步,初始化zookeeper

第二步,创建调度策略

第三步,创建调度任务

第四步,监控调度任务

2、通过原生java引入

Java代码  

  1. // 初始化Spring
  2. ApplicationContext ctx = new FileSystemXmlApplicationContext(
  3. "spring-config.xml");
  4. // 初始化调度工厂
  5. TBScheduleManagerFactory scheduleManagerFactory = new TBScheduleManagerFactory();
  6. Properties p = new Properties();
  7. p.put("zkConnectString", "127.0.0.1:2181");
  8. p.put("rootPath", "/taobao-schedule/train_worker");
  9. p.put("zkSessionTimeout", "60000");
  10. p.put("userName", "train_dev");
  11. p.put("password", " train_dev ");
  12. p.put("isCheckParentPath", "true");
  13. scheduleManagerFactory.setApplicationContext(ctx);
  14. scheduleManagerFactory.init(p);
  15. // 创建任务调度任务的基本信息
  16. String baseTaskTypeName = "DemoTask";
  17. ScheduleTaskType baseTaskType = new ScheduleTaskType();
  18. baseTaskType.setBaseTaskType(baseTaskTypeName);
  19. baseTaskType.setDealBeanName("demoTaskBean");
  20. baseTaskType.setHeartBeatRate(10000);
  21. baseTaskType.setJudgeDeadInterval(100000);
  22. baseTaskType.setTaskParameter("AREA=BJ,YEAR>30");  baseTaskType.setTaskItems(ScheduleTaskType.splitTaskItem(
  23. "0:{TYPE=A,KIND=1},1:{TYPE=A,KIND=2},2:{TYPE=A,KIND=3},3:{TYPE=A,KIND=4}," +
  24. "4:{TYPE=A,KIND=5},5:{TYPE=A,KIND=6},6:{TYPE=A,KIND=7},7:{TYPE=A,KIND=8}," +
  25. "8:{TYPE=A,KIND=9},9:{TYPE=A,KIND=10}"));
  26. baseTaskType.setFetchDataNumber(500);
  27. baseTaskType.setThreadNumber(5);
  28. this.scheduleManagerFactory.getScheduleDataManager()
  29. .createBaseTaskType(baseTaskType);
  30. log.info("创建调度任务成功:" + baseTaskType.toString());
  31. // 创建任务的调度策略
  32. String taskName = baseTaskTypeName;
  33. String strategyName =taskName +"-Strategy";
  34. try {
  35. this.scheduleManagerFactory.getScheduleStrategyManager()
  36. .deleteMachineStrategy(strategyName,true);
  37. } catch (Exception e) {
  38. e.printStackTrace();
  39. }
  40. ScheduleStrategy strategy = new ScheduleStrategy();
  41. strategy.setStrategyName(strategyName);
  42. strategy.setKind(ScheduleStrategy.Kind.Schedule);
  43. strategy.setTaskName(taskName);
  44. strategy.setTaskParameter("china");
  45. strategy.setNumOfSingleServer(1);
  46. strategy.setAssignNum(10);
  47. strategy.setIPList("127.0.0.1".split(","));
  48. this.scheduleManagerFactory.getScheduleStrategyManager()
  49. .createScheduleStrategy(strategy);
  50. log.info("创建调度策略成功:" + strategy.toString());

3、通过spring容器引入

Java代码  

  1. <!-- 初始化zookeeper -->
  2. <bean id="scheduleManagerFactory"
  3. class="com.xx.TBScheduleManagerFactory" init-method="init">
  4. <property name="zkConfig">
  5. <map>
  6. <entry key="zkConnectString" value="127.0.0.1:2181" />
  7. <entry key="rootPath" value="/taobao-schedule/train_worker" />
  8. <entry key="zkSessionTimeout" value="60000" />
  9. <entry key="userName" value="train_dev" />
  10. <entry key="password" value="train_dev" />
  11. <entry key="isCheckParentPath" value="true" />
  12. </map>
  13. </property>
  14. </bean>
  15. <!-- 配置调度策略 凌晨1点到3点执行 -->
  16. <bean id="abstractDemoScheduleTask" class="com.xx.core.tbschedule.InitMyScheduleTask" abstract="true">
  17. <property name="scheduleTaskType.heartBeatRate" value="10000" />
  18. <property name="scheduleTaskType.judgeDeadInterval" value="100000" />
  19. <property name="scheduleTaskType.permitRunStartTime" value="0 0 1 * * ?"/>
  20. <property name="scheduleTaskType.permitRunEndTime" value="0 0 3 * * ?"/>
  21. <property name="scheduleTaskType.taskParameter" value="AREA=BJ,YEAR>30" />
  22. <property name="scheduleTaskType.sleepTimeNoData" value="60000"/>
  23. <property name="scheduleTaskType.sleepTimeInterval" value="60000"/>
  24. <property name="scheduleTaskType.fetchDataNumber" value="500" />
  25. <property name="scheduleTaskType.executeNumber" value="1" />
  26. <property name="scheduleTaskType.threadNumber" value="5" />
  27. <property name="scheduleTaskType.taskItems">
  28. <list>
  29. <value>0:{TYPE=A,KIND=1}</value>
  30. <value>1:{TYPE=A,KIND=2}</value>
  31. <value>2:{TYPE=A,KIND=3}</value>
  32. <value>3:{TYPE=A,KIND=4}</value>
  33. <value>4:{TYPE=A,KIND=5}</value>
  34. <value>5:{TYPE=A,KIND=6}</value>
  35. <value>6:{TYPE=A,KIND=7}</value>
  36. <value>7:{TYPE=A,KIND=8}</value>
  37. <value>8:{TYPE=A,KIND=9}</value>
  38. <value>9:{TYPE=A,KIND=10}</value>
  39. </list>
  40. </property>
  41. <property name="scheduleStrategy.kind" value="Schedule" />
  42. <property name="scheduleStrategy.numOfSingleServer" value="1" />
  43. <property name="scheduleStrategy.assignNum" value="10" />
  44. <property name="scheduleStrategy.iPList">
  45. <list>
  46. <value>127.0.0.1</value>
  47. </list>
  48. </property>
  49. </bean>
  50. <!-- 配置调度任务 -->
  51. <bean id="demoTask" class="com.xx.worker.task.DemoTask" parent="abstractDemoScheduleTask">
  52. <property name="scheduleTaskType.baseTaskType" value="demoTask" />
  53. <property name="scheduleTaskType.dealBeanName" value="demoTaskBean" />
  54. <property name="scheduleStrategy.strategyName" value="demoTaskBean-Strategy" />
  55. <property name="scheduleStrategy.taskName" value="demoTaskBean" />
  56. </bean>

调度任务具体实现 DemoTask.java

Java代码  

  1. /**
  2. * DemoTask任务类
  3. */
  4. ublic class DemoTask  mplements
  5. IScheduleTaskDealSingle,TScheduleTaskDeal {
  6. /**
  7. * 数据采集
  8. * @param taskItemNum--分配的任务项 taskItemList--总任务项
  9. *        eachFetchDataNum--采集任务数量
  10. */
  11. @Override
  12. public List<DemoTask> selectTasks(String taskParameter,
  13. String ownSign, int taskItemNum, List<TaskItemDefine> taskItemList,
  14. int eachFetchDataNum) throws Exception {
  15. List<DemoTask> taskList = new LinkedList<DemoTask>();
  16. //客户端根据条件进行数据采集start
  17. //客户端根据条件进行数据采集end
  18. return rt;
  19. }
  20. **
  21. * 数据处理
  22. */
  23. @Override
  24. public boolean execute(DemoTask task, String ownSign)
  25. throws Exception {
  26. //客户端pop任务进行处理start
  27. //客户端pop任务进行处理end
  28. return true;
  29. }

其实我们看对于tbscchedule客户端的使用非常简单,初始化zk、配置调度策略、调度任务,对调度任务进行实现,就这几个步骤。现在可以庆祝下了,你又掌握了一个优秀开源框架的设计思想和使用方式。

四、tbschedule挑战

任何事物都是没有最好只有更好,tbschedule也一样,虽然它现在已经很完美了,我们不能放弃对更完美的追求。阿里团队可以在下面几个方面进行优化。 
        1、ScheduleConsole监控页面优化,目前ScheduleConsole监控页面过于简单,需完善UI设计,提高用户体验。 
        2、Zookeeper集群自动切换,避免zk服务的集群点多故障 
        3、原生zookeeper操作替换为curator,Curator对ZooKeeper进行了一次包装,对原生ZooKeeper的操作做了大量优化,Client和Server之间的连接可能出现的问题处理等等,可以进一步提高TBSchedule的高可用。 
        4、帮助文档较少,网上的资料基本是千篇一律,希望有更多的爱好者加入进来。

至此,我们已经完成了对tbschedule的全部介绍,尽快使用起来吧! 
   我的博客:http://mycolababy.iteye.com/

时间: 2024-10-09 12:48:41

详解应对平台高并发的分布式调度框架TBSchedule的相关文章

Java生鲜电商平台-高并发核心技术订单与库存实战

Java生鲜电商平台-高并发核心技术订单与库存实战 一. 问题 一件商品只有100个库存,现在有1000或者更多的用户来购买,每个用户计划同时购买1个到几个不等商品. 如何保证库存在高并发的场景下是安全的? (1)不多发 (2)不少发 二. 下单的步骤 (1)下单 (2)下单同时预占库存 (3)支付 (4)支付成功真正减扣库存 (5)取消订单 (6)回退预占库存 三. 什么时候进行预占库存? (1)方案一:加入购物车的时候去预占库存 (2)方案二:下单的时候去预占库存 (3)方案三:支付的时候去

详解IOS开发应用之并发Dispatch Queues (2011)

详解IOS开发应用之并发Dispatch Queues是本文哟啊介绍的内容,我们几乎可以调度队列去完成所有用线程来完成的任务.调度队列相对于线程代码更简单,易于使用,更高效.下面讲主要简述调度队列,在应用中如何使用调度队列去执行任务. 1.关于调度队列 所有的调度队列都是先进先出队列,因此,队列中的任务的开始的顺序和添加到队列中的顺序相同.GCD自动的为我们提供了一些调度队列,我们也可以创建新的用于具体的目的. 下面列出几种可用的调度队列类型以及如何使用. (1)serial queues(串行

转----详解IOS开发应用之并发Dispatch Queues

详解IOS开发应用之并发Dispatch Queues是本文要介绍的内容,我们几乎可以调度队列去完成所有用线程来完成的任务.调度队列相对于线程代码更简单,易于使用,更高效.下面讲主要简述调度队列,在应用中如何使用调度队列去执行任务. 1.关于调度队列 所有的调度队列都是先进先出队列,因此,队列中的任务的开始的顺序和添加到队列中的顺序相同.GCD自动的为我们提供了一些调度队列,我们也可以创建新的用于具体的目的. 下面列出几种可用的调度队列类型以及如何使用. (1)serial queues(串行队

图文详解OpenTLD平台的搭建

图文详解OpenTLD平台的搭建 本文以图文的方式详解了OpenTLD平台的搭建.硬件平台:装有Windows 7/8/XP的电脑一台:软件平台:OpenTLD.OpenCV.Visual Studio 2010. 1.软件下载 OpenTLD:https://github.com/arthurv/OpenTLD/tree/master OpenCV:http://opencv.org/downloads.html 下载后如下图: 对OpenCV解压:双击=>点击Extract: 再对OpenT

Python环境搭建详解(Window平台)

前言 Python,是一种面向对象的解释型计算机程序设计语言,是纯粹的自由软件,Python语法简洁清晰,特色是强制用空白符作为语句缩进,具有丰富和强大的库,它常被称为胶水语言. Python是一种解释型语言:这意味着开发过程中没有没有了编译的环境,是交换式语言,是面向对象语言,是初学者的语言,其优点是:易学习,面向对象,易维护,可移植,可扩展,广泛的标准库.其缺点就是运行速度慢. 安装 那么如何安装Python呢? 提供安装官网:https://www.python.org/downloads

Java网络编程和NIO详解9:基于NIO的网络编程框架Netty

Java网络编程和NIO详解9:基于NIO的网络编程框架Netty 转自https://sylvanassun.github.io/2017/11/30/2017-11-30-netty_introduction/ netty是基于NIO实现的异步事件驱动的网络编程框架,学完NIO以后,应该看看netty的实现,netty框架涉及的内容特别多,这里只介绍netty的基本使用和实现原理,更多扩展的内容将在以后推出. 本系列文章首发于我的个人博客:https://h2pl.github.io/ 欢迎

2017最新技术java高级架构、千万高并发、分布式集群、架构师入门到精通视频教程

* { font-family: "Microsoft YaHei" !important } h1 { color: #FF0 } 15套java架构师.集群.高可用.高可扩 展.高性能.高并发.性能优化.Spring boot.Redis.ActiveMQ.Nginx.Mycat.Netty.Jvm大型分布 式项目实战视频教程 视频课程包含: 高级Java架构师包含:Spring boot.Spring  cloud.Dubbo.Redis.ActiveMQ.Nginx.Mycat

高并发,分布式,高性能,系统架构项目实战

15套java架构师.集群.高可用.高可扩展.高性能.高并发.性能优化.Spring boot.Redis.ActiveMQ.Nginx.Mycat.Netty.Jvm大型分布式项目实战视频教程 视频课程内容包含: 高级Java架构师包含:Spring boot.Spring  cloud.Dubbo.Redis.ActiveMQ.Nginx.Mycat.Spring.MongoDB.ZeroMQ.Git.Nosql.Jvm.Mecached.Netty.Nio.Mina.性能调优.高并发.to

java架构师,高并发,分布式,集群,大型高并发电商项目实战视频教程

15套java架构师.集群.高可用.高可扩展.高性能.高并发.性能优化.Spring boot.Redis.ActiveMQ.Nginx.Mycat.Netty.Jvm大型分布式项目实战视频教程 视频课程内容包含: 高级Java架构师包含:Spring boot.Spring  cloud.Dubbo.Redis.ActiveMQ.Nginx.Mycat.Spring.MongoDB.ZeroMQ.Git.Nosql.Jvm.Mecached.Netty.Nio.Mina.性能调优.高并发.to