上一篇 Spring Websocket 配置 介绍了单机下的服务端 websocket 配置。
这种实现方法,在多台服务器的情况下会出问题。因为我这个业务场景是经常服务器主动下发消息,所以会将 WebSocketSession
缓存到内存里,在需要的时候根据用户主键去查找对应的连接发送数据。
那么在多台服务器这种内存缓存就不凑效了,怎么办呢?第一想法是把 WebSocketSession
集中缓存,如缓存到 Redis 中。然而 WebSocketSession
不支持序列化,无法存储 redis 中。
既然无法集中缓存,那么,我们在需要发送数据时,分别向各台服务器发送通知:请向用户 A 发消息。每台服务器收到通知后,分别遍历自己缓存内的 WebSocketSession
如果有用户 A 的连接,则发送消息即可。这不就是 MQ 发布/订阅模式的应用场景嘛?
恰好,我们系统使用了 redis ,而 redis 支持发布/订阅模式,那就开始改造吧。
1. redis 配置
<context:annotation-config/>
<bean class="org.springframework.session.data.redis.config.annotation.web.http.RedisHttpSessionConfiguration">
<!-- 超时时间(秒) -->
<property name="maxInactiveIntervalInSeconds" value="3600" />
</bean>
<bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
<property name="maxTotal" value="30"/>
<property name="maxIdle" value="10"/>
<property name="minIdle" value="1"/>
<property name="maxWaitMillis" value="30000"/>
<property name="testOnBorrow" value="true"/>
<property name="testOnReturn" value="false"/>
<property name="testWhileIdle" value="false"/>
</bean>
<!--2-->
<bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
<property name="hostName" value="${redis_server}" />
<property name="port" value="6379" />
<property name="password" value="${redis_password}" />
<property name="timeout" value="3000" />
<property name="poolConfig" ref="jedisPoolConfig" />
<property name="usePool" value="true" />
</bean>
<bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
<property name="connectionFactory" ref="jedisConnectionFactory"/>
<property name="defaultSerializer">
<bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
</property>
</bean>
<bean id="websocketTopicMessageListener" class="com.xx.xx.websocket.redisListener.WebsocketTopicMessageListener">
</bean>
<bean id="topicContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer" destroy-method="destroy">
<property name="connectionFactory" ref="jedisConnectionFactory"/>
<property name="taskExecutor"><!-- 此处有个奇怪的问题,无法正确使用其他类型的Executor -->
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler">
<property name="poolSize" value="3"></property>
</bean>
</property>
<property name="messageListeners">
<map>
<entry key-ref="websocketTopicMessageListener">
<bean class="org.springframework.data.redis.listener.ChannelTopic">
<constructor-arg value="websocket:sendMsgTopic"/>
</bean>
</entry>
</map>
</property>
</bean>
上面配置中,我们定义了一个 Topic:websocket:sendMsgTopic
并定义了对应的监听器 websocketTopicMessageListener
。
监听器的实现如下,实现很简单,收到订阅的消息后通过 userMessagesHandler.sendMessageToUser()
方法向 websocket 连接发送数据。
public class WebsocketTopicMessageListener implements MessageListener {
@Resource
private RedisTemplate redisTemplate;
@Resource
private UserMessagesHandler userMessagesHandler;
@Override
@Transactional(readOnly = true)
public void onMessage(Message message, byte[] pattern) {
byte[] body = message.getBody();
byte[] channel = message.getChannel();
String itemValue = (String) redisTemplate.getValueSerializer().deserialize(body);
String topic = (String) redisTemplate.getStringSerializer().deserialize(channel);
Gson gson = new Gson();
UserNotice userNotice = gson.fromJson(itemValue, UserNotice.class);
if (null != userNotice) {
User user = userDAO.get(userNotice.getUserId());
userMessagesHandler.sendMessageToUser(user.getLoginName(), new TextMessage(itemValue));
}
}
}
所以,之前调用 userMessagesHandler.sendMessageToUser()
的地方就可以改为向 Topic:websocket:sendMsgTopic
发布消息了。
public void sendNotification(UserNotice userNotice) {
String channel = "websocket:sendMsgTopic";
Gson gson = new Gson();
redisTemplate.convertAndSend(channel, gson.toJson(userNotice));
}
通过以上改造,我们的 websocket 就能支持多服务器负载均衡部署了。
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于