Redis 大数据分批操作
一般来讲,如果 Redis 一次性执行太多数据的话,可能导致 Redis 服务主进程被长时间阻塞,造成其他命令的执行收到影响。
可以使用 Lua 脚本对每次执行的数量做一个限制,数据量超过一定限制后,分批量执行
代码实现
来源:DelayQueue —— golang 实现延迟队列
local unack2retry = function(msgs)
local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs)) -- get retry count
for i,v in ipairs(retryCounts) do
local k = msgs[i]
if v ~= false and v ~= nil and v ~= '' and tonumber(v) > 0 then
redis.call("HIncrBy", KEYS[2], k, -1) -- reduce retry count
redis.call("LPush", KEYS[3], k) -- add to retry
else
redis.call("HDel", KEYS[2], k) -- del retry count
redis.call("SAdd", KEYS[4], k) -- add to garbage
end
end
end
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1]) -- get retry msg
if (#msgs == 0) then return end
if #msgs < 4000 then
unack2retry(msgs)
else
local buf = {}
for _,v in ipairs(msgs) do
table.insert(buf, v)
if #buf == 4000 then
unack2retry(buf)
buf = {}
end
end
if (#buf > 0) then
unack2retry(buf)
end
end
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- remove msgs from unack
-
首先定义了一个名为
unack2retry
的函数,该函数接收一个参数msgs
,表示未确认的消息列表。函数内部首先获取每个消息的重试次数,然后遍历这些消息,根据重试次数将它们分为三类:- 如果重试次数大于 0,则将其添加到重试队列(使用
LPush
命令); - 否则,将其从重试次数哈希表中删除(使用
HDel
命令),并将其添加到垃圾队列(使用SAdd
命令)。
- 如果重试次数大于 0,则将其添加到重试队列(使用
-
接下来,从 Redis 中获取指定分数范围内的未确认消息(使用
ZRangeByScore
命令)。如果没有未确认消息,则直接返回。 -
如果未确认消息的数量小于 4000,则直接调用
unack2retry
函数处理所有未确认消息。否则,将未确认消息分批处理,每批最多包含 4000 条消息。对于每批消息,将其插入到一个缓冲区(buf
)中,当缓冲区满时,调用unack2retry
函数处理缓冲区中的未确认消息,并清空缓冲区。最后,如果缓冲区中仍有未确认消息,则调用unack2retry
函数处理剩余的消息。 -
最后,从 Redis 中删除已处理过的未确认消息(使用
ZRemRangeByScore
命令)。
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于