celery 实例进阶

认识

这里有几个概念,task、worker、broker。
顾名思义,task 就是老板交给你的各种任务,worker 就是你手下干活的人员。

那什么是 Broker 呢?

老板给你下发任务时,你需要 把它记下来, 这个它 可以是你随身携带的本子,也可以是 电脑里地记事本或者excel,或者是你的 任何时间管理工具。

Broker  则是 Celery 记录task的地方。
作为一个任务管理者的你,将老板(前端程序)发给你的 安排的工作(Task) 记录到你的本子(Broker)里。接下来,你就安排你手下的IT程序猿们(Worker),都到你的本子(Broker)里来取走工作(Task)

回到顶部

1. broker为rabbitmq

#tasks.py

from celery import Celery

app = Celery(‘tasks‘, broker=‘amqp://admin:[email protected]:5672‘)

@app.task
def add(x, y):
    return x + y

启动

celery -A tasks worker --loglevel=info

运行

>>> from tasks import add
>>> add(1, 3)
4
>>> add.delay(1,3)
<AsyncResult: 07614cef-f314-4c7b-a33f-92c080cadb83>
>>> 

:delay是使用异步的方式,会压入到消息队列。否则,不会使用消息队列。

文件名为tasks.py,则其中代码app = Celery(‘tasks‘, broker=),Celery第一个参数为工程名,启动时也是celery -A tasks worker --loglevel=info

对比

:投入到指定的队列用:add.delay(1, 3, queue=‘queue_add1‘)

test_2.py

from celery import Celery

app = Celery(‘proj‘, broker=‘amqp://admin:[email protected]:5672‘, include=‘test_2‘)

@app.task
def add(x, y):
    return x + y

回到顶部

2. 以python+文件名的方式启动

例1:

#test.py

from celery import Celery
import time
app = Celery(‘test‘, backend=‘amqp‘, broker=‘amqp://admin:[email protected]:5672‘)

@app.task
def add(x, y):
    print "------>"
    time.sleep(5)
    print "<--------------"
    return x + y

if __name__ == "__main__":
    app.start()

启动

python test.py worker 

celery默认启动的worker数为内核个数,如果指定启动个数,用参数-c,例

python test.py worker -c 2

例2:

#test.py

from celery import Celery
import time
app = Celery(‘test‘, backend=‘amqp‘, broker=‘amqp://admin:[email protected]:5672‘)

@app.task
def add(x, y):
    print "------>"
    time.sleep(2)
    print "<--------------"
    return x + y

if __name__ == "__main__":
    app.start()

#eg.py

from test import *
import time

rev = []
for i in range(3):
    rev.append(add.delay(1,3))

print "len rev:", len(rev)
while 1:
    tag = 1
    for key in rev:
        if not key.ready():
            tag = 0
            time.sleep(1)
            print "sleep 1"
    if tag:
        break
print "_____________________>"

回到顶部

3. broker为redis

#test_redis.py

from celery import Celery
import time
#app = Celery(‘test_redis‘, backend=‘amqp‘, broker=‘redis://100.69.201.116:7000‘)
app = Celery(‘test_redis‘, backend=‘redis‘, broker=‘redis://100.69.201.116:7000‘)

@app.task
def add(x, y):
    print "------>"
    time.sleep(5)
    print "<--------------"
    return x + y

if __name__ == "__main__":
    app.start()

启动

python test_redis.py worker -c 2

测试

from celery import group
from test_redis import *
g = group(add.s(2, 3)).apply_async()
g = group(add.s(2, 3)).apply_async()
g = group(add.s(2, 3)).apply_async()
g = group(add.s(2, 3)).apply_async()
g = group(add.s(2, 3)).apply_async()
for ret in g.get():
    print ret
print "end-----------------------------------"

结果

5
end-----------------------------------

回到顶部

4. 两个队列(redis)

#test_redis.py

from celery import Celery
import time
#app = Celery(‘test_redis‘, backend=‘amqp‘, broker=‘redis://100.69.201.116:7000‘)
app = Celery(‘test_redis‘, backend=‘redis‘, broker=‘redis://100.69.201.116:7000‘)

@app.task
def add(x, y):
    print "------>"
    time.sleep(5)
    print "<--------------"
    return x + y

if __name__ == "__main__":
    app.start()

#test_redis_2.py

from celery import Celery
import time
#app = Celery(‘test_redis‘, backend=‘amqp‘, broker=‘redis://100.69.201.116:7000‘)
app = Celery(‘test_redis_2‘, backend=‘redis‘, broker=‘redis://100.69.201.116:7001‘)

@app.task
def add_2(x, y):
    print "=======>"
    time.sleep(5)
    print "<================="
    return x + y

if __name__ == "__main__":
    app.start()

测试

from celery import group
from test_redis import *
from test_redis_2 import *
ll = [(1,2), (3,4), (5,6)]
g = group(add.s(key[0], key[1]) for key in ll).apply_async()
for ret in g.get():
    print ret
print "end redis_1 -----------------------------------"

ll = [(1,2), (3,4), (5,6)]
g = group(add_2.s(key[0], key[1]) for key in ll).apply_async()
for ret in g.get():
    print ":", ret
print "end redis_2 -----------------------------------"

结果

3
7
11
end redis_1 -----------------------------------
: 3
: 7
: 11
end redis_2 -----------------------------------

回到顶部

5. 两个队列(同一个rabbitmq)

注释:需要提前设置下队列

##例1

#test.py

from celery import Celery
import time
app = Celery(‘test‘, backend=‘amqp‘, broker=‘amqp://admin:[email protected]:5672//‘)

@app.task
def add(x, y):
    print "------>"
    time.sleep(5)
    print "<--------------"
    return x + y

if __name__ == "__main__":
    app.start()

#test_2.py

from celery import Celery
import time
app = Celery(‘test_2‘, backend=‘amqp‘, broker=‘amqp://admin:[email protected]:5672//hwzh‘)

@app.task
def add_2(x, y):
    print "=====>"
    time.sleep(5)
    print "<=========="
    return x + y

if __name__ == "__main__":
    app.start()

测试

from celery import group
from test import *
from test_2 import *

ll = [(1,2), (3,4), (7,8)]
g = group(add.s(key[0], key[1]) for key in ll).apply_async()
for ret in g.get():
    print ret

ll = [(1,2), (3,4), (7,8)]
g = group(add_2.s(key[0], key[1]) for key in ll).apply_async()
for ret in g.get():
    print ret

结果

3
7
15
3
7
15

##例2

#test.py

from celery import Celery
import time
app = Celery(‘test‘, backend=‘amqp‘, broker=‘amqp://admin:[email protected]:5672//mq4‘)

@app.task
def add(x, y):
    print "------>"
    time.sleep(2)
    print "<--------------"
    return x + y

@app.task
def sum(x, y):
    print "------>"
    time.sleep(2)
    print "<--------------"
    return x + y

if __name__ == "__main__":
    app.start()

#eg2.py

from test import *
import time

rev = []
for i in range(3):
    rev.append(add.delay(1,3))

for i in range(3):
    rev.append(sum.delay(1,3))

print "len rev:", len(rev)
while 1:
    tag = 1
    for key in rev:
        if not key.ready():
            tag = 0
            time.sleep(1)
            print "sleep 1"
    if tag:
        break
print "_____________________>"

回到顶部

6. 保存结果

from celery import Celery

app = Celery(‘tasks‘, backend=‘amqp‘, broker=‘amqp://admin:[email protected]‘)

@app.task
def add(x, y):
    return x + y

启动

celery -A tasks_1 worker --loglevel=info

与前例不同:

- ** ---------- [config]
- ** ---------- .> app: tasks:0x7f8057931810
- ** ---------- .> transport: amqp://admin:**@localhost:5672//
- ** ---------- .> results: amqp

运行

>>> from tasks_1 import add
>>> result = add.delay(1, 3)
>>> result.ready()
True
>>> result.get()
4

回到顶部

7. 多个队列

from celery import Celery
from kombu import Exchange, Queue
BROKER_URL = ‘amqp://admin:[email protected]//‘
app = Celery(‘tasks‘, backend=‘amqp‘,broker=BROKER_URL)
app.conf.update(
     CELERY_ROUTES={
          "add1":{"queue":"queue_add1"},
          "add2":{"queue":"queue_add2"},
          "add3":{"queue":"queue_add3"},
          "add4":{"queue":"queue_add4"},
        },
)
@app.task
def add1(x, y):
     return x + y

@app.task
def add2(x, y):
     return x + y

@app.task
def add3(x, y):
     return x + y

@app.task
def add4(x, y):
     return x + y

回到顶部

8. 消息路由

文件:tasks.py

from celery import Celery, platforms
import time
import os

app = Celery(‘proj‘, broker=‘amqp://admin:[email protected]:5672‘,
             include=[‘tasks‘]
             )
app.conf.update(
    CELERY_ROUTES={
        ‘tasks.fun_1‘: {
            ‘queue‘: "q_1"
        },
        ‘tasks.fun_2‘: {
            ‘queue‘: "q_2"
        }
    }
)
platforms.C_FORCE_ROOT = True 

@app.task
def fun_1(n):
    print "(((((((((((((((func_1", n
    return 1

@app.task
def fun_2(n):
    print n, ")))))))))))))))"
    return 2

if __name__ == "__main__":
    app.start()

启动

python tasks.py worker -c 2 -Q q_1
python tasks.py worker -c 2 -Q q_2

两个消息队列:q_1, q_2,调用示例

>>> from tasks import *
>>> fun_1(1)
(((((((((((((((func_1 1
1
>>> fun_1.delay(1)
<AsyncResult: 528a2ad1-bc16-4bdc-beff-cd166fe3e885>
>>> fun_2.delay(2)
<AsyncResult: ee5881eb-b384-4a39-ba00-08aa8ee53504>

回到顶部

9. woker内启多进程

#tasks.py

from celery import Celery
import time
import multiprocessing as mp

app = Celery(‘proj‘, broker=‘amqp://admin:[email protected]:5672‘, include="tasks")

def test_func(i):
    print "beg...:", i
    time.sleep(5)
    print "....end:", i
    return i * 5

@app.task
def fun_1(n):
    curr_proc = mp.current_process()
    curr_proc.daemon = False
    p = mp.Pool(mp.cpu_count())
    curr_proc.daemon = True
    for i in range(n):
        p.apply_async(test_func, args=(i,))
    p.close()
    p.join()
    return 1

if __name__ == "__main__":
    app.start()

说明

直接启动多进程是肯定不可以的,因为是守候进程(curr_proc.daemon=True),所以启多进程之前主动设置为非守候进程:curr_proc.daemon=False,启动了以后再设为守候进程

#tasks_callback.py

from celery import Celery
import time
import multiprocessing as mp

app = Celery(‘proj‘, broker=‘amqp://admin:[email protected]:5672‘, include="tasks_callback")
rev = []
def test_func(i):
    print "beg...:", i
    time.sleep(5)
    print "....end:", i
    return i * 5

def callback_log(rev_val):
    rev.append(rev_val)

@app.task
def fun_1(n):
    print "before rev:", rev
    curr_proc = mp.current_process()
    curr_proc.daemon = False
    p = mp.Pool(mp.cpu_count())
    curr_proc.daemon = True
    for i in range(n):
        p.apply_async(test_func, args=(i,), callback=callback_log)
    p.close()
    p.join()
    print "after rev:", rev
    return 1

if __name__ == "__main__":
    app.start()

回到顶部

10. 常用参数配置

1. CELERYD_PREFETCH_MULTIPLIER

同时预取得消息个数,比如如果CELERYD_PREFETCH_MULTIPLIER=2,那么如果现在对于1个worker,有一个状态是STARTED, 那么可以有2个处于RECEVED状态(如果有的话),这样就避免了如果消息很多全部分下取,后起来的worker领不到消息的尴尬。

参考代码

from celery import Celery, platforms
import time
import os

app = Celery(‘proj‘, broker=‘amqp://admin:[email protected]:5672‘,
             include=[‘tasks‘]
             )
app.conf.update(
    CELERYD_PREFETCH_MULTIPLIER=2,
    CELERY_ROUTES={
        ‘tasks.fun_1‘: {
            ‘queue‘: "q_1"
        },
        ‘tasks.fun_2‘: {
            ‘queue‘: "q_2"
        }
    }
)
platforms.C_FORCE_ROOT = True

@app.task
def fun_1(n):
    print "(((((((((((((((func_1", n
    time.sleep(20)
    return 1

@app.task
def fun_2(n):
    print n, ")))))))))))))))"
    return 2

调用

>>> from tasks import *
>>> fun_1.delay(3)
<AsyncResult: 609f2216-6785-409e-9f6f-85ae3fcce084>
>>> fun_1.delay(3)
<AsyncResult: 0230b8bd-b237-40ef-bc73-88929f8f8290>
>>> fun_1.delay(3)
<AsyncResult: 8fce172a-93c9-41f8-8c08-377a4363389c>
>>> fun_1.delay(3)

原文地址:https://www.cnblogs.com/zknublx/p/9149887.html

时间: 2024-08-09 14:16:59

celery 实例进阶的相关文章

Celery初识及简单实例

Celery是一个"自带电池"的任务队列.易于使用,可以轻易入门,它遵照最佳实践设计,使产品可以扩展,或与其他语言集成,并且它自带了在生产环境中运行这样一个系统所需的工具和支持.本文介绍基础部分: 选择和安装消息传输方式(中间人). 安装Celery并创建一个任务 运行职程并调用任务 追踪任务在不同状态间的迁移,并检视返回值 一.选择中间人 Celery需要一个发送和接收消息的解决方案,其通常以独立服务形式出现,称为消息中间人. 可行的选择包括: RabbitMQ RabbitMQ功能

Celery(三)实例Application

Celery必须实例化后才可以使用,实例称之为application或者简称app.实例是线程安全的,多个Celery实例(不同的配置.部件和任务)都可以在一个进程空间中运行. 创建一个最简单的app: >>> from celery import Celery >>> app = Celery() >>> app <Celery __main__ at 0x7f6be52d0cd0> 上述的app是一个运行在__main__模块中的Cel

JS高级学习路线——面向对象进阶

构造函数进阶 使用构造函数创建对象 用于创建对象 其除了是一个函数之外,我们又称之为构造对象的函数 - 简称构造函数 function Product(name,description){ //属性 this.name=name; // 属性 this.description = description //方法 又称方法属性 万物皆属性 this.buy=function(){ alert('buy') } } //会拷贝一份 var p1 = new Product() var p2 = n

Celery 使用(一)

Celery 使用(一) 架构 Producer:任务发布者: Celery Beat:任务调度器,Beat进程会读取配置文件中的内容,周期性的将配置中到期需要执行的任务发送给任务队列: Broker:消息代理,接受生产者的任务消息,存进队列后发送给消费者: Celery Worker:执行任务的消费者: Result Backend:保存消费者执行任务完后的结果: 如下图: 整体的流程,任务发布者或者是任务调度,将任务发送到消息代理中,接着消息代理将任务发送给消费者来执行,其执行完后将结果保存

Celery 和 Redis 入门

Reference:  http://www.thinksaas.cn/group/topic/395734/ Celery是一个广泛应用于网络应用程序的任务处理系统. 它可以在以下情况下使用: 在请求响应周期中做网络调用.服务器应当立即响应任何网络请求.如果在请求响应周期内需要进行网络调用,则应在周期外完成调用.例如当用户在网站上注册时,需要发送激活邮件.发送邮件是一种网络调用,耗时2到3秒.用户应该无需等待这2到3秒.因此,发送激活邮件应当在请求响应周期外完成,celery 就能实现这一点.

初识Celery

本系列文章的开发环境: window 7 + python2.7 + pycharm5 + celery3.1.25 + django1.9.4 在我们日常的开发工作中,经常会遇到这几种情况: 1.在web应用中,用户触发一个操作,执行后台处理程序,这个程序需要执行很长时间才能返回结果.怎样才能不阻塞http请求,不让用户等待从而提高用户体验呢? 2.定时任务脚本:生产环境经常会跑一些定时任务脚本,假如你有上千台的服务器.上千种任务,定时任务的管理很困难,如何对job进行有效的管理? 3.异步需

[读后感]spring Mvc 教程框架实例以及系统演示下载

太阳火神的美丽人生 (http://blog.csdn.net/opengl_es) 本文遵循"署名-非商业用途-保持一致"创作公用协议 转载请保留此句:太阳火神的美丽人生 -  本博客专注于 敏捷开发及移动和物联设备研究:iOS.Android.Html5.Arduino.pcDuino,否则,出自本博客的文章拒绝转载或再转载,谢谢合作. 不要好意思,昨晚写的,睡着忘发了,后附是篇好文,赶紧w分享一下. 感脚着,俺好像做了件聪明事儿,却不知会不会遭到不理解. 转载的好文,是不会推荐到

Celery 快速入门

Celery 快速入门 任务队列 任务队列用于分发工作给不同线程或机器. Celery通过消息传递 支持多个workers和brokers.提供高可用和水平扩展性. 用Python写的 优点 简单 高可用 快 易扩展 支持 Brokers RabbitMQ, Redis MongoDB, ZeroMQ CouchDB, SQLAlchemy Django ORM, Amazon SQS, ... Result Stores AMQP, Redis memcached, MongoDB SQLAl

celery实现任务统一收集、分发执行

首先解释下目标的概念:celery任务消息会由各种途径(比如手动通过python shell触发.通过tornado触发等)发往统一的一个celery broker,然后任务消息会由不同server上的worker去获取并执行.具体点说就是,借助celery消息路由机制,celery broker中开不同的消息队列来接收相应类型的任务消息,然后不同server上开启worker来处理目标消息队列里面的任务消息,即任务统一收集.分发到不同server上执行. 测试 项目架构如下:一个服务,一部分t