Spring Boot+STOMP解决消息乱序问题

当我们使用Spring Boot+websocket进行前后端进行通信时,我们需要注意:服务器可以随时向客户端发送消息。默认的情况下,不保证:服务器发送的消息与到达客户端的消息的顺序是一致的。可能先发送的消息后到,后发送的消息先到。(注意:两个消息发送的时间差不多,不能相差太多,不然就是顺序的了。一般一秒以下都会造成乱序)。

如果你的需求需要服务器向客户端发送的消息是按照顺序的。请你往下看,如果你的需求不要求服务器向客户端发送的消息是顺序的。就没有必要看了。

我们先说以下造成的原因:就是websocket发送消息采用的是多线程发送消息的,造成有的先到的消息在一个线程中在阻塞,而后到的消息使用其他的线程发送出去了。就造成了乱序。

然后我们再来看一个实例,自己去体验一把最好不过了。

一、我们先导入websocket包。

<!--这个是websocket通信的JAR文件-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

<dependency>
<groupId>org.webjars</groupId>
<artifactId>webjars-locator-core</artifactId>
</dependency>
<!--这个是客户端连接服务器需要的文件-->
<dependency>
<groupId>org.webjars</groupId>
<artifactId>sockjs-client</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>org.webjars</groupId>
<artifactId>stomp-websocket</artifactId>
<version>2.3.3</version>
</dependency>
<!--页面布局的文件-->
<dependency>
<groupId>org.webjars</groupId>
<artifactId>bootstrap</artifactId>
<version>3.3.7</version>
</dependency>
<dependency>
<groupId>org.webjars</groupId>
<artifactId>jquery</artifactId>
<version>3.1.0</version>
</dependency>
这个Jar包会将所有的SpringBoot的JAR文件都导入进去。

二、添加websocket的配置文件,进行websocket的通信。我们这里使用sockJS进行通信,他是websocket的一个改进,当浏览器不支持websocket的通信时,就会使用SockJS进行连接。

@Configuration
@EnableWebSocketMessageBroker
public class WebsocketConfig implements WebSocketMessageBrokerConfigurer {

//设置心跳的时间间隔
private static long HEART_BEAT=50000;

@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
//设置连接websocket的地址, 注意使用 “/项目名/ws-start” 进行连接websocket。
//setAllowedOrigins("*")是设置所有请求都可以访问,即允许跨域的问题,或者自己设置允许访问的域名。
//withSockJS() 在不支持websocket的浏览器中,使用sockJs备选作为连接。
registry.addEndpoint("/ws-start").setAllowedOrigins("*").withSockJS();
}

@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
//设置简单的消息代理器,它使用Memory(内存)作为消息代理器,
//其中/user和/topic都是我们发送到前台的数据前缀。前端必须订阅以/user开始的消息(.subscribe()进行监听)。
//setHeartbeatValue设置后台向前台发送的心跳,
//注意:setHeartbeatValue这个不能单独设置,不然不起作用,要配合后面setTaskScheduler才可以生效。
//对应的解决方法的网址:https://stackoverflow.com/questions/39220647/spring-stomp-over-websockets-not-scheduling-heartbeats
registry.enableSimpleBroker("/user","/topic").setHeartbeatValue(new long[]{HEART_BEAT,HEART_BEAT}).setTaskScheduler(new DefaultManagedTaskScheduler());
//设置我们前端发送:websocket请求的前缀地址。即client.send("/ws-send")作为前缀,然后再加上对应的@MessageMapping后面的地址
registry.setApplicationDestinationPrefixes("/ws-send");
}
}
三、创建控制器,客户端将消息发送到哪里。

@Controller
public class StompSequenceTest {

/**
* 这个就是用于向客户端发送消息的类
*/
@Autowired
private SimpMessagingTemplate simpMessagingTemplate;

@MessageMapping("/sequence")
public void sequence(){
int index=0;
while(index<200){
//用于向客户端发送前200个数字,我们希望树循序接收到0到199这200个数字
simpMessagingTemplate.convertAndSend("/topic/sequence",index++);
try {
//我们睡眠1毫秒,1毫秒对于发送数据已经很大了。CPU执行都是纳米级的
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

}
然后添加HTML页面,CSS文件,JS文件,注意要放在resource下的static文件夹中。

然后在浏览器中打开:localhost:8088.

具体的项目在:https://gitee.com/chang_501/springboot_stomp_all/tree/master/springboot_stomp_sequence中下载,并使用idea打开,然后运行。在浏览器中执行localhost:8088。就会显示下面的图片:

当我们单击send按钮之后,出现下面这个图片:

这个是因为我们使用默认的发送器,默认的发送器是采用多线程发送的。

下面就是解决方案:就是修改默认的发送器的线程个数为1,就可以了。

解决方案:

我们只要在WebsocketConfig类中添加3行代码。

/**
* 这是用于配置服务器向浏览器发送的消息。
* clientOut就表示出出口。还有一个inBoundChannel用于处理浏览器向服务器发送的消息
* @param registration
*/
@Override
public void configureClientOutboundChannel(ChannelRegistration registration) {
registration.taskExecutor().corePoolSize(1).maxPoolSize(1);
}
最终的文件:

package com.kmust.springboot_stomp_sequence.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.scheduling.concurrent.DefaultManagedTaskScheduler;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

/**
* 作者: 常伟
* 创建时间: 2018/7/5 17:21
* 邮箱: [email protected]
* 功能:
*/
@Configuration
@EnableWebSocketMessageBroker
public class WebsocketConfig implements WebSocketMessageBrokerConfigurer {

//设置心跳的时间间隔
private static long HEART_BEAT=50000;

@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
//设置连接websocket的地址, 注意使用 “/项目名/ws-start” 进行连接websocket。
//setAllowedOrigins("*")是设置所有请求都可以访问,即允许跨域的问题,或者自己设置允许访问的域名。
//withSockJS() 在不支持websocket的浏览器中,使用sockJs备选作为连接。
registry.addEndpoint("/ws-start").setAllowedOrigins("*").withSockJS();
}

/**
* 这是用于配置服务器向浏览器发送的消息。
* clientOut就表示出出口。还有一个inBoundChannel用于处理浏览器向服务器发送的消息
* @param registration
*/
@Override
public void configureClientOutboundChannel(ChannelRegistration registration) {
registration.taskExecutor().corePoolSize(1).maxPoolSize(1);
}

@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
//设置简单的消息代理器,它使用Memory(内存)作为消息代理器,
//其中/user和/topic都是我们发送到前台的数据前缀。前端必须订阅以/user开始的消息(.subscribe()进行监听)。
//setHeartbeatValue设置后台向前台发送的心跳,
//注意:setHeartbeatValue这个不能单独设置,不然不起作用,要配合后面setTaskScheduler才可以生效。
//对应的解决方法的网址:https://stackoverflow.com/questions/39220647/spring-stomp-over-websockets-not-scheduling-heartbeats
registry.enableSimpleBroker("/user","/topic").setHeartbeatValue(new long[]{HEART_BEAT,HEART_BEAT}).setTaskScheduler(new DefaultManagedTaskScheduler());
//设置我们前端发送:websocket请求的前缀地址。即client.send("/ws-send")作为前缀,然后再加上对应的@MessageMapping后面的地址
registry.setApplicationDestinationPrefixes("/ws-send");
}
}
然后再次运行。然后访问localhost:8088.就会发现发送的消息都是顺序的了,不管实验多少次。

原文地址:https://www.cnblogs.com/exmyth/p/11581588.html

时间: 2024-10-10 00:53:48

Spring Boot+STOMP解决消息乱序问题的相关文章

spring boot下WebSocket消息推送

WebSocket协议 WebSocket是一种在单个TCP连接上进行全双工通讯的协议.WebSocket通信协议于2011年被IETF定为标准RFC 6455,并由RFC7936补充规范.WebSocket API也被W3C定为标准. WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据.在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输 STOMP协议 STOMP是面向文本的消息传

Spring Boot (25) RabbitMQ消息队列

MQ全程(Message Queue)又名消息队列,是一种异步通讯的中间件.可以理解为邮局,发送者将消息投递到邮局,然后邮局帮我们发送给具体的接收者,具体发送过程和时间与我们无关,常见的MQ又kafka.activemq.zeromq.rabbitmq等等. RabbitMQ RabbitMQ是一个遵循AMQP协议,由面向高并发的erlang语言开发而成,用在实时的对可靠性要求比较高的消息传递上,支持多种语言客户端,支持延迟队列. 基础概念 Broker:消息队列的服务器实体 Exchange:

Spring Kafka和Spring Boot整合实现消息发送与消费简单案例

本文主要分享下Spring Boot和Spring Kafka如何配置整合,实现发送和接收来自Spring Kafka的消息. 先前我已经分享了Kafka的基本介绍与集群环境搭建方法.关于Kafka的介绍请阅读Apache Kafka简介与安装(一),关于Kafka安装请阅读Apache Kafka安装,关于Kafka集群环境搭建请阅读Apache Kafka集群环境搭建 .这里关于服务器环境搭建不在赘述. Spring Kafka整合Spring Boot创建生产者客户端案例 创建一个kafk

Spring Boot 2.0(七):Spring Boot 如何解决项目启动时初始化资源

在我们实际工作中,总会遇到这样需求,在项目启动的时候需要做一些初始化的操作,比如初始化线程池,提前加载好加密证书等.今天就给大家介绍一个 Spring Boot 神器,专门帮助大家解决项目启动初始化资源操作. 这个神器就是 CommandLineRunner,CommandLineRunner 接口的 Component 会在所有 Spring Beans都初始化之后,SpringApplication.run()之前执行,非常适合在应用程序启动之初进行一些数据初始化的工作. 接下来我们就运用案

Spring Boot RabbitMQ 延迟消息实现完整版

概述 曾经去网易面试的时候,面试官问了我一个问题,说 下完订单后,如果用户未支付,需要取消订单,可以怎么做 我当时的回答是,用定时任务扫描DB表即可.面试官不是很满意,提出: 用定时任务无法做到准实时通知,有没有其他办法? 我当时的回答是: 可以用队列,订单下完后,发送一个消息到队列里,并指定过期时间,时间一到,执行回调接口. 面试官听完后,就不再问了.其实我当时的思路是对的,只不过讲的不是很专业而已.专业说法是利用延迟消息. 其实用定时任务,确实有点问题,原本业务系统希望10分钟后,如果订单未

IOCP 乱序

         关于IOCP乱序的探讨                   2011-07-14 10:55:49 标签:职场 休闲 IOCP乱序 关于IOCP的探讨 本文主要探讨一下windows平台上的完成端口开发及其与之相关的几个重要的技术概念,这些概念都是与基于IOCP的开发密切相关的,对开发人员来讲,又不得不给予足够重视的几个概念: 1) 基于IOCP实现的服务吞吐量 2)IOCP模式下的线程切换 3)基于IOCP实现的消息的乱序问题. 一.IOCP简介     提到IOCP,大家都

为什么说 Java 程序员到了必须掌握 Spring Boot 的时候?

Spring Boot 2.0 的推出又激起了一阵学习 Spring Boot 热,就单从我个人的博客的访问量大幅增加就可以感受到大家对学习 Spring Boot 的热情,那么在这么多人热衷于学习 Spring Boot 之时,我自己也在思考: Spring Boot 诞生的背景是什么?Spring 企业又是基于什么样的考虑创建 Spring Boot? 传统企业使用 Spring Boot 会给我们带来什么样变革? 带着这些问题,我们一起来了解下 Spring Boot 到底是什么? Spr

为什么说 Java 程序员必须掌握 Spring Boot ?(转)

Spring Boot 2.0 的推出又激起了一阵学习 Spring Boot 热,那么, Spring Boot 诞生的背景是什么?Spring 企业又是基于什么样的考虑创建 Spring Boot? 传统企业使用 Spring Boot 会给我们带来什么样变革? 带着这些问题,我们一起来了解下 Spring Boot 到底是什么? Spring 历史 说起 Spring Boot 我们不得不先了解一下 Spring 这个企业,不仅因为 Spring Boot 来源于 Spring 大家族,而

【转】为什么说 Java 程序员必须掌握 Spring Boot ?

Spring Boot 2.0 的推出又激起了一阵学习 Spring Boot 热,那么, Spring Boot 诞生的背景是什么?Spring 企业又是基于什么样的考虑创建 Spring Boot? 传统企业使用 Spring Boot 会给我们带来什么样变革? 带着这些问题,我们一起来了解下 Spring Boot 到底是什么? Spring 历史 说起 Spring Boot 我们不得不先了解一下 Spring 这个企业,不仅因为 Spring Boot 来源于 Spring 大家族,而