1 整合RabbitMQ
1.1 RabbitMQ的相关概念
- 组成部分
- 队列(Queue)
声明队列
```java
@Bean
public Queue addUserQueue() {
return new Queue("demo-user-add");
}
```
- 交换机(Exchange)
用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。
这里有一个比较重要的概念:路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。
交换机的功能主要是接收消息并且转发到绑定的队列,交换机不存储消息,在启用ack模式后,交换机找不到队列会返回错误。交换机有四种类型:Direct, topic, Headers and Fanout
Direct
direct 类型的行为是"先匹配, 再投送". 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去。Direct Exchange是RabbitMQ默认的交换机模式,也是最简单的模式,根据key全文匹配去寻找队列。
- topic
类似Director模式,但是更加灵活,可以根据通配符去寻找对应的exchange。
- 匹配一个字符
#
匹配多个字符Headers
设置header attribute参数类型的交换机
Fanout
转发消息到所有绑定队列;
消息广播的模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了routing_key会被忽略。
- 匹配一个字符
代码声明一个exchange:
java @Bean public TopicExchange demoTestTopicExchange() { return new TopicExchange("demoTestTopic"); }
- 绑定(Binding)
通过routing key声明exchange与queue之间关系的。从而确定了我这个msg发到哪个exchange上面
。然后与exchange再路由到对应的queue上面,从而发给了对应的消费者
@Bean public Binding addUserBinding() { return BindingBuilder.bind(addUserQueue()).to(addUserTopicExchange()).with("cn.com.user.add"); }
1.2 发送、接收消息
1.2.1 发送消息
```java
@Slf4j
@Component
public class UserMQSender {
@Autowired
private AmqpTemplate amqpTemplate;
/**
- 发送消息
- @param exchangeNanme 队列名称
- @param routingKey 路由key
- @param msg 具体消息内容
- @throws Exception
*/
public void sendUserMQ(String exchangeNanme, String routingKey, String msg) throws Exception {
log.info("向交换机:{},匹配规则:{}, 发送消息:{}", exchangeNanme, routingKey, msg);
this.amqpTemplate.convertAndSend(exchangeNanme, routingKey, msg);
}
}
##### 1.2.2 接收消息 * 通过注解的方式主动监听接收 > 声明我要监听哪个queue即可
java
@RabbitListener(queues = "demo-user-add")
public void getMsg(String msg) throws Exception{
log.info("获取消息{}", msg);
User user = (User) JSONObject.toBean(JSONObject.fromObject(msg), User.class);
userService.addUser(user);
}
* 被动接收
java
String data = (String) this.amqpTemplate.receiveAndConvert(queueName);
#### 1.3 模拟高并发取值
java
@Test
public void testGetMsg() throws Exception{
ExecutorService service = Executors.newCachedThreadPool(); //创建一个线程池
final CountDownLatch beginCountDownLatch = new CountDownLatch(1);
final CountDownLatch countDownLatch = new CountDownLatch(100);
for (int i = 0; i < 100; i++) {
Runnable runnable = new Runnable() {
int index = 1;
@Override
public void run() {
try {
/**
*如果调用对象上的await()方法,那么调用者就会一直阻塞在这里,直到别人通过cutDown方法,将计数减到0,才可以继续执行。
* 这里先调用beginCountDownLatch的await方法,等到循环结束后,内存中就有100个线程等待去运行。
* 所以等到beginCountDownLatch调用countDown的时候,100个线程就开始执行
*/
beginCountDownLatch.await();
log.info("------->index:{}", index);
String data = (String) amqpTemplate.receiveAndConvert("demo-test");
log.info("==============>消息n内容:{}", data);
countDownLatch.countDown();
index++;
} catch (Exception e) {
e.printStackTrace();
}
}
};
service.execute(runnable);
}
//释放主线程,之前声明的100个线程就开始执行
beginCountDownLatch.countDown();
//
countDownLatch.await();
}
```
原文地址:https://www.cnblogs.com/KevinStark/p/10193722.html