springboot websocket集群(stomp协议)连接时候传递参数

最近在公司项目中接到个需求。就是后台跟前端浏览器要保持长连接,后台主动往前台推数据。

网上查了下,websocket stomp协议处理这个很简单。尤其是跟springboot 集成。

但是由于开始是单机玩的,很顺利。

但是后面部署到生产搞集群的话,就会出问题了。

假如集群两个节点,浏览器A与节点A建立连接,A节点发的消息浏览器A节点肯定能收到。但是B节点由于没有跟浏览器A建立连接。B节点发的消息浏览器就收不到了。

网上也查了好多,但是没有一个说的很清楚的,也很多都是理论层面的。

还有很多思路都是通过session获取信息的。但是这都不是我需要的。我需要的是从前台传递参数,连接的时候每个节点保存下。然后通过SimpleUserRegistry.getUser获取。

话不多说,直接上代码。

<script type="text/javascript" src="${request.contextPath}/scripts/sockjs.min.js"></script>
<script type="text/javascript" src="${request.contextPath}/scripts/stomp.min.js"></script>
var WEB_SOCKET = {

        topic : "",
        url : "",
        stompClient : null,

        connect : function(url, topic, callback,userid) {
            this.url = url;
            this.topic = topic;
            var socket = new SockJS(url); //连接SockJS的endpoint名称为"endpointOyzc"
            WEB_SOCKET.stompClient = Stomp.over(socket);//使用STMOP子协议的WebSocket客户端
            WEB_SOCKET.stompClient.connect({userid:userid},function(frame){//连接WebSocket服务端
                // console.log(‘Connected:‘ + frame);
                //通过stompClient.subscribe订阅/topic/getResponse 目标(destination)发送的消息
                WEB_SOCKET.stompClient.subscribe(topic, callback);
            });
        }
};

这是响应的前端代码。只需要引入两个js。调用new SockJS(url) 就代表跟服务器建立连接了。

@Configuration

//注解开启使用STOMP协议来传输基于代理(message broker)的消息,这时控制器支持使用@MessageMapping,就像使用@RequestMapping一样
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

    @Autowired
    private GetHeaderParamInterceptor getHeaderParamInterceptor;

    @Override
    //注册STOMP协议的节点(endpoint),并映射指定的url
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        //注册一个STOMP的endpoint,并指定使用SockJS协议
        registry.addEndpoint("/endpointOyzc")
                .setAllowedOrigins("*")
                .withSockJS();
       /* registry.addEndpoint("/endpointOyzc")
                .setAllowedOrigins("*")
                .setHandshakeHandler(xlHandshakeHandler)
                .withSockJS();*/
    }

    @Override
    //配置消息代理(Message Broker)
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        //点对点应配置一个/user消息代理,广播式应配置一个/topic消息代理
        registry.enableSimpleBroker("/topic", "/user");
        // 全局使用的消息前缀(客户端订阅路径上会体现出来)
        //registry.setApplicationDestinationPrefixes("/app");
        //点对点使用的订阅前缀(客户端订阅路径上会体现出来),不设置的话,默认也是/user/
        registry.setUserDestinationPrefix("/user");
    }

    /**
     * 采用自定义拦截器,获取connect时候传递的参数
     *
     * @param registration
     */
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(getHeaderParamInterceptor);
    }
}

注:上面的endpointOyzc就是前端的url。后面注册端点,前台链接。

然后注意下configureClientInboundChannel这个方法,这个方法里面注入拦截器就是为了链接时候接收参数的。

/**
 * @author : hao
 * @description : websocket建立链接的时候获取headeri里认证的参数拦截器。
 * @time : 2019/7/3 20:42
 */
@Component
public class GetHeaderParamInterceptor extends ChannelInterceptorAdapter {

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
        if (StompCommand.CONNECT.equals(accessor.getCommand())) {
            Object raw = message.getHeaders().get(SimpMessageHeaderAccessor.NATIVE_HEADERS);
            if (raw instanceof Map) {
                Object name = ((Map) raw).get("userid");
                if (name instanceof LinkedList) {
                    // 设置当前访问的认证用户
                    accessor.setUser(new JqxxPrincipal(((LinkedList) name).get(0).toString()));
                }
            }
        }
        return message;
    }
}
/**
 * @author : hao
 * @description : 自定义的java.security.Principal
 * @time : 2019/7/3 20:42
 */
public class JqxxPrincipal implements Principal {

    private String loginName;

    public JqxxPrincipal(String loginName) {
        this.loginName = loginName;
    }

    @Override
    public String getName() {
        return loginName;
    }
}

这样就存入的前台传的参数。

后台发消息的时候怎么发呢?

/**
 * @author : hao
 * @description : websocket发送代理,负责发送消息
 * @time : 2019/7/4 11:01
 */
@Component
@Slf4j
public class WebsocketSendProxy<T> {
    @Autowired
    private SimpMessagingTemplate template;

    @Autowired
    private SimpUserRegistry userRegistry;

    @Resource(name = "redisServiceImpl")
    private RedisService redisService;

    @Value("spring.redis.message.topic-name")
    private String topicName;

    public void sendMsg(RedisWebsocketMsg<T> redisWebsocketMsg) {
        SimpUser simpUser = userRegistry.getUser(redisWebsocketMsg.getReceiver());
        log.info("发送消息前获取接收方为{},根据Registry获取本节点上这个用户{}", redisWebsocketMsg.getReceiver(), simpUser);
        if (simpUser != null && StringUtils.isNotBlank(simpUser.getName())) {
            //2. 获取WebSocket客户端的订阅地址
            WebSocketChannelEnum channelEnum = WebSocketChannelEnum.fromCode(redisWebsocketMsg.getChannelCode());
            if (channelEnum != null) {
                //3. 给WebSocket客户端发送消息
                template.convertAndSendToUser(redisWebsocketMsg.getReceiver(), channelEnum.getSubscribeUrl(), redisWebsocketMsg.getContent());
            }
        } else {
            //给其他订阅了主题的节点发消息,因为本节点没有
            redisService.convertAndSend(topicName, redisWebsocketMsg);
        }

    }
}

可以发现上面代码利用了redis监听模型,也就是redis模型的消息队列

/**
 * @author : hao
 * @description : redis消息监听实现类,接收处理类
 * @time : 2019/7/3 14:00
 */
@Component
@Slf4j
public class MessageReceiver {

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    @Autowired
    private SimpUserRegistry userRegistry;

    /**
     * 处理WebSocket消息
     */
    public void receiveMessage(RedisWebsocketMsg redisWebsocketMsg) {
        log.info(MessageFormat.format("Received Message: {0}", redisWebsocketMsg));
        //1. 取出用户名并判断是否连接到当前应用节点的WebSocket
        SimpUser simpUser = userRegistry.getUser(redisWebsocketMsg.getReceiver());

        if (simpUser != null && StringUtils.isNotBlank(simpUser.getName())) {
            //2. 获取WebSocket客户端的订阅地址
            WebSocketChannelEnum channelEnum = WebSocketChannelEnum.fromCode(redisWebsocketMsg.getChannelCode());
            if (channelEnum != null) {
                //3. 给WebSocket客户端发送消息
                messagingTemplate.convertAndSendToUser(redisWebsocketMsg.getReceiver(), channelEnum.getSubscribeUrl(), redisWebsocketMsg.getContent());
            }
        }
    }
}

redis消息模型只贴部分代码就好了

/**
     * 消息监听器
     */
    @Bean
    MessageListenerAdapter messageListenerAdapter(MessageReceiver messageReceiver, Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer){
        //消息接收者以及对应的默认处理方法
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(messageReceiver, "receiveMessage");
        //消息的反序列化方式
        messageListenerAdapter.setSerializer(jackson2JsonRedisSerializer);

        return messageListenerAdapter;
    }

    /**
     * message listener container
     */
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory
            , MessageListenerAdapter messageListenerAdapter){
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //添加消息监听器
        container.addMessageListener(messageListenerAdapter, new PatternTopic(topicName));

        return container;
    }

上面的思路大体如下:客户端简历链接时候,传过来userid保存起来。发消息的时候 通过userRegistry获取,能获取到就证明是跟本节点建立的链接,直接用本节点发消息就好了。

如果不是就利用redis消息队列,把消息推出去。每个节点去判断获取看下是不是本节点的userid。这样就实现了集群的部署。

原文地址:https://www.cnblogs.com/haoerlv/p/11138538.html

时间: 2024-10-01 03:59:49

springboot websocket集群(stomp协议)连接时候传递参数的相关文章

spring websocket集群问题的简单记录

[TOC] 前言 最近公司里遇到一个问题,在集群中一些websocket的消息丢失了. 产生问题的原理很简单,发送消息的服务和接收者连接的服务不是同一个服务. 解决方案 用中间件(mq, redis etc.)来在服务之间进行通信. 不直接发送websocket消息,而是将消息放在mq或者redis的list中. 并在redis中维护连接信息,服务根据连接信息来判断自己是否需要处理消息,或者将消息发给接收者连接的服务. 代码示例 我们的项目中使用的是Spring WebSocket,并且使用了S

谈谈websocket集群的解决方式

上文我们已经利用websocket实现微信二维码支付的业务. 上述实现在单机环境中实现是没有什么问题的,无非就是客户端连接服务端,首先将连接的websocketsession存在一个map里面,当异步响应的时候, 根据流水号获取map里面对于的websocketSession,给指定的客户端发送消息. 但实际生产环境中,服务器一般是采用集群模式,首先,比方支付来说,接收第三方响应的服务器可能是有多台,然后具体是根据nginx随机路由转发, 假设异步响应的服务器有2台,A和B,而且连接websoc

Redis集群~StackExchange.redis连接Twemproxy代理服务器

回到目录 本文是Redis集群系列的一篇文章,主要介绍使用StackExchange.Redis进行Twemproxy(文中简称TW)代理服务的连接过程,事务上,对于TW来说,我们需要理解一下它的物理架构,它类似于Nugix,主要实现的是请求转发,但它还有一个重要的功能,那就是自动分片,这对于大数据是很必要的,你的服务器需要横向扩展时,不需要告诉客户端,这是一种很理解化的设计模式,当然,也对于Redis来说,在配置TW之后,是可以被全美支持的! 关于tw和Redis集群的设计图 关于StackE

Redis集群~StackExchange.redis连接Sentinel服务器并订阅相关事件(原创)

回到目录 对于redis-sentinel我在之前的文章中已经说过,它是一个仲裁者,当主master挂了后,它将在所有slave服务器中进行选举,选举的原则当然可以看它的官方文章,这与我们使用者没有什么关系,而对于sentinel来说,它在进行主从切换时,会触发相关事件,这是和我们开发人员有关系的,如当+switch-master事件被触发时,说明当前Sentinal已经完成了一次主从的切换,并所有服务已经正常运转了. 下面是我这几天作的测试,对于Twemproxy代理和Sentinal哨兵都已

Springboot Session集群处理

在集群环境下,常见的基于Session的身份认证就会有一个问题,因为Session是跟着服务器走的,当用户在服务器1登陆成功后,当用户在访问服务器2的时候会因为服务器2没有用户的身份信息而再次跳转到认证页面.解决的方案很简单,不在服务器上单独的管理Session,而是把原本放在服务器上的Session抽离出来放在一个独立的存储中. ⒈使用spirng-session解决Session集群问题 我们只需要告诉spirng-session存放Session的独立存储是什么,以及独立存储的连接信息即可

websocket集群情况下Nginx 代理出现的坑

那么问题的背景: A想给B发送socket 消息 ! A这消息 这时候被Nginx 轮询发到了C 服务器上! 擦!   这时候就蛋疼了!   要接收消息那个人在B服务器上!     B就这样苦逼的收不到了! 最简解决方案: 1.我先百度了下,草.发现各种说发  ,什么MQ进行什么的! 反正麻烦! 2,redis 中有一个发布和订阅  !   搞定收工!

linux集群系列(4) --- LVS之负载均衡集群 --- 持久连接

一.简介 1.1. lvs的持久连接简介 先说说lvs的持久连接能干什么:无论你选择lvs的何种转发策略,如果你想确定来自某一个客户端的所有连接均返回到同一台Real Server,就需要LVS的持久连接了.经常用于SSL,建立一个SSL连接,需要交换SSL密钥,当启用持久性连接时,只需要做一次验证即可. 显然LVS持久连接的定义也就知道了:某一个周期(时间段)之内,来自于同一个用户的请求,都转向同一个RS. 1.2. 持久连接模板: 当使用LVS持久连接时,分发器使用一个连接跟踪(持久连接模板

Tomcat学习四步走:内核、集群、参数及性能

主题简介: 内核实现原理 分布式集群 生产部署关键参数 性能监控和分析 一.内核实现原理 HTTP Web服务器与浏览器之间以HTTP协议通信,浏览器要访问服务器即向服务器发送HTTP请求报文. 如图,此处用get方法访问了localhost的8080端口的Web.Index.JSP,服务器返回200状态码并将一些HTTP报文返回到客户端. HTTP报文 从图中可以看到,HTTP报文中的请求报文和响应报文都由三部分组成.请求报文由请求行.请求头和请求体三部分组成,其中请求行主要包括method.

redis3.0集群搭建

Redis集群搭建 redis cluster介绍 节点自动发现.集群容错slave选举.Cluster管理.集群配置管理. 集群中的每个Redis节点需要2个TCP连接端口,如6379端口用于Client连接,16379端口用于集群数据通信 集群采用Hash Slot方案,而不是一致性哈希,共16384个Hashslot.如果有3台机器,那么NodeA在0-5500,NodeB 在5501-11000,NodeC在11001-16384.这种设计下,添加,删除新Node比较方便. 由于Hash