Redis 发布订阅模式

本贴最后更新于 870 天前,其中的信息可能已经事过景迁

前言

  项目中新增在线咨询功能,使用 netty 框架实现。代码开发完毕在本地调试没有问题,放到 sit 环境测试中出现部分消息无法接收到的情况,排查后定位到问题是由于后端项目集群部署导致,记录一下处理过程。

原因分析

  netty 框架中客户端与服务端建立的 websocket 链接后会在当前服务器产生一个 channel 通道,后续的发送消息以及接收消息都是通过这个通道来实现,出现消息发送失败问题的原因是后端项目集群部署会有多个应用服务器,建立 websocket 链接的应用服务器可能和发送消息的应用服务器不是同一个。比如用户 A 与用户 B 与应用服务器 A 建立了 websocket 链接,应用服务器 A 中存在两个用户的链接通道,但是当用户 A 给用户 B 发送消息的时候,这次发送请求被负载均衡转发到了应用服务器 B 上面,应用服务器 B 中并没有存在用户 B 的通道,就没有办法给用户 B 推送消息。

解决方案

  通过 redis 的发布订阅模式解决(MQ 同理),当用户 A 给用户 B 发送消息时,接收到发送请求的应用服务器向 redis 的指定消息队列中添加参数,无需处理发送。同时所有应用服务器监听此队列,只要队列发生了变化,监听器就能获取到队列中的参数,获取到消息后在监听器中判断当前服务器中是否存在用户 B 的通道,如果存在了则使用该通道给用户 B 推送消息,否则不处理即可。只要用户 B 在线,则其只可能与一台应用服务器建立了通道,也就是 redis 中的队列消息必定会被与用户 B 建立了链接的应用服务器监听到,消息自然就能推送出去了。如果都不存在则说明用户 B 不在线,再进行其他业务逻辑处理即可。

代码实现

  1. 建立两个集合,用来存在 channel 通道以及用户与通道的对应关系。

    /**
     * 存储每个客户端接入进来时的channel对象
     * 主要用于使用writeAndFlush方法广播信息
     */
    public static ChannelGroup imsChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    
    /**
     * 用于客户端和服务端握手时存储用户id和netty ChannelId对应关系
     */
    public static Map<String, ChannelId> channelMap = new ConcurrentHashMap<>();
    
  2. 建立链接时,将相关信息存入集合中。

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 省略部分代码...
        Channel channel = ctx.channel();
        imsChannelGroup.add(channel);
        // 这里的userid为自定义,标记消息接收方用户唯一即可
        channelMap.put(userid, channel.id());
    }
    
  3. 发送消息时,向 redis 的指定通道队列中添加参数(使用 redisTemplate)。

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) {
        // 省略部分代码...
        // 组装放入redis队列中的参数,根据业务逻辑自定义
        JSONObject jsonObject = new JSONObject();
        // 将唯一标识放入参数中,确保其他应用服务器能够通过用户唯一标识获取到消息接收方的channel
        jsonObject.putOpt("userid", userid);
        // 其他业务参数
        jsonObject.putOpt("xxx", xxx);
        // 第一个参数为redis的消息队列名称
        redisTemplate.convertAndSend("imstopic", jsonObject);
    }
    
  4. 添加监听器,监听队列消息,进行逻辑处理(可以根据传入的其他业务参数进行业务逻辑处理)。

    @Component
    public class ImsListenerAdapter implements MessageListener {
    
        @Override
        public void onMessage(Message message, byte[] bytes) {
    	// 获取到队列中的消息
    	String jsonStr = JSONUtil.formatJsonStr(new String(message.getBody()));
    	JSONObject jsonObject = JSONUtil.parseObj(jsonStr);
    	// 根据userid判断当前应用服务器是否存在消息接收方的channel
    	String userid = jsonObject.getStr("userid");
    	ChannelId channelId = channelMap.get(userid);
            if (channelId == null || imsChannelGroup.find(channelId) == null) {
    	    // 不存在不做处理
                return;
            }
    	// 存在,根据消息接收方的channel发送消息
    	Channel toUserChannel = imsChannelGroup.find(channelId);
    	// TextWebSocketFrame的构造参数为字符串,这里因为业务需要所以发送了json格式消息,由前端将json中的文本拿出来显示,如果不需要也可以直接在构造中传入消息文本字符串
            toUserChannel.writeAndFlush(new TextWebSocketFrame(JSONUtil.toJsonStr(xxx)));
        }
    }
    
  5. 添加 redis 配置,配置监听器监听 redis 消息队列。

    @Configuration
    public class RedisCacheConfig {
    
        /**
         * 订阅消息队列配置
         */
        @Bean
        RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter imsMessageListenerAdapter) {
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(redisConnectionFactory);
            // 可以添加多个messageListener,配置不同的交换机
            container.addMessageListener(imsMessageListenerAdapter, new PatternTopic("imstopic"));
            return container;
        }
    
        /**
         * 实例化监听器
         */
        @Bean
        MessageListenerAdapter imsMessageListenerAdapter() {
            return new MessageListenerAdapter(new ImsListenerAdapter());
        }
    }
    

扩展

  消息发送中其实经常存在对方不在线的情况,为了防止消息丢失,在逻辑上做了以下处理:

  1. 在第 3 步往 redis 消息队列插入数据之前,先将这条消息存入数据库中,状态置为未发送,将表中消息 id 一起放入队列参数中。
  2. 在第 5 步判断当前应用服务器是否存在消息接收方 channel 时,从队列参数中获取消息 id,根据 id 从数据库中查出消息内容,然后调用 writeAndFlush 方法发送消息。消息发送成功后,将表中该条消息置为已发送。
  3. 如果对方不在线,设置一个定时任务查询未发送的消息,通过其他方式提醒给用户,如微信公众号消息提醒等。这个查询时记得设置时间间隔,比如当前时间 10s 前的数据,防止将刚插入库的、监听器还未处理完的消息查询出来。

  在做上述逻辑处理过程中还遇到了另外一个问题,在监听器中查询消息以及更新消息时,需要使用 dao 层方法,使用 @Autowired 注入 dao 层对象时发现 dao 层对象为 null。百度查了一下发现在监听器中无法注入 bean,于是找了其他办法来解决这个问题,后面会单独写一篇文章来记录。

  • Redis

    Redis 是一个开源的使用 ANSI C 语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value 数据库,并提供多种语言的 API。从 2010 年 3 月 15 日起,Redis 的开发工作由 VMware 主持。从 2013 年 5 月开始,Redis 的开发由 Pivotal 赞助。

    284 引用 • 247 回帖 • 181 关注
  • WebSocket

    WebSocket 是 HTML5 中定义的一种新协议,它实现了浏览器与服务器之间的全双工通信(full-duplex)。

    48 引用 • 206 回帖 • 398 关注
  • Netty

    Netty 是一个基于 NIO 的客户端-服务器编程框架,使用 Netty 可以让你快速、简单地开发出一个可维护、高性能的网络应用,例如实现了某种协议的客户、服务端应用。

    49 引用 • 33 回帖 • 22 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...