rabbitmq消息队列

rabbitmq消息队列

在之前的教程中,我们创建了一个简单的日志系统。我们能够向许多交换器转发日志消息。

在本教程中,我们将添加一个功能——我们让它仅仅接收我们感兴趣的日志类别。举例:我们 实现仅将严重级别的错误日志写入磁盘(为了节省磁盘空间),其余日志级别的日志直接打印到控制台。

绑定

之前的章节中我们已经创建过绑定,你可能还会记得:


1

2

3

4

5

6

err = ch.QueueBind(

  q.Name, // queue name

  "",     // routing key

  "logs", // exchange

  false,

  nil)

绑定是用来维系交换器和队列关系的,这可以被简单地理解为:队列仅仅对从交换器中传的消息感兴趣。

绑定有个额外参数叫做routing_key,为了避免与Channel.Publish方法中的参数相混淆,我们称之为binding key(绑定键)。使用绑定键创建绑定如下:


1

2

3

4

5

6

err = ch.QueueBind(

  q.Name,    // queue name

  "black",   // routing key

  "logs",    // exchange

  false,

  nil)

绑定键的含义取决于交换器的类型。我们之前使用的fanout类型的交换器,就会直接忽略这个参数。

Direct型交换器

我们之前的教程中的日志系统是广播所有的消息到所有消费者。我们希望以此拓展来实现根据消息严重性来过滤消息。比如我们希望 写日志到硬盘的代码仅仅接收严重级别的,不要浪费磁盘存储在warning或者info级别的日志。

之前使用的是fanout类型交换器,没有更好的拓展性或者说灵活性——它只能盲目的广播。

现在 使用direct型交换器替代。Direct型的路由算法 比较简单——消息会被派发到某个队列,该队列的绑定键恰好和消息的路由键一致。

为了阐述,考虑如下设置:

该设置中,可以看到direct型的交换器X被绑定到了两个队列:Q1、Q2。Q1使用绑定键orange绑定,Q2包含两个绑定键:black和green。

基于如上设置的话,使用路由键orange发布的消息会被路由到Q1队列,而使用black或者green路由键的消息均会被路由到Q2,所有其余消息将被丢弃。

备注:这里的交换器X和队列的绑定是多对多的关系,也就是说一个交换器可以到绑定多个队列,一个队列也可以被多个交换器绑定,消息只会被路由一次,不能因为两个绑定键都匹配上了路由键消息就会被路由两次,这种是不存在的。

多个绑定

用相同的绑定键去绑定多个队列是完全合法的,我们可以再添加一个black绑定键来绑定X和Q1,这样Q1和Q2都使用black绑定到了交换器X,这其实和fanout类型的交换器直接绑定到队列Q1、Q2功能相同:使用black路由键的消息会被直接路由到Q1和Q2。

发送日志

我们将使用该模型来构建日志系统。使用direct型的交换器替换fanout型的,我们将日志的严重级别作为路由键,这样的话接收端程序可以选择日志接收级别进行接收,首先聚焦下日志发送端:

首先创建一个交换器:


1

2

3

4

5

6

7

8

9

err = ch.ExchangeDeclare(

  "logs_direct", // name

  "direct",      // type

  true,          // durable

  false,         // auto-deleted

  false,         // internal

  false,         // no-wait

  nil,           // arguments

)

然后是发送消息:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

err = ch.ExchangeDeclare(

  "logs_direct", // name

  "direct",      // type

  true,          // durable

  false,         // auto-deleted

  false,         // internal

  false,         // no-wait

  nil,           // arguments

)

failOnError(err, "Failed to declare an exchange")

body := bodyFrom(os.Args)

err = ch.Publish(

  "logs_direct",         // exchange

  severityFrom(os.Args), // routing key

  false, // mandatory

  false, // immediate

  amqp.Publishing{

    ContentType: "text/plain",

    Body:        []byte(body),

  })

为了简单起见,我们假设日志严重级别如下:‘info‘, ‘warning‘, ‘error‘。

订阅

接收还和之前章节接收一样,只有一个例外:我们将为每一个感兴趣的严重级别创建一个绑定:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

q, err := ch.QueueDeclare(

  "",    // name

  false, // durable

  false, // delete when usused

  true// exclusive

  false, // no-wait

  nil,   // arguments

)

failOnError(err, "Failed to declare a queue")

if len(os.Args) < 2 {

  log.Printf("Usage: %s [info] [warning] [error]", os.Args[0])

  os.Exit(0)

}

for _, s := range os.Args[1:] {

  log.Printf("Binding queue %s to exchange %s with routing key %s",

     q.Name, "logs_direct", s)

  err = ch.QueueBind(

    q.Name,        // queue name

    s,             // routing key

    "logs_direct", // exchange

    false,

    nil)

  failOnError(err, "Failed to bind a queue")

}

糅合在一起

发送端:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

// rabbitmq_4_emit_log_direct.go project main.go

package main

import (

    "fmt"

    "log"

    "os"

    "strings"

    "github.com/streadway/amqp"

)

func failOnError(err error, msg string) {

    if err != nil {

        log.Fatalf("%s: %s", msg, err)

        panic(fmt.Sprintf("%s: %s", msg, err))

    }

}

func main() {

    //链接队列服务

    conn, err := amqp.Dial("amqp://guest:[email protected]:5672/")

    failOnError(err, "Failed to connect to RabbitMQ")

    defer conn.Close()

    //声明一个channel

    ch, err := conn.Channel()

    failOnError(err, "Failed to open a channel")

    defer ch.Close()

    //声明一个direct类型交换器

    err = ch.ExchangeDeclare("logs_direct", "direct", true, false, false, false, nil)

    failOnError(err, "Failed to declare an exchange")

    body := bodyFrom(os.Args)

    ch.Publish("logs_direct", severityFrom(os.Args), false, false, amqp.Publishing{

        ContentType: "text/plain",

        Body:        []byte(body),

    })

    failOnError(err, "Failed to publish a message")

    log.Printf(" [x] Sent %s", body)

}

//接收消息发送内容

func bodyFrom(args []string) string {

    var s string

    if (len(args) < 3) || os.Args[2] == "" {

        s = "hello"

    } else {

        s = strings.Join(args[2:], " ")

    }

    return s

}

//接收日志级别,作为路由键使用

func severityFrom(args []string) string {

    var s string

    if len(args) < 2 || args[1] == "" {

        s = "info"

    } else {

        s = args[1]

    }

    return s

}

接收端:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

// rabbitmq_4_receive_logs_direct.go project main.go

package main

import (

    "fmt"

    "log"

    "os"

    "github.com/streadway/amqp"

)

func failOnError(err error, msg string) {

    if err != nil {

        log.Fatalf("%s: %s", msg, err)

        panic(fmt.Sprintf("%s: %s", msg, err))

    }

}

func main() {

    //链接队列服务

    conn, err := amqp.Dial("amqp://guest:[email protected]:5672/")

    failOnError(err, "Failed to connect to RabbitMQ")

    defer conn.Close()

    //声明一个channel

    ch, err := conn.Channel()

    failOnError(err, "Failed to open a channel")

    defer ch.Close()

    //声明一个direct类型交换器

    err = ch.ExchangeDeclare("logs_direct", "direct", true, false, false, false, nil)

    failOnError(err, "Failed to declare an exchange")

    //声明一个队列

    q, err := ch.QueueDeclare("", false, false, true, false, nil)

    failOnError(err, "Failed to declare a queue")

    //判断cmd窗口接收参数是否足够

    if len(os.Args) < 2 {

        log.Printf("Usage:%s [info] [warning] [error]", os.Args[0])

        os.Exit(0)

    }

    //cmd窗口输入的多个日志级别,分别循环处理—进行绑定

    for _, s := range os.Args[1:] {

        log.Printf("Binding queue %s to exchange %s with routing key %s", q.Name, "logs_direct", s)

        ch.QueueBind(q.Name, s, "logs_direct", false, nil)

        failOnError(err, "Failed to bind a queue")

    }

    msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)

    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {

        for d := range msgs {

            log.Printf(" [x] %s", d.Body)

        }

    }()

    log.Printf(" [*] Waiting for logs. To exit press CTRL+C")

    <-forever

}

如果您只想保存“警告”和“错误”(而不是“信息”)日志消息到文件,只需要打开一个控制台然后输入:


1

go run receive_logs_direct.go warning error > logs_from_rabbit.log

如果你想看到所有的日志消息在你的屏幕上,打开一个新的终端,输入:


1

go run receive_logs_direct.go info warning error

发出一个错误日志消息类型如下:


1

go run emit_log_direct.go error "Run. Run. Or it will explode."

可以观察到:

消息可以进行分类接收了, 只有error级别的消息才会被存入log日志文件,而info、warning级别的都不存入。

实际效果如下:

分类: 消息队列

时间: 2024-10-09 14:16:08

rabbitmq消息队列的相关文章

(转)RabbitMQ消息队列(六):使用主题进行消息分发

在上篇文章RabbitMQ消息队列(五):Routing 消息路由 中,我们实现了一个简单的日志系统.Consumer可以监听不同severity的log.但是,这也是它之所以叫做简单日志系统的原因,因为是仅仅能够通过severity设定.不支持更多的标准. 比如syslog unix的日志工具,它可以通过severity (info/warn/crit...) 和模块(auth/cron/kern...).这可能更是我们想要的:我们可以仅仅需要cron模块的log. 为了实现类似的功能,我们需

RabbitMQ消息队列(六):使用主题进行消息分发

在上篇文章RabbitMQ消息队列(五):Routing 消息路由 中,我们实现了一个简单的日志系统.Consumer可以监听不同severity的log.但是,这也是它之所以叫做简单日志系统的原因,因为是仅仅能够通过severity设定.不支持更多的标准. 比如syslog unix的日志工具,它可以通过severity (info/warn/crit...) 和模块(auth/cron/kern...).这可能更是我们想要的:我们可以仅仅需要cron模块的log. 为了实现类似的功能,我们需

(转)RabbitMQ消息队列(九):Publisher的消息确认机制

在前面的文章中提到了queue和consumer之间的消息确认机制:通过设置ack.那么Publisher能不到知道他post的Message有没有到达queue,甚至更近一步,是否被某个Consumer处理呢?毕竟对于一些非常重要的数据,可能Publisher需要确认某个消息已经被正确处理. 在我们的系统中,我们没有是实现这种确认,也就是说,不管Message是否被Consume了,Publisher不会去care.他只是将自己的状态publish给上层,由上层的逻辑去处理.如果Message

(转)RabbitMQ消息队列(七):适用于云计算集群的远程调用(RPC)

在云计算环境中,很多时候需要用它其他机器的计算资源,我们有可能会在接收到Message进行处理时,会把一部分计算任务分配到其他节点来完成.那么,RabbitMQ如何使用RPC呢?在本篇文章中,我们将会通过其它节点求来斐波纳契完成示例. 1. 客户端接口 Client interface 为了展示一个RPC服务是如何使用的,我们将创建一段很简单的客户端class. 它将会向外提供名字为call的函数,这个call会发送RPC请求并且阻塞知道收到RPC运算的结果.代码如下: [python] vie

(转)(二)RabbitMQ消息队列-RabbitMQ消息队列架构与基本概念

http://blog.csdn.net/super_rd/article/details/70238869 没错我还是没有讲怎么安装和写一个HelloWord,不过快了,这一章我们先了解下RabbitMQ的基本概念. RabbitMQ架构 说是架构其实更像是应用场景下的架构(自己画的有点丑,勿嫌弃) 从图中可以看出RabbitMQ主要由Exchange和Queue两部分组成,然后通过RoutingKey关联起来,消息投递到Exchange然后通过Queue接收. RabbitMQ消息队列基本概

RabbitMQ消息队列(九):Publisher的消息确认机制

在前面的文章中提到了queue和consumer之间的消息确认机制:通过设置ack.那么Publisher能不到知道他post的Message有没有到达queue,甚至更近一步,是否被某个Consumer处理呢?毕竟对于一些非常重要的数据,可能Publisher需要确认某个消息已经被正确处理. 在我们的系统中,我们没有是实现这种确认,也就是说,不管Message是否被Consume了,Publisher不会去care.他只是将自己的状态publish给上层,由上层的逻辑去处理.如果Message

RabbitMQ消息队列1: Detailed Introduction 详细介绍

1. 历史 RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现.AMQP 的出现其实也是应了广大人民群众的需求,虽然在同步消息通讯的世界里有很多公开标准(如 COBAR的 IIOP ,或者是 SOAP 等),但是在异步消息处理中却不是这样,只有大企业有一些商业实现(如微软的 MSMQ ,IBM 的 Websphere MQ 等),因此,在 2006 年的 6 月,Cisco .Redhat.iMatix 等联合制定了 AMQP 的公开标

rabbitmq 消息队列

rabbitmq 消息队列: 解耦:降低一个程序降低耦合性 异步: 优点:--解决排队的问题. --解决资源浪费的问题.   --讲要处理的事物,进行存放,集中处理. 缺点:--不能保证任务被及时执行 应该场景:--去哪儿网 --12306 同步: 优点:--可以保证任务被及时执行 缺点:--排队问题,占用资源,造成资源浪费 大并发: web环境: --Nginx (epoll模式)   10000-20000 --Apache(epoll模式)1000-2000 pv = page visit

OpenstackMySQL和rabbitMQ消息队列

OpenstackMySQL和rabbitMQ消息队列 教程大纲 1. 安装基础数据存储服务MySQL数据库2. 安装rabbitMQ消息队列 1.安装数据库 yum install mariadb-server MySQL-python 所以在环境中要给MySQL做高可用,或备份. 修改MySQL的配置 cp /usr/share/mysql/my-medium.cnf /etc/my.cnf cp /usr/share/mariadb/my-medium.cnf /etc/my.conf 2

RabbitMQ消息队列应用

RabbitMQ消息队列应用 消息通信组件Net分布式系统的核心中间件之一,应用与系统高并发,各个组件之间解耦的依赖的场景.本框架采用消息队列中间件主要应用于两方面:一是解决部分高并发的业务处理:二是通过消息队列传输系统日志.目前业界使用较多的消息队列组件有RabbitMQ.ActiveMQ.MSMQ.kafka.zeroMQ等,本文对系统架构之MQ Component诠释,并采用RabbitMQ作为消息队列中间件. 图1- 消息队列组件示意图 一.RabbitMQ介绍 RabbitMQ是一款基