celery 定时任务

1 基本概念

使用 Celery 实现定时任务的步骤:

(1) 创建一个 Celery 实例

(2) 配置文件中配置任务 ,发布任务 celery A xxx beat

(3) 启动 Celery Worker

(4) 存储结果

使用 Celery 实现异步任务的步骤:

(1) 创建一个 Celery 实例

(2) 启动 Celery Worker ,通过delay() 或 apply_async()(delay 方法封装了 apply_async, apply_async支持更多的参数 ) 将任务发布到broker

(3) 应用程序调用异步任务

(4)存储结果 (发布的任务需要return才会有结果,否则为空)

2 celery定时任务简单使用

目录结构如下

celeryconfig.py

from __future__ import absolute_import # 拒绝隐式引入,因为celery.py的名字和celery的包名冲突,需要使用这条语句让程序正确地运行
from celery.schedules import crontab
from datetime import timedelta

broker_url = "redis://127.0.0.1:6379/2"   # 使用redis存储任务队列
result_backend = "redis://127.0.0.1:6379/6"  # 使用redis存储结果

task_serializer = ‘json‘
result_serializer = ‘json‘
accept_content = [‘json‘]
timezone = "Asia/Shanghai"  # 时区设置
worker_hijack_root_logger = False  # celery默认开启自己的日志,可关闭自定义日志,不关闭自定义日志输出为空
result_expires = 60 * 60 * 24  # 存储结果过期时间(默认1天)

# 导入任务所在文件
imports = [
    "celery_task.epp_scripts.test1",  # 导入py文件
    "celery_task.epp_scripts.test2",
]

# 需要执行任务的配置
beat_schedule = {
    "test1": {
        "task": "celery_task.epp_scripts.test1.celery_run",  #执行的函数
        "schedule": crontab(minute="*/1"),   # every minute 每分钟执行
        "args": ()  # # 任务函数参数
    },

    "test3": {
        "task": "celery_task.epp_scripts.test1.celery_run",  # 执行的函数
        "schedule": timedelta(seconds=10),  # 每10s执行一次
        "args": ()  # # 任务函数参数
    },

    "test2": {
        "task": "celery_task.epp_scripts.test2.celery_run",
        "schedule": crontab(minute=0, hour="*/1"),   # every minute 每小时执行
        "args": ()
    },

}

# "schedule": crontab()与crontab的语法基本一致
# "schedule": crontab(minute="*/10",  # 每十分钟执行
# "schedule": crontab(minute="*/1"),   # 每分钟执行
# "schedule": crontab(minute=0, hour="*/1"),    # 每小时执行

celery.py

# coding:utf-8
from __future__ import absolute_import # 拒绝隐式引入,因为celery.py的名字和celery的包名冲突,需要使用这条语句让程序正确地运行
from celery import Celery

# 创建celery应用对象
app = Celery("celery_demo")

# 导入celery的配置信息
app.config_from_object("celery_task.celeryconfig")

tes1.py

# coding:utf-8
from __future__ import absolute_import # 拒绝隐式引入,因为celery.py的名字和celery的包名冲突,需要使用这条语句让程序正确地运行
from celery import Celery

# 创建celery应用对象
app = Celery("celery_demo")

# 导入celery的配置信息
app.config_from_object("celery_task.celeryconfig")

test2.py

from celery_task.celery import app

def test33():
    print("test33----------------")
    # print("------"*50)

def test44():
    print("test44--------------")
    # print("------" * 50)
    test33()

@app.task
def celery_run():
    test33()
    test44()

3 启动

先在一个终端: celery -A celery_task beat   发布任务,在celery_task 同级目录下执行

再在另一个终端:celery -A pinduoduo worker -l info -P eventlet  开启worker 在celery_task 同级目录下执行

需要注意,上面的参数 -P eventlet 主要是为了解决在win10 下报  Celery ValueError: not enough values to unpack (expected 3, got 0) 的错误,需要安装eventlet先

pip install eventlet -i https://pypi.douban.com/simple/

参考:https://blog.csdn.net/Shyllin/article/details/80940643

参考:https://blog.csdn.net/qq_30242609/article/details/79047660

原文地址:https://www.cnblogs.com/jec1999/p/10663068.html

时间: 2024-08-01 21:10:08

celery 定时任务的相关文章

利用django admin后台配置celery定时任务

1.安装djcelery pip install django-celery 2.在Django项目setting配置 A.配置djcelery # CELERY STUFF import djcelery djcelery.setup_loader() BROKER_URL = 'redis://localhost:6379' CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' # 定时任务 CELERY_RESULT_

将celery定时任务设置为根据本地时区触发

默认celery的时区为UTC,如果要在django项目中将celery定时任务配置为根据本地时区触发,则需要修改 在setttings.py 添加以下任意一行: # celery 相关配置 CELERY_TIMEZONE = 'Asia/Shanghai' CELERY_ENABLE_UTC = False 原文地址:https://www.cnblogs.com/linyihai/p/10140936.html

celery 定时任务,使用crontab表达式不执行(版本4.3.x)

celery 定时任务,使用crontab表达式不执行(版本4.3.x) 在使用celery 执行定时任务时,发现任务不会执行,schedule设置如下: 经测试,如果去掉hour,则任务每分钟都会执行,说明是hour的问题,那只有是时区的问题了. 遂将hour改为UTC的时间,发现任务可以执行,说明celery使用的是UTC时区,但我设置的时区如下: CELERY_TIMEZONE='Asia/Shanghai' CELERY_ENABLE_UTC=True 注意添加任务的时候使用utc时间,

Celery定时任务细讲

Celery定时任务细讲 一.目录结构 任务所在目录 ├── celery_task # celery包 如果celery_task只是建了普通文件夹__init__可以没有,如果是包一定要有 │ ├── __init__.py # 包文件 看情况要不要存在 │ ├── celery.py # celery连接和配置相关文件,且名字必须交celery.py,其实也不是必须的不然你指令可能要修改 │ └── tasks.py # 所有任务函数 二.配置 celery.py from celery

Django-如何写好一个celery定时任务

1.首先在项目同名目录下建一个celery.py from __future__ import absolute_import import os from celery import Celery from datetime import timedelta from kombu import Queue # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANG

celery定时任务

原文地址:https://www.cnblogs.com/jintian/p/11447252.html

Django的celery配置(包括定时任务、队列)

一.安装celery Django项目不需要安装celery这个包,可以直接使用django-celery这个包,,先来安装它,在终端中输入: pip install django-celery 二.安装rabbitmq,建立celery队列 我做的项目用的就是rabbitmq,按道理来说,也是可以用redis作为消息队列的,但是rabbitmq更好,此处不做详细解释,有兴趣的同学的可以去研究下. ubuntu环境下,在终端中输入: sudo apt-get install rabbitmq-s

Celery - 一个懂得 异步任务 , 定时任务 , 周期任务 的芹菜

1.什么是Celery?Celery 是芹菜Celery 是基于Python实现的模块, 用于执行异步定时周期任务的其结构的组成是由    1.用户任务 app    2.管道 broker 用于存储任务 官方推荐 redis rabbitMQ  / backend 用于存储任务执行结果的    3.员工 worker 2.Celery的简单实例 1 from celery import Celery 2 import time 3 4 #创建一个Celery实例,这就是我们用户的应用app 5

Python Celery队列

Celery队列简介: Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery. 使用场景: 1.你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情. 2.你想做一个定时任务,比如每天检测一下你们所有客户的资料,如