【4】发布与订阅

在上一节我们创建了一个工作队列,并且假设工作队列把每一个任务都准确地分发给一个worker。在本章我们将创建一个更加复杂的例子–我们将把一个消息分发到多个消费者。这个模式就是发布/订阅。
为了说明这个模式,我们将构建一个简单的日志系统。它包含两个程序–第一个用来发出日志消息,第二个将接收并打印它们。
在我们的日志系统中,每个运行的receiver都将获得消息。这样我们通过一个receiver接收消息并直接记录到磁盘中;同时我们可以运行另一个receiver在屏幕上打印日志。
从本质上讲,日志消息是通过广播分发给所有的receivers.

Exchanges
在前面的章节中,我们使用了一个队列来发送和接收消息。而现在我们将完整地介绍RabbitMQ的消息模型。
首先我们先回顾一下前面的例子:

  • 生产者:用来发送消息的应用程序
  • 队列:存储消息的缓存区
  • 消费者:用来消费消息的应用程序

RabbitMQ消息模型的核心思想就是生产者不会直接向队列发送消息。实际上,生产者根本不知道消息是否发送到了任何队列。
相反,生产者只能发送消息给交换器exchange(后面统称交换器).交换器原理非常简单,一方面它从生产者接收消息而另一方面把消息推送给消息队列。 交换器必须知道如何处理接收到的每一个消息。是直接添加到特定的队列呢?还是添加到多个队列?再或者是直接丢弃。消息的处理方式取决于交换器的类型。

交换器的类型有:direct, topic, headers 和 fanout。本章我们主要关注第四个类型:fanout.下面我们创建一个该类型的交换器,并命名为logs:

channel.exchangeDeclare("logs", "fanout");

fanout交换器非常简单。根据名字你可能就猜到,它是把所有接收到消息都广播给其知道的队列。这也正是我们的日志记录器需要的。

交换器列表
我们可以执行命令rabbitmqctl来列出服务器的交换器列表:

$ sudo rabbitmqctl list_exchangesListing exchanges ...        directamq.direct      directamq.fanout      fanoutamq.headers     headersamq.match       headersamq.rabbitmq.log        topicamq.rabbitmq.trace      topicamq.topic       topiclogs    fanout...done

列表中有一些以amq.*开头的交换器以及未命名(默认)的交换器,这些交换器都是系统默认创建的,但是你不太可能用到它们。

无名交换器
在前面的例子中我们没有使用任何交换器,但是我们依然可以将消息发送到队列中。这是由于我们通过设置了名字为””字串,进而使用了默认的交换器。
回顾一下我们之前发送消息的方式:

channel.basicPublish("", "hello", null, message.getBytes());

第一个参数就是交换器的名称。空字符串表示使用默认的或者无名的交换器:消息被路由到routingKey指定的队列。

通过下面的代码我们就可以向我们的命名交换器发送消息:

channel.basicPublish( "logs", "", null, message.getBytes());

临时队列
你是否还记得前面的例子中我们使用的指定命名的队列么?如hello和task_queue。当你想在生产者与消费者之间的共享队列,那么给这个队列命名是非常重要的。
但是我们这个例子的日志记录器并不需要这样。我们希望收集到所有的日志信息而不是其中的一个子集。我们只对当前的消息流感兴趣而不会关注旧消息。为了实现这个例子,我们需要做两件事。
首先,任何时候连接到RabbitMQ,我们都需要一个新的、空的队列。如何做?我们可以创建一个具有随机命名的队列或者也可以让服务器来选择一个随机命名的队列,这样更好。
第二件事就是,一旦我们消费者的连接,队列应该能自动被删除。
在java客户端,我们可以无参调用queueDeclare()方法来创建一个非持久的,专属的,能被自动删除的队列;该方法返回队列的名称。

String queueName = channel.queueDeclare().getQueue();

变量queueName的内容就是队列的随机名称,类似字串’amq.gen-JzTY20BRgKO-HjmUJj0wLg’.

Bindings

前面我们已经创建了一个fanout类型的交换器和一个队列。现在我们就可以告诉交换器发送消息给队列了。交换器与队列之间的关系称作binding

channel.queueBind(queueName, "logs", "");

现在logs交换器将向我们创建的队列追加消息。

bindings列表
我们可以使用命令rabbitmqctl list_bindings来列出所有的bindings.

执行代码

用来发送日志消息的生产者代码看起来与之前的没什么区别。比较重要的改变就是我们把消息发送到了名为logs的交换器而不是系统默认的无名交换器。在发送消息时我们需要提供一个routingKey,但fanout类型的交换器会忽略该值的意义。下面是完整代码:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class EmitLog {

  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

    String message = getMessage(argv);

    channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
    System.out.println(" [x] Sent ‘" + message + "‘");

    channel.close();
    connection.close();
  }
  
  private static String getMessage(String[] strings){
    if (strings.length < 1)
    	    return "info: Hello World!";
    return joinStrings(strings, " ");
  }
  
  private static String joinStrings(String[] strings, String delimiter) {
    int length = strings.length;
    if (length == 0) return "";
    StringBuilder words = new StringBuilder(strings[0]);
    for (int i = 1; i < length; i++) {
        words.append(delimiter).append(strings[i]);
    }
    return words.toString();
  }
}

如你所见,在建立连接后,我们就创建一个fanout的交换器,这一步是必须。禁止向一个不存在的交换器发送消息。
假如没有队列绑定到这个交换器,那么发送的消息就会丢失,但这在我们这个例子中影响不大。如果没有消费者在监听消息队列,我们可以安全地丢弃消息。
完整的ReceiveLogs.java代码如下:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogs {

  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");
    
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(queueName, true, consumer);

    while (true) {
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      String message = new String(delivery.getBody());

      System.out.println(" [x] Received ‘" + message + "‘");   
    }
  }
}
时间: 2024-10-27 12:08:42

【4】发布与订阅的相关文章

Redis - 发布与订阅

发布与订阅(也叫PUB/SUB)的特点是订阅者(Listener)负责订阅频道(Channel),发送者(Publisher)负责向频道发送二进制字符串消息. 每当有消息被发送至给定频道时,频道的所有订阅者都会收到消息.也可以把频道看作是电台,其中订阅者可以同时收听多个电台,而发送者则可以在任何电台发送消息. 使用Redis订阅与发布功能应该注意. 第一个,于Redis系统的稳定性有关. 对于旧版的Redis,如果一个客户端订阅了某个或某些频道,但它读取消息的速度不够快的话,那么不断挤压的消息会

发布与订阅SQLServer

SQLServer 中发布与订阅 在对数据库做迁移的时候,会有很多方法,用存储过程,job,也可以用开源工具lettle,那么今天这些天变接触到了一种新的方法,就是SqlServer中自带的发布与订阅. 首先说明一下数据复制的流程.如下图A是(192.168.210.170)上的数据库,B是(172.23.100.109)上的数据库.把B当作数据源,然后A从B上获取数据. 发布前准备:首先两个服务器之间要能相互通讯,也就是能ping命令能通.   平时我们链接数据库的时候,经常都是用的ip登陆,

文成小盆友python-num12 Redis发布与订阅补充,python操作rabbitMQ

本篇主要内容: redis发布与订阅补充 python操作rabbitMQ 一,redis 发布与订阅补充 如下一个简单的监控模型,通过这个模式所有的收听者都能收听到一份数据. 用代码来实现一个redis的订阅者何消费者. 定义一个类: import redis class Redis_helper(): def __init__(self): self.__conn = redis.Redis(host='192.168.11.87') #创建一个连接 def pub(self, mes, c

SqlServer2005 数据库发布、订阅配置图文详解

一:准备条件 <1>软件准备条件 机器A端:SqlServer2005 Management Studio + WinServer 2003 Enterprise (作为发布服务器) 机器B端:Sqlserver2005 Management Studio Express + WinXP(作为订阅服务器) (可以用别的,不过订阅服务器版本不得高于发布服务器版本) <2>数据库复制准备条件 1. 所有被同步的数据表尽量要用主键,如果没有主键也没有关系,SqlServer会提示为表自动

【转】SQL Server 2008 数据库同步的两种方式 (发布、订阅)

上篇中说了通过SQL JOB的方式对数据库的同步,这一节作为上一节的延续介绍通过发布订阅的方式实现数据库之间的同步操作.发布订阅份为两个步骤:1.发布.2.订阅.首先在数据源数据库服务器上对需要同步的数据进行发布,然后在目标数据库服务器上对上述发布进行订阅.发布可以发布一张表的部分数据,也可以对整张表进行发布.下面分别介绍发布.订阅的过程. 一.发布.发布需要用实际的服务器名称,不能使用服务器的IP地址进行.能发布的信息包括[表].[存储过程].[用户函数]如果使用IP会有错误,如下图: 具体发

自学总结redis第三部分(安全性、主从、哨兵、事物、持久化、发布与订阅、虚拟内存)

八.redis的安全性 因为redis速度相当快,所以在一台比较好的服务器下,一个外部用户在一秒内可以进行15W次的密码尝试,这意味着需要设定非常强大的密码来防止暴力破解. 可以通过设置密码以及登录redis方式来操作,具体参考 九.redis主从复制 9.1简介 1.Master可以拥有多个slave. 2.多个slave可以连接同一个master外,还可以连接到其他的slave. 3.主从复制不会阻塞master,在同步数据时,master可以继续处理client请求. 4.提供系统的伸缩性

Python redis 发布和订阅

发布和订阅 类似于RSS发布者:服务器订阅者:Dashboad和数据处理看下面代码:类文件名:monitor.py: #!/usr/bin/python # -*- coding: utf-8 -*- __author__ = 'gaogd' import redis class RedisHelper:     def __init__(self):         self.__conn = redis.Redis(host='192.168.10.12', port=6379, passw

发布和订阅

发布和订阅是一种消息通信模式. 优点:使消息订阅者和消息发布者耦合度降低,类似设计模式中的观察者模式. Redis 的发布和订阅 发布和订阅 订阅的命令如下: // 订阅一个或多个频道 subscribe channel1 channel2 channel3 ... // 模式订阅,频道参数类似正则表达式 psubscribe abc* xyz* ... 发布命令如下: publish channel msg 启动一个订阅者客户端 X 订阅 cctv-1 返回三个参数:subscribe 订阅成

SQL SERVER发布与订阅

一.配置分发 1.配置分发服务器,注:配置发布与订阅,连接SQLSERVER必须用服务器名登录 2.配置分发 3.选择分发服务器 4.选择快照文件夹 5.设置此文件夹的读写权限为everyone 6.选择分发数据库路径 7.配置分发 8.配置分发 9.配置分发完成 二.新建发布 1.新建发布 2.选择发布数据库 3.选择发布类型 4.选择发布对象 5.指定何时运行快照代理 6.代理安全性 7.创建发布 8.填写发布名称 9.新建发布完成 三.新建订阅 1. 新建订阅 2. 查找sqlserver

MSSQL复制中的发布与订阅

准备条件 1.2台服务器 2.WINDOWS SERVER 2008 64bit + 3.SQL SERVER 2008 R2 + 4.MSSQLSERVER服务与MSSQLAGENT服务正常运行中 三步曲 第一步 配置分发服务器 第二步 配置发布服务器 第三步 配置订阅服务器 小技巧 1.一般来说,分发服务器与发布服务器在同一个服务器上的同一个实例,也可不同实例,同样也可不同服务器,而订阅服务器存在于单独的服务器上.发布与订阅存在1对多关系,1个发布可以同时存在多个订阅. 2.配置订阅服务器可