前言
Linux安装RabbitMQ:https://www.cnblogs.com/jxd283465/p/11975094.html
SpringBoot整合RabbitMQ:https://www.cnblogs.com/jxd283465/p/11975136.html
流程
代码
数据库表
-- Delivery log for messages published to RabbitMQ: one row per message id.
-- String literals use plain ASCII single quotes (the typographic quotes in the
-- pasted original are not valid SQL). msg_id is the PRIMARY KEY, which already
-- enforces uniqueness, so no separate UNIQUE KEY on the same column is declared.
CREATE TABLE `msg_log` (
  `msg_id` varchar(255) NOT NULL DEFAULT '' COMMENT '消息唯一标识',
  `msg` text COMMENT '消息体, json格式化',
  `exchange` varchar(255) NOT NULL DEFAULT '' COMMENT '交换机',
  `routing_key` varchar(255) NOT NULL DEFAULT '' COMMENT '路由键',
  `status` int(11) NOT NULL DEFAULT '0' COMMENT '状态: 0投递中 1投递成功 2投递失败 3已消费',
  `try_count` int(11) NOT NULL DEFAULT '0' COMMENT '重试次数',
  `next_try_time` datetime DEFAULT NULL COMMENT '下一次重试时间',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `update_time` datetime DEFAULT NULL COMMENT '更新时间',
  PRIMARY KEY (`msg_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息投递日志';
邮件发送类
package cc.mrbird.febs.common.utils; /** * 邮件发送工具类。 * 以下邮件中的配置参数,请在实际环境中,根据需要采取合适的配置方式。 * 发送邮件依赖 com.sun.mail(1.6.1) 包、javax.mail(1.5.0-b01) 包。 * 如果使用 Idea 运行,请将这两个包(可以直接到Maven目录下面去找)添加到项目的 Libraries 里面(快捷键:Ctrl + Alt + Shift + S) * * @author Zebe */ public class SendEmailUtil { /** * 发件人别名(可以为空) */ private final static String fromAliasName = "***"; /** * 登录用户名 */ private String ACCOUNT; /** * 登录密码 */ private String PASSWORD; /** * 邮件服务器地址 */ //QQ企业邮箱:smtp.exmail.qq.com //网易企业邮箱:smtphz.qiye.163.com private String HOST; /** * 发信端口 */ //QQ企业邮箱:465 //网易企业邮箱:994 private String PORT; /** * 发信协议 */ private final static String PROTOCOL = "ssl"; /** * 收件人 */ private String to; /** * 收件人名称 */ private String toName; /** * 主题 */ private String subject; /** * 内容 */ private String content; /** * 附件列表(可以为空) */ private List<String> attachFileList; /** * 构造器 * * @param attachFileList 附件列表 */ public SendEmailUtil(MailTemplate mailTemplate, List<String> attachFileList) { this.to = mailTemplate.getTo(); this.toName = mailTemplate.getToName(); this.subject = mailTemplate.getSubject(); this.content = mailTemplate.getContent(); this.attachFileList = attachFileList; this.ACCOUNT = mailTemplate.getAccount(); this.PASSWORD = mailTemplate.getPassword(); switch (mailTemplate.getSendType()) { case "qq": this.HOST = "smtp.exmail.qq.com"; this.PORT = "465"; break; case "163": this.HOST = "smtp.ym.163.com"; this.PORT = "994"; break; } } /** * 认证信息 */ static class MyAuthenticator extends Authenticator { /** * 用户名 */ String username = null; /** * 密码 */ String password = null; /** * 构造器 * * @param username 用户名 * @param password 密码 */ public MyAuthenticator(String username, String password) { this.username = username; this.password = password; } @Override protected PasswordAuthentication getPasswordAuthentication() { return new PasswordAuthentication(username, password); } } /** * 发送邮件 */ public boolean send() { // 设置邮件属性 Properties prop = new Properties(); 
prop.setProperty("mail.transport.protocol", PROTOCOL); prop.setProperty("mail.smtp.host", HOST); prop.setProperty("mail.smtp.port", PORT); prop.setProperty("mail.smtp.auth", "true"); MailSSLSocketFactory sslSocketFactory = null; try { sslSocketFactory = new MailSSLSocketFactory(); sslSocketFactory.setTrustAllHosts(true); } catch (GeneralSecurityException e1) { e1.printStackTrace(); } if (sslSocketFactory == null) { System.err.println("开启 MailSSLSocketFactory 失败"); } else { prop.put("mail.smtp.ssl.enable", "true"); prop.put("mail.smtp.ssl.socketFactory", sslSocketFactory); // 创建邮件会话(注意,如果要在一个进程中切换多个邮箱账号发信,应该用 Session.getInstance) Session session = Session.getDefaultInstance(prop, new MyAuthenticator(ACCOUNT, PASSWORD)); // 开启调试模式(生产环境中请不要开启此项) session.setDebug(true); try { MimeMessage mimeMessage = new MimeMessage(session); // 设置发件人别名(如果未设置别名就默认为发件人邮箱) mimeMessage.setFrom(new InternetAddress(ACCOUNT, fromAliasName)); // 设置主题和收件人、发信时间等信息 mimeMessage.addRecipient(Message.RecipientType.TO, new InternetAddress(to, toName)); mimeMessage.setSubject(subject); mimeMessage.setSentDate(new Date()); // 如果有附件信息,则添加附件 if (!attachFileList.isEmpty()) { Multipart multipart = new MimeMultipart(); MimeBodyPart body = new MimeBodyPart(); body.setContent(content, "text/html; charset=UTF-8"); multipart.addBodyPart(body); // 添加所有附件(添加时判断文件是否存在) for (String filePath : attachFileList) { if (Files.exists(Paths.get(filePath))) { MimeBodyPart tempBodyPart = new MimeBodyPart(); tempBodyPart.attachFile(filePath); multipart.addBodyPart(tempBodyPart); } } mimeMessage.setContent(multipart); } else { Multipart multipart = new MimeMultipart(); MimeBodyPart body = new MimeBodyPart(); body.setContent(content, "text/html; charset=UTF-8"); multipart.addBodyPart(body); mimeMessage.setContent(multipart); //mimeMessage.setText(content); } // 开始发信 mimeMessage.saveChanges(); Transport.send(mimeMessage); return true; } catch (MessagingException | IOException e) { e.printStackTrace(); return false; } } return 
false; } }
邮件模板
@Data @NoArgsConstructor public class MailTemplate implements Serializable { private String msgId; /** * 收件人 */ private String to; /** * 收件人名称 */ private String toName; /** * 主题 */ private String subject; /** * 内容 */ private String content; /** * 附件列表 */ private List<String> attachFileList; /** * 邮箱账号 */ private String account; /** * 邮箱密码 */ private String password; /** * 邮箱类型 */ private String sendType; /** * 构造器 * * @param to 收件人 * @param subject 主题 * @param content 内容 */ public MailTemplate(String account, String password, String sendType, String to, String toName, String subject, String content) { this.account = account; this.password = password; this.sendType = sendType; this.to = to; this.toName = toName; this.subject = subject; this.content = content; } /** * 构造器 * * @param to 收件人 * @param subject 主题 * @param content 内容 * @param attachFileList 附件列表 */ public MailTemplate(String account, String password, String sendType, String to, String toName, String subject, String content, List<String> attachFileList) { this(account, password, sendType, to, toName, subject, content); this.attachFileList = attachFileList; } }
RabbitMQ 配置类
@Configuration @Slf4j public class RabbitConfig { // 发送邮件 public static final String MAIL_QUEUE_NAME = "mail.queue"; public static final String MAIL_EXCHANGE_NAME = "mail.exchange"; public static final String MAIL_ROUTING_KEY_NAME = "mail.routing.key"; public final static Integer MAIL_DELIVER_SUCCESS = 1; public final static Integer MAIL_DELIVER_FAIL = 2; public final static Integer MAIL_CONSUMED_SUCCESS = 3; public static boolean ENABLE_SCHEDULED = false; private final CachingConnectionFactory connectionFactory; @Autowired private IMsgLogService iMsgLogService; public RabbitConfig(CachingConnectionFactory connectionFactory) { this.connectionFactory = connectionFactory; } @Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(converter()); // 消息是否成功发送到Exchange rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { log.info("消息成功发送到Exchange"); String msgId = correlationData.getId(); UpdateWrapper<MsgLog> updateWrapper = new UpdateWrapper<>(); updateWrapper.eq("msg_id",msgId); MsgLog msgLog = new MsgLog(); msgLog.setStatus(MAIL_DELIVER_SUCCESS); iMsgLogService.update(msgLog, updateWrapper); } else { log.info("消息发送到Exchange失败, {}, cause: {}", correlationData, cause); } }); // 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调 rabbitTemplate.setMandatory(true); // 消息是否从Exchange路由到Queue, 注意: 这是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法 rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message); }); return rabbitTemplate; } @Bean public Jackson2JsonMessageConverter converter() { return new Jackson2JsonMessageConverter(); } @Bean public Queue mailQueue() { return new Queue(MAIL_QUEUE_NAME, true); } @Bean public DirectExchange mailExchange() { 
return new DirectExchange(MAIL_EXCHANGE_NAME, true, false); } @Bean public Binding mailBinding() { return BindingBuilder.bind(mailQueue()).to(mailExchange()).with(MAIL_ROUTING_KEY_NAME); } }
生产者
@RestController @RequestMapping("mail") public class MailController { @Autowired private IMsgLogService iMsgLogService; @Autowired private RabbitTemplate rabbitTemplate; @PostMapping("/test") public void test(String account, String password, String sendType) { try { for (int j = 1; j <= 3; j++) { for (int i = 1; i <= 15; i++) { for (int num = 1; num <= 73; num++) { // 设置发信参数 final String toName = "我是" + num + "号"; final String to = "test" + num + "@forexgwg.com"; String subject = num + " 第" + num + "次发送测试邮件标题"; final String content = "<p style=‘color:red‘>这是邮件内容正文。</p></br>"; MailTemplate mailTemplate = new MailTemplate(); String msgId = UUID.randomUUID().toString(); mailTemplate.setMsgId(msgId); mailTemplate.setAccount(account); mailTemplate.setPassword(password); mailTemplate.setSendType(sendType); mailTemplate.setToName(toName); mailTemplate.setTo(to); mailTemplate.setSubject(subject); mailTemplate.setContent(content); mailTemplate.setAttachFileList(new ArrayList<>()); MsgLog msgLog = new MsgLog(msgId, JSON.toJSONString(mailTemplate), RabbitConfig.MAIL_EXCHANGE_NAME, RabbitConfig.MAIL_ROUTING_KEY_NAME, LocalDateTime.now()); iMsgLogService.save(msgLog); CorrelationData correlationData = new CorrelationData(msgId); Thread.sleep(1000); rabbitTemplate.convertAndSend(RabbitConfig.MAIL_EXCHANGE_NAME, RabbitConfig.MAIL_ROUTING_KEY_NAME, JSON.toJSONString(mailTemplate), correlationData);// 发送消息 } } } RabbitConfig.ENABLE_SCHEDULED = true; } catch (Exception e) { System.out.println("错误: " + e); } } }
消费者
@Component @Slf4j public class MailConsumer { @Autowired private IMsgLogService iMsgLogService; @RabbitListener(queues = RabbitConfig.MAIL_QUEUE_NAME) public void consume(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); msg = msg.replaceAll("\\\\", ""); msg = msg.substring(1, msg.length() - 1); MailTemplate mailTemplate = JSON.parseObject(msg, MailTemplate.class); log.info("收到消息: {}", mailTemplate.toString()); String msgId = mailTemplate.getMsgId(); QueryWrapper<MsgLog> queryWrapper = new QueryWrapper<>(); queryWrapper.eq("msg_id", msgId); UpdateWrapper<MsgLog> updateWrapper = new UpdateWrapper<>(); updateWrapper.eq("msg_id", msgId); MsgLog msgLog = iMsgLogService.getOne(queryWrapper); // 消费幂等性 if (null == msgLog || msgLog.getStatus().equals(RabbitConfig.MAIL_CONSUMED_SUCCESS)) { log.info("重复消费, msgId: {}", msgId); return; } msgLog.setStatus(3); msgLog.setUpdateTime(LocalDateTime.now()); iMsgLogService.update(msgLog, updateWrapper); MessageProperties properties = message.getMessageProperties(); long tag = properties.getDeliveryTag(); boolean success = new SendEmailUtil(mailTemplate, new ArrayList<>()).send(); if (success) { msgLog.setStatus(RabbitConfig.MAIL_CONSUMED_SUCCESS); msgLog.setUpdateTime(LocalDateTime.now()); iMsgLogService.update(msgLog, updateWrapper); log.info("消费成功!"); channel.basicAck(tag, false);// 消费确认 } else { channel.basicNack(tag, false, true); } } }
重新发送
@Component @Slf4j public class ResendMsg { @Autowired private IMsgLogService iMsgLogService; @Autowired private RabbitTemplate rabbitTemplate; // 最大投递次数 private static final int MAX_TRY_COUNT = 3; /** * 每30s拉取投递失败的消息, 重新投递 */ @Scheduled(cron = "0 0/1 * * * ?") public void resend() { log.info("开始执行定时任务(重新投递消息)"); UpdateWrapper<MsgLog> updateWrapper = new UpdateWrapper<>(); QueryWrapper<MsgLog> queryWrapper = new QueryWrapper<>(); queryWrapper.ne("status", RabbitConfig.MAIL_DELIVER_FAIL).ne("status", RabbitConfig.MAIL_CONSUMED_SUCCESS); List<MsgLog> msgLogs = iMsgLogService.list(queryWrapper); if (msgLogs.size() == 0){ RabbitConfig.ENABLE_SCHEDULED = false; } msgLogs.forEach(msgLog -> { String msgId = msgLog.getMsgId(); updateWrapper.eq("msg_id", msgId); if (msgLog.getTryCount() >= MAX_TRY_COUNT) { msgLog.setStatus(RabbitConfig.MAIL_DELIVER_FAIL); msgLog.setUpdateTime(LocalDateTime.now()); iMsgLogService.update(msgLog, updateWrapper); log.info("超过最大重试次数, 消息投递失败, msgId: {}", msgId); } else { msgLog.setTryCount(msgLog.getTryCount() + 1); msgLog.setUpdateTime(LocalDateTime.now()); msgLog.setNextTryTime(LocalDateTime.now().plusSeconds(60)); iMsgLogService.update(msgLog, updateWrapper);// 投递次数+1 CorrelationData correlationData = new CorrelationData(msgId); rabbitTemplate.convertAndSend(msgLog.getExchange(), msgLog.getRoutingKey(), msgLog.getMsg(), correlationData);// 重新投递 log.info("第 " + (msgLog.getTryCount() + 1) + " 次重新投递消息"); } }); log.info("定时任务执行结束(重新投递消息)"); } }
原文地址:https://www.cnblogs.com/jxd283465/p/11982076.html
时间: 2024-10-12 23:23:10