操作系统环境
CentOS 7.4 X64
- rabbitmq-server
# yum install -y epel-release
# yum install erlang
# yum install -y rabbitmq-server
也可以添加-detached属性来后台运行
rabbitmq-server -detached
不要kill停止RabbitMQ,使用rabbitmqctl命令
rabbitmqctl stop
配置用户和权限
[[email protected] soft]# rabbitmqctl add_user my_user my_password
Creating user "my_user" ...
...done.
[[email protected] soft]# rabbitmqctl add_vhost my_vhost
Creating vhost "my_vhost" ...
...done.
[[email protected] app]# rabbitmqctl set_permissions -p my_vhost my_user ".*" ".*" ".*"
Setting permissions for user "my_user" in vhost "my_vhost" ...
...done.
[[email protected] ~]# rabbitmqctl set_user_tags my_user administrator
Setting tags for user "my_user" to [administrator] ...
...done.
[[email protected] ~]#
配置web管理界面(需要重启服务)
[[email protected] ~]# rabbitmq-plugins enable rabbitmq_management
The following plugins have been enabled:
mochiweb
webmachine
rabbitmq_web_dispatch
amqp_client
rabbitmq_management_agent
rabbitmq_management
Plugin configuration has changed. Restart RabbitMQ for changes to take effect.
[[email protected] ~]#
管理访问
http://rabbitmqserver_ip:15672/
使用上述用户登录查看队列情况
2.安装python库
# yum install -y python-pip
# pip install --upgrade pip
# pip --version
pip 9.0.1 from /usr/lib/python2.7/site-packages (python 2.7)
#
# pip install Flask
# pip install celery
3.测试程序
[[email protected] app]# more tasks.py
from celery import Celery
app = Celery(‘tasks‘, broker=‘amqp://my_user:[email protected]/my_vhost‘, backend=‘amqp‘)
@app.task
def add(x,y):
return x + y
[[email protected] app]#
[[email protected] app]#
启动worker
[[email protected] app]# celery -A tasks worker --loglevel=info
/usr/lib/python2.7/site-packages/celery/platforms.py:795: RuntimeWarning: You‘re running the worker with superuser privileges: this is
absolutely not recommended!
Please specify a different user using the -u option.
User information: uid=0 euid=0 gid=0 egid=0
uid=uid, euid=euid, gid=gid, egid=egid,
/usr/lib/python2.7/site-packages/celery/backends/amqp.py:68: CPendingDeprecationWarning:
The AMQP result backend is scheduled for deprecation in version 4.0 and removal in version v5.0. Please use RPC backend or a persistent backend.
alternative=‘Please use RPC backend or a persistent backend.‘)
-------------- [email protected] v4.1.0 (latentcall)
---- **** -----
--- * *** * -- Linux-3.10.0-693.el7.x86_64-x86_64-with-centos-7.4.1708-Core 2017-11-06 17:43:36
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: tasks:0x2099210
- ** ---------- .> transport: amqp://my_user:**@localhost:5672/my_vhost
- ** ---------- .> results: amqp://
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. tasks.add
[2017-11-06 17:43:36,345: INFO/MainProcess] Connected to amqp://my_user:**@127.0.0.1:5672/my_vhost
[2017-11-06 17:43:36,366: INFO/MainProcess] mingle: searching for neighbors
[2017-11-06 17:43:37,398: INFO/MainProcess] mingle: all alone
[2017-11-06 17:43:37,435: INFO/MainProcess] [email protected] ready.
调用任务
[[email protected] app]# python
Python 2.7.5 (default, Aug 4 2017, 00:39:18)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-16)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>>
>>> from tasks import add
>>> result = add.delay(3, 5)
>>> result.ready()
True
>>> result.get()
8
>>>
work输出
[[email protected] app]# celery -A tasks worker --loglevel=info
/usr/lib/python2.7/site-packages/celery/platforms.py:795: RuntimeWarning: You‘re running the worker with superuser privileges: this is
absolutely not recommended!
Please specify a different user using the -u option.
User information: uid=0 euid=0 gid=0 egid=0
uid=uid, euid=euid, gid=gid, egid=egid,
/usr/lib/python2.7/site-packages/celery/backends/amqp.py:68: CPendingDeprecationWarning:
The AMQP result backend is scheduled for deprecation in version 4.0 and removal in version v5.0. Please use RPC backend or a persistent backend.
alternative=‘Please use RPC backend or a persistent backend.‘)
-------------- [email protected] v4.1.0 (latentcall)
---- **** -----
--- * *** * -- Linux-3.10.0-693.el7.x86_64-x86_64-with-centos-7.4.1708-Core 2017-11-06 17:43:36
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: tasks:0x2099210
- ** ---------- .> transport: amqp://my_user:**@localhost:5672/my_vhost
- ** ---------- .> results: amqp://
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. tasks.add
[2017-11-06 17:43:36,345: INFO/MainProcess] Connected to amqp://my_user:**@127.0.0.1:5672/my_vhost
[2017-11-06 17:43:36,366: INFO/MainProcess] mingle: searching for neighbors
[2017-11-06 17:43:37,398: INFO/MainProcess] mingle: all alone
[2017-11-06 17:43:37,435: INFO/MainProcess] [email protected] ready.
------------------------------------------- new output ----------------------------------------------------
[2017-11-06 17:45:31,929: INFO/MainProcess] Received task: tasks.add[4c381059-b943-4a4c-8975-15505674504f]
[2017-11-06 17:45:31,982: INFO/ForkPoolWorker-1] Task tasks.add[4c381059-b943-4a4c-8975-15505674504f] succeeded in 0.0510413989978s: 8