上一篇 Spring Websocket 配置 介绍了单机下的服务端 websocket 配置。
这种实现方法,在多台服务器的情况下会出问题。因为我这个业务场景是经常服务器主动下发消息,所以会将 WebSocketSession
缓存到内存里,在需要的时候根据用户主键去查找对应的连接发送数据。
那么在多台服务器这种内存缓存就不凑效了,怎么办呢?第一想法是把 WebSocketSession
集中缓存,如缓存到 Redis 中。然而 WebSocketSession
不支持序列化,无法存储 redis 中。
既然无法集中缓存,那么,我们在需要发送数据时,分别向各台服务器发送通知:请向用户 A 发消息。每台服务器收到通知后,分别遍历自己缓存内的 WebSocketSession
如果有用户 A 的连接,则发送消息即可。这不就是 MQ 发布/订阅模式的应用场景嘛?
恰好,我们系统使用了 redis ,而 redis 支持发布/订阅模式,那就开始改造吧。
1. redis 配置
<context:annotation-config/> <bean class="org.springframework.session.data.redis.config.annotation.web.http.RedisHttpSessionConfiguration"> <!-- 超时时间(秒) --> <property name="maxInactiveIntervalInSeconds" value="3600" /> </bean> <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig"> <property name="maxTotal" value="30"/> <property name="maxIdle" value="10"/> <property name="minIdle" value="1"/> <property name="maxWaitMillis" value="30000"/> <property name="testOnBorrow" value="true"/> <property name="testOnReturn" value="false"/> <property name="testWhileIdle" value="false"/> </bean> <!--2--> <bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"> <property name="hostName" value="${redis_server}" /> <property name="port" value="6379" /> <property name="password" value="${redis_password}" /> <property name="timeout" value="3000" /> <property name="poolConfig" ref="jedisPoolConfig" /> <property name="usePool" value="true" /> </bean> <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate"> <property name="connectionFactory" ref="jedisConnectionFactory"/> <property name="defaultSerializer"> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/> </property> </bean> <bean id="websocketTopicMessageListener" class="com.xx.xx.websocket.redisListener.WebsocketTopicMessageListener"> </bean> <bean id="topicContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer" destroy-method="destroy"> <property name="connectionFactory" ref="jedisConnectionFactory"/> <property name="taskExecutor"><!-- 此处有个奇怪的问题,无法正确使用其他类型的Executor --> <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler"> <property name="poolSize" value="3"></property> </bean> </property> <property name="messageListeners"> <map> <entry key-ref="websocketTopicMessageListener"> <bean class="org.springframework.data.redis.listener.ChannelTopic"> <constructor-arg value="websocket:sendMsgTopic"/> </bean> </entry> </map> </property> </bean>
上面配置中,我们定义了一个 Topic:websocket:sendMsgTopic
并定义了对应的监听器 websocketTopicMessageListener
。
监听器的实现如下,实现很简单,收到订阅的消息后通过 userMessagesHandler.sendMessageToUser()
方法向 websocket 连接发送数据。
public class WebsocketTopicMessageListener implements MessageListener { @Resource private RedisTemplate redisTemplate; @Resource private UserMessagesHandler userMessagesHandler; @Override @Transactional(readOnly = true) public void onMessage(Message message, byte[] pattern) { byte[] body = message.getBody(); byte[] channel = message.getChannel(); String itemValue = (String) redisTemplate.getValueSerializer().deserialize(body); String topic = (String) redisTemplate.getStringSerializer().deserialize(channel); Gson gson = new Gson(); UserNotice userNotice = gson.fromJson(itemValue, UserNotice.class); if (null != userNotice) { User user = userDAO.get(userNotice.getUserId()); userMessagesHandler.sendMessageToUser(user.getLoginName(), new TextMessage(itemValue)); } } }
所以,之前调用 userMessagesHandler.sendMessageToUser()
的地方就可以改为向 Topic:websocket:sendMsgTopic
发布消息了。
public void sendNotification(UserNotice userNotice) { String channel = "websocket:sendMsgTopic"; Gson gson = new Gson(); redisTemplate.convertAndSend(channel, gson.toJson(userNotice)); }
通过以上改造,我们的 websocket 就能支持多服务器负载均衡部署了。
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于