Spring Boot 二三事:WEB 应用消息推送的那点事

阅读对象:本文适合SpringBoot 初学者及对SpringBoot感兴趣的童鞋阅读。

背景介绍:在企业级 WEB 应用开发中,为了更好的用户体验&提升响应速度,往往会将一些耗时费力的请求 (Excel导入or导出,复杂计算, etc.) 进行异步化处理。 由此带来的一个重要的问题是如何通知用户任务状态,常见的方法大致分为2类4种:

  • HTTP Polling client pull
  • HTTP Long-Polling client pull
  • Server-Sent Events (SSE) server push
  • WebSocket server push

1. Polling 短轮询

是一种非常简单的实现方式。就是client通过定时任务不断得重复请求服务器,从而获取新消息,而server按时间顺序提供自上次请求以后发生的单个或多个消息。

短轮询的优点非常明显,就是实现简单。当两个方向上的数据都非常少,并且请求间隔不是非常密集时,这种方法就会非常有效。例如,新闻评论信息可以每半分钟更新一次,这对用户来说是可以的。

它得缺点也是非常明显,一旦我们对数据实时性要求非常高时,为了保证消息的及时送达,请求间隔必须缩短,在这种情况下,会加剧服务器资源的浪费,降低服务的可用性。另一个缺点就是在消息的数量较少时,将会有大量的 request做无用功,进而也导致服务器资源的浪费。

2. Long-Polling 长轮询

长轮询的官方定义是:

The server attempts to "hold open" (notimmediately reply to) each HTTP request, responding only when there are events to deliver. In this way, there is always a pending request to which the server can reply for the purpose of delivering events as they occur, thereby minimizing the latency in message delivery.

如果与Polling的方式相比,会发现Long-Polling的优点是通过hold open HTTP request 从而减少了无用的请求。

大致步骤为:

  1. client向server请求并等待响应。
  2. 服务端将请求阻塞,并不断检查是否有新消息。如果在这个期间有新消息产生时就立即返回。否则一直等待至请求超时
  3. 当client 获取到新消息请求超时,进行消息处理并发起下一次请求。

Long-Polling的缺点之一也是服务器资源的浪费,因为它和Polling的一样都属于被动获取,都需要不断的向服务器请求。在并发很高的情况下,对服务器性能是个严峻的考验。

Note:因为以上2两种方式的实现都比较简单,所以我们这里就不做代码演示了。接下来我们重点介绍一下Server-Sent EventsWebSocket

3. Demo概要

下面我们将通过一个下载文件的案例进行演示SSEWebSocket的消息推送,在这之前,我们先简单说一下我们项目的结构,整个项目基于SpringBoot 构建。

首先我们定义一个供前端访问的APIDownloadController

@RestController
public class DownloadController {
    private static final Logger log = getLogger(DownloadController.class);
    @Autowired
    private MockDownloadComponent downloadComponent;  

    @GetMapping("/api/download/{type}")
    public String download(@PathVariable String type, HttpServletRequest request) {  // (A)
        HttpSession session = request.getSession();
        String sessionid = session.getId();
        log.info("sessionid=[{}]", sessionid);
        downloadComponent.mockDownload(type, sessionid);  // (B)
        return "success"; // (C)
    }
}
  • (A) type参数用于区分使用哪种推送方式,这里为sse,ws,stomp这三种类型。
  • (B) MockDownloadComponent用于异步模拟下载文件的过程。
  • (C) 因为下载过程为异步化,所以该方法不会被阻塞并立即向客户端返回success,用于表明下载开始

DownloadController中我们调用MockDownloadComponentmockDownload()的方法进行模拟真正的下载逻辑。

@Component
public class MockDownloadComponent {
    private static final Logger log = LoggerFactory.getLogger(DownloadController.class);

    @Async // (A)
    public void mockDownload(String type, String sessionid) {
        for (int i = 0; i < 100; i++) {
            try {
                TimeUnit.MILLISECONDS.sleep(100); // (B)

                int percent = i + 1;
                String content = String.format("{\"username\":\"%s\",\"percent\":%d}", sessionid, percent); // (C)
                log.info("username={}‘s file has been finished [{}]% ", sessionid, percent);

                switch (type) { // (D)
                    case "sse":
                        SseNotificationController.usesSsePush(sessionid, content);
                        break;
                    case "ws":
                        WebSocketNotificationHandler.usesWSPush(sessionid, content);
                        break;
                    case "stomp":
                        this.usesStompPush(sessionid, content);
                        break;
                    default:
                        throw new UnsupportedOperationException("");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
  • (A) 我们使用@Async让使其异步化
  • (B) 模拟下载耗时。
  • (C) 消息的格式为{"username":"abc","percent":1}
  • (D) 根据不同的type选择消息推送方式。

4. Server-Sent Events

SSE 是W3C定义的一组API规范,这使服务器能够通过HTTP将数据推送到Web页面,它具有如下特点:

  • 单向半双工:只能由server向client推送消息
  • 基于http:数据被编码为“text/event-stream”内容并使用HTTP流机制进行传输
  • 数据格式无限制:消息只是遵循规范定义的一组key-value格式&UTF-8编码的文本数据流,我们可以在消息payload中可以使用JSON或者XML或自定义数据格式。
  • http 长连接: 消息的实际传递是通过一个长期存在的HTTP连接完成的,消耗资源更少
  • 简单易用的API

浏览器支持情况:

Note:IE 浏览器可通过第三方JS库进行支持SSE

4.1 SpringBoot 中使用SSE

从Spring 4.2开始支持SSE规范,我们只需要在Controller中返回SseEmitter对象即可。

Note:Spring 5 中提供了Spring Webflux 可以更加方便的使用SSE,但是为更贴近我们的实际项目,所以文本仅演示使用Spring MVC SSE。

我们在服务器端定义一个SseNotificationController用于和客户端处理和保存SSE连接. 其endpoint/api/sse-notification

@RestController
public class SseNotificationController {

    public static final Map<String, SseEmitter> SSE_HOLDER = new ConcurrentHashMap<>(); // (A)

    @GetMapping("/api/sse-notification")
    public SseEmitter files(HttpServletRequest request) {
        long millis = TimeUnit.SECONDS.toMillis(60);
        SseEmitter sseEmitter = new SseEmitter(millis); // (B)

        HttpSession session = request.getSession();
        String sessionid = session.getId();

        SSE_HOLDER.put(sessionid, sseEmitter);
        return sseEmitter;
    }

    /**
     * 通过sessionId获取对应的客户端进行推送消息
     */
    public static void usesSsePush(String sessionid, String content) {  // (C)
        SseEmitter emitter = SseNotificationController.SSE_HOLDER.get(sessionid);
        if (Objects.nonNull(emitter)) {
            try {
                emitter.send(content);
            } catch (IOException | IllegalStateException e) {
                log.warn("sse send error", e);
                SseNotificationController.SSE_HOLDER.remove(sessionid);
            }
        }
    }

}
  • (A) SSE_HOLDER保存了所有客户端的SseEmitter,用于后续通知对应客户端。
  • (B) 根据指定超时时间创建一个SseEmitter对象, 它是SpringMVC提供用于操作SSE的类。
  • (C) usesSsePush()提供根据sessionId向对应客户端发送消息。发送只需要调用SseEmittersend()方法即可。

至此服务端已经完成,我们使用Vue编写客户端Download.html进行测试。核心代码如下:

     usesSSENotification: function () {
                var tt = this;
                var url = "/api/sse-notification";
                var sseClient = new EventSource(url);  // (A)
                sseClient.onopen = function () {...}; // (B)

                sseClient.onmessage = function (msg) {   // (C)
                    var jsonStr = msg.data;
                    console.log(‘message‘, jsonStr);
                    var obj = JSON.parse(jsonStr);
                    var percent = obj.percent;
                    tt.sseMsg += ‘SSE 通知您:已下载完成‘ + percent + "%\r\n";
                    if (percent === 100) {
                        sseClient.close();  // (D)
                    }
                };
                sseClient.onerror = function () {
                    console.log("EventSource failed.");
                };
            }
  • (A) 开启一个新的 SSE connection 并访问 /api/sse-notification
  • (B) 当连接成功时的callback。
  • (C) 当有新消息时的callback。
  • (D) 当下载进度为100%时,关闭连接。

效果演示

4. WebSocket

WebSocket 类似于标准的TCP连接,它是IETF(RFC 6455)定义的通过TCP进行实时全双工通信一种通信方式,这意味这它的功能更强大,常用于如股票报价器,聊天应用。

相比于SSE,它不仅可以双向通信,而且甚至还能处理音频/视频等二进制内容。

Note:使用WebSocket,在高并发情况下,服务器将拥有许多长连接。这对网络代理层组件及WebSocket服务器都是一个不小的性能挑战,我们需要考虑其负载均衡方案。同时连接安全等问题也不容忽视。

4.1 Spring WebSocket (低级API)

Spring 4提供了一个新的Spring-WebSocket模块,用于适应各种WebSocket引擎,它与Java WebSocket API标准(JSR-356)兼容,并且提供了额外的增强功能。

Note: 对于应用程序来说,直接使用WebSocket API会大大增加开发难度,所以Spring为我们提供了 STOMP over WebSocket 更高级别的API使用WebSocket。在本文中将会分别演示通过low level API及higher level API进行演示。

如果想在SpringBoot中使用WebSocket,首先需要引入spring-boot-starter-websocket依赖

     <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

然后就可以配置相关信息,我们先通过low level API进行演示。

首先需要自定义一个WebSocketNotificationHandler用于处理WebSocket 的连接及消息处理。我们只需要实现WebSocketHandler或子类TextWebSocketHandler BinaryWebSocketHandler

public class WebSocketNotificationHandler extends TextWebSocketHandler {

    private static final Logger log = getLogger(WebSocketNotificationHandler.class);

    public static final Map<String, WebSocketSession> WS_HOLDER= new ConcurrentHashMap<>();  // (A)

    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {   // (B)
        String httpSessionId = (String) session.getAttributes().get(HttpSessionHandshakeInterceptor.HTTP_SESSION_ID_ATTR_NAME);
        WS_HOLDER.put(httpSessionId, session);
    }

    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        log.info("handleTextMessage={}", message.getPayload());
    }

    public static void usesWSPush(String sessionid, String content) {    // (C)
        WebSocketSession wssession = WebSocketNotificationHandler.WS_HOLDER.get(sessionid);
        if (Objects.nonNull(wssession)) {
            TextMessage textMessage = new TextMessage(content);
            try {
                wssession.sendMessage(textMessage);
            } catch (IOException | IllegalStateException e) {
                WebSocketNotificationHandler.SESSIONS.remove(sessionid);
            }
        }
    }
}
  • (A) WS_HOLDER用于保存客户端的WebSocket Session
  • (B) 重写afterConnectionEstablished()方法,当连接建立之后,按sessionIdWebSocket Session保存至WS_HOLDER,用于后续向client推送消息。
  • (C) 根据sessionId获取对应WebSocket Session,并调用WebSocket SessionsendMessage(textMessage)方法向client发送消息。

使用@EnableWebSocket开启WebSocket,并实现WebSocketConfigurer进行配置。

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {

        WebSocketNotificationHandler notificationHandler = new WebSocketNotificationHandler(); 

        registry.addHandler(notificationHandler, "/ws-notification") // (A)
                .addInterceptors(new HttpSessionHandshakeInterceptor())  // (B)
                .withSockJS();  // (C)
    }
}
  • (A) 将我们自定义的WebSocketNotificationHandler注册至WebSocketHandlerRegistry.
  • (B) HttpSessionHandshakeInterceptor是一个内置的拦截器,用于传递HTTP会话属性到WebSocket会话。当然你也可以通过HandshakeInterceptor接口实现自己的拦截器。
  • (C) 开启SockJS的支持,SockJS的目标是让应用程序使用WebSocket API时,当发现浏览器不支持时,无需要更改任何代码,即可使用非WebSocket替代方案,尽可能的模拟WebSocket。关于SockJS的更多资料,可参考https://github.com/sockjs/sockjs-client

server端至此就基本大功告成,接下来我们来完善一下client端Download.html,其核心方法如下:

usesWSNotification: function () {
                var tt = this;
                var url = "http://localhost:8080/ws-notification";
                var sock = new SockJS(url);   // (A)
                sock.onopen = function () {
                    console.log(‘open‘);
                    sock.send(‘test‘);
                };

                sock.onmessage = function (msg) {   // (B)
                    var jsonStr = msg.data;

                    console.log(‘message‘, jsonStr);

                    var obj = JSON.parse(jsonStr);
                    var percent = obj.percent;
                    tt.wsMsg += ‘WS 通知您:已下载完成‘ + percent + "%\r\n";
                    if (percent === 100) {
                        sock.close();
                    }
                };

                sock.onclose = function () {
                    console.log(‘ws  close‘);
                };
            }
  • (A) 首先需要在项目中引入SockJS Client , 并根据指定URL创建一个SockJS对象。
  • (B) 当有新消息时的callback,我们可以在该方法中处理我们的消息。

效果演示

4.2 STOMP over WebSocket (高级API)

WebSocket虽然定义了两种类型的消息,文本和二进制,但是针对消息的内容没有定义,为了更方便的处理消息,我们希望Client和Server都需要就某种协议达成一致,以帮助处理消息。那么,有没有已经造好的轮子呢?答案肯定是有的。这就是STOMP。

" rel="nofollow">STOMP是一种简单的面向文本的消息传递协议,它其实是消息队列的一种协议, 和AMQP,JMS是平级的。 只不过由于它的简单性恰巧可以用于定义WS的消息体格式。虽然STOMP是面向文本的协议,但消息的内容也可以是二进制数据。同时STOMP 可已使用任何可靠的双向流网络协议,如TCP和WebSocket,目前很多服务端消息队列都已经支持了STOMP, 比如RabbitMQ, ActiveMQ等。

它结构是一种基于帧的协议,一帧由一个命令,一组可选的Header和一个可选的Body组成。

COMMAND
header1:value1
header2:value2

Body^@

客户端可以使用SENDSUBSCRIBE命令发送或订阅消息。 通过destination标记述消息应由谁来接收处理,形成了类似于MQ的发布订阅机制。

STOMP的优势也非常明显,即:

  1. 不需要创建自定义消息格式
  2. 我们可以使用现有的stomp.js客户端
  3. 可以实现消息路由及广播
  4. 可以使用第三方成熟的消息代理中间件,如RabbitMQ, ActiveMQ等

最重要的是,Spring STOMP 为我们提供了能够像Spring MVC一样的编程模型,减少了我们的学习成本。

下面将我们的DEMO稍作调整,使用Spring STOMP来实现消息推送,在本例中我们使用SimpleBroker模式,我们的应用将会内置一个STOMP Broker,将所有信息保存至内存中。

具体代码如下:

@Configuration
@EnableWebSocketMessageBroker  // (A)
public class WebSocketBrokerConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {

        registry.addEndpoint("/ws-stomp-notification")
                .addInterceptors(httpSessionHandshakeInterceptor())   // (B)
                .setHandshakeHandler(httpSessionHandshakeHandler())  // (C)
                .withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setApplicationDestinationPrefixes("/app")  // (D)
                .enableSimpleBroker("/topic", "/queue");  // (E)
    }

    @Bean
    public HttpSessionHandshakeInterceptor httpSessionHandshakeInterceptor() {
        return new HttpSessionHandshakeInterceptor();
    }

    @Bean
    public HttpSessionHandshakeHandler httpSessionHandshakeHandler() {
        return new HttpSessionHandshakeHandler();
    }

}
  • (A) 使用@EnableWebSocketMessageBroker注解开启支持STOMP
  • (B) 创建一个拦截器,用于传递HTTP会话属性到WebSocket会话。
  • (C) 配置一个自定义的HttpSessionHandshakeHandler,其主要作用是按sessionId标记识别连接。
  • (D) 设置消息处理器路由前缀,当消息的destination/app开头时,将会把该消息路由到server端的对应的消息处理方法中。(在本例中无实际意义)
  • (E) 设置客户端订阅消息的路径前缀

HttpSessionHandshakeHandler代码如下:

public class HttpSessionHandshakeHandler extends DefaultHandshakeHandler {

    @Override
    protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
        String sessionId = (String) attributes.get(HttpSessionHandshakeInterceptor.HTTP_SESSION_ID_ATTR_NAME);
        return new HttpSessionPrincipal(sessionId);

    }
}

当我们需要向client发送消息时,只需要注入SimpMessagingTemplate对象即可,是不是感觉非常熟悉?! 没错,这种Template模式和我们日常使用的RestTemplate JDBCTemplate是一样的。
我们只需要调用SimpMessagingTemplateconvertAndSendToUser()方法即可向对应用户发送消息了。

  private void usesStompPush(String sessionid, String content) {
        String destination = "/queue/download-notification";
        messagingTemplate.convertAndSendToUser(sessionid, destination, content);
    }

在浏览器端,client可以使用stomp.js和sockjs-client进行如下连接:

usesStompNotification: function () {
                var tt = this;
                var url = "http://localhost:8080/ws-stomp-notification";
                // 公共topic
                // var notificationTopic = "/topic/download-notification";
                // 点对点广播
                var notificationTopic = "/user/queue/download-notification"; // (A)

                var socket = new SockJS(url);
                var stompClient = Stomp.over(socket);

                stompClient.connect({}, function (frame) {
                    console.log("STOMP connection successful");

                    stompClient.subscribe(notificationTopic, function (msg) {   // (B)
                        var jsonStr = msg.body;

                        var obj = JSON.parse(jsonStr);
                        var percent = obj.percent;
                        tt.stompMsg += ‘STOMP 通知您:已下载完成‘ + percent + "%\r\n";
                        if (percent === 100) {
                            stompClient.disconnect()
                        }

                    });

                }, function (error) {
                    console.log("STOMP protocol error " + error)
                })
            }
  • (A) 如果想针对特定用户接收消息,我们需要以/user/为前缀,Spring STOMP会把以/user/为前缀的消息交给UserDestinationMessageHandler进行处理并发给特定的用户,当然这个/user/是可以通过WebSocketBrokerConfig进行个性化配置的,为了简单起见,我们这里就使用默认配置,所以我们的topic url就是/user/queue/download-notification
  • (B) 设置stompClient消息处理callback进行消息处理。

效果演示

5 总结

在文中为大家简单讲解了几种常用的消息推送方案,并通过一个下载案例重点演示了SSEWebSocket这两种server push模式的消息推送。当然还有很多细节并没有在文中说明,建议大家下载源码对照参考。

相比较这几种模式,小编认为如果我们的需求仅仅是向客户端推送消息,那么使用SSE的性价比更高一些,Long-Polling次之。使用WebSocket有一种杀鸡用牛刀的感觉,并且给我们系统也带来了更多的复杂性,得不偿失,所以不太推荐。而Polling虽然实现方式最简单且兼容性最强,但是其效率过低,所以不建议使用。当然如果您有其他见解,欢迎留言讨论交流。

文中示例源码:https://github.com/leven-space/SpringBootNotification.git

如果您觉得这篇文章有用,请留下您的小

原文地址:https://blog.51cto.com/14479714/2425775

时间: 2024-10-11 08:57:57

Spring Boot 二三事:WEB 应用消息推送的那点事的相关文章

VDN For PB Web实现消息推送

利用VesnData.Net(VDN)的互联网数据驱动功能我们实现了PB连接互联网数据库的功能.在互联网开发的过程中我们往往有些消息或者数据希望即时能够通知到各个客户端,现在比较流行的一种技术就是消息推送,现在PB就可以利用VDN来实现消息的推送. 1.  首先在窗口里放置一个uo_VDNCore 组件,设置一下URL.DesKey.SN属性 2.  增加一个登录按钮,实现消息在线的登录.因为VDNCore已经封装好了这些功能,所以实现很简单.这里我们还可以获取服务器端返回的客户端的IP地址和唯

利用Hessian10分钟配置出一个简单的跨Web服务消息推送

笔者,之前对Web跨服务推送数据一无所知,今天研究了一下.其实有些事物,在不理解的时候完全觉得好似天外来物.但了解一点点之后,又会觉得十分有趣.每天闲扯一下很开心,下面一个简单的实例10分钟配置出跨Web服务的消息推送.一.被调用端web.xml配置 <!--HelloHessian --> <servlet> <servlet-name>HelloHessian</servlet-name> <servlet-class>com.caucho.

Spring Boot二:Web 综合开发

Web 开发 Spring Boot Web 开发非常的简单,其中包括常用的 json 输出.filters.property.log 等 json 接口开发 在以前使用 Spring 开发项目,需要提供 json 接口时需要做哪些配置呢 添加 jackjson 等相关 jar 包 配置 Spring Controller 扫描 对接的方法添加 @ResponseBody 就这样我们会经常由于配置错误,导致406错误等等,Spring Boot 如何做呢,只需要类添加 @RestControll

Notification web 桌面消息推送

var NotificationHandler = { isNotificationSupported: 'Notification' in window, isPermissionGranted: function () { return Notification.permission === 'granted'; }, requestPermission: function () { if (!this.isNotificationSupported) { console.log('当前浏览

web消息推送-goesay

原文:http://www.upwqy.com/details/22.html 1 GoEasy简介: GoEasy - Web实时消息推送服务专家 最简单的方式将消息从服务器端推送至客户端 最简单的方式将消息从各种客户端推送至客户端 任何Web浏览器 任何开发语言 实时高效  隐私安全 稳定可靠 简单易用 2 注册获取到相关配置 注册链接:https://center.goeasy.io/cn/account/form 3 登录后创建免费应用 获取相关配置 创建一个免费应用  有一年的试用期

消息推送技术

消息推送 消息推送是针对 Web 应用开发领域的技术,指服务端以主动方式将信息送达客户端.主要用于提升用户体验,避免用户刷新页面从服务端拉取数据.例如 Web 邮件中自动出现刚收到的邮件项,Web 即时通讯自动提示新到消息等应用场景. 要实现消息推送机制,涉及两方面的内容: Web 层消息推送 服务层消息服务 Web 层消息推送 套接字 可以使用套接字接口进行全双工通讯.可以通过 Flash XMLSocket.Java Applet 技术实现.但由于实现方案与厂商技术绑定过紧,不属于 Web

在Spring Boot框架下使用WebSocket实现消息推送

Spring Boot的学习持续进行中.前面两篇博客我们介绍了如何使用Spring Boot容器搭建Web项目(使用Spring Boot开发Web项目)以及怎样为我们的Project添加HTTPS的支持(使用Spring Boot开发Web项目(二)之添加HTTPS支持),在这两篇文章的基础上,我们今天来看看如何在Spring Boot中使用WebSocket. 什么是WebSocket WebSocket为浏览器和服务器之间提供了双工异步通信功能,也就是说我们可以利用浏览器给服务器发送消息,

spring boot下WebSocket消息推送

WebSocket协议 WebSocket是一种在单个TCP连接上进行全双工通讯的协议.WebSocket通信协议于2011年被IETF定为标准RFC 6455,并由RFC7936补充规范.WebSocket API也被W3C定为标准. WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据.在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输 STOMP协议 STOMP是面向文本的消息传

Spring Boot:使用Rabbit MQ消息队列

综合概述 消息队列 消息队列就是一个消息的链表,可以把消息看作一个记录,具有特定的格式以及特定的优先级.对消息队列有写权限的进程可以向消息队列中按照一定的规则添加新消息,对消息队列有读权限的进程则可以从消息队列中读走消息,而消息队列就是在消息的传输过程中保存消息的容器,你可以简单的把消息队列理解为类似快递柜,快递员(消息发布者)往快递柜(消息队列)投递物件(消息),接受者(消息订阅者)从快递柜(消息队列)接收物件(消息),当然消息队列往往还包含一些特定的消息传递和接收机制. 消息队列作为分布式系