Golang调用Rabbitmq消息队列和封装

前言

介绍Rabbimq

Rabbitmq消息队列是干嘛的?

简单的说,消息队列,引申一下就是传递消息用的队列,也可以称为传递消息的通信方法。用争抢订单的快车举个例子,假如,A用户发送了一个用车的消息,那么消息队列要做的就是把A用户用车的这个消息广而告之,发送到一个公用队列当中,司机只管取到消息,而不管是谁发布的,这就是一个简单的消息队列例子,Rabbitmq其实就是消息队列的一种,用的比较多的还可能有Redis,kafka,ActiceMq等等,这个后面的博文里面我会说,这次我们只说Rabbimq消息队列

Rabbitmq消息队列的好处是什么?为什么我们要用他?

这个网上有很多类似的玩意,我不说太多,就只说我在使用中感觉比较好的地方。

  1. 分布式,多节点部署。一个集群,保证消息的持久化和高可用,某节点挂了,其他节点可以结力。
  2. 路由Exchange,这个已经提供了内部的几种实现方法,可以指定路由,也就是指定传递的地址。
  3. 多语言支持,我以前干活儿用Python,现在用Go和java,人家无缝对接,多牛逼!
  4. Ack的消息确认机制,这样就保证了,任务下发时候的稳定性,ack消息确认可以手动,也可以自动,这样就保证了任务下发时候的可控和监控。

初步开始

简单的生产者和消费者的模型

讲那么多废话理论,还不如直接开始写代码更直观是吧,所以,奥莉给,干了兄弟们!我们实现一个简答的生产者,消费者模型。这个不用我多解释吧,基础的流程就是,我们定义一个生产者,生产信息到Rabbitmq中,然后再定义一个消费者,把数据从Rabbitmq中取出来,就这么简单,下面咱们就干了,先讲几个基础。

Rabbitmq的基础知识

发送 Publish

发送,你可以理解为上传,意思就是,上传一个消息到Rabbitmq当中。它这块的基础代码比较简单

package main

import (
  "log"
  "github.com/streadway/amqp"
)

func main() {
    //初始化一个Rabbimtq连接,后跟ip,user,password
    conn, err := amqp.Dial("amqp://guest:[email protected]:5672/")
    if err != nil {
        return
    }
    defer conn.Close()
    //创建一个channel的套接字连接
    ch, _ := conn.Channel()
    //创建一个指定的队列
    q, _ := ch.QueueDeclare(
        "work", // 队列名
        false,   // durable
        false,   // 不使用删除?
        false,   // exclusive
        false,   // 不必等待
        nil,     // arguments
    )
    //定义上传的消息
    body := "work message"
    //调用Publish上传消息1到指定的work队列当中
    err = ch.Publish(
        "",     // exchange
        "work", // 队列名
        false,  // mandatory
        false,  // immediate
        amqp.Publishing {
                ContentType: "text/plain",
                //[]byte化body
                Body:        []byte(body),
        })
}

这样就完成了上传消息到work队列当中。

接收 Consume

接收,顾名思义,就是接收到指定队列中的信息,信息存在队列当中,总要被拿出来用吧,放那里又不能下崽儿,所以,拿出来感觉用了才是最重要的。这块的基础代码如下

package main

import (
  "log"
  "github.com/streadway/amqp"
)

func main() {
    //初始化一个Rabbimtq连接,后跟ip,user,password
    conn, err := amqp.Dial("amqp://guest:[email protected]:5672/")
    if err != nil {
        return
    }
    defer conn.Close()
    //创建一个channel的套接字连接
    ch, _ := conn.Channel()

    msgs, err := ch.Consume(
        "work" // 队列名
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // 不等待
        nil,    // args
    )
    //定义一个forever,让他驻留在后台,等待消息,来了就消费
    forever := make(chan bool)

    //执行一个go func 完成任务消费
    go func() {
        for d := range msgs {
            //打印body
            log.Printf("message %s", d.Body)
        }
    }()
    <-forever
}

生产者/消费者模型

上面简单说了一下rabbimq的发送和接收,这下咱们就要实现一个生产者消费者模型了,这个模型的主要逻辑,就是生产者发送任务到指定的队列,有一个,或者多个消费者,会在此留守,一有任务,就争抢并且消费。

生产者逻辑

其实生产者逻辑和上面的发送逻辑差不多,这里给出写法。

package main

import (
  "log"
  "github.com/streadway/amqp"
)

func main() {
    //初始化一个Rabbimtq连接,后跟ip,user,password
    conn, err := amqp.Dial("amqp://guest:[email protected]:5672/")
    if err != nil {
        return
    }
    defer conn.Close()
    //创建一个channel的套接字连接
    ch, _ := conn.Channel()
    //创建一个指定的队列
    q, _ := ch.QueueDeclare(
        "work", // 队列名
        false,   // durable
        false,   // 不使用删除?
        false,   // exclusive
        false,   // 不必等待
        nil,     // arguments
    )
    //定义上传的消息
    body := "work message"
    //调用Publish上传消息1到指定的work队列当中
    err = ch.Publish(
        "",     // exchange
        "work", // 队列名
        false,  // mandatory
        false,  // immediate
        amqp.Publishing {
                ContentType: "text/plain",
                //[]byte化body
                Body:        []byte(body),
        })
}

消费者逻辑

消费者逻辑这边,主要是加了一个qos控制和手动ack,代码如下

package main

import (
  "log"
  "github.com/streadway/amqp"
)

func main() {
    //初始化一个Rabbimtq连接,后跟ip,user,password
    conn, err := amqp.Dial("amqp://guest:[email protected]:5672/")
    if err != nil {
        return
    }
    defer conn.Close()
    //创建一个channel的套接字连接
    ch, _ := conn.Channel()

    //创建一个qos控制
    err = ch.Qos(
            3,     // 同时最大消费数量(意思就是最多能消费几个任务)
            0,     // prefetch size
            false, // 全局设定?
        )
    if err != nil {
            return err
            }
    msgs, err := ch.Consume(
        "work" // 队列名
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // 不等待
        nil,    // args
    )
    //定义一个forever,让他驻留在后台,等待消息,来了就消费
    forever := make(chan bool)

    //执行一个go func 完成任务消费
    go func() {
        for d := range msgs {
            //打印body
            log.Printf("message %s", string(d.Body))
            //手动ack,不管是否发送完毕。
            d.Ack(false)
        }
    }()
    <-forever
}

Golang封装Rabbitmq的基础接口

Rabbitmq会用了吧,上面那个估计比较简单,但是估摸着你们还想要别的功能,好,那我就惯大家一次,干了兄弟们,奥莉给!

初始化Rabbitmq连接

为了避免每次重复调用Rabbitmq连接,我这里提供一个简单写法。

package main

import (
"context"
"fmt"

"github.com/streadway/amqp"
)

//Rabbitmq 初始化rabbitmq连接
type Rabbitmq struct {
        conn *amqp.Connection
        err  error
}

func New(ip string) (*Rabbitmq, error) {
        amqps := fmt.Sprintf("amqp://guest:[email protected]%s:5672/", ip)
        conn, err := amqp.Dial(amqps)
        if err != nil {
            return nil, err
        }
        rabbitmq := &Rabbitmq{
            conn: conn,
        }
        return rabbitmq, nil
}

创建一个Queue队列

func (rabbitmq *Rabbitmq) CreateQueue(id string) error {
        ch, err := rabbitmq.conn.Channel()
        defer ch.Close()
        if err != nil {
            return err
        }
        _, err = ch.QueueDeclare(
            id,    // name
            true,  // durable
            false, // delete when unused
            false, // exclusive
            false, // no-wait
            nil,   // arguments
        )
        if err != nil {
            return err
        }
        return nil
}

上传消息到指定的queue中

func (rabbitmq *Rabbitmq) PublishQueue(id string, body string) error {
        ch, err := rabbitmq.conn.Channel()
        defer ch.Close()
        if err != nil {
            return err
        }
        err = ch.Publish(
            "",    // exchange
            id,    // routing key
            false, // mandatory
            false,
            amqp.Publishing{
                DeliveryMode: amqp.Persistent,
                ContentType:  "text/plain",
                Body:         []byte(body),
            })
        if err != nil {
            return err
        }
        return nil
}

从队列中取出消息并且消费

func (rabbitmq *Rabbitmq) PublishQueue(id string, body string) error {
        ch, err := rabbitmq.conn.Channel()
        defer ch.Close()
        if err != nil {
            return err
        }
        err = ch.Publish(
            "",    // exchange
            id,    // routing key
            false, // mandatory
            false,
            amqp.Publishing{
                DeliveryMode: amqp.Persistent,
                ContentType:  "text/plain",
                Body:         []byte(body),
            })
        if err != nil {
            return err
        }
        return nil
}

统计队列中预备消费的数据

func (rabbitmq *Rabbitmq) GetReadyCount(id string) (int, error) {
        count := 0
        ch, err := rabbitmq.conn.Channel()
        defer ch.Close()
        if err != nil {
            return count, err
        }
        state, err := ch.QueueInspect(id)
        if err != nil {
            return count, err
        }
        return state.Messages, nil
    }

统计消费者/正在消费的数据

func (rabbitmq *Rabbitmq) GetConsumCount(id string) (int, error) {
        count := 0
        ch, err := rabbitmq.conn.Channel()
        defer ch.Close()
        if err != nil {
            return count, err
        }
        state, err := ch.QueueInspect(id)
        if err != nil {
            return count, err
        }
        return state.Consumers, nil
}

清理队列

func (rabbitmq *Rabbitmq) ClearQueue(id string) (string, error) {
        ch, err := rabbitmq.conn.Channel()
        defer ch.Close()
        if err != nil {
            return "", err
        }
        _, err = ch.QueuePurge(id, false)
        if err != nil {
            return "", err
        }
        return "Clear queue success", nil
}

总结

简单讲了一下Rabbimtq是啥,怎么用,我是怎么用的。

完整代码请访问我的Github: https://github.com/Alexanderklau/Go_poject/blob/master/Go-Rabbitmq/rabbitmq.go

如果有不懂的欢迎留言!如果能帮大家的我一定会帮!也希望你们指出我的错误!一起进步!

原文地址:https://www.cnblogs.com/Yemilice/p/12685380.html

时间: 2024-10-13 23:13:06

Golang调用Rabbitmq消息队列和封装的相关文章

(转)RabbitMQ消息队列(七):适用于云计算集群的远程调用(RPC)

在云计算环境中,很多时候需要用它其他机器的计算资源,我们有可能会在接收到Message进行处理时,会把一部分计算任务分配到其他节点来完成.那么,RabbitMQ如何使用RPC呢?在本篇文章中,我们将会通过其它节点求来斐波纳契完成示例. 1. 客户端接口 Client interface 为了展示一个RPC服务是如何使用的,我们将创建一段很简单的客户端class. 它将会向外提供名字为call的函数,这个call会发送RPC请求并且阻塞知道收到RPC运算的结果.代码如下: [python] vie

RabbitMQ消息队列(七):适用于云计算集群的远程调用(RPC) [转]

在云计算环境中,很多时候需要用它其他机器的计算资源,我们有可能会在接收到Message进行处理时,会把一部分计算任务分配到其他节点来完成.那么,RabbitMQ如何使用RPC呢?在本篇文章中,我们将会通过其它节点求来斐波纳契完成示例. 1. 客户端接口 Client interface 为了展示一个RPC服务是如何使用的,我们将创建一段很简单的客户端class. 它将会向外提供名字为call的函数,这个call会发送RPC请求并且阻塞知道收到RPC运算的结果.代码如下: [python] vie

(转)RabbitMQ消息队列(九):Publisher的消息确认机制

在前面的文章中提到了queue和consumer之间的消息确认机制:通过设置ack.那么Publisher能不到知道他post的Message有没有到达queue,甚至更近一步,是否被某个Consumer处理呢?毕竟对于一些非常重要的数据,可能Publisher需要确认某个消息已经被正确处理. 在我们的系统中,我们没有是实现这种确认,也就是说,不管Message是否被Consume了,Publisher不会去care.他只是将自己的状态publish给上层,由上层的逻辑去处理.如果Message

RabbitMQ消息队列(九):Publisher的消息确认机制

在前面的文章中提到了queue和consumer之间的消息确认机制:通过设置ack.那么Publisher能不到知道他post的Message有没有到达queue,甚至更近一步,是否被某个Consumer处理呢?毕竟对于一些非常重要的数据,可能Publisher需要确认某个消息已经被正确处理. 在我们的系统中,我们没有是实现这种确认,也就是说,不管Message是否被Consume了,Publisher不会去care.他只是将自己的状态publish给上层,由上层的逻辑去处理.如果Message

RabbitMQ 消息队列 应用

安装参考    详细介绍   学习参考 RabbitMQ 消息队列 RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统.他遵循Mozilla Public License开源协议. MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们.消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术.

Nginx通过LUA脚本访问RabbitMQ消息队列

发现了一个Nginx的LUA脚本:lua-resty-rabbitmqstomp,可以让Nginx通过LUA脚本访问RabbitMQ消息队列,这个脚本是通过stomp协议连接RabbitMQ的stomp适配器,来pub/sub消息的 关于RabbitMQ-STOMP安装使用相关内容可以参见:RabbitMQ STOMP Adapter 关于Nginx-LUA模块安装使用参见:LAMP架构演进到LAMPGC,再演进到LNMLGC 关于STOMP协议相关资料参见这里: STOMP官方英文协议1.1版

(转)RabbitMQ消息队列(六):使用主题进行消息分发

在上篇文章RabbitMQ消息队列(五):Routing 消息路由 中,我们实现了一个简单的日志系统.Consumer可以监听不同severity的log.但是,这也是它之所以叫做简单日志系统的原因,因为是仅仅能够通过severity设定.不支持更多的标准. 比如syslog unix的日志工具,它可以通过severity (info/warn/crit...) 和模块(auth/cron/kern...).这可能更是我们想要的:我们可以仅仅需要cron模块的log. 为了实现类似的功能,我们需

RabbitMQ消息队列(六):使用主题进行消息分发

在上篇文章RabbitMQ消息队列(五):Routing 消息路由 中,我们实现了一个简单的日志系统.Consumer可以监听不同severity的log.但是,这也是它之所以叫做简单日志系统的原因,因为是仅仅能够通过severity设定.不支持更多的标准. 比如syslog unix的日志工具,它可以通过severity (info/warn/crit...) 和模块(auth/cron/kern...).这可能更是我们想要的:我们可以仅仅需要cron模块的log. 为了实现类似的功能,我们需

(转)(二)RabbitMQ消息队列-RabbitMQ消息队列架构与基本概念

http://blog.csdn.net/super_rd/article/details/70238869 没错我还是没有讲怎么安装和写一个HelloWord,不过快了,这一章我们先了解下RabbitMQ的基本概念. RabbitMQ架构 说是架构其实更像是应用场景下的架构(自己画的有点丑,勿嫌弃) 从图中可以看出RabbitMQ主要由Exchange和Queue两部分组成,然后通过RoutingKey关联起来,消息投递到Exchange然后通过Queue接收. RabbitMQ消息队列基本概