发布订阅

"发布订阅"

三、发布订阅

上一节的练习中我们创建了一个工作队列。队列中的每条消息都会被发送至一个工作进程。这节,我们将做些完全不同的事情——我们将发送单个消息发送至多个消费者。这种模式就是广为人知的“发布订阅”模式。

为了说明这种模式,我们将构建一个简单的日志系统。包括2个应用程序,一个传送日志消息另一个接收并打印这些消息。

我们的日志系统中每一个运作的接收端程序都会收到这些消息。这种方式下,我们就可以运行一个接收端发送日志消息至硬盘,同时可以运行另一个接收端将日志打印到屏幕上。

理论上讲,已发布的日志消息将会被广播到所有的接收者。

交换器(Exchange

之前的几节练习中我们发送接收消息都是在队列中进行,是时候介绍下RabbitMQ完整的消息传递模式了。

先来迅速的回顾下我们之前章节:

  • 一个生产者就是一个用来发送消息的应用程序
  • 一个 队列好比存储消息的缓存buffer
  • 一个消费者就是一个用户应用程序用来接收消息

RabbitMQ消息传递模型的核心思想是生产者从来不会直接发送消息至队列。事实上,生产者经常都不知道消息会被分发至哪个队列。

相反的是,生产者仅仅发送消息至交换器。交换器是非常简单的东西:一边从生产者那边接收消息一边发送这些消息至队列。交换器必须准确的知道这些被接收的消息该如何处理。它应该被添加到某个特定队列?或者添加到多个队列?甚至直接放弃。具体的传输规则就是通过交换器类型来定义的。

交换器类型有四种:direct、topic、headers、fanout。这节我们主要关注最后一种——fanout。让我们来创建一个fanout类型的交换器,命名为logs:

err = ch.ExchangeDeclare(
  "logs",   // name
  "fanout", // type
  true,     // durable
  false,    // auto-deleted
  false,    // internal
  false,    // no-wait
  nil,      // arguments
)

正如你从名字中猜测的一样,它仅仅广播所有消息到所有已知的接收队列。实际上这正是我们需要的日志系统。

备注:之前的几节练习中我们并不知道交换器,但我们依然能够将消息发送至队列中,之所以可以实现是因为我们使用了默认的交换器,使用空字符串表示。

回顾下之前我们发送消息是这样子的:

err = ch.Publish(
  "",     // exchange
  q.Name, // routing key
  false,  // mandatory
  false,  // immediate
  amqp.Publishing{
    ContentType: "text/plain",
    Body:        []byte(body),
  })

这里我们可以使用默认也可以自己命名交换器:如果路由键存在的话,消息会被路由到加上路由键参数的地址,注意fanout类型会直接忽略路由键的存在。

以下是修改后的代码:

err = ch.ExchangeDeclare(
  "logs",   // name    定义一个名为logs的交换器
  "fanout", // type    交换器类型为fanout即广播类型
  true,     // durable    持久化
  false,    // auto-deleted        无队列绑定时是否自动删除
  false,    // internal
  false,    // no-wait
  nil,      // arguments
)
failOnError(err, "Failed to declare an exchange")

body := bodyFrom(os.Args)
err = ch.Publish(
  "logs", // exchange    指定消息发送的交换器名称
  "",     // routing key    因为fanout类型会自动忽略路由键,所以这里的路由键参数任意,一般不填
  false,  // mandatory
  false,  // immediate
  amqp.Publishing{
          ContentType: "text/plain",
          Body:        []byte(body),
  })

临时队列

你可能记得之前我们声明队列的时候都会指定一个队列名称(记得hello和task_queue?)。队列的命名对我们来说至关重要——我们需要将工作进程指向同一个队列。当你需要在消费者和生产者之间共享队列的话声明队列就显得很重要。

但这对我们的日志系统来说无关重要。我们需要监听的是所有的日志消息,而不是他们中的某一类。我们只关注当前流中的消息而不关注旧的那些。解决这个我们需要做两件事。

首先,每当链接RabbitMQ的时候我们需要创建一个新的、空的队列。为做到这点,我们必须创建一个名称随机的队列,甚至更好的实现方式是——让服务端给我们自动生成一个随机的队列。

其次,一旦消费者链接断开,该队列便会自动删除。

在amqp客户端中,当我们给一个队列名称设定为空字符串时,我们就创建了一个非持久化的生成队列:

q, err := ch.QueueDeclare(
  "",    // name    满足第一点:服务端自动产生随机队列
  false, // durable
  false, // delete when usused
  true,  // exclusive   满足第二点:连接断开立即删除
  false, // no-wait
  nil,   // arguments
)

当该方法返回的时候,声明好的队列便包含一个由RabbitMQ生成的随机队列名称。举例来说,队列名称形如:amq.gen-JzTY20BRgKO-HjmUJj0wLg这种的。

当消费者的链接宣布关闭后,队列便像exclusive参数设置的那样,自动删除。

绑定

我们已经创建了一个fanout类型的交换器和一个队列,现在我们需要告诉交换器将消息发送至我们的队列。这种交换器和队列中的关联关系就叫做绑定。

err = ch.QueueBind(
  q.Name, // queue name    绑定的队列名称
  "",     // routing key    绑定的路由键
  "logs", // exchange    绑定的交换器名称
  false,
  nil
)

从现在起,logs交换器便能发送消息至我们的队列。

糅合在一起

生产者的程序,也就是发送消息端,跟之前几节的发送代码差不多。最重要的是我们现在要发送消息到logs交换器而非默认的交换器。发送的时候我们可以设置一个路由键,但是对于fanout类型的交换器来说它将被忽略。下面就是发送日志方的代码:

// rabbitmq_3_emit_log.go project main.go
package main

import (
    "fmt"
    "log"
    "os"
    "strings"

    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s:%s", err, msg)
        panic(fmt.Sprintf("%s:%s", err, msg))
    }
}

func bodyForm(args []string) string {
    var s string
    if len(args) < 2 || os.Args[1] == "" {
        s = "Hello World! This is a test!"
    } else {
        s = strings.Join(args[1:], " ")
    }
    return s
}

func main() {
    conn, err := amqp.Dial("amqp://guest:[email protected]:5672/")
    failOnError(err, "failed to dial rabbitmq server")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "failed to declare the channel")
    defer ch.Close()

    //声明一个交换器,交换器名称logs,类型fanout
    err = ch.ExchangeDeclare("logs", "fanout", true, false, false, false, nil)
    failOnError(err, "failed to declare the exchange")

    body := bodyForm(os.Args)
    //发送消息到交换器
    err = ch.Publish("logs", "", false, false, amqp.Publishing{
        Body:        []byte(body),
        ContentType: "text/plain",
    })
    failOnError(err, "failed to publish the message")
}

备注:这里发送方并不需要声明队列之类的,不像之前的代码需要声明,这里的发送方唯一关联的是交换器,所以只需声明交换器并发送消息至交换器即可。

正如你想的那样,链接建立后我们声明交换器,这一步是必须的因为发送消息到一个不存在的交换器是完全禁止的。

如果该交换器上面没有队列绑定的话那么发送至该交换器的消息将全部丢失,但这对我们来时ok;如果没有消费者我们会安全地丢弃这些消息。

下面是日志接收方的代码:

// rabbitmq_3_receive_logs.go project main.go
package main

import (
    "fmt"
    "log"
    "os"
    "strings"

    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s:%s", err, msg)
        panic(fmt.Sprintf("%s:%s", err, msg))
    }
}

func bodyForm(args []string) string {
    var s string
    if len(args) < 2 || os.Args[1] == "" {
        s = "Hello World! This is a test!"
    } else {
        s = strings.Join(args[1:], " ")
    }
    return s
}

func main() {
    conn, err := amqp.Dial("amqp://guest:[email protected]:5672/")
    failOnError(err, "failed to dial rabbitmq server")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "failed to declare the channel")
    defer ch.Close()

    //声明一个交换器,交换器名称logs,类型fanout
    err = ch.ExchangeDeclare("logs", "fanout", true, false, false, false, nil)
    failOnError(err, "failed to declare the exchange")

    //声明一个队列
    q, err := ch.QueueDeclare("", false, false, true, false, nil)
    failOnError(err, "failed to declare the queue")

    //设置绑定(第二个参数为路由键,这里为空)
    err = ch.QueueBind(q.Name, "", "logs", false, nil)
    failOnError(err, "failed to bind the queue")

    //注册一个消费者
    msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
    failOnError(err, "Failed to register a consumer")

    forever := make(<-chan bool)

    go func() {
        for d := range msgs {
            log.Printf(" [x] %s", d.Body)
        }
    }()
    log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
    <-forever
}

如果你想将日志保存到文件,执行如下命令:

go run receive_logs.go > logs_from_rabbit.log

如果你仅仅想在屏幕上查看日志,开启一个新的控制台执行如下命令:

go run receive_logs.go

当然了,你最后还要发出日志才行:

go run emit_log.go

使用rabbitmqctl list_bindings命令可以直接查看所有的绑定,如运行2个receive_logs.go程序你就会看到如下输出:

rabbitmqctl list_bindings
Listing bindings ...
logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
...done.

实际效果:

分别开启两个控制台,均监听相同队列,同时收到消息并打印了,说明两个随机的队列均收到了logs交换器发来的消息,发送方略。

时间: 2024-09-30 19:05:46

发布订阅的相关文章

python之上下文管理、redis的发布订阅

使用with打开文件的方式,是调用了上下文管理的功能 1 #打开文件的两种方法: 2 3 f = open('a.txt','r') 4 5 with open('a.txt','r') as f 6 7 实现使用with关闭socket 8 import contextlib 9 import socket 10 11 @contextlib.contextmanage 12 def Sock(ip,port): 13 socket = socket.socket() 14 socket.bi

RedisRepository封装—Redis发布订阅以及StackExchange.Redis中的使用

本文版权归博客园和作者本人吴双共同所有,转载请注明本Redis系列分享地址.http://www.cnblogs.com/tdws/tag/NoSql/ Redis Pub/Sub模式 基本介绍 Redis发布订阅—Pub/Sub模式或者说是观察者模式.我想大家即使没有使用过,也已经耳熟能详了. 先简单举例说明下应用场景,在场景中我们可以分析到其优势在哪. 比如你的线上应用应用,你想设置一个日志报警系统,当应用出现异常的时候,立马发送通知给你,可能是短信的形式,也可能是邮件的形式.当然如果只将报

C# Redis系列(三)-Redis发布订阅及客户端编程

发布订阅模型 Redis中的发布订阅 客户端编程示例 0.3版本Hredis 发布订阅模型 在应用级其作用是为了减少依赖关系,通常也叫观察者模式.主要是把耦合点单独抽离出来作为第三方,隔离易变化的发送方和接收方. 发送方:只负责向第三方发送消息.(杂志社把读者杂志交给邮局) 接收方:被动接收消息.(1:向邮局订阅读者杂志,2:门口去接邮过来的杂志) 第三方作用是:存储订阅杂志的接收方,并在杂志过来时送给接收方. (邮局) C#示例,发送方把杂志放到邮局里面: if (QA.AddBug()) E

Kafka是分布式发布-订阅消息系统

https://www.biaodianfu.com/kafka.html Kafka是分布式发布-订阅消息系统.它最初由LinkedIn公司开发,之后成为Apache项目的一部分.Kafka是一个分布式的,可划分的,冗余备份的持久性的日志服务.它主要用于处理活跃的流式数据. 在大数据系统中,常常会碰到一个问题,整个大数据是由各个子系统组成,数据需要在各个子系统中高性能,低延迟的不停流转.传统的企业消息系统并不是非常适合大规模的数据处理.为了已在同时搞定在线应用(消息)和离线应用(数据文件,日志

RabbitMQ 发布/订阅

我们会做一些改变,就是把一个消息发给多个消费者,这种模式称之为发布/订阅(类似观察者模式). 为了验证这种模式,我们准备构建一个简单的日志系统.这个系统包含两类程序,一类程序发动日志,另一类程序接收和处理日志. 在我们的日志系统中,每一个运行的接收者程序都会收到日志.然后我们实现,一个接收者将接收到的数据写到硬盘上,与此同时,另一个接收者把接收到的消息展现在屏幕上.本质上来说,就是发布的日志消息会转发给所有的接收者. 1.转发器(Exchanges) RabbitMQ消息模型的核心理念是生产者永

Redis 发布订阅

Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息. Redis 客户端可以订阅任意数量的频道. 下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 . client5 和 client1 之间的关系: 当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端: 实例 以下实例演示了发布订阅是如何工作的.在我们实例中我们创建了订阅频道名为 redi

【译】RabbitMQ:发布-订阅(Publish/Subscribe)

在前一篇教程中,我们创建了一个工作队列,我们假设在工作队列后的每一个任务都只被调度给一个消费者.在这一部分,我们将做一些完全不一样的事情,调度同一条消息给多个消费者,也就是有名的“发布-订阅”模式.为了阐述这种模式,我们将构建一个简单的日志系统.该系统将由两部分组成:一部分发送日志消息,另一部分接收并且打印日志消息,在这个日志系统中,每一份运行着的接收程序都将会收到消息.这样我们可以运行一个接收者把日志写入到磁盘中,同时可以运行另一个接收者将日志打印到显示器上面.也就是说,发布的日志消息会被广播

sql server之发布订阅(读写分离)

分布式开发之发布与订阅 发布订阅:数据实时备份同步 软件环境:sql server2008 r2 硬件环境:视数据量和对应机器分配的任务而定 机器数量:视分割线标准而定(即数据分别存放的分割线) 作        用 : 数据库服务器出问题时我们也有其正常工作时的备份 一台服务器负载不起时,可以用来做负载均衡 数据库服务器可以无间断,无损失迁移 主服务器被攻击或当机时另一台服务同步机可以应急 意        义:咱们可以用于两台服务器,其中一台机器用作增删改,另外一台机器用作查询,为了防止读写

《Redis设计与实现》学习笔记-发布订阅与事务

发布与订阅 Redis通过发布订阅提供一对多甚至是多对多的节点消息通信,发布订阅由PUBLISH.SUBSCRIBE.PSUBSCRIBE.PUBSUB等命令组成. SUBSCRIBE命令:订阅某频道,在redisServer结构中通过pubsub_channels字典属性保存当前服务器所有频道的订阅关系,字典键时频道名称,字典值是一个链表,记录了所有订阅这个频道的客户端. UNSUBSCRIBE命令:退订频道,调用该命令之后,会把订阅关系从pubsub_channels中删掉,如果键对应的链表

发布-订阅模式

1.什么是发布订阅模式 发布订阅模式 又叫观察者模式,他是定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变,所有依赖他的对象都将得到通知. 在javascript开发中,我们一般用事件模型来替代传统的发布-订阅模式. 2.Dom事件 实际上,只要我们曾经在dom节点上绑定过事件函数,那么我们就曾经使用过发布-订阅模式. document.getElementById('test').addEventListener('click',function(){ alert(2)},fasle