python第三方库系列之十四--集群化部署定时任务apscheduler库

如果将定时任务部署在一台服务器上,那么这个定时任务就是整个系统的单点,这台服务器出现故障的话会影响服务。对于可以冗余的任务(重复运行不影响服务),可以部署在多台服务器上,让他们同时执行,这样就可以很简单的避免单点。但是如果任务不允许冗余,最多只能有一台服务器执行任务,那么前面的方法显然行不通。本篇文章就向大家介绍如何避免这种互斥任务的单点问题,最后再介绍一下基于APScheduler的分布式定时任务框架,这个框架是通过多个项目的实践总结而成的。

对于运行在同一台服务器上的两个进程,可以通过加锁实现互斥执行,而对于运行在多个服务器上的任务仍然可以通过用加锁实现互斥,不过这个锁是分布式锁。这个分布式锁并没有那么神秘,实际上只要一个提供原子性的数据库即可。比如,在数据库的locks表里有一个记录(lock record),包含属性:

name:锁的名字,互斥的任务需要用名字相同的锁。
active_ip:持有锁的服务器的ip。
update_time:上次持有锁的时间,其他非活跃的服务器通过这个属性判断活跃的服务器是否超时,如果超时,则会争夺锁。

一个持有锁的服务器通过不断的发送心跳,来更新这个记录,心跳的内容就是持有锁的时间戳(update_time),以及本机ip。也就是说,通过发送心跳来保证当前的服务器是活跃的,而其他服务器通过lock record中的update_time来判断当前活跃的服务器是否超时,一旦超时,其他的服务器就会去争夺锁,接管任务的执行,并发送心跳更新active_ip。

通过上面描述,这个框架中最重要的两个概念就是分布式锁和心跳。下面看一下分布式定时任务框架中是如何实现这两点的。当然,这个框架依赖于APScheduler,所以必须安装这个模块,具体APScheduler的介绍见我的上一篇文章:python第三方库系列之十三--定时任务apscheduler库,因为依赖APScheduler,所以这个框架很简单,只有一个类:

import datetime
import socket
import struct
import fcntl

from apscheduler.scheduler import Scheduler

def get_ip(ifname):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    return socket.inet_ntoa(fcntl.ioctl(
        s.fileno(),
        0x8915,
        struct.pack('256s', ifname[:15])
    )[20:24])

class MutexScheduler(Scheduler):
    def __init__(self, local_ip, gconfig={}, **options):
        Scheduler.__init__(self, gconfig, **options)
        #self.ip = get_ip(settings.NETWORK_INTERFACE)
        self.ip = local_ip

    def mutex(self, lock=None, heartbeat=None, lock_else=None,
              unactive_interval=datetime.timedelta(seconds=10)):

        def mutex_func_gen(func):
            def mtx_func():
                if lock:
                    lock_rec = lock()
                    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                    now = datetime.datetime.strptime(now, "%Y-%m-%d %H:%M:%S")
                    # execute mutex job when the server is active, or the other server is timeout.
                    if not lock_rec or lock_rec['active_ip'] == self.ip or (
                        lock_rec['update_time'] and now - lock_rec['update_time'] >= unactive_interval):
                        if lock_rec:
                            del lock_rec['active_ip']
                            del lock_rec['update_time']
                        if not lock_rec:
                            lock_rec = {}
                        lock_attrs = func(**lock_rec)
                        if not lock_attrs:
                            lock_attrs = {}
                            # send heart beat
                        heartbeat(self.ip, now, **lock_attrs)
                    else:
                        lock_else(lock_rec)
                else:
                    func()

            return mtx_func

        self.mtx_func_gen = mutex_func_gen

        def inner(func):
            return func

        return inner

    def cron_schedule(self, **options):
        def inner(func):
            if hasattr(self, 'mtx_func_gen'):
                func = self.mtx_func_gen(func)
            func.job = self.add_cron_job(func, **options)
            return func

        return inner

mutex方法是核心,通过装饰器的方式提供互斥功能。在使用时:

@sched.mutex(lock = my_lock, heartbeat = my_heartbeat)
@sched.cron_schedule(second = '*')
def my_job(**attrs):
    print 'my_job ticks'
#mutex装饰器必须用在cron_schedule装饰器之前,mutex主要是组装job。mutex的参数有:
#lock:函数,用于获取锁记录(lock record),函数原型:lock()。lock的返回值时dict,就是锁记录内容。
#heartbeat:函数,用于发出心跳,函数原型:heartbeat(ip, now, **attrs)。ip是本机ip;now是当前时间戳;attrs是一个dict,用于在锁记录中存放一些其他用户自定义信息。
#lock_else:函数,在没有获得锁时执行,函数原型:lock_else(lock_rec)。lock_rec是锁记录,包含active_ip,update_time以及用户自定义的属性。
#unactive_interval:datetime.timedelta类型,超时时间,也就是说当前时间减去update_time大于unactive_interval的话,就代表超时。类中默认值<span style="font-family: Consolas, 'Courier New', Courier, mono, serif; line-height: 18px; background-color: rgb(248, 248, 248);">unactive_interval=datetime.timedelta(seconds=</span><span class="number" style="margin: 0px; padding: 0px; border: 0px; font-family: Consolas, 'Courier New', Courier, mono, serif; line-height: 18px; background-color: rgb(248, 248, 248);">10</span><span style="margin: 0px; padding: 0px; border: 0px; font-family: Consolas, 'Courier New', Courier, mono, serif; line-height: 18px; background-color: rgb(248, 248, 248);">)是默认10s。</span>
#在使用这个类时,必须实现自己的lock,heartbeat以及lock_else函数。

job的原型是job(**attrs),attrs就是存放在锁记录中的用户自定义属性,job可以有dict类型的返回值,这个返回值会存入锁记录中。

下面,看一下具体使用的例子,使用的mongodb存放分布式锁。

import apscheduler.events
import datetime
import time
import pymongo
import sys
import mtxscheduler  

sched = mtxscheduler.MutexScheduler()  

mongo = pymongo.Connection(host = '127.0.0.1', port = 27017)
lock_store = mongo['lockstore']['locks']  

def lock():
    conn = connect_adms_db()
	lock_name = 'xxx'
    sql = "select name, active_ip, update_time from locks where name='%s';" % lock_name
    log.info("sql:%s" % sql)
    res = conn.execute(sql)[0]
    conn.close()
    tuple = {'name': res["name"], 'active_ip': res["active_ip"], 'update_time': res["update_time"]}
    return tuple

def hb(ip, now, **attrs):
    attrs['active_ip'] = ip
    attrs['update_time'] = now
	conn = connect_adms_db()
	lock_name = 'xxx'
    sql = "update locks set active_ip='%(ip)s', update_time='%(update_time)s' "           "where name='%(name)s'" % {'ip': ip, 'update_time': now, 'name': lock_name}
    log.info("sql:%s" % sql)
    res = conn.execute(sql)
    conn.close()

def le(lock_rec):
    if lock_rec:
        print 'active ip', lock_rec['active_ip']
    else:
         print 'lock else'  

i = 0  

@sched.mutex(lock = lock, heartbeat = hb, lock_else = le)
@sched.cron_schedule(second = '*')
def job(**attr):
    global i
    i += 1
    print i  

def err_listener(ev):
    if ev.exception:
        print sys.exc_info()  

sched.add_listener(err_listener, apscheduler.events.EVENT_JOB_ERROR)  

sched.start()
time.sleep(10)  

这个任务很简单就是定时打印整数序列。同时在两台服务器上部署运行,可以发现只有一台服务器会输出整数序列。

还有使用redis,mongodb存储锁。

时间: 2024-12-15 14:54:42

python第三方库系列之十四--集群化部署定时任务apscheduler库的相关文章

RabbitMQ集群化部署

压测环境上RabbitMQ主库采用三台集群化部署,部署在172.16.103.127, 172.16.103.138, 172.16.103.129三台机器上. 安装目录:/opt/rabbitmq/rabbitmq_3.6.2 集群化部署 1.设置hosts解析,所有节点配置相同 vi /etc/hosts 172.16.103.129 mq-n129172.16.103.128 mq-n128172.16.103.127 mq-n127 2.设置节点间认证的cookiescp /root/.

面试系列10 es生产集群的部署架构

如果你确实干过es,那你肯定了解你们生产es集群的实际情况,部署了几台机器?有多少个索引?每个索引有多大数据量?每个索引给了多少个分片?你肯定知道! 但是如果你确实没干过,也别虚,我给你说一个基本的版本,你到时候就简单说一下就好了 (1)es生产集群我们部署了5台机器,每台机器是6核64G的,集群总内存是320G (2)我们es集群的日增量数据大概是2000万条,每天日增量数据大概是500MB,每月增量数据大概是6亿,15G.目前系统已经运行了几个月,现在es集群里数据总量大概是100G左右.

【2】微服务架构-kong的集群化部署

一:Kong的集群方案 Kong支持集群方案,可以加入多个Kong节点来保障服务的高可用性以及提高负载的能力,如下面官方图所示,多个kong组成的集群需要使用共享数据库,以保证集群数据的一致性. (1)集群状态 检查集群的状态 $kong cluster reachability 检查群集的成员 $kong cluster members (2)集群配置 集群配置包含以下几项 cluster_listen         用于与群集中其他节点之间通信的地址和端口. 默认值: 0.0.0.0:79

kairosdb+cassandra集群化安装

kairosdb (1)到/conf目录下,找到kairosdb.properties,修改datastore为cassandra (2)设置cassandra的连接方式 (3) 设置用户名密码 4. 启动:到/bin目录下,直接跑./kairosdb.sh start,最后会看到 KairosDB service started   这样一句话,就OK了 172.16.101.25:8080 kairosdb客户端 cassandra 修改cassandra配置文件 conf/cassandr

Docker系列(十四):Docker Swarm集群

一.Swarm简介 Swarm是Docker官方提供的一款集群管理工具,其主要作用是把若干台Docker主机抽象为一个整体,并且通过一个入口统一管理这些Docker主机上的各种Docker资源.Swarm和Kubernetes比较类似,但是更加轻便,具有的功能也较kubernetes更少一些. Swarm 在 Docker 1.12 版本之前属于一个独立的项目,在 Docker 1.12 版本发布之后,该项目合并到了 Docker 中,成为 Docker 的一个子命令.目前,Swarm 是 Do

Chrome浏览器扩展开发系列之十四:本地消息机制Native messagin

Chrome浏览器扩展开发系列之十四:本地消息机制Native messaging 2016-11-24 09:36 114人阅读 评论(0) 收藏 举报  分类: PPAPI(27)  通过将浏览器所在客户端的本地应用注册为Chrome浏览器扩展的"本地消息主机(native messaging host)",Chrome浏览器扩展还可以与客户端本地应用之间收发消息. 客户端的本地应用注册为Chrome浏览器扩展的"本地消息主机"之后,Chrome浏览器会在独立的

quick-cocos2d-x 学习系列之十四 测试用例

quick-cocos2d-x 学习系列之十四 测试用例 定义变量,创建13个场景名字 local items = { "framework.helper", "framework.native", "framework.display", "framework.crypto", "framework.network", "framework.luabinding", "fra

python mysql 导库,加入主从同步集群

脚本可以在任意机器上执行(需要安装mysql,至少是mysql客户端,mysql只能版本为5.6及以上),首先输入源ip,检测源ip上的mysql是否正常运行,再在本机dump mysql数据库,然后将dump文件传输到目的服务器,在目的服务器上导入数据库,最后把从库加入到现用集群中. [[email protected] test]# cat finaly_mysql.py #!/usr/bin/env python #-*- coding: utf-8 -*- import MySQLdb,

Chrome浏览器扩展开发系列之十四

Chrome浏览器扩展开发系列之十四:本地消息机制Native messaging 时间:2015-10-08 16:17:59      阅读:1361      评论:0      收藏:0      [点我收藏+] 通过将浏览器所在客户端的本地应用注册为Chrome浏览器扩展的"本地消息主机(native messaging host)",Chrome浏览器扩展还可以与客户端本地应用之间收发消息. 客户端的本地应用注册为Chrome浏览器扩展的"本地消息主机"