java B2B2C 仿淘宝电子商城系统-基于Rabbitmq实现延迟消息

1. 预备知识

1.1 消息传递

首先我们知道消费者是从队列中获取消息的,那么消息是如何到达队列的?

当我们发送一条消息时,首先会发给交换器(exchange),交换器根据规则(路由键:routing key)将会确定消息投递到那个队列(queue)。

需要JAVA Spring Cloud大型企业分布式微服务云构建的B2B2C电子商务平台源码 一零三八七七四六二六

带着这几个关键字:交换器、路由键和队列。

1.2 交换器类型

如之前所说,交换器根据规则决定消息的路由方向。因此,rabbitmq的消息投递分类便是从交换器开始的,不同的交换器实现不同的路由算法便实现了不同的消息投递方式。

direct交换器

direct -> routingKey -> queue,相当一种点对点的消息投递,如果路由键匹配,就直接投递到相应的队列

fanout交换器

fanout交换器相当于实现了一(交换器)对多(队列)的广播投递方式

topic交换器

提供一种模式匹配的投递方式,我们可以根据主题来决定消息投递到哪个队列。

1.3 消息延迟

本文想要实现一个可延迟发送的消息机制。消息如何延迟?

ttl (time to live) 消息存活时间

ttl是指一个消息的存活时间。

Per-Queue Message TTL in Queues

引用官方的一句话:

TTL can be set for a given queue by setting the x-message-ttl argument to queue.declare, or by setting the message-ttl policy. A message that has been in the queue for longer than the configured TTL is said to be dead.
我们可以通过x-message-ttl设置一个队列中消息的过期时间,消息一旦过期,将会变成死信(dead-letter),可以选择重新路由。

Per-Message TTL in Publishers

引用官方的一句话:

A TTL can be specified on a per-message basis, by setting the expiration field in the basic AMQP class when sending a basic.publish.

The value of the expiration field describes the TTL period in milliseconds. The same constraints as for x-message-ttl apply. Since the expiration field must be a string, the broker will (only) accept the string representation of the number.

我们可以通过设置每一条消息的属性expiration,指定单条消息有效期。消息一旦过期,将会变成死信(dead-letter),可以选择重新路由。

重新路由-死信交换机(Dead Letter Exchanges)
引用官方一句话:

Dead Letter Exchanges

Messages from a queue can be ‘dead-lettered’; that is, republished to
another exchange when any of the following events occur:

The message is rejected (basic.reject or basic.nack) with
requeue=false, The TTL for the message expires; or The queue length
limit is exceeded. Dead letter exchanges (DLXs) are normal exchanges.
They can be any of the usual types and are declared as usual.
To set the dead letter exchange for a queue, set the x-dead-letter-exchange argument to the name of the exchange.

我们可以通过设置死信交换器(x-dead-letter-exchange)来重新发送消息到另外一个队列,而这个队列将是最终的消费队列。

2. 具体实现

rabbitmq配置

属性文件-rabbitmq.properties

交换、路由等配置按照以上策略,其中,添加了prefetch参数来根据服务器能力控制消费数量。

连接用户名

mq.user =sms_user

密码

mq.password =123456

主机

mq.host =192.168.99.100

端口

mq.port =5672

默认virtual-host

mq.vhost =/

the default cache size for channels is 25

mq.channelCacheSize =50

发送消息路由

sms.route.key =sms_route_key

延迟消息队列

sms.delay.queue =sms_delay_queue

延迟消息交换器

sms.delay.exchange =sms_delay_exchange

消息的消费队列

sms.queue =sms_queue

消息交换器

sms.exchange =sms_exchange

每秒消费消息数量

sms.prefetch =30

配置rabbitmq.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<context:property-placeholder location="rabbitmq.properties"/>
<!--配置connection-factory,指定连接rabbit server参数 -->
<rabbit:connection-factory id="connectionFactory"
username="${mq.user}" password="${mq.password}"
host="${mq.host}" port="${mq.port}" virtual-host="${mq.vhost}" />

<!--定义rabbit template用于数据的接收和发送 -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />

<!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
<rabbit:admin connection-factory="connectionFactory" />

<!--定义queue -->
<rabbit:queue name="${sms.queue}" durable="true" auto-delete="false" exclusive="false" />
<!-- 创建延迟,有消息有效期的队列 -->
<rabbit:queue name="${sms.delay.queue}" durable="true" auto-delete="false">
<rabbit:queue-arguments>
<entry key="x-message-ttl">
<!-- 队列默认消息过期时间 -->
<value type="java.lang.Long">3600000</value>
</entry>
<!-- 消息过期根据重新路由 -->
<entry key="x-dead-letter-exchange" value="${sms.exchange}"/>
</rabbit:queue-arguments>
</rabbit:queue>

<!-- 定义direct exchange,sms_queue -->
<rabbit:direct-exchange name="${sms.exchange}" durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="${sms.queue}" key="${sms.route.key}"/>
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- 延迟消息配置,durable=true 持久化生效 -->
<rabbit:direct-exchange name="${sms.delay.exchange}" durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="${sms.delay.queue}" key="${sms.route.key}"/>
</rabbit:bindings>
</rabbit:direct-exchange>

<!-- 消息接收者 -->
<bean id="messageReceiver" class="git.yampery.consumer.MsgConsumer"/>
<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
<rabbit:listener-container connection-factory="connectionFactory" prefetch="${sms.prefetch}">
<rabbit:listener queues="${sms.queue}" ref="messageReceiver"/>
</rabbit:listener-container>
</beans>

  

消息发布者

package git.yampery.producer;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
* @decription MsgProducer
* <p>生产者</p>
* @author Yampery
* @date 2018/2/11 11:44
*/
@Component
public class MsgProducer {

@Resource
private AmqpTemplate amqpTemplate;
@Value("${sms.delay.exchange}") private String SMS_DELAY_EXCHANGE;
@Value("${sms.exchange}") private String SMS_EXCHANGE;
@Value("${sms.route.key}") private String SMS_ROUTE_KEY;

/**
* 延迟消息放入延迟队列中
* @param msg
* @param expiration
*/
public void publish(String msg, String expiration) {
amqpTemplate.convertAndSend(SMS_DELAY_EXCHANGE, SMS_ROUTE_KEY, msg, message -> {
// 设置消息属性-过期时间
message.getMessageProperties().setExpiration(expiration);
return message;
});
}

/**
* 非延迟消息放入待消费队列
* @param msg
*/
public void publish(String msg) {
amqpTemplate.convertAndSend(SMS_EXCHANGE, SMS_ROUTE_KEY, msg);
}
}

  

消费者

package git.yampery.consumer;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

/**
* @decription MsgConsumer
* <p>消费者</p>
* @author Yampery
* @date 2018/2/11 11:43
*/
public class MsgConsumer implements MessageListener {
@Override
public void onMessage(Message message) {
String msg;
try {
// 线程每秒消费一次
Thread.sleep(1000);
msg = new String(message.getBody(), "utf-8");
System.out.println(msg);

} catch (Exception e) {
// 这里并没有对服务异常等失败的消息做处理,直接丢弃了
// 防止因业务异常导致消息失败造成unack阻塞再队列里
// 可以选择路由到另外一个专门处理消费失败的队列
return;
}
}
}

  

  

测试

package git.yampery.mq;

import com.alibaba.fastjson.JSONObject;
import git.yampery.producer.MsgProducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;

/**
* @decription TestMq
* <p>测试</p>
* @author Yampery
* @date 2018/2/11 15:03
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class TestMq {

@Resource
private MsgProducer producer;

@Test
public void testMq() {
JSONObject jObj = new JSONObject();
jObj.put("msg", "这是一条短信");
producer.publish(jObj.toJSONString(), String.valueOf(10 * 1000));
}
}

java B2B2C Springcloud电子商务平台源码

原文地址:https://www.cnblogs.com/myspringcloud/p/10643384.html

时间: 2024-10-27 05:05:54

java B2B2C 仿淘宝电子商城系统-基于Rabbitmq实现延迟消息的相关文章

java B2B2C Springcloud仿淘宝电子商城系统

鸿鹄云商大型企业分布式互联网电子商务平台,推出PC+微信+APP+云服务的云商平台系统,其中包括B2B.B2C.C2C.O2O.新零售.直播电商等子平台. 分布式.微服务.云架构电子商务平台 java b2b2c o2o 技术解决方案 开发语言: java.j2ee 数据库:mysql JDK支持版本: JDK1.6.JDK1.7.JDK1.8版本 通用框架:maven+springmvc+mybatis+spring cloud+spring boot+redis 核心技术:分布式.云服务.微

仿淘宝,京东红包雨(基于Phaser框架)

本红包雨项目是基于HTML5的游戏框架Phaser写的,最终形成的是一个canvas,所以性能很好,但是必须要说的是这个框架比较大,压缩后也有700K左右,所以请慎用. 代码地址: https://github.com/AmosXu/red-packet-rain 1. 效果展示         图片依次是倒计时页面,抢红包页面,拆红包页,红包展示页,这些页面都是写在一个canvas里面的,无刷新的切换效果,性能超级棒 2.代码展示 贴上主要的代码js代码和注释 //初始化图片 let imgj

java B2B2C 源码 多级分销Springboot多租户电子商城系统-docker-feign配置(五)

简介 上一节我们讨论了怎么用feign声明式调用cloud的生产者,这节我们讨论一下feign配置,通过编写配置类,我们可以自定义feign的日志级别,日志扫描目录,可以通过feign调用服务在eureka上的调用信息 feign声明接口之后,在代码中通过@Resource或者@Autowired注入之后即可使用. @FeignClient标签的常用属性如下: name:指定FeignClient的名称,如果项目使用了Ribbon,name属性会作为微服务的名称,用于服务发现 url: url一

高仿淘宝和聚美优品商城详情页实现《IT蓝豹》

android-vertical-slide-view高仿淘宝和聚美优品商城详情页实现,在商品详情页,向上拖动时,可以加载下一页.使用ViewDragHelper,滑动比较流畅. scrollView滑动到底部的时候,再行向上拖动时,添加了一些阻力.本项目来源:https://github.com/xmuSistone/android-vertical-slide-view主要代码如下:首先先看一下布局:  <com.stone.verticalslide.DragLayout        a

自定义View之仿淘宝详情页

自定义View之仿淘宝详情页 转载请标明出处: http://blog.csdn.net/lisdye2/article/details/52353071 本文出自:[Alex_MaHao的博客] 项目中的源码已经共享到github,有需要者请移步[Alex_MaHao的github] 基本介绍 现在的一些购物类App例如淘宝,京东等,在物品详情页,都采用了类似分层的模式,即上拉加载详情的方式,节省了空间,使用户的体验更加的舒适.只要对于某个东西的介绍很多时,都可以采取这样的方式,第一个页面显示

Java中间件:淘宝网系统高性能利器

[TechTarget中国原创]淘宝网是亚太最大的网络零售商圈,其知名度毋庸置疑,吸引着越来越多的消费者从街头移步这里,成为其忠实粉丝.如此多的用户和交易量,也意味着海量的信息处理,其背后的IT架构的稳定性.可靠性也显得尤为重要.那么,他们是怎么办到的呢? 曾宪杰(花名花黎)是淘宝Java中间件团队成员,他认为大型网站就是要同时满足高访问量和高数据量的要求,核心是通过分布式系统解决数据的处理.存储及访问问题. 消息中间件Notify 早期,淘宝并没有Java中间件,其系统框架比较简单.下面我们就

一款基于jQuery仿淘宝红色分类导航

今天给大家分享一款基于jQuery仿淘宝红色分类导航.这款分类导航适用浏览器:IE8.360.FireFox.Chrome.Safari.Opera.傲游.搜狗.世界之窗.效果图如下: 在线预览   源码下载 实现的代码. html代码: <div id="nav"> <div class="area clearfix"> <div class="separate"> </div> <div

android版高仿淘宝客户端源码V2.3

android版高仿淘宝客户端源码V2.3,这个版本我已经更新到2.3了,源码也上传到源码天堂那里了,大家可以看一下吧,该应用实现了我们常用的购物功能了,也就是在手机上进行网购的流程的,如查看产品(浏览),下订单,进行付款等流程,该应用一一实现了,同时还可以远程读取图片功能,和实时监控网络状态等操作,大家如果有什么不同的意见可以留下,我们会定时来查看. 原文地址:http://www.cnblogs.com/androidioscom/p/3613035.html [1].[代码] [Java]

Listview嵌套Viewpager实现仿淘宝搜狐广告主页,并实现listview的下拉刷新

Android实现功能:Listview嵌套viewpager仿淘宝搜狐视频主页面,和listview的下拉刷新. 什么都不说了:直接上图说效果 listview嵌套viewpager实现仿淘宝的广告滑动主页面 源码连接:(http://download.csdn.net/detail/qq_30000411/9528977) APK下载连接:(http://download.csdn.net/detail/qq_30000411/9528973) 下面给出我源码的主要文件构成: MyListV