使用celery之了解celery(转)

原文  http://www.dongwm.com/archives/shi-yong-celeryzhi-liao-jie-celery/

前言

我想很多做开发和运维的都会涉及一件事:crontab, 也就是在服务器上设定定时任务,按期执行一些任务.但是假如你有上千台的服务器, 你有上千种任务,那么对于这个定时任务的管理恐怕是一件很头疼的事情.哪怕你只是几十个任务分配的不同的机器上怎么样合理的管理和实现以下功能呢:

  1. 查看定时任务的执行情况.比如执行是否成功,当前状态,执行花费的时间.
  2. 一个友好的界面或者命令行下实现添加,删除任务
  3. 怎么样简单实现不同的机器设定不同种任务,某些机器执行不同的队列
  4. 假如你需要生成一个任务怎么样不阻塞剩下来的过程(异步了呗)
  5. 怎么样并发的执行任务

几种选择

  1. 有钱有人有时间自己实现一套,优点是完全符合公司业务的需要,有专门的团队维护和服务
  2. 使用 Gearman ,听说过没用过,因为是C/java/perl,对我们这种python开发者或者运维来说,假如没有这方面经验之后没有能力了解底层实现和二次开发的能力
  3. 使用 rq , rq是搞gitflow的那个作者写的,简介里面说的很清楚:Simple job queues for Python. 怕它不够复杂,但是假如业务没有那么复杂或者应用不是那么严格,完全可以尝试下
  4. 好吧我选择了 celery , 现在用了快半年,可能是历史遗留问题,版本较低.有很多坑.但是很不错

消息队列

RabbitMQ,ZeroMQ这样的消息队列总是出现在我们视线中, 其实意义是很简单: 消息就是一个要传送的数据,celery是一个分布式的任务队列.这个”任务”其实就是一种消息, 任务被生成到队列中,被RabbitMQ等容器接收和存储,在适当的时候又被要执行的机器把这个消息取走.

celery任务可以使用RabbitMQ/Redis/SQLAlchemy/Django的orm/Mongodb等等容器(这里叫做Broker).我使用的是RabbitMQ,因为作者在github主页的介绍里面很明确的写了这个

所谓队列,你可以设想一个问题,我有一大推的东西要执行,但是我并不是需要每个服务器都执行这个任务,因为业务不同嘛. 所以就要做个队列, 比如任务A,B,C A可以在X,Y服务器执行, 但是不需要或者不能在Z服务器上执行.那么在X,Y你启动worker(下面会说,其实就是消费者和生产者的消费者)加上这个队列,Z服务器就不需要指定这个队列,也就不会执行这个队列的任务

celery的原理,我这里的角度是django+celery+django-celery

首先说一下流程:

  1. 使用django-celery或者直接操作数据库(settings.py里面指定)添加任务,设置的相关属性(包含定时任务的间隔)存入数据库.
  2. celerybeat通过djcelery.schedulers.DatabaseScheduler获取django内你设置的任务周期性的检查(默认5s),发现需要执行某任务讲其丢入你设置的broker(我这里是rabbitmq),他会更具settings.py的设置放到对应的队列
  3. 在你启动了celery worker(以前是celeryd)的服务器上,根据worker也会定期(默认5s)去broker里面查找需要它执行的队列里面是否有任务
  4. 当发现队列有要执行的任务,worker将它取出来执行,执行完的结果通过celerycam(默认30s,所以这个进程也要启动)写入django设置的数据库,更新了这个任务的状态.比如花费的时间

supervisor进程管理

不知道有没有人用过 supervise ,我以前经常在最初的项目开发中经常使用它监视我的程序,当程序死掉自动启动, supervisor 确是一个 进程管理的工具,我在这里使用它管理celery的程序和uwsgi

粘贴下我的一个本地环境的配置,并直接进行一下说明:

;程序名字
[program:celery-queue-fetch]
;程序要执行的命令, -Q 指定了生成和接受任务的队列,多个用都好分开 -c为workr的数量,原意为并发数量
command=python /home/dongwm/work/manage.py celery worker -E --settings=settings_local --loglevel=INFO -Q fetch_ -c 30
;程序执行时候所在目录
directory=/home/dongwm/work/
;执行程序使用的用户
user=dongwm
;启动的程序的实例数,默认是1
numprocs=1
stdout_logfile=/home/dongwm/work/celerylog/celery.log
stderr_logfile=/home/dongwm/work/celerylog/celery.log
;在启动supervisor时候自动启动
autostart=true
;当程序可能因为某些原因没有启动成功会自动重启
autorestart=true
;启动的等待时候,我想是为了重启能杀掉原来进程预留的时间
startsecs=10
;进程发送停止信号等待os返回SIGCHILD的时间
stopwaitsecs=10
;低优先级的会首先启动最后关闭
priority=998
;以下2句是为了保证杀掉进程和其子进程而不会只杀死其控制的程序主进程而留下子进程变为孤立进程的问题
stopsignal=KILL
stopasgroup=true

[program:celery-queue-feed]
command=python /home/dongwm/work/manage.py celeryd -E --settings=settings_local --loglevel=INFO -Q feed
directory=/home/dongwm/work/
user=dongwm
numprocs=1
stdout_logfile=/home/dongwm/work/celerylog/celery.log
stderr_logfile=/home/dongwm/work/celerylog/celery.log
autostart=true
autorestart=true
startsecs=10
stopwaitsecs=10
priority=998
stopsignal=KILL
stopasgroup=true

[program:celerycam]
;任务快照的间隔时间为10s
command=python /home/dongwm/work/manage.py celerycam -F 10 --settings=settings_local
directory=/home/dongwm/work/
user=dongwm
numprocs=1
stdout_logfile=/home/dongwm/work/celerylog/celerycam.log
stderr_logfile=/home/dongwm/work/celerylog/celerycam.log
autostart=true
autorestart=true
startsecs=5
stopwaitsecs=5
priority=998
stopsignal=KILL
stopasgroup=true

[program:celerybeat]
command=python /home/dongwm/work/manage.py celerybeat --settings=settings_real_old --loglevel=DEBUG
directory=/home/dongwm/work/
user=dongwm
numprocs=1
stdout_logfile=/home/dongwm/work/celerylog/celery_beat.log
stderr_logfile=/home/dongwm/work/celerylog/celery_beat.log
autostart=true
autorestart=true
startsecs=10
priority=999
stopsignal=KILL
stopasgroup=true

;这是supervisor官方的一个监控进程状态异常退出的脚本,我对它进行了较大的修改,这样在程序奇怪退出的时候会给我发邮件
[eventlistener:crashmail]
command=python /home/dongwm/superlance/superlance/crashmail.py -a -m [email protected]
events=PROCESS_STATE_EXITED

[program:uwsgi]
user = dongwm
numprocs=1
command=/usr/local/bin/uwsgi -s /tmp/uwsgi-sandbox.sock --processes 4  --enable-threads      --pythonpath /home/dongwm/uwsgi --buffer-size 32768 --listen 100 --daemonize /home/dongwm/ulog/uwsgi_out.log
directory=/home/dongwm/work
autostart=true
autorestart=true
redirect_stderr=true
stopsignal=KILL
stopasgroup=true

nginx+uwsgi的实践

nginx进程数会直接影响性能, 如何使用到的模块不会出现阻塞式的调用,应该有多少cpu就配多少worker_processes,否则才需要配置更多的进程数. 比如你的用户大量读取你的本地静态文件,并且服务器上面内存较少,硬盘的I/O调用可能会阻塞worker少量时间,那么就要多配

为了更好利用多核的优势,我绑定了worker和对应的内核:

worker_processes     4;
worker_cpu_affinity 0001 0010 0100 1000;
时间: 2024-10-14 21:39:30

使用celery之了解celery(转)的相关文章

day43——celery简介、celery小例子

一.Celery介绍和基本使用 Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery, 举几个实例场景中可用的例子: 你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情. 你想做一个定时任务,比如每天检测一下你们

使用celery之深入celery配置(转)

原文:http://www.dongwm.com/archives/shi-yong-celeryzhi-shen-ru-celerypei-zhi/ 前言 celery的官方文档其实相对还是写的很不错的.但是在一些深层次的使用上面却显得杂乱甚至就没有某些方面的介绍, 通过我的一个测试环境的settings.py来说明一些使用celery的技巧和解决办法 amqp交换类型 其实一共有4种交换类型,还有默认类型和自定义类型. 但是对我们配置队列只会用到其中之三,我来一个个说明,英语好的话可以直接去

celery expires 让celery任务具有时效性

起因:有的时候,我们希望任务具有时效性,比如定时每5分钟去抓取某个状态,由于celery队列中的任务可能很多,等到这个任务被执行时,已经超过了5分钟,那么这个任务的执行已经没有意义,因为下一次抓取已经执行了. 可以进行如下设定: @task(ignore_result=True, expires=900) def nupdate_influence_by_15min(uid, today=None, if_whole=False): ... ... expires – Either a int,

Celery/RabbitMQ在Ubuntu上的安装

1.安装RabbitMQ sudo apt-get install rabbitmq-server sudo rabbitmqctl add_user [username] [password] sudo rabbitmqctl add_vhost [vhostname] sudo rabbitmqctl set_user_tags [username] [tagname] sudo rabbitmqctl set_permissions -p [vhostname] [username]".*

Celery的实践指南

celery原理: celery实际上是实现了一个典型的生产者-消费者模型的消息处理/任务调度统,消费者(worker)和生产者(client)都可以有任意个,他们通过消息系统(broker)来通信. 典型的场景为: 客户端启动一个进程(生产者),当用户的某些操作耗时较长或者比较频繁时,考虑接入本消息系统,发送一个task任务给broker. 后台启动一个worker进程(消费者),当发现broker中保存有某个任务到了该执行的时间,他就会拿过来,根据task类型和参数执行. 实践中的典型场景:

在tornado中使用celery实现异步任务处理之一

一.简介 tornado-celery是用于Tornado web框架的非阻塞 celery客户端. 通过tornado-celery可以将耗时任务加入到任务队列中处理, 在celery中创建任务,tornado中就可以像调用AsyncHttpClient一样调用这些任务. ? Celery中两个基本的概念:Broker.Backend Broker : 其实就是一开始说的 消息队列 ,用来发送和接受消息. Broker有几个方案可供选择:RabbitMQ,Redis,数据库等 Backend:

Python Celery队列

Celery队列简介: Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery. 使用场景: 1.你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情. 2.你想做一个定时任务,比如每天检测一下你们所有客户的资料,如

Celery 使用(一)

Celery 使用(一) 架构 Producer:任务发布者: Celery Beat:任务调度器,Beat进程会读取配置文件中的内容,周期性的将配置中到期需要执行的任务发送给任务队列: Broker:消息代理,接受生产者的任务消息,存进队列后发送给消费者: Celery Worker:执行任务的消费者: Result Backend:保存消费者执行任务完后的结果: 如下图: 整体的流程,任务发布者或者是任务调度,将任务发送到消息代理中,接着消息代理将任务发送给消费者来执行,其执行完后将结果保存

celery和supervisor配合使用,实现supervisor管理celery进程

在这里我选择redis作为celery异步任务的中间人,系统选择CentOS6.5 64位.redis.celery和supervisor的安装参见官方文档. 安装完毕后: 1, 创建celery的实例/home/user_00/learn/tasks.py文件 tasks.py: # -*-coding:utf-8-*- from celery import Celery, platforms app = Celery('tasks', backend='redis://localhost:6