IM 项目顶层设计思路

本贴最后更新于 428 天前,其中的信息可能已经时移俗易

原文:www.yuque.com/snab/mallc...

前言

抛开详细代码不谈,从设计者的角度思考一下,想要实现一个完善的 IM 系统,需要必备什么样的功能:

image

一个千万级的消息系统,任何一个小功能,都可能成为性能瓶颈,都需要复杂的设计去支撑。

系统架构图

image

  • WebSocket:维护和用户的连接通道,可以接收消息,也可以推送消息,为有状态服务
  • IM 服务:负责消息的发送逻辑,处理单聊群聊的消息
  • Logic 服务:处理用户的心跳,上下线,联系人,加好友,创群组等逻辑
  • Auth 服务:处理用户认证,权限等需求
  • Router:推送消息时,不同用户在不同 WebSocket 服务上,确保正确推送,与可靠推送。

交互的逻辑大致如下:

  1. 用户 A 和 WebSocket 服务建立连接。之后都通过该连接发送消息,接受消息。
  2. 用户 A 发送了一条群消息“在吗”,WebSocket 服务将消息通过 dubbo 转发给 IM 服务,由于 IM 服务是无状态的,可以通过负载均衡随机发到某一台上。
  3. IM 服务将消息持久化,然后将消息投递到消息队列 MQ,这样能快速响应前端,并且 mq 的消费者根据负载慢慢的进行后续的推送,写扩散等操作。
  4. 消费者会判断,根据是否热点群聊的消息做不同逻辑。如果是热点群聊,只写热点信箱。如果是单聊或者普通群聊,会写扩散到每个群成员信箱。这里假设是小群,会写入 B 和 C 的信箱。
  5. 将消息投递信箱后,需要将消息推送给用户。这里可以根据是否在线,在线的进行 WebSocket 推送,离线的进行 push 通知。由于用户的连接在不同的 WebSocket 上,需要 Router 服务推送到 B 和 C 所在的不同 WebSocket 方案有两种,后续介绍。
  6. 推送的时候需要确保消息的可靠性,如果保证一定推送成功?可能要做应用层的 ack,类似 tcp 的滑动窗口确认。
  7. 用户在查询自己的会话列表的时候,需要有一个聚合层聚合用户信箱,以及热点信箱。再严格排序后返回给用户。所谓之推拉结合。

集群推送

http 和 websocket 的底层其实都是依赖 tcp 建立连接进行通信的。他们的差别就是:

  1. http 是无状态的。每次请求都会重新握手建立 tcp 链接再发送消息,并获取响应。因此 http 请求可以随意的进行负载均衡。第一次请求发给了机器 A,第二次请求就发给了机器 B。
  2. websocket 是有状态的,当他建立 tcp 连接后,就会一直复用那个连接进行通信。假设一开始连上了机器 A,收发信息就一直在机器 A。直到连接断开,重连后才会更换连接。

我们项目用的netty 来实现 websocket,和用户的连接就是一个 channel。需要给用户推送消息的时候,直接往 channel​里面 write​消息就好了。

/**
 * 给本地channel发送消息
 *
 * @param channel
 * @param wsBaseResp
 */
private void sendMsg(Channel channel, WSBaseResp<?> wsBaseResp) {
    channel.writeAndFlush(new TextWebSocketFrame(JSONUtil.toJsonStr(wsBaseResp)));
}

在之前 微信扫码登录技术方案实现1,我们已经描述了如何在用户登录的时候,将 uid 和 channel 关联起来缓存在 jvm 的 map 里,这样要推送的时候只需要通过 uid 取出 channel 进行推送

image

但在集群的场景下,上述方案会失效。因为集群环境下,如果使用一台服务器维护 uid 和 channel 的映射关系,就会导致这台机器成为整个集群的“单点瓶颈”

image

你要推送的用户,在别的机器上,你怎么找到对应的机器?根本的原因是因为连接的管理在 jvm 层面,假设 A 要发消息给 C,我们不知道 C 在哪一台WebSocket​。所以这时候可以借助一个中心化的中间件,比如 redis,来存储他们的关系。

redis 存储 channel ?

单机存储的方案在集群模式下不可行。那是否可以将 channel 和 uid 的关系存储在类似 Redis 这种第三方存储介质上面呢?通过redis.get(uid)​拿到字符串,并反序列化成 channel​,然后发送消息。

private void sendMsg( WSBaseResp<?> wsBaseResp,Long uid) {
    Channel channel = RedisUtils.get(uid.toString(), Channel.class);
    channel.writeAndFlush(new TextWebSocketFrame(JSONUtil.toJsonStr(wsBaseResp)));
}

其实这样会报错的,channel 是本地的 socket 连接,没法进行存储与反序列化。

精准投递消息

可以对上面的方案优化下,本地依然要维护 uid 和 channel 的关系(这可是根基),redis 再维护一个用户的连接状态,比如用户在哪台机器上连接。这样通过 router 发送消息的时候,就知道用户的消息应该发送到哪台机器上。

image

大概的流程如下:

  1. A 发送了一条给 C 的消息。通过 channel 发送给了 10.102.1.1 这台 websocket,它通过 dubbo 将消息随意转发给了一台 IM 服务。
  2. IM 服务将消息持久化后,调用 Router,推送消息给 C
  3. Router 查 redis 的中心化管理,查到 C 目前连接在 10.102.1.2 上,并通过 tcp 连接将 C 的消息推送给 10.102.1.2。Router 会和所有 websocket 都维护一条 tcp 连接。查到具体的 ip 后,对指定 websocket 服务器进行消息推送。
  4. websocket 服务收到的请求格式为给 uid 发送 xx 消息,所以它会通过 uid 在本地的连接管理中,查出用户具体的 channel。然后调用 channel.write(消息)方法,给用户推送消息。

存在的问题

1. 频繁操作数据库

需要频繁的更新 redis 去维护用户和 websocket 服务的映射。(这个是小问题,因为我们正好需要做用户的上下线,也复用这个功能)

2. 连接数爆炸

websocket 的瓶颈就在连接数,如果连接满了,就要水平扩容 websocket,这样才能支撑更多人同时在线。如果用户体量非常大,甚至需要上千个 websocket 和上千个 router。

image

  • C10K 问题2

这其实是一种很可怕的事,单纯的路由连接就达到上千,占用了 websocket 的连接资源,导致一台 websocket 能连接的用户数变少。

这种问题其实也会出现在 dubbo 服务中,dubbo 单实例集群达到 1000 以上,他们是怎么做的呢?

由于传统的 service 服务都是无状态的,我们的连接可以分组管理。每台 service 只需要和少数的下游实例维护 tcp 连接即可,不需要连接所有。

但是 websocket 是有状态的,一台 router 必须连接所有的 websocket,他的消息有可能需要路由到任意一台 websocket 上。

分层路由

image

我们可以用分层路由的思想,中间加一层路由,设定路由规则。这样可以有效减少连接数,但是会增加消息的推送链路,适用于真的很大型的集群场景。

临时连接

当然还有很多方案,甚至我们可以思考,为啥去保证 tcp 连接?如果每次发消息都建立 tcp 连接,那就是三次握手 3 倍的 RTT。如果我们不在乎这些连接耗时,也可以直接采用 http 推送,或者临时建立连接的方式。

image

这种订阅表,路由节点,就也有点像分布式集群的 mq,感兴趣可以看看文档

3. 实现复杂

需要指定 ip 推送,维护 tcp 连接。如果用 dubbo 会稍微简单些。

4. 延迟叠加

哪怕开启了多线程,也依然会有一台机器的单点,需要对所有的群成员进行消息的扇出(写扩散)。这里会导致接收者接到的消息延迟叠加。(当然,用线程池异步扇出可以提高速度,但是会回到问题 4,总体开销依然是不变的)

5. 雪崩问题

如果系统负载提高,比如需要推送的消息量突然变大,导致瓶颈从websocket​一直传导到 router​再传到 im服务​,整个集群会出现雪崩,需要有一个消息队列来进行削峰填谷。

集群广播消息

针对于以上精准推送的问题,如果我告诉你,有一种方案,能够无需维护连接,消息的发送只需要一个消息副本,并且没有消息扇出的压力,只需要写一次。你满不满意?它就是集群广播。

image

对于万人群聊,一般系统的压力就在于消息的扇出(写扩散)。如果按照精准投递的话,我们的消息需要查询 redis 中心路由,然后将消息投递 1w 次。而如果用消息广播的形式,消息只需要投递一次。由 websocket 自己进行广播消息的拉取过滤。

过滤流程

image

  1. mq 的消息消费模式为集群消费,确保每台websocket​都能消费到所有需要投递的消息。
  2. 对比推送的 uid 在不在本地连接管理​的列表,如果不在,直接丢弃消息,也叫过滤消息。
  3. 如果在本地连接管理,根据uid​取出 channel​,就可以进行消息推送了。

优化 1:消息副本优化

集群广播和精准投递比起来,消息副本少了很多。但是如果用刚刚的图展示,反而看不出来效果。

image

这差别可就大了,假设是万人群聊。

精准投递总共传输了 1w * 3= 3w 份消息, 1w x 3 = 3w 个 uid,

集群广播传输了 1w+3 份消息,2w+1w*2=4w 个 uid。

因为终端投递是省不了的,但是可见消息副本数大大下降。大家可能会觉得有些赖皮,1w 个连接涉及到的 websocket 其实会有更多个。消息大小远大于 uid,我们以消息为准

  • 精准投递:u*3 份消息,u=群聊在线人数,3=消息传输的链路,固定值
  • 集群广播:u+n 份消息,u=群聊在线人数,n=websocket 的集群数,不固定,但是极小。

可见该优化在大群聊下的效果有多强,当然其实集群广播方案也有缺点,就是在 im 中,小群聊甚至单聊的占比很大,在 u 很小,且 n 很大的情况下,集群广播就失去了他的优势。

优化 2 :消息过滤优化

根据刚刚的过滤流程,我们可知道消息内的 uid 过滤,都是在本地消费者逻辑去过滤的,不存在就直接丢弃消息。这时候大量与本 websocket 无关的消息都被拉取过来了,中间的网络 io 都浪费了。这还真没啥办法解决。但是我们可以在海绵里挤水呀,想想还有啥能优化的。

集群广播最大的毛病就在于,很多不属于我们 websocket 的消息推送,也会被我们读取,然后在本地过滤扔掉。这里浪费了网络 IO,浪费了本地反序列化的 cpu。

这两点能不能改善呢???

  1. 我们可以通过设置 header 的方式。将推送的 uids 存在 header 里。这样我们拉取消息到消费端的时候。就可以在序列化前,先在 mq 的过滤器里去过滤消息,节省不必要的反序列化。

    1. 这里需要注意一点,header 有长度限制。我们需要注意大群聊 uid 过多分批发送。或者群聊压根不过滤,就考虑单聊场景。
  2. 上面的方案是在消费端才过滤,只节省了反序列化。我还想贪心点,在 broker 端过滤消息,直接节省 IO 的传输成本。

    调研了两种 broker 端过滤方案,tag 的方式首先不支持我们的需求,没法存到用户纬度那么大。我们可以用 sql29 的方式写表达式匹配。把 uids 写到消息的 header 里,sql29 表达式可以去匹配。过滤出包含我们有的 uid 的消息。

    但是 sql29 并不支持动态过滤。而我们 websocket 的连接用户是会一直变化的。每一次请求都需要做到不一样。

    所以解决方案 2 的方案相当于破产了,但是 rocketmq 最大的优势,就是纯 java 编写。我们可以去魔改源码改 broker 的逻辑呀,专门改一个更适合 im 场景的 mq 过滤交互框架,也是一个不错的选择。

    无效

虽然方案不行,但是我依然带着大家去大胆假设,小心求证。目的是带着大家了解我们使用的工具,以及它的能力。这是一次对 rocketmq 高级用法的探索,和我们业务的完美结合,希望大家用心去思考,去感受。

存在的问题

其实问题也比较明显。每个 websocket 都需要接收全站所有消息的广播消息,然后内部进行过滤。如果全站的消息都是单聊消息,结果每个 websocket 都拉取消息,并且内部过滤,是带宽的浪费,以及 cpu 的浪费。

总结

根据前面的计算公式对比,我们很容易的得出一个简单的结论。

单聊消息多的,用精准投递。群聊消息多的,用广播消息。

对于抹茶,我们是有个全员群聊,很多个小群聊,和很多的单聊。发言频率最多的场景是全员群。所以抹茶最应该使用集群广播推送的方案。

应用扩展:百万直播间推送方案

IM 场景不仅仅只是“聊天”,只要涉及到“消息推送”的场景,都有可能用到 IM 的技术

甚至说,“消息”的载体不一定是“文本”。任何类型的消息都可以看作 IM 系统中的传递的数据

假设在抖音直播的场景下,一个热门的直播间 100w 人同时在线,大量的礼物,互动消息充斥在直播间,如何通知到每个人。保证消息的即时性,可靠性。

首先这明显更倾向于大群聊的一种场景,如果用精准投递,那么消息的扩散系数就是 100w 级。如果采用的是集群推送,假设 100w 的用户需要 500 台 websocket 进行连接,那么扩散系数只是 500 的级别。

但是这个假设是整个平台只有这个直播间,如果平台有更多的直播间。websocket 会更多,mq 的扩散系数也会更大。

image

每个方案又都优缺点,而应对极端场景,通常都是方案的组合,扬长避短。很类似于我们后面会提到的推拉结合。

推拉结合(方案合并)

这个场景的方案,我们可以设置一个热门阈值,比如 1w。超过 1w 的直播间,我们会进行直播间升级,升级成热门直播间。热门直播间的 websocket 单独管理。把直播间用户的 websocket 连接都统一路由到固定的几百台 websocket 上。由于目标用户都集中了,也就不需要精准投递了,可以采用广播投递消息到这指定 500 台机器上。再对应的推送给直播间的观众。

  • 这里其实是精准投递和集群推送的一个结合(你会发现,很多方案都是有优劣的,最后都是结合起来使用扬长避短)

这个方案的核心,就是要能将直播间所有用户通过网关路由到相同的 500 台 websocket 上,有了这个基础,才能用广播消息,那 500 台 websocket 都监听同一个 topic 的广播 mq 消息。能省下很大的带宽开销。而消息的发送端,需要知道消息究竟是发送到热门直播间的 topic 进行集群广播还是普通直播间的精准推送。还是得依赖 router 服务进行路由推送。

热门直播间升级

一开始的普通直播间,用户都分散在不同的 websocket 机器上。等到直播间人数突破阈值 1w。就需要开始直播间的热点升级

这时候服务器检测到直播间需要升级,动态扩缩容,启动一系列配套措施(k8s 现在已经使用的比较多了)。

一系列措施准备好后,相应的配置推送到网关路由机器上。指定以后该直播间的连接路由到我们新启动的 50 台 websocket 上。然后对当前在线的所有用户发送断连替换指令。

所有在线用户都断开连接,重连的时候会被网关路由到新的 websocket 上。

  • 优化:对于经常突破 1w 人的直播间,可以打个标。以后该直播间上线,默认就是热点,省略升级过程。

消息合并

直播间的点赞操作,一般发生在主播求赞的时候,大量人在同一时间段点赞。并且单人在同一时间也快速点赞。

可以在客户端对每个人的多次点赞首先进行合并一次(用户 a 点赞 20)。请求到后端后,由于路由已经做好,在每个点赞服务器,可以对多人的点赞再合并一次(用户 a+b 总点赞 40),进行入库。

给前端推送的时候,也可以合并推送,不需要每条点赞都推送。每隔 1s 推送一次直播间点赞总量达到(100w)。

优先级隔离

在 100w 直播间里,推送的消息会有很多。会导致部分消息到达产生延迟。这就类似 push 系统,消息应该区分优先级,不要被互相影响。

大礼物,和主播发言消息,这些应该独立在一个广播 topic 里,其他的不重要的消息,可以设置另一个 topic,区分优先级,不要影响重要消息。


消息时序性

消息时序性,主要解决的就是消息展示的顺序问题。它为啥那么难?

如果用户 a 给用户 b 同时发了三条消息 aa bb cc。而服务端接收到的消息是并发的,可能入库就是 aa cc bb。

就产生了发送方顺序和接收方顺序不一致的情况。

image

客户端排序

这时候可以让 a 发送的每条消息带上时间戳,服务存储 a 的消息时间戳。b 的展示根据时间戳排序。

image

但是如果在多端发送的情况下,a 有电脑端,手机端,每个端的时间可能是不一样的,就会出现发送的乱序问题

image

回头想想,只是为了解决用户快速发两三条消息时间内的局部排序而已。可以参考 腾讯 sdk 的实现。

给消息设置一个本地的自增 id,发送消息的时候带上。排序整体以服务器的时间为准,相同秒内的排序以自增 id 为准。

image

这样赌的就是,你发的消息再快,哪怕存储顺序变了,但是也都在 1s 内,对于 b 来说,1s 内的消息,按照自增 id 额外排序就好了。

这样发送者只需要保证指定时间内的消息自增就好。如果哪天 seq 丢失,或者在其他端发消息 seq 不一致,都没关系。

但是在群聊的场景下,每个群成员的客户端时间也是不一样的,没法作为排序的统一基准时间。只能采用服务端时间。并且 seq 也是不同的,相同秒内也没法排序,不适用该方案

image

服务端排序

对于群聊的场景,我们需要采用服务端排序。服务端排序其实本应该很简单。

对于单表来说,我们可以采用主键 id 来排序,也可以通过消息的时间戳来排序。id 肯定是严格自增了,时间戳要考虑精度问题,一般设置的精度是毫秒,也基本足够进行消息的排序了。因为毫秒内的消息,本身就没有上下文的关系,对顺序要求不高。

image

小绿小王小马,他们的消息还在发送中,互相都是根据上面已发的消息做的决策,所以他们三的顺序,并不重要,以服务端的时间为准即可。

除了消息的顺序性外,还有一个很重要的点,就是消息的唯一性。在游标翻页的场景下。

翻页的游标字段,需要同时保证顺序性与唯一性的作用。如果单纯用时间戳,毫秒内的消息,就没办法区分与排序,得额外再拼接其他唯一字段一同排序,不如就直接用唯一且有序的 id 作为游标。

image

**因此消息的时序性,我们通常都是用一个唯一 id 来保证。**​微信 sdk 也是用一个 id 来进行翻页。

总结

单聊场景可以用客户端的序列号 seq,群聊则用服务器的 seq 作为标准排序,保证单位升级内多条消息的顺序性。

消息不仅要保证时序性,还要保证唯一性,通常用消息 id 字段满足要求。


消息 id

消息 id 不仅要唯一,更要有序(递增)

保证唯一很简单(雪花 ID3)。但是严格保证有序就有点难度。

全局递增

消息在整个 IM 系统都是唯一且递增的。

一般对于单表来说主键就自然保证了递增。

但是如果消息量大了,省不了分库分表,分库分表后的消息递增,通常采用分布式 id。

但是分布式 id 通常保证的是趋势递增,而不是单调递增。

  • 单调递增

    • image
  • 趋势递增

    • image

所以雪花 id 不适用于 IM 这种严格时序性的系统。

事实上,严格的单调递增,意味着严重的单点竞争问题。对于一个都需要分库分表的系统,是很难实现这样的方案的

会话级别的递增

上面也说到,全局的单调递增,意味着严重的单点竞争。话说回来,我们为啥需要递增呢?不就是为了消息的顺序性展示吗?只需要保证单个群组内的消息 id 是有序的且唯一的,就足够了。QQ 就是这样的架构。

那么如何保证会话级别的递增呢?一个简单的做法分库分表以会话 id 分表。这样相同的会话必定在同一张表,又重新用回了主键自增。

一般分表就不用主键自增了,都是用分布式 id。因为这种方案很难支持之后的扩容,比如 4 扩 8。

用分布式 id 保证会话级别的单调递增

**单调递增,同时意味着单点问题。**两者是不可两全的。分布式 id 之所以没有单点问题,所以大多都是趋势递增。这里面涉及的可用性,以及单调递增这种一致性的取舍。非常有意思。最终实现的效果,类似 mysql 的主从,单点的 mysql 用于自增 id,以及主从保证高可用的切换。

实现参考:微信序列号生成器架构设计

收信箱递增

会话级别的递增,更多的适用于读扩散的场景。所有人拉取消息列表的时候,都去会话的消息表拉取。

image

收信箱的递增,适用于写扩散的场景。所有人都有自己的一个收信箱,维护自己的时间线即可。

image

收信箱的时间线的单调递增和 uid 相关。实现的方式和上面都一样,一个是以会话为 key,一个是以 uid 为 key。

**实现参考:**​微信序列号生成器架构设计

微信就是典型的写扩散场景,所以他的群聊最多只能 500 人。

总结

我们讨论了消息 id 重要的使命,用来确保唯一,和有序。

对于有序顺带提了单调递增和趋势递增。现实中分布式 id 大多数都是趋势递增的,这样比较高可用。

而有序,又讨论是三种有序的情况,为啥全局单调递增不好实现(因为单点竞争)。引出了单调递增 id 生成器的方案,和可用性保证。

如果你能了解市面上大多分布式 id 方案的优缺点,同时了解 IM 的业务诉求,必须单调递增的限制。又能够引出业界的解决方案。基本上能折服非该行业的面试官了,同时体现出你的专业性。

说了这么多,虽然抹茶用的就是单表 id 自增****。但是哪怕问到亿级数据的方案,我们也不带怕的!


消息可靠 ACK

发送成功分为 2 个步骤:

  1. 发送方发送消息给服务器,服务器成功接收返回 ACK
  2. 服务器推送给接收方,接收方成功接收返回 ACK

发送可靠

如何确保发送的消息可靠,基本都是靠 ack 来保证。有人会好奇,能不能借助 tcp 的 ack 呢?答案是不能的,业务层的 ack 不能靠网络层来保证

举一个简单的例子你就知道了,假设我发送消息,到达服务器了,这时候 tcp 的 ack 已经响应,代表服务器接收到了请求。但是服务器在入库的时候失败了。

业务层可能有很多种失败,业务校验,db 入库,代码执行。。。等等。只有业务层返回的 ack,才是业务可靠的保证。

image

  • 如果是从 websocket ​发出的消息,需要 websocket 再主动返回一个 ack 的消息。websocket 底层是 tcp 协议,他的特点就是发消息接收消息是没关系的。需要通过一个唯一标识,标识推出去的消息是用来响应上一个接收的消息的。同时发消息的客户端,也需要发送完进行阻塞等待(可超时),等待响应的到来。类似 dubbo 的底层交互原理

    • image
  • 如果是 http 协议,他的协议能够在 tcp 之上,通过收发的报文进行请求和响应的关联。

抹茶采用的是 http 进行发送消息,所以直接通过返回的标识,判断是否发送成功即可

  • 如果发送方发送失败(ack 明确失败,ack 超时)如果是明确失败,可能是业务校验问题,提示用户即可。
  • 如果是超时,底层帮助自动重发。确保发送可靠。

推送可靠

推送可靠一般体现在服务端消息入库成功后。推送给对应的消息接收方。需要保证消息能够到达消息的接收方。

为了保证严格的可靠性,这些推送给每个人的 ack 都是需要入库的,可以写到每个人的消息表持久化。接收到 ack 后,修改消息状态。

image

定时重试

可靠性,基本离不开一个不断 check 的重试节点。这个重试可能是后端,也可能是前端。如果信箱没有收到 ack。说明消息没有到达接收端,需要进行重新推送。可靠的前提,在于信箱是持久化的。定时任务又支持不断重试的。

image

但这有个问题,如果用户一直不在线,难道你的定时任务就一直拉取全部信箱消息,在逻辑判断是否在线,再推送吗?这每次拉取的消耗可不小。

在线推送

推送服务增加一个判断,如果是在线消息,才会进行推送,并且记录消息在内存中,定时任务也只会拉取内存 ack 队列的消息,进行推送重试。

image

如果接收到 ack。内存队列会移除对应的待 ack 消息。并且对持久化的信箱进行 ack 标识。如果内存待 ack 队列过多,可以采用 lru 的方式,排除最早入队的消息。

当然内存队列是肯定不可靠的,但他只是为了加速我们的可靠性推送的效率,最终还有我们的持久化收信箱做兜底。最后的保障,也就是我们的离线推送。

离线推送

对于不在线的用户,只能确保他在下一次连接上的时候,保证消息的可靠推送。在此之前,还有友情通过 push 的方式提示他消息到达。让他打开软件。

image

image

所有的复杂点都在用户上线的时间点。

发现用户上线后,要立马查出他所有未 ack 的消息,一股脑全部推送给他。并且 copy 一份到内存 ack 队列,确保可靠性。后面的流程就差不多了,定时任务继续重试,保证最终的一致性。

一次性推送所有消息,也可能会有瓶颈,对于一个很久没上线的用户来说。

可以做到分批推送,感兴趣看b 站文章

image

这样已经是一个完整的消息可靠性方案了。做到了持久化,重试,就能达到最终一致性。但是。。。

在我们的万人群聊场景下,一条消息,就意味着需要写入 1w 人的收信箱。你知道这种写扩散的系数有多爆炸吗???再换个角度来说,每条消息都存一份消息 id 到用户的个人信箱。对应的存储也增长了好多倍。微信为了降低写扩散的影响,都把群限制了只能 500 个人。(不是说解决不了,只是代价挺大的)。

以上我们讨论的都是消息推送的可靠性,遇到的热点问题,实际上还有一些优化方案,后面的推拉结合,和热点群聊,都会再讨论这个问题,综合一个更好的方案。

上面的讨论,我们了解了消息可靠的基本方案。虽然抹茶没做到消息的可靠保证,但是面试官问的的时候,你也能答得出来了。

抹茶为啥不做消息可靠的保证呢?一方面是复杂。另一方面抹茶是一个 web 项目,压根就不存储消息,消息到没到问题不是很大,哪怕消息到了,你刷新一下,消息也又没了。


消息重复避免

保证幂等的关键在于“唯一 Key”标识

  • 接口幂等设计4

发送幂等

消息发送的时候,如果遇到网络波动,底层会自动帮忙重试。如何唯一的标识这条消息?靠的是发送端生成一个唯一的标识,如果重试的时候,相同的消息带的是相同的标识。后端服务就能够检测出来,保证幂等。

**类似 kafka 发消息到 broker 的重试,也会在发送时生成个幂等标识。**​腾讯 sdk 的文档,也能看到发送消息的一个随机 md5,保证幂等标识。

image

1s 内的去重保证,是比较好的做法。否则需要去重判断的范围太大了,涉及的历史数据也太多了。这样的话,发送端的自动重试也要限制一下超过 1s 还没成功,就放弃重试了。

接收幂等

服务端对接收方的消息推送,也是有可靠性保证的。如果没有及时收到 ack,定时任务就会进行消息推送重试。同样需要考虑到幂等问题。

假设服务端对接收方推送了相同的消息两次。接收方会怎么展示呢?一般能够想到对消息做幂等判断,如果是客户方已经展示的消息,就跳过。那么这个唯一性怎么判断呢?

消息的唯一性,基本就靠消息 id 来判断了。上面也讨论了消息 id 的唯一性问题。

一般全局消息唯一的话。直接用唯一消息就好了。

如果是会话级别的消息 id 唯一,那判断的时候就需要判断 会话 id+ 消息 id 唯一。


推拉结合

推模式

有新消息,服务端需要主动推送给前端。需要用到 websocket。并且后台会维护一个定时任务,定时推送还未接收到 ack 的消息。保证消息的实时性。

拉模式

拉模式又分为短轮询和长轮询。前端主动询问后端是否有新消息。以定时的频率访问。我们项目已经用到了 websocket,一般就不用拉模式了。拉模式可以用在历史消息列表。新消息,还是要保证消息的即时性。

缺点:

  • 延迟较高:拉模式需要消费者主动请求数据,可能导致较高的延迟。
  • 重复请求:如果消费者频繁请求相同数据,可能导致带宽浪费和系统效率降低。

推拉结合

理论上保证消息的及时性,推模式足够了,为什么还要拉模式?推模式需要考虑推送失败的情况,又需要服务端启动定时任务,确保 ack,方案比较复杂,对服务器消耗也大。

实际上推送的失败概率没有那么高,如果客户端每隔一定的频率进行消息拉取。相当于客户端是那个定时任务。就能达到最终一致性。

采用推拉结合。推主要是保证及时性。而拉主要是保证最终一致性,也就是消息到达的可靠性。

image

  1. 推送新消息到达,里面没有任何特殊消息,仅仅是为了及时触发客户端的消息拉取动作。这个推是无状态的,也就可以任意调用,实现超级简单。
  2. 客户端接收到新消息提醒,或者是定时任务到达指定时间。对服务端发送拉取新消息的请求。只需要带个 token 就好了
  3. 服务端查询用户信箱里未 ack 的消息,全部返回给客户端。达到返回增量消息的目的。
  4. 客户端收到消息后,可批量 ack,服务端收到 ack。将信箱标记为已读取。

实例

服务端在拉取新消息的时候,指定的是两个字段,uid=用户,ack=false。为了能快速拉取到新消息。是不是需要一个联合索引,uid&ack 的联合索引。

新消息的推送,仅是一个无状态通知,最终一致性是靠客户端的拉取实现的。也就意味着我们先后到达两条消息 aa,bb。

  • aa 触发的新消息通知,由于网络异常丢失了。bb 消息入库时,进行新消息通知。这时候客户端拉取新消息的时候,也能把 aa 给拉到。这是单纯用消息推送无法做到的点。

由于整体的流程都变成无状态的请求了。这样客户端在请求新消息的时候,可以请求到批量的消息。客户端在 ack 的时候,也从原来的一条条 ack,变成了可以批量 ack。在大群聊很多消息的时候,我们可以通过控制新消息推送的速度,很很容易的达到合并拉取的效果

只要我们在索引上下了功夫,查询未 ack 的速度就很快。客户端定时向后端的新消息拉取请求,不会占用多少 cpu。另外由于没有新消息,拉取到的为空,也不占用多少带宽。

通过推拉结合的组合,通过无状态通知,减少服务端的压力。由客户端的请求触发最终一致性。收口了核心的接口达到复用效果。

但是依然没有解决热点群聊的写扩散问题,所有的消息依然需要写入到用户收信箱。


多端同步

服务端维护 ack 状态

上文提到的推拉结合是个好方案,但是在多端同步的场景下,就失效了。多端同步的核心,是确保消息可靠的到达多端。消息的可靠是通过 ack 完成的,问题就出来这里。

用户收信箱只有一个,消息的状态分为 ack=true,false。代表用户是否已经收到消息。但是在多端的场景下,如何去表示用户收到消息呢?

image

这样每个端的 ack,都需要独立去维护自己的 ack 信箱了。手机收到了两条消息,电脑收到了一条。在电脑进行新消息拉取的时候,拉取到的消息应该是 2,3,4。而手机端拉取到的应该是 3,4。

这样的设计可太烂了。难道我们要为每个端都准备一个信箱吗?如果有新的端加入,信箱还需要做历史数据修复吗?

为了兼容这个场景,我们不得不去设计一套新的交互方案。

客户端维护 ack 状态

我们来思考一个问题。可靠性,究竟是怎么保证的?是靠状态的持久化来保证的。比如收信箱里的 ack,就明确标识了用户接收了哪些,没接收到哪些,之后才能够推送未 ack 的。

保证最终一致性。而现在的状态存储遇到了问题。多端场景下,需要服务端维护多端状态。

这是一个难题,那么能不能把多端状态抛给客户端??

image

服务端不再维护 ack 状态,由客户端维护。客户端维护该端读取到的最后一条消息的游标。这个游标的选择,需要具备唯一性和有序性。关联上我们前面介绍的,也就是用消息 id 来保证了。

每次推送,都需要对多端进行新消息通知。

每次拉取新消息的时候,客户端不仅需要带上 token,还需要带上自己当前读到的游标。服务端根据游标,查询到 id 大于游标的消息,全部返回给客户端。

  • 手机请求新消息时,带上的是自己的游标是 2。查到大于 2 的消息 3,4。
  • 电脑请求新消息时,带上的是自己的游标是 1。查到大于 1 的消息 2,3,4。

这一切的前提,是 id 是全局递增的。这样服务端只需要维护一个游标。每次拉取新消息,都是拉取多个会话的消息。

  1. 如果 id 是收信箱递增的,对于单个用户来说,其实相当于是全局递增了。用的是收信箱的 id 做游标,而不是用消息 id。也能达到全局的效果。微信就是这个方案。
  2. 如果 id 是会话级别递增的,那么客户端就需要维护每个会话的游标了,每次拉取消息的时候需要带上会话 id+ 游标 id。每次定时任务的拉取,也需要带上多个会话 id+ 游标 id,为每个会话都尝试拉取最新消息。整体实现复杂了不少。这种方案通常不常见。

image

对于抹茶来说,是全局递增的 id,完全可以客户端存储阅读游标的方式实现多端消息同步。虽然我们暂时没有去做,但是大家可以这样去回答。

抹茶的多端同步也比较简单,毕竟是一个 web 项目不保存消息。登录每个端都一样。拉取最新的一页消息。后续新消息通知就拉取新消息。感兴趣历史消息,也可以通过翻页加载的方式加载老消息。对于客户端缓存消息来说,看老消息直接从库里查,都不需要翻页加载了。


单聊群聊

消息表怎么兼容单聊和群聊。我看市面上很多的设计都是这样的。

image

目标类型分为单聊和群聊。单聊的情况下目标 id 填 uid。

群聊情况下,目标 id 填群组 id。这样消息是兼容了,但是其他地方也都要兼容了。比如我们的个人收信箱。需要存所有消息。同时还要知道这个消息在哪个会话下的。

单聊群聊的会话要怎么去唯一标识呢,首先个人信箱,都需要个人 uid 去标识。一般单聊是加上好友 uid,就代表会话。群聊加上群 id。

image

这样收信箱也需要兼容单聊和群聊,设计两个字段,后续为了能快速命中会话,还需要设计组合索引。

后续其他表只要是涉及会话 id 的都需要这两个字段。想想就麻烦。本着复用和抽象的原则。能复用的地方,就给它抽出来,看看我们要怎么去优化它。

image

我们可以抽象一个房间表出来,群聊所有人在一个房间里面聊天。单聊无非就是两个人在房间里面聊天。这样房间关联上单聊或者群聊的相关信息。有了这层抽象。消息表和个人信箱都不需要两个字段标识会话了。直接用房间 id 就是会话 id。通过房间 id,自然就关联出对应的单聊群聊信息了。

image

  • 所以表结构设计就是这样的。通过房间表 room,抽象了一层,屏蔽了单聊群聊的差异,让其他关心会话的表设计起来更加简单。只需要关联一个房间 id 字段。

单聊群聊和房间,都是一对一的关系。相当于单聊表和群聊表都是扩展表。本质上是可以直接在 room 上面添加字段的。选择扩展能划分更加明确。

  • 群聊有对应的头像,群名称等。另外还有一个很重要的群成员。由于群成员和群是多对一的关系。一个群可以有多个群成员。
  • 群成员单独建了一张表。里面有个 role 字段,记录群成员在群里的属性,比如群主,管理员,普通成员。如果我们需要快速判断一个群的群主怎么办?为了走索引,所以设计了 group_id 和 role 的联合索引。当然你也可以作为冗余字段,直接记录在 group 表,添加个群主 uid 字段。

单聊表有个很重要的点,就是怎么去唯一确认一个房间。比如我新加了一个好友,系统为我们生成了一个单聊房间。后续在好友列表找到这个好友,对他发消息。这时候怎么找到我们之前的那个房间。这时候我只知道我的 uid 和好友的 uid。

所以在单聊表中有两个重要的字段,uid1 和 uid2 代表好友双方的 uid。为了能保证双方建立的房间的唯一性。我们增加了一个唯一字段 room_key 来唯一标识两个好友的房间。

他的生成规则很简单。uid1_uid2。其中 uid1 是双方 uid 较小的那个,uid2 是双方 uid 较大的那个。这样避免不同的排列顺序,产生二义性。


消息已读未读

为了满足各位老板的诉求,钉钉和企业微信都支持查看消息的已读未读。

image

这里面有几个关键的元素,总共多少人已读,多少人未读,已读未读列表。一条消息怎么知道对方读没读了?这又回到了 ack 的问题。刚刚我们的多端同步好不容易把 ack 给取消了,通过客户端维护自己的阅读游标,来保证消息的可靠性。现在服务端还是需要维护每个人的 ack,这样才知道消息读没读。

image

相当于每条消息,都需要投递到用户的收信箱,并且需要记录已读未读的状态。用户读取消息后,需要返回 ack。消息投递的可靠性依然由客户端拉取消息自行保证。收信箱的 ack 是用于已读未读的统计。

  1. 有人发了条消息 1,群消息入库,并插入到群成员 A 和 B 的收信箱,此时都是未读状态

  2. 推送服务通知 A 和 B 拉取新消息。

  3. A 正好在线,拉取了新消息 1,拉到消息后,返回 ACK。

  4. 推送服务标记 A 对消息 1 的 ACK。

  5. ack 的消息同时需要推送给消息的发送者,同步更新

  6. 发消息的人想看看消息的相关情况

    1. 已读数:​ select count(0) from 信箱 where msgId=xx and ack=true;
    2. 未读数: select count(0) from 信箱 where msgId=xx and ack=false;
    3. 已读列表:select uid from 信箱 where msgId=xx and ack=true order by ack_time;
    4. 未读列表:select uid from 信箱 where msgId=xx and ack=false order by create_time;

不过以上方案存在明显瓶颈

收信箱写指数扩散

对于一个万人群聊,每条消息都需要记录所有人的 ack。一条消息就需要写入 1w 条收信箱记录。每人发一条消息,那就是 1w*1w=1 亿的记录量。对存储来说是个很大的负担,有没有可能减轻负担呢?

换个思路,用户的收信箱能不能不为每条消息记录 ack,只记录用户阅读的最新时间线(类似之前的客户端存游标)。这样用户 ack 的时候,后台只需要更新 ack 的最新时间。

image

A 收到新消息 10003 后,提交 ack。收信箱记录 A 最后的阅读时间 10003。这样存储就不会因为消息而指数扩散了。

只和群成员数有关,每个人都只有一行阅读记录。

因此我们设计了如下的用户收信箱表,也叫会话表。

image

所以之前的查询已读未读数据的 SQL 就变成这样了:

  • 已读数: select count(0) from 信箱 where room_id=xx and read_time>消息时间;
  • 未读数: select count(0) from 信箱 where room_id=xx and read_time<消息时间;
  • 已读列表:select uid from 信箱 where room_id=xx and read_time>消息时间 order by create_time;
  • 未读列表:select uid from 信箱 where room_id=xx and read_time<消息时间 order by create_time;

所以这里面有个关键,需要 room_id 和 read_time 的联合索引,才能够加速查询。

消息阅读推送

类似钉钉和微信,发送完每条消息,就能看见他的阅读数在蹭蹭上涨。只要有人阅读了,他就加 1,看起来是实时增加的。这个怎么设计呢?

最暴力的做法。每个人阅读完群消息,都通知给发送方。然后消息阅读数 +1。

image

这样的消息推送频率也是很高的,特别是在万人群聊下,发一条消息,就要收到几千次的 ack 推送。又要几千次的 ack 给小王。

优化一下,将推送合并。可以采用服务端定时任务的方式。定时任务的间隔,就成为了合并的时间窗口。

  • image

小王发送新消息后,后端起个定时任务。每 5,20,40,100,200,指数上升的时间,给他推送一次消息的总阅读数。这样在 5s 内的 ack 都会被合并成一次总的阅读数推送给小王。

这样做的前提,是业务允许小王对消息阅读数一开始很关注,过了 30s 后,就不那么着急更新的时间了。过了几分钟后,小王压根就不关心消息的阅读数了。定时任务也可以停止了。

这个方案可以节省很大的性能。但是后端维护一个定时任务还是有些麻烦,小王真的有那么关心消息的阅读吗?这 5 分钟内,他都会在这个页面等着看最新的阅读数吗?

image

由前端自己查询消息的未读数。假设我发了十几条消息。我真的在乎那些消息有谁阅读了嘛?

只有我停留在那个房间,且一个屏幕内有我的发送过的消息,我才需要实时更新未读数。

image

比如这一屏内,我只有这两条消息需要看到未读数的更新,我只需要前端启个定时任务自己查这两条消息就好了。这样又节省了很大的性能,更多的时候,我可能已经退出了页面,就一条消息的未读数都不查了。

总结

很多时候,在服务端出现了性能瓶颈,都可以换一个角度思考:服务端真的需要做这些事情吗?业务需求真的重要吗?

我们可以将任务交给客户端来做,这样既能够节省服务端的资源,业务也能接收。

换位思考:服务端 <-----> 客户端。需求不一定真的由服务端、或者客户端实现,转变角度、灵活处理最关键!!!


会话列表设计

https://drawsql.app/teams/-328/diagrams/-7

image

会话列表设计:

image

这张会话表,就是用户优化过后的收信箱。来看看怎么靠他实现对应的功能。

会话表记录的是 uid 在某个 room 内的消息详情,比如最新消息时间,自己阅读到的时间。

  • 会话列表:select room_id from contact where uid =我 order by active_time desc。查到我的所有会话,并按照每个会话的最新消息时间倒序排序。为了能快速命中索引,需要 uid 和 active_time 的联合索引
  • 会话消息未读数:select count(0) from msg where room_id=我房间 and create_time> 我阅读。拿着自己会话表的阅读时间去消息表比较,由于有 room_id 和 create_time 的联合索引。查找速度很快。

优化:假设我很多年没上线了。消息的未读数达到了几万条,就算我有索引,也得扫描几万条记录,统计未读数。这个还是比较耗时的,怎么办呢?参考微信最大只展示 99 条消息的未读数。我们的未读数统计可以优化成

select count(0) from
(
  SELECT 1
    FROM msg
    WHERE room_id=我房间 and create_time>我阅读
    LIMIT 100
)

这样未读数最多只会扫描 100 条,避免了极端的场景。

抹茶的会话表设计,目前就采用了这样的方案。通过记录阅读时间线而不是记录每条消息的 ack。完美适配了会话列表的功能,和消息已读未读数的功能。


热点群聊(读写扩散混合)

前面留了那么多个坑,都依然没有解决万人群聊写扩散的问题。但是通过一步步的表结构设计的优化,已经逐渐接近了。

万人群聊最大的问题,就是每条消息,都需要写入用户的收信箱。也就是那个会话表的 active_time 字段。更新了这个字段,最新消息的那个会话才能排序在最前面。如果不更新,这个房间在用户的会话列表就排在很后面了。

这个更新时间可以记,但是能不能单独记,不扩散写到用户的收信箱,而是单独写到热点信箱,用户读取的时候综合读取自己收信箱的会话,合并热点信箱的会话。

image

这样每个小方块都是一个房间,里面的数字代表这房间的最新消息游标。

  • 对于小群聊写扩散到用户的每个收信箱,更新房间的最新消息游标。
  • 对于热点群聊,直接单独记录该房间的最新游标。
  • 用户在查询自己会话列表的时候,通过聚合层。聚合自己收信箱,和自己参与的热点群聊的排序,然后展示给用户

这样就可以节省每次热点群聊写扩散的消耗了,对热点群聊来说是个极致的提升。

https://drawsql.app/teams/-328/diagrams/-7

image

热点群聊相关的信息,直接记录在 room 表里,不需要写入到用户信箱。

为了聚合的时候效率更高。我们还可以把热点群聊直接缓存在 redis 的 zset 里。这样聚合的时候,速度更快。

精准时间聚合

在现实的场景里,跨服务分页一直是一个难题。最佳的方案就是写一张聚合表,将跨库的数据聚合成一张表,然后进行条件分页。然而我们的热点群聊就是为了避免写扩散,所以才需要聚合。这个方案行不通了。

那我们怎么去聚合呢。可以选一张主表用来分页,副表用来聚合。

image

  1. 以收信箱为主表,查询第一页,假设一页 3 条。查到 3,5,7,通过 3 和 7 作为条件,筛选热点群聊里面的房间,只能筛选到 4。2 怎么办???需要兼容第一页的场景,start 不应该限制。只限制 end<7,筛选出 2 和 4。聚合返回 2,3,4,5,7。

image

  1. 第二页只有两条,筛选出 9 和 10,通过 9 和 10 作为条件,筛选热点群聊房间,筛选不到。那 18 怎么办??需要兼容最后一页的场景,end 不应该限制,只限制 start>9.筛选出 18.聚合返回 9,10,18.

这种方案性能非常的高,但是需要业务能够接受本来一页三条数据,有可能一页 5 条,或者一页 4 条的情况。在极端场景这个人加入了很多热点群聊,一页可能会加载出 100 多条数据。

精准条数聚合

如果业务有强烈的诉求,期望一页得到的条数是固定的 3 条,我们也需要提供相应的方案来满足诉求。

image

查询第一页的三条,个人收信箱和热点信箱都是主表。双方各查出一页的数据候选(避免极端情况一页都是热点,或一页都是普通)。通过归并排序的方式,每次都 pk 普通和热点的房间,取出较小的那个放到结果集合。第一轮比较取出 2,第二轮比较取出 3,第三轮比较取出 4,填满即可

很像归并排序14

大文件的难点,在于内存放不下所有的数据。因此只能拆分成一个个局部排序的小文件。然后再对多个小文件,进行多路归并排序。

image

将多个文件的头结点,一起构造一颗小顶堆的树。每次从顶堆取值,就汇总最终的排序。顶堆支持每次弹出一个值就从候选队列再压入一个值。类似的是,我们的抹茶只有两路归并,简单了不少

二路归并

image

  1. 查询第二页。根据上一页的最后一条的游标,确定下一页开始的位置,这就是游标翻页。这个场景用普通翻页无法实现。根据上一页的最后一条消息是 4,往下双方各查一页。最终通过归并排序,返回结果 5,7,9。

这样就是做到精确分页了,他的核心是,id 必须全局单调递增。因为每个会话记录的都是最后一条消息的 id。这个 id 是用来排序的,如果 id 只能做到会话内递增**,那不同会话间压根没法比较。**

总结

我们会发现,所有的方案有有好有坏,没有最佳方案,只有最适合业务的方案。而这种方案一般都是扬长避短的去组合每个方案的优点来共同实现。推拉结合是这样。热点群聊的读写扩散混合也是这样。

为了避免热点群聊的写扩散**,我们设计了会话聚合模块。为了满足会话的排序,我们设计了精确时间聚合,或者精确条数聚合的方案。其中精确条数要求 id 全局单调递增。**

抹茶采用的方案是比较简单的精确时间聚合。由于抹茶是 id 全局单调递增的,想要改成精确条数聚合也特别的简单,大家可自行选择。


消息表支持多类型消息

image

我们能够支持文件,视频,pdf,图片,语音,文本。这么多类型的消息,这个表结构该如何设计呢?

首先来思考下,文件,视频,图片,他们都是啥。真抽象起来,不全部都是一个文件吗,一个存在 oss 里面的 url。有了这个思路就简单了。

我们只需要添加一个上传的接口,这些类型的消息,前端只需要上传后记录 url,提交给我们即可。

参考腾讯 sdk。我们的多类型消息文档

https://drawsql.app/teams/-328/diagrams/-8

image

消息最重要的其实就是两个字段。

  • type​:指定消息是什么类型。
  • extra​:放置不同类型消息的详情,如果是特别重要的消息,还可以通过设置关联表的方式,扩展出去,比如红包类型消息。

这里的content​,reply_msg_id​,gap_count​其实都是文本类型的消息才有的字段,也可以扔 extra 里面去。由于一开始的历史兼容问题,没有删除字段,后续可以移除,让消息表更简洁。

消息表最重要的就是谁,在哪个房间,发送了什么类型的消息。详情都可以在 extra 里面。


  1. 微信扫码登录技术方案实现

    原文:www.yuque.com/snab/mallc...

    前言

    我们的项目就是采用的扫公众号事件码的方式登录。

    image

    流程时序图

    image

    整体流程介绍的比较详细,用户初次注册才需要授权信息。第二次登陆只需要到第 9 步,就登录完成了。

    具体的操作可以看微信的公众号文档,接下来我们就代码走读下过程。

    生成带参数二维码:

    用户授权步骤:

    和微信的交互主要有三个点:

    1. 申请带参二维码
    2. 扫码事件通知
    3. 授权事件通知

    详细解析

    1. 建立 websocket 连接

    image

    进入到这个页面的时候,前端就开始和后端建立了 websocket,这时候就已经可以接受新消息推送啦。不过目前这个连接是一个未登录的用户。需要进行登录认证。

    前端通过new WebSocket("URL")​和后端建立连接。

    image

    后端通过在 netty 中添加处理器,记录并管理连接

        private WebSocketService webSocketService;
    
        // 当web客户端连接后,触发该方法
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            this.webSocketService = getService();
        }
    
        private WebSocketService getService() {
            return SpringUtil.getBean(WebSocketService.class);
        }
    

    将连接放入一个 map 中管理,目前该连接是未登录态。

    image

    2. 前端请求扫码登录

    前端点击登录,会发出一个请求登录二维码的 websocket 信息。后端做了三件事

    public void handleLoginReq(Channel channel) {
        //生成随机不重复的登录码
        Integer code = generateLoginCode(channel);
        //请求微信接口,获取登录码地址
        WxMpQrCodeTicket wxMpQrCodeTicket = wxMpService.getQrcodeService().qrCodeCreateTmpTicket(code, EXPIRE_SECONDS);
        //返回给前端
        sendMsg(channel, WSAdapter.buildLoginResp(wxMpQrCodeTicket));
    }
    
    1. 生成一个不重复的登录码​。并且将登录码和这个 Channel 关联起来。
    2. 然后请求微信的接口,将这个登录码​转成一个带参数的二维码链接
    3. 返回给前端,前端就按这个二维码链接转成二维码图片展示

    image

    这个事件码就是之后用户登录携带的参数,只有它能和这个channel​​连接关联起来,很重要。

    过期淘汰机制

    在“登录时间码”管理这个 Map 中,如果不对存储的对象进行定期清除的话,项目运行很久后就会累计了很多数据,造成 OOM。

    所以我们对这个存储数据结构有以下要求:

    1. Map 数据结构,可以存储 code --> channel 映射
    2. 存在过期策略
    3. 最大容量设定,避免 OOM

    于是我们引入了 Caffeine 这一个本地缓存包:

        // 保存请求登录的code与channel的关系(本地缓存) ———— 等待登录队列
        public static final Cache<Integer, Channel> WAIT_LOGIN_MAP = Caffeine.newBuilder()
                .expireAfterWrite(EXPIRE_TIME)
                .maximumSize(MAX_SIZE)
                .build();
    
    • Caffeine 是一个高性能的 Java 本地缓存,支持各种需求的缓存设置

    要根据业务的需求,选取符合要求的“数据结构”

    3. 用户扫码

    image

    这是一张携带参数的二维码。当用户扫码后关注公众号。公众号会给我们后台回调一个关注事件 OR 扫码事件。

    取决于用户是已关注还是未关注。不管啥事件,都会携带二维码参数 即登录码

    4. 用户注册

    用户扫码的回调,我们能够获得用户的 openid。和登录码。这还不够,对于新用户我们需要他的头像和昵称。

    这个信息需要额外的用户授权。

    image

    于是我们先让用户注册了。再额外给用户发一个推送,请求用户授权。

    用户注册后,会先临时保存一个 openid 和前端登录码的关系映射。

    image

    5. 授权用户信息

    我们推送的授权信息是一个微信授权地址,用户点击后,微信就会回调我们的系统,并且获取一个重定向的地址给用户展示。

    private static final String URL = 
    	"https://open.weixin.qq.com/connect/oauth2/authorize?appid=%s&redirect_uri=%s&response_type=code&scope=snsapi_userinfo&state=STATE#wechat_redirect";
    

    image

    用户看到这样一条信息,点击登录,微信就会给我们发送授权成功的回调。

    image

    这个 userInfo 我们就能拿到邮箱,昵称,openid 等。通过 openid 关联出之前的事件码,然后复用已注册的登录逻辑。

    image

    image

    6. 在线用户管理

    因为用户可能在多点登录。一个 uid 有多个连接。思考两个场景。用户所有连接都下线,才能算用户下线。私聊推送用户的时候,需要推送某个 uid 的所有连接。都需要额外有个 uid 作为 key 的关系映射。

    假设用户在手机端也登录了。就是这样的效果:

    image

    代码实现:

    // 所有在线用户和对应的socket
        /**
         * 具体场景考虑方面,这段代码可能用于一个WebSocket服务中,用于管理在线用户的连接状态和相关信息。
         * 通过将用户ID映射到其对应的WebSocket通道列表,可以方便地获取某个用户当前连接的所有通道,或者添加、删除或更新某个用户的通道信息。
         * 同时,使用CopyOnWriteArrayList可以确保在多线程环境下对通道列表的操作是线程安全的,从而保证系统的稳定性和可靠性。
         *
         * 此处使用List存储Channel的原因是为了实现 “多点登录”。用户有可能同时使用电脑和手机登录平台!
         */
        private static final ConcurrentHashMap<Long, CopyOnWriteArrayList<Channel>> ONLINE_UID_MAP = new ConcurrentHashMap<>();
    

    7. 后端主动推送

    image

    后端推送就很简单了,目前都是针对所有连接推送。所以我们应该选取在线连接管理里的所有连接。

    image

  2. epoll 是解决 C10K 问题的利器,通过两个方面解决了 select/poll 的问题。

  3. 分布式雪花算法基础知识

    原文:https://www.cnblogs.com/mikechenshare/p/16787023.html

    其他参考:

    雪花算法简介

    雪花算法,英文名为 snowflake,翻译过来就是是雪花,所以叫雪花算法。

    在大自然雪花形成过程中,会形成不同的结构分支,所以说不存在两片完全一样的雪花,表示生成的 id 如雪花般独一无二。

    雪花算法详解(原理优缺点及代码实现)-mikechen 的互联网架构

    雪花算法,它最早是 twitter 内部使用的分布式环境下的唯一分布式 ID 生成算法。

    雪花算法的优缺点

    优点

    1.系统环境 ID 不重复

    能满足高并发分布式系统环境ID不重复,比如大家熟知的分布式场景下的数据库表的ID生成。
    

    2.生成效率极高

    在高并发,以及分布式环境下,除了生成不重复 id,每秒可生成百万个不重复 id,生成效率极高。
    

    3.保证基本有序递增

    基于时间戳,可以保证基本有序递增,很多业务场景都有这个需求。
    

    4.不依赖第三方库

    不依赖第三方的库,或者中间件,算法简单,在内存中进行。
    

    缺点

    依赖服务器时间,服务器时钟回拨时可能会生成重复 id。

    雪花算法原理

    详细的雪花算法构造如下图所示:

    雪花算法详解(原理优缺点及代码实现)-mikechen 的互联网架构

    雪花算法的原理:就是生成一个的 64 位的 long 类型的唯一 id,主要分为如下 4 个部分组成:

    1)1 位保留 (基本不用)

    1 位标识:由于 long 基本类型在 Java 中是带符号的,最高位是符号位,正数是 0,负数是 1,所以 id 一般是正数,最高位是 0,所以这第一位都是 0。

    2)41 位时间戳 ​

    接下来 41 位存储毫秒级时间戳,41 位可以表示 2^41-1^ 个毫秒的值,转化成单位年则是: (241−1)/(1000∗60∗60∗24∗365) =69 年 。

    41 位时间戳 :也就是说这个时间戳可以使用 69 年不重复,大概可以使用 69 年。

    注意:41 位时间截不是存储当前时间的时间截,而是存储时间截的差值“​当前时间截 – 开始时间截​”得到的值。

    这里的的开始时间截,一般是我们的 id 生成器开始使用的时间,由我们程序来指定的,一般设置好后就不要去改变了,切记!!!

    因为,雪花算法有如下缺点:依赖服务器时间,​服务器时钟回拨时可能会生成重复 id​。

    3)10 位机器

    10 位的数据机器位,可以部署在 1024 个节点,包括 5 位 datacenterId 和 5 位 workerId,最多可以部署 2^10=1024 台机器。

    这里的 5 位可以表示的最大正整数是 2^5−1=31,即可以用 0、1、2、3、….31 这 32 个数字,来表示不同的 datecenterId,或 workerId。

    4) 12bit 序列号

    用来记录同毫秒内产生的不同 id,12 位的计数顺序号支持每个节点每毫秒(同一机器,同一时间截)产生 4096 个 ID 序号。

    理论上雪花算法方案的 QPS 约为 409.6w/s,这种分配方式可以保证在任何一个 IDC 的任何一台机器在任意毫秒内生成的 ID 都是不同的。

    代码实现

  4. 接口幂等设计

    原文:

    幂等性的定义

    在 HTTP/1.1 中,对幂等性进行了定义。

    • 它描述了==一次和多次请求某一个资源对于资源本身应该具有同样的结果(网络超时等问题除外)==,即第一次请求的时候对资源产生了副作用,但是以后的多次请求都不会再对资源产生副作用。

    这里的副作用是不会对结果产生破坏或者产生不可预料的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。

    为什么需要幂等

    之前还以为“幂等”是数据上的“一致性”概念,原来更侧重于网络。。。

    在接口调用时一般情况下都能正常返回信息不会重复提交,不过在遇见以下情况时可以就会出现问题,如:

    • 前端重复提交表单: 在填写一些表格时候,用户填写完成提交,很多时候会因网络波动没有及时对用户做出提交成功响应,致使用户认为没有成功提交,然后一直点提交按钮,这时就会发生重复提交表单请求。
    • 用户恶意进行刷单: 例如在实现用户投票这种功能时,如果用户针对一个用户进行重复提交投票,这样会导致接口接收到用户重复提交的投票信息,这样会使投票结果与事实严重不符。
    • 接口超时重复提交: 很多时候 HTTP 客户端工具都默认开启超时重试的机制,尤其是第三方调用接口时候,为了防止网络波动超时等造成的请求失败,都会添加重试机制,导致一个请求提交多次。
    • 消息进行重复消费: 当使用 MQ 消息中间件时候,如果发生消息中间件出现错误未及时提交消费信息,导致发生重复消费。

    使用幂等性最大的优势在于使接口保证任何幂等性操作,免去因重试等造成系统产生的未知的问题。

    以上情况都是属于“数据重复提交”

    判断接口是否需要实现幂等

    幂等性是为了简化客户端逻辑处理,能放置重复提交等操作,但却增加了服务端的逻辑复杂性和成本,其主要是:

    • 把并行执行的功能改为串行执行,降低了执行效率。
    • 增加了额外控制幂等的业务逻辑,复杂化了业务功能;

    所以在使用时候需要考虑是否引入幂等性的必要性,根据实际业务场景具体分析,除了业务上的特殊要求外,一般情况下不需要引入的接口幂等性。

    实现接口幂等

    幂等意味着一条请求的唯一性。不管是你哪个方案去设计幂等,都需要一个全局唯一的 ID,去标记这个请求是独一无二的。

    • 如果你是利用唯一索引控制幂等,那唯一索引是唯一的
    • 如果你是利用数据库主键控制幂等,那主键是唯一的
    • 如果你是悲观锁的方式,底层标记还是全局唯一的 ID

    2023-05-31

    美团常用的方式:

    1. 分布式锁1
    2. token 令牌1
    3. 状态机幂等1

    全局的唯一性 ID

    全局唯一性 ID,我们怎么去生成呢?你可以回想下,数据库主键 Id 怎么生成的呢?

    是的,我们可以使用 UUID​,但是 UUID 的缺点比较明显,它字符串占用的空间比较大,生成的 ID 过于随机,可读性差,而且没有递增。

    我们还可以使用 雪花 ID1 生成唯一性 ID。

    雪花算法是一种生成分布式全局唯一 ID 的算法,生成的 ID 称为 Snowflake IDs​。这种算法由 Twitter 创建,并用于推文的 ID。

    一个 Snowflake ID 有 64 位。

    • 第 1 位:Java 中 long 的最高位是符号位代表正负,正数是 0,负数是 1,一般生成 ID 都为正数,所以默认为 0。
    • 接下来前 41 位是时间戳,表示了自选定的时期以来的毫秒数。
    • 接下来的 10 位代表计算机 ID,防止冲突。
    • 其余 12 位代表每台机器上生成 ID 的序列号,这允许在同一毫秒内创建多个 Snowflake ID。

    图片

    当然,全局唯一性的 ID,还可以使用百度的 Uidgenerator​,或者美团的 Leaf​。

    幂等设计的基本流程

    幂等处理的过程,说到底其实就是过滤一下已经收到的请求,当然,请求一定要有一个 全局唯一的ID标记​哈。然后,怎么判断请求是否之前收到过呢?把请求储存起来,收到请求时,先查下存储记录,记录存在就返回上次的结果,不存在就处理请求。

    一般的幂等处理就是这样啦,如下:

    图片

    实现幂等的 8 种方案

    实现幂等需要唯一 ID,那么判断唯一 ID 需要一个存放 ID 数据库。数据库的选择可以是 MySQL 等关系型数据库,或者 Redis 等非关系型数据库

    select+insert+ 主键/唯一索引冲突

    日常开发中,为了实现交易接口幂等,我是这样实现的:

    交易请求过来,我会先根据请求的唯一流水号bizSeq​字段,先 select​一下数据库的流水表

    • 如果数据已经存在,就拦截是重复请求,直接返回成功;
    • 如果数据不存在,就执行 insert​插入,如果 insert​成功,则直接返回成功,如果 insert​产生主键冲突异常,则捕获异常,接着直接返回成功。

    流程图如下

    图片

    伪代码如下:

    /**
     * 幂等处理
     */
    Rsp idempotent(Request req){
      Object requestRecord =selectByBizSeq(bizSeq);
      
      if(requestRecord !=null){
        //拦截是重复请求
         log.info("重复请求,直接返回成功,流水号:{}",bizSeq);
         return rsp;
      }
      
      try{
        insert(req);
      }catch(DuplicateKeyException e){
        //拦截是重复请求,直接返回成功
        log.info("主键冲突,是重复请求,直接返回成功,流水号:{}",bizSeq);
        return rsp;
      }
      
      //正常处理请求
      dealRequest(req);
      
      return rsp;
    }
    

    为什么前面已经 select​查询了,还需要 try...catch...​捕获重复异常呢?

    是因为高并发场景下,两个请求去 select​的时候,可能都没查到,然后都走到 insert 的地方啦。

    当然,用唯一索引代替数据库主键也是可以的哈,都是全局唯一的 ID 即可。

    直接 insert + 主键/唯一索引冲突

    在 5.1 方案中,都会先查一下流水表的交易请求,判断是否存在,然后不存在再插入请求记录。如果重复请求的概率比较低的话,我们可以直接插入请求,利用主键/唯一索引冲突,去判断是重复请求

    流程图如下:

    图片

    伪代码如下:

    /**
     * 幂等处理
     */
    Rsp idempotent(Request req){
      
      try{
        insert(req);
      }catch(DuplicateKeyException e){
         //拦截是重复请求,直接返回成功
        log.info("主键冲突,是重复请求,直接返回成功,流水号:{}",bizSeq);
        return rsp;
      }
      
      //正常处理请求
      dealRequest(req);
      return rsp;
    }
    

    温馨提示 :

    大家别搞混哈,防重和幂等设计其实是有区别的。防重主要为了避免产生重复数据,把重复请求拦截下来即可。而幂等设计除了拦截已经处理的请求,还要求每次相同的请求都返回一样的效果。不过呢,很多时候,它们的处理流程可以是类似的。

    状态机幂等

    很多业务表,都是有状态的,比如转账流水表,就会有 0-待处理,1-处理中、2-成功、3-失败状态​。转账流水更新的时候,都会涉及流水状态更新,即涉及状态机 (即状态变更图)。我们可以利用状态机实现幂等,一起来看下它是怎么实现的。

    比如转账成功后,把处理中的转账流水更新为成功状态,SQL 这么写:

    update transfr_flow set status=2 where biz_seq=‘666’ and status=1;
    

    简要流程图如下:

    图片

    伪代码实现如下:

    Rsp idempotentTransfer(Request req){
       String bizSeq = req.getBizSeq();
       int rows= "update transfr_flow set status=2 where biz_seq=#{bizSeq} and status=1;"
       if(rows==1){
          log.info(“更新成功,可以处理该请求”);
          //其他业务逻辑处理
          return rsp;
       }else if(rows==0){
          log.info(“更新不成功,不处理该请求”);
          //不处理,直接返回
          return rsp;
       }
       
       log.warn("数据异常")
       return rsp:
    }
    

    状态机是怎么实现幂等的呢?

    • 第 1 次请求来时,bizSeq 流水号是 666​,该流水的状态是处理中,值是 1​,要更新为 2-成功的状态​,所以该 update 语句可以正常更新数据,sql 执行结果的影响行数是 1,流水状态最后变成了 2。
    • 第 2 请求也过来了,如果它的流水号还是 666​,因为该流水状态已经 2-成功的状态​了,所以更新结果是 0,不会再处理业务逻辑,接口直接返回。

    抽取防重表

    5.1 和 5.2 的方案,都是建立在业务流水表上 bizSeq​的唯一性上。

    很多时候,我们业务表唯一流水号希望后端系统生成,又或者我们希望防重功能与业务表分隔开来,这时候我们可以单独搞个防重表。当然防重表也是利用主键/索引的唯一性,如果插入防重表冲突即直接返回成功,如果插入成功,即去处理请求。

    token 令牌

    token 令牌方案一般包括两个请求阶段:

    1. 客户端请求申请获取 token,服务端生成 token 返回
    2. 客户端带着 token 请求,服务端校验 token

    流程图如下:

    图片

    1. 客户端发起请求,申请获取 token。
    2. 服务端生成全局唯一的 token,保存到 redis 中(一般会设置一个过期时间),然后返回给客户端。
    3. 客户端带着 token,发起请求。
    4. 服务端去 redis 确认 token 是否存在,一般用 redis.del(token)​的方式,如果存在会删除成功,即处理业务逻辑,如果删除失败不处理业务逻辑,直接返回结果。

    这就是 Austin 消息去重处理1 的方法!!!

    悲观锁 (如 select for update)

    什么是悲观锁

    通俗点讲就是很悲观,每次去操作数据时,都觉得别人中途会修改,所以每次在拿数据的时候都会上锁。官方点讲就是,共享资源每次只给一个线程使用,其它线程阻塞,用完后再把资源转让给其它线程。

    悲观锁如何控制幂等的呢?就是加锁呀,一般配合事务来实现。

    举个更新订单的业务场景:

    假设先查出订单,如果查到的是处理中状态,就处理完业务,再然后更新订单状态为完成。如果查到订单,并且是不是处理中的状态,则直接返回

    整体的伪代码如下:

    begin;  # 1.开始事务
    select * from order where order_id='666' # 查询订单,判断状态
    if(status !=处理中){
       //非处理中状态,直接返回;
       return ;
    }
    ## 处理业务逻辑
    update order set status='完成' where order_id='666' # 更新完成
    commit; # 5.提交事务
    

    这种场景是非原子操作的,在高并发环境下,可能会造成一个业务被执行两次的问题:

    当一个请求 A 在执行中时,而另一个请求 B 也开始状态判断的操作。因为请求 A 还未来得及更改状态,所以请求 B 也能执行成功,这就导致一个业务被执行了两次。

    可以使用数据库悲观锁(select ...for update​)解决这个问题.

    begin;  # 1.开始事务
    select * from order where order_id='666' for update # 查询订单,判断状态,锁住这条记录
    if(status !=处理中){
       //非处理中状态,直接返回;
       return ;
    }
    ## 处理业务逻辑
    update order set status='完成' where order_id='666' # 更新完成
    commit; # 5.提交事务
    
    • 这里面 order_id 需要是索引主键哈,要锁住这条记录就好,如果不是索引或者主键,会锁表的!
    • 悲观锁在同一事务操作过程中,锁住了一行数据。别的请求过来只能等待,如果当前事务耗时比较长,就很影响接口性能。所以一般不建议用悲观锁做这个事情。

    乐观锁

    悲观锁有性能问题,可以试下乐观锁

    什么是乐观锁

    乐观锁在操作数据时,则非常乐观,认为别人不会同时在修改数据,因此乐观锁不会上锁。只是在执行更新的时候判断一下,在此期间别人是否修改了数据。

    怎样实现乐观锁呢?

    就是给表的加多一列 version​版本号,每次更新记录 version​都升级一下(version=version+1​)。具体流程就是先查出当前的版本号 version​,然后去更新修改数据时,确认下是不是刚刚查出的版本号,如果是才执行更新

    比如,我们更新前,先查下数据,查出的版本号是 version =1

    select order_id,version from order where order_id='666';
    

    然后使用 version =1​和 订单Id​一起作为条件,再去更新

    update order set version = version +1,status='P' where  order_id='666' and version =1
    

    最后更新成功,才可以处理业务逻辑,如果更新失败,默认为重复请求,直接返回。

    流程图如下:

    图片

    为什么版本号建议自增的呢?

    因为乐观锁存在 ABA 的问题,如果 version 版本一直是自增的就不会出现 ABA 的情况啦。

    分布式锁

    分布式锁实现幂等性的逻辑就是,请求过来时,先去尝试获得分布式锁,如果获得成功,就执行业务逻辑,反之获取失败的话,就舍弃请求直接返回成功。执行流程如下图所示:

    图片

    • 分布式锁可以使用 Redis,也可以使用 ZooKeeper,不过还是 Redis 相对好点,因为较轻量级。
    • Redis 分布式锁,可以使用命令 SET EX PX NX + 唯一流水号​实现,分布式锁的 key​必须为业务的唯一标识哈
    • Redis 执行设置 key 的动作时,要设置过期时间哈,这个过期时间不能太短,太短拦截不了重复请求,也不能设置太长,会占存储空间。


    HTTP 的幂等

    我们的接口,一般都是基于 http 的,所以我们再来聊聊 Http 的幂等吧。HTTP 请求方法主要有以下这几种,我们看下各个接口是否都是幂等的。

    • GET 方法
    • HEAD 方法
    • OPTIONS 方法
    • DELETE 方法
    • POST 方法
    • PUT 方法

    6.1 GET 方法

    HTTP 的 GET 方法用于获取资源,可以类比于数据库的 select​查询,不应该有副作用,所以是幂等的。它不会改变资源的状态,不论你调用一次还是调用多次,效果一样的,都没有副作用。

    如果你的 GET 方法是获取最近最新的新闻,不同时间点调用,返回的资源内容虽然不一样,但是最终对资源本质是没有影响的哈,所以还是幂等的。

    6.2 HEAD 方法

    HTTP HEAD 和 GET 有点像,主要区别是 HEAD​不含有呈现数据,而仅仅是 HTTP 的头信息,所以它也是幂等的。如果想判断某个资源是否存在,很多人会使用 GET​,实际上用 HEAD​则更加恰当。即 HEAD​方法通常用来做探活使用。

    6.3 OPTIONS 方法

    HTTP OPTIONS 主要用于获取当前 URL 所支持的方法,也是有点像查询,因此也是幂等的。

    6.4 DELETE 方法

    HTTP DELETE 方法用于删除资源,它是的幂等的。比如我们要删除 id=666​​的帖子,一次执行和多次执行,影响的效果是一样的呢。

    6.5 POST 方法

    HTTP POST 方法用于创建资源,可以类比于 提交信息​,显然一次和多次提交是有副作用,执行效果是不一样的,不满足幂等性

    比如:POST http://www.tianluo.com/articles 的语义是在 http://www.tianluo.com/articles 下创建一篇帖子,HTTP 响应中应包含帖子的创建状态以及帖子的 URI。两次相同的 POST 请求会在服务器端创建两份资源,它们具有不同的 URI;所以,POST 方法不具备幂等性

    6.6 PUT 方法

    HTTP PUT 方法用于创建或更新操作,所对应的 URI 是要创建或更新的资源本身,有副作用,它应该满足幂等性

    比如:PUT http://www.tianluo.com/articles/666 的语义是创建或更新 ID 为 666 的帖子。对同一 URI 进行多次 PUT 的副作用和一次 PUT 是相同的;因此,PUT 方法具有幂等性。

  5. 分布式锁

    分布式锁实现幂等性的逻辑就是,请求过来时,先去尝试获得分布式锁,如果获得成功,就执行业务逻辑,反之获取失败的话,就舍弃请求直接返回成功。执行流程如下图所示:

    图片

    • 分布式锁可以使用 Redis,也可以使用 ZooKeeper,不过还是 Redis 相对好点,因为较轻量级。
    • Redis 分布式锁,可以使用命令 SET EX PX NX + 唯一流水号​实现,分布式锁的 key​必须为业务的唯一标识哈
    • Redis 执行设置 key 的动作时,要设置过期时间哈,这个过期时间不能太短,太短拦截不了重复请求,也不能设置太长,会占存储空间。


  6. token 令牌

    token 令牌方案一般包括两个请求阶段:

    1. 客户端请求申请获取 token,服务端生成 token 返回
    2. 客户端带着 token 请求,服务端校验 token

    流程图如下:

    图片

    1. 客户端发起请求,申请获取 token。
    2. 服务端生成全局唯一的 token,保存到 redis 中(一般会设置一个过期时间),然后返回给客户端。
    3. 客户端带着 token,发起请求。
    4. 服务端去 redis 确认 token 是否存在,一般用 redis.del(token)​的方式,如果存在会删除成功,即处理业务逻辑,如果删除失败不处理业务逻辑,直接返回结果。

    这就是 Austin 消息去重处理1 的方法!!!

  7. 消息去重处理

    文章详解:Austin 消息去重详解(面试)

    前言

    之前的去重服务是基于 Redis 来实现的。每条信息发送的时候,都将该信息的 ID 信息备份一份到 Redis 中。当下次发送的时候,再去 Redis 上查询是否存在,以达到去重的效果

    现在对去重模块进行重构。采用 中介者模式(Mediator)1 + 策略模式(Strategy)1,方便引入多种去重逻辑

    使用上述两种设计模式,将去重逻辑与上层调用耦合分离,达到很好的扩展作用。

    更新

    1. 频次去重采用普通的计数去重方法,限制的是每天发送的条数。
    2. 内容去重采用的是新开发的基于 redis​中 zset​的滑动窗口去重,可以做到严格控制单位时间内的频次
    3. redis​使用 lua​脚本来保证原子性和减少网络 io​的损耗
    4. redis​的 key​增加前缀做到数据隔离(后期可能有动态更换去重方法的需求)
    5. 把具体限流去重方法从 DeduplicationService​抽取出来,DeduplicationService​只需设置构造器注入时注入的 AbstractLimitService​(具体限流去重服务)类型即可动态更换去重的方法 6、使用雪花算法生成 zset​的唯一 value​,score​使用的是当前的时间戳

    Redis Lua 脚本

    --KEYS[1]: 限流key
    --ARGV[1]: 限流窗口,毫秒
    --ARGV[2]: 当前时间戳(作为score)
    --ARGV[3]: 阈值
    --ARGV[4]: key对应的唯一value(使用雪花算法生成,确保唯一)
    
    -- 1. 移除开始窗口前的数据
    redis.call('zremrangeByScore', KEYS[1], 0, ARGV[2] - ARGV[1])
    
    -- 2. 统计当前元素数量
    local res = redis.call('zcard', KEYS[1])
    
    -- 3. 是否超过阈值
    if (res == nil) or (res < tonumber(ARGV[3])) then
    	redis.call('zadd', KEYS[1], ARGV[2], ARGV[4])
    	redis.call('expire', KEYS[1], ARGV[1] / 1000)
    	return 0
    else
    	return 1
    end
    

    ZADD key score value

    ZREMRANGEBYSCORE key min max

    • 移除 score 在 [min, max] 范围内的所有 key

    设计大纲

    image

    去重主要由以下组件组成:

    • 去重 Service:负责对外提供去重的统一接口

      • 类似于:装饰模式(Wrapper、decorator)1
    • 去重服务 Param:去重服务需要的统一封装参数

    • 去重服务 Holder:中介者模式(Mediator)1,保存着各种去重服务的中介类,用于路由各种去重逻辑

    • 去重 limit:实现去重的核心组件。需要调用 redis 上保持的信息,实现最终的去重效果

    大致流程

    Task 调用 DeduplicationRuleService 中的 deduplicaiton 方法。该方法是整个 去重模块 的总入口:

    1. 通过传递的 taskInfo 参数,构建出 deduplicationParam 去重服务封装类

    2. 使用 Param,通过 DeduplicationHolder 路由器,路由到指定的去重服务中,由该服务方法执行去重

    3. 服务方法使用一个 AbstractDeduplicationService 抽象类管理,Abstract 会直接调用 LimitService 的方法执行去重逻辑

      1. LimitService 也是一个抽象类,实现了不同的去重方法
      2. LimitService 也是直接访问 Redis 的地方。通过 Redis 获取去重相关的唯一 key

    所以最终的调用路线是:

    DeduplicationRuleService ---> DeduplicationHolder(中介者) ----> AbstractDeduplicationService ---> AbstractLimitService

    其他组件都是在此基础上进行增加的,包括那些各式各样的 builder。

    • builder 采用简单工厂模式,为的就是易于扩展,同时不需要修改上层调用代码

    使用子类而不是 if-else

    在去重模块中,遇到不同的去重逻辑,第一时间肯定会想到使用 if-else 分支处理

    但是使用 if-else 处理的话,当分支处理的内容越来越多的时候,if-else 的代码量就会越来越长,导致代码变得繁杂,不容易开发与拓展

    这时候我们就需要将原本的 if-else 改为使用父子类或者 责任链模式1,将原本的分支抽离出来。

    这样就能够让代码变得简洁、易于拓展与 debug

    与之前对比

    看了最新的代码,发现最初的去重逻辑放在了 SimpleLimitService 中了:

    • 作为其中一种去重策略存在
    package com.quan.austinhandler.deduplication.limit;
    
    
    /**
     * Description:
     * date: 2022/11/13 下午 8:10
     *
     * @author Four
     */
    @Service("SimpleLimitService")
    public class SimpleLimitService extends AbstractLimitService {
    
        private static final String LIMIT_TAG = "SP_";
    
        @Autowired
        private RedisUtils redisUtils;
    
        @Override
        public Set<String> limitFilter(AbstractDeduplicationService service, TaskInfo taskInfo, DeduplicationParam param) {
            Set<String> filterReceiver = new HashSet<>(taskInfo.getReceiver().size());
    
            // 存储redis数据
            Map<String, String> readyPutRedisReceiver = new HashMap<>(taskInfo.getReceiver().size());
    
            // 获取redis中现存的数据
            List<String> keys = deduplicationAllKey(service, taskInfo).stream()
                    .map(key -> LIMIT_TAG + key)
                    .collect(Collectors.toList());
            Map<String, String > inRedisValues = redisUtils.mGet(keys);
    
    
            for (String receiver : taskInfo.getReceiver()) {
                String key = LIMIT_TAG + deduplicationSingleKey(service, taskInfo, receiver);
                String inRedisValue = inRedisValues.get(key);
    
                if (inRedisValue != null && Integer.parseInt(inRedisValue) >= param.getCountNum()) {
                    filterReceiver.add(receiver);
                } else {
                    readyPutRedisReceiver.put(receiver,  key);     // 相当于更新对应的receiver
                }
            }
    
            // 不符合条件的用户:需要更新Redis(无记录添加,有记录则累加次数)
            putInRedis(readyPutRedisReceiver, inRedisValues, param.getDeduplicationTime());
    
            return filterReceiver;
        }
    
        // 存入redis做记录,用作去重标记
        private void putInRedis(Map<String, String> readyPutRedisReceiver, Map<String, String> inRedisValues, Long deduplicationTime) {
            Map<String, String> updateData = new HashMap<>(readyPutRedisReceiver.size());
            for (Map.Entry<String, String> entry : readyPutRedisReceiver.entrySet()) {
                String key = entry.getKey();
                if (inRedisValues.containsKey(key)) {
                    updateData.put(key, String.valueOf(Integer.parseInt(inRedisValues.get(key)) + 1));
                } else {
                    updateData.put(key, String.valueOf(AustinConstant.TRUE));
                }
            }
    
            if (CollUtil.isNotEmpty(updateData)) {
                redisUtils.pipelineSetEx(updateData, deduplicationTime);
            }
        }
    }
    

  8. 中介者模式(Mediator)

    实战使用

    Austin 中介者模式使用1

    • 通过一个 HandlerHolder,管理一个对象与一组对象直接的交互


    定义

    原文:中介者设计模式 (refactoringguru.cn)

    中介者设计模式

    中介者模式建议你停止组件之间的直接交流并使其相互独立**。** 这些组件必须调用特殊的中介者对象**,** 通过中介者对象重定向调用行为**,** 以间接的方式进行合作**。** 最终**,** 组件仅依赖于一个中介者类**,** 无需与多个其他组件相耦合**。**

    结构

    image

    中介者模式适合应用场景

    当一些对象和其他对象紧密耦合以致难以对其进行修改时**,** 可使用中介者模式**。**
    

    该模式让你将对象间的所有关系抽取成为一个单独的类**,** 以使对于特定组件的修改工作独立于其他组件**。**

    当组件因过于依赖其他组件而无法在不同应用中复用时**,** 可使用中介者模式**。**

    应用中介者模式后**,** 每个组件不再知晓其他组件的情况**。** 尽管这些组件无法直接交流**,** 但它们仍可通过中介者对象进行间接交流**。** 如果你希望在不同应用中复用一个组件**,** 则需要为其提供一个新的中介者类**。**

    如果为了能在不同情景下复用一些基本行为**,** 导致你需要被迫创建大量组件子类时**,** 可使用中介者模式**。**

    由于所有组件间关系都被包含在中介者中**,** 因此你无需修改组件就能方便地新建中介者类以定义新的组件合作方式**。**

    实现方式

    1. 找到一组当前紧密耦合**,** 且提供其独立性能带来更大好处的类**(例如更易于维护或更方便复用**)
    2. 声明中介者接口并描述中介者和各种组件之间所需的交流接口**。** 在绝大多数情况下**,** 一个接收组件通知的方法就足够了**。**
      如果你希望在不同情景下复用组件类**,** 那么该接口将非常重要**。** 只要组件使用通用接口与其中介者合作**,** 你就能将该组件与不同实现中的中介者进行连接**。**
    3. 实现具体中介者类**。** 该类可从自行保存其下所有组件的引用中受益**。**
    4. 你可以更进一步**,** 让中介者负责组件对象的创建和销毁**。** 此后**,** 中介者可能会与工厂外观类似**。**
    5. 组件必须保存对于中介者对象的引用**。** 该连接通常在组件的构造函数中建立**,** 该函数会将中介者对象作为参数传递**。**
    6. 修改组件代码**,** 使其可调用中介者的通知方法**,** 而非其他组件的方法**。** 然后将调用其他组件的代码抽取到中介者类中**,** 并在中介者接收到该组件通知时执行这些代码**。**

    中介者模式优缺点

    优点:

    • 单职一责​**。** 你可以将多个组件间的交流抽取到同一位置**,** 使其更易于理解和维护**。**
    • 开原闭则​**。** 你无需修改实际组件就能增加新的中介者**。**
    • 你可以减轻应用中多个组件间的耦合情况**。**
    • 你可以更方便地复用各个组件**。**

    缺点:

    • 一段时间后**,** 中介者可能会演化成为上帝对象

      • 负责过多的职责,导致这个类过于臃肿

    与其他模式的关联

    • 责任链模式​**、** 和用于处理请求发送者和接收者之间的不同连接方式

      • 责链按照顺序将请求动态传递给一系列的潜在接收者**,** 直至其中一名接收者对请求进行处理**。**
      • 在发送者和请求者之间建立单向连接**。**
      • 中者清除了发送者和请求者之间的直接连接**,** 强制它们通过一个中介对象进行间接沟通**。**
      • 观者允许接收者动态地订阅或取消接收请求**。**
    • 外观模式中介者的职责类似**:** 它们都尝试在大量紧密耦合的类中组织起合作**。**

      • 为子系统中的所有对象定义了一个简单接口**,** 但是它不提供任何新功能**。** 子系统本身不会意识到外观的存在**。** 子系统中的对象可以直接进行交流**。**
      • 中者将系统中组件的沟通行为中心化**。** 各组件只知道中介者对象**,** 无法直接相互交流**。**
    • 中介者观察者之间的区别往往很难记住**。** 在大部分情况下**,** 你可以使用其中一种模式**,** 而有时可以同时使用**。** 让我们来看看如何做到这一点**。**

      • 中者的主要目标是消除一系列系统组件之间的相互依赖**。** 这些组件将依赖于同一个中介者对象**。** 观者的目标是在对象之间建立动态的单向连接**,** 使得部分对象可作为其他对象的附属发挥作用**。**
        有一种流行的中介者模式实现方式依赖于观者​**。** 中介者对象担当发布者的角色**,** 其他组件则作为订阅者**,** 可以订阅中介者的事件或取消订阅**。** 当中者以这种方式实现时**,** 它可能看上去与观者非常相似**。**
        当你感到疑惑时**,** 记住可以采用其他方式来实现中介者**。** 例如**,** 你可永久性地将所有组件链接到同一个中介者对象**。** 这种实现方式和观者并不相同**,** 但这仍是一种中介者模式**。**
        假设有一个程序**,** 其所有的组件都变成了发布者**,** 它们之间可以相互建立动态连接**。** 这样程序中就没有中心化的中介者对象**,** 而只有一些分布式的观察者**。**


    实例代码

    以公共聊天室为例,最小单元示例步骤:

    • 1、创建中介类。
      public class CharRoom {
          public static void showMessage(User user, String message) {
              Log.e("---", new Date().toString()
                      + " [" + user.getName() + "] : " + message);
          }
      }
            
          
    
    • 2、创建 user 类。

      public class User {
          private String name;
      
          public User(String name) {
              this.name = name;
          }
      
          public String getName() {
              return name;
          }
      
          public void setName(String name) {
              this.name = name;
          }
      
          public void sendMessage(String message) {
          	  // 使用中介类
              CharRoom.showMessage(this, message);
          }
      }
      
      
      
    • 3、使用 User 对象来显示他们之间的通信。

          User jingbin = new User("jingbin");
          jingbin.sendMessage("Hi~ youlookwhat!");
          //---: Sun Feb 02 08:11:47 GMT+00:00 2020 [jingbin] : Hi~ youlookwhat!
      
          User jingbin = new User("youlookwhat");
          jingbin.sendMessage("Hi~ jingbin!");
          //---: Sun Feb 02 08:11:49 GMT+00:00 2020 [youlookwhat] : Hi~ jingbin!
      
  9. 代码中的实现逻辑

    2022-10-4_1665025350715

    • 对于 “一对多” 的处理方式,均采用 Map 实现路由器的操作
    • 方便对代码进行统一管理!

  10. 策略模式(Strategy)

    定义

    如果一个类实现某种功能可以用不同的方法,那么策略模式就是对这些 ”不同的方法“ 进行封装的方式,它可以使这些算法按照需求进行替换,使得这个类实现功能的时候可以选择不同的方法。

    策略模式是针对程序运行时的算法的。

    真实世界类比

    各种出行策略

    各种前往机场的出行策略

    假如你需要前往机场**。** 你可以选择乘坐公共汽车**、** 预约出租车或骑自行车**。** 这些就是你的出行策略**。** 你可以根据预算或时间等因素来选择其中一种策略**。**

    策略模式适合应用场景

    当你想使用对象中各种不同的算法变体**,** 并希望能在运行时切换算法时**,** 可使用策略模式**。**
    
    • 策略模式让你能够将对象关联至可以不同方式执行特定子任务的不同子对象**,** 从而以间接方式在运行时更改对象行为**。**

    当你有许多仅在执行某些行为时略有不同的相似类时**,** 可使用策略模式**。**

    • 策略模式让你能将不同行为抽取到一个独立类层次结构中**,** 并将原始类组合成同一个**,** 从而减少重复代码**。**

    如果算法在上下文的逻辑中不是特别重要**,** 使用该模式能将类的业务逻辑与其算法实现细节隔离开来**。**

    • 策略模式让你能将各种算法的代码**、** 内部数据和依赖关系与其他代码隔离开来**。** 不同客户端可通过一个简单接口执行算法**,** 并能在运行时进行切换**。**

    结构

    image

    实例代码

    原本的实现

    假设有一个 Role 接口,里面定义了很多方法。我们需要创建 RoleA、RoleB 去实现这个接口。

    这样的话,A 和 B 都必须将 Role 接口中的所有方法都实现了,才能够正常运行。A 和 B 中的方法有可能出现重复情况,这时候就有冗余代码:

        public RoleB(String name) {
            this.name = name;
        }
    
        @Override
        protected void display() {
            Log.e("", "样子2");
        }
    
        @Override
        protected void run() {
            Log.e("", "金蝉脱壳");//拷贝,显得冗余
        }
    
        @Override
        protected void attack() {
            Log.e("", "降龙十八掌");//拷贝,显得冗余
        }
    
        @Override
        protected void defend() {
            Log.e("", "铁布衫");
        }
    
    • A 与 B 出现重复的地方

    将方法给抽离出来

    我们将里面的方法抽象为一个接口,这样我们就可以根据算法的不同来创建不同的实现类的。而且 RoleA、RoleB 的实现,只需要导入两者需要的实现类,而不需要在这两者中重复实现

    public abstract class Role {
    
        protected String name;
    
        private IDisplayBehavior iDisplayBehavior;
        private IDefendBehavior iDefendBehavior;
        private IRunBehavior iRunBehavior;
        private IAttackBehavior iAttackBehavior;
    

    image

    • 实现类

    每个 Role 实现类按需导入:(相当于选择自己适合的 ”策略”)

    RoleA roleA = new RoleA("---A");
    roleA.setiDisplayBehavior(new DisplayYZ())
          .setiAttackBehavior(new AttackXL())
          .setiDefendBehavior(new DefendTMS())
          .setiRunBehavior(new RunJCTQ());
    roleA.display();// 样子
    roleA.attack();// 攻击
    roleA.run();// 逃跑
    roleA.defend();// 防御
    

    本质上是通过接口将实现类进行抽离,这样就能够尽可能将冗余代码消除。

  11. 装饰模式(Wrapper、decorator)

    装饰设计(装饰者模式 / 装饰器模式) (refactoringguru.cn)

    定义

    装饰模式是一种结构型设计模式**,** 允许你通过将对象放入包含行为的特殊封装对象中来为原对象绑定新的行为**。**

    装饰设计模式

    动态地给子类添加额外更多的功能。

    并且在不需要额外创造更多子类的情况下,将对象的功能继续扩展

    代码拓展问题

    在项目开发后期,如果需要对该类进行拓展的话,有可能会造成代码量的激增,并且使代码难以管理

    假设你正在开发一个提供通知功能的库**,** 其他程序可使用它向用户发送关于重要事件的通知**。**

    库的最初版本基于** 通知器Noti­fi­er 类,** 其中只有很少的几个成员变量**,** 一个构造函数和一个** send发送方法。** 该方法可以接收来自客户端的消息参数**,** 并将该消息发送给一系列的邮箱**,** 邮箱列表则是通过构造函数传递给通知器的**。** 作为客户端的第三方程序仅会创建和配置通知器对象一次**,** 然后在有重要事件发生时对其进行调用**。**

    使用装饰模式前的库结构

    程序可以使用通知器类向预定义的邮箱发送重要事件通知。

    此后某个时刻**,** 你会发现库的用户希望使用除邮件通知之外的功能**。** 许多用户会希望接收关于紧急事件的手机短信**,** 还有些用户希望在微信上接收消息**,** 而公司用户则希望在 QQ 上接收消息**。**

    实现其他类型通知后的库结构

    每种通知类型都将作为通知器的一个子类得以实现。

    这有什么难的呢**?** 首先扩展** 通知器,** 然后在新的子类中加入额外的通知方法**。** 现在客户端要对所需通知形式的对应类进行初始化**,** 然后使用该类发送后续所有的通知消息**。**

    但是很快有人会问**:** ​** “为什么不同时使用多种通知形式呢?** 如果房子着火了**,** 你大概会想在所有渠道中都收到相同的消息吧**。 **

    你可以尝试创建一个特殊子类来将多种通知方法组合在一起以解决该问题**。** 但这种方式会使得代码量迅速膨胀**,** 不仅仅是程序库代码**,** 客户端代码也会如此**。**

    创建组合类后的程序库结构

    子类组合数量爆炸。

    你必须找到其他方法来规划通知类的结构**,** 否则它们的数量会在不经意之间打破吉尼斯纪录**。**

    真实世界类比

    装饰模式示例

    穿上多件衣服将获得组合性的效果。

    穿衣服是使用装饰的一个例子**。** 觉得冷时**,** 你可以穿一件毛衣**。** 如果穿毛衣还觉得冷**,** 你可以再套上一件夹克**。** 如果遇到下雨**,** 你还可以再穿一件雨衣**。** 所有这些衣物都** “扩展”** 了你的基本行为**,** 但它们并不是你的一部分**,** 如果你不再需要某件衣物**,** 可以方便地随时脱掉**。**

    使代码对修改 关闭,对增加 开发

    结构

    image

    实现代码

    装饰者模式:若要扩展功能,装饰者提供了比集成更有弹性的替代方案,动态地将责任附加到对象上。

    • 先简单描述下装饰者模式发挥作用的地方,当我们设计好了一个类,我们需要给这个类添加一些辅助的功能,并且不希望改变这个类的代码,这时候就是装饰者模式大展雄威的时候了。这里还体现了一个原则:类应该对扩展开放,对修改关闭。

    • 需求:设计游戏的装备系统,基本要求,要可以计算出每种装备在镶嵌了各种宝石后的攻击力和描述:

    • 1、装备的超类:IEquip.java

      • public interface IEquip {
        
            /**
             * 计算攻击力
             */
            public int caculateAttack();
        
            /**
             * 装备的描述
             */
            public String description();
        }
        
    • 2、各个装备的实现类:

      • eg:武器的实现类: ArmEquip.java
      • public class ArmEquip implements IEquip {
        
            @Override
            public int caculateAttack() {
                return 20;
            }
        
            @Override
            public String description() {
                return "屠龙宝刀";
            }
        }
        
    • 3、装饰品的超类(装饰品也属于装备):IEquipDecorator.java

      • public interface IEuipDecorator extends IEquip {
        }
        
    • 4、装饰品的实现类:

      • eg:蓝宝石的实现类(可累加): BlueGemDecorator.java

        • public class BlueGemDecorator implements IEuipDecorator {
          
              private IEquip iEquip;
          
              public BlueGemDecorator(IEquip iEquip) {
                  this.iEquip = iEquip;
              }
          
              /**
               * 累加攻击力
               */
              @Override
              public int caculateAttack() {
                  return 5 + iEquip.caculateAttack();
              }
          
              @Override
              public String description() {
                  return iEquip.description() + "+ 蓝宝石";
              }
          }
          
      • 红宝石装饰类

        • public class RedGemDecorator implements IEuipDecorator {
          
              private IEquip iEquip;
          
              public RedGemDecorator(IEquip iEquip) {
                  this.iEquip = iEquip;
              }
          
              /**
               * 累加攻击力
               */
              @Override
              public int caculateAttack() {
                  return 15 + iEquip.caculateAttack();
              }
          
              @Override
              public String description() {
                  return iEquip.description() + "+ 红宝石";
              }
          }
          
    • 5、最后测试:计算攻击力和查看描述:

       Log.e("---", "一个镶嵌2颗红宝石,1颗蓝宝石的靴子: ");
       IEquip iEquip = new RedGemDecorator(new RedGemDecorator(new BlueGemDecorator(new ShoeEquip())));
       Log.e("---", "攻击力:" + iEquip.caculateAttack());
       Log.e("---", "描述语:" + iEquip.description());
      
      
      

    上述装饰类都继承自一个基类 IEquip。同时,每个装饰类中,都有 IEquip 类

    这样就能够实现 “嵌套” 装饰了!!!

  12. 责任链模式

    关联项目

    项目中的责任链模式主要在 support 模块下的 pipeline 包中:

    image.png

    责任链通用实现

    现在我就默认大家都知道什么是责任链模式了,如果还对这个不懂的同学,可以先看看我之前的文章。

    图片

    首先,我们会有一个业务执行器接口,所有的业务实现都会实现该接口,这意味着上图的 逻辑A、B、C 都会实现这个接口

    /**
     * 业务执行器
     * @author 三歪
     */
    public interface BusinessProcess {
        void process(ProcessContext context);
    }
    

    可以看到的是接口异常的简单,只有一个 process 处理的方法,方法接收的是 ProcessContext

    为什么 process 方法需要接收 ProcessContext?很简单,我们在处理 逻辑A、B、C 的时候,可能 逻辑B 需要依赖 逻辑A 的处理结果。于是我们就需要有一个载体把这些给记录下来。

    所以,我们就有了 ProcessContext,它代表的是责任链的上下文。

    /**
     * 责任链上下文
     * @author 3y
     */
    public class ProcessContext {
        // 标识责任链的code
        private String code;
        // 存储上下文的真正载体
        private Model model;
        // 责任链中断的标识
        private Boolean needBreak = false;
    }
    
    

    现在责任链的执行器责任链所涉及的上下文都已经有了,这意味着我们已经有了责任链最主要的抽象了。

    接下来就是我们需要把链给串起来,于是我们需要一个 模板 ,其实我们做的就是用一个 List 来把 BusinessProcess 的子类给串起来。

    /**
     * 业务执行模板(把责任链的逻辑串起来)
     * @author 3y
     */
    public class ProcessTemplate {
        private List<BusinessProcess> processList;
        public List<BusinessProcess> getProcessList() {
            return processList;
        }
        public void setProcessList(List<BusinessProcess> processList) {
            this.processList = processList;
        }
    }
    

    OK,现在我们已经把责任链的整块给抽象好了,接下来就是暴露流程控制器去执行这个责任链:

    /**
     * 责任链的流程控制器(整个责任链的执行流程通用控制)
     * @author 3y 
     */
    @Data
    public class ProcessController {
        
        // 不同的code 对应不同的责任链
        private Map<String, ProcessTemplate> templateConfig = null;
    
        public void process(ProcessContext context) {
            //根据上下文的Code 执行不同的责任链
            String businessCode = context.getCode();
            ProcessTemplate processTemplate = templateConfig.get(businessCode);
            List<BusinessProcess> actionList = processTemplate.getProcessList();
            //遍历某个责任链的流程节点
            for (BusinessProcess action : actionList) {
                try {
                    action.process(context);
                    if (context.getNeedBreak()) {
                        break;
                    }
                } catch (Exception e2) {
                    //...
                }
            }
        }
    }
    

    我们可以看到的是在 ProcessController 执行链通用的流程控制器上会有一个 Map 去存储多个责任链的模板,这样做的好处就是:ProcessController 这个流程控制器可以根据 code 支持多个责任链执行。

    接下来就是我们有具体的 BusinessProcess 去加入到 ProcessTemplate 的链上,然后调用 ProcessController 的方法去执行整一条推送链。

    一般我们在 XML 注入就好了,比如说现在我们有两个 BusinessProcess 的实现,分别是白名单和发消息的逻辑:

    /**
     * 白名单处理器
     * @author 3y
     */
    @Service
    public class WhiteListProcess implements BusinessProcess {
        @Override
        public void process(ProcessContext context) {
            UserModel user = (UserModel) context.getModel();
            if ("3y".equals(user.getName())) {
                context.setNeedBreak(true);
            }
        }
    }
    
    /**
     * 发消息处理器
     * @author 三歪
     */
    @Service
    public class SendMessageProcess implements BusinessProcess {
    
        @Override
        public void process(ProcessContext context) {
            UserModel user = (UserModel) context.getModel();
            System.out.println("给"+user.getName()+"发消息");
        }
    }
    

    然后我们把上面两个处理器添加到 ProcessTemplate 的模板上,把 ProcessTemplate 添加到 ProcessControllerMap 上:

    <!--发送消息的责任链-->
    <bean id="sendMessageTemplate" class="com.chainofresponsibility.ProcessTemplate">
      <property name="processList">
        <list>
          <ref bean="whiteListProcess"></ref>
          <ref bean="sendMessageProcess"></ref>
        </list>
      </property>
    </bean>
    
    <!--通用流程处理器,维护多条责任链-->
    <bean id="processController" class="com.chainofresponsibility.ProcessController">
      <property name="templateConfig">
        <map>
          <entry key="sendMessage" value-ref="sendMessageTemplate" />
        </map>
      </property>
    </bean>
    

    可以用 springboot 代替掉 xml 的注入

    然后我们在接口里边执行这个责任链:

    @RestController
    public class UserController {
        @Autowired
        private ProcessController processController;
    
        @RequestMapping("/send")
        public void  send(String userName) {
            // 构建上下文
            ProcessContext processContext = new ProcessContext();
    
            UserModel userModel = new UserModel();
            userModel.setAge("24");
            userModel.setName(userName);
            processContext.setModel(userModel);
    
            processContext.setCode("sendMessage");
    
            processController.process(processContext);
        }
    }
    

    我做了这么大的一套东西实现了什么功能?其实就一个 if 逻辑:

    if ("3y".equals(userModel.getName())) {
      return;
    }
    System.out.println("给" + userModel.getName() + "发消息");
    

    下面我们还是来看看效果,从功能上我们可以发现,只要我们输入的不是「3y」,那就会打印消息

    图片

    上面的逻辑,实际上就是一套通用的责任链的代码,最核心的其实就是四个角色:「业务抽象接口」、「执行过程中的上下文」、「将业务实现类串起来」和「一个通用的控制器执行责任链」

    图片

    如果没看懂的同学, 三歪建议再对比一下代码看看 ,责任链这种设计模式是非常好用,在项目里边也是非常常见的。

    只要把 BusinessProcess/ProcessContext/ProcessTemplate/ProcessController 的代码给拷过去自己的项目中,这就能帮你把原有的 if else 逻辑给干掉。

    Pipeline

    不知道大家看过 Pipeline 这个词了没,在学 Redis 的时候可能会见过,在 Redis 里边我们会用 Pipeline 去做批量的操作。

    抛开 Redis 的 Pipeline,但从宏观的角度上来,Pipeline 其实是一种架构思想。

    同时我也认为它是「责任链模式」的实现之一。

    下面来看看我这边的一个 Pipeline 实现的架构图:

    图片

    可以看到前人实现的 Pipepline 还是相对复杂的,没有上面 通用 的责任链模式好理解,经过分析可以看到都是换汤不换药的。

    下次再见到 Pipeline 这个词的时候(因为这个词还是很常见的),你们就应该能想到 责任链模式,然后你就发现你看懂了


  13. 状态机幂等

    很多业务表,都是有状态的,比如转账流水表,就会有 0-待处理,1-处理中、2-成功、3-失败状态​。转账流水更新的时候,都会涉及流水状态更新,即涉及状态机 (即状态变更图)。我们可以利用状态机实现幂等,一起来看下它是怎么实现的。

    比如转账成功后,把处理中的转账流水更新为成功状态,SQL 这么写:

    update transfr_flow set status=2 where biz_seq=‘666’ and status=1;
    

    简要流程图如下:

    图片

    伪代码实现如下:

    Rsp idempotentTransfer(Request req){
       String bizSeq = req.getBizSeq();
       int rows= "update transfr_flow set status=2 where biz_seq=#{bizSeq} and status=1;"
       if(rows==1){
          log.info(“更新成功,可以处理该请求”);
          //其他业务逻辑处理
          return rsp;
       }else if(rows==0){
          log.info(“更新不成功,不处理该请求”);
          //不处理,直接返回
          return rsp;
       }
       
       log.warn("数据异常")
       return rsp:
    }
    

    状态机是怎么实现幂等的呢?

    • 第 1 次请求来时,bizSeq 流水号是 666​,该流水的状态是处理中,值是 1​,要更新为 2-成功的状态​,所以该 update 语句可以正常更新数据,sql 执行结果的影响行数是 1,流水状态最后变成了 2。
    • 第 2 请求也过来了,如果它的流水号还是 666​,因为该流水状态已经 2-成功的状态​了,所以更新结果是 0,不会再处理业务逻辑,接口直接返回。

  14. 归并排序模板

    数组归并

    • 基本思路:借助额外空间,合并两个有序数组,得到更长的有序数组。例如:「力扣」第 88 题:合并两个有序数组
    • 算法思想:分而治之(分治思想)。「分而治之」思想的形象理解是「曹冲称象」、MapReduce,在一定情况下可以并行化。

    class Solution {
        //归并排序
        public int[] sortArray(int[] nums) {
           return mergeSort(nums, 0, nums.length-1);
             
        }
        public int[] mergeSort(int[] nums, int left, int right){
            //递归退出条件
            //如果左指针大于右指针,就退出循环
            //经过左右拆分,数组元素形成单个元素的树
            if(left >=right){
                return nums;
            }
            //数组中的中位数
            int mid = (right+left)/2;
            //数组左拆分
            mergeSort(nums, left, mid);
            //数组右拆分
            mergeSort(nums, mid+1, right);
            //数组合并,将单个元素进行合并
            return merge(nums, left, mid, right);
        }
        public int[] merge(int[] nums, int left, int mid, int right){
            //定义一个临时数组,存储排序好的元素
            int[] temp = new int[right-left+1];
            //左排序的元素数组的左指针
            int i = left;
            //右排序的元素数组的左指针
            int j = mid+1;
            //定义一个指向临时数组的左指针
            int t = 0;
            //循环判断条件
            //左数组到mid,右数组到right
            //左右数组都有元素的时候,进行比较
            while(i<=mid&&j<=right){
                //取左右数组中较小的元素,填入临时数组中
                //并将较小元素所在数组的左指针和临时数组的左指针都一起右移
                if(nums[i]<=nums[j]){
                    temp[t++] = nums[i++];
                }else{
                    temp[t++] = nums[j++];
                }
            }
            //当左右数组其中一个数组没有元素的时候
            //如果左数组中还有剩余元素,就将剩余元素全部加入到临时数组中
            while(i<=mid){
                temp[t++]=nums[i++];
            }
            //如果有数组中还有元素,就将剩余元素全部加入到临时数组中
            while(j<=right){
                temp[t++] = nums[j++];
            }
            //将临时数组的元素复制到原数组中去
            for(int k = 0; k<temp.length;k++){
                //特别注意这便nums数组起始位置要从 k+left开始 
                //原因在加右数组的时候,起始位置要加left
                //这里不理解,直接把它记住。
                nums[left+k]=temp[k];
            }
            //返回原数组
            return nums;
        }
    }
    


    链表归并

    前言

    本来这是快手三面的算法题。我告诉面试官“我没做过归并算法,我尝试一下”,结果硬是给我搓出来了。。。

    而且算法逻辑上还基本没什么问题的。。。但就是无法输出正确答案。。。面试官都找不到问题所在。。。。

    • 不过还好最后过了,幸亏一开始就拼命强调“我没做过”,所以面试官认为我代码能力很强!
    • 而且当时我写得特别快。。。。进入状态了!

    题目难点

    这道题融合了多道算法思路:

    1. 使用快慢指针(双指针)1 找中点
      1. 合并两个有序链表1
    2. 链表截断,然后才能继续递归

    当时快速写出的写法

    import java.util.*;
    public class Main {
        public static void main(String[] args) {
            // 1 4 6 -2 10
            Node head = new Node(0), cur = head;
            cur.next= new Node(1);  cur = cur.next;
            cur.next = new Node(4); cur = cur.next;
            cur.next = new Node(6); cur = cur.next;
            cur.next = new Node(-2); cur = cur.next;
            cur.next = new Node(10);
    
            Node sortLsit = mergeSort(head.next);
            cur = sortLsit;
            while (cur != null) {
                System.out.println(cur.val);
                cur = cur.next;
            }
        }
    
        // 升序
        static Node mergeSort(Node head) {
            // 单个点
            if (head == null || head.next == null) return head;
    
            // 两个值,判断并交换
            if (head.next != null) {
                if (head.next.val < head.val) {
                    int temp = head.val;
                    head.val = head.next.val;
                    head.next.val = temp;
                }
                return head;
            }
    
            // 拆分,并递归(对半分)
            // 快慢指针找中点
            Node fast = head, slow = fast;
            while (fast != null && fast.next != null) {
                fast = fast.next.next;
                slow = slow.next;
            }
            // 截断链表,并进入下一个递归
            Node first = head, second = slow.next;
            slow.next = null;
    
            Node c1 = mergeSort(first);
            Node c2 = mergeSort(second);
    
            // 合并有序链表
            Node newArr = mergeList(c1, c2);
    
            return newArr;
        }
    
        static Node mergeList(Node first, Node second) {
            if (first == null) return second;
            if (second == null) return first;
    
            Node dummy = new Node(-1), cur = dummy;
            Node c1 = first, c2 = second;
            while (c1 != null && c2 != null) {
                if (c1.val < c2.val) {
                    cur.next = new Node(c1.val);
                    c1 = c1.next;
                } else {
                    cur.next = new Node(c2.val);
                    c2 = c2.next;
                }
    
                cur = cur.next;
            }
    
            while (c1 != null) {
                cur.next = new Node(c1.val);
                c1 = c1.next;
                cur = cur.next;
            }
    
            while (c2 != null) {
                cur.next = new Node(c2.val);
                c2 = c2.next;
                cur = cur.next;
            }
    
            return dummy.next;
        }
    }
    
    class Node {
        Node next;
        int val;
        Node(int val) {
            this.val = val;
        }
    }
    

    模板代码

    package com.quan;
    
    import java.util.*;
    public class Main {
        public static void main(String[] args) {
            // 1 4 6 -2 10
            Node head = new Node(0), cur = head;
            cur.next= new Node(1);  cur = cur.next;
            cur.next = new Node(4); cur = cur.next;
            cur.next = new Node(6); cur = cur.next;
            cur.next = new Node(-2); cur = cur.next;
            cur.next = new Node(10);
    
            Node sortLsit = mergeSort(head.next);
            cur = sortLsit;
            while (cur != null) {
                System.out.println(cur.val);
                cur = cur.next;
            }
        }
    
        // 升序
        static Node mergeSort(Node head) {
            // 单个点
            if (head == null || head.next == null) return head;
    
            // 拆分,并递归(对半分)
            // 快慢指针找中点
            Node fast = head, slow = head;
            // 要十分注意这里!!!
            while (fast.next != null && fast.next.next != null) {
                fast = fast.next.next;
                slow = slow.next;
            }
            // 截断链表,并进入下一个递归
            Node mid = slow.next;
            slow.next = null;
    
            // 进入递归
            Node left = mergeSort(head);
            Node right = mergeSort(mid);
    
            return mergeList(left, right);
        }
    
        static Node mergeList(Node first, Node second) {
            if (first == null) return second;
            if (second == null) return first;
    
            Node dummy = new Node(-1), cur = dummy;
            while (first != null && second != null) {
                if (first.val <= second.val) {
                    cur.next = new Node(first.val);
                    first = first.next;
                } else {
                    cur.next = new Node(second.val);
                    second = second.next;
                }
                // 一定不要忘记cur指针要前进
                cur = cur.next;
            }
    
            cur.next = (first != null) ? first : second;
    
            return dummy.next;
        }
    }
    
    class Node {
        Node next;
        int val;
        Node(int val) {
            this.val = val;
        }
    }
    

    golang:

    /**
     * Definition for singly-linked list.
     * type ListNode struct {
     *     Val int
     *     Next *ListNode
     * }
     */
    func sortList(head *ListNode) *ListNode {
    	if head == nil || head.Next == nil {
    		return head
    	}
    
    	// 取中点
    	fast, slow := head, head
    	for fast.Next != nil && fast.Next.Next != nil {
    		fast = fast.Next.Next
    		slow = slow.Next
    	}
    
    	first, second := head, slow.Next
    	// 截断处理
    	slow.Next = nil
    
    	return mergeList(sortList(first), sortList(second))
    }
    
    func mergeList(left, right *ListNode) *ListNode {
    	dummy := &ListNode{}
    	cur := dummy
    
    	for left != nil && right != nil {
    		if left.Val < right.Val {
    			cur.Next = &ListNode{Val: left.Val}
    			left = left.Next
    		} else {
    			cur.Next = &ListNode{Val: right.Val}
    			right = right.Next
    		}
    
    		cur = cur.Next
    	}
    
    	if left != nil {
    		cur.Next = left
    	} else {
    		cur.Next = right
    	}
    
    	return dummy.Next
    }
    

    核心代码模式

    class Solution {
        public ListNode sortList(ListNode head) {
            //如果链表为空,或者只有一个节点,直接返回即可,不用排序
            if (head == null || head.next == null)
                return head;
          
            //快慢指针移动,以寻找到中间节点
            ListNode slow = head;
            ListNode fast = head;
            while(fast.next!=null && fast.next.next !=null){
              fast = fast.next.next;
              slow = slow.next;
            }
            //找到中间节点,slow节点的next指针,指向mid
            ListNode mid = slow.next;
            //切断链表
            slow.next = null;
          
            //排序左子链表
            ListNode left = sortList(head);
            //排序左子链表
            ListNode right = sortList(mid);
          
            //合并链表
            return merge(left,right);
        }
          
        public ListNode merge(ListNode left, ListNode right) {
           ListNode head = new ListNode(0);
           ListNode temp = head;
           while (left != null && right != null) {
               if (left.val <= right.val) {
                    temp.next = left;
                    left = left.next;
                } else {
                    temp.next = right;
                    right = right.next;
                }
                temp = temp.next;
            }
              
    		temp.next = (left != null) ? left : right;
    
            return head.next;
        }
    }
    

    之前自己的错误点

    1. 使用快慢指针(双指针)1 while 条件写错

      1. 错误:while (fast != null && fast.next != null)​ —— 直接导致空指针异常!!!
      2. 正确:​while (fast.next != null && fast.next.next != null)
    2. 递归终止条件错误

      1. 不应该设置“两个节点”判断作为第二个终止条件,这个设置直接导致无法排序!!!

        1.          // 两个值,判断并交换
                   if (head.next != null) {
                       if (head.next.val < head.val) {
                           int temp = head.val;
                           head.val = head.next.val;
                           head.next.val = temp;
                       }
                       return head;
                   }
          
      2. if (head == null || head.next == null) return head;

        1. 这个就能够充当终止条件了!!!

    解决掉原有的问题后,自己的代码就直接能够跑起来了!!!

  15. 使用快慢指针(双指针)

    • 第一个指针先走,第二个指针后行。

    • 第一个指针走 k-1 步,然后第一个、第二个指针一起走。

    • 当第一个指针走到结尾时,第二个指针恰好就到了倒数 第 k 号位置!!!

    解释

    第二个指针慢走了 k-1 步,就等同于在最后减去了 k-1 步,所以第二个指针最后只能走到倒数第 k 号位置上。

  16. 21. 合并两个有序链表

    尝试

    class Solution {
        public ListNode mergeTwoLists(ListNode list1, ListNode list2) {
            if (list1 == null || list2 == null) {
                if (list1 == null) return list2;
                if (list2 == null) return list1;
            }
    
            ListNode dummyHead = new ListNode(0), cur = dummyHead;
            ListNode c1 = list1, c2 = list2;
            while (c1 != null && c2 != null) {
                if (c1.val <= c2.val) {
                    cur.next = c1;
                    c1 = c1.next;
                } else {
                    cur.next = c2;
                    c2 = c2.next;
                }
                cur = cur.next;
            }
    
            while (c1 != null) {
                cur.next = c1;
                c1 = c1.next;
                cur = cur.next;
            }
            while (c2 != null) {
                cur.next = c2;
                c2 = c2.next;
                cur = cur.next;
            }
    
            return dummyHead.next;
        }
    }
    
    • AC
    • 希望美团二面考这道题目

  • 架构

    我们平时所说的“架构”主要是指软件架构,这是有关软件整体结构与组件的抽象描述,用于指导软件系统各个方面的设计。另外还有“业务架构”、“网络架构”、“硬件架构”等细分领域。

    142 引用 • 442 回帖 • 1 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...