1. 依赖包
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-websocket</artifactId>
<version>4.1.4.RELEASE</version>
</dependency>
2. spring 配置
我关闭了 sockjs,因为我的系统里使用了异步,打开了 <task:scheduler id="taskScheduler" pool-size="10"/>
和 <task:executor id="async_executor" pool-size="10"/>
,开启 sockjs 的时候会和 async_executor 这个 bean 冲突,没有找到解决办法,所以就先关闭了。。
<websocket:handlers allowed-origins="*">
<websocket:mapping path="/ws/user-channel" handler="userMessagesHandler"/>
<websocket:handshake-interceptors>
<bean class="com.xx.xx.websocket.interceptor.HandshakeInterceptor"/>
</websocket:handshake-interceptors>
<!-- 开启sockjs,去掉则关闭sockjs -->
<!--<websocket:sockjs/>-->
</websocket:handlers>
3. 拦截器实现
HandshakeInterceptor
这个拦截器很简单,我只做了把系统 session 里的用户名取出来放到 WebSocketSession 里,以便在服务端主动发消息时,能找到对应用户。
public class HandshakeInterceptor extends HttpSessionHandshakeInterceptor {
@Override
public boolean beforeHandshake(ServerHttpRequest request,
ServerHttpResponse response, WebSocketHandler wsHandler,
Map<String, Object> attributes) throws Exception {
// 解决The extension [x-webkit-deflate-frame] is not supported问题
if (request.getHeaders().containsKey("Sec-WebSocket-Extensions")) {
request.getHeaders().set("Sec-WebSocket-Extensions",
"permessage-deflate");
}
if (request instanceof ServletServerHttpRequest) {
ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;
HttpSession session = servletRequest.getServletRequest().getSession(false);
if (session != null) {
//使用userName区分WebSocketHandler,以便定向发送消息
User user = (User) session.getAttribute(Constants.SessionAttr.USER.getValue());
String userName = user.getLoginName();
attributes.put(WebsocketEndPoint.WEBSOCKET_USERNAME, userName);
}
}
return super.beforeHandshake(request, response, wsHandler, attributes);
}
@Override
public void afterHandshake(ServerHttpRequest request,
ServerHttpResponse response, WebSocketHandler wsHandler,
Exception ex) {
super.afterHandshake(request, response, wsHandler, ex);
}
}
4. 消息处理类实现
UserMessagesHandler
是 websocket 消息处理类。
其中 afterConnectionEstablished
方法主要是将请求 session 缓存到 CopyOnWriteArraySet
。这里有个特殊处理,如果 sessionId 已存在缓存里,则将之前是连接关闭,并将之移出 CopyOnWriteArraySet
。
sendMessageToUser
方法是发送消息到具体用户的方法。
@Component
public class UserMessagesHandler implements WebSocketHandler {
private static final Logger logger = LoggerFactory.getLogger(UserMessagesHandler.class);
public static final String WEBSOCKET_USERNAME = "WS_USERNAME";
public static final String SESSION_ID_ATTRIBUTE = "HTTP.SESSION.ID";
// private static final ArrayList<WebSocketSession> users;
public static CopyOnWriteArraySet<WebSocketSession> users = new CopyOnWriteArraySet<WebSocketSession>();
@Resource
private UserService userService;
@Resource
private UserNoticeDAO userNoticeDAO;
@Override
// @Transactional(readOnly = true)
public void afterConnectionEstablished(WebSocketSession session) {
try {
String sessionId = (String) session.getAttributes().get(SESSION_ID_ATTRIBUTE);
if (StringUtils.hasText(sessionId)) {
for (WebSocketSession user : users) {
if (sessionId.equals(user.getAttributes().get(SESSION_ID_ATTRIBUTE))) {
user.close(new CloseStatus(CloseStatus.SESSION_NOT_RELIABLE.getCode(), "Multiple tab websocket connect"));
users.remove(user);
}
}
}
logger.debug("connect to the websocket success......");
users.add(session);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
//sendMessageToUsers();
TextMessage returnMessage = new TextMessage(message.getPayload()+" received at server");
// session.sendMessage(returnMessage);
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
if(session.isOpen()){
session.close();
}
logger.debug("websocket connection closed......");
users.remove(session);
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
logger.debug("websocket connection closed......");
users.remove(session);
}
@Override
public boolean supportsPartialMessages() {
return false;
}
/**
* 给所有在线用户发送消息
*
* @param message
*/
public void sendMessageToUsers(TextMessage message) {
for (WebSocketSession user : users) {
try {
if (user.isOpen()) {
user.sendMessage(message);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 给某个用户发送消息
*
* @param userName
* @param message
*/
public void sendMessageToUser(String userName, TextMessage message) {
for (WebSocketSession user : users) {
if (user.getAttributes().get(WEBSOCKET_USERNAME).equals(userName)) {
try {
if (user.isOpen()) {
user.sendMessage(message);
}
} catch (IOException e) {
e.printStackTrace();
}
break;
}
}
}
}
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于