Spring Websocket 配置

本贴最后更新于 2690 天前,其中的信息可能已经物是人非

1. 依赖包

<dependency> <groupId>org.springframework</groupId> <artifactId>spring-websocket</artifactId> <version>4.1.4.RELEASE</version> </dependency>

2. spring 配置

我关闭了 sockjs,因为我的系统里使用了异步,打开了 <task:scheduler id="taskScheduler" pool-size="10"/><task:executor id="async_executor" pool-size="10"/> ,开启 sockjs 的时候会和 async_executor 这个 bean 冲突,没有找到解决办法,所以就先关闭了。。

<websocket:handlers allowed-origins="*"> <websocket:mapping path="/ws/user-channel" handler="userMessagesHandler"/> <websocket:handshake-interceptors> <bean class="com.xx.xx.websocket.interceptor.HandshakeInterceptor"/> </websocket:handshake-interceptors> <!-- 开启sockjs,去掉则关闭sockjs --> <!--<websocket:sockjs/>--> </websocket:handlers>

3. 拦截器实现

HandshakeInterceptor 这个拦截器很简单,我只做了把系统 session 里的用户名取出来放到 WebSocketSession 里,以便在服务端主动发消息时,能找到对应用户。

public class HandshakeInterceptor extends HttpSessionHandshakeInterceptor { @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception { // 解决The extension [x-webkit-deflate-frame] is not supported问题 if (request.getHeaders().containsKey("Sec-WebSocket-Extensions")) { request.getHeaders().set("Sec-WebSocket-Extensions", "permessage-deflate"); } if (request instanceof ServletServerHttpRequest) { ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request; HttpSession session = servletRequest.getServletRequest().getSession(false); if (session != null) { //使用userName区分WebSocketHandler,以便定向发送消息 User user = (User) session.getAttribute(Constants.SessionAttr.USER.getValue()); String userName = user.getLoginName(); attributes.put(WebsocketEndPoint.WEBSOCKET_USERNAME, userName); } } return super.beforeHandshake(request, response, wsHandler, attributes); } @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception ex) { super.afterHandshake(request, response, wsHandler, ex); } }

4. 消息处理类实现

UserMessagesHandler 是 websocket 消息处理类。

其中 afterConnectionEstablished 方法主要是将请求 session 缓存到 CopyOnWriteArraySet 。这里有个特殊处理,如果 sessionId 已存在缓存里,则将之前是连接关闭,并将之移出 CopyOnWriteArraySet

sendMessageToUser 方法是发送消息到具体用户的方法。

@Component public class UserMessagesHandler implements WebSocketHandler { private static final Logger logger = LoggerFactory.getLogger(UserMessagesHandler.class); public static final String WEBSOCKET_USERNAME = "WS_USERNAME"; public static final String SESSION_ID_ATTRIBUTE = "HTTP.SESSION.ID"; // private static final ArrayList<WebSocketSession> users; public static CopyOnWriteArraySet<WebSocketSession> users = new CopyOnWriteArraySet<WebSocketSession>(); @Resource private UserService userService; @Resource private UserNoticeDAO userNoticeDAO; @Override // @Transactional(readOnly = true) public void afterConnectionEstablished(WebSocketSession session) { try { String sessionId = (String) session.getAttributes().get(SESSION_ID_ATTRIBUTE); if (StringUtils.hasText(sessionId)) { for (WebSocketSession user : users) { if (sessionId.equals(user.getAttributes().get(SESSION_ID_ATTRIBUTE))) { user.close(new CloseStatus(CloseStatus.SESSION_NOT_RELIABLE.getCode(), "Multiple tab websocket connect")); users.remove(user); } } } logger.debug("connect to the websocket success......"); users.add(session); } catch (Exception e) { e.printStackTrace(); } } @Override public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception { //sendMessageToUsers(); TextMessage returnMessage = new TextMessage(message.getPayload()+" received at server"); // session.sendMessage(returnMessage); } @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { if(session.isOpen()){ session.close(); } logger.debug("websocket connection closed......"); users.remove(session); } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { logger.debug("websocket connection closed......"); users.remove(session); } @Override public boolean supportsPartialMessages() { return false; } /** * 给所有在线用户发送消息 * * @param message */ public void sendMessageToUsers(TextMessage message) { for (WebSocketSession user : users) { try { if (user.isOpen()) { user.sendMessage(message); } } catch (IOException e) { e.printStackTrace(); } } } /** * 给某个用户发送消息 * * @param userName * @param message */ public void sendMessageToUser(String userName, TextMessage message) { for (WebSocketSession user : users) { if (user.getAttributes().get(WEBSOCKET_USERNAME).equals(userName)) { try { if (user.isOpen()) { user.sendMessage(message); } } catch (IOException e) { e.printStackTrace(); } break; } } } }
  • Spring

    Spring 是一个开源框架,是于 2003 年兴起的一个轻量级的 Java 开发框架,由 Rod Johnson 在其著作《Expert One-On-One J2EE Development and Design》中阐述的部分理念和原型衍生而来。它是为了解决企业应用开发的复杂性而创建的。框架的主要优势之一就是其分层架构,分层架构允许使用者选择使用哪一个组件,同时为 JavaEE 应用程序开发提供集成的框架。

    948 引用 • 1460 回帖
  • WebSocket

    WebSocket 是 HTML5 中定义的一种新协议,它实现了浏览器与服务器之间的全双工通信(full-duplex)。

    48 引用 • 206 回帖 • 295 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...