news 2026/6/15 0:56:59

Redis 从入门到精通:Redis Stream —— 可靠消息队列

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Redis 从入门到精通:Redis Stream —— 可靠消息队列

IT策士 10余年一线大厂经验,专注 IT 思维、架构、职场进阶。我会在各个平台持续发布最新文章,助你少走弯路。

前面我们学了 List 做队列、Pub/Sub 做广播,但它们都有一个硬伤:消息可靠性不足。List 弹出的消息就没了,客户端崩溃则消息丢失;Pub/Sub 干脆不持久化,订阅者不在线时消息直接蒸发。对于订单处理、异步任务、日志收集这类不能丢消息的场景,需要的是可靠消息队列

Redis 5.0 推出的Stream正是为此而生。它像 Kafka 一样支持消息持久化、消费者组、ACK 确认和消息回溯,又保持了 Redis 的简洁与高性能。本文将带你从命令到 Python 实战,用 Stream 构建一个真正生产可用的消息队列。

1. Stream 是什么?为什么要用?

Stream 是 Redis 追加日志型数据结构,用于存储时间序列的消息。每条消息有一个全局唯一的 ID 和若干键值对。它的核心能力:

  • 消息持久化:消息写入 Stream 后不会因消费者离线而丢失。

  • 消费者组:同组消费者竞争消费同一条消息,实现负载均衡。

  • ACK 确认:消费者处理完后发送XACK,消息从“待确认”变为“已确认”。

  • 消息回溯:可以按 ID 重新消费历史消息,不会像 List 一样弹出即销毁。

  • 阻塞读取XREAD可阻塞等待新消息,避免空轮询。

对比其他队列方案:

一句话总结:要可靠,上 Stream

2. 核心命令速览

2.1 添加消息:XADD

127.0.0.1:6379>XADD orders * action create user_id1001amount99.9"1680000000000-0"
  • orders是 Stream 的 key。

  • *表示让 Redis 自动生成消息 ID(格式:毫秒时间戳-序号)。

  • 后面跟着若干 field-value 对,构成了消息体。

  • 返回自动生成的 ID。

可以手动指定 ID,但强烈建议用*自动生成,保证单调递增。

2.2 读取消息:XREAD

# 读取所有消息(从头开始)127.0.0.1:6379>XREAD STREAMS orders0-01)1)"orders"2)1)1)"1680000000000-0"2)1)"action"2)"create"3)"user_id"4)"1001"5)"amount"6)"99.9"# 阻塞等待新消息(类似 BRPOP)127.0.0.1:6379>XREAD BLOCK5000STREAMS orders $(nil)# 5秒内没有新消息,返回空
  • 0-0表示从头读,$表示只读最新(类似 tail -f)。

  • BLOCK毫秒数,0 表示永久阻塞。

2.3 消费者组与 XREADGROUP

Stream 可以创建多个消费者组,每组独立维护消费进度。同组内的消费者竞争消费,各自处理完后 ACK。

# 创建消费者组(从头部开始消费)127.0.0.1:6379>XGROUP CREATE orders group10-0 OK# 消费者 A 读取 group1 未确认的消息(> 表示从未消费过的新消息)127.0.0.1:6379>XREADGROUP GROUP group1 consumerA COUNT1STREAMS orders>1)1)"orders"2)1)1)"1680000000000-0"2)1)"action"2)"create"...# 确认消息处理完毕127.0.0.1:6379>XACK orders group1"1680000000000-0"(integer)1
  • >:只返回从未投递给任何消费者组内成员的新消息。

  • XACK将消息标记为已处理,从待确认列表移除。

2.4 查看待处理消息:XPENDING

如果有消费者拿到消息后崩溃,消息会一直处于待确认状态,XPENDING可以查看这些消息。

# 查看待处理消息概要127.0.0.1:6379>XPENDING orders group11)(integer)0# 待确认消息数# 如果消费者 A 挂掉,消息会显示在 pending 列表中:127.0.0.1:6379>XPENDING orders group1 - +10(列出具体的待处理消息及其空闲时间)

2.5 消息转移:XCLAIM

当一个消费者长时间未 ACK(可能已死),可以由另一个消费者通过XCLAIM将消息“抢”过来处理。

127.0.0.1:6379>XCLAIM orders group1 consumerB60000"1680000000000-0"
  • 把空闲超过 60000 毫秒的消息转交给consumerB

3. Python 实战:订单处理系统

我们用 Stream 构建一个订单处理流水线:一个生产者发布订单,多个消费者组成消费组并行处理订单,处理失败的消息重试或转移。

3.1 环境准备

确保 Redis 版本 ≥ 5.0(Docker 镜像redis:7.2满足)。

3.2 生产者:发布订单

importredisimporttimeimportjsonimportrandom r=redis.Redis(host='localhost',port=6379,decode_responses=True)STREAM_KEY='orders'GROUP_NAME='order_processors'# 创建消费者组(如果不存在)try: r.xgroup_create(STREAM_KEY, GROUP_NAME,id='0-0',mkstream=True)print(f'消费者组 {GROUP_NAME} 已创建')except redis.exceptions.ResponseError as e:if'BUSYGROUP'instr(e): print(f'消费者组 {GROUP_NAME} 已存在')else: raise def publish_order(order_id, user_id, amount):"""发布订单消息""" msg={'order_id':order_id,'user_id':user_id,'amount':amount,'timestamp':time.time()}msg_id=r.xadd(STREAM_KEY, msg)print(f'[生产者] 发布订单 {order_id}: ID {msg_id}')returnmsg_id# 模拟发布 10 个订单foriinrange(1,11): publish_order(f'ORD-{1000+i}', random.randint(1,100), round(random.uniform(10,500),2))time.sleep(0.5)

输出示例:

消费者组 order_processors 已存在[生产者]发布订单 ORD-1001: ID1680000001234-0[生产者]发布订单 ORD-1002: ID1680000001735-0...

3.3 消费者:处理订单并 ACK

每个消费者从组中读取新消息,模拟处理(如扣减库存),成功后 ACK。

def process_order(msg_id, msg_data):"""模拟订单处理:成功返回 True,失败返回 False""" order_id=msg_data.get('order_id','unknown')amount=float(msg_data.get('amount',0))print(f'[消费者] 处理订单 {order_id} 金额 {amount}')# 模拟处理:随机成功或失败(80% 成功)success=random.random()<0.8ifsuccess: print(f'[消费者] 订单 {order_id} 处理成功')else: print(f'[消费者] 订单 {order_id} 处理失败!')returnsuccess def start_consumer(consumer_name):"""启动一个消费者,持续读取并处理消息""" print(f'[消费者 {consumer_name}] 启动')whileTrue: try:# 读取新消息(>),每次最多 1 条,阻塞 2 秒result=r.xreadgroup(GROUP_NAME, consumer_name,{STREAM_KEY:'>'},count=1,block=2000)ifnot result:# 没有消息,尝试处理本消费者的 pending 消息pending=r.xpending(STREAM_KEY, GROUP_NAME)ifpending['pending']>0:# 读取自己的 pending 消息pending_msgs=r.xpending_range(STREAM_KEY, GROUP_NAME,min='-',max='+',count=1,consumername=consumer_name)ifpending_msgs:forpinpending_msgs: msg_id=p['message_id']# 重新获取消息内容msgs=r.xrange(STREAM_KEY,min=msg_id,max=msg_id)ifmsgs: msg_data=msgs[0][1]print(f'[消费者 {consumer_name}] 重试 pending 消息 {msg_id}')ifprocess_order(msg_id, msg_data): r.xack(STREAM_KEY, GROUP_NAME, msg_id)continuestream_name, messages=result[0]formsg_id, msg_datainmessages: print(f'[消费者 {consumer_name}] 收到消息 {msg_id}')ifprocess_order(msg_id, msg_data): r.xack(STREAM_KEY, GROUP_NAME, msg_id)else:# 处理失败,不 ACK,消息留在 pending 中# 后续可以由 XCLAIM 转移或手动重试pass except Exception as e: print(f'[消费者 {consumer_name}] 异常: {e}')time.sleep(1)# 启动消费者if__name__=='__main__':importsys consumer_name=sys.argv[1]iflen(sys.argv)>1else'consumer-1'start_consumer(consumer_name)

启动多个消费者终端:

python consumer.py consumer-A&python consumer.py consumer-B&

生产者发布消息后,消费者输出(示例):

[消费者 consumer-A]收到消息1680000001234-0[消费者]处理订单 ORD-1001 金额99.90[消费者]订单 ORD-1001 处理成功[消费者 consumer-B]收到消息1680000001735-0[消费者]处理订单 ORD-1002 金额250.00[消费者]订单 ORD-1002 处理失败!

注意:消息不会重复消费,A 和 B 竞争。

3.4 处理失败消息(死信队列与重试)

对于长时间 pending 的消息(消费者崩溃或处理失败),可以定时扫描,用XCLAIM转移给健康消费者,或超过最大重试次数后移入死信 Stream。

def recover_pending(stream_key, group_name,idle_ms=60000,max_retries=3):"""恢复空闲消息:将超时的 pending 消息转移给活跃消费者"""# 获取所有 pending 消息pending=r.xpending(stream_key, group_name)ifpending['pending']==0:return# 获取空闲超过 idle_ms 的消息claimed=r.xpending_range(stream_key, group_name,min='-',max='+',count=10)forpinclaimed: msg_id=p['message_id']# 检查重试次数(可存储在消息字段或额外 Redis key)msgs=r.xrange(stream_key,min=msg_id,max=msg_id)ifnot msgs:continuemsg_data=msgs[0][1]retry_count=int(msg_data.get('retry',0))ifretry_count>=max_retries:# 移入死信队列r.xadd(f'{stream_key}:dead', msg_data)r.xack(stream_key, group_name, msg_id)r.xdel(stream_key, msg_id)print(f'消息 {msg_id} 超过重试次数,移入死信队列')elifp['time_since_delivered']>=idle_ms:# XCLAIM 转移给恢复消费者r.xclaim(stream_key, group_name,'recovery_consumer', idle_ms, msg_id)print(f'消息 {msg_id} 被 recovery_consumer 接管')# 定时任务调用whileTrue: recover_pending('orders','order_processors',idle_ms=30000)time.sleep(10)

3.5 异步消费者(redis.asyncio)

在异步框架中,Stream 同样适用。

importasyncioimportredis.asyncio as aioredis async def async_consumer(consumer_name): r=await aioredis.from_url('redis://localhost',decode_responses=True)try: await r.xgroup_create('orders','async_group',id='0-0',mkstream=True)except: passwhileTrue: result=await r.xreadgroup('async_group', consumer_name,{'orders':'>'},count=1,block=2000)ifresult:formsg_id, msg_datainresult[0][1]: print(f'[异步 {consumer_name}] 处理 {msg_id}')await r.xack('orders','async_group', msg_id)await asyncio.sleep(0.1)asyncio.run(async_consumer('worker-1'))

4. Stream 高级特性

  • 消息裁剪XTRIM限制 Stream 长度,避免无限膨胀。XADD ... MAXLEN ~ 1000近似裁剪。

  • 消息范围查询XRANGE/XREVRANGE按 ID 范围查询历史消息。

  • 消费组删除XGROUP DESTROY orders group1

  • 监控XINFO STREAM orders查看 Stream 概览(长度、最后 ID 等)。

5. 常见误区与最佳实践

  • 别忘了 ACK:未 ACK 的消息会堆积在 pending 列表,占内存且影响消费进度。

  • 合理设置 Stream 长度:历史消息会一直保存,用MAXLEN控制容量。

  • 消费者组名全局唯一:不要把不同业务的消费组名重名。

  • XREADGROUP 的 > 用法>是读取新消息,0-0是读取历史,具体ID是读取未确认的。

  • 死信队列:生产必须设计重试上限和死信转移,避免无限重试阻塞队列。

6. 动手试试

  1. 搭建三消费者组:一个 Stream 创建两个消费者组groupAgroupB,每个组各有两个消费者,验证同一条消息会被两个组独立消费,组内竞争消费。

  2. 模拟消费者崩溃:消费者读到消息后不 ACK,然后杀掉进程,用XPENDINGXCLAIM将消息转移给另一个消费者。

  3. 死信队列:实现一个最多重试 2 次的处理逻辑,超过后移入dead:ordersStream,并定期巡检死信 Stream。

  4. 性能测试:生产者批量 XADD 10 万条消息,观察消费者组吞吐量,及XINFO STREAM长度变化。

预期效果:多组消费互不影响;崩溃消息自动转移;死信队列正确隔离失败消息;批量处理稳定。

7. 总结

Redis Stream 把“可靠消息队列”集成到 Redis 内核中,具备持久化、消费者组、ACK、消息回溯等专业 MQ 的核心特性,且保持了 Redis 的简单与高性能。相比独立的 Kafka/RabbitMQ,它更适合中轻量级异步任务、事件驱动架构,且复用现有 Redis 基础设施,大幅降低运维成本。

掌握 Stream,你就拥有了在 Redis 生态中构建可靠消息管道的能力。下一篇,我们将进入性能调优,用慢日志、基准测试、大 Key 优化等手段,把 Redis 的性能彻底榨干。

想了解更多还可以去各个平台搜索「IT策士」,一起升级 IT 思维 !

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/15 0:53:36

绝地求生罗技鼠标宏终极指南:5分钟实现完美压枪控制

绝地求生罗技鼠标宏终极指南&#xff1a;5分钟实现完美压枪控制 【免费下载链接】logitech-pubg PUBG no recoil script for Logitech gaming mouse / 绝地求生 罗技 鼠标宏 项目地址: https://gitcode.com/gh_mirrors/lo/logitech-pubg 还在为《绝地求生》中难以控制的…

作者头像 李华
网站建设 2026/6/15 0:53:00

3步完成AutoHotkey v1到v2脚本转换:告别繁琐手动迁移的实用指南

3步完成AutoHotkey v1到v2脚本转换&#xff1a;告别繁琐手动迁移的实用指南 【免费下载链接】AHK-v2-script-converter AHK v1 -> v2 script converter 项目地址: https://gitcode.com/gh_mirrors/ah/AHK-v2-script-converter 你是否还在为AutoHotkey v1脚本升级到v2…

作者头像 李华
网站建设 2026/6/15 0:36:08

FanControl终极指南:三步骤彻底解决Windows电脑散热噪音问题

FanControl终极指南&#xff1a;三步骤彻底解决Windows电脑散热噪音问题 【免费下载链接】FanControl.Releases This is the release repository for Fan Control, a highly customizable fan controlling software for Windows. 项目地址: https://gitcode.com/GitHub_Trend…

作者头像 李华
网站建设 2026/6/15 0:34:15

法考背诵资料pdf|背诵|资料已整理

法考背诵资料pdf|背诵|资料已整理资料全科都有法考背诵资料pdf 背诵 PDFhttps://tool.nineya.com/s/1jr0lk22e 【英语真题】1. The report shows that regular practice can improve reading speed. The word "regular" is closest in meaning to&#xff08; &…

作者头像 李华
网站建设 2026/6/15 0:33:02

机组风闸立式制动器ZL250-Q

机组风闸立式制动器ZL250-Q机组风闸立式制动器ZL250-QZL250-T制动器&#xff08;俗称风闸&#xff09;是水轮发电机组机械制动系统中的重要组成部分&#xff0c;机组制动系统由制动器、油气管路、手动和自动控制装置组成。ZL120-B/ZL160-Q/ZL250-T制动器的结构形式也是多种多样…

作者头像 李华
网站建设 2026/6/15 0:30:46

如何让老款Mac运行最新macOS?OpenCore Legacy Patcher完整指南

如何让老款Mac运行最新macOS&#xff1f;OpenCore Legacy Patcher完整指南 【免费下载链接】OpenCore-Legacy-Patcher Experience macOS just like before 项目地址: https://gitcode.com/GitHub_Trending/op/OpenCore-Legacy-Patcher 你是否还在使用被苹果官方"抛…

作者头像 李华