因为公司项目需要使用消息中间件,实现相关业务的异步处理,所有选用了rabbitmq.通过看文档,爬过一个一个坑,终于还是实现了相关功能。
直接上配置文件:
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xmlns:rabbit="http://www.springframework.org/schema/rabbit" 5 xmlns:util="http://www.springframework.org/schema/util" 6 xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd 7 http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd"> 8 <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"> 9 <property name="username" value="${mq.userName}" /> 10 <property name="password" value="${mq.password}" /> 11 <property name="host" value="${mq.host}" /> 12 <property name="port" value="${mq.port}" /> 13 <property name="virtualHost" value="${mq.virtualHost}"/> 14 <property name="channelCacheSize" value="${mq.cache.size}"/> 15 </bean> 16 17 <bean id="rabbitAdmin" class="org.springframework.amqp.rabbit.core.RabbitAdmin"> 18 <constructor-arg ref="connectionFactory" /> 19 </bean> 20 <bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate"> 21 <constructor-arg ref="connectionFactory"></constructor-arg> 22 <property name="exchange" value="${mq.exchange}"/> 23 <property name="routingKey" value="${mq.routingKey}"/> 24 <property name="queue" value="${mq.queue}"/> 25 </bean> 26 27 28 <bean id="serializerMessageConverter" class="org.springframework.amqp.support.converter.SimpleMessageConverter"></bean> 29 30 31 <bean id="queue" class="org.springframework.amqp.core.Queue"> 32 <constructor-arg index="0" value="${mq.queue}"></constructor-arg> 33 <constructor-arg index="1" value="true"></constructor-arg> 34 <constructor-arg index="2" value="false"></constructor-arg> 35 <constructor-arg index="3" value="false"></constructor-arg> 36 </bean> 37 38 39 <bean id="directExchange" class="org.springframework.amqp.core.DirectExchange"> 40 <constructor-arg index="0" value="${mq.routingKey}"></constructor-arg> 41 <constructor-arg index="1" value="true"></constructor-arg> 42 <constructor-arg index="2" value="false"></constructor-arg> 43 </bean> 44 45 <util:map id="arguments"> 46 </util:map> 47 48 49 <bean id="binding" class="org.springframework.amqp.core.Binding"> 50 <constructor-arg index="0" value="${mq.queue}"></constructor-arg> 51 <constructor-arg index="1" value="QUEUE"></constructor-arg> 52 <constructor-arg index="2" value="${mq.exchange}"></constructor-arg> 53 <constructor-arg index="3" value="${mq.routingKey}"></constructor-arg> 54 <constructor-arg index="4" value="#{arguments}"></constructor-arg> 55 </bean> 56 57 58 <bean id="rmqConsumer" class="com.tqmall.grace.biz.rabbitmq.MessageQueueReceiver"></bean> 59 60 <bean id="messageListenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter"> 61 <constructor-arg ref="rmqConsumer" /> 62 <property name="defaultListenerMethod" value="onMessage"></property> 63 <property name="messageConverter" ref="serializerMessageConverter"></property> 64 </bean> 65 66 <bean id="listenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer"> 67 <property name="queues" ref="queue"></property> 68 <property name="connectionFactory" ref="connectionFactory"></property> 69 <property name="messageListener" ref="messageListenerAdapter"></property> 70 </bean> 71 </beans>
在这个配置中,我使用的exchange模式是direct,生产者发送消息到 exchange,exchange根据其生产者的routingkey,找到对应bingkey的队列queue.这样消息就会存到对应的queue中,可以把queue理解为一个存储消息的地方。消费者会从该队列中获取消息,消费消息,返回消息应答,成功,该消息从消息队列中删除。
注意点:
配置queue的时候,Durability设为durable,这样在rabbitmq服务端重启的时候,消息队列里面的消息不回丢失。
配置文件中的如下,配置了在项目启动的时候,会自动去获得exchange,queue,在rabbitmq中注册。
<bean id="rabbitAdmin" class="org.springframework.amqp.rabbit.core.RabbitAdmin"> <constructor-arg ref="connectionFactory" /> </bean> 在配置中使用的spring的CachingConnectionFactory,用它来管理rabbit的connectionFactory, 其中可以设置其的channelCacheSize,默认是1.这个相当于建立一个channel的缓存池,channel的作用类似于session.一个connection可以创建多个channel. 在该例子中使用SimpleMessageListenerContainer来管理消费者。
时间: 2024-12-24 12:33:25