虽然ActiveMQ以被其他MQ所替代,但仍有学习的意义,本文采用邮件发送的例子展示ActiveMQ
1. 生产者1.1 引入maven依赖1.2 application.yml配置1.3 创建配置类ConfigQueue1.4 创建生产者类Producer1.5 启动类AppProducer2. 消费者2.1 引入maven依赖2.2 application.yml配置2.3 创建消费者类Consumer2.4 启动类AppConsumer3. 启动截图3.1 生产者截图3.2 消费者截图3.3 ActiveMQ后台截图3.4 邮件系统截图
1. 生产者
1.1 引入maven依赖
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.itmayiedu</groupId> <artifactId>springboot-p2p-roducer</artifactId> <version>0.0.1-SNAPSHOT</version> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.1.RELEASE</version> </parent> <!-- 管理依赖 --> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Finchley.M7</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <!-- SpringBoot整合Web组件 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- SpringBoot Activemq --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.49</version> </dependency>
</dependencies> <!-- 注意: 这里必须要添加, 否者各种依赖有问题 --> <repositories> <repository> <id>spring-milestones</id> <name>Spring Milestones</name> <url>https://repo.spring.io/libs-milestone</url> <snapshots> <enabled>false</enabled> </snapshots> </repository> </repositories>
</project>
1.2 application.yml配置
spring: activemq: ###MQ连接通讯地址 broker-url: tcp://127.0.0.1:61616 ###账号 user: admin ###密码 password: admin
###自定义队列 my_queue: springboot2.0-queue
server: port: 8089
1.3 创建配置类ConfigQueue
@Componentpublic class ConfigQueue {
@Value("${my_queue}") private String myQueue;
/** * 1.首先需要将队列注入springboot容器中 * * @return */ @Bean public Queue queue() { return new ActiveMQQueue(myQueue); }
}
1.4 创建生产者类Producer
@Componentpublic class Producer {
@Autowired private JmsMessagingTemplate jmsMessagingTemplate; @Autowired private Queue queue;
/** * 每隔5秒种时间向队列中发送消息 */ @Scheduled(fixedDelay = 5000) public void send() { String userName = System.currentTimeMillis() + ""; JSONObject jsonObject = new JSONObject(); jsonObject.put("userName", userName); jsonObject.put("email", "[email protected]"); String msg = jsonObject.toJSONString(); jmsMessagingTemplate.convertAndSend(queue, msg); System.out.println("采用点对点通讯模式,msg:" + msg); }}
1.5 启动类AppProducer
@SpringBootApplication@EnableSchedulingpublic class AppProducer {
public static void main(String[] args) { SpringApplication.run(AppProducer.class, args); }
}
2. 消费者
2.1 引入maven依赖
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.itmayiedu</groupId> <artifactId>springboot-p2p-consumer</artifactId> <version>0.0.1-SNAPSHOT</version>
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.1.RELEASE</version> </parent> <!-- 管理依赖 --> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Finchley.M7</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <!-- SpringBoot整合Web组件 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- SpringBoot Activemq --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.49</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-mail</artifactId> </dependency>
<dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency>
</dependencies> <!-- 注意: 这里必须要添加, 否者各种依赖有问题 --> <repositories> <repository> <id>spring-milestones</id> <name>Spring Milestones</name> <url>https://repo.spring.io/libs-milestone</url> <snapshots> <enabled>false</enabled> </snapshots> </repository> </repositories></project>
2.2 application.yml配置
spring: activemq: ###MQ连接通讯地址 broker-url: tcp://127.0.0.1:61616 ###账号 user: admin ###密码 password: admin mail: ###163邮件服务 host: smtp.163.com ###发送邮件账号 username: [email protected] ###pop协议授权码 password: xxxxxx enable: true smtp: auth: true starttls: enable: true required: true
###自定义队列 my_queue: springboot2.0-queue
server: port: 8088
2.3 创建消费者类Consumer
import com.alibaba.fastjson.JSONObject;import org.apache.commons.lang3.StringUtils;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.jms.annotation.JmsListener;import org.springframework.mail.SimpleMailMessage;import org.springframework.mail.javamail.JavaMailSender;import org.springframework.stereotype.Component;
/** * 消费者 */@Componentpublic class Consumer {
@Autowired private JavaMailSender javaMailSender;
/** * 幂等性 * @param msg * @throws Exception */ @JmsListener(destination = "${my_queue}") public void receive(String msg) throws Exception { if (StringUtils.isEmpty(msg)) { return; } /** 1.解析json **/ JSONObject parseObject = JSONObject.parseObject(msg); String userName = parseObject.getString("userName"); String email = parseObject.getString("email"); sendSimpleMail(email, userName); System.out.println("采用点对点模式,消费者成功获取到生产者的消息,msg:" + msg);
}
public void sendSimpleMail(String eamil, String userName) throws Exception { SimpleMailMessage message = new SimpleMailMessage(); /** 邮件来自 自己发自己 **/ message.setFrom(eamil); /** 发送给谁 **/ message.setTo(eamil); /** 邮件标题 **/ message.setSubject("niceyoo 新邮件提醒"); /** 邮件内容 **/ message.setText("收到一批新的" + userName + "邮件!"); /** 发送邮件 **/ javaMailSender.send(message); System.out.println("邮件发送完成," + JSONObject.toJSONString(message)); }
}
2.4 启动类AppConsumer
@SpringBootApplicationpublic class AppConsumer {
public static void main(String[] args) { SpringApplication.run(AppConsumer.class, args); }
}
3. 启动截图
3.1 生产者截图
3.2 消费者截图
3.3 ActiveMQ后台截图
- Number Of Consumers 消费者 这个是消费者端的消费者数量 ;
- Number Of Pending Messages 等待消费的消息 这个是当前未出队列的数量。可以理解为总接收数-总出队列数;
- Messages Enqueued 进入队列的消息 进入队列的总数量,包括出队列的。 这个数量只增不减 ;
- Messages Dequeued 出了队列的消息 可以理解为是消费这消费掉的数量 ;
3.4 邮件系统截图
原文地址:https://www.cnblogs.com/niceyoo/p/11428381.html
时间: 2024-11-03 00:16:10