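In fanout mode, the producer sends messages to an exchange, the exchange forwards a copy of each message to every queue bound to it, and each consumer receives the messages from the queue it listens on.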
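The routing just described can be modelled in a few lines of plain Java. This is only an in-memory sketch, not RabbitMQ or Spring AMQP code. The class `InMemoryFanoutExchange` and its methods `bind`, `publish`, and `messagesIn` are hypothetical names made up for illustration; only the queue names `fanout.A` and `fanout.B` come from this post.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of fanout routing; an illustration only, not RabbitMQ client code. */
final class FanoutSketch {

    /** Hypothetical stand-in for a fanout exchange: it only tracks its bound queues. */
    static class InMemoryFanoutExchange {
        // queue name -> messages delivered to that queue, in arrival order
        private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

        /** Binds a queue to the exchange; binding the same name twice has no effect. */
        void bind(String queueName) {
            boundQueues.putIfAbsent(queueName, new ArrayList<>());
        }

        /** Copies the message into every bound queue; the routing key is ignored. */
        void publish(String routingKey, String message) {
            for (List<String> queue : boundQueues.values()) {
                queue.add(message);
            }
        }

        /** Returns the messages currently held by the named queue. */
        List<String> messagesIn(String queueName) {
            return boundQueues.get(queueName);
        }
    }

    public static void main(String[] args) {
        InMemoryFanoutExchange exchange = new InMemoryFanoutExchange();
        exchange.bind("fanout.A");
        exchange.bind("fanout.B");

        exchange.publish("", "m1");
        exchange.publish("any.key", "m2"); // a different routing key changes nothing

        System.out.println(exchange.messagesIn("fanout.A")); // [m1, m2]
        System.out.println(exchange.messagesIn("fanout.B")); // [m1, m2]
    }
}
```

Both queues end up with the full message list regardless of the routing key, which is the behaviour the rest of this post sets up with a real broker.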
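1. Install RabbitMQ and add the dependency to pom.xml; the procedure is covered in an earlier post.
2. Add a configuration class that declares two queues and one FanoutExchange, then binds each queue to the exchange.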
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author 冯战魁
 * @Date 2018/1/12 2:50 PM
 */
@Configuration
public class AmqpConfig {

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("127.0.0.1:5672");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPublisherConfirms(true); // must be set
        return connectionFactory;
    }

    @Bean(name = "Amessage")
    public Queue AMessage() {
        return new Queue("fanout.A");
    }

    @Bean(name = "Bmessage")
    public Queue BMessage() {
        return new Queue("fanout.B");
    }

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange"); // configure the broadcast (fanout) exchange
    }

    @Bean
    Binding bindingExchangeA(@Qualifier("Amessage") Queue AMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(AMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeB(@Qualifier("Bmessage") Queue BMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(BMessage).to(fanoutExchange);
    }
}
3. Write the producer method, which sends four messages.
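// Note: keywords, annotations, the field name, and string literals were lost from the
// original listing. They are reconstructed here from context and marked where assumed.
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class RabbitSenderController {

    @Autowired
    private AmqpTemplate amqpTemplate; // field name assumed

    // Path taken from the endpoint called in step 5: http://localhost:8080/fanout
    @RequestMapping("/fanout")
    public void fanout() {
        // The four original message texts are not recoverable; these are placeholders.
        String[] tasks = {"task 1", "task 2", "task 3", "task 4"};
        for (int i = 0; i < tasks.length; i++) {
            String content = tasks[i];
            System.out.println("Sender: " + content); // log prefix is a placeholder
            // A fanout exchange ignores the routing key, so an empty string is assumed.
            this.amqpTemplate.convertAndSend("fanoutExchange", "", content);
        }
    }
}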
4. Write the consumer, with one listener for each of the two queues.
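// Note: annotations and string literals were lost from the original listing.
// Queue names come from AmqpConfig; the log prefixes are placeholders.
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class FanoutRabbit {

    // Receives every message that the exchange copies into fanout.A
    @RabbitListener(queues = "fanout.A")
    public void processA(String str1) {
        System.out.println("ReceiveA: " + str1);
    }

    // Receives every message that the exchange copies into fanout.B
    @RabbitListener(queues = "fanout.B")
    public void processB(String str) {
        System.out.println("ReceiveB: " + str);
    }
}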
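5. Call the producer endpoint http://localhost:8080/fanout
The consumer output is shown in the figure.
As you can see, both consumers receive the same messages sent by the producer.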
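The result depends on each consumer having its own queue. The following plain-Java sketch contrasts that setup with two consumers sharing a single queue, where RabbitMQ by default hands messages out to the consumers in turn. It is a simplified in-memory model, not broker code; the class `SharedQueueContrast`, its methods, and the messages `m1` to `m4` are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model: one queue per consumer versus one queue shared by all consumers. */
final class SharedQueueContrast {

    /** Fanout with a separate queue per consumer: every consumer sees every message. */
    static List<List<String>> separateQueues(List<String> messages, int consumers) {
        List<List<String>> received = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            received.add(new ArrayList<>(messages)); // each queue holds its own copy
        }
        return received;
    }

    /** One shared queue: each message goes to exactly one consumer, in round-robin order. */
    static List<List<String>> sharedQueue(List<String> messages, int consumers) {
        List<List<String>> received = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            received.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            received.get(i % consumers).add(messages.get(i));
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> messages = List.of("m1", "m2", "m3", "m4");
        System.out.println(separateQueues(messages, 2)); // [[m1, m2, m3, m4], [m1, m2, m3, m4]]
        System.out.println(sharedQueue(messages, 2));    // [[m1, m3], [m2, m4]]
    }
}
```

With separate queues, as in this post, each consumer gets all four messages; with a shared queue the four messages would be split between the two consumers.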
That concludes the fanout mode walkthrough.
Original source: http://blog.51cto.com/fengzhankui/2060345
Time: 2024-11-10 13:47:12