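The interface listens for messages on two queues, and there are two test environments, so test data has to be sent to the MQ.
In Python this is done with the pika package: Pika is a RabbitMQ (AMQP-0-9-1) client library for Python.
For reference, see: https://github.com/pika/pika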
import pika

connection = pika.BlockingConnection()
channel = connection.channel()
channel.basic_publish(exchange='example', routing_key='test', body='Test Message')
connection.close()
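The snippet above closes the connection only when publishing succeeds. As a minimal sketch, the same steps can be wrapped so that the connection is closed even if basic_publish raises. The helper name publish_once and its connect parameter are hypothetical and not part of pika; connect is assumed to be any zero-argument callable that returns a pika-style connection.

```python
def publish_once(connect, exchange, routing_key, body):
    """Open a connection, publish one message, and always close the connection.

    `connect` is a hypothetical parameter: any zero-argument callable that
    returns a pika-style connection, e.g. pika.BlockingConnection.
    """
    connection = connect()
    try:
        channel = connection.channel()
        channel.basic_publish(exchange=exchange,
                              routing_key=routing_key,
                              body=body)
    finally:
        # Runs on success and on failure, so the connection is never leaked.
        connection.close()
```

With pika installed and a broker listening on localhost, this could be called as publish_once(pika.BlockingConnection, 'example', 'test', 'Test Message'). Passing the connection factory in as an argument also lets the function be exercised with a stand-in object when no broker is available.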
Sending a message is wrapped in a function, in stream.py.
The code is as follows:
import json

import pika

from config import env


def send_msg(msg_exchange, msg_key, msg, msg_type=None):
    # The environment selects the broker; msg_type selects the vhost
    # (None -> talaris, anything else -> clair).
    if str(env.__name__) == 'Beta':
        if msg_type is None:
            connection = pika.BlockingConnection(pika.URLParameters(
                'amqp://talaris:[email protected]:5672/talaris'))
        else:
            connection = pika.BlockingConnection(pika.URLParameters(
                'amqp://talaris:[email protected]:5672/clair'))
    elif str(env.__name__) == 'Alpha':
        if msg_type is None:
            connection = pika.BlockingConnection(pika.URLParameters(
                'amqp://talaris:[email protected]:5672/talaris'))
        else:
            connection = pika.BlockingConnection(pika.URLParameters(
                'amqp://talaris:[email protected]:5672/clair'))
    else:
        # Without this branch, `connection` would be undefined below.
        raise ValueError('Unsupported environment: %r' % env.__name__)
    channel = connection.channel()
    # The message is serialized to JSON before it is published.
    channel.basic_publish(exchange=msg_exchange, routing_key=msg_key,
                          body=json.dumps(msg))
    print(" [x] Sent %r:%r" % (msg_key, msg))
    connection.close()

PS: I used pika.URLParameters here; pika.ConnectionParameters(host='localhost') works as well. See the GitHub repository for details.
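The nested if/else in send_msg maps an (environment, vhost) pair to a broker URL, so it can also be expressed as a lookup table. The following is only a sketch of that idea: the names BROKER_URLS, vhost_for and broker_url are hypothetical, and the hosts and credentials are placeholders rather than the real addresses.

```python
# Hypothetical placeholder URLs; substitute the real hosts and credentials.
BROKER_URLS = {
    ("Beta", "talaris"): "amqp://user:password@beta.example.invalid:5672/talaris",
    ("Beta", "clair"): "amqp://user:password@beta.example.invalid:5672/clair",
    ("Alpha", "talaris"): "amqp://user:password@alpha.example.invalid:5672/talaris",
    ("Alpha", "clair"): "amqp://user:password@alpha.example.invalid:5672/clair",
}


def vhost_for(msg_type):
    """Mirror send_msg: no msg_type means the talaris vhost, otherwise clair."""
    return "talaris" if msg_type is None else "clair"


def broker_url(env_name, msg_type=None):
    """Return the AMQP URL for an environment name and message type."""
    key = (env_name, vhost_for(msg_type))
    try:
        return BROKER_URLS[key]
    except KeyError:
        # Fail loudly for an environment that has no broker configured.
        raise ValueError("No broker configured for %r" % (key,)) from None
```

The returned string can be handed to pika.URLParameters exactly as in send_msg, for example pika.BlockingConnection(pika.URLParameters(broker_url('Beta'))). Adding a third environment then means adding table entries instead of another branch.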
Time: 2024-12-09 20:10:25