1,首先引入配置文件org.springframework.amqp,如下:
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.7.1.RELEASE</version> </dependency>
2,准备工作:安装好rabbitmq,并在项目中增加配置文件 rabbit.properties 内容如下:
rmq.ip=192.188.113.114 rmq.port=5672 rmq.producer.num=20 rmq.manager.user=admin rmq.manager.password=admin
3,配置spring-rabbitmq.xml,内容如下:
<!-- 公共部分 --> <!-- 创建连接类 连接安装好的 rabbitmq --> <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"> <constructor-arg value="localhost" /> <property name="username" value="${rmq.manager.user}" /> <property name="password" value="${rmq.manager.password}" /> <property name="host" value="${rmq.ip}" /> <property name="port" value="${rmq.port}" /> </bean> <rabbit:admin connection-factory="connectionFactory"/> <!--定义消息队列,durable:是否持久化;exclusive: 仅创建者可以使用的私有队列,断开后自动删除;auto_delete: 当所有消费客户端连接断开后,是否自动删除队列 --> <rabbit:queue name="spittle.alert.queue.1" id="queue_1" durable="true" auto-delete="false" exclusive="false" /> <rabbit:queue name="spittle.alert.queue.2" id="queue_2" durable="true" auto-delete="false" exclusive="false" /> <rabbit:queue name="spittle.alert.queue.3" id="queue_3" durable="true" auto-delete="false" exclusive="false" /> <!--绑定队列,rabbitmq的exchangeType常用的三种模式:direct,fanout,topic三种,我们用direct模式,即rabbit:direct-exchange标签,Direct交换器很简单,如果是Direct类型,就会将消息中的RoutingKey与该Exchange关联的所有Binding中的BindingKey进行比较,如果相等,则发送到该Binding对应的Queue中。有一个需要注意的地方:如果找不到指定的exchange,就会报错。但routing key找不到的话,不会报错,这条消息会直接丢失,所以此处要小心 --> <rabbit:direct-exchange id="spittle.fanout" name="spittle.fanout" durable="true" auto-delete="false"> <rabbit:bindings> <rabbit:binding queue="spittle.alert.queue.1" key="{alert.queue.1}"></rabbit:binding> <rabbit:binding queue="spittle.alert.queue.2" key="{alert.queue.2}"></rabbit:binding> <rabbit:binding queue="spittle.alert.queue.3" key="{alert.queue.3}"></rabbit:binding> </rabbit:bindings> </rabbit:fanout-exchange> <!-- 生产者部分 --> <!-- 发送消息的producer类,也就是生产者 --> <bean id="msgProducer" class="com.asdf.sdf.ClassA"> <!-- value中的值就是producer中的的routingKey,也就是队列名称,它与上面的rabbit:bindings标签中的key必须相同 --> <property name="queueName" value="{alert.queue.1}"/> </bean> <!-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列,由于fastjson的速度快于jackson,这里替换为fastjson的一个实现 --> <bean id="jsonMessageConverter" class="com.jy.utils.FastJsonMessageConverter"></bean> <!-- 或者配置jackson --> <!-- <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /> --> <rabbit:template exchange="test-exchange" id="rabbitTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter" /> <!-- 消费者部分 --> <!-- 自定义接口类 --> <bean id="testHandler" class="com.rabbit.TestHandler"></bean> <!-- 用于消息的监听的代理类MessageListenerAdapter --> <bean id="testQueueListenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter" > <!-- 类名 --> <constructor-arg ref="testHandler" /> <!-- 方法名 --> <property name="defaultListenerMethod" value="handlerTest"></property><property name="messageConverter" ref="jsonMessageConverter"></property> </bean> <!-- 配置监听acknowledeg="manual"设置手动应答.当消息处理失败时:会一直重发,直到消息处理成功,监听容器acknowledge="auto" concurrency="30"设置发送次数,最多发送30次 --> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" concurrency="20"> <rabbit:listener queues="spittle.alert.queue.1" ref="testQueueListenerAdapter" /> <rabbit:listener queues="spittle.alert.queue.2" ref="testQueueListenerAdapter" /> <rabbit:listener queues="spittle.alert.queue.2" ref="testQueueListenerAdapter" /></rabbit:listener-container>
4,生产者(发送端)代码:
@Resource private RabbitTemplate rabbitTemplate; private String queueName; public void sendMessage(CommonMessage msg){ try { logger.error("发送信息开始"); System.out.println(rabbitTemplate.getConnectionFactory().getHost()); //发送信息 queueName交换机,就是上面的routingKey msg.getSource() 为 test_key rabbitTemplate.convertAndSend(queueName,msg.getSource(), msg); logger.error("发送信息结束"); } catch (Exception e) { e.printStackTrace(); } } public void setQueueName(String queueName) { this.queueName = queueName; }
5,消费端代码:TestHandler 类
public class TestHandler { @Override public void handlerTest(CommonMessage commonMessage) { System.out.println("DetailQueueConsumer: " + new String(message.getBody())); } }
本文转自:
https://blog.csdn.net/nandao158/article/details/81065892
https://www.cnblogs.com/LipeiNet/p/6079427.html
原文地址:https://www.cnblogs.com/nizuimeiabc1/p/9608763.html
时间: 2024-10-11 08:34:34