目录
原文: http://106.13.73.98/__/156/
安装:pip install celery
celery 是基于 Python 实现的模块,用于执行异步定时周期任务。
celery 组成结构:
- 用户任务 app: 用于生成任务
- 管道 broker 与 backend:前者用于存放任务,后者用于存放任务执行结果
- 员工 worker:负责执行任务
@(Python celery)
简单示例
员工文件(workers.py):import time from celery import Celery # 创建一个Celery实例,这个就是我们用户的应用app my_task = Celery( 'tasks', broker='redis://127.0.0.1:6380', # 指定存放任务的地方,这个指定为redis backend='redis://127.0.0.1:6380', # 指定存放任务执行结果的地方 ) # 为应用创建任务 @my_task.task def fn1(x, y): time.sleep(10) return x + y """ 执行命令: Linux:celery worker -A workers -l INFO Windows:celery worker -A workers -l INFO -P eventlet celery 4.0 已经不再对Windows操作系统提供支持了,需要安装:pip install eventlet """
用户app文件(user_app.py):
from workers import fn1 # 提交任务,将任务存到管道,等待员工执行 result = fn1.delay(2, 4) # (i, i) 是传入的参数 print(result) # b2df92e9-0eee-4af5-be83-dd8ac044d2a4 # 运行后,将提交任务
用于取结果的文件(get_result.py):
from celery.result import AsyncResult from workers import my_task # user_app文件的打印结果 ID = 'b2df92e9-0eee-4af5-be83-dd8ac044d2a4' # 异步获取任务返回值 async_task = AsyncResult(id=ID, app=my_task) # 判断异步任务是否执行成功 if async_task.successful(): # 获取异步任务的返回值 result = async_task.get() print(result) else: print('任务还在执行中')
执行顺序:执行命令 > 提交任务 > 取结果
Celery 项目目录结构
在实际的项目中,我们的 celery 是有规则的:
要满足这样的条件才可以,目录 celery_task 和其它文件可以随意命名,但此目录内一定要有一个 celery.py 文件。celery.py
from celery import Celery celery_task = Celery( 'task', broker='redis://127.0.0.1:6380', # 指定存放任务的地方,这个指定为redis backend='redis://127.0.0.1:6380', # 指定存放任务执行结果的地方 include=['celery_task.task01', 'celery_task.task02'] # 指定任务文件,会自动寻找任务文件中所有的任务 ) # 启动worker时,无需指定文件,直接通过你的celery_task目录就可以了 # 启动命令:celery worker -A celery_task -l INFO -P eventlet # 这样,celery就可以自动检索当前目录下的所有task了,通过Include参数去逐一寻找
task01.py
import time from .celery import celery_task @celery_task.task def one(x, y): time.sleep(5) return f'one:{x + y}'
task02.py
import time from .celery import celery_task @celery_task.task def two(x, y): time.sleep(10) return f'two: {x + y}'
user_app.py
from celery_task.task01 import one from celery_task.task02 import two one.delay(1, 1) two.delay(2, 2)
get_result.py
from celery.result import AsyncResult from celery_task.celery import celery_task # 终端显示的任务ID ID = 'b2df92e9-0eee-4af5-be83-dd8ac044d2a4' # 异步获取任务返回值 async_task = AsyncResult(id=ID, app=celery_task) # 判断异步任务是否执行成功 if async_task.successful(): # 获取异步任务的返回值 result = async_task.get() print(result) else: print('任务还在执行中')
定时任务
请结合 Celery 项目目录结构 中的文件来设置定时任务。user_app.py 文件内容如下:
import time import datetime from celery_task.task01 import one from celery_task.task02 import two # 获取当前时间,此时间为东八区时间 ctime = time.time() # 将当前的东八区时间改为 UTC时间,注意这里一定是UTC时间,没有其它说法 utc_time = datetime.datetime.utcfromtimestamp(ctime) # 时间格式例如:2019-02-20 11:30:02.032230 # 为当前时间增加10秒 add_time = datetime.timedelta(seconds=10) # 0:00:10 action_time = utc_time + add_time # 此时的 action_time 就是当前时间的未来10秒 # 现在,我们使用 apply_async 来提交定时任务 result = one.apply_async(args=(1, 1), eta=action_time) # args=(1, 1):提交的参数 # eta=action_time:指定执行时间
周期任务
请结合 Celery 项目目录结构 中的文件来设置周期任务。celery.py 文件内容如下:
from celery import Celery from celery.schedules import crontab celery_task = Celery( 'task', broker='redis://127.0.0.1:6380', # 指定存放任务的地方,这个指定为redis backend='redis://127.0.0.1:6380', # 指定存放任务执行结果的地方 include=['celery_task.task01', 'celery_task.task02'] # 指定任务文件,会自动寻找任务文件中所有的任务 ) # 定制周期任务 celery_task.conf.beat_schedule = { # 每10秒执行一次celery_task.task01.one 'each10s_task': { 'task': 'celery_task.task01.one', # 指定任务 'schedule': 10, # 每10秒执行一次 'args': (1, 1) # 参数 }, # 每分钟执行一次celery_task.task01.one 'each1m_task': { 'task': 'celery_task.task01.one', # 指定任务 'schedule': crontab(minute=1), # 每分钟执行一次 'args': (1, 1) # 参数 }, # 每隔23小时执行一次celery_task.task02.two 'each24hour_task': { 'task': 'celery_task.task02.two', # 指定任务 'schedule': crontab(hour=23), # 每23小时执行一次 'args': (1, 1) # 参数 } } """ 启动命令: 先执行:celery beat -A celery_task -l INFO # 生产者,周期任务需要一个生产者来周期性提交任务 再执行:celery worker -A celery_task -l INFO -P eventlet """
提交周期任务时,需要一个生产者 beat 来提交任务。
因此,启动命令分为两个:
celery beat -A celery_task -l INFO # 生产者,用于提交任务
celery worker -A celery_task -l INFO -P eventlet # 处理任务
原文: http://106.13.73.98/__/156/
原文地址:https://www.cnblogs.com/gqy02/p/11323685.html
时间: 2024-10-08 23:35:09