书接上文,Spring Chapter4 WebSocket 胡乱翻译 (一)
4.4.4. 消息流
一旦暴露了STOMP端点,Spring应用程序就成为连接客户端的STOMP代理。 本节介绍服务器端的消息流。
Spring-messaging模块包含对源自Spring Integration的消息传递应用程序的基础支持,后来被提取并整合到Spring Framework中,以便在许多Spring项目和应用程序场景中得到更广泛的使用。 下面列出了一些可用的消息传递抽象:
- Message - 包含标头和内容的消息的简单表示。
- MessageHandler - 处理消息的合同。
- MessageChannel - 发送消息的合同,该消息允许生成者和使用者之间的松散耦合。
- SubscribableChannel - MessageChannel和MessageHandler订阅者。
- ExecutorSubscribableChannel - 使用Executor传递消息的SubscribableChannel。
@EnableWebSocketMessageBroker使用上面的组件来实现消息的工作流。
下图示意了spring自带的简单消息代理:
“clientInboundChannel” - 用于传递从WebSocket客户端收到的消息。
“clientOutboundChannel” - 用于将服务器消息发送到WebSocket客户端。
“brokerChannel” - 用于从服务器端的应用程序代码向消息代理发送消息。
下图显示了Spring使用外部的代理,比如ActiveMQ:
当从WebSocket连接接收消息时,它们被解码为STOMP帧,然后变为Spring消息表示,并发送到“clientInboundChannel”以进行进一步处理。 例如,目标头以“/ app”开头的STOMP消息可以被路由到带注释的控制器中的@MessageMapping方法,而“/ topic”和“/ queue”消息可以直接路由到消息代理。
处理来自客户端的STOMP消息的带注释的@Controller可以通过“brokerChannel”向消息代理发送消息,并且代理将通过“clientOutboundChannel”将消息广播给匹配的订户。 相同的控制器也可以响应HTTP请求执行相同的操作,因此客户端可以执行HTTP POST,然后@PostMapping方法可以向消息代理发送消息以向订阅的客户端广播。
让我们通过一个简单的例子来追踪消息流。 鉴于以下服务器设置:
@Configuration @EnableWebSocketMessageBroker public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { @Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/portfolio"); } @Override public void configureMessageBroker(MessageBrokerRegistry registry) { registry.setApplicationDestinationPrefixes("/app"); registry.enableSimpleBroker("/topic"); } } @Controller public class GreetingController { @MessageMapping("/greeting"){ public String handle(String greeting) { return "[" + getTimestamp() + ": " + greeting; } }
1.客户端连接到"http://localhost:8080/portfolio",一旦建立了WebSocket连接,STOMP帧就会开始流动。
2.客户端发送带有目标头"/topic/greeting"的SUBSCRIBE帧。一旦接收并解码,该消息就被发送到“clientInboundChannel”,然后被路由到存储客户端订阅的消息代理。
3.客户端将SEND帧发送到"/app/greeting"。 "/app"前缀有助于将其路由到带注释的控制器。删除“/ app”前缀后,目标的剩余“/ greeting”部分将映射到GreetingController中的@MessageMapping方法。
4.从GreetingController返回的值变为Spring消息,其中消息内容基于返回值和默认目标头"/topic/greeting"(从输入目标派生,"/app"替换为"/topic" )。生成的消息将发送到"brokerChannel" 并由消息代理处理。
5.消息代理找到所有匹配的订户,并通过“clientOutboundChannel”向每个订户发送MESSAGE帧,消息被编码为STOMP帧并在WebSocket连接上发送。
4.4.5. Annotated Controllers
应用程序可以使用带@Controller注释的类来处理来自客户端的消息。 这些类可以声明@MessageMapping,@ SubscribeMapping和@ExceptionHandler方法,如下所述。
@MessageMapping
@MessageMapping注释可用于根据目标路由消息的方法。 它在方法级别和类型级别受支持。 在类型级别,@ MessessMapping用于表示控制器中所有方法的共享映射。
默认情况下,目标映射应为Ant样式,路径模式,例如, “/foo *”,“/foo/**”。 模式包括对模板变量的支持,例如 “/foo /{id}”,可以使用@DestinationVariable方法参数引用。
当@MessageMapping方法返回一个值时,默认情况下,该值通过配置的MessageConverter序列化为有效负载,然后作为消息发送到“brokerChannel”,从那里向用户广播。 出站消息的目的地与入站消息的目的地相同,但前缀为“/ topic”。
您可以使用@SendTo方法批注来自定义将消息内容发送到的目标。 @SendTo也可以在类级别使用,以共享发送消息的默认目标。 @SendToUser是仅向与消息关联的用户发送消息的变体。 有关详细信息,请参阅用户目标。
@MessageMapping方法的返回值可以用ListenableFuture,CompletableFuture或CompletionStage包装,以便异步生成消息内容。
作为从@MessageMapping方法返回消息内容的替代方法,您还可以使用SimpMessagingTemplate发送消息,这也是在封面下处理返回值的方式。 请参阅发送消息。
@SubscribeMapping
@SubscribeMapping类似于@MessageMapping,但仅将映射缩小为订阅消息。 它支持与@MessageMapping相同的方法参数。 但是对于返回值,默认情况下,消息通过“clientOutboundChannel”直接发送到客户端以响应订阅,而不是通过“brokerChannel”作为对匹配订阅的广播发送给代理。 添加@SendTo或@SendToUser会覆盖此行为并发送给代理。
什么时候有用? 假设应用程序控制器映射到“/app”时代理映射到“/topic”和“/queue”。 在此设置中,代理将所有订阅存储到旨在用于重复广播的“/topic”和“/queue”,并且不需要应用程序参与。 客户端还可以订阅一些“/app”目的地,并且控制器可以返回响应该订阅的值而不涉及代理,实际上是一次性的请求 - 回复交换,而无需再次存储或使用订阅。 一种情况是在启动时使用初始数据填充UI。
什么时候这没用? 不要尝试将代理和控制器映射到相同的目标前缀,除非您希望由于某种原因单独处理消息(包括订阅)。 入站消息是并行处理的。 无法保证代理或控制器是否将首先处理给定的消息。 如果在存储订阅并准备好广播时通知目标,则客户端应该在服务器支持时询问收据(简单代理不支持)。 例如,使用Java STOMP Client:
@Autowired private TaskScheduler messageBrokerTaskScheduler; // During initialization.. stompClient.setTaskScheduler(this.messageBrokerTaskScheduler); // When subscribing.. StompHeaders headers = new StompHeaders(); headers.setDestination("/topic/..."); headers.setReceipt("r1"); FrameHandler handler = ...; stompSession.subscribe(headers, handler).addReceiptTask(() -> { // Subscription ready... });
一个服务器端的选择是在brokerChannel上注册ExecutorChannelInterceptor,并实现在处理完消息(包括订阅)后调用的afterMessageHandled方法。
@MessageExceptionHandler
应用程序可以使用@MessageExceptionHandler方法来处理来自@MessageMapping方法的异常。 感兴趣的异常可以在注释本身中声明,或者如果要获取对异常实例的访问权限,则可以通过方法参数声明:
@Controller public class MyController { // ... @MessageExceptionHandler public ApplicationError handleException(MyException exception) { // ... return appError; } }
@MessageExceptionHandler方法支持灵活的方法签名,并支持与@MessageMapping方法相同的方法参数类型和返回值。
通常,@ MessessExceptionHandler方法在声明它们的@Controller类(或类层次结构)中应用。如果您希望这些方法在控制器之间全局应用更多,则可以在标有@ControllerAdvice的类中声明它们。 这与Spring MVC中的类似支持相当。
4.4.6. 发送消息
如果要从应用程序的任何部分向连接的客户端发送消息,该怎么办? 任何应用程序组件都可以向“brokerChannel”发送消息。 最简单的方法是注入一个SimpMessagingTemplate,并使用它来发送消息。 通常,应该很容易按类型注入,例如:
@Controller public class GreetingController { private SimpMessagingTemplate template; @Autowired public GreetingController(SimpMessagingTemplate template) { this.template = template; } @RequestMapping(path = "/greetings", method = POST) public void greet(String greeting) { String text = "[" + getTimestamp() + "]:" + greeting; this.template.convertAndSend("/topic/greetings", text); } }
但如果存在相同类型的另一个bean,它也可以通过其名称“brokerMessagingTemplate”进行限定。
4.4.7. Simple Broker
内置的简单消息代理处理来自客户端的订阅请求,将它们存储在内存中,并将消息广播到具有匹配目标的连接客户端。 代理支持类似路径的目标,包括对Ant样式目标模式的订阅。
4.4.8. External Broker
简单的代理非常适合入门但仅支持STOMP命令的子集(例如,没有ack,收据等),依赖于简单的消息发送循环,并且不适合于群集。 作为替代方案,应用程序可以升级到使用功能齐全的消息代理。
检查STOMP文档以查找您选择的消息代理(例如RabbitMQ,ActiveMQ等),安装代理,并在启用STOMP支持的情况下运行它。 然后在Spring配置中启用STOMP代理中继而不是简单代理。
以下是启用功能齐全的代理的示例配置:
@Configuration @EnableWebSocketMessageBroker public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { @Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/portfolio").withSockJS(); } @Override public void configureMessageBroker(MessageBrokerRegistry registry) { registry.enableStompBrokerRelay("/topic", "/queue"); registry.setApplicationDestinationPrefixes("/app"); } }
上述配置中的“STOMP代理中继”是Spring MessageHandler,它通过将消息转发到外部消息代理来处理消息。 为此,它建立到代理的TCP连接,将所有消息转发给它,然后通过其WebSocket会话将从代理接收的所有消息转发给客户端。 从本质上讲,它充当“转发”,可以在两个方向上转发消息。
注意:请将io.projectreactor.ipc:reactor-netty和io.netty:netty-all dependencies添加到项目中以进行TCP连接管理。
此外,应用程序组件(例如,HTTP请求处理方法,业务服务等)也可以向代理中继发送消息,如发送消息中所述,以便向订阅的WebSocket客户端广播消息。
实际上,代理中继实现了健壮且可扩展的消息广播。
4.4.9. Connect to Broker
STOMP代理中继维护与代理的单个“系统”TCP连接。 此连接仅用于源自服务器端应用程序的消息,而不用于接收消息。 您可以为此连接配置STOMP凭据,即STOMP帧登录和密码标头。 这在XML命名空间和Java配置中都显示为systemLogin / systemPasscode属性,默认值为guest / guest。
STOMP代理中继还为每个连接的WebSocket客户端创建单独的TCP连接。 您可以配置STOMP凭据以用于代表客户端创建的所有TCP连接。 它在XML命名空间和Java配置中作为clientLogin / clientPasscode属性公开,默认值为guest / guest。
提示:STOMP代理中继始终在每个CONNECT帧上设置登录和密码头,它代表客户端转发给代理。 因此,WebSocket客户端无需设置这些标头; 他们会被忽略。 正如身份验证部分所述,WebSocket客户端应该依赖HTTP身份验证来保护WebSocket端点并建立客户端身份。
STOMP代理中继还通过“system”TCP连接向消息代理发送和接收心跳。 您可以配置发送和接收心跳的间隔(默认情况下每个10秒)。 如果与代理的连接丢失,代理中继将继续尝试每5秒重新连接一次,直到成功为止。
任何Spring bean都可以实现ApplicationListener <BrokerAvailabilityEvent>,以便在与代理的“系统”连接丢失并重新建立时接收通知。 例如,股票报价服务广播股票报价可以在没有活动的“system”连接时停止尝试发送消息。
默认情况下,STOMP代理中继始终连接,并在连接丢失时根据需要重新连接到同一主机和端口。 如果您希望提供多个地址,则在每次尝试连接时,您都可以配置地址供应商,而不是固定的主机和端口。 例如:
@Configuration @EnableWebSocketMessageBroker public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer { // ... @Override public void configureMessageBroker(MessageBrokerRegistry registry) { registry.enableStompBrokerRelay("/queue/", "/topic/").setTcpClient(createTcpClient()); registry.setApplicationDestinationPrefixes("/app"); } private ReactorNettyTcpClient<byte[]> createTcpClient() { Consumer<ClientOptions.Builder<?>> builderConsumer = builder -> { builder.connectAddress(() -> { // Select address to connect to ... }); }; return new ReactorNettyTcpClient<>(builderConsumer, new StompReactorNettyCodec()); } }
还可以使用virtualHost属性配置STOMP代理中继。 此属性的值将被设置为每个CONNECT帧的主机头,并且可能在例如云环境中有用,其中建立TCP连接的实际主机与提供基于云的STOMP服务的主机不同。
4.4.10 点作为分隔符
当消息路由到@MessageMapping方法时,它们与AntPathMatcher匹配,并且默认模式应使用斜杠“/”作为分隔符。 这是Web应用程序中的一个很好的约定,类似于HTTP URL。 但是,如果您更习惯于消息传递约定,则可以切换到使用点“.” 作为分隔符。
@Configuration @EnableWebSocketMessageBroker public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { // ... @Override public void configureMessageBroker(MessageBrokerRegistry registry) { registry.setPathMatcher(new AntPathMatcher(".")); registry.enableStompBrokerRelay("/queue", "/topic"); registry.setApplicationDestinationPrefixes("/app"); } }
之后,控制器可以使用点“.” 作为@MessageMapping方法中的分隔符:
@Controller @MessageMapping("foo") public class FooController { @MessageMapping("bar.{baz}") public void handleBaz(@DestinationVariable String baz) { // ... } }
客户端现在可以向“/app/foo.bar.baz123”发送消息。
在上面的示例中,我们没有更改“代理中继”上的前缀,因为它们完全依赖于外部消息代理。 检查您正在使用的代理的STOMP文档页面,以查看它为目标标头支持的约定。
另一方面,“简单代理”确实依赖于配置的PathMatcher,因此如果您切换也将应用于代理的分隔符,并且将消息中的目标与订阅中的模式匹配。
4.4.11认证 (Authentication)
WebSocket消息传递会话中的每个STOMP都以HTTP请求开始 - 可以是升级到WebSockets的请求(即WebSocket握手),或者在SockJS回退一系列SockJS HTTP传输请求的情况下。
Web应用程序已经具有用于保护HTTP请求的身份验证和授权。 通常,用户通过Spring Security使用某种机制(例如登录页面,HTTP基本身份验证或其他)进行身份验证。 经过身份验证的用户的安全上下文保存在HTTP会话中,并与同一个基于cookie的会话中的后续请求相关联。
因此,对于WebSocket握手或SockJS HTTP传输请求,通常已经存在可通过HttpServletRequest#getUserPrincipal()访问的经过身份验证的用户。 Spring自动将该用户与为其创建的WebSocket或SockJS会话相关联,随后通过用户头与该会话上传输的所有STOMP消息相关联。
简而言之,为了安全性,典型的Web应用程序不需要做任何其他特殊的事情。 用户在HTTP请求级别进行身份验证,并通过基于cookie的HTTP会话维护安全上下文,然后将该会话与为该用户创建的WebSocket或SockJS会话相关联,并在流经应用程序的每个Message上标记用户标头。
请注意,STOMP协议在CONNECT帧上确实有“登录”和“密码”标头。 这些最初设计用于并且仍然需要例如用于TCP上的STOMP。 但是,对于STOMP over WebSocket,Spring默认忽略STOMP协议级别的授权标头,并假定用户已在HTTP传输级别进行了身份验证,并期望WebSocket或SockJS会话包含经过身份验证的用户。
提示:Spring Security提供WebSocket子协议授权,该授权使用ChannelInterceptor根据其中的用户头来授权消息。 此外,Spring Session还提供WebSocket集成,以确保在WebSocket会话仍处于活动状态时,用户HTTP会话不会过期。
4.4.12令牌认证
Spring Security OAuth支持基于令牌的安全性,包括JSON Web Token(JWT)。 这可以用作Web应用程序中的身份验证机制,包括STOMP over WebSocket交互,正如上一节所述,即通过基于cookie的会话维护身份。
同时,基于cookie的会话并不总是最适合,例如在压根不希望维护服务器端会话的应用程序中,或者在通常使用标头进行身份验证的手机应用程序中。
WebSocket协议RFC 6455“没有规定服务器在WebSocket握手期间可以对客户端进行身份验证的任何特定方式。” 实际上,浏览器客户端只能使用标准身份验证标头(即基本HTTP身份验证)或cookie,并且不能提供自定义标头。 同样,SockJS JavaScript客户端没有提供使用SockJS传输请求发送HTTP标头的方法,请参阅sockjs-client问题196.相反,它确实允许发送可用于发送令牌但具有其自身缺点的查询参数,例如 因为令牌可能无意中使用服务器日志中的URL进行了记录。
提示:以上限制适用于基于浏览器的客户端,不适用于基于Spring Java的STOMP客户端,该客户端支持使用WebSocket和SockJS请求发送头文件
因此,希望避免使用cookie的应用程序可能无法在HTTP协议级别进行身份验证。 他们可能更喜欢在STOMP消息传递协议级别使用标头进行身份验证,而不是使用Cookie。有两个简单的步骤可以做到这一点:
- 使用STOMP客户端在连接时传递身份验证标头。
- 使用ChannelInterceptor处理身份验证标头。
下面是注册自定义身份验证拦截器的示例服务器端配置。 请注意,拦截器只需要在CONNECT消息上进行身份验证并设置用户头。 Spring将记录并保存经过身份验证的用户,并将其与同一会话中的后续STOMP消息相关联:
@Configuration @EnableWebSocketMessageBroker public class MyConfig implements WebSocketMessageBrokerConfigurer { @Override public void configureClientInboundChannel(ChannelRegistration registration) { registration.setInterceptors(new ChannelInterceptorAdapter() { @Override public Message<?> preSend(Message<?> message, MessageChannel channel) { StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); if (StompCommand.CONNECT.equals(accessor.getCommand())) { Authentication user = ...; // access authentication header(s) accessor.setUser(user); } return message; } }); } }
还要注意,当使用Spring Security的消息授权时,目前您需要确保在Spring Security之前申请认证ChannelInterceptor配置。 最好通过在自己的标记为@Order(Ordered.HIGHEST_PRECEDENCE + 99)的WebSocketMessageBrokerConfigurer实现中声明自定义拦截器来完成。
4.4.13 用户目的地
应用程序可以发送针对特定用户的消息,Spring的STOMP支持可识别以“/user/”为前缀的目标。 例如,客户端可能订阅目标“/user/queue/position-updates”。 该目的地将由UserDestinationMessageHandler处理并转换为用户会话唯一的目的地,例如,“/queue/position-updates-user123”。 这提供了订阅一般命名的目的地的便利性,同时确保不与订阅相同目的地的其他用户发生冲突,使得每个用户可以接收唯一的库存位置更新。
在发送方,消息可以发送到目的地,例如“/user/{username}/queue/position-updates”,然后由UserDestinationMessageHandler将其转换为一个或多个目的地,每个目的地对应用户会话。 这允许应用程序中的任何组件发送针对特定用户的消息,而不必知道除其名称和通用目标之外的任何内容。 通过注释和消息传递模板也支持这一点。
例如,消息处理方法可以向与通过@SendToUser注释处理的消息相关联的用户发送消息(在类级别上也支持共享公共目的地):
@Controller public class PortfolioController { @MessageMapping("/trade") @SendToUser("/queue/position-updates") public TradeResult executeTrade(Trade trade, Principal principal) { // ... return tradeResult; } }
如果用户有多个会话,则默认情况下,所有订阅给定目标的会话都是目标。 但是,有时可能需要仅定位发送正在处理的消息的会话。 这可以通过将broadcast属性设置为false来完成,例如:
@Controller public class MyController { @MessageMapping("/action") public void handleAction() throws Exception { // raise MyBusinessException here } @MessageExceptionHandler @SendToUser(destinations = "/queue/errors", broadcast = false) public ApplicationError handleException(MyBusinessException exception) { // ... return appError; } }
虽然用户目的地通常意味着经过身份验证的用户,但并不严格要求。 与经过身份验证的用户无关的WebSocket会话可以订阅用户目标。 在这种情况下,@ SendToUser注释的行为与broadcast = false完全相同,即仅针对发送正在处理的消息的会话。
例如,通过注入由Java配置或XML命名空间创建的SimpMessagingTemplate,也可以从任何应用程序组件向用户目标发送消息(如果需要使用@Qualifier进行限定,则bean名称为“brokerMessagingTemplate”):
@Service public class TradeServiceImpl implements TradeService { private final SimpMessagingTemplate messagingTemplate; @Autowired public TradeServiceImpl(SimpMessagingTemplate messagingTemplate) { this.messagingTemplate = messagingTemplate; } // ... public void afterTradeExecuted(Trade trade) { this.messagingTemplate.convertAndSendToUser(trade.getUserName(), "/queue/position-updates", trade.getResult()); } }
提示:将用户目标与外部消息代理一起使用时,请检查代理文档,了解如何管理非活动队列,以便在用户会话结束时删除所有唯一用户队列。 例如,当使用/exchange/amq.direct/position-updates等目标时,RabbitMQ会创建自动删除队列。 因此,在这种情况下,客户端可以订阅/user/exchange/amq.direct/position-updates。 同样,ActiveMQ具有用于清除非活动目标的配置选项。
在多应用程序服务器方案中,用户目标可能仍未解析,因为用户连接到不同的服务器。 在这种情况下,您可以配置目标以广播未解析的消息,以便其他服务器有机会尝试。 这可以通过Java config中的MessageBrokerRegistry的userDestinationBroadcast属性和XML中的message-broker元素的userdestination-broadcast属性来完成。
4.4.14消息顺序
来自代理的消息将发布到“clientOutboundChannel”,从那里将它们写入WebSocket会话。 由于通道由ThreadPoolExecutor支持,因此消息在不同的线程中处理,并且客户端接收的结果序列可能与发布的确切顺序不匹配。
如果这是一个问题,请启用以下标志:
@Configuration @EnableWebSocketMessageBroker public class MyConfig implements WebSocketMessageBrokerConfigurer { @Override protected void configureMessageBroker(MessageBrokerRegistry registry) { // ... registry.setPreservePublishOrder(true); } }
设置标志后,同一客户端会话每次只发送一个消息发布到“clientOutboundChannel”,以便保证发布顺序。 请注意,这会导致额外性能开销,因此仅在需要时才启用它。
4.4.15. 事件
发布了几个ApplicationContext事件(如下所列),可以通过实现Spring的ApplicationListener接口来接收它们。
- BrokerAvailabilityEvent
- SessionConnectEvent
- SessionConnectedEvent
- SessionSubscribeEvent
- SessionUnsubscribeEvent
- SessionDisconnectEvent
注意:使用功能齐全的代理时,STOMP“代理中继”会自动重新连接“system”连接,以防代理暂时不可用。 但是,客户端连接不会自动重新连接。 假设启用了心跳,客户端通常会注意到代理在10秒内没有响应。 客户端需要实现自己的重新连接逻辑。
4.4.16拦截
事件提供STOMP连接生命周期的通知,而不是每个客户端消息的通知。 应用程序还可以注册ChannelInterceptor来拦截任何消息,以及处理链的任何部分。 例如,拦截来自客户端的入站消息:
@Configuration @EnableWebSocketMessageBroker public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { @Override public void configureClientInboundChannel(ChannelRegistration registration) { registration.setInterceptors(new MyChannelInterceptor()); } }
自定义的ChannelInterceptor可以使用StompHeaderAccessor或SimpMessageHeaderAccessor来访问有关消息的信息。
public class MyChannelInterceptor implements ChannelInterceptor { @Override public Message<?> preSend(Message<?> message, MessageChannel channel) { StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message); StompCommand command = accessor.getStompCommand(); // ... return message; } }
应用程序还可以实现ExecutorChannelInterceptor,它是ChannelInterceptor的子接口,在处理消息的线程中具有回调。 虽然为发送到通道的每个消息调用一次ChannelInterceptor,但ExecutorChannelInterceptor在订阅来自通道的消息的每个MessageHandler的线程中提供挂钩。
请注意,就像上面的SesionDisconnectEvent一样,可能已从客户端发送DISCONNECT消息,或者也可能在WebSocket会话关闭时自动生成。 在某些情况下,拦截器可能会在每个会话中多次拦截此消息。 对于多个断开连接事件,组件应该是幂等的。
4.4.17 STOMP客户端
Spring可以通过WebSocket客户端或者通过TCP客户端这2种方式提供STOMP。
要开始创建和配置WebSocketStompClient:
WebSocketClient webSocketClient = new StandardWebSocketClient(); WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient); stompClient.setMessageConverter(new StringMessageConverter()); stompClient.setTaskScheduler(taskScheduler); // for heartbeats
在上面的示例中,StandardWebSocketClient可以替换为SockJsClient,因为它也是WebSocketClient的实现。 SockJsClient可以使用WebSocket或基于HTTP的传输作为后备。 有关更多详细信息,请参阅SockJsClient。
接下来建立连接并为STOMP会话提供处理程序:
String url = "ws://127.0.0.1:8080/endpoint"; StompSessionHandler sessionHandler = new MyStompSessionHandler(); stompClient.connect(url, sessionHandler);
当会话准备好使用时,会通知处理程序:
public class MyStompSessionHandler extends StompSessionHandlerAdapter { @Override public void afterConnected(StompSession session, StompHeaders connectedHeaders) { // ... } }
建立会话后,可以发送任何消息内容,并使用配置的MessageConverter对其进行序列化:
session.send("/topic/foo", "payload");
您也可以订阅目的地。 订阅方法需要处理订阅消息的处理程序,并返回可用于取消订阅的订阅句柄。 对于每个收到的消息,处理程序可以指定消息内容应该反序列化的目标对象类型:
session.subscribe("/topic/foo", new StompFrameHandler() { @Override public Type getPayloadType(StompHeaders headers) { return String.class; } @Override public void handleFrame(StompHeaders headers, Object payload) { // ... } });
要启用STOMP心跳,请使用TaskScheduler配置WebSocketStompClient,并可选择自定义心跳间隔,写入不活动10秒,导致发送心跳,10秒读取不活动,关闭连接。
STOMP协议还支持收据,其中客户端必须添加“收据”标头,服务器在处理发送或订阅后用RECEIPT帧响应。 为了支持这一点,StompSession提供了setAutoReceipt(boolean),它导致在每个后续发送或订阅时添加“收据”标头。 或者,您也可以手动将“收据”标题添加到StompHeaders。 发送和订阅都返回一个Receiptable实例,可用于注册接收成功和失败回调。 对于此功能,客户端必须配置TaskScheduler和收据到期前的时间(默认为15秒)。
请注意,StompSessionHandler本身是一个StompFrameHandler,它允许它处理ERROR帧以及处理消息的异常的handleException回调,以及包含ConnectionLostException的传输级错误的handleTransportError。
4.4.18. WebSocket Scope
每个WebSocket会话都有一个属性映射。 映射作为标头附加到入站客户端消息,并且可以从控制器方法访问,例如:
@Controller public class MyController { @MessageMapping("/action") public void handle(SimpMessageHeaderAccessor headerAccessor) { Map<String, Object> attrs = headerAccessor.getSessionAttributes(); // ... } }
也可以在websocket范围中声明一个Spring管理的bean。 WebSocket范围的bean可以注入控制器和“clientInboundChannel”上注册的任何通道拦截器。 这些通常是单例,比任何单独的WebSocket会话都更长寿。 因此,您需要为WebSocket范围的bean使用范围代理模式:
@Component @Scope(scopeName = "websocket", proxyMode = ScopedProxyMode.TARGET_CLASS) public class MyBean { @PostConstruct public void init() { // Invoked after dependencies injected } // ... @PreDestroy public void destroy() { // Invoked when the WebSocket session ends } } @Controller public class MyController { private final MyBean myBean; @Autowired public MyController(MyBean myBean) { this.myBean = myBean; } @MessageMapping("/action") public void handle() { // this.myBean from the current WebSocket session } }
与任何自定义作用域一样,Spring在第一次从控制器访问时初始化一个新的MyBean实例,并将该实例存储在WebSocket会话属性中。 随后返回相同的实例,直到会话结束。 WebSocket范围的bean将调用所有Spring生命周期方法,如上面的示例所示。
4.4.19. Performance
略
4.4.20. Monitoring
略
4.4.21. Testing
略
原文地址:https://www.cnblogs.com/zhsmtp/p/9716209.html