下面我给出一个基于 Redis Stream 的完整示例,包括生产者和消费者两个部分,演示如何写入消息、创建消费者组、消费消息并确认处理。
依赖环境
此示例使用 Lua 伪代码,假设你有一个支持 Redis 命令的客户端库,例如 redis-lua
、python-redis
或者其他语言的客户端,调用方式类似即可。
1. 生产者示例代码
local redis = require "redis"
local client = redis.connect("127.0.0.1", 6379)
local stream_key = "mystream"
-- 生产者发送消息函数
local function produce_message(task_type, user)
-- XADD mystream * task <task_type> user <user>
local message_id = client:xadd(stream_key, "*", "task", task_type, "user", user)
print("已发送消息,ID: " .. message_id)
end
-- 示例发送几条消息
produce_message("email", "alice")
produce_message("sms", "bob")
produce_message("push", "carol")
2. 创建消费者组(只需运行一次)
你可以用命令行或者代码创建消费者组,下面是Lua调用示例:
local redis = require "redis"
local client = redis.connect("127.0.0.1", 6379)
local stream_key = "mystream"
local group_name = "mygroup"
-- 创建消费者组,如果已存在会报错,可以捕获忽略
local ok, err = pcall(function()
client:xgroup("CREATE", stream_key, group_name, "0")
end)
if ok then
print("消费者组创建成功")
else
print("消费者组可能已存在,错误信息:" .. tostring(err))
end
3. 消费者示例代码
消费者会阻塞等待新消息,读取后处理并确认。
local redis = require "redis"
local client = redis.connect("127.0.0.1", 6379)
local stream_key = "mystream"
local group_name = "mygroup"
local consumer_name = "consumer1"
local block_time = 5000 -- 毫秒,阻塞等待时间
while true do
-- XREADGROUP GROUP mygroup consumer1 COUNT 1 BLOCK 5000 STREAMS mystream >
local streams = client:xreadgroup("GROUP", group_name, consumer_name,
"COUNT", 1,
"BLOCK", block_time,
"STREAMS", stream_key, ">")
if streams then
-- streams 格式: { {stream_key, { {message_id, {field1, value1, ...}}, ... } } }
for _, stream_data in ipairs(streams) do
local key = stream_data[1]
local messages = stream_data[2]
for _, message in ipairs(messages) do
local message_id = message[1]
local fields = message[2] -- 是一个数组,奇数位是字段名,偶数位是值
-- 解析字段(示例只取task和user)
local task_type, user
for i = 1, #fields, 2 do
if fields[i] == "task" then
task_type = fields[i+1]
elseif fields[i] == "user" then
user = fields[i+1]
end
end
print(string.format("收到消息ID=%s,任务类型=%s,用户=%s", message_id, task_type, user))
-- 模拟业务处理
local success = true -- 这里可以替换成实际处理逻辑及异常捕获
if success then
-- 处理成功,确认消息
client:xack(stream_key, group_name, message_id)
print("消息已确认,ID=" .. message_id)
else
-- 处理失败,可记录日志或重试
print("消息处理失败,ID=" .. message_id)
end
end
end
else
-- 阻塞超时无消息
print("无新消息,继续等待...")
end
end
4. 说明
- 生产者通过
XADD
写入消息,消息自动带有ID。
- 消费者组只需创建一次,并且支持多消费者并发消费。
- 消费者通过
XREADGROUP
阻塞等待消息,拉取未分发的消息。
- 处理完成后调用
XACK
确认消息,消息才会被移出 Pending List。
- 如果消费者异常退出,未确认消息会一直留在 Pending List,可以通过
XPENDING
和 XCLAIM
命令实现消息重试。