搬砖的陈大师版权所有,转载请注明:http://www.lenggirl.com/tool/RabbitMQ.html
手册:http://www.rabbitmq.com/getstarted.html
安装:http://www.rabbitmq.com/download.html
参考:http://blog.csdn.net/whycold/article/details/41119807
一.介绍
AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Go、Python、Ruby。用于在分布式系统中存储转发消息。
二.安装
ubuntu直接下载deb文件安装,默认已经启动,sudo敲入:
sudo rabbitmq-server start
sudo lsof -i:5672
启用插件,进入UI:
sudo rabbitmq-plugins enable rabbitmq_management
登录http://127.0.0.1:15672
用户名:密码=guest:guest
三.使用
# 敲入查看帮助
sudo rabbitmqctl
# 创建用户
sudo rabbitmqctl add_user 登录用户名 密码
# 可以创建管理员用户,负责整个MQ的运维
sudo rabbitmqctl set_user_tags 登录用户名 administrator
# 可以创建RabbitMQ监控用户,负责整个MQ的监控
sudo rabbitmqctl set_user_tags 登录用户名 monitoring
# 可以创建某个项目的专用用户,只能访问项目自己的virtual hosts
sudo rabbitmqctl set_user_tags 登录用户名 management
# 查看用户
sudo rabbitmqctl list_users
# 授权
# 该命令使用户具有/这个virtual host中所有资源的配置、写、读权限以便管理其中的资源
# set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
# 其中,<conf> <write> <read>的位置分别用正则表达式来匹配特定的资源,如‘^(amq\.gen.*|amq\.default)$‘可以匹配server生成的和默认的exchange,‘^$‘不匹配任何资源
sudo rabbitmqctl set_permissions -p / 登录用户名 ‘.*‘ ‘.*‘ ‘.*‘
四.概念入门
1.ConnectionFactory、Connection、Channel
ConnectionFactory、Connection、Channel都是RabbitMQ对外提供的API中最基本的对象。
Connection是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑。
ConnectionFactory为Connection的制造工厂。
Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。
2.Queue
Queue(队列)是RabbitMQ的内部对象,用于存储消息,用下图表示。
RabbitMQ中的消息都只能存储在Queue中,生产者(下图中的P)生产消息并最终投递到Queue中,消费者(下图中的C)可以从Queue中获取消息并消费。
多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。
3.消息的一些机制
3.1.消息确认Message acknowledgment
在实际应用中,可能会发生消费者收到Queue中的消息,但没有处理完成就宕机(或出现其他意外)的情况,这种情况下就可能会导致消息丢失。为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除;如果RabbitMQ没有收到回执并检测到消费者的RabbitMQ连接断开,则RabbitMQ会将该消息发送给其他消费者(如果存在多个消费者)进行处理。这里不存在timeout概念,一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者,除非它的RabbitMQ连接断开。
这里会产生另外一个问题,如果我们的开发人员在处理完业务逻辑后,忘记发送回执给RabbitMQ,这将会导致严重的bug——Queue中堆积的消息会越来越多;消费者重启后会重复消费这些消息并重复执行业务逻辑…
另外pub message是没有ack的。(??)
3.2.消息持久Message durability
如果我们希望即使在RabbitMQ服务重启的情况下,也不会丢失消息,我们可以将Queue与Message都设置为可持久化的(durable),这样可以保证绝大部分情况下我们的RabbitMQ消息不会丢失。但依然解决不了小概率丢失事件的发生(比如RabbitMQ服务器已经接收到生产者的消息,但还没来得及持久化该消息时RabbitMQ服务器就断电了),如果我们需要对这种小概率事件也要管理起来,那么我们要用到事务。由于这里仅为RabbitMQ的简单介绍,所以这里将不讲解RabbitMQ相关的事务。
3.3.提前取机制Prefetch count
前面我们讲到如果有多个消费者同时订阅同一个Queue中的消息,Queue中的消息会被平摊给多个消费者。这时如果每个消息的处理时间不同,就有可能会导致某些消费者一直在忙,而另外一些消费者很快就处理完手头工作并一直空闲的情况。我们可以通过设置prefetchCount来限制Queue每次发送给每个消费者的消息数,比如我们设置prefetchCount=1,则Queue每次给每个消费者发送一条消息;消费者处理完这条消息后Queue会再给该消费者发送一条消息。就是变慢而已。订阅模式如何平摊?这种模式是一个消费者一次性拿很多条消息?
4.Exchange
在上一节我们看到生产者将消息投递到Queue中,实际上这在RabbitMQ中这种事情永远都不会发生。实际的情况是,生产者将消息发送到Exchange(交换器,下图中的X),由Exchange将消息路由到一个或多个Queue中(或者丢弃)。
Exchange是按照什么逻辑将消息路由到Queue的?这个将在Binding一节介绍。
RabbitMQ中的Exchange有四种类型,不同的类型有着不同的路由策略,这将在Exchange Types一节介绍。
5.Routing key
生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效。
在Exchange Type与binding key固定的情况下(在正常使用时一般这些内容都是固定配置好的),我们的生产者就可以在发送消息给Exchange时,通过指定routing key来决定消息流向哪里。
RabbitMQ为routing key设定的长度限制为255 bytes。
6.Binding
RabbitMQ中通过Binding将Exchange与Queue关联起来,这样RabbitMQ就知道如何正确地将消息路由到指定的Queue了。
6.1.Binding key
在绑定(Binding)Exchange与Queue的同时,一般会指定一个binding key;消费者将消息发送给Exchange时,一般会指定一个routing key;当binding key与routing key相匹配时,消息将会被路由到对应的Queue中。这个将在Exchange Types章节会列举实际的例子加以说明。
在绑定多个Queue到同一个Exchange的时候,这些Binding允许使用相同的binding key。 binding key 并不是在所有情况下都生效,它依赖于Exchange Type,比如fanout类型(广播)的Exchange就会无视binding key,而是将消息路由到所有绑定到该Exchange的Queue。
7.Exchange Types
RabbitMQ常用的Exchange Type有fanout、direct、topic、headers这四种(AMQP规范里还提到两种Exchange Type,分别为system与自定义,这里不予以描述),下面分别进行介绍。
7.1.fanout
fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。
7.2.direct
direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。
以上图的配置为例,我们以routingKey=”error”发送消息到Exchange,则消息会路由到Queue1(amqp.gen-S9b…,这是由RabbitMQ自动生成的Queue名称)和Queue2(amqp.gen-Agl…);如果我们以routingKey=”info”或routingKey=”warning”来发送消息,则消息只会路由到Queue2。如果我们以其他routingKey发送消息,则消息不会路由到这两个Queue中。
7.3.topic
前面讲到direct类型的Exchange路由规则是完全匹配binding key与routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求。topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同,它约定:
1. routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
2. binding key与routing key一样也是句点号“. ”分隔的字符串
3. binding key中可以存在两种特殊字符“*”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
以上图中的配置为例,routingKey=”quick.orange.rabbit”的消息会同时路由到Q1与Q2,routingKey=”lazy.orange.fox”的消息会路由到Q1与Q2,routingKey=”lazy.brown.fox”的消息会路由到Q2,routingKey=”lazy.pink.rabbit”的消息会路由到Q2(只会投递给Q2一次,虽然这个routingKey与Q2的两个bindingKey都匹配);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息将会被丢弃,因为它们没有匹配任何bindingKey
7.4.headers
headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。
该类型的Exchange没有用到过(不过也应该很有用武之地),所以不做介绍。
8.RPC
MQ本身是基于异步的消息处理,前面的示例中所有的生产者(P)将消息发送到RabbitMQ后不会知道消费者(C)处理成功或者失败(甚至连有没有消费者来处理这条消息都不知道)。
但实际的应用场景中,我们很可能需要一些同步处理,需要同步等待服务端将我的消息处理完成后再进行下一步处理。这相当于RPC(Remote Procedure Call,远程过程调用)。在RabbitMQ中也支持RPC。
RabbitMQ中实现RPC的机制是:
- 客户端发送请求(消息)时,在消息的属性(MessageProperties,在AMQP协议中定义了14中properties,这些属性会随着消息一起发送)中设置两个值replyTo(一个Queue名称,用于告诉服务器处理完成后将通知我的消息发送到这个Queue中)和correlationId(此次请求的标识号,服务器处理完成后需要将此属性返还,客户端将根据这个id了解哪条请求被成功执行了或执行失败)
- 服务器端收到消息并处理
- 服务器端处理完消息后,将生成一条应答消息到replyTo指定的Queue,同时带上correlationId属性
- 客户端之前已订阅replyTo指定的Queue,从中收到服务器的应答消息后,根据其中的correlationId属性分析哪条请求被执行了,根据执行结果进行后续业务处理
五.Go 接口
http://www.rabbitmq.com/tutorials/tutorial-one-go.html
请看官方示例:
https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/go/send.go
上面的例子仔细看,有必要看源码!
六.实例解释
四种模式
- DIRECT 默认点对点模式
- TOPIC 话题模式
- FANOUT 广播模式
- RPC RPC模式
工作队列:默认点对点模式
发布方,一个!
package main
import (
"fmt"
"log"
"os"
"strings"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main() {
// 拨号,下面例子都一样
conn, err := amqp.Dial("amqp://guest:[email protected]:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 这个是最重要的
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 申明一个队列
// https://godoc.org/github.com/streadway/amqp#Channel.QueueDeclare
q, err := ch.QueueDeclare(
"task_queue", // name 有名字!
true, // durable 持久性的,如果事前已经声明了该队列,不能重复声明
false, // delete when unused
false, // exclusive 如果是真,连接一断开,队列删除
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
body := bodyFrom(os.Args)
// 发布
err = ch.Publish(
"", // exchange 默认模式,exchange为空
q.Name, // routing key 默认模式路由到同名队列,即是task_queue
false, // mandatory
false,
amqp.Publishing{
// 持久性的发布,因为队列被声明为持久的,发布消息必须加上这个(可能不用),但消息还是可能会丢,如消息到缓存但MQ挂了来不及持久化。
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
工作方,多个,拿发布方的消息
package main
import (
"bytes"
"fmt"
"github.com/streadway/amqp"
"log"
"time"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:[email protected]:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 指定队列!
q, err := ch.QueueDeclare(
"task_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
// Fair dispatch 预取,每个工作方每次拿一个消息,确认后才拿下一次,缓解压力
err = ch.Qos(
1, // prefetch count
// 待解释
0, // prefetch size
false, // global
)
failOnError(err, "Failed to set QoS")
// 消费根据队列名
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack 设置为真自动确认消息
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dot_count := bytes.Count(d.Body, []byte("."))
t := time.Duration(dot_count)
time.Sleep(t * time.Second)
log.Printf("Done")
// 确认消息被收到!!如果为真的,那么同在一个channel,在该消息之前未确认的消息都会确认,适合批量处理
// 真时场景:每十条消息确认一次,类似
d.Ack(false)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
发布-订阅:广播模式
发布方
package main
import (
"fmt"
"log"
"os"
"strings"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:[email protected]:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 默认模式有默认交换机,广播自己定义一个交换机,交换机可与队列进行绑定
err = ch.ExchangeDeclare(
"logs", // name
"fanout", // type 广播模式
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
body := bodyFrom(os.Args)
// 发布
err = ch.Publish(
"logs", // exchange 消息发送到交换机,这个时候没队列绑定交换机,消息会丢弃
"", // routing key 广播模式不需要这个,它会把所有消息路由到绑定的所有队列
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
订阅方
package main
import (
"fmt"
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:[email protected]:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 同样要申明交换机
err = ch.ExchangeDeclare(
"logs", // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
// 新建队列,这个队列没名字,随机生成一个名字
q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when usused
true, // exclusive 表示连接一断开,这个队列自动删除
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
// 队列和交换机绑定,即是队列订阅了发到这个交换机的消息
err = ch.QueueBind(
q.Name, // queue name 队列的名字
"", // routing key 广播模式不需要这个
"logs", // exchange 交换机名字
false,
nil)
failOnError(err, "Failed to bind a queue")
// 开始消费消息,可开多个订阅方,因为队列是临时生成的,所有每个订阅方都能收到同样的消息
msgs, err := ch.Consume(
q.Name, // queue 队列名字
"", // consumer
true, // auto-ack 自动确认
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf(" [x] %s", d.Body)
}
}()
log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<-forever
}
高级路由 发布-订阅 新版:默认点对点模式
发布-订阅每个绑定的队列都收到一样的消息,现在不想!使用路由功能,队列绑定进行分发。
发布方
package main
import (
"fmt"
"log"
"os"
"strings"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:[email protected]:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 交换机申明,且类型为点对点默认
err = ch.ExchangeDeclare(
"logs_direct", // name
"direct", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
body := bodyFrom(os.Args)
// 发布
err = ch.Publish(
"logs_direct", // exchange 发到这个交换机
severityFrom(os.Args), // routing key 且路由key是由命令行指定,如下方,指定了error
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
func bodyFrom(args []string) string {
var s string
if (len(args) < 3) || os.Args[2] == "" {
s = "hello"
} else {
s = strings.Join(args[2:], " ")
}
return s
}
func severityFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "info"
} else {
s = os.Args[1]
}
return s
}
发消息到交换机,路由key为error
go run *.go error "Run. Run. Or it will explode."
消费方
package main
import (
"fmt"
"log"
"os"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:[email protected]:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 惯例
err = ch.ExchangeDeclare(
"logs_direct", // name
"direct", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
// 申明临时队列
q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when usused
true, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
if len(os.Args) < 2 {
log.Printf("Usage: %s [info] [warning] [error]", os.Args[0])
os.Exit(0)
}
# 绑定队列和交换机,绑定多个路由key,见下方
for _, s := range os.Args[1:] {
log.Printf("Binding queue %s to exchange %s with routing key %s",
q.Name, "logs_direct", s)
// 下面同个队列可以收到不同路由key的消息 ,广播模式除外!
err = ch.QueueBind(
q.Name, // queue name
s, // routing key
"logs_direct", // exchange
false,
nil)
failOnError(err, "Failed to bind a queue")
}
// 消费队列
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto ack
false, // exclusive
false, // no local
false, // no wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf(" [x] %s", d.Body)
}
}()
log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<forever
}
消费这些key:info warning error
go run *.go info warning error
话题 发布-订阅 新新版:话题模式
上面的路由都是标准的,就是固定字符串名字,话题模式可以使用类正则的路由,这样模糊匹配更棒!!
路由类似于这样 *.love.*
* (star) can substitute for exactly one word.
# (hash) can substitute for zero or more words.
发布方
package main
import (
"fmt"
"log"
"os"
"strings"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:[email protected]:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 交换机,话题模式
err = ch.ExchangeDeclare(
"logs_topic", // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
body := bodyFrom(os.Args)
// 类似上面
err = ch.Publish(
"logs_topic", // exchange
severityFrom(os.Args), // routing key 路由可以不标准了
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
func bodyFrom(args []string) string {
var s string
if (len(args) < 3) || os.Args[2] == "" {
s = "hello"
} else {
s = strings.Join(args[2:], " ")
}
return s
}
func severityFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "anonymous.info"
} else {
s = os.Args[1]
}
return s
}
命令运行
To receive all the logs:
go run *.go "#"
To receive all logs from the facility “kern”:
go run *.go "kern.*"
Or if you want to hear only about “critical” logs:
go run *.go "*.critical"
You can create multiple bindings:
go run *.go "kern.*" "*.critical"
消费方
package main
import (
"fmt"
"log"
"os"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:[email protected]:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 同理
err = ch.ExchangeDeclare(
"logs_topic", // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
// 临时队列
q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when usused
true, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
if len(os.Args) < 2 {
log.Printf("Usage: %s [binding_key]...", os.Args[0])
os.Exit(0)
}
for _, s := range os.Args[1:] {
log.Printf("Binding queue %s to exchange %s with routing key %s",
q.Name, "logs_topic", s)
// 绑定也是类似的
err = ch.QueueBind(
q.Name, // queue name
s, // routing key
"logs_topic", // exchange
false,
nil)
failOnError(err, "Failed to bind a queue")
}
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto ack
false, // exclusive
false, // no local
false, // no wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf(" [x] %s", d.Body)
}
}()
log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<forever
}
命令运行
go run *.go "kern.critical" "A critical kernel error"
rpc:RPC模式
应答方
package main
import (
"fmt"
"log"
"strconv"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func fib(n int) int {
if n == 0 {
return 0
} else if n == 1 {
return 1
} else {
return fib(n-1) + fib(n-2)
}
}
func main() {
// 拨号
conn, err := amqp.Dial("amqp://guest:[email protected]:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// channel
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明匿名队列
q, err := ch.QueueDeclare(
"rpc_queue", // name
false, // durable
false, // delete when usused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
// 公平分发 没有这个则round-robbin:https://segmentfault.com/a/1190000004492447
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
failOnError(err, "Failed to set QoS")
// 消费,等待请求
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
//请求来了
for d := range msgs {
n, err := strconv.Atoi(string(d.Body))
failOnError(err, "Failed to convert body to integer")
log.Printf(" [.] fib(%d)", n)
// 计算
response := fib(n)
// 回答
err = ch.Publish(
"", // exchange
d.ReplyTo, // routing key 回答队列
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
CorrelationId: d.CorrelationId, 序列号
Body: []byte(strconv.Itoa(response)),
})
failOnError(err, "Failed to publish a message")
// 确认回答完毕
d.Ack(false)
}
}()
log.Printf(" [*] Awaiting RPC requests")
<forever
}
请教方
package main
import (
"fmt"
"log"
"math/rand"
"os"
"strconv"
"strings"
"time"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func randomString(l int) string {
bytes := make([]byte, l)
for i := 0; i < l; i++ {
bytes[i] = byte(randInt(65, 90))
}
return string(bytes)
}
func randInt(min int, max int) int {
return min + rand.Intn(max-min)
}
func fibonacciRPC(n int) (res int, err error) {
// 拨号
conn, err := amqp.Dial("amqp://guest:[email protected]:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 队列声明
q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when usused
true, // exclusive 为真即连接断开就删除
false, // noWait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
// 消费
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive 这个为真,服务器会认为这是该队列唯一的消费者
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
corrId := randomString(32)
// 请教!
err = ch.Publish(
"", // exchange
"rpc_queue", // routing key 问题发到这里
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
CorrelationId: corrId,
ReplyTo: q.Name, 希望回答被发到这里
Body: []byte(strconv.Itoa(n)),
})
failOnError(err, "Failed to publish a message")
// 取答案
for d := range msgs {
if corrId == d.CorrelationId {
res, err = strconv.Atoi(string(d.Body))
failOnError(err, "Failed to convert body to integer")
break
}
}
return
}
func main() {
rand.Seed(time.Now().UTC().UnixNano())
n := bodyFrom(os.Args)
log.Printf(" [x] Requesting fib(%d)", n)
res, err := fibonacciRPC(n)
failOnError(err, "Failed to handle RPC request")
log.Printf(" [.] Got %d", res)
}
func bodyFrom(args []string) int {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "30"
} else {
s = strings.Join(args[1:], " ")
}
n, err := strconv.Atoi(s)
failOnError(err, "Failed to convert arg to integer")
return n
}
http://www.rabbitmq.com/tutorials/tutorial-six-go.html
七.属性详解,测试!
package main
import (
"fmt"
"github.com/streadway/amqp"
"log"
"time"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
// Queue属性测试
//
// durable属性和auto-delete属性可以同时生效;
// durable属性和exclusive属性会有性质上的冲突,两者同时设置时,仅exclusive属性生效;
// auto_delete属性和exclusive属性可以同时生效;
//
// auto_delete如果有连接存在消费者订阅该http://www.lenggirl.com/tool/RabbitMQ.htmlqueue,正常,如果消费者全部消失,自动删除队列
// 可以在没有创建consumer的情况下,创建出具有auto-delete属性的queue。
//
// exclusive,如果声明该队列的连接断开,自动删除队列
// queue的存在条件是在声明该队列的连接上存在某个consumer订阅了该queue。
//
func main() {
conn, err := amqp.Dial("amqp://guest:[email protected]:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 默认模式有默认交换机,广播自己定义一个交换机,交换机可与队列进行绑定
/*
ExchangeDeclare declares an exchange on the server. If the exchange does not
already exist, the server will create it. If the exchange exists, the server
verifies that it is of the provided type, durability and auto-delete flags.
ExchangeDeclare方法在服务器声明一个exchange。如果不存在,新建一个,存在的话则确认type和durability和auto-delete的标志是否一致。
Errors returned from this method will close the channel.
如果方法返回错误,channel会被关闭。
Exchange names starting with "amq." are reserved for pre-declared and
standardized exchanges. The client MAY declare an exchange starting with
"amq." if the passive option is set, or the exchange already exists. Names can
consists of a non-empty sequence of letters, digits, hyphen, underscore,
period, or colon.
名字以"amq."开头的Exchange是为之前已经声明和标准化的exchange们保留的,
在exchange已经存在的情况下,或者passive选项设置为真,客户端才有可能声明一个这样的exchange。
exchange的名字是一个非空序列,仅能包含字母,数字,连字符-,下划线_,句号.,冒号:
另外的方法ExchangeDeclarePassive主要用来检测exchange是否已经存在。
Each exchange belongs to one of a set of exchange kinds/types implemented by
the server. The exchange types define the functionality of the exchange - i.e.
how messages are routed through it. Once an exchange is declared, its type
cannot be changed. The common types are "direct", "fanout", "topic" and
"headers".
Durable and Non-Auto-Deleted exchanges will survive server restarts and remain
declared when there are no remaining bindings. This is the best lifetime for
long-lived exchange configurations like stable routes and default exchanges.
Non-Durable and Auto-Deleted exchanges will be deleted when there are no
remaining bindings and not restored on server restart. This lifetime is
useful for temporary topologies that should not pollute the virtual host on
failure or after the consumers have completed.
Non-Durable and Non-Auto-deleted exchanges will remain as long as the server is
running including when there are no remaining bindings. This is useful for
temporary topologies that may have long delays between bindings.
Durable and Auto-Deleted exchanges will survive server restarts and will be
removed before and after server restarts when there are no remaining bindings.
These exchanges are useful for robust temporary topologies or when you require
binding durable queues to auto-deleted exchanges.
Note: RabbitMQ declares the default exchange types like ‘amq.fanout‘ as
durable, so queues that bind to these pre-declared exchanges must also be
durable.
Exchanges declared as `internal` do not accept accept publishings. Internal
exchanges are useful for when you wish to implement inter-exchange topologies
that should not be exposed to users of the broker.
When noWait is true, declare without waiting for a confirmation from the server.
The channel may be closed as a result of an error. Add a NotifyClose listener
to respond to any exceptions.
Optional amqp.Table of arguments that are specific to the server‘s implementation of
the exchange can be sent for exchange types that require extra parameters.
func (me *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error {
*/
err = ch.ExchangeDeclare("logs", "fanout", true, false, false, false, nil)
failOnError(err, "Failed to declare an exchange")
// test-1 无属性,机器重启删除
/*
QueueDeclare declares a queue to hold messages and deliver to consumers.
Declaring creates a queue if it doesn‘t already exist, or ensures that an
existing queue matches the same parameters.
Every queue declared gets a default binding to the empty exchange "" which has
the type "direct" with the routing key matching the queue‘s name. With this
default binding, it is possible to publish messages that route directly to
this queue by publishing to "" with the routing key of the queue name.
QueueDeclare("alerts", true, false, false, false, nil)
Publish("", "alerts", false, false, Publishing{Body: []byte("...")})
Delivery Exchange Key Queue
-----------------------------------------------
key: alerts -> "" -> alerts -> alerts
The queue name may be empty, in which the server will generate a unique name
which will be returned in the Name field of Queue struct.
Durable and Non-Auto-Deleted queues will survive server restarts and remain
when there are no remaining consumers or bindings. Persistent publishings will
be restored in this queue on server restart. These queues are only able to be
bound to durable exchanges.
Non-Durable and Auto-Deleted queues will not be redeclared on server restart
and will be deleted by the server after a short time when the last consumer is
canceled or the last consumer‘s channel is closed. Queues with this lifetime
can also be deleted normally with QueueDelete. These durable queues can only
be bound to non-durable exchanges.
Non-Durable and Non-Auto-Deleted queues will remain declared as long as the
server is running regardless of how many consumers. This lifetime is useful
for temporary topologies that may have long delays between consumer activity.
These queues can only be bound to non-durable exchanges.
Durable and Auto-Deleted queues will be restored on server restart, but without
active consumers, will not survive and be removed. This Lifetime is unlikely
to be useful.
Exclusive queues are only accessible by the connection that declares them and
will be deleted when the connection closes. Channels on other connections
will receive an error when attempting declare, bind, consume, purge or delete a
queue with the same name.
When noWait is true, the queue will assume to be declared on the server. A
channel exception will arrive if the conditions are met for existing queues
or attempting to modify an existing queue from a different connection.
When the error return value is not nil, you can assume the queue could not be
declared with these parameters and the channel will be closed.
func (me *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error) {
*/
ch.QueueDeclare("test-1", false, false, false, false, nil)
/*
QueueBind binds an exchange to a queue so that publishings to the exchange will
be routed to the queue when the publishing routing key matches the binding
routing key.
QueueBind("pagers", "alert", "log", false, nil)
QueueBind("emails", "info", "log", false, nil)
Delivery Exchange Key Queue
-----------------------------------------------
key: alert --> log ----> alert --> pagers
key: info ---> log ----> info ---> emails
key: debug --> log (none) (dropped)
If a binding with the same key and arguments already exists between the
exchange and queue, the attempt to rebind will be ignored and the existing
binding will be retained.
In the case that multiple bindings may cause the message to be routed to the
same queue, the server will only route the publishing once. This is possible
with topic exchanges.
QueueBind("pagers", "alert", "amq.topic", false, nil)
QueueBind("emails", "info", "amq.topic", false, nil)
QueueBind("emails", "#", "amq.topic", false, nil) // match everything
Delivery Exchange Key Queue
-----------------------------------------------
key: alert --> amq.topic ----> alert --> pagers
key: info ---> amq.topic ----> # ------> emails
\---> info ---/
key: debug --> amq.topic ----> # ------> emails
It is only possible to bind a durable queue to a durable exchange regardless of
whether the queue or exchange is auto-deleted. Bindings between durable queues
and exchanges will also be restored on server restart.
If the binding could not complete, an error will be returned and the channel
will be closed.
When noWait is true and the queue could not be bound, the channel will be
closed with an error.
func (me *Channel) QueueBind(name, key, exchange string, noWait bool, args Table) error {
*/
ch.QueueBind("test-1", "test-1", "logs", false, nil)
// test-2 D,不会删除
ch.QueueDeclare("test-2", true, false, false, false, nil)
ch.QueueBind("test-2", "test-2", "logs", false, nil)
// test-3 AD,机器重启删除,消费者连接一断开,队列自动删除
ch.QueueDeclare("test-3", false, true, false, false, nil)
ch.QueueBind("test-3", "test-3", "logs", false, nil)
// test-4 D,AD,消费者连接一断开,队列自动删除
ch.QueueDeclare("test-4", true, true, false, false, nil)
ch.QueueBind("test-4", "test-4", "logs", false, nil)
// test-5 无属性,本连接一断开就删除了,只有exclusive属性有效
// 在其他连接消费报错:Exception (405) Reason: "RESOURCE_LOCKED - cannot obtain exclusive access to locked queue ‘test-7‘ in vhost ‘/‘"
// 只能在当前连接进行消费
ch.QueueDeclare("test-5", true, false, true, false, nil)
ch.QueueBind("test-5","test-5","logs",false,nil)// test-6 D,AD,消费者连接一断开,队列自动删除ch.QueueDeclare("test-6",true,true,false,false,nil)ch.QueueBind("test-6","test-6","logs",false,nil)// test-7 AD,本连接一断开就删除了,只有autoDelete属性和exclusive属性有效// 在其他连接消费报错:Exception (405) Reason: "RESOURCE_LOCKED - cannot obtain exclusive access to locked queue ‘test-7‘ in vhost ‘/‘"// 只能在当前连接进行消费ch.QueueDeclare("test-7",true,true,true,false,nil)ch.QueueBind("test-7","test-7","logs",false,nil)// 发布/*
Publish sends a Publishing from the client to an exchange on the server.
When you want a single message to be delivered to a single queue, you can
publish to the default exchange with the routingKey of the queue name. This is
because every declared queue gets an implicit route to the default exchange.
Since publishings are asynchronous, any undeliverable message will get returned
by the server. Add a listener with Channel.NotifyReturn to handle any
undeliverable message when calling publish with either the mandatory or
immediate parameters as true.
Publishings can be undeliverable when the mandatory flag is true and no queue is
bound that matches the routing key, or when the immediate flag is true and no
consumer on the matched queue is ready to accept the delivery.
This can return an error when the channel, connection or socket is closed. The
error or lack of an error does not indicate whether the server has received this
publishing.
It is possible for publishing to not reach the broker if the underlying socket
is shutdown without pending publishing packets being flushed from the kernel
buffers. The easy way of making it probable that all publishings reach the
server is to always call Connection.Close before terminating your publishing
application. The way to ensure that all publishings reach the server is to add
a listener to Channel.NotifyPublish and put the channel in confirm mode with
Channel.Confirm. Publishing delivery tags and their corresponding
confirmations start at 1. Exit when all publishings are confirmed.
When Publish does not return an error and the channel is in confirm mode, the
internal counter for DeliveryTags with the first confirmation starting at 1.
func (me *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error {
*/err=ch.Publish("logs","",false,false,amqp.Publishing{ContentType:"text/plain",Body:[]byte("hi"),// 下面这条没加,机器重启消息清零DeliveryMode:amqp.Persistent,})failOnError(err,"Failed to publish a message")// exclusive只能在这个连接消费/*
Consume immediately starts delivering queued messages.
Begin receiving on the returned chan Delivery before any other operation on the
Connection or Channel.
Continues deliveries to the returned chan Delivery until Channel.Cancel,
Connection.Close, Channel.Close, or an AMQP exception occurs. Consumers must
range over the chan to ensure all deliveries are received. Unreceived
deliveries will block all methods on the same connection.
All deliveries in AMQP must be acknowledged. It is expected of the consumer to
call Delivery.Ack after it has successfully processed the delivery. If the
consumer is cancelled or the channel or connection is closed any unacknowledged
deliveries will be requeued at the end of the same queue.
/*
Publish sends a Publishing from the client to an exchange on the server.
When you want a single message to be delivered to a single queue, you can
publish to the default exchange with the routingKey of the queue name. This is
because every declared queue gets an implicit route to the default exchange.
Since publishings are asynchronous, any undeliverable message will get returned
by the server. Add a listener with Channel.NotifyReturn to handle any
undeliverable message when calling publish with either the mandatory or
immediate parameters as true.
Publishings can be undeliverable when the mandatory flag is true and no queue is
bound that matches the routing key, or when the immediate flag is true and no
consumer on the matched queue is ready to accept the delivery.
This can return an error when the channel, connection or socket is closed. The
error or lack of an error does not indicate whether the server has received this
publishing.
It is possible for publishing to not reach the broker if the underlying socket
is shutdown without pending publishing packets being flushed from the kernel
buffers. The easy way of making it probable that all publishings reach the
server is to always call Connection.Close before terminating your publishing
application. The way to ensure that all publishings reach the server is to add
a listener to Channel.NotifyPublish and put the channel in confirm mode with
Channel.Confirm. Publishing delivery tags and their corresponding
confirmations start at 1. Exit when all publishings are confirmed.
When Publish does not return an error and the channel is in confirm mode, the
internal counter for DeliveryTags with the first confirmation starting at 1.
func (me *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error {
*/Theconsumerisidentifiedbyastringthatisuniqueandscopedforallconsumersonthischannel.Ifyouwishtoeventuallycanceltheconsumer,usethesamenon-emptyidenfitierinChannel.Cancel.Anemptystringwillcausethelibrarytogenerateauniqueidentity.TheconsumeridentitywillbeincludedineveryDeliveryintheConsumerTagfieldWhenautoAck(alsoknownasnoAck)istrue,theserverwillacknowledgedeliveriestothisconsumerpriortowritingthedeliverytothenetwork.WhenautoAckistrue,theconsumershouldnotcallDelivery.Ack.Automaticallyacknowledgingdeliveriesmeansthatsomedeliveriesmaygetlostiftheconsumerisunabletoprocessthemaftertheserverdeliversthem.Whenexclusiveistrue,theserverwillensurethatthisisthesoleconsumerfromthisqueue.Whenexclusiveisfalse,theserverwillfairlydistributedeliveriesacrossmultipleconsumers.WhennoLocalistrue,theserverwillnotdeliverpublishingsentfromthesameconnectiontothisconsumer.It‘sadvisabletouseseparateconnectionsforChannel.PublishandChannel.ConsumesonottohaveTCPpushbackonpublishingaffecttheabilitytoconsumemessages,sothisparameterisheremostlyforcompleteness.WhennoWaitistrue,donotwaitfortheservertoconfirmtherequestandimmediatelybegindeliveries.Ifitisnotpossibletoconsume,achannelexceptionwillberaisedandthechannelwillbeclosed.Optionalargumentscanbeprovidedthathavespecificsemanticsforthequeueorserver.Whenthechannelorconnectioncloses,alldeliverychanswillalsoclose.Deliveriesonthereturnedchanwillbebufferedindefinitely.Tolimitmemoryofthisbuffer,usetheChannel.Qosmethodtolimittheamountofunacknowledged/buffereddeliveriestheserverwilldeliveronthisChannel.func(me*Channel)Consume(queue,consumerstring,autoAck,exclusive,noLocal,noWaitbool,argsTable)(<-chanDelivery,error){*/c,e:=ch.Consume("test-5","test-5-c",false,false,false,false,nil)ife!=nil{fmt.Println(e.Error())}else{a:=<-cfmt.Println("test-5:"+string(a.Body))//a.Ack(false)}time.Sleep(1000*time.Second)//ch.Consume("test-1", "test-1-c", false, false, false, false, nil)}
package main
import (
"github.com/streadway/amqp"
)
// Queue属性测试
func main() {
conn, _ := amqp.Dial("amqp://guest:[email protected]:5672/")
defer conn.Close()
ch, _ := conn.Channel()
defer ch.Close()
ch.QueueDelete("test-1", false, false, false)
ch.QueueDelete("test-2", false, false, false)
ch.QueueDelete("test-3", false, false, false)
ch.QueueDelete("test-4", false, false, false)
ch.QueueDelete("test-5", false, false, false)
ch.QueueDelete("test-6", false, false, false)
ch.QueueDelete("test-7", false, false, false)
}
package main
import (
"fmt"
"github.com/streadway/amqp"
"time"
)
// Queue属性测试
func main() {
conn, _ := amqp.Dial("amqp://guest:[email protected]:5672/")
defer conn.Close()
ch, _ := conn.Channel()
defer ch.Close()
c, e := ch.Consume("test-3", "test-2-c", false, false, false, false, nil)
if e != nil {
fmt.Println(e.Error())
} else {
a := <-c
fmt.Println("test-1:" + string(a.Body))
//a.Ack(false)
}
time.Sleep(time.Second * 1000)
}
搬砖的陈大师版权所有,转载请注明:http://www.lenggirl.com/tool/RabbitMQ.html