news 2026/1/22 3:12:52

redis 发布订阅功能

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
redis 发布订阅功能

redis发布订阅是一种消息通知模式,发布者发送消息,订阅者接收消息。

角色说明
发布者 (Publisher)向频道发送消息的客户端
订阅者 (Subscriber)订阅频道接收消息的客户端
频道 (Channel)

消息传递的管道/主题

基本命令

1. 订阅频道

#订阅一个或多个频道 SUBSCRIBE 频道1 频道2 #使用模式匹配订阅 PSUBSCRIBE news.* # 订阅所有以 news. 开头的频道

2. 取消订阅

# 退订指定频道 UNSUBSCRIBE channel1 # 退订模式匹配的频道 PUNSUBSCRIBE news.*

3. 发布消息

# 向指定频道发布消息 PUBLISH channel1 "Hello, World!"

4. 查看信息

# 查看活跃的频道 PUBSUB CHANNELS [pattern] # 查看频道的订阅者数量 PUBSUB NUMSUB channel1 # 查看模式订阅的数量 PUBSUB NUMPAT

📝 代码示例

步骤:

  1. 连接到Redis服务器。

    bool Redis::connect() { // 负责publish发布消息的上下文连接 _publish_context = redisConnect("127.0.0.1", 6379); if (nullptr == _publish_context) { cerr << "connect redis failed!" << endl; return false; } // 负责subscribe订阅消息的上下文连接 _subcribe_context = redisConnect("127.0.0.1", 6379); if (nullptr == _subcribe_context) { cerr << "connect redis failed!" << endl; return false; } // 在单独的线程中,监听通道上的事件,有消息给业务层进行上报 thread t([&]() { observer_channel_message(); }); t.detach(); cout << "connect redis-server success!" << endl; return true; }
  2. 订阅者订阅一个或多个频道。

    bool Redis::subscribe(int channel) { // SUBSCRIBE命令本身会造成线程阻塞等待通道里面发生消息,这里只做订阅通道,不接收通道消息 // 通道消息的接收专门在observer_channel_message函数中的独立线程中进行 // 只负责发送命令,不阻塞接收redis server响应消息,否则和notifyMsg线程抢占响应资源 if (REDIS_ERR == redisAppendCommand(this->_subcribe_context, "SUBSCRIBE %d", channel)) { cerr << "subscribe command failed!" << endl; return false; } // redisBufferWrite可以循环发送缓冲区,直到缓冲区数据发送完毕(done被置为1) int done = 0; while (!done) { if (REDIS_ERR == redisBufferWrite(this->_subcribe_context, &done)) { cerr << "subscribe command failed!" << endl; return false; } } // redisGetReply return true; }
  3. 发布者向频道发布消息。

    bool Redis::publish(int channel, string message) { redisReply *reply = (redisReply *)redisCommand(_publish_context, "PUBLISH %d %s", channel, message.c_str()); if (nullptr == reply) { cerr << "publish command failed!" << endl; return false; } freeReplyObject(reply); return true; }
  4. 订阅者接收并处理消息。

    void Redis::observer_channel_message() { redisReply *reply = nullptr; while (REDIS_OK == redisGetReply(this->_subcribe_context, (void **)&reply)) { // 订阅收到的消息是一个带三元素的数组 if (reply != nullptr && reply->element[2] != nullptr && reply->element[2]->str != nullptr) { // 给业务层上报通道上发生的消息 _notify_message_handler(atoi(reply->element[1]->str) , reply->element[2]->str); } freeReplyObject(reply); } cerr << ">>>>>>>>>>>>> observer_channel_message quit <<<<<<<<<<<<<" << endl; }

Redis 发布订阅原理深度解析

🏗️Redis 发布订阅架构总览

┌─────────────────────────────────────────────────────────┐ │ Redis Server │ │ │ │ ┌─────────────────────────────────────────────────┐ │ │ │ Pub/Sub 子系统 │ │ │ │ │ │ │ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │ │ │ │ Channel │ │ Pattern │ │ Clients │ │ │ │ │ │ Hash Table │ │ List │ │ List │ │ │ │ │ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ │ │ │ │ │ │ │ │ │ │ └────────┼───────────────┼───────────────┼────────┘ │ │ │ │ │ │ │ ┌──────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐ │ │ │ 普通订阅 │ │ 模式订阅 │ │ 客户端状态 │ │ │ └─────────────┘ └─────────────┘ └─────────────┘ │ └─────────────────────────────────────────────────────────┘

📚核心数据结构

1.频道订阅表

// Redis 源码中的数据结构(简化版) typedef struct redisServer { // 普通频道订阅:字典,键是频道名,值是客户端链表 dict *pubsub_channels; // Map<channel_name, List<client>> // 模式订阅:链表,每个节点包含模式和客户端链表 list *pubsub_patterns; // List<{pattern, List<client>}> // ... 其他字段 } redisServer;

2.客户端订阅状态

typedef struct client { // 客户端订阅的频道列表 dict *pubsub_channels; // Set<channel_name> // 客户端订阅的模式列表 list *pubsub_patterns; // List<pattern> // ... 其他字段 } client;

🔄消息传递流程

发布消息的完整流程

1. 客户端发送: PUBLISH channel "message" ↓ 2. Redis 接收命令并解析 ↓ 3. 查找 channel 的订阅者 ├── 3.1 在 pubsub_channels 字典中查找 channel ├── 3.2 获取该 channel 的客户端链表 └── 3.3 遍历链表,向每个客户端发送消息 ↓ 4. 查找模式匹配的订阅者 ├── 4.1 遍历 pubsub_patterns 链表 ├── 4.2 对每个 pattern,检查是否匹配 channel └── 4.3 向匹配的客户端发送消息 ↓ 5. 返回接收消息的客户端数量

源码级解析(基于 Redis 6.x)

// pubsub.c - 发布消息的核心函数 void publishCommand(client *c) { int receivers = pubsubPublishMessage(c->argv[1], c->argv[2]); addReplyLongLong(c, receivers); } int pubsubPublishMessage(robj *channel, robj *message) { int receivers = 0; // 1. 向普通频道订阅者发送 dictEntry *de = dictFind(server.pubsub_channels, channel); if (de) { list *list = dictGetVal(de); receivers += dictSize(list); // 遍历所有订阅该频道的客户端 dictIterator *di = dictGetIterator(list); dictEntry *de; while ((de = dictNext(di)) != NULL) { client *c = dictGetKey(de); addReplyPubsubMessage(c, channel, message); } dictReleaseIterator(di); } // 2. 向模式订阅者发送 if (listLength(server.pubsub_patterns)) { listIter li; listNode *ln; listRewind(server.pubsub_patterns, &li); while ((ln = listNext(&li)) != NULL) { pubsubPattern *pat = ln->value; // 检查频道是否匹配模式 if (stringmatchlen(pat->pattern->ptr, sdslen(pat->pattern->ptr), channel->ptr, sdslen(channel->ptr),0)) { addReplyPubsubPatMessage(pat->client, pat->pattern, channel, message); receivers++; } } } return receivers; }

🔍订阅机制详解

1. 普通订阅流程

// 客户端订阅频道的处理 void subscribeCommand(client *c) { for (int j = 1; j < c->argc; j++) { pubsubSubscribeChannel(c, c->argv[j]); } } void pubsubSubscribeChannel(client *c, robj *channel) { // 1. 将频道添加到客户端的订阅集合 if (dictAdd(c->pubsub_channels, channel, NULL) == DICT_OK) { incrRefCount(channel); // 2. 将客户端添加到服务器的频道订阅列表 dictEntry *de = dictFind(server.pubsub_channels, channel); if (de == NULL) { // 频道不存在,创建新的客户端列表 clients = dictCreate(&pubsubDictType); dictAdd(server.pubsub_channels, channel, clients); incrRefCount(channel); } else { clients = dictGetVal(de); } // 3. 将客户端添加到频道的订阅者列表 dictAdd(clients, c, NULL); } }

2. 模式订阅流程

// 客户端模式订阅的处理 void psubscribeCommand(client *c) { for (int j = 1; j < c->argc; j++) { pubsubSubscribePattern(c, c->argv[j]); } } void pubsubSubscribePattern(client *c, robj *pattern) { // 1. 检查客户端是否已经订阅了此模式 listIter li; listNode *ln; listRewind(c->pubsub_patterns, &li); while ((ln = listNext(&li)) != NULL) { pubsubPattern *pat = ln->value; if (equalStringObjects(pattern, pat->pattern)) { return; // 已订阅 } } // 2. 创建新的模式订阅节点 pubsubPattern *pat; pat = zmalloc(sizeof(*pat)); pat->pattern = getDecodedObject(pattern); pat->client = c; // 3. 添加到客户端的模式列表 listAddNodeTail(c->pubsub_patterns, pat); // 4. 添加到服务器的模式列表 listAddNodeTail(server.pubsub_patterns, pat); }

💾内存数据结构图示

普通订阅数据结构

server.pubsub_channels (字典) | ├── "news" → dict(clients) │ ├── client1 │ ├── client2 │ └── client3 │ ├── "sports" → dict(clients) │ ├── client1 │ └── client4 │ └── "weather" → dict(clients) └── client2 client.pubsub_channels (字典,客户端视角) | ├── client1: {"news", "sports"} ├── client2: {"news", "weather"} ├── client3: {"news"} └── client4: {"sports"}

模式订阅数据结构

server.pubsub_patterns (链表) | ├── node1: {pattern="news.*", client=client1} ├── node2: {pattern="*.sports", client=client2} ├── node3: {pattern="weather.*", client=client3} └── node4: {pattern="news.*", client=client4} client.pubsub_patterns (链表,客户端视角) | ├── client1: ["news.*"] ├── client2: ["*.sports"] ├── client3: ["weather.*"] └── client4: ["news.*"]

🚀高性能设计要点

1. 零拷贝消息传递

// Redis 使用引用计数,避免消息内容的复制 void addReplyPubsubMessage(client *c, robj *channel, robj *message) { // 增加引用计数,而不是复制消息内容 incrRefCount(channel); incrRefCount(message); // 构建响应但不复制数据 addReply(c, shared.mbulkhdr[3]); addReply(c, shared.messagebulk); addReplyBulk(c, channel); addReplyBulk(c, message); // 减少引用计数 decrRefCount(channel); decrRefCount(message); }

2. 批量消息发送优化

// 当多个客户端订阅同一频道时,优化网络发送 for (每个订阅者客户端) { if (clientHasPendingReplies(c)) { // 如果客户端输出缓冲区已有数据,先发送 writeToClient(c, 0); } // 添加新消息到缓冲区 addReplyPubsubMessage(c, channel, message); // 非阻塞发送尝试 if (clientHasPendingReplies(c)) { writeToClient(c, 0); } }

📊内存与性能统计

PUBSUB 命令实现

// PUBSUB CHANNELS [pattern] - 获取活跃频道 void pubsubChannelsCommand(client *c) { dictIterator *di; dictEntry *de; long mblen = 0; void *replylen = NULL; // 遍历所有频道 di = dictGetIterator(server.pubsub_channels); while((de = dictNext(di)) != NULL) { robj *cobj = dictGetKey(de); char *channel = cobj->ptr; // 如果有模式参数,进行匹配 if (pattern && !stringmatchlen(pattern, patlen, channel, clen, 0)) continue; addReplyBulk(c, cobj); } dictReleaseIterator(di); } // PUBSUB NUMSUB [channel ...] - 获取订阅者数量 // PUBSUB NUMPAT - 获取模式订阅数量

⚠️重要限制与边界条件

1. 消息不持久化

// Redis 不会将发布的消息写入磁盘 // 消息只在内存中传递 // 服务器重启或崩溃会导致消息丢失

2. 无消息确认机制

发布者 → Redis → 订阅者 发送 接收 ↓ ↓ 无确认 ←───── 无ACK

3. 客户端缓冲区限制

// 每个客户端有输出缓冲区限制 #define PROTO_REPLY_CHUNK_BYTES (16*1024) // 16KB 块 // 如果订阅者处理速度慢,缓冲区可能满 // Redis 会断开慢客户端连接 if (client->bufpos >= client->buf_soft_limit) { freeClient(client); }

4. 订阅数量限制

// 理论上无限制,但受内存限制 // 每个订阅在内存中占用: // - 频道名:共享字符串对象 // - 客户端指针:8字节 // - 字典条目开销:~24字节

🔄与 Redis Streams 的对比

特性Pub/SubStreams
消息持久化❌ 不持久化✅ 可持久化
消费者组❌ 不支持✅ 支持
消息确认❌ 不支持✅ 支持
历史消息❌ 不保留✅ 可回溯
性能⭐⭐⭐⭐⭐ 极高⭐⭐⭐⭐ 高
内存使用临时内存持久内存

🎯设计哲学

1. 简单性优于复杂性

// Redis 选择简单的广播模型,而不是复杂的消息队列 // 设计目标:高吞吐量、低延迟

2. 内存速度优先

// 所有操作在内存中完成 // 使用高效的数据结构(字典、链表) // 避免磁盘I/O和复杂的事务

3. 客户端驱动模型

// Redis 不主动推送,而是响应客户端命令 // 但 Pub/Sub 是例外,服务器主动向订阅者推送
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/1/15 3:45:32

【保姆级教程】10分钟轻松搭建属于自己的AI助手

想拥有专属AI助手&#xff0c;无需复杂编码&#xff0c;无需漫长等待&#xff01;借助扣子平台的可视化能力&#xff0c;搭配数眼智能搜索与网页阅读接口&#xff0c;全程免费&#xff01;10分钟就能快速搭建完成&#xff0c;轻松实现精准信息检索与网页内容解析。本文为你带来…

作者头像 李华
网站建设 2026/1/22 9:21:14

3D模型生成终极指南:腾讯Hunyuan3D-2mini轻量化技术深度解析

还在为复杂的3D建模软件发愁吗&#xff1f;专业建模师需要花费数小时完成的工作&#xff0c;现在普通人只需输入文字描述&#xff0c;30秒内就能获得完整的3D模型。腾讯最新开源的Hunyuan3D-2mini模型&#xff0c;以仅0.6B的参数规模&#xff0c;实现了前所未有的"轻量高速…

作者头像 李华
网站建设 2026/1/21 6:47:38

Kubernetes Dashboard可视化监控:从架构原理到生产实践

在Kubernetes集群运维中&#xff0c;命令行工具虽然功能强大但学习曲线陡峭&#xff0c;而Dashboard作为官方提供的Web管理界面&#xff0c;通过直观的可视化方式降低了操作门槛。本文将深入解析Dashboard的部署架构、安全认证机制和实际应用场景&#xff0c;帮助您构建可靠的可…

作者头像 李华
网站建设 2026/1/22 9:25:42

基于DWS MCP Server搭建数据分析Agent

本文分享自华为云社区《基于DWS MCP Server搭建数据分析Agent》 1. 前言 MCP&#xff08;Model Context Protocol&#xff09;是由Anthropic于2024年11月提出的开放协议标准&#xff0c;旨在解决大型语言模型与外部系统&#xff08;如数据库、API&#xff09;交互的碎片化问题。…

作者头像 李华
网站建设 2026/1/22 0:51:57

兰州失控车辆证明科技已偷走车辆的控制权,黑客入侵会如何?

兰州失控车辆以115公里时速狂奔4个多小时&#xff0c;直到燃油耗尽才将车辆停下&#xff0c;证明了电子控制系统的不可靠&#xff0c;那么那些已赋予智驾更多控制权的车辆呢&#xff1f;想想都觉得后背发凉&#xff0c;事实证明科技无法为人类提供足够的安全保障&#xff01;在…

作者头像 李华