Python3 通过 kombu 连接 RabbitMQ 的基本用法

【RabbitMQ 服务器】

# 在 vhosttest 里面有 exchangetest 和 queuetest 通过 rkeytest 绑定
Broker: 192.168.0.xx
virtual host: vhosttest
Exchange: exchangetest 
Queue: queuetest 
Routing key: rkeytest

【Python 环境】

OS: Windows 10
Python: 3.6.3 x64
kombu: 4.1.0

【查看队列状态】

# 通过浏览器查看队列状态
http://192.168.0.xx:15672/api/queues/vhosttest/queuetest  

# 通过命令行查看队列状态curl -u user:password 
http://192.168.0.xx:15672/api/queues/vhosttest/queuetest  |  jq 

# 通过命令行查看队列长度(messages = messages_ready + messages_unacknowledged)
curl -s -u user:password http://192.168.0.xx:15672/api/queues/vhosttest/queuetest  | \    
    jq '.messages'

【send.py】

#encoding: utf-8
#author: walker
#date: 2018-03-09
#summary: 发送方/生产者

import os, sys, time
from kombu import Connection

def Main():
	with Connection('amqp://test:[email protected]:5672/vhosttest') as conn:
		with conn.channel() as channel:
			#producer = Producer(channel)
			producer = channel.Producer()

			while True:
				message = time.strftime('%H:%M:%S', time.localtime())
				producer.publish(
						body=message,
						retry=True,
						exchange='exchangetest',
						routing_key='rkeytest'
					)
				print('send message: %s' %  message)     

				while True:
				        # 检查队列,以重新得到消息计数
					queue = channel.queue_declare(queue='queuetest', passive=True)     
					messageCount = queue.message_count
					print('messageCount: %d' % messageCount)
					if messageCount < 100:
						break
					time.sleep(1)

if __name__ == '__main__':
	Main()

【recv.py】

#encoding: utf-8
#author: walker
#date:  2018-03-09
#summary: 接收方/消费者

import os, sys, time
from kombu import Connection, Queue
from kombu.mixins import ConsumerMixin

class C(ConsumerMixin):

	def __init__(self, connection, queueNmae):
		self.connection = connection
		self.queues = [Queue(queueNmae, durable=False)]

	def get_consumers(self, Consumer, channel):
		return [
			Consumer(self.queues, callbacks=[self.on_message]),
		]

	# 接收处理消息的回调函数
	def on_message(self, body, message):
		print("Received %s" % body)
		message.ack()

def Main():
	with Connection('amqp://test:[email protected]:5672/vhosttest') as conn:
		C(conn, 'queuetest').run()					

if __name__ == '__main__':
	Main()	

【相关阅读】

*** walker ***

原文地址:http://blog.51cto.com/walkerqt/2084510

时间: 2024-11-03 12:16:33

Python3 通过 kombu 连接 RabbitMQ 的基本用法的相关文章

使用php-amqplib连接rabbitMQ 学习笔记及总结

1.使用composer安装php-amqplib 在你的项目中添加一个 composer.json文件: { "require": { "php-amqplib/php-amqplib": "2.6.*" } } 只要你已经安装Composer功能,你可以运行以下: $ composer install 已经存在的项目则执行 $ composer update这时在verdor目录就已经下载完毕 具体可以参考官方文档:https://githu

Python3.x与Python2.x的差异用法

Python3.x与Python2.x的差异用法 1,Python2.x:import urllib2 Python3.x:import urllib.request 2,Python2.x: urllib2.URLError, e: Python3.x:urllib.request.URLError as e: 3,Python2.x:print 'hello world' Python3.x: print('hello world') 4,Python2.x:有raw_input和input

RabbitMQ学习第一记:用java连接RabbitMQ

1.什么是RabbitMQ MQ(Message Queue):消息队列,是服务端设计的一个可以存储大量消息的队列,并提供客户端操作队列的方法:生产队列(向队列中添加数据).消费队列(从队列中取数据).RabbitMQ就是基于消息队列的一个典型应用.RabbitMQ除了普通的生产消费功能,还有一些高级功能:公平分发 ,轮询分发,路由模式,通配符模式,发布订阅,队列持久化. 2.java实现RabbitMQ的连接 2.1.RabbitMQ客户端jar包 <dependency><group

python3.4怎么连接mysql pymysql连接mysql数据库

本文介绍了python3 4连接mysql数据库的方法,在python3 4中使用原来python2 7的mysqldb已不能连接mysql数据库了,可以使用pymysql. 在python3.4中使用原来python2.7的mysqldb已不能连接mysql数据库了,可以使用pymysql,来完成连接mysql的重任. 具体步骤: 序号 描述1 去github上下载pymysql的安装包pymysql https://github.com/PyMySQL/PyMySQL2 解压到某个盘符下3 

python3使用PyMysql连接mysql数据库

python语言的3 x完全不向前兼容,导致我们在python2 x中可以正常使用的库,到了python3就用不了了 比如说mysqldb目前MySQLdb并不支持python3 python语言的3.x完全不向前兼容,导致我们在python2.x中可以正常使用的库,到了python3就用不了了.比如说mysqldb 目前MySQLdb并不支持python3.x , Python3.x连接MySQL的方案有:oursql, PyMySQL, myconnpy 等 下面来说下python3如何安装

远程连接RabbitMQ失败

为了避免污染宿主系统环境,于是在虚拟机中搭建了一个linux环境并且按照了rabbitmq-server.然后在远程连接的时候一直连接失败. 官网上面给的例子都是在本地使用系统默认的guest用户连接的.没有给出远程连接的例子,于是阅读文档发现: When the server first starts running, and detects that its database is uninitialised or has been deleted, it initialises a fre

【EasyNetQ 教程】- 连接RabbitMQ

如果您习惯于处理与SQL Server等关系数据库的连接,那么您可能会发现EasyNetQ处理连接的方式有点奇怪.与关系数据库的通信始终由客户端启动.客户端打开连接,发出SQL命令,在必要时处理结果,然后关闭连接.一般的建议是,您应该在尽可能短的时间内保持打开连接,并将连接池保留给API. 与RabbitMQ等消息代理进行交谈有点不同,因为连接往往会持续应用程序的生命周期.通常,您将打开连接,创建订阅,然后等待任何消息到达打开的连接.EasyNetQ不假设经纪人随时可用.相反,它采用延迟连接方法

Java连接RabbitMQ之创建连接

依赖包: 1 <dependencies> 2 <dependency> 3 <groupId>junit</groupId> 4 <artifactId>junit</artifactId> 5 <version>4.12</version> 6 <scope>test</scope> 7 </dependency> 8 9 <!-- https://mvnrepos

Python3安装cx_Oracle连接oracle数据库实操总结

首先安装配置时,必须把握一个点,就是版本一致!包括:系统版本,python版本,oracle客户端的版本,cx_Oracle的版本,然后安装配置就容易了! 如果已经安装Python,查看你安装的Python版本是多少位的: 当然,你64位的操作系统也是可以安装32位的开发环境.反之则不行!切记! oracle客户端的版本,cx_Oracle的版本,要与Python版本和位数对应: 比如: Python版本:Python3.4.3   32位: cx_Oracle的版本:cx_Oracle-5.2