Redis- 探究:集群扩容导致 Jedis 客户端报 JedisMovedDataException 异常的原因

本贴最后更新于 2129 天前,其中的信息可能已经时移世改

🌹🌹 如果您觉得我的文章对您有帮助的话,记得在 GitHub 上 star 一波哈 🌹🌹

🌹🌹GitHub_awesome-it-blog 🌹🌹


0 问题的产生

由于线上 Redis 集群内存使用量已经接近达到预警阈值,需要对 Redis 集群扩容。(使用的是 Redis 自带的 Redis-Cluster)

目前有 6 台主节点,6 台从节点。

暂时称为:

  • redis-master001 ~ redis-master006
  • redis-slave001 ~ redis-slave006

需要增加 3 主 3 从。

  • redis-master007 ~ redis-master009
  • redis-slave007 ~ redis-master009

之前 Redis 集群的 16384 个槽均匀分配在 6 台主节点中,每个节点 2730 个槽。

为保证扩容后,槽依然均匀分布,需要将之前 6 台的每台机器上迁移出 910 个槽,方案如下:

  • redis-master001 的 910 个 slot 迁移到 redis-master007
  • redis-master002 的 910 个 slot 迁移到 redis-master007
  • redis-master003 的 910 个 slot 迁移到 redis-master008
  • redis-master004 的 910 个 slot 迁移到 redis-master008
  • redis-master005 的 910 个 slot 迁移到 redis-master009
  • redis-master006 的 910 个 slot 迁移到 redis-master009

分配完之后,每台节点 1820 个 slot。

当将 redis-master001 的 910 个 slot 迁移到 redis-master007 后,业务上开始报下面的异常

RedisError.jpg

在马赛克的上一行,可以看到是调 Jedis 的 get 方法出的问题。

1 原因及解决方案

问题的原因在于使用了 Jedis 客户端,改为使用 JedisCluster 客户端即可解决问题。

出问题的 get 方法是这样写的(在 Jedis 原生基础上包装了一层)

// 自己封装的get方法 public String get(String key) { String result ="; // 为了打印获取连接耗时的日志,这里单独获取了一下Jedis连接 try (Jedis jedis = this.getResourceLog(key)) { TimeCost timeCost = new TimeCost(); result = jedis.get(key); // 这里报错 debugLogger.debug("redis cluster get TimeCost={}", timeCost.getCostMillSeconds()); } // 其实改为下面这样get就可以解决一直报JedisMovedDataException问题 // return jedisCluster.get(key); return result; }

getResourceLog 方法的作用是根据 key 计算出这个 key 所在的 slot,再通过 slot 获取 Redis 连接。代码如下

private Jedis getResourceLog(String key) { TimeCost tc = new TimeCost(); int slot = JedisClusterCRC16.getSlot(key); // CRC计算slot debugLogger.debug("calc slot TimeCost={}", tc.getCostMillSeconds()); tc.reset(); Jedis jedis = connectionHandler.getConnectionFromSlot(slot); // 通过slot获取连接 debugLogger.debug("get connection TimeCost={}", tc.getCostMillSeconds()); return jedis; }

上面的 get 方法可以直接改为 JedisCluster 的 get 方法解决。

再考虑另外一种情况,如果必须通过 Jedis 操作呢?比如 watch 方法,JedisCluster 是不提供 watch 的,那么只能通过上述方法在 Redis 集群中根据 key 获取到 slot,再通过 slot 获取到 jedis 链接,然后调用 watch。这样一来,在调 watch 的地方也会报 JedisMovedDataException。

例如下面的代码,在业务上需要保证事务的情况下(或乐观锁),可能会这样实现:

Jedis jedis = null; String key = ...; // redis key try { // 通过上面的getResource方法获取jedis链接 jedis = getResource(userId); // 通过jedis watch key if (RedisConstants.SAVE_TO_REDIS_OK.equals(jedis.watch(key))) { // .... 业务逻辑 .... // .... // 通过jedis链接开始事务 Transaction transaction = jedis.multi(); // ... // ... 执行一些transaction操作... // ... // 提交事务 List<Object> execResult = transaction.exec(); return ...; } } catch (Exception ex) { // do something ... } finally { if (jedis != null) { try { if (!flag) { jedis.unwatch(); } } finally { jedis.close(); } } }

此时如果发生 slot 迁移,就会报 JedisMovedDataException。

那这种情况下的解决方案是什么呢?

其实,优先 catch 住 JedisMovedDataException,然后通过 JedisCluster.get(key);一下就行,如下:

Jedis jedis = null; String key = ...; // redis key try { // 通过上面的getResource方法获取jedis链接 jedis = getResource(userId); // 通过jedis watch key if (RedisConstants.SAVE_TO_REDIS_OK.equals(jedis.watch(key))) { // .... 业务逻辑 .... // .... // 通过jedis链接开始事务 Transaction transaction = jedis.multi(); // ... // ... 执行一些transaction操作... // ... // 提交事务 List<Object> execResult = transaction.exec(); return ...; } } catch (JedisMovedDataException jmde) { jmde.printStackTrace(); // redisClusterService中维护着jedisCluster实例,这个get实际上调用的是jedisCluster的get redisClusterService.get(key); return ...; } catch (Exception ex) { // do something ... } finally { if (jedis != null) { try { if (!flag) { jedis.unwatch(); } } finally { jedis.close(); } } }

需要注意的是,用 Jedis 的 get 是不能解决的。

2 JedisCluster 类图

JedisCluster 整体的 UML 关系如下,先有个整体的印象,在后面的源码分析中,可以再回来看。

RedisError2.jpg

3 为什么通过 RedisCluster.get 一下可以解决?

下面通过 JedisCluster 源码解释为什么这么做可以解决问题,注释中会有详细说明。

JedisCluster.get 源码如下:

@Override public String get(final String key) { return new JedisClusterCommand<String>(connectionHandler, maxAttempts) { @Override public String execute(Jedis connection) { return connection.get(key); } }.run(key); }

发现他是委托给 JedisClusterCommand 来完成 get 操作的,也可以发现 execute 方法实际上是使用 Jedis 来执行的 get。这个 Jedis 实际上就是通过上述方法,先计算出 slot,再通过 slot 获取到 Jedis 链接的。关键在于最下面 run 方法的执行,下面具体看一下。

Run 方法源码如下:

public T run(String key) { // JedisClusterCRC16.getSlot(key) 计算出slot return runWithRetries(JedisClusterCRC16.getSlot(key), this.maxAttempts, false, null); }

runWithRetries 源码如下

private T runWithRetries(final int slot, int attempts, boolean tryRandomNode, JedisRedirectionException redirect) { // 这里是一个重试机制,报异常时触发 if (attempts <= 0) { throw new JedisClusterMaxAttemptsException("No more cluster attempts left."); } Jedis connection = null; try { if (redirect != null) { connection = this.connectionHandler.getConnectionFromNode(redirect.getTargetNode()); if (redirect instanceof JedisAskDataException) { // TODO: Pipeline asking with the original command to make it faster.... connection.asking(); } } else { if (tryRandomNode) { connection = connectionHandler.getConnection(); } else { // 执行到这里,通过slot获取到Jedis connection // 内部是通过一个map维护的slot到JedisPool的映射关系 connection = connectionHandler.getConnectionFromSlot(slot); } } // 执行上面JedisClusterCommand定义的execute方法。 return execute(connection); } catch (JedisNoReachableClusterNodeException jnrcne) { throw jnrcne; } catch (JedisConnectionException jce) { // release current connection before recursion releaseConnection(connection); connection = null; if (attempts <= 1) { //We need this because if node is not reachable anymore - we need to finally initiate slots //renewing, or we can stuck with cluster state without one node in opposite case. //But now if maxAttempts = [1 or 2] we will do it too often. //TODO make tracking of successful/unsuccessful operations for node - do renewing only //if there were no successful responses from this node last few seconds this.connectionHandler.renewSlotCache(); } return runWithRetries(slot, attempts - 1, tryRandomNode, redirect); } catch (JedisRedirectionException are) { // *** 关键在这 *** // if MOVED redirection occurred, // JedisMovedDataException是JedisRedirectionException的子类,所以会执行下面if中的代码 if (jre instanceof JedisMovedDataException) { // it rebuilds cluster's slot cache recommended by Redis cluster specification // 重新通过这个jedis链接获取RedisCluster中的Node信息以及slot信息 this.connectionHandler.renewSlotCache(connection); } // release current connection before recursion releaseConnection(connection); connection = null; return runWithRetries(slot, attempts - 1, false, jre); } finally { releaseConnection(connection); } }

注释中说到了最终会通过 this.connectionHandler.renewSlotCache(connection);来重新获取 slot 信息。下面来看下这个方法。

public void renewSlotCache(Jedis jedis) { cache.renewClusterSlots(jedis); }

调用了 cache 的 renewClusterSlots 方法来重新获取 slot 信息,这个 cache 是 JedisClusterInfoCache 类的实例,他里面维护这 Node 和 Slot 信息,如下:

public class JedisClusterInfoCache { private final Map<String, JedisPool> nodes = new HashMap<String, JedisPool>(); private final Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>(); // .. }

renewClusterSlots 方法如下

public void renewClusterSlots(Jedis jedis) { //If rediscovering is already in process - no need to start one more same rediscovering, just return if (!rediscovering) { try { w.lock(); if (!rediscovering) { rediscovering = true; try { if (jedis != null) { try { // 关键在于这一步,这个方法会重新从远程集群中获取最新的slot信息 discoverClusterSlots(jedis); return; } catch (JedisException e) { //try nodes from all pools } } for (JedisPool jp : getShuffledNodesPool()) { Jedis j = null; try { j = jp.getResource(); discoverClusterSlots(j); return; } catch (JedisConnectionException e) { // try next nodes } finally { if (j != null) { j.close(); } } } } finally { rediscovering = false; } } } finally { w.unlock(); } } }

关键在于 discoverClusterSlots 方法,这个方法的实现如下:

private void discoverClusterSlots(Jedis jedis) { // 通过slots命令从远程获取slot信息 List<Object> slots = jedis.clusterSlots(); this.slots.clear(); // 清除本地缓存slot信息 // 每个slotInfoObj包含集群中某一节点的slot信息 for (Object slotInfoObj : slots) { List<Object> slotInfo = (List<Object>) slotInfoObj; if (slotInfo.size() <= MASTER_NODE_INDEX) { continue; } // 计算当前节点的slot信息 List<Integer> slotNums = getAssignedSlotArray(slotInfo); // hostInfos // 获取这组slot所在的节点信息 List<Object> hostInfos = (List<Object>) slotInfo.get(MASTER_NODE_INDEX); if (hostInfos.isEmpty()) { continue; } // at this time, we just use master, discard slave information HostAndPort targetNode = generateHostAndPort(hostInfos); // 重新关联这组slot到远程节点的映射,至此,完成slot信息的刷新 assignSlotsToNode(slotNums, targetNode); } }

4 为什么 Jedis 的 get 不行?

首先我们来对比一下 JedisCluster 的 get 和 Jedis 的 get

JedisCluster.get

@Override public String get(final String key) { return new JedisClusterCommand<String>(connectionHandler, maxAttempts) { @Override public String execute(Jedis connection) { return connection.get(key); // 这里追踪进去,就是Jedis.get } }.run(key); }

Jedis.get

@Override public String get(final String key) { checkIsInMultiOrPipeline(); client.get(key); return client.getBulkReply(); }

由此可知,Jedis.get 没有了 run 方法中的异常重试和重新发现机制,所以 Jedis.get 不行。

5 总结

本文从一次线上扩容引发问题的讨论,由扩容引出了 slot 的迁移,由 slot 的迁移引出线上报错-JedisMovedDataException,然后说明了引发这个异常的原因,是因为我们使用了 Jedis 客户端,导致无法自动发现远程集群 slot 的变化。

然后提出了解决方案,通过使用 JedisCluster 来解决无法自动发现 slot 变化的问题。并从源码的角度说明了为什么 JedisCluster 的 get 方法可以自动发现远程 slot 的变化。

  • Redis

    Redis 是一个开源的使用 ANSI C 语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value 数据库,并提供多种语言的 API。从 2010 年 3 月 15 日起,Redis 的开发工作由 VMware 主持。从 2013 年 5 月开始,Redis 的开发由 Pivotal 赞助。

    284 引用 • 248 回帖
  • Jedis
    7 引用 • 11 回帖
  • 扩容
    2 引用

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...