使用beanstalkd实现定制化持续集成过程中pipeline - 持续集成系列

持续集成是一种项目管理和流程模型,依赖于团队中各个角色的配合。各个角色的意识和配合不是一朝一夕能练就的,我们的工作只是提供一种方案和能力,这就是持续集成能力的服务化。而在做持续集成能力服务化的过程中,最核心的一点就是,如何实现一个可定制化的任务流,即所谓的pipeline。

在传统的持续集成工具实现了pipeline功能,以供串联上下游job,并把多个job联系成一次完整的构建,例如jenkins的pipeline插件。

但是各种持续集成工具,或多或少都有自己的短板,总结起来如下:

1、配置并不方便,上下游job配置并不能点击即可用;

2、上下游job之间参数的传递无法很方便的实现;

3、一次完整构建链路如何trace并收集各个job的执行情况;

4、根据3实现问题的快速定位。

我们先说一下,beanstalkd实现可定制化pipeline的方法吧。

一、先通过概念让大家了解Beanstalkd的特性和工作场景。

Beanstalkd 是一个轻量级消息中间件,它最大特点是将自己定位为基于管道  (tube) 和任务 (job) 的工作队列 (work-queue):

Beanstalkd 支持任务优先级 (priority), 延时 (delay), 超时重发 (time-to-run) 和预留 (buried), 能够很好的支持分布式的后台任务和定时任务处理。

它的内部实现采用 libevent, 服务器-客户端之间用类似 memcached 的轻量级通讯协议,具有有很高的性能。

尽管是内存队列, beanstalkd 提供了 binlog 机制, 当重启 beanstalkd 时,当前任务状态能够从纪录的本地 binlog 中恢复。

管道 (tube):

管道类似于消息主题 (topic), 在一个 Beanstalkd 中可以支持多个管道, 每个管道都有自己的发布者 (producer) 和消费者 (consumer). 管道之间互相不影响。

任务 (job):

READY- 需要立即处理的任务,当延时 (DELAYED) 任务到期后会自动成为当前任务;

DELAYED- 延迟执行的任务, 当消费者处理任务后, 可以用将消息再次放回 DELAYED 队列延迟执行;

RESERVED- 已经被消费者获取, 正在执行的任务。Beanstalkd 负责检查任务是否在 TTR(time-to-run) 内完成;

BURIED- 保留的任务: 任务不会被执行,也不会消失,除非有人把它 "踢" 回队列;

DELETED- 消息被彻底删除。Beanstalkd 不再维持这些消息。

Beanstalkd 用任务 (job) 代替消息 (message) 的概念。与消息不同,任务有一系列状态:

任务优先级 (priority):

任务 (job) 可以有 0~2^32 个优先级, 0 代表最高优先级。 beanstalkd 采用最大最小堆 (Min-max heap) 处理任务优先级排序, 任何时刻调用 reserve 命令的消费者总是能拿到当前优先级最高的任务, 时间复杂度为 O(logn).

延时任务 (delay):

有两种方式可以延时执行任务 (job): 生产者发布任务时指定延时;或者当任务处理完毕后, 消费者再次将任务放入队列延时执行 (RELEASE with <delay>)。这种机制可以实现分布式的 Java.util.Timer,这种分布式定时任务的优势是:如果某个消费者节点故障,任务超时重发 (time-to-run) 能够保证任务转移到另外的节点执行。

任务超时重发 (time-to-run):

Beanstalkd 把任务返回给消费者以后:消费者必须在预设的 TTR (time-to-run) 时间内发送 delete / release/ bury 改变任务状态;否则 Beanstalkd 会认为消息处理失败,然后把任务交给另外的消费者节点执行。如果消费者预计在 TTR (time-to-run) 时间内无法完成任务, 也可以发送 touch 命令, 它的作用是让 Beanstalkd 从系统时间重新计算 TTR (time-to-run).

任务预留 (buried):

如果任务因为某些原因无法执行, 消费者可以把任务置为 buried 状态让 Beanstalkd 保留这些任务。管理员可以通过 peek buried 命令查询被保留的任务,并且进行人工干预。简单的, kick <n> 能够一次性把 n 条被保留的任务踢回队列。

Beanstalkd 协议:

Beanstalkd 采用类 memcached 协议, 客户端通过文本命令与服务器交互。这些命令可以简单的分成三组:

生产类 - use <tube> / put <priority> <delay> <ttr> [bytes]:

生产者用 use 选择一个管道 (tube), 然后用 put 命令向管道发布任务 (job).

消费类 - watch <tubes> / reserve / delete <id> / release <id> <priority> <delay> / bury <id> / touch <id>

消费者用 watch 选择多个管道 (tube), 然后用 reserve 命令获取待执行的任务,这个命令是阻塞的。客户端直到有任务可执行才返回。当任务处理完毕后, 消费者可以彻底删除任务 (DELETE), 释放任务让别人处理 (RELEASE), 或者保留 (BURY) 任务。

维护类 - peek job / peek delayed / peek ready / peek buried / kick <n>

用于维护管道内的任务状态, 在不改变任务状态的条件下获取任务。可以用消费类命令改变这些任务的状态。

被保留 (buried) 的任务可以用 kick 命令 "踢" 回队列。

二、python对beanstalkd的封装

import beanstalkc

class BstkManager(object):

    __doc__ = ‘beanstalk封装类,这里只封装了用到的方法‘

    def __init__(self, config):
        self.config = config
        self.conn = self.__createConnection(self.config)    

    def __createConnection(self, config):
        try:
            conn = beanstalkc.Connection(host=config.get(‘host‘), port=int(config.get(‘port‘)))
            return conn
        except Exception, ex:
            raise Exception(‘beanstalkd connection can not be established!‘, ex)

    def getConnection(self):
        return self.conn

    def put(self, message, tube=None):
        try:
            tube = self.config.get(‘topic‘) if tube == None else tube
            self.conn.use(tube)
            self.conn.ignore(‘default‘)
            self.conn.put(message)
        except Exception, ex:
            raise Exception(‘put message to %s failure!‘ % tube, ex)

    def reserve(self, tube=None, timeout=None):
        try:
            tube = self.config.get(‘topic‘) if tube == None else tube
            self.conn.ignore(‘default‘)
            self.conn.watch(tube)
            msg = self.conn.reserve(timeout=timeout)
            message_body = msg.body
            msg.delete()
            return message_body
        except Exception, ex:
            raise Exception(‘reserve message from %s failure!‘ % tube, ex)

    def clean(self, tube=None):
        try:
            while True:
                tube = self.config.get(‘topic‘) if tube == None else tube
                msg = self.conn.reserve(tube, timeout=1)
                # 如果超时 return
                if msg == None:
                    return
                msg.delete()
        except Exception, ex:
            raise Exception(‘clean tube %s failure!‘ % tube, ex)
            

在持续集成中,使用tube或者说topic区分不同的业务线,不同的业务人员通过向系统注册管道topic。这样做的收益是:

1、所有的业务在环境和流程上被隔离,互补干扰。

2、每个topic是一个独立的pipeline,每个pipeline之间是串行,但是topic之间是并行。这样保证一个业务线上的job是串行执行的,独占测试环境,而不用担心测试环境占用冲突。

import logging
import os
import sys
import traceback

import time
from django.conf import settings
from django.core.management.base import BaseCommand
from beanstalkd_client import connect_beanstalkd, BeanstalkError
from beanstalkc import SocketError

logger = logging.getLogger(‘beanstalkd_client‘)
logger.addHandler(logging.StreamHandler())

class Command(BaseCommand):
    help = "Start a Beanstalk worker serving all registered Beanstalk jobs"
    __doc__ = help

    def add_arguments(self, parser):

        parser.add_argument(
            ‘-w‘,
            ‘--workers‘,
            action=‘store‘,
            dest=‘worker_count‘,
            default=‘1‘,
            help=‘Number of workers to spawn.‘,
        )

        parser.add_argument(
            ‘-l‘,
            ‘--log-level‘,
            action=‘store‘,
            dest=‘log_level‘,
            default=‘info‘,
            help=‘Log level of worker process (one of ‘
                    ‘"debug", "info", "warning", "error"‘,
        )

    children = [] # list of worker processes
    jobs = {}

    def handle(self, *args, **options):
        # set log level
        logger.setLevel(getattr(logging, options[‘log_level‘].upper()))

        # find beanstalk job modules
        bs_modules = []
        for app in settings.INSTALLED_APPS:
            try:
                modname = "%s.beanstalk_jobs" % app
                __import__(modname)
                bs_modules.append(sys.modules[modname])
            except ImportError:
                pass
        if not bs_modules:
            logger.error("No beanstalk_jobs modules found!")
            return

        # find all jobs
        jobs = []
        for bs_module in bs_modules:
            try:
                jobs += bs_module.beanstalk_job_list
            except AttributeError:
                pass
        if not jobs:
            logger.error("No beanstalk jobs found!")
            return
        logger.info("Available jobs:")
        for job in jobs:
            # determine right name to register function with
            app = job.app
            jobname = job.__name__
            try:
                func = settings.BEANSTALK_JOB_NAME % {
                    ‘app‘: app,
                    ‘job‘: jobname,
                }
            except AttributeError:
                func = ‘%s.%s‘ % (app, jobname)
            self.jobs[func] = job
            logger.info("* %s" % func)

        # spawn all workers and register all jobs
        try:
            worker_count = int(options[‘worker_count‘])
            assert(worker_count > 0)
        except (ValueError, AssertionError):
            worker_count = 1
        self.spawn_workers(worker_count)

        # start working
        logger.info("Starting to work... (press ^C to exit)")
        try:
            for child in self.children:
                os.waitpid(child, 0)
        except KeyboardInterrupt:
            sys.exit(0)

    def spawn_workers(self, worker_count):
        """
        Spawn as many workers as desired (at least 1).
        Accepts:
        - worker_count, positive int
        """
        # no need for forking if there‘s only one worker
        if worker_count == 1:
            return self.work()

        logger.info("Spawning %s worker(s)" % worker_count)
        # spawn children and make them work (hello, 19th century!)
        for i in range(worker_count):
            child = os.fork()
            if child:
                self.children.append(child)
                continue
            else:
                self.work()
                break

    def work(self):
        """children only: watch tubes for all jobs, start working"""
        try:

            while True:
                try:
                    # Reattempt Beanstalk connection if connection attempt fails or is dropped
                    beanstalk = connect_beanstalkd()
                    for job in self.jobs.keys():
                        beanstalk.watch(job)
                    beanstalk.ignore(‘default‘)

                    # Connected to Beanstalk queue, continually process jobs until an error occurs
                    self.process_jobs(beanstalk)

                except (BeanstalkError, SocketError) as e:
                    logger.info("Beanstalk connection error: " + str(e))
                    time.sleep(2.0)
                    logger.info("retrying Beanstalk connection...")

        except KeyboardInterrupt:
            sys.exit(0)

    def process_jobs(self, beanstalk):
        while True:
            logger.debug("Beanstalk connection established, waiting for jobs")
            job = beanstalk.reserve()
            job_name = job.stats()[‘tube‘]
            if job_name in self.jobs:
                logger.debug("Calling %s with arg: %s" % (job_name, job.body))
                try:
                    self.jobs[job_name](job.body)
                except Exception, e:
                    tp, value, tb = sys.exc_info()
                    logger.error(‘Error while calling "%s" with arg "%s": ‘
                        ‘%s‘ % (
                            job_name,
                            job.body,
                            e,
                        )
                    )
                    logger.debug("%s:%s" % (tp.__name__, value))
                    logger.debug("\n".join(traceback.format_tb(tb)))
                    job.bury()
                else:
                    job.delete()
            else:
                job.release()
时间: 2024-10-20 04:40:03

使用beanstalkd实现定制化持续集成过程中pipeline - 持续集成系列的相关文章

SSH集成过程中遇到的问题及解决

1.出现了下面问题 页面显示为 1 Struts Problem Report 2 Struts has detected an unhandled exception: 3 4 Messages: could not initialize proxy - no Session 5 could not initialize proxy - no Session - Class: org.hibernate.proxy.AbstractLazyInitializer File: AbstractL

Selenium2学习-018-WebUI自动化实战实例-016-自动化脚本编写过程中的登录验证码问题

日常的 Web 网站开发的过程中,为提升登录安全或防止用户通过脚本进行黄牛操作(宇宙最贵铁皮天朝魔都的机动车牌照竞拍中),很多网站在登录的时候,添加了验证码验证,而且验证码的实现越来越复杂,对其进行脚本识别的难度也越来越高.这对我们自动化脚本编写带了非常的不便,那么如何解决登录时的验证码问题呢?经常有初学自动化脚本编写的小主们问及此问题. 此文主要针对如何解决自动化测试脚本中含登录态的操作问题,即如何降低验证码对自动化脚本编写过程中的解决方法进行分析和解决,并以实例演示(基于易迅网易迅账号登录)

驰骋工作流引擎-CCMobile与安卓、IOS集成过程中的问题与解决方案

CCMobile与安卓.IOS集成过程中的问题与解决方案 前言: CCMobile(2019版本)是CCFlow&JFlow 的一款移动端审批的产品.系统基于mui框架开发,是一款可以兼容Android与IOS的移动端工作流审批系统.由于CCMobile仅仅局限于移动端的流程审批,所以在其他办公功能方面很少,这时可能就需要在源码上开发或者与其他APP进行集成. 由于Mui是一款h5的框架,并不是原生的,所以在与原生APP集成时,会出现一些问题,主要集中在附件上传下载.屏幕兼容等.本文章,将具体描

share SDK 集成过程中遇到的问题 新浪微博error:redirect_uri_mismatch的解决方法

暂时 也没有发现这个报错有什么后果 解决方法:到新浪微博注册应用的地方填上回调地址,然后将代码中的回调地址修改成和新浪微博开放平台里一样即可,见下图 完成后,删除测试机上的APP,Clean工程,然后重新编绎即可.

文章阅读:计算机体系-计算机将代码编译和持续运行过程中需要考虑的问题,以及具体的实现原理讲解

文章太棒,我无法理解和评价,备份一下. 1.编程漫游 - Mr.Riddler's Puzzle http://blog.mrriddler.com/2016/12/15/%E7%BC%96%E7%A8%8B%E6%BC%AB%E6%B8%B8/ 注:讲了什么是函数,什么是数据.太抽象了,不过看了之后有很深的感触,还有不懂的感触. 2.计算机体系-编译体系漫游 - Mr.Riddler's Puzzle http://blog.mrriddler.com/2017/02/10/%E8%AE%A1

Springboot kafka 集成过程中的问题

Q1:Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry' A1:是因为没有在注入时未对其进行配置导致,注册参照如下 /** * Created by wolf 2018/12/1 */ @Configuration @EnableKafka public class kafkatemplateConfig { @Value("${spring.kafka.boo

在TFS持续集成(持续发布)中执行Telnet任务

Telnet是一种在因特网或局域网上使用虚拟终端连接,提供双向交互式文本通信设备的协议. 它是最早的互联网通讯协议之一.自1969年启用以来,已经经过了将近50年时间,在开放式的操作系统中拥有广泛的用户. 虽然由于其安全性的弊端,已经逐渐被淘汰,但是在许多AIX系统的服务器上,运维人员都习惯使用Telnet作为自己的主要工具,维护服务器系统.TFS系统作为应用软件生命周期管理(ALM)平台的产品,原生提供SSH工具连接Linux系统,可惜没有提供Telnet的工具,这里我介绍如何使用Ant中的T

一些刷题过程中的结论

来源UVA 10313 整数i拆分成不超过j个整数的拆分方案数,是和整数i拆成若干个值不超过j的整数的拆分方案数是相同的 原理 Ferrers图像 详:http://blog.csdn.net/shiqi_614/article/details/7001949 关于C++引用带的的不易发现的错误.来自于一个4维DP.UVA 10913 例如int&res=dp[i][j][p][q].如果在这个记忆化搜索子过程中需要更新i,j,p,q;比如在当前递归中p++,那么此时的res是dp[i][j][

视频演示eworkflow集成定制aspx页面的过程

eworkflow自定义工作流系统,集成eform自定义表单,可以做到在线编辑流程,在线编辑表单.eform也提供在线建立业务表,维护表字段等,所以通过eworkflow+eform可以在线完成业务流程和业务模块设计,制作,调试运行,正式使用等等.整个业务流程的开发过程,不需要编译,边设计就可以边运行查看结果.  我们通常是调试运行结束后,没有问题后,就可以正式使用了,这给开发人员,甚至是终端使用人员都带来了方便. eworkflow工作流系统,也可以直接集成aspx,jsp等页面. ework