Spring+Stomp+ActiveMq实现websocket长连接

stomp.js+spring+sockjs+activemq实现websocket长连接,使用java配置。

pom.xml(只列出除了spring基本依赖意外的依赖,spring-version为4.3.3.RELEASE):

<dependency>
        <groupId>javax.websocket</groupId>
        <artifactId>javax.websocket-api</artifactId>
        <version>1.1</version>
        <scope>provided</scope>
        <!-- 注意,scope必须为provided,否则runtime会冲突,如果使用tomcat 8,还需要将TOMCAT_HOME/lib下的javax.websocket-api.jar一并删除 -->
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-websocket</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-messaging</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-jms</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.xbean</groupId>
      <artifactId>xbean-spring</artifactId>
      <version>3.16</version>
    </dependency>
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-core</artifactId>
      <version>5.7.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-pool</artifactId>
      <version>5.12.1</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
      <version>2.8.1</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.8.1</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
      <version>2.8.1</version>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-net</artifactId>
        <version>2.0.7.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.0.33.Final</version>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
        <version>2.0.8.RELEASE</version>
    </dependency>

StompConfig.java

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.env.Environment;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;

/**
 * @EnableWebSocketMessageBroker包含@EnableWebSocket
 * @author TD
 *
 */
@Configuration
@EnableWebSocketMessageBroker
@PropertySource("classpath:activemq.properties")
public class StompConfig extends AbstractWebSocketMessageBrokerConfigurer{

    @Autowired
    private Environment env;
    /**
     * 注册代理,暴露节点用于连接。
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // TODO Auto-generated method stub
        registry.addEndpoint("/stompEndPoint.do").withSockJS();
    }

    /**
     * 修改消息代理的配置,默认处理以/topic为前缀的消息
     * setApplicationDestinationPrefixes:配置请求的根路径,表示通过MessageMapping处理/td/*请求,不会发送到代理
     * enableStompBrokerRelay:配置代理,匹配路径的请求会进入代理:mq等
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        /*基于内存实现的stomp代理,单机适用
        registry.enableSimpleBroker("/queue","/topic");
        */
        //以/td为目的地的消息使用MessageMapping控制器处理,不走代理
        registry.setApplicationDestinationPrefixes("/td","/app");
        /**
         * 设置单独发送到某个user需要添加的前缀,用户订阅地址/user/topic/td1地址后会去掉/user,并加上用户名(需要springsecurity支持)等唯一标识组成新的目的地发送回去,
         * 对于这个url来说 加上后缀之后走代理。发送时需要制定用户名:convertAndSendToUser或者sendtouser注解.
        registry.setUserDestinationPrefix("/user")
         */
        /*基于mq实现stomp代理,适用于集群。
         * 以/topic和/queue开头的消息会发送到stomp代理中:mq等。
         * 每个mq适用的前缀不一样且有限制。activemq支持stomp的端口为61613
         */
        registry.enableStompBrokerRelay("/topic","/queue")
        .setRelayHost(env.getProperty("mq.brokenHost"))
        .setRelayPort(Integer.parseInt(env.getProperty("mq.brokenPort")))
        .setSystemLogin(env.getProperty("mq.username"))
        .setSystemPasscode(env.getProperty("mq.password"))
        .setClientLogin(env.getProperty("mq.username"))
        .setClientPasscode(env.getProperty("mq.password"));
        /*
         * systemLogin:设置代理所需的密码
         * client:设置客户端连接代理所需的密码,默认为guest
        */
    }

}

JSP页面:

<%@ page language="java" contentType="text/html; charset=UTF-8"
    pageEncoding="UTF-8"%>
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>stomp测试</title>
</head>
<body>
<button onclick="test()">stomp发送(服务端返回信息走订阅2)</button>
<button onclick="test2()">stomp订阅1(@subscribeMapping,不过代理)</button>
<button onclick="test3()">stomp订阅2 :通过代理</button>
</body>
<script type="text/javascript" src="/spring15_socket/js/sockjs-0.3.4.min.js"></script>
<script type="text/javascript" src="/spring15_socket/js/stomp.js"></script>
<script type="text/javascript">
var url = ‘http://localhost:8089/spring15_socket/stompEndPoint.do‘;
//创建sockjs链接
var sock = new SockJS(url);
//创建stomp客户端
var stomp = Stomp.over(sock);
var msg = JSON.stringify({‘name‘:‘td‘,‘age‘:13});
stomp.connect({},function(frame){
	console.log(‘connecting...‘+frame)
	stomp.send(‘/app/stomp1.do‘,{},msg);
})

function test(){
	stomp.send(‘/app/stomp1.do‘,{},msg);
}

function test2(){
	stomp.subscribe(‘/app/stomp2.do‘,function(msg){
		console.log("subscribemapping:"+JSON.parse(msg.body).content);
	})

}

function test3(){
	stomp.subscribe(‘/topic/hello‘,function(msg){
		console.log("topicHello:"+JSON.parse(msg.body).content);
	})

}

</script>
</html>

  

控制器:

package spring15_socket.controller;

import java.sql.SQLException;

import org.springframework.messaging.handler.annotation.MessageExceptionHandler;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.annotation.SubscribeMapping;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.RequestMapping;

import spring15_socket.bean.User;

@Controller
public class TestController {

    @RequestMapping(value="/sockjs.do")
    public String test0(Model model){
        return "sockjs";
    }    

    @RequestMapping(value="/stompjs.do")
    public String test1(Model model){
        return "stomp";
    }    

    /**
     * 表示该方法处理客户端发来的/td/stomp1.do或者/app/stomp1.do。
     * sendTo:重新指定发送的位置,默认原路返回(url会加上/topic前缀)
     * 需走代理
     */
    @MessageMapping("/stomp1.do")
    @SendTo("/topic/hello")
    public User handleStomp(User user) {
        System.out.println("stomp接收到客户端的请求:"+user);
        user.setName("messagemapping返回user");
        return user;

    }

    /**
     * 用于处理messagemapping抛出的异常,类比exceptionhandler
     * @return
     */
    @MessageExceptionHandler({Exception.class,SQLException.class})
    @SendTo("/topic/errorTopic")
    public User errorHandler(Throwable t) {
        System.out.println("异常统一处理");
        User user = new User();
        user.setName("异常统一处理:"+t.getMessage());;
        return user;
    }

    /**
     * 触发方式和messagemapping一致。
     * sendTo:重新指定发送的位置,默认原路返回(url会加上/topic前缀)
     * 使用subscribemapping不走代理
     */
    @SubscribeMapping("/stomp2.do")
    @SendTo("/topic/hello")
    public User subsTest() {
        User user2 = new User();
        user2.setName("订阅name");
        user2.setPhone("subscribePhone");
        return user2;
    }

}

进入activemq的控制台,点击connection可以看到stomp连接:(这里连接了两个客户端,可以点击超链接查看连接状态)

查看启动日志可以发现:表明启动成功

进入页面点击第一个按钮,会触发send:

同时控制台可以看到:

单独点击第二个按钮不会有返回消息,点击第三个按钮之后会订阅/topic/hello,此时点击第一个触发send。在服务端执行完毕后通过sendto注解发布到/topic/hello,此时浏览器控制台会输出该频道发布的内容

注意点:

1:基于activemq作为代理,连接的端口号为61613。

2:setSystemLogin表示设置服务器连接代理(activemq)的账号密码,setClientLogin表示连接过来的客户端连接代理所需要的账号密码。

原文地址:https://www.cnblogs.com/ForsakenCoder/p/10121723.html

时间: 2024-11-03 23:56:42

Spring+Stomp+ActiveMq实现websocket长连接的相关文章

HTML5 中websocket长连接的具体实现方法

HTML5中通过调用与数据通信相关的Web Socket API,实现从服务器中推送信息到客户端. Socket又称为套接字,是基于W3C标准开发在一个TCP接口中进行双向通信的技术.通常情况下,Socket用于描述IP地址和端口,是通信过程中的一个字符句柄.当服务器端又多个应用服务绑定一个Socket时,通过通信中的字符句柄,实现不同端口对应不同应用服务功能.目前,大部分浏览器都支持HTML5中Socket API的运行. WebSocket连接服务器和客户端,这个链接是一个实时的长连接,服务

实现单台测试机6万websocket长连接

本文由作者郑银燕授权网易云社区发布. 本文是我在测试过程中的记录,实现了单台测试机发起最大的websocket长连接数.在一台测试机上,连接到一个远程服务时的本地端口是有限的.根据TCP/IP协议,由于端口是16位整数,也就只能是0到 65535,而0到1023是预留端口,所以能分配的端口只是1024到65534,也就是64511个.也就是说,一台机器一个IP只能创建六万多个websocket长连接. 一.客户端参数调优 本文采用的测试机分别为黑mac系统和linux系统(由于黑mac机器本身性

用nginx的Push_stream_module推送模块管理WebSocket长连接会不会限制WebSocket的双向通信的能力

最近项目在用nginx的Push_stream_module推送模块去实现服务器端向客户端信息的推送,本来只想实现这个单向通信的需求的,可是给客户端推送完消息之后,如果想让客户端给一个反馈,就没办法监听获取到客户端的反馈事件,并及时作出反应.后来知道push_stream_module管理长连接是基于pub/sub模式的,而且好像模块中也没有给出类似websocket中onmessage的API,是不是就是限制了webSocket全双工的通信能力,这点好像nodejs的socket.io做的就更

websocket长连接

代码如下: using UnityEngine; using System.Collections; using System.Collections.Generic; using System.IO; using System.Net; using System.Net.Sockets; using System.Runtime.InteropServices; namespace SYW_WebScoket { public class WebSocketManager : Singleto

WebSocket 长连接

<?php class SocketService { private $address = 'localhost'; private $port = 80; private $_sockets; public function __construct($address = '', $port='') { if(!empty($address)){ $this->address = $address; } if(!empty($port)) { $this->port = $port;

Vue+WebSocket 实现页面实时刷新长连接

Vue+WebSocket 实现页面实时刷新长连接:https://www.cnblogs.com/fmixue/p/9110074.html 看完让你彻底搞懂Websocket原理:https://blog.csdn.net/frank_good/article/details/50856585 webSocket长连接实现demo(场景:扫码自动跳转登录,或者替换轮询):https://blog.csdn.net/GordoHu/article/details/78293803 原文地址:h

Socket,长连接,消息推送,消息提醒,未读消息提醒,消息通知,未读消息通知

今天公司要搞个类似QQ空间的未读消息通知,于是想到用WebSocket长连接,搜索一些资料后,调试好久,还以为不行,结果发现是IIS版本的原因,我本地用的Win7的系统,是IIS6的,本地测试一直不行,后来查资料发现用这个WebSocket要IIS8,我直接放服务器上,测试就OK了,废话不多说,直接上代码.我用的MVC+EF的框架. 1.新建一个Controller,继承APIController [Description("消息")] public class MessageCont

RPC、基于netty的长连接和websocket

1 RPC RPC也采用C/S的编程模式,以模块调用的简单性忽略通讯的具体细节,以便程序员不用关心C/S之间的通讯协议,集中精力对付实现过程.这就决定了 RPC生成的通讯包不可能对每种应用都有最恰当的处理办法,与Socket方法相比,传输相同的有效数据,RPC占用更多的网络带宽. RPC实在socket的基础上实现的,但是它比socket需要更多的网络和资源系统. 2 基于netty的长连接 异步.高性能 Boss线程(一个服务器端口对于一个)---接收到客户端连接---生成Channel---

Websocket、长连接、循环连接

[转]转自知乎高票回答  https://www.zhihu.com/question/20215561 一.WebSocket是HTML5出的东西(协议),也就是说HTTP协议没有变化,或者说没关系,但HTTP是不支持持久连接的(长连接,循环连接的不算)首先HTTP有1.1和1.0之说,也就是所谓的keep-alive,把多个HTTP请求合并为一个,但是Websocket其实是一个新协议,跟HTTP协议基本没有关系,只是为了兼容现有浏览器的握手规范而已,也就是说它是HTTP协议上的一种补充可以