[TOC]
前言
最近公司里遇到一个问题, 在集群中一些 websocket 的消息丢失了.
产生问题的原理很简单, 发送消息的服务和接收者连接的服务不是同一个服务.
解决方案
用中间件 (mq, redis etc.) 来在服务之间进行通信.
不直接发送 websocket 消息, 而是将消息放在 mq 或者 redis 的 list 中.
并在 redis 中维护连接信息, 服务根据连接信息来判断自己是否需要处理消息, 或者将消息发给接收者连接的服务.
代码示例
我们的项目中使用的是 Spring WebSocket, 并且使用了 STOMP 协议, 可以去官网 https://docs.spring.io/spring/docs/5.0.0.BUILD-SNAPSHOT/spring-framework-reference/html/websocket.html 查看文档.
代码示例只做维护连接信息的代码示例, 其他部分就不放上来了.
维护连接信息的代码示例
想要在维护 STOMP 协议的连接信息, 可以查看文档的这一部分 Listening To ApplicationContext Events and Intercepting Messages https://docs.spring.io/spring/docs/5.0.0.BUILD-SNAPSHOT/spring-framework-reference/html/websocket.html#websocket-stomp-appplication-context-events
这里的连接信息只要是能够标识出不同的服务就 OK.
一下是监听了订阅事件的 Listener 的部分代码:
- package cn.fjhdtp.websocket.interceptor;
- import java.util.Map;
- import org.apache.commons.lang.StringUtils;
- import org.springframework.http.server.ServerHttpRequest;
- import org.springframework.http.server.ServerHttpResponse;
- import org.springframework.web.socket.WebSocketHandler;
- import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;
- public class LoginInfoInterceptor extends HttpSessionHandshakeInterceptor {
- @Override
- public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
- WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
- // 握手前, 往 attributes 中增加所需信息
- Object loginBean = ...;// 获取登录的用户信息(或其他信息)
- attributes.put(WebSocketConstant.WEBSOKET_LOGINBEAN,loginBean);
- return super.beforeHandshake(request, response, wsHandler, attributes);
- }
- }
- package cn.fjhdtp.listener;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.context.ApplicationListener;
- import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
- import org.springframework.stereotype.Component;
- import org.springframework.web.socket.messaging.SessionSubscribeEvent;
- import java.util.Map;
- @Component
- public class SessionSubscribeEventListener implements ApplicationListener<SessionSubscribeEvent> {
- @Autowired
- @Qualifier("serversideMessageTaskExecutor")
- private ThreadPoolTaskExecutor threadPoolTaskExecutor;
- @Autowired
- private IMessageHandler messageHandler;
- @Override
- public void onApplicationEvent(SessionSubscribeEvent event) {
- // 获取订阅的 destination
- String destination = (String) event.getMessage().getHeaders().get("simpDestination");
- // 获取登录信息
- Object loginBean = ((Map) event.getMessage().getHeaders().get("simpSessionAttributes")).get(WebSocketConstant.WEBSOKET_LOGINBEAN);
- //TODO 向 redis 中增加连接信息
- }
- }
- package cn.fjhdtp.message.listener;
- import org.springframework.context.ApplicationListener;
- import org.springframework.stereotype.Component;
- import org.springframework.web.socket.messaging.SessionDisconnectEvent;
- import java.util.Map;
- @Component
- public class SessionDisconnectEventListener implements ApplicationListener<SessionDisconnectEvent> {
- @Override
- public void onApplicationEvent(SessionDisconnectEvent event) {
- // stomp 连接断开, 清除连接信息
- // 从 attributes 中获取登录信息(或其他信息)
- Object loginBean = ((Map) event.getMessage().getHeaders().get("simpSessionAttributes")).get(WebSocketConstant.WEBSOKET_LOGINBEAN);
- // 从 redis 中移除连接信息
- }
- }
当然, 有些情况下可能不会正常的触发断开连接的事件(在 was 下就不会有这个事件), 因此还会需要 HeartBeat.
来源: http://www.bubuko.com/infodetail-2589849.html