Celery配置

Celery 配置

最近项目中需要做一些定时任务的工作,之前都是用 LInux 的Crontab 但是任务多了之后 不好维护也没有什么监控的措施。所以考虑使用Celery 来解决这一问题。

1.安装

pip install celery-with-redis

注意:其实celery 支持多中broker

Name Status Monitoring Remote Control
RabbitMQ Stable Yes Yes
Redis Stable Yes Yes
Amazon SQS Stable No No
Zookeeper Experimental No No

可以看见redis 和RabbitMq支持的是最好的,其中官方推荐RabbitMq,因为redis有断电或者重启丢失数据的风险。不过在这里我因为方便使用的是redis

2 配置

安装之后开始写配置文件 celeryconfig.py

BROKER_URL = ‘redis://127.0.0.1:6379/1‘ #消息队列选用
CELERY_RESULT_BACKEND = ‘redis://127.0.0.1:6379/0‘#结果存储
CELERY_TASK_SERIALIZER = ‘msgpack‘#任务序列化方式
CELERY_RESULT_SERIALIZER = ‘json‘#结果序列化方式
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24
CELERY_ACCEPT_CONTENT = [‘json‘, ‘msgpack‘]#接收序列化方式,masgpack序列化的方式效率很高。
创建celery 实例
app = Celery(‘app‘, include=[‘YoutProjectName.tasks‘])

app.config_from_object(‘YoutProjectName.celeryconfig‘)  # 设置环境变量

3运行代码

创建YouProjName.tasks.py

# -*- coding: utf-8 -*-
#因为celery 不支持隐式导入
from __future__ import absolute_import, unicode_literals
import random
#引入app实例
from CronTab import celery_app
#引入日志类
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

#celery 装饰器,可以在运行的时候讲任务加到队列 这里是注册,将任务加到了注册中心,这样才能消费者才能收到
@celery_app.task()
def mul(x, y):
    total = x * (y * random.randint(3, 100))
    return total

@celery_app.task()
def xsum(numbers):
    return sum(numbers)

@celery_app.task()
def xsum3(numbers):
    return sum(numbers)

打开terminal开始运行

celery -A CronTab worker -l info 可以看到如下输出

 -------------- [email protected] v3.1.25 (Cipater)

----  ----- 

--- * ***  * -- Darwin-17.3.0-x86_64-i386-64bit

-- * -  --- 

- ** ---------- [config]
- ** ---------- .> app:         app:0x1033ad978
- ** ---------- .> transport:   redis://127.0.0.1:6379/1
- ** ---------- .> results:     redis://127.0.0.1:6379/0
- *** --- * --- .> concurrency: 8 (prefork)
  -- *** ----
  --- * ----- [queues]
   -------------- .> default          exchange=tasks(topic) key=task.#

Options:

-A APP, --app=APP app instance to use (e.g. module.attr_name) 指定实例app

-b BROKER, --broker=BROKER 指定broker

--loader=LOADER name of custom loader class to use.

--config=CONFIG Name of the configuration module

--workdir=WORKING_DIRECTORY

-C, --no-color

-q, --quiet

--version show program‘s version number and exit

-h, --help show this help message and exit

-A 是指定项目 worker 执行,

-l 日志级别

再打开一个terminal 开始

消费

In [1]: from apps.message import tasks

In [2]: tasks.div.delay(1,2)

Out[2]: <AsyncResult: 7b8db8fc-3119-48c5-8d95-c74a283ae8b2>

注意调用的时候要加delay 语句,这样的话,就会发送给任务队列,让任务队列来执行该语句。

这个时候我们再切回到worker进程

消费

[2018-02-14 10:00:53,963: INFO/MainProcess] Task apps.message.tasks.div[7b8db8fc-3119-48c5-8d95-c74a283ae8b2] succeeded in 0.008831199025735259s: 0.5

可以看见就这样队列执行了你的任务,这就是celery 任务队列生产消费的过程

使用不同的队列

当你有很多任务需要执行的时候,不要只使用默认的queue,这样会相互影响,并且拖慢任务执行的,导致重要的任务不能被快速的执行。

CELERY_QUEUES = (  # 定义任务队列

    Queue(‘default‘, routing_key=‘task.#‘),  # 路由键以“task.”开头的消息都进default队列

    Queue(‘web_tasks‘, routing_key=‘web.#‘),  # 路由键以“web.”开头的消息都进web_tasks队列

)

CELERY_DEFAULT_EXCHANGE = ‘tasks‘  # 默认的交换机名字为tasks

CELERY_DEFAULT_EXCHANGE_TYPE = ‘topic‘  # 默认的交换类型是topic

CELERY_DEFAULT_ROUTING_KEY = ‘task.default‘  # 默认的路由键是task.default,这个路由键符合上面的default队列

CELERY_ROUTES = {

    ‘apps.message.tasks.add‘: {  # tasks.add的消息会进入web_tasks队列

        ‘queue‘: ‘web_tasks‘,

        ‘routing_key‘: ‘web.add‘,
    },
    ‘apps.message.tasks.div‘: {  # tasks.add的消息会进入web_tasks队列

        ‘queue‘: ‘web_tasks‘,

        ‘routing_key‘: ‘web.div‘,
    }
}

然后执行命令

celery -A CronTab worker -l info -Q web_task

这样的话,只会执行文在web_task中的任务,其他任务都不在执行

启动多个workers执行不同的任务

在同一台机器上,对于优先级不同的任务最好启动不同的worker去执行,比如把实时任务和定时任务分开,把执行频率高的任务和执行频率低的任务分开,这样有利于保证高优先级的任务可以得到更多的系统资源,同时高频率的实时任务日志比较多也会影响实时任务的日志查看,分开就可以记录到不同的日志文件,方便查看。

$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1.%h
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker2.%h
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker3.%h

可以像这样启动不同的worker,%h可以指定hostname,详细说明可以查看官方文档

高优先级的任务可以分配更多的concurrency,但是并不是worker并法数越多越好,保证任务不堆积就好。

定时任务

在 celeryconfig.py 文件中添加如下配置

CELERYBEAT_SCHEDULE = {

    ‘add‘: {
        ‘task‘: ‘apps.message.tasks.add‘,
        ‘schedule‘: timedelta(seconds=1),
        ‘args‘: (16, 16)
    }

然后新开一个命令行启动

celery beat -A CronTab

可以看见任务开始不停的向队列发送

[2018-02-14 09:25:42,689: INFO/MainProcess] Scheduler: Sending due task add (apps.message.tasks.add)
[2018-02-14 09:25:43,689: INFO/MainProcess] Scheduler: Sending due task add (apps.message.tasks.add)
[2018-02-14 09:25:44,690: INFO/MainProcess] Scheduler: Sending due task add (apps.message.tasks.add)
[2018-02-14 09:25:45,690: INFO/MainProcess] Scheduler: Sending due task add (apps.message.tasks.add)
[2018-02-14 09:25:46,690: INFO/MainProcess] Scheduler: Sending due task add (apps.message.tasks.add)
[2018-02-14 09:25:47,690: INFO/MainProcess] Scheduler: Sending due task add (apps.message.tasks.add)
[2018-02-14 09:25:48,690: INFO/MainProcess] Scheduler: Sending due task add (apps.message.tasks.add)
[2018-02-14 09:25:49,691: INFO/MainProcess] Scheduler: Sending due task add (apps.message.tasks.add)

回到worker 可以看到,开始没秒钟接收并且消费了

```[2018-02-14 11:09:06,332: INFO/MainProcess] Received task: apps.message.tasks.add[36490b36-9f3a-4158-aa2e-a6640d4607b6][2018-02-14 11:09:06,332: INFO/Worker-8] apps.message.tasks.add[36490b36-9f3a-4158-aa2e-a6640d4607b6]: Executing task id 36490b36-9f3a-4158-aa2e-a6640d4607b6, args: [16, 16] kwargs: {}

[2018-02-14 11:09:06,332: INFO/Worker-1] apps.message.tasks.add[36490b36-9f3a-4158-aa2e-a6640d4607b6]: Executing task id 36490b36-9f3a-4158-aa2e-a6640d4607b6, args: [16, 16] kwargs: {}

[2018-02-14 11:09:06,333: INFO/MainProcess] Task apps.message.tasks.add[36490b36-9f3a-4158-aa2e-a6640d4607b6] succeeded in 0.001320198003668338s: 32

[2018-02-14 11:09:06,333: INFO/MainProcess] Task apps.message.tasks.add[36490b36-9f3a-4158-aa2e-a6640d4607b6] succeeded in 0.0014644850161857903s: 32

```

原文地址:https://www.cnblogs.com/maxaimee/p/8448725.html

时间: 2024-10-12 20:37:53

Celery配置的相关文章

使用celery之深入celery配置(转)

原文:http://www.dongwm.com/archives/shi-yong-celeryzhi-shen-ru-celerypei-zhi/ 前言 celery的官方文档其实相对还是写的很不错的.但是在一些深层次的使用上面却显得杂乱甚至就没有某些方面的介绍, 通过我的一个测试环境的settings.py来说明一些使用celery的技巧和解决办法 amqp交换类型 其实一共有4种交换类型,还有默认类型和自定义类型. 但是对我们配置队列只会用到其中之三,我来一个个说明,英语好的话可以直接去

django+celery配置(定时任务)

下面介绍一下django+celery的配置做定时任务 1.首先介绍一下环境和版本 python==2.7 django == 1.8.1 celery == 3.1.23 django-celery == 3.1.17 2.celery的安装   sudo pip install celery==3.1.23 sudo pip install django-celery==3.1.17 3.新建一个项目 (1)django-admin startproject django_celery_de

Django的celery配置(包括定时任务、队列)

一.安装celery Django项目不需要安装celery这个包,可以直接使用django-celery这个包,,先来安装它,在终端中输入: pip install django-celery 二.安装rabbitmq,建立celery队列 我做的项目用的就是rabbitmq,按道理来说,也是可以用redis作为消息队列的,但是rabbitmq更好,此处不做详细解释,有兴趣的同学的可以去研究下. ubuntu环境下,在终端中输入: sudo apt-get install rabbitmq-s

ubuntu 环境 celery配置全解[持续生产中]

继续尝试没有时间弄明白的技术. celery官方文档地址:http://docs.celeryproject.org/en/stable/getting-started/introduction.html#get-started. 简单的说,可以理解celery是帮助你把一些指定的工作异步化,不用等待io的工具. 比如要发十条短信,需要10秒.如果线性操作,就得等待到短信全部发送完毕之后,再开始执行下面的代码,如果中间某一个死了,你的用户就会呆呆的等待,直到操作超时,这种体验是非常糟糕.如果再a

Celery 和 Redis 入门

Reference:  http://www.thinksaas.cn/group/topic/395734/ Celery是一个广泛应用于网络应用程序的任务处理系统. 它可以在以下情况下使用: 在请求响应周期中做网络调用.服务器应当立即响应任何网络请求.如果在请求响应周期内需要进行网络调用,则应在周期外完成调用.例如当用户在网站上注册时,需要发送激活邮件.发送邮件是一种网络调用,耗时2到3秒.用户应该无需等待这2到3秒.因此,发送激活邮件应当在请求响应周期外完成,celery 就能实现这一点.

Celery 分布式任务队列入门

一.Celery介绍和基本使用 Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery, 举几个实例场景中可用的例子: 你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情. 你想做一个定时任务,比如每天检测一下你们

Celery,Tornado,Supervisor构建和谐的分布式系统

Celery 分布式的任务队列 与rabbitmq消息队列的区别与联系: rabbitmq 调度的是消息,而Celery调度的是任务. Celery调度任务时,需要传递参数信息,传输载体可以选择rabbitmq. 利用rabbitmq的持久化和ack特性,Celery可以保证任务的可靠性. 优点: 轻松构建分布式的Service Provider. 高可扩展性,增加worker也就是增加了队列的consumer. 可靠性,利用消息队列的durable和ack,可以尽可能降低消息丢失的概率,当wo

#SORA#celery原生配置文件研究

ps:百度是xxx的走狗 回到正题,今天研究了下用一个py文件作为celery的配置文件,所以,还是参考昨天的例子:http://my.oschina.net/hochikong/blog/396079 我们把celery.py的配置项拿出来,在proj目录中创建celeryconfig.py,内容如下: CELERY_TASK_RESULT_EXPIRES=3600 CELERY_TASK_SERIALIZER='json' CELERY_ACCEPT_CONTENT=['json'] CEL

celery简介

Celery简介 [toc] celery userguide 知乎大神解释celery Celery(芹菜)是基于Python开发的分布式任务队列.它支持使用任务队列的方式在分布的机器/进程/线程上执行任务调度. Celery架构 架构图如下: Celery包括如下组件: Celery Beat 任务调度器,Beat进程会读取配置文件的内容,周期性的将配置中到期需要执行的任务发送给任务队列 celery Worker 执行任务的消费者,通常会在多台服务器运行多个消费者来提高执行效率 Broke