RabbitMQ消息分发轮询

我们首先下载pika,以及rabbitMQ,和ir语言,rabbitMQ是由ir语言编写的

消息队列的使用过程大概如下:

(1)客户端连接到消息队列服务器,打开一个channel。

channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

(2)客户端声明一个exchange,并设置相关属性。

Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。

(3)客户端声明一个queue,并设置相关属性。

Queue:消息队列载体,每个消息都会被投入到一个或多个队列。

(4)客户端使用routing key,在exchange和queue之间建立好绑定关系。

Routing Key:路由关键字,exchange根据这个关键字进行消息投递。

(5)客户端投递消息到exchange。

Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。

接下来写一个生产者:

import pika
connection = pika.BlockingConnection(
   pika.ConnectionParameters(‘localhost‘))
channel = connection.channel()#先通过socket建立一个实例,创建一个新的频道

# 声明queue
channel.queue_declare(queue=‘hello‘)# 注意此处需要声明一个管道或者称之为队列,在此处出发消息 同时客户端与服务端都需要

# n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange=‘‘,
                     routing_key=‘hello‘,#queue名字#路由键,写明将消息发往哪个队列,本例是将消息发往队列pikamq
                     body=‘Hello World!‘)# 消息内容
print(" [x] Sent ‘Hello World!‘")# 当生产者发送完消息后,可选择关闭连接
connection.close()

消费者:

import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘ ))
channel = connection.channel()
channel.queue_declare(queue=‘hello‘)
def callback(ch, method, properties, body):
         print(‘--->‘,ch,method,properties)
         print(" [x] Received %r"%body)
channel.basic_consume(callback,
                     queue=‘hello‘,
                     #no_ack=True  #此处有的代码加了,但是python系统会自带,同时加了之后,一旦等待时间过长,生产者发送的消息,无法转移到另一个消费者中
                     )
channel.start_consuming()

    时间: 2024-11-08 18:54:23

    RabbitMQ消息分发轮询的相关文章

    RabbitMQ消息分发轮询和Message Acknowledgment

    一.消息分发 RabbitMQ中的消息都只能存储在Queue中,生产者(下图中的P)生产消息并最终投递到Queue中,消费者(下图中的C)可以从Queue中获取消息并消费. 多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理. 启动3个消费者 生产者依次生成3条消息 可见3条消息分别被3个消费者获取,所以RabbitMQ是采用轮询机制将消息队列Queue中的消息依次发给不同的消费者 二.消息确认(Message Ac

    java用while循环设计轮询线程的性能问题

    java用while循环设计轮询线程的性能问题 轮询线程在开发过程中的应用是比较广泛的,在这我模拟一个场景,有一个队列和轮询线程,主线程往队列中入队消息,轮询线程循环从队列中读取消息并打印消息内容.有点类似Android中Handler发送消息. 首先定义一个Message类. public class Message { private String content; public Message(String content) { this.content=content; } public

    RabbitMQ学习第二记:工作队列的两种分发方式,轮询分发(Round-robin)和 公平分发(Fair dispatch)

    1.什么是RabbitMQ工作队列 我们在应用程序使用消息系统时,一般情况下生产者往队列里插入数据时速度是比较快的,但是消费者消费数据往往涉及到一些业务逻辑处理导致速度跟不上生产者生产数据.因此如果一个生产者对应一个消费者的话,很容易导致很多消息堆积在队列里.这时,就得使用工作队列了.一个队列有多个消费者同时消费数据. 下图取自于官方网站(RabbitMQ)的工作队列的图例 P:消息的生产者 C1:消息的消费者1 C2:消息的消费者2 红色:队列 生产者将消息发送到队列,多个消费者同时从队列中获

    (转)RabbitMQ消息队列(三):任务分发机制

    在上篇文章中,我们解决了从发送端(Producer)向接收端(Consumer)发送“Hello World”的问题.在实际的应用场景中,这是远远不够的.从本篇文章开始,我们将结合更加实际的应用场景来讲解更多的高级用法. 当有Consumer需要大量的运算时,RabbitMQ Server需要一定的分发机制来balance每个Consumer的load.试想一下,对于web application来说,在一个很多的HTTP request里是没有时间来处理复杂的运算的,只能通过后台的一些工作线程

    RabbitMQ消息队列(三):任务分发机制

    在上篇文章中,我们解决了从发送端(Producer)向接收端(Consumer)发送“Hello World”的问题.在实际的应用场景中,这是远远不够的.从本篇文章开始,我们将结合更加实际的应用场景来讲解更多的高级用法. 当有Consumer需要大量的运算时,RabbitMQ Server需要一定的分发机制来balance每个Consumer的load.试想一下,对于web application来说,在一个很多的HTTP request里是没有时间来处理复杂的运算的,只能通过后台的一些工作线程

    长轮询实现消息推送

    一.应用场景 浏览器与服务器之间保持一个长连接(http链接),服务器有最新的数据生成时及时推送到前端展现.典型场景:新邮件到达通知. 二.业界常用的解决方案 定时轮询,长轮询,websocket(HTML5新增的能力) 其中长轮询兼容性较好,应用的较为广泛,但是切忌在移动网络中应用该技术. 三.长连接前端代码 /** *pns模型层 *@constructs M2012.Model.Pns.PnsModel *@extends Backbone.Model *@example *new M20

    Ajax轮询消息自动提示(消息盒子)

    经过一下午写了个消息盒子的例子,用的是ajax方式轮询读取,没有用到后台自动“推”数据的方式,效果良好. <%@ Page Language="C#" AutoEventWireup="true" CodeBehind="mainTalk.aspx.cs" Inherits="wj_test.Talk.mainTalk" %> <!DOCTYPE html PUBLIC "-//W3C//DTD X

    rabbitmq 公平分发和消息接收确认(转载)

    原文地址:http://www.jianshu.com/p/f63820fe2638 当生产者投递消息到broker,rabbitmq把消息分发到消费者. 如果设置了autoAck=true 消费者会自动确认收到信息.这时broker会立即将消息删除,这种情况下如果消费者出现异常(连接中断)该消息就会丢失.为了保证消息能够被正确的消费,rabbitmq支持消息确认. String basicConsume(String queue, boolean autoAck, Consumer callb

    关于android 消息轮询处理

    android 中涉及到服务器中数据变化信息通知用户一般有两种 办法,推送和轮询,消息推送是服务端主动发消息给客户端,因为第一时间知道数据变化是服务器自己,所以推送的优势是实时性高,但服务器主动推送需要开发一套能让客户端持久链接的服务器 现在已经有很多开源的代码实现了基于XMMP 协议的推送方案,而且还可以使用谷歌的推送方案,但有些情况并不需要服务端主动推送二是在一定的时间间隔客户端发起查询 private MyThread myThread; private NotificationManag