springboot+rabbitmq整合示例程

关于什么是rabbitmq,请看另一篇文:

http://www.cnblogs.com/boshen-hzb/p/6840064.html

一、新建maven工程:springboot-rabbitmq

二、引入springboot和rabbitmq的依赖

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.springboot.rabbitmq</groupId>
  <artifactId>springboot-rabbitmq</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>springboot-rabbitmq</name>
  <description>springboot-rabbitmq</description>

  <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.4.1.RELEASE</version>
  </parent>
  <dependencies>
     <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
     </dependency>
     <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
     </dependency>
     <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
  </dependencies>
</project>
spring-boot-starter-test是为了后面写测试类用,
spring-boot-starter-amqp才是真正的使用rabbitmq的依赖

三、在src/main/resources里面新增application.properties该配置文件主要是对rabbimq的配置信息
spring.application.name=springboot-rabbitmq
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=15672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

四、新建springboot主类Application

该类初始化创建队列、转发器,并把队列绑定到转发器

package com.rabbit;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class Application {
    final static String queueName = "hello";

    @Bean
    public Queue helloQueue() {
        return new Queue("hello");
    }

    @Bean
    public Queue userQueue() {
        return new Queue("user");
    }

    //===============以下是验证topic Exchange的队列==========
    @Bean
    public Queue queueMessage() {
        return new Queue("topic.message");
    }

    @Bean
    public Queue queueMessages() {
        return new Queue("topic.messages");
    }
  //===============以上是验证topic Exchange的队列==========

    //===============以下是验证Fanout Exchange的队列==========
    @Bean
    public Queue AMessage() {
        return new Queue("fanout.A");
    }

    @Bean
    public Queue BMessage() {
        return new Queue("fanout.B");
    }

    @Bean
    public Queue CMessage() {
        return new Queue("fanout.C");
    }
    //===============以上是验证Fanout Exchange的队列==========

    @Bean
    TopicExchange exchange() {
        return new TopicExchange("exchange");
    }
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    /**
     * 将队列topic.message与exchange绑定,routing_key为topic.message,就是完全匹配
     * @param queueMessage
     * @param exchange
     * @return
     */
    @Bean
    Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }

    /**
     * 将队列topic.messages与exchange绑定,routing_key为topic.#,模糊匹配
     * @param queueMessage
     * @param exchange
     * @return
     */
    @Bean
    Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }

    @Bean
    Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(AMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(BMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(CMessage).to(fanoutExchange);
    }

    public static void main(String[] args) throws Exception {
        SpringApplication.run(Application.class, args);
    }
}

五、各种情景实现

1、最简单的hello生产和消费实现(单生产者和单消费者)

生产者:

package com.rabbit.hello;

import java.util.Date;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class HelloSender1 {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String sendMsg = "hello1 " + new Date();
        System.out.println("Sender1 : " + sendMsg);
        this.rabbitTemplate.convertAndSend("helloQueue", sendMsg);
    }

}

消费者:

package com.rabbit.hello;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "helloQueue")
public class HelloReceiver1 {

    @RabbitHandler
    public void process(String hello) {
        System.out.println("Receiver1  : " + hello);
    }

}

controller:

package com.rabbit.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.rabbit.hello.HelloSender1;

@RestController
@RequestMapping("/rabbit")
public class RabbitTest {

    @Autowired
    private HelloSender1 helloSender1;
    @Autowired
    private HelloSender1 helloSender2;

    @PostMapping("/hello")
    public void hello() {
        helloSender1.send();
    }
}

启动程序,执行:

结果如下:

Sender1 : hello1 Thu May 11 17:23:31 CST 2017
Receiver2  : hello1 Thu May 11 17:23:31 CST 2017

2、单生产者-多消费者

生产者:

package com.rabbit.hello;

import java.util.Date;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class HelloSender1 {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send(String msg) {
        String sendMsg = msg + new Date();
        System.out.println("Sender1 : " + sendMsg);
        this.rabbitTemplate.convertAndSend("helloQueue", sendMsg);
    }

}

消费者1:

package com.rabbit.hello;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "helloQueue")
public class HelloReceiver1 {

    @RabbitHandler
    public void process(String hello) {
        System.out.println("Receiver1  : " + hello);
    }

}

消费者2:

package com.rabbit.hello;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "helloQueue")
public class HelloReceiver2 {

    @RabbitHandler
    public void process(String hello) {
        System.out.println("Receiver2  : " + hello);
    }

}

controller:

package com.rabbit.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.rabbit.hello.HelloSender1;

@RestController
@RequestMapping("/rabbit")
public class RabbitTest {

    @Autowired
    private HelloSender1 helloSender1;
    @Autowired
    private HelloSender1 helloSender2;

    @PostMapping("/hello")
    public void hello() {
        helloSender1.send("hello1");
    }

    /**
     * 单生产者-多消费者
     */
    @PostMapping("/oneToMany")
    public void oneToMany() {
        for(int i=0;i<10;i++){
            helloSender1.send("hellomsg:"+i);
        }

    }
}

用post方式执行:

http://127.0.0.1:8080/rabbit/oneToMany

结果如下:

Sender1 : hellomsg:0Thu May 11 17:37:59 CST 2017
Sender1 : hellomsg:1Thu May 11 17:37:59 CST 2017
Sender1 : hellomsg:2Thu May 11 17:37:59 CST 2017
Sender1 : hellomsg:3Thu May 11 17:37:59 CST 2017
Sender1 : hellomsg:4Thu May 11 17:37:59 CST 2017
Sender1 : hellomsg:5Thu May 11 17:37:59 CST 2017
Sender1 : hellomsg:6Thu May 11 17:37:59 CST 2017
Sender1 : hellomsg:7Thu May 11 17:37:59 CST 2017
Sender1 : hellomsg:8Thu May 11 17:37:59 CST 2017
Sender1 : hellomsg:9Thu May 11 17:37:59 CST 2017
Receiver2  : hellomsg:1Thu May 11 17:37:59 CST 2017
Receiver1  : hellomsg:0Thu May 11 17:37:59 CST 2017
Receiver1  : hellomsg:3Thu May 11 17:37:59 CST 2017
Receiver1  : hellomsg:4Thu May 11 17:37:59 CST 2017
Receiver1  : hellomsg:5Thu May 11 17:37:59 CST 2017
Receiver2  : hellomsg:2Thu May 11 17:37:59 CST 2017
Receiver1  : hellomsg:6Thu May 11 17:37:59 CST 2017
Receiver2  : hellomsg:7Thu May 11 17:37:59 CST 2017
Receiver2  : hellomsg:8Thu May 11 17:37:59 CST 2017
Receiver1  : hellomsg:9Thu May 11 17:37:59 CST 2017

从以上结果可知,生产者发送的10条消息,分别被两个消费者接收了

时间: 2024-10-29 10:46:18

springboot+rabbitmq整合示例程的相关文章

springboot rabbitmq整合

这一篇我们来把消息中间件整合到springboot中 ===================================================================== 首先在服务器上安装rabbitmq的服务,用docker拉取即可,不再详细描述. 直接来撸代码 首先我们先添加rabbitmq的依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId&g

消息中间件——RabbitMQ(十)RabbitMQ整合SpringBoot实战!(全)

前言 1. SpringBoot整合配置详解 publisher-confirms,实现一个监听器用于监听Broker端给我们返回的确认请求:RabbitTemplate.ConfirmCallback publisher-returns,保证消息对Broker端是可达的,如果出现路由键不可达的情况,则使用监听器对不可达的消息进行后续的处理,保证消息的路由成功:RabbitTemplate.ReturnCallback 注意一点,在发送消息的时候对template进行配置mandatory=tr

带着新人学springboot的应用06(springboot+RabbitMQ 中)

上一节说了这么多废话,看也看烦了,现在我们就来用鼠标点点点,来简单玩一下这个RabbitMQ. 注意:这一节还是不用敲什么代码,因为上一节我们设置了那个可视化工具,我们先用用可视化工具熟悉一下流程. 打开可视化页面,http://localhost:15672 顺便说一下RabbitMQ中的持持久化:这里持久化分为三种:消息持久化,交换器持久化,队列持久化... 举个例子,就简单说说交换器持久化,其实就是为了防止将消息发到交换器了,但是RabbitMQ服务器突然暴毙,没用了,那数据不就丧失了么?

带着新人学springboot的应用07(springboot+RabbitMQ 下)

说一两句废话,强烈推荐各位小伙伴空闲时候也可以写写自己的博客!不管水平高低,不管写的怎么样,不要觉得写不好或者水平不够就不写了(咳,我以前就是这样的想法...自我反省!). 但是开始写博客之后,你会发现很多你以为自己会的东西其实你并不会,然后你会经常在头脑中不断的搜索有关的片段,或者去别的大神博客里到处找有关的资料,最后领悟了属于自己的东西!然后再写出来和别人分享,别人也会给你点意见,你也会慢慢的改进.这不就是学习+复习+巩固+创新+分享+改进的这么的一个过程吗? 以前看过曹雪芹的红楼梦,让我印

RabbitMQ交换机、RabbitMQ整合springCloud

目标 1.交换机 2.RabbitMQ整合springCloud 交换机 蓝色区域===生产者 红色区域===Server:又称Broker,接受客户端的连接,实现AMQP实体服务 绿色区域===消费者 黄色区域===就是我们的交换机以及队列 由生产者投递信息到RabbitMQ Server里面某一个交换机对应的队列中,消费者则是从对应的队列中获取信息 交换机属性: Name:交换机名称 Type:交换机类型 direct.topic.fanout.headers Durability:是否需要

SpringBoot Kafka 整合实例教程

1.使用IDEA新建工程引导方式,创建消息生产工程 springboot-kafka-producer. 工程POM文件代码如下: 1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instanc

springBoot+SpringData 整合入门

SpringData概述 SpringData :Spring的一个子项目.用于简化数据库访问,支持NoSQL和关系数据存储.其主要目标是使用数据库的访问变得方便快捷. SpringData 项目所支持NoSQL存储: MongoDB(文档数据库) Neo4j(图形数据库) Redis(键/值存储) Hbase(列族数据库) SpringData 项目所支持的关系数据存储技术: JDBC JPA Spring Data : 致力于减少数据访问层 (DAO) 的开发量. 开发者唯一要做的,就只是声

springboot+security整合(1)

说明 springboot 版本 2.0.3源码地址:点击跳转 系列 springboot+security 整合(1) springboot+security 整合(2) springboot+security 整合(3) 一. 介绍 ??Spring Security 是一个能够为基于 Spring 的企业应用系统提供声明式的安全访问控制解决方案的安全框架.它提供了一组可以在 Spring 应用上下文中配置的 Bean,充分利用了 Spring IoC,DI(控制反转 Inversion o

SpringBoot RabbitMQ 延迟队列代码实现

场景 用户下单后,如果30min未支付,则删除该订单,这时候就要可以用延迟队列 准备 利用rabbitmq_delayed_message_exchange插件: 首先下载该插件:https://www.rabbitmq.com/community-plugins.html 然后把该插件放到rabbitmq安装目录plugins下: 进入到sbin目录下,执行"rabbitmq-plugins.bat enable rabbitmq_delayed_message_exchange";