Java秒杀系统实战系列~整合RabbitMQ实现消息异步发送

摘要:

本篇博文是“Java秒杀系统实战系列文章”的第八篇,在这篇文章中我们将整合消息中间件RabbitMQ,包括添加依赖、加入配置信息以及自定义注入相关操作组件,比如RabbitTemplate等等,最终初步实现消息的发送和接收,并在下一篇章将其与邮件服务整合,实现“用户秒杀成功发送邮件通知消息”的功能!

内容:

对于消息中间件RabbitMQ,想必各位小伙伴没有用过、也该有听过,它是一款目前市面上应用相当广泛的消息中间件,可以实现消息异步通信、业务服务模块解耦、接口限流、消息分发等功能,在微服务、分布式系统架构中可以说是充当着一名了不起的角色!(详细的介绍,Debug在这里就不赘述了,各位小伙伴可以上官网看看其更多的介绍及其典型的应用场景)!

在本篇博文中,我们将使用RabbitMQ充当消息发送的组件,将它与后面篇章介绍的“邮件服务”结合实现“用户秒杀成功后异步发送邮件通知消息,告知用户秒杀已经成功!”,下面我们一起进入代码实战吧。

(1)要使用RabbitMQ,前提得在本地开发环境或者服务器安装RabbitMQ服务,如下图所示为Debug在本地安装RabbitMQ服务成功后访问其后端控制台应用的首页:

之后我们开始将其与SpringBoot进行整合。首先需要加入其依赖,其版本号跟SpringBoot的版本一致,版本号为1.5.7.RELEASE:

<!--rabbitmq-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>${spring-boot.version}</version>
</dependency>

然后需要在配置文件application.properties中加入RabbitMQ服务相关的配置,比如其服务所在的Host、端口Port等等:

#rabbitmq
spring.rabbitmq.virtual-host=/
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=15
spring.rabbitmq.listener.simple.prefetch=10

(2)紧接着,我们借助SpringBoot天然具有的一些特性,自动注入RabbitMQ一些组件的配置,包括其“单一实例消费者”配置、“多实例消费者”配置以及用于发送消息的操作组件实例“RabbitTemplate”的配置:

//通用化 Rabbitmq 配置
@Configuration
public class RabbitmqConfig {
  private final static Logger log = LoggerFactory.getLogger(RabbitmqConfig.class);

  @Autowired
  private Environment env;

  @Autowired
  private CachingConnectionFactory connectionFactory;

  @Autowired
  private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;

  //单一消费者
  @Bean(name = "singleListenerContainer")
  public SimpleRabbitListenerContainerFactory listenerContainer(){
      SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
      factory.setConnectionFactory(connectionFactory);
      factory.setMessageConverter(new Jackson2JsonMessageConverter());
      factory.setConcurrentConsumers(1);
      factory.setMaxConcurrentConsumers(1);
      factory.setPrefetchCount(1);
      factory.setTxSize(1);
      return factory;
  }

  //多个消费者
  @Bean(name = "multiListenerContainer")
  public SimpleRabbitListenerContainerFactory multiListenerContainer(){
      SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
      factoryConfigurer.configure(factory,connectionFactory);
      factory.setMessageConverter(new Jackson2JsonMessageConverter());
      //确认消费模式-NONE
      factory.setAcknowledgeMode(AcknowledgeMode.NONE);
      factory.setConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.simple.concurrency",int.class));
      factory.setMaxConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.simple.max-concurrency",int.class));
      factory.setPrefetchCount(env.getProperty("spring.rabbitmq.listener.simple.prefetch",int.class));
      return factory;
  }

  @Bean
  public RabbitTemplate rabbitTemplate(){
      connectionFactory.setPublisherConfirms(true);
      connectionFactory.setPublisherReturns(true);
      RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
      rabbitTemplate.setMandatory(true);
      rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
          @Override
          public void confirm(CorrelationData correlationData, boolean ack, String cause) {
              log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
          }
      });
      rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
          @Override
          public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
              log.warn("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
          }
      });
      return rabbitTemplate;
  }
}

在RabbitMQ的消息发送组件RabbitTemplate的配置中,我们还特意加入了“消息发送确认”、“消息丢失回调”的输出配置,即当消息正确进入到队列后,即代表消息发送成功;当消息找不到对应的队列(在某种程度上,其实也就是找不到交换机和路由)时,会输出消息丢失。

(3)完了之后,我们准备开始使用RabbitMQ实现消息的发送和接收。首先,我们需要在RabbitmqConfig配置类中创建队列、交换机、路由以及绑定等Bean组件,如下所示:

//构建异步发送邮箱通知的消息模型
    @Bean
    public Queue successEmailQueue(){
        return new Queue(env.getProperty("mq.kill.item.success.email.queue"),true);
    }

    @Bean
    public TopicExchange successEmailExchange(){
        return new TopicExchange(env.getProperty("mq.kill.item.success.email.exchange"),true,false);
    }

    @Bean
    public Binding successEmailBinding(){
        return BindingBuilder.bind(successEmailQueue()).to(successEmailExchange()).with(env.getProperty("mq.kill.item.success.email.routing.key"));
    }

其中,环境变量实例env读取的那些属性我们是配置在application.properties文件中的,如下所示:

mq.env=test

#秒杀成功异步发送邮件的消息模型
mq.kill.item.success.email.queue=${mq.env}.kill.item.success.email.queue
mq.kill.item.success.email.exchange=${mq.env}.kill.item.success.email.exchange
mq.kill.item.success.email.routing.key=${mq.env}.kill.item.success.email.routing.key

紧接着,我们需要在通用的消息发送服务类 RabbitSenderService 中写一段发送消息的方法,该方法用于接收“订单编号”参数,然后在数据库中查询其对应的详细订单记录,将该记录充当“消息”并发送至RabbitMQ的队列中,等待被监听消费:

/**
* RabbitMQ通用的消息发送服务
* @Author:debug (SteadyJack)
* @Date: 2019/6/21 21:47
**/
@Service
public class RabbitSenderService {

  public static final Logger log= LoggerFactory.getLogger(RabbitSenderService.class);

  @Autowired
  private RabbitTemplate rabbitTemplate;

  @Autowired
  private Environment env;

  @Autowired
  private ItemKillSuccessMapper itemKillSuccessMapper;

  //秒杀成功异步发送邮件通知消息
  public void sendKillSuccessEmailMsg(String orderNo){
      log.info("秒杀成功异步发送邮件通知消息-准备发送消息:{}",orderNo);

      try {
          if (StringUtils.isNotBlank(orderNo)){
              KillSuccessUserInfo info=itemKillSuccessMapper.selectByCode(orderNo);
              if (info!=null){
                  //TODO:rabbitmq发送消息的逻辑
                  rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
                  rabbitTemplate.setExchange(env.getProperty("mq.kill.item.success.email.exchange"));
                  rabbitTemplate.setRoutingKey(env.getProperty("mq.kill.item.success.email.routing.key"));

                  //TODO:将info充当消息发送至队列
                  rabbitTemplate.convertAndSend(info, new MessagePostProcessor() {
                      @Override
                      public Message postProcessMessage(Message message) throws AmqpException {
                          MessageProperties messageProperties=message.getMessageProperties();
                          messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                          messageProperties.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,KillSuccessUserInfo.class);
                          return message;
                      }
                  });
              }
          }
      }catch (Exception e){
          log.error("秒杀成功异步发送邮件通知消息-发生异常,消息为:{}",orderNo,e.fillInStackTrace());
      }
  }
}

(4)最后,是在通用的消息接收服务类RabbitReceiverService中实现消息的接收,其完整的源代码如下所示:

/**
 * RabbitMQ通用的消息接收服务
 * @Author:debug (SteadyJack)
 * @Date: 2019/6/21 21:47
 **/
@Service
public class RabbitReceiverService {

  public static final Logger log= LoggerFactory.getLogger(RabbitReceiverService.class);

  @Autowired
  private MailService mailService;

  @Autowired
  private Environment env;

  @Autowired
  private ItemKillSuccessMapper itemKillSuccessMapper;

  //秒杀异步邮件通知-接收消息
  @RabbitListener(queues = {"${mq.kill.item.success.email.queue}"},containerFactory = "singleListenerContainer")
  public void consumeEmailMsg(KillSuccessUserInfo info){
      try {
          log.info("秒杀异步邮件通知-接收消息:{}",info);
      //到时候这里将整合邮件服务发送邮件通知消息的逻辑

      }catch (Exception e){
          log.error("秒杀异步邮件通知-接收消息-发生异常:",e.fillInStackTrace());
      }
  }
}

至此,关于SpringBoot整合消息中间件RabbitMQ的代码实战,本篇文章就介绍到这里了。

最后一点,我们需要进行测试,即用户在界面发起“抢购”的请求操作之后,如果能秒杀成功,则RabbitMQ会发送、接收一条消息,如下所示:

好了,关于RabbitMQ的使用,本文到此就暂且告一段落了,在下一篇文章中我们将把它与邮件服务进行整合,实现“用户秒杀成功后异步发送邮件通知消息给到用户邮箱”的功能!除此之外,我们还将在后面的篇章介绍“如何使用RabbitMQ的死信队列,处理用户下单成功后却超时未支付的订单~在那里我们将采取失效的操作”。

补充:

1、目前,这一秒杀系统的整体构建与代码实战已经全部完成了,完整的源代码数据库地址可以来这里下载:gitee.com/steadyjack/… 记得Fork跟Star啊!!

2、最后,不要忘记了关注一下Debug的技术微信公众号:

原文地址:https://www.cnblogs.com/SteadyJack/p/11248698.html

时间: 2024-07-29 08:28:43

Java秒杀系统实战系列~整合RabbitMQ实现消息异步发送的相关文章

Java秒杀系统实战系列~整合Shiro实现用户登录认证

摘要: 本篇博文是"Java秒杀系统实战系列文章"的第五篇,在本篇博文中,我们将整合权限认证-授权框架Shiro,实现用户的登陆认证功能,主要用于:要求用户在抢购商品或者秒杀商品时,限制用户进行登陆!并对于特定的url(比如抢购请求对应的url)进行过滤(即当用户访问指定的url时,需要要求用户进行登陆). 内容: 对于Shiro,相信各位小伙伴应该听说过,甚至应该也使用过!简单而言,它是一个很好用的用户身份认证.权限授权框架,可以实现用户登录认证,权限.资源授权.会话管理等功能,在本

Java秒杀系统实战系列~商品秒杀代码实战

摘要: 本篇博文是"Java秒杀系统实战系列文章"的第六篇,本篇博文我们将进入整个秒杀系统核心功能模块的代码开发,即"商品秒杀"功能模块的代码实战. 内容: "商品秒杀"功能模块是建立在"商品详情"功能模块的基础之上,对于这一功能模块而言,其主要的核心流程在于:前端发起抢购请求,该请求将携带着一些请求数据:待秒杀Id跟当前用户Id等数据:后端接口在接收到请求之后,将执行一系列的判断与秒杀处理逻辑,最终将处理结果返回给到前端.

Java秒杀系统实战系列~分布式唯一ID生成订单编号

摘要: 本篇博文是"Java秒杀系统实战系列文章"的第七篇,在本博文中我们将重点介绍 "在高并发,如秒杀的业务场景下如何生成全局唯一.趋势递增的订单编号",我们将介绍两种方法,一种是传统的采用随机数生成的方式,另外一种是采用当前比较流行的"分布式唯一ID生成算法-雪花算法"来实现. 内容: 在上一篇博文,我们完成了商品秒杀业务逻辑的代码实战,在该代码中,我们还实现了"当用户秒杀成功后,需要在数据库表中为其生成一笔秒杀成功的订单记录&qu

springboot项目整合rabbitMq涉及消息的发送确认,消息的消费确认机制

1.引入maven依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>2.在application.yml的配置: spring: rabbitmq: host: 106.52.82.241 port: 5672 username: yang

Java秒杀系统方案优化---高性能高并发实战

Java秒杀系统方案优化---高性能高并发实战网盘地址:https://pan.baidu.com/s/1htNv2zq 密码: ssyt备用地址(腾讯微云):https://share.weiyun.com/889808c023b6e9d9f504399a5b07276f 密码:1WaUHB 亮眼的!高并发秒杀系统核心技术 课程以"秒杀"场景为例,但技术都是通用的,举一反三,方得始终应对大并发:多层次多粒度缓存+消息队列异步+服务器分布式部署 专业的压测工具:有依有据,鉴证系统的优化

JavaEE秒杀系统实战应用(百度网盘)

课程目录 第01课00.秒杀系统项目需求分析 00:09:50 第02课01.秒杀系统项目说明 00:05:52 第03课02.秒杀系统-基础回顾Mybatis讲解1 00:36:16 第04课03.秒杀系统-基础回顾Mybatis讲解200:13:33 第05课04.秒杀系统-基础回顾Mybatis讲解300:31:40 第06课05.秒杀系统-基础回顾SpringMVC讲解00:21:19 第07课06.秒杀系统-数据库设计100:26:42 第08课07.秒杀系统-数据库设计200:21:

Java秒杀系统方案优化视频教程 Java高性能高并发实战

第1章 课程介绍及项目框架搭建 技术选型思路分析,基于Maven的Spring-Boot工程框架的搭建,集成Thymeleaf,集成Mybatis,安装Redis,集成Redis等等. 第2章 实现用户登录以及分布式session功能 实现用户登录功能,实现密码两次MD5入库以及分布式Session.一则熟悉SpringBoot开发模式和该选型下的技术衔接,二则为后面的秒杀功能提供必备的用户信息. 第3章 秒杀功能开发及管理后台 实现秒杀的交互设计和秒杀功能的前端和后台实现,随着后期优化策略,秒

Java秒杀系统方案优化 高性能高并发实战 视频教程

第1章 课程介绍及项目框架搭建 1-1 Java高并发商城秒杀优化导学 1-2 项目环境搭建(Eclipse) 1-3 项目环境搭建(IDEA) 1-4 集成mybatis 1-5 安装redis 1-6 集成redis上 1-7 集成redis中 1-8 集成redis下第2章 实现用户登录以及分布式session功能 2-1 两次md5 2-2 登录功能实现上 2-3 登录功能实现下 2-4 jsr303参数校验 2-5 异常处理 2-6 分布式session上 2-7 分布式session

项目四:Java秒杀系统方案优化-高性能高并发实战

技术栈 前端:Thymeleaf.Bootstrap.JQuery 后端:SpringBoot.JSR303.MyBatis 中间件:RabbitMQ.Redis.Druid 功能模块 分布式会话,商品列表页,商品详情页,订单详情页,系统压测,缓存优化,消息队列,接口安全. 一.项目框架搭建 1.Spring Boot环境搭建 2.集成Thymeleaf,Result结果封装 3.集成Mybatis+Druid 4.集成Jedis+Redis安装+通用缓存Key封装 二.实现登录功能 1.数据库