RabbitMq入门实战

1.依赖

<!--rabbitMQ-->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dep

2.配置信息

#设置端口
server.port=80
#安装的RabbitMq的服务器IP
spring.rabbitmq.host=192.168.***.**
#安装的RabbitMq的服务器端口
spring.rabbitmq.port=5672
#安装的RabbitMq的用户名
spring.rabbitmq.username=xxx
#安装的RabbitMq的密码
spring.rabbitmq.password=xxx
#消息确认机制
spring.rabbitmq.publisher-confirms=true
#与消息确认机制联合使用,保证能够收到回调
spring.rabbitmq.publisher-returns=true
#消息确认模式 MANUAL:手动确认  NONE:不确认  AUTO:自动确认
spring.rabbitmq.listener.simple.acknowledge-mode=auto
#消费者
spring.rabbitmq.listener.simple.concurrency=10
spring.rabbitmq.listener.simple.max-concurrency=10
#发布后重试
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.initial-interval=5000
spring.rabbitmq.listener.simple.retry.max-attempts=5
#每隔多久进行重试
spring.rabbitmq.template.retry.multiplier=1.0
#消费失败后重新消费
spring.rabbitmq.listener.simple.default-requeue-rejected=false
#自定义的vhost
spring.rabbitmq.dev-virtual-host=devVir
spring.rabbitmq.test-virtual-host=testVir

3.配置信息:此处为多个Vhost配置,单个可直接使用,无需另外配置,只需声明队列信息即可

package com.rabbit.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

/**
 * 2019年7月7日15:43:38 Joelan整合 RabbitConfig 概念介绍:
 * 1.Queue:队列,是RabbitMq的内部对象,用于存储消息,RabbitMq的多个消费者可以订阅同一个队列,此时队列会以轮询的方式给多个消费者消费,而非多个消费者都收到所有的消息进行消费
 * 注意:RabbitMQ不支持队列层面的广播消费,如果需要广播消费,可以采用一个交换器通过路由Key绑定多个队列,由多个消费者来订阅这些队列的方式。
 * 2.Exchange:交换器,在RabbitMq中,生产者并非直接将消息投递到队列中。真实情况是,生产者将消息发送到Exchange(交换器),由交换器将消息路由到一个或多个队列中。
 * 注意:如果路由不到,或返回给生产者,或直接丢弃,或做其它处理。
 * 3.RoutingKey:路由Key,生产者将消息发送给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则。这个路由Key需要与交换器类型和绑定键(BindingKey)联合使用才能
 * 最终生效。在交换器类型和绑定键固定的情况下,生产者可以在发送消息给交换器时通过指定RoutingKey来决定消息流向哪里。
 * 4.Binding:RabbitMQ通过绑定将交换器和队列关联起来,在绑定的时候一般会指定一个绑定键,这样RabbitMQ就可以指定如何正确的路由到队列了。
 */
@Configuration
public class RabbitConfig {

    /**
     * RabbitMq的主机地址
     */
    @Value("${spring.rabbitmq.host}")
    private String host;

    /**
     * RabbitMq的端口
     */
    @Value("${spring.rabbitmq.port}")
    private Integer port;

    /**
     * 用户账号
     */
    @Value("${spring.rabbitmq.username}")
    private String username;

    /**
     * 用户密码
     */
    @Value("${spring.rabbitmq.password}")
    private String password;

    /**
     * 消息确认,回调机制
     */
    @Value("${spring.rabbitmq.publisher-confirms}")
    private boolean confirms;
    @Value("${spring.rabbitmq.publisher-returns}")
    private boolean returns;

    /**
     * vhost:dev
     */
    @Value("${spring.rabbitmq.dev-virtual-host}")
    private String hrmDevVirtualHost;

    /**
     * vhost:test
     */
    @Value("${spring.rabbitmq.test-virtual-host}")
    private String hrmTestVirtualHost;

    /**
     * 若一个项目只使用一个virtualHost的话,默认只需要在配置文件中配置其属性即可
     * 若项目中使用到多个virtualHost,那么可以以通过创建ConnectionFactory的方式指定不同的virtualHost
     */
    public ConnectionFactory createConnectionFactory(String host, Integer port, String username, String password,
            String vHost) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setSimplePublisherConfirms(confirms);
        connectionFactory.setPublisherReturns(returns);
        connectionFactory.setVirtualHost(vHost);
        return connectionFactory;
    }

    // ----------------------------------------------------------------------------------------第一步,创建消息连接,第一个VirtualHost
    /**
     * 创建指定vhost:dev的连接工厂
     */
    @Primary
    @Bean(name = "devConnectionFactory")
    public ConnectionFactory devConnectionFactory() {
        return createConnectionFactory(host, port, username, password, hrmDevVirtualHost);
    }

    /**
     * 若有多个vhost则自定义RabbitMqTemplate 通过名称指定对应的vhost
     */
    @Primary
    @Bean(name = "devRabbitTemplate")
    public RabbitTemplate devRabbitTemplate(
            @Qualifier(value = "devConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // 消息确认机制,ConnectionFactory中必须设置回调机制(publisher-confirms,publisher-returns)
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack) {
                    System.out.println("消息id为: " + correlationData + "的消息,已经被ack成功");
                } else {
                    System.out.println("消息id为: " + correlationData + "的消息,消息nack,失败原因是:" + cause);
                }
            }
        });
        return rabbitTemplate;
    }

    // ----------------------------------------------------------------------------------------第二个VirtualHost,以此类推
    /**
     * 创建指定vhost:test的连接工厂
     */
    @Bean(name = "testConnectionFactory")
    public ConnectionFactory testConnectionFactory() {
        return createConnectionFactory(host, port, username, password, hrmTestVirtualHost);
    }

    /**
     * 若有多个vhost则自定义RabbitMqTemplate 通过名称指定对应的vhost,此处未使用回调
     */
    @Bean(name = "testRabbitTemplate")
    public RabbitTemplate testRabbitTemplate(
            @Qualifier(value = "testConnectionFactory") ConnectionFactory connectionFactory) {
        // RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        // Map<String, Object> args = new HashMap<String, Object>();
        // args.put("x-dead-letter-exchange","test_topic_exchange");
        // Queue queue = new Queue("test_topic_queue");
        return new RabbitTemplate(connectionFactory);
    }

    // ----------------------------------------------------------------------------------------第二步,声明队列信息,Fanout模式
    /**
     * 创建队列 参数name:队列的名称,不能为空;设置为“”以使代理生成该名称。
     * 参数durable:true表示为持久队列,该队列将在服务器重新启动后继续存在
     * 参数exclusive:如果声明独占队列,则为true,该队列将仅由声明者的连接使用
     * 参数autoDelete:如果服务器不再使用队列时应将其删除,则自动删除为true 参数arguments:用于声明队列的参数
     */
    @Bean
    public Queue testFanoutQueue() {
        /*
         * 1.new Queue(name); return new Queue("test_fanout_queue");
         */

        /*
         * 2.new Queue(name,durable);
         */
        return new Queue("test_fanout_queue", true, false, true);

        /*
         * 3.new Queue(name,durable,exclusive,autoDelete); return new
         * Queue("test_fanout_queue", true, false, false);
         */

        /*
         * 4.new Queue(name,durable,exclusive,autoDelete,arguments); return new
         * Queue("test_fanout_queue", true, true, true, null);
         */
    }

    /**
     * 创建交换机 1.fanout:扇形交换器,它会把发送到该交换器的消息路由到所有与该交换器绑定的队列中,如果使用扇形交换器,则不会匹配路由Key
     * 白话:一个交换机可以绑定N个队列,此模式会将写入的队列发送到一个交换机,由此交换机发送到N个队列中,那么监听该队列的消费者都能收到对应的消息
     */
    @Bean
    @Primary
    public Exchange testFanoutExchange() {
        return new FanoutExchange("test_fanout_exchange");
    }

    /**
     * 绑定队列到交换机 Fanout模式不需要RoutingKey
     */
    @Bean
    public Binding testFanoutBinding() {
        return BindingBuilder.bind(testFanoutQueue()).to(testFanoutExchange()).with("").noargs();
    }

    // ----------------------------------------------------------------------------------------Direct模式
    /**
     * 创建队列
     */
    @Bean
    public Queue testDirectQueue() {
        return new Queue("test_direct_queue", true, false, true);
    }

    /**
     * 创建交换机 2.direct交换器 直连模式,会把消息路由到RoutingKey与BindingKey完全匹配的队列中。
     * 白话:直连模式在绑定队列到交换机的时候,RoutingKey与发送队列的RoutingKey要完全保持一致
     */
    @Bean
    public Exchange testDirectExchange() {
        return new TopicExchange("test_direct_exchange");
    }

    /**
     * 绑定队列到交换机并指定一个路由,此处的RoutingKey为test,发送队列时也必须使用test
     */
    @Bean
    public Binding testDirectBinding() {
        return BindingBuilder.bind(testDirectQueue()).to(testDirectExchange()).with("test").noargs();
    }

    // ----------------------------------------------------------------------------------------Topic模式
    /**
     * 创建队列
     */
    @Bean
    public Queue testQueue() {
        return new Queue("test_topic_queue", true, false, true);
    }

    /**
     * 创建交换机 2.topic 匹配模式(个人)与直连模式区别:RoutingKey可以模糊匹配,两种匹配风格: *匹配 #匹配
     * 我们的RoutingKey和BindKey为一个点分隔的字符串,例:test.routing.client
     * 那么我们的模糊匹配,*可以匹配一个单词,即:*.routing.* 可以匹配到 test.routing.client,
     * #可以匹配多个单词,即:#.client 可以匹配到 test.routing.client,以此类推
     */
    @Bean
    public Exchange topicExchange() {
        return new TopicExchange("test_topic_exchange");
    }

    /**
     * 绑定队列到交换机并指定一个路由
     */
    @Bean
    public Binding testBinding() {
        return BindingBuilder.bind(testQueue()).to(topicExchange()).with("test.*").noargs();
    }

    // ----------------------结束强调:第一步创建连接,第二步声明队列,交换器,路由Key信息,第三步发送队列,第四步监听队列
}

4.发送队列

package com.rabbit.send;

import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

/**
 * RabbitSend
 */
@Component
public class RabbitSend {

    @Autowired
    @Qualifier(value = "devRabbitTemplate")
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送一条Fanout扇形队列
     */
    public void sendTestFanoutMsg(String msg) {
        rabbitTemplate.convertAndSend("test_fanout_exchange", "", msg);
    }

    /**
     * 发送一条Direct直连队列
     * 若有开启回调机制,必须传此参数new CorrelationData("1"),用于声明ID
     */
    public void sendTestDirectMsg(String msg) {
        rabbitTemplate.convertAndSend("test_direct_exchange", "test", msg, new CorrelationData("1"));
    }

    /**
     * 发送一条Topic消息队列
     */
    public void sendTestMsg(String msg) {
        rabbitTemplate.convertAndSend("test_topic_exchange", "test.mq", msg);
    }
}

5.监听队列

package com.rabbit.receiver;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * RabbitReceiver
 */
@Component
public class RabbitReceiver {

    @RabbitHandler
    @RabbitListener(queues = "test_fanout_queue")
    public void handlerFanout(String msg) {
        System.out.println("RabbitReceiver:" + msg + "test_fanout_queue");
    }

    @RabbitHandler
    @RabbitListener(queues = "test_direct_queue")
    public void handlerDirect(String msg) {
        System.out.println("RabbitReceiver:" + msg + "test_direct_queue");
    }

    @RabbitHandler
    @RabbitListener(queues = "test_topic_queue")
    public void handlerTopic(String msg) {
        System.out.println("RabbitReceiver:" + msg);
    }

}

  

原文地址:https://www.cnblogs.com/joelan0927/p/11143408.html

时间: 2024-11-05 23:17:52

RabbitMq入门实战的相关文章

RabbitMQ入门(二)工作队列

??在文章RabbitMQ入门(一)之Hello World,我们编写程序通过指定的队列来发送和接受消息.在本文中,我们将会创建工作队列(Work Queue),通过多个workers来分配耗时任务. ??工作队列(Work Queue,也被成为Task Queue,任务队列)的中心思想是,避免立即执行一个资源消耗巨大且必须等待其完成的任务.相反地,我们调度好队列可以安排该任务稍后执行.我们将一个任务(task)封装成一个消息,将它发送至队列.一个在后台运行的work进程将会抛出该任务,并最终执

JMeter学习-004-WEB脚本入门实战

此文为 JMeter 入门实战实例.我是 JMeter 初学菜鸟一个,因而此文适合 JMeter 初学者参阅.同时,因本人知识有限,若文中存在不足的地方,敬请大神不吝指正,非常感谢! 闲话少述,话归正题.本文将从 Chrome浏览器代理配置.JMeter HTTP代理服务器 配置.JMeter HTTP代理服务器获取HTTP请求.JMeter脚本创建.JMeter脚本运行 五个方面,分三大块对 JMeter 初级实战应用配以图文,进行详细步骤描述,真正的做到一步一步.从无到有. 一.WEB 浏览

Apache Curator入门实战

版权声明:本文为博主原创文章,未经博主允许不得转载. 目录(?)[+] Apache Curator入门实战 Curator是Netflix公司开源的一个Zookeeper客户端,与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量. 1.Zookeeper安装部署 Zookeeper的部署很简单,如果已经有Java运行环境的话,下载tarball解压后即可运行. [root@vm Temp]$ wget http://mirror.bi

003-Tuple、Array、Map与文件操作入门实战

003-Tuple.Array.Map与文件操作入门实战 Tuple 各个元素可以类型不同 注意索引的方式 下标从1开始 灵活 Array 注意for循环的until用法 数组的索引方式 上面的for是下标索引(繁琐用的不多) 下面的for是增强for循环的元素遍历索引(推荐) Map 注意左边是Key,右边是Value _(下划线也可以作为占位符,形成结构,但无名称不可以访问) 文件操作 进行了Source包的引入 .fromFile() getLines 使用了Iterator 欢迎广大爱好

2.RABBITMQ 入门 - WINDOWS - 生产和消费消息 一个完整案例

关于安装和配置,见上一篇 1.RABBITMQ 入门 - WINDOWS - 获取,安装,配置 公司有需求,要求使用winform开发这个东西(消息中间件),另外还要求开发一个日志中间件,但是也是要求做成win form的,这明显不合理,因为之前,服务器上我已经放置了一个  短信的winform的服务.那么到后期的话,登录服务器之后,全是 一个个的窗体挂在那儿,这明显合不合常理,但是领导要求这么玩,也没办法, 因为卧虎要负责的是消费 消息,所以重点说明 消费端 该案例的接收端,源自网上的代码片段

Scala深入浅出实战经典-----002Scala函数定义、流程控制、异常处理入门实战

002-Scala函数定义.流程控制.异常处理入门实战 Scala函数定义 语句结束无分号 定义无参函数 def 函数名称(参数名称:参数类型)[:Unit=]{ 函数体 } 老师的代码 我的实际代码 原因是集成开发环境自带的版本为2.11.0 变量 常量(不可变)声明 val 变量声明 var 无参函数的调用也无需加括号() 定义有参有返回值的函数 def 函数名称(参数名称:参数类型...):返回值类型={ 函数体 } 老师代码 注意最后一个是b是本函数的返回值 默认最后一行为返回值 流程控

RabbitMQ入门与使用篇

介绍 RabbitMQ是一个由erlang开发的基于AMQP(Advanced Message Queue)协议的开源实现.用于在分布式系统中存储转发消息,在易用性.扩展性.高可用性等方面都非常的优秀.是当前最主流的消息中间件之一. RabbitMQ的官方 概念: Brocker:消息队列服务器实体. Exchange:消息交换机,指定消息按什么规则,路由到哪个队列. Queue:消息队列,每个消息都会被投入到一个或者多个队列里. Binding:绑定,它的作用是把exchange和queue按

Spark入门实战系列--8.Spark MLlib(上)--机器学习及SparkMLlib简介

[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1.机器学习概念 1.1 机器学习的定义 在维基百科上对机器学习提出以下几种定义: l“机器学习是一门人工智能的科学,该领域的主要研究对象是人工智能,特别是如何在经验学习中改善具体算法的性能”. l“机器学习是对能通过经验自动改进的计算机算法的研究”. l“机器学习是用数据或以往的经验,以此优化计算机程序的性能标准.” 一种经常引用的英文定义是:A computer program is said

Scala深入浅出实战经典-----002-Scala函数定义、流程控制、异常处理入门实战

002-Scala函数定义.流程控制.异常处理入门实战 Scala函数定义 语句结束无分号 定义无参函数 def 函数名称(参数名称:参数类型)[:Unit=]{ 函数体 } 老师的代码 我的实际代码 原因是集成开发环境自带的版本为2.11.0 变量 常量(不可变)声明 val 变量声明 var 无参函数的调用也无需加括号() 定义有参有返回值的函数 def 函数名称(参数名称:参数类型...):返回值类型={ 函数体 } 老师代码 注意最后一个是b是本函数的返回值 默认最后一行为返回值 流程控