spring中使用RabbitMQ

常见的消息中间件产品:

(1)ActiveMQ

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。

(2)RabbitMQ

AMQP协议的领导实现,支持多种场景。淘宝的MySQL集群内部有使用它进行通讯,OpenStack开源云平台的通信组件,最先在金融行业得到运用。我们在本次课程中介绍 RabbitMQ的使用。

(3)ZeroMQ

史上最快的消息队列系统

(4)Kafka

Apache下的一个子项目 。特点:高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统。适合处理海量数据。

(5)RocketMQ 阿里巴巴

消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。对于消息中间件,常见的角色大致也就有Producer(生产者)、Consumer(消费者)。

消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。

? Spring-amqp是对AMQP协议的抽象实现,而spring-rabbit 是对协议的具体实现,也是目前的唯一实现。底层使用的就是RabbitMQ。

已经配置好了ssm的开发环境

1.导入依赖

<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.5.3</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>2.1.3.RELEASE</version>
    </dependency>

    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.9.5</version>
    </dependency>
</dependencies>

2.编写生产者

2.1配置文件

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
    http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context.xsd">

    <context:component-scan base-package="cn.test.rabbitmq.spring"/>

<!-- 配置连接工厂 -->
<rabbit:connection-factory id="connectionFactory" virtual-host="/saas"
                           host="127.0.0.1" port="5672" username="saas" password="saas" />
<!-- 定义mq管理 -->
<rabbit:admin connection-factory="connectionFactory" />

<!-- 声明队列 -->
<rabbit:queue name="spring.test.queue" auto-declare="true" durable="true" />

<!-- 定义交换机绑定队列(路由模式) -->
<rabbit:direct-exchange  name="spring.test.exchange">
    <rabbit:bindings>
        <rabbit:binding queue="spring.test.queue" key="user.insert" />
    </rabbit:bindings>
</rabbit:direct-exchange>
<!-- 定义交换机绑定队列(路由模式)使用匹配符
<rabbit:topic-exchange  id="springTestExchange" name="spring.test.exchange">
    <rabbit:bindings>
        <rabbit:binding queue="spring.test.queue" pattern="#.#" />
    </rabbit:bindings>
</rabbit:topic-exchange>
-->
<!-- 消息对象json转换类 -->
<bean id="jsonMessageConverter"
      class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />

<!-- 定义模版 -->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"
                 exchange="spring.test.exchange"
                 message-converter="jsonMessageConverter"/>

</beans>

2.2 发送方代码

这里是往RabbitMQ队列中放入任务,让消费者去取

package cn.test.rabbitmq.spring;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MqSender {
    @Autowired
    private AmqpTemplate amqpTemplate;
    public void sendMessage(){
        //根据key发送到对应的队列
        amqpTemplate.convertAndSend("user.insert","spring整合RabbitMQ消息");
        System.out.println("发送成功........");
    }
}

2.3 测试代码

package cn.test.rabbitmq.spring;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.HashMap;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationContext-mq-send.xml")
public class MqSendDemo {

    @Autowired
    private MqSender mqSender;
    @Test
    public void test(){
        //根据key发送到对应的队列
        mqSender.sendMessage();
    }
}

3.编写消费者

3.1 配置文件

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
    http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- 配置连接工厂 -->
    <rabbit:connection-factory id="connectionFactory" virtual-host="/saas"
                               host="127.0.0.1" port="5672" username="saas" password="saas" />
    <!-- 定义mq管理 -->
    <rabbit:admin connection-factory="connectionFactory" />

    <!-- 声明队列 -->
    <rabbit:queue  name="spring.test.queue" auto-declare="true" durable="true" />

    <!-- 定义消费者 -->
    <bean id="testMqListener" class="cn.test.rabbitmq.spring.MqListener" />

    <!-- 定义消费者监听队列 -->
    <rabbit:listener-container
            connection-factory="connectionFactory">
        <rabbit:listener ref="testMqListener" queues="spring.test.queue" />
    </rabbit:listener-container>

</beans>

3.2 监听代码

package cn.test.rabbitmq.spring;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;

public class MqListener implements MessageListener {

    public void onMessage(Message message) {
        try {
            System.out.println(message.getBody());
            String ms = new String(message.getBody(), "UTF-8");
            System.out.println(ms);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3.3 测试代码

package cn.itcast.rabbitmq.spring;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationContext-mq-receive.xml")
public class MqReceiveDemo {

    @Test
    public void test(){
      //等待队列中放入任务,如果有任务,立即消费任务
        while (true){
        }
    }
}

原文地址:https://www.cnblogs.com/changchangchang/p/12024401.html

时间: 2024-10-29 18:17:40

spring中使用RabbitMQ的相关文章

简单介绍下怎么在spring中使用RabbitMQ

这篇文章主要介绍了简单了解如何在spring中使用RabbitMQ,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 常见的消息中间件产品: (1)ActiveMQ ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线.ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现. (2)RabbitMQ AMQP协议的领导实现,支持多种场景.淘宝的MySQL集群内部有使用它进行通讯,Open

由一个RABBITMQ监听器死循环引出的SPRING中BEAN和MAPPER接口的注入问题

1 @Slf4j 2 @RestController 3 @Component 4 public class VouchersReceiverController implements MessageListener { 5 6 @Autowired 7 private VouchersService vouchersService; 8 9 String MerchantCode = PropertyReader.getValue("MerchantCode"); 10 11 /**

RabbitMQ实例详解+Spring中的MQ使用

RabbitMQ实例详解 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构. Queue Queue(队列)是RabbitMQ的内部对象,用于存储消息,用下图表示. RabbitMQ中的消息都只能存储在Queue中,生产者(下图中的P)生产消息并最终投递到Queue中,消费者(下图中的C)可以从Queue中获取消息并消费. 多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不

Spring Boot中使用RabbitMQ的示例代码

很久没有写Spring Boot的内容了,正好最近在写Spring Cloud Bus的内容,因为内容会有一些相关性,所以先补一篇关于AMQP的整合. http://www.ljhseo.com/http://www.xyrjkf.net/http://www.xyrjkf.cn/http://www.xyrjkf.com.cn/http://www.zjdygsi.cn/http://www.zjdaiyun.cn/http://www.jsdygsi.cn/http://www.xyrjkf

详解Spring Boot中的RabbitMQ

RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用. 消息中间件在互联网公司的使用中越来越多,刚才还看到新闻阿里将 RocketMQ 捐献给了 Apache,当然了今天的主角还是讲 RabbitMQ.消息中间件最主要的作用是解耦,中间件最标准的用法是生产者生产消息传送到队列,消费者从队列中拿取消息并处理,生产者不用关心是谁来消费,消费者不用关心谁在生产消息,从而达到解耦的目的.在分布式的系统中,消息队列也会被用在很多其它的方面,比如:分布式

spring XD结合RabbitMQ方法介绍

1,Rabbitmq工作流程 消息生产者(即下图中的ClientA,ClientB)生产message并交给交换机Exchange,Exchange将消息按照RoutingKey分发到相应的队列Queue中,而我们的消费者(即下图中的Client1,Client2,Client3)需要做的事情就是监听队列Queue的事件,当有新的message到达的时候,做出相应的处理. Rabbitmq结构图: 2,Exchange交换类型介绍 具体来说,Exchange也有几个类型,完全根据key进行投递的

Spring Boot 实现 RabbitMQ 延迟消费和延迟重试队列

本文主要摘录自:详细介绍Spring Boot + RabbitMQ实现延迟队列 并增加了自己的一些理解,记录下来,以便日后查阅. 项目源码: spring-boot-rabbitmq-delay-queue 实现 stream-rabbitmq-delay-queue 实现 背景 何为延迟队列? 顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列.而一般的队列,消息一旦入队了之后就会被消费者马上消费. 延迟队列能做什么?延迟队列多用于需要延迟工作的场景.最常见的是以下两种场景: 延迟消费

Spring Boot (25) RabbitMQ消息队列

MQ全程(Message Queue)又名消息队列,是一种异步通讯的中间件.可以理解为邮局,发送者将消息投递到邮局,然后邮局帮我们发送给具体的接收者,具体发送过程和时间与我们无关,常见的MQ又kafka.activemq.zeromq.rabbitmq等等. RabbitMQ RabbitMQ是一个遵循AMQP协议,由面向高并发的erlang语言开发而成,用在实时的对可靠性要求比较高的消息传递上,支持多种语言客户端,支持延迟队列. 基础概念 Broker:消息队列的服务器实体 Exchange:

Spring Cloud 集成 RabbitMQ

同步 or 异步 前言:我们现在有一个用微服务架构模式开发的系统,系统里有一个商品服务和订单服务,且它们都是同步通信的. 目前我们商品服务和订单服务之间的通信方式是同步的,当业务扩大之后,如果还继续使用同步的方式进行服务之间的通信,会使得服务之间的耦合增大.例如我们登录操作可能需要同步调用用户服务.积分服务.短信服务等等,而服务之间可能又依赖别的服务,那么这样一个登录过程就会耗费不少的时间,以致用户的体验降低. 那我们在微服务架构下要如何对服务之间的通信进行解耦呢?这就需要使用到消息中间件了,消