RabbitMq 集成 spring boot 消息队列 入门Demo

spring boot 集成 RabbitMq还是很方便的。现在来一个简单的例子来集成rabbitmq。入门demo。

主要概念:

其中比较重要的概念有 4 个,分别为:虚拟主机,交换机,队列,和绑定。

虚拟主机:一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢?很简单,RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制。 因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个RabbitMQ服务器都有一个默认的虚拟主机“/”。

交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。 这里有一个比较重要的概念:路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。

绑定:也就是交换机需要和队列相绑定,这其中如上图所示,是多对多的关系。

首先是配制文件。

#spring.application.name=spring-boot-rabbitmq
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

发送者:

package com.basic.rabbitmq.send;

import com.basic.rabbitmq.configuration.RabbitMqConfig2;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

import java.util.Date;

/**
 * Created by sdc on 2017/6/17.
 */
@Service("helloSender")
public class HelloSender {

    @Autowired
    private AmqpTemplate amqpTemplate;

//    private Rabbitt

    public void send() {
        String contenxt = "order_queue_message";
        this.amqpTemplate.convertAndSend(RabbitMqConfig2.QUEUE_EXCHANGE_NAME,"order_queue_routing",contenxt);
//        this.amqpTemplate.conver
    }

}

配制信息:

package com.basic.rabbitmq.configuration;

import com.basic.rabbitmq.receiver.Receiver;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;

/**
 * Created by sdc on 2017/6/17.
 */
@Configuration
public class RabbitMqConfig2 {

    public static final String QUEUE_NAME = "order_queue";

    public static final String QUEUE_EXCHANGE_NAME = "topic_exchange_new";

    public static final  String routing_key = "order_queue_routing";

    @Bean
    public Queue queue() {
        //是否持久化
        boolean durable = false;
        //仅创建者可以使用该队列,断开后自动删除
        boolean exclusive = false;
        //当所有消费者都断开连接后,是否删除队列
        boolean autoDelete = false;
        return new Queue(QUEUE_NAME, durable, exclusive, autoDelete);
    }

    @Bean
    public TopicExchange exchange() {
        //是否持久化
        boolean durable = false;
        //当所有消费者都断开连接后,是否删除队列
        boolean autoDelete = false;
        return  new TopicExchange(QUEUE_EXCHANGE_NAME, durable, autoDelete);
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with(routing_key);
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("127.0.0.1",5672);

        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        /** 如果要进行消息回调,则这里必须要设置为true */
        connectionFactory.setPublisherConfirms(true); // 必须要设置
//        connectionFactory.setPublisherReturns();
        return connectionFactory;
    }

    @Bean
    SimpleMessageListenerContainer container() {

        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
//        container.setQueueNames(QUEUE_NAME);
        container.setQueues(queue());
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                byte[] body = message.getBody();
                System.out.println("收到消息 : " + new String(body));
                channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费
//                channel.basicAck(); //应答
//                channel.basicReject();//拒绝
//                channel.basicRecover(); //恢复
//                channel.basicQos();
//                channel.addConfirmListener(new ConfirmListener() {
//                    @Override
//                    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
//                        //失败重发
//                    }
//
//                    @Override
//                    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
//                        //确认ok
//                    }
//                });
            }
        });
        return  container;
    }

//    @Bean
//    MessageListenerAdapter listenerAdapter(Receiver receiver) {
//        return new MessageListenerAdapter(receiver, "receiveMessage");
//    }

}

测试类:

package com.rabbit.test;

import com.basic.rabbitmq.send.HelloSender;
import com.basic.system.Application;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;

/**
 * Created by sdc on 2017/6/17.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class RabbitMqTest {

    @Autowired
    public HelloSender helloSender;

    @Test
    public void helloword() throws  Exception {
        helloSender.send();
    }

}

这只是一个demo,学习的时候会测试各种的事情,在这基础上更改就可以了,心中的疑虑测试没了就可以写一些项目了。

时间: 2024-10-02 02:07:26

RabbitMq 集成 spring boot 消息队列 入门Demo的相关文章

rabbitMq与spring boot搭配实现监听

在我前面有一篇博客说到了rabbitMq实现与zk类似的watch功能,但是那一篇博客没有代码实例,后面自己补了一个demo,便于理解.demo中主要利用spring boot的配置方式, 一.消费者(也就是watcher)配置 配置都采用spring的注解进行配置 1.创建连接 @Bean public ConnectionFactory createConnectionFactory() { CachingConnectionFactory connectionFactory = new C

消息队列入门(二)消息队列的开源实现

消息队列入门(二)消息队列的开源实现 关于AMQP AMQP 是 Advanced Message Queuing Protocol,即高级消息队列协议.AMQP不是一个具体的消息队列实现,而 是一个标准化的消息中间件协议.目标是让不同语言,不同系统的应用互相通信,并提供一个简单统一的模型和编程接口.目前主流的ActiveMQ和RabbitMQ都支持AMQP协议. AMQP相关的角色和职责 Producer 消息生产者 一个给exchange发送消息的程序,发送方式大致是:它首先创建一个空消息,

Angular集成Spring Boot,Spring Security,JWT和CORS

本文介绍了Spring Boot的基本配置,Angular集成Spring Boot.Spring Security的方法.当前流行的JWT更适合与Angular集成,优于Spring Secuity提供的CSRF.另外引入了springfox-swagger和spring-boot-starter-actuator,演示了如何利用Swagger生成JSON API文档,如何利用Actuator监控应用. 本文前端基于Angular官方样例Tour of Heroes,请先到官网下载. 技术堆栈

Spring Boot 2.x 入门前的准备-IntelliJ IDEA 开发工具的安装与使用

常用的用于开发 spring boot 项目的开发工具有 eclipse 和 IntelliJ IDEA 两种,最近有声音提出 visual code 也开始流行开发 java,而且确实如此, vs code 是一个很有潜力的开发工具. 本章,主要讲述在 window 和 mac 操作系统环境下如何安装 IntelliJ IDEA .注意 IntelliJ IDEA 是收费的,也有用于免费的版本供大家开发学习. 在安装 IntelliJ IDEA 之前,你应该知晓如何安装 Java JDK Sp

RabbitMq+Spring boot 消息生产者向队列发送消息 (一)

本人学习新框架方法. 一.先学习框架基本知识,也就是看这本书的前三章,了解基本概念.比如这个Rabbitmq,我会先看一些概念,比如,交换机,路由器,队列,虚拟机. 二.然后写代码,写demo,有哪些不懂的地方直接再去翻书或者google找资料,带着问题去学习,学的更快更扎实一些. 三.然后再看这个框架的应用场景,自己能否独立的写一些简单的项目,来验证自己的成果. 四.实际项目积累经验. RabbitMq 消息生产者向队列发送消息 (一) MQ分为消息生产者和消息消费者,这次做的主要是消息的生产

SpringBoot(八) Spring和消息队列RabbitMQ

概述 1.大多数应用中,可以通过消息服务中间件来提升系统异步能力和拓展解耦能力. 2.消息服务中的两个重要概念:消息代理(Message broker)和目的地(destination) 当消息发送者发送消息后,将由消息代理接管,消息代理保证消息传递到指定目的地. 3.消息队列主要有两种形式的目的地: 队列:点对点方式通信(point-to-point) 主题:发布/订阅消息服务 点对点式:消息发送者发送消息后,消息代理将其放入一个队列中,消息接受者从队列中读取数据,接受者接收数据后,将消息移除

spring整合消息队列rabbitmq

spring大家太熟,就不多说了 rabbitmq一个amqp的队列服务实现,具体介绍请参考本文http://lynnkong.iteye.com/blog/1699684 本文侧重介绍如何将rabbitmq整合到项目中 ps:本文只是简单一个整合介绍,属于抛砖引玉,具体实现还需大家深入研究哈.. 1.首先是生产者配置 ? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32

(Spring Boot框架)快速入门

Spring Boot 系列文章推荐 Spring Boot 入门 Spring Boot 属性配置和使用 Spring Boot 集成MyBatis Spring Boot 静态资源处理 今天介绍一下如何利用Spring MVC快速的搭建一个简单的web应用. 环境准备 一个称手的文本编辑器(例如Vim.Emacs.Sublime Text)或者IDE(Eclipse.Idea Intellij) Java环境(JDK 1.7或以上版本) Maven 3.0+(Eclipse和Idea Int

Spring Boot【快速入门】

Spring Boot 概述 Build Anything with Spring Boot:Spring Boot is the starting point for building all Spring-based applications. Spring Boot is designed to get you up and running as quickly as possible, with minimal upfront configuration of Spring. 上面是引自