【Python celery】 -- 2019-08-08 18:03:28

目录

原文: http://106.13.73.98/__/156/

安装:pip install celery

celery 是基于 Python 实现的模块,用于执行异步定时周期任务。

celery 组成结构:

  1. 用户任务 app: 用于生成任务
  2. 管道 broker 与 backend:前者用于存放任务,后者用于存放任务执行结果
  3. 员工 worker:负责执行任务

@(Python celery)

简单示例



员工文件(workers.py):

import time
from celery import Celery

# 创建一个Celery实例,这个就是我们用户的应用app
my_task = Celery(
    'tasks',
    broker='redis://127.0.0.1:6380',  # 指定存放任务的地方,这个指定为redis
    backend='redis://127.0.0.1:6380',  # 指定存放任务执行结果的地方
)

# 为应用创建任务
@my_task.task
def fn1(x, y):
    time.sleep(10)
    return x + y

"""
执行命令:
Linux:celery worker -A workers -l INFO
Windows:celery worker -A workers -l INFO -P eventlet

celery 4.0 已经不再对Windows操作系统提供支持了,需要安装:pip install eventlet
"""

用户app文件(user_app.py):

from workers import fn1

# 提交任务,将任务存到管道,等待员工执行
result = fn1.delay(2, 4)  # (i, i) 是传入的参数

print(result)  # b2df92e9-0eee-4af5-be83-dd8ac044d2a4

# 运行后,将提交任务

用于取结果的文件(get_result.py):

from celery.result import AsyncResult
from workers import my_task

# user_app文件的打印结果
ID = 'b2df92e9-0eee-4af5-be83-dd8ac044d2a4'

# 异步获取任务返回值
async_task = AsyncResult(id=ID, app=my_task)

# 判断异步任务是否执行成功
if async_task.successful():
    # 获取异步任务的返回值
    result = async_task.get()
    print(result)
else:
    print('任务还在执行中')

执行顺序:执行命令 > 提交任务 > 取结果

Celery 项目目录结构



在实际的项目中,我们的 celery 是有规则的:

要满足这样的条件才可以,目录 celery_task 和其它文件可以随意命名,但此目录内一定要有一个 celery.py 文件。

celery.py

from celery import Celery

celery_task = Celery(
    'task',
    broker='redis://127.0.0.1:6380',  # 指定存放任务的地方,这个指定为redis
    backend='redis://127.0.0.1:6380',  # 指定存放任务执行结果的地方
    include=['celery_task.task01', 'celery_task.task02']  # 指定任务文件,会自动寻找任务文件中所有的任务
)

# 启动worker时,无需指定文件,直接通过你的celery_task目录就可以了
# 启动命令:celery worker -A celery_task -l INFO -P eventlet
# 这样,celery就可以自动检索当前目录下的所有task了,通过Include参数去逐一寻找

task01.py

import time
from .celery import celery_task

@celery_task.task
def one(x, y):
    time.sleep(5)
    return f'one:{x + y}'

task02.py

import time
from .celery import celery_task

@celery_task.task
def two(x, y):
    time.sleep(10)
    return f'two: {x + y}'

user_app.py

from celery_task.task01 import one
from celery_task.task02 import two

one.delay(1, 1)
two.delay(2, 2)

get_result.py

from celery.result import AsyncResult
from celery_task.celery import celery_task

# 终端显示的任务ID
ID = 'b2df92e9-0eee-4af5-be83-dd8ac044d2a4'

# 异步获取任务返回值
async_task = AsyncResult(id=ID, app=celery_task)

# 判断异步任务是否执行成功
if async_task.successful():
    # 获取异步任务的返回值
    result = async_task.get()
    print(result)
else:
    print('任务还在执行中')

定时任务



请结合 Celery 项目目录结构 中的文件来设置定时任务。

user_app.py 文件内容如下:

import time
import datetime
from celery_task.task01 import one
from celery_task.task02 import two

# 获取当前时间,此时间为东八区时间
ctime = time.time()

# 将当前的东八区时间改为 UTC时间,注意这里一定是UTC时间,没有其它说法
utc_time = datetime.datetime.utcfromtimestamp(ctime)  # 时间格式例如:2019-02-20 11:30:02.032230

# 为当前时间增加10秒
add_time = datetime.timedelta(seconds=10)  # 0:00:10
action_time = utc_time + add_time
# 此时的 action_time 就是当前时间的未来10秒

# 现在,我们使用 apply_async 来提交定时任务
result = one.apply_async(args=(1, 1), eta=action_time)
# args=(1, 1):提交的参数
# eta=action_time:指定执行时间

周期任务



请结合 Celery 项目目录结构 中的文件来设置周期任务。

celery.py 文件内容如下:

from celery import Celery
from celery.schedules import crontab

celery_task = Celery(
    'task',
    broker='redis://127.0.0.1:6380',  # 指定存放任务的地方,这个指定为redis
    backend='redis://127.0.0.1:6380',  # 指定存放任务执行结果的地方
    include=['celery_task.task01', 'celery_task.task02']  # 指定任务文件,会自动寻找任务文件中所有的任务
)

# 定制周期任务
celery_task.conf.beat_schedule = {
    # 每10秒执行一次celery_task.task01.one
    'each10s_task': {
        'task': 'celery_task.task01.one',  # 指定任务
        'schedule': 10,  # 每10秒执行一次
        'args': (1, 1)  # 参数
    },
    # 每分钟执行一次celery_task.task01.one
    'each1m_task': {
        'task': 'celery_task.task01.one',  # 指定任务
        'schedule': crontab(minute=1),  # 每分钟执行一次
        'args': (1, 1)  # 参数
    },
    # 每隔23小时执行一次celery_task.task02.two
    'each24hour_task': {
        'task': 'celery_task.task02.two',  # 指定任务
        'schedule': crontab(hour=23),  # 每23小时执行一次
        'args': (1, 1)  # 参数
    }
}

"""
启动命令:
先执行:celery beat -A celery_task -l INFO  # 生产者,周期任务需要一个生产者来周期性提交任务
再执行:celery worker -A celery_task -l INFO -P eventlet
"""

提交周期任务时,需要一个生产者 beat 来提交任务。
因此,启动命令分为两个:
celery beat -A celery_task -l INFO # 生产者,用于提交任务
celery worker -A celery_task -l INFO -P eventlet # 处理任务

原文: http://106.13.73.98/__/156/

原文地址:https://www.cnblogs.com/gqy02/p/11322824.html

时间: 2024-10-08 23:35:03

【Python celery】 -- 2019-08-08 18:03:28的相关文章

【Python celery】 -- 2019-08-08 20:39:56

目录 原文: http://106.13.73.98/__/156/ 安装:pip install celery celery 是基于 Python 实现的模块,用于执行异步定时周期任务. celery 组成结构: 用户任务 app: 用于生成任务 管道 broker 与 backend:前者用于存放任务,后者用于存放任务执行结果 员工 worker:负责执行任务 @(Python celery) 简单示例 员工文件(workers.py): import time from celery im

【Python 多进程】 -- 2019-08-16 20:08:07

原文: http://blog.gqylpy.com/gqy/228 " 一.模块介绍 multiprocess模快 仔细说来,multiprocess不是一个模块,而是python中的一个操作.管理进程的包,之所以叫multi是取自multiple的多功能的意思,这个包中几乎包含了和进程有关的所有子模块. multiprocess.Process模块 Process能够帮助我们创建子进程,以及对子进程的一些控制. 参数:def __init__(self, group=None, target

【Python Network】使用DOM生成XML

单纯的为DOM树添加结点. 1 #!/usr/bin/env python 2 # Generating XML with DOM - Chapter 8 - domgensample.py 3 4 from xml.dom import minidom, Node 5 6 doc = minidom.Document() 7 8 doc.appendChild(doc.createComment("Sample XML Document - Chapter 8 -")) 9 10 #

【python进阶】详解元类及其应用2

前言 在上一篇文章[python进阶]详解元类及其应用1中,我们提到了关于元类的一些前置知识,介绍了类对象,动态创建类,使用type创建类,这一节我们将继续接着上文来讲~~~ 5.使?type创建带有?法的类 最终你会希望为你的类增加?法.只需要定义?个有着恰当签名的函数并将 其作为属性赋值就可以了.添加实例?法 In [14]: def echo_bar(self):#定义了一个普通的函数 ...: print(self.bar) ...: In [15]: FooChild = type('

【python学习】8.异常

[python学习]8.异常 raise Exception: 抛出指定异常 try/except: 捕捉异常 except: 第一个参数是需要捕获的异常类型,可以是多个类型组成元组,第二个参数是捕获到的异常对象, raise: 抛出已经捕获的异常 else: 当没有捕获的异常时候执行 finally: 总会被执行 def test(): try: raise Exception("test") except (Exception), e: print "Exception&

【LeetCode】【Python题解】Pascal's Triangle

Given numRows, generate the first numRows of Pascal's triangle. For example, given numRows = 5, Return [[1],[1,1],[1,2,1],[1,3,3,1],[1,4,6,4,1]] 要求输入一个整数,返回一个表示杨辉三角的数组.我的方法是计算通项公式,首先是编写阶乘函数,然后计算C00,C10,C11即可 利用Python 的map嵌套可以很简洁地实现,核心代码只有一行! class So

【LeetCode】【Python题解】Best Time to Buy and Sell Stock II

Say you have an array for which the ith element is the price of a given stock on day i. Design an algorithm to find the maximum profit. You may complete as many transactions as you like (ie, buy one and sell one share of the stock multiple times). Ho

【python爬虫】根据查询词爬取网站返回结果

最近在做语义方面的问题,需要反义词.就在网上找反义词大全之类的,但是大多不全,没有我想要的.然后就找相关的网站,发现了http://fanyici.xpcha.com/5f7x868lizu.html,还行能把"老师"-"学生","医生"-"病人"这样对立关系的反义词查出来. 一开始我想把网站中数据库中存在的所有的词语都爬出来(暗网爬虫),但是分析了url的特点: http://fanyici.xpcha.com/5f7x86

【python系列】SyntaxError:Missing parentheses in call to 'print'

打印python2和python3的区别 如上图所示,我的 PyCharm安装的是python3.6如果使用print 10会出现语法错误,这是python2.x和python3.x的区别所导致的. [python系列]SyntaxError:Missing parentheses in call to 'print'