RabbitMQ整合spring

  1 <?xml version="1.0" encoding="UTF-8"?>
  2 <beans xmlns="http://www.springframework.org/schema/beans"
  3     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
  4     xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:jdbc="http://www.springframework.org/schema/jdbc"
  5     xmlns:jee="http://www.springframework.org/schema/jee" xmlns:aop="http://www.springframework.org/schema/aop"
  6     xmlns:tx="http://www.springframework.org/schema/tx" xmlns:task="http://www.springframework.org/schema/task"
  7     xmlns:rabbit="http://www.springframework.org/schema/rabbit"
  8     xsi:schemaLocation="http://www.springframework.org/schema/beans
  9      http://www.springframework.org/schema/beans/spring-beans.xsd
 10       http://www.springframework.org/schema/context
 11        http://www.springframework.org/schema/context/spring-context-4.3.xsd
 12         http://www.springframework.org/schema/mvc
 13          http://www.springframework.org/schema/mvc/spring-mvc.xsd
 14             http://www.springframework.org/schema/tx
 15             http://www.springframework.org/schema/tx/spring-tx.xsd
 16             http://www.springframework.org/schema/aop
 17             http://www.springframework.org/schema/aop/spring-aop.xsd
 18             http://www.springframework.org/schema/task
 19         http://www.springframework.org/schema/task/spring-task.xsd
 20             http://www.springframework.org/schema/rabbit
 21             http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd">
 22
 23     <description>rabbitmq 连接服务配置</description>
 24     <!-- 不适用【发布确认】连接配置 -->
 25     <rabbit:connection-factory id="rabbitConnectionFactory"
 26         host="172.18.112.102" username="woms" password="woms" port="5672"
 27         virtual-host="lingyi" channel-cache-size="25" cache-mode="CHANNEL" publisher-confirms="true" publisher-returns="true" connection-timeout="200"/>
 28
 29
 30
 31  <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
 32         <property name="backOffPolicy">
 33             <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
 34                 <property name="initialInterval" value="200" />
 35                 <property name="maxInterval" value="30000" />
 36             </bean>
 37         </property>
 38         <property name="retryPolicy">
 39             <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
 40                 <property name="maxAttempts" value="5"/>
 41             </bean>
 42         </property>
 43     </bean>
 44
 45
 46
 47     <!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 如果使用多exchange必须配置declared-by="connectAdmin" -->
 48     <rabbit:admin id="connectAdmin" connection-factory="connectionFactory"/>
 49
 50     <rabbit:template id="ampqTemplate" connection-factory="connectionFactory"
 51         exchange="test-mq-exchange" return-callback="sendReturnCallback"
 52         message-converter="jsonMessageConverter" routing-key="test_queue_key"
 53         mandatory="true" confirm-callback="confirmCallback" retry-template="retryTemplate"/>
 54
 55
 56     <bean id="confirmCallback" class="ly.net.rabbitmq.MsgSendConfirmCallBack" />
 57     <bean id="sendReturnCallback" class="ly.net.rabbitmq.MsgSendReturnCallback" />
 58     <!-- 消息对象json转换类 -->
 59     <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74     <!-- queue配置 -->
 75     <!-- durable:是否持久化 -->
 76     <!-- exclusive: 仅创建者可以使用的私有队列,断开后自动删除 -->
 77     <!-- auto_delete: 当所有消费客户端连接断开后,是否自动删除队列 -->
 78     <rabbit:queue id="test_queue_key" name="test_queue_key" durable="true" auto-delete="false" exclusive="false" declared-by="rabbitAdmin" />
 79
 80
 81
 82
 83     <!-- exchange配置 -->
 84     <!-- rabbit:direct-exchange:定义exchange模式为direct,意思就是消息与一个特定的路由键完全匹配,才会转发。 -->
 85     <!-- rabbit:binding:设置消息queue匹配的key -->
 86     <rabbit:direct-exchange name="test-mq-exchange"
 87         durable="true" auto-delete="false" id="test-mq-exchange" declared-by="rabbitAdmin">
 88         <rabbit:bindings>
 89             <rabbit:binding queue="test_queue_key" key="test_queue_key" />
 90         </rabbit:bindings>
 91     </rabbit:direct-exchange>
 92
 93     <!-- <rabbit:topic-exchange name="${mq.queue}_exchange" durable="true" auto-delete="false"> -->
 94     <!-- <rabbit:bindings> -->
 95     <!-- 设置消息Queue匹配的pattern (direct模式为key) -->
 96     <!-- <rabbit:binding queue="test_queue" pattern="${mq.queue}_patt"/> -->
 97     <!-- </rabbit:bindings> -->
 98     <!-- </rabbit:topic-exchange> -->
 99
100
101     <bean id="mqConsumer" class="ly.net.rabbitmq.MQConsumer" />
102     <bean id="mqConsumer1" class="ly.net.rabbitmq.MQConsumerManual" />
103
104     <!-- listener配置 消费者 自动确认 -->
105     <!-- queues:监听的队列,多个的话用逗号(,)分隔 ref:监听器 -->
106     <rabbit:listener-container
107         connection-factory="connectionFactory" acknowledge="auto"
108         message-converter="jsonMessageConverter">
109         <rabbit:listener queues="test_queue_key" ref="mqConsumer" />
110     </rabbit:listener-container>
111     <!-- 消费者 手动确认 -->
112     <rabbit:listener-container
113         connection-factory="connectionFactory" acknowledge="manual">
114         <rabbit:listener queues="test_queue_key" ref="mqConsumer1" />
115     </rabbit:listener-container>
116
117
118
119
120
121
122
123 </beans>
 1 package ly.net.rabbitmq;
 2
 3
 4 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 5 import org.springframework.amqp.rabbit.support.CorrelationData;
 6 public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {
 7
 8     @Override
 9     public void confirm(CorrelationData correlationData, boolean ack, String cause) {
10         // TODO Auto-generated method stub
11         if (ack) {
12             System.out.println("消息确认成功");
13         } else {
14             //处理丢失的消息
15             System.out.println("消息确认失败,"+cause);
16         }
17     }
18
19 } 
package ly.net.rabbitmq;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.beans.factory.annotation.Autowired;

public class MsgSendReturnCallback implements RabbitTemplate.ReturnCallback{
    @Autowired
    private RabbitTemplate errorTemplate;

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        String msgJson  = new String(message.getBody());
        System.out.println("Returned Message:"+msgJson); 

        //重新发布
//        RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(errorTemplate,"errorExchange", "errorRoutingKey");
//        Throwable cause = new Exception(new Exception("route_fail_and_republish"));
//        recoverer.recover(message,cause);
//        System.out.println("Returned Message:"+replyText);
//
    }

}
 1 package ly.net.rabbitmq;
 2
 3 import org.springframework.amqp.core.Message;
 4 import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
 5
 6 import com.rabbitmq.client.Channel;
 7
 8 public class MQConsumerManual implements ChannelAwareMessageListener {
 9
10     @Override
11     public void onMessage(Message message, Channel channel) throws Exception {
12         // TODO Auto-generated method stub
13         //手动确认
14         channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
15     }
16
17 }
@Service
public class MQProducerImpl implements MQProducer {
    @Autowired
    private AmqpTemplate amqpTemplate;

    private final static Logger LOGGER = Logger.getLogger(MQProducerImpl.class);
   /*
    * convertAndSend:将Java对象转换为消息发送到匹配Key的交换机中Exchange,由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
    * 原文:Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
    */
    @Override
    public void sendDataToQueue(String queueKey, Object object) {
        try {
            amqpTemplate.convertAndSend(object);

        } catch (Exception e) {
            LOGGER.error(e);
        }

    }
}
public interface MQProducer {
    /**
     * 发送消息到指定队列
     * @param queueKey
     * @param object
     */
    public void sendDataToQueue(String queueKey, Object object);
}
时间: 2024-10-03 13:27:34

RabbitMQ整合spring的相关文章

RabbitMQ整合Spring Booot点对点模式

pom.xml: <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <m

消息中间件——RabbitMQ(十)RabbitMQ整合SpringBoot实战!(全)

前言 1. SpringBoot整合配置详解 publisher-confirms,实现一个监听器用于监听Broker端给我们返回的确认请求:RabbitTemplate.ConfirmCallback publisher-returns,保证消息对Broker端是可达的,如果出现路由键不可达的情况,则使用监听器对不可达的消息进行后续的处理,保证消息的路由成功:RabbitTemplate.ReturnCallback 注意一点,在发送消息的时候对template进行配置mandatory=tr

RabbitMQ交换机、RabbitMQ整合springCloud

目标 1.交换机 2.RabbitMQ整合springCloud 交换机 蓝色区域===生产者 红色区域===Server:又称Broker,接受客户端的连接,实现AMQP实体服务 绿色区域===消费者 黄色区域===就是我们的交换机以及队列 由生产者投递信息到RabbitMQ Server里面某一个交换机对应的队列中,消费者则是从对应的队列中获取信息 交换机属性: Name:交换机名称 Type:交换机类型 direct.topic.fanout.headers Durability:是否需要

Mybatis整合Spring 【转】

根据官方的说法,在ibatis3,也就是Mybatis3问世之前,Spring3的开发工作就已经完成了,所以Spring3中还是没有对Mybatis3的支持.因此由Mybatis社区自己开发了一个Mybatis-Spring用来满足Mybatis用户整合Spring的需求.下面就将通过Mybatis-Spring来整合Mybatis跟Spring的用法做一个简单的介绍. MapperFactoryBean 首先,我们需要从Mybatis官网上下载Mybatis-Spring的jar包添加到我们项

Netty5快速入门及实例视频教程(整合Spring)

Netty5快速入门及实例视频教程+源码(整合Spring) https://pan.baidu.com/s/1pL8qF0J 01.传统的Socket分析02.NIO的代码分析03.对于NIO的一些疑惑04.Netty服务端HelloWorld入门05.Netty服务端入门补充06.Netty客户端入门07.如何构建一个多线程NIO系统08.Netty源码分析一09.Netty源码分析二10.Netty5服务端入门案例11.Netty5客户端入门案例12.单客户端多连接程序13.Netty学习

框架整合——Spring与MyBatis框架整合

Spring整合MyBatis 1. 整合 Spring [整合目标:在spring的配置文件中配置SqlSessionFactory以及让mybatis用上spring的声明式事务] 1). 加入 Spring 的 jar 包和配置文件 <1>.Spring框架需要的jar包: com.springsource.net.sf.cglib-2.2.0.jarcom.springsource.org.aopalliance-1.0.0.jarcom.springsource.org.aspect

Shiro整合Spring

首先需要添加shiro的spring整合包. 要想在WEB应用中整合Spring和Shiro的话,首先需要添加一个由spring代理的过滤器如下: <!-- The filter-name matches name of a 'shiroFilter' bean inside applicationContext.xml --> <filter> <filter-name>shiroFilter</filter-name> <filter-class&

整合 Spring + SpringMVC + MyBatis

< 一 > POM 配置文件 ( 如果出现 JAR 包 引入错误, 请自行下载 ) <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.

【Java EE 学习第81天】【CXF框架】【CXF整合Spring】

一.CXF简介 CXF是Apache公司下的项目,CXF=Celtix+Xfire:它支持soap1.1.soap1.2,而且能够和spring进行快速无缝整合. 另外jax-ws是Sun公司发布的一套开发WebService服务的标准.早期的标准如jax-rpc已经很少使用,而cxf就是在新标准jax-ws下开发出来的WebService,jax-ws也内置到了jdk1.6当中. CXF官方下载地址:http://cxf.apache.org/download.html 下载完成之后,解压开压