redis发布订阅是一种消息通知模式,发布者发送消息,订阅者接收消息。
| 角色 | 说明 |
|---|---|
| 发布者 (Publisher) | 向频道发送消息的客户端 |
| 订阅者 (Subscriber) | 订阅频道接收消息的客户端 |
| 频道 (Channel) | 消息传递的管道/主题 |
基本命令
1. 订阅频道
#订阅一个或多个频道 SUBSCRIBE 频道1 频道2 #使用模式匹配订阅 PSUBSCRIBE news.* # 订阅所有以 news. 开头的频道2. 取消订阅
# 退订指定频道 UNSUBSCRIBE channel1 # 退订模式匹配的频道 PUNSUBSCRIBE news.*3. 发布消息
# 向指定频道发布消息 PUBLISH channel1 "Hello, World!"4. 查看信息
# 查看活跃的频道 PUBSUB CHANNELS [pattern] # 查看频道的订阅者数量 PUBSUB NUMSUB channel1 # 查看模式订阅的数量 PUBSUB NUMPAT📝 代码示例
步骤:
连接到Redis服务器。
bool Redis::connect() { // 负责publish发布消息的上下文连接 _publish_context = redisConnect("127.0.0.1", 6379); if (nullptr == _publish_context) { cerr << "connect redis failed!" << endl; return false; } // 负责subscribe订阅消息的上下文连接 _subcribe_context = redisConnect("127.0.0.1", 6379); if (nullptr == _subcribe_context) { cerr << "connect redis failed!" << endl; return false; } // 在单独的线程中,监听通道上的事件,有消息给业务层进行上报 thread t([&]() { observer_channel_message(); }); t.detach(); cout << "connect redis-server success!" << endl; return true; }订阅者订阅一个或多个频道。
bool Redis::subscribe(int channel) { // SUBSCRIBE命令本身会造成线程阻塞等待通道里面发生消息,这里只做订阅通道,不接收通道消息 // 通道消息的接收专门在observer_channel_message函数中的独立线程中进行 // 只负责发送命令,不阻塞接收redis server响应消息,否则和notifyMsg线程抢占响应资源 if (REDIS_ERR == redisAppendCommand(this->_subcribe_context, "SUBSCRIBE %d", channel)) { cerr << "subscribe command failed!" << endl; return false; } // redisBufferWrite可以循环发送缓冲区,直到缓冲区数据发送完毕(done被置为1) int done = 0; while (!done) { if (REDIS_ERR == redisBufferWrite(this->_subcribe_context, &done)) { cerr << "subscribe command failed!" << endl; return false; } } // redisGetReply return true; }发布者向频道发布消息。
bool Redis::publish(int channel, string message) { redisReply *reply = (redisReply *)redisCommand(_publish_context, "PUBLISH %d %s", channel, message.c_str()); if (nullptr == reply) { cerr << "publish command failed!" << endl; return false; } freeReplyObject(reply); return true; }订阅者接收并处理消息。
void Redis::observer_channel_message() { redisReply *reply = nullptr; while (REDIS_OK == redisGetReply(this->_subcribe_context, (void **)&reply)) { // 订阅收到的消息是一个带三元素的数组 if (reply != nullptr && reply->element[2] != nullptr && reply->element[2]->str != nullptr) { // 给业务层上报通道上发生的消息 _notify_message_handler(atoi(reply->element[1]->str) , reply->element[2]->str); } freeReplyObject(reply); } cerr << ">>>>>>>>>>>>> observer_channel_message quit <<<<<<<<<<<<<" << endl; }
Redis 发布订阅原理深度解析
🏗️Redis 发布订阅架构总览
┌─────────────────────────────────────────────────────────┐ │ Redis Server │ │ │ │ ┌─────────────────────────────────────────────────┐ │ │ │ Pub/Sub 子系统 │ │ │ │ │ │ │ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │ │ │ │ Channel │ │ Pattern │ │ Clients │ │ │ │ │ │ Hash Table │ │ List │ │ List │ │ │ │ │ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ │ │ │ │ │ │ │ │ │ │ └────────┼───────────────┼───────────────┼────────┘ │ │ │ │ │ │ │ ┌──────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐ │ │ │ 普通订阅 │ │ 模式订阅 │ │ 客户端状态 │ │ │ └─────────────┘ └─────────────┘ └─────────────┘ │ └─────────────────────────────────────────────────────────┘
📚核心数据结构
1.频道订阅表
// Redis 源码中的数据结构(简化版) typedef struct redisServer { // 普通频道订阅:字典,键是频道名,值是客户端链表 dict *pubsub_channels; // Map<channel_name, List<client>> // 模式订阅:链表,每个节点包含模式和客户端链表 list *pubsub_patterns; // List<{pattern, List<client>}> // ... 其他字段 } redisServer;2.客户端订阅状态
typedef struct client { // 客户端订阅的频道列表 dict *pubsub_channels; // Set<channel_name> // 客户端订阅的模式列表 list *pubsub_patterns; // List<pattern> // ... 其他字段 } client;🔄消息传递流程
发布消息的完整流程
1. 客户端发送: PUBLISH channel "message" ↓ 2. Redis 接收命令并解析 ↓ 3. 查找 channel 的订阅者 ├── 3.1 在 pubsub_channels 字典中查找 channel ├── 3.2 获取该 channel 的客户端链表 └── 3.3 遍历链表,向每个客户端发送消息 ↓ 4. 查找模式匹配的订阅者 ├── 4.1 遍历 pubsub_patterns 链表 ├── 4.2 对每个 pattern,检查是否匹配 channel └── 4.3 向匹配的客户端发送消息 ↓ 5. 返回接收消息的客户端数量
源码级解析(基于 Redis 6.x)
// pubsub.c - 发布消息的核心函数 void publishCommand(client *c) { int receivers = pubsubPublishMessage(c->argv[1], c->argv[2]); addReplyLongLong(c, receivers); } int pubsubPublishMessage(robj *channel, robj *message) { int receivers = 0; // 1. 向普通频道订阅者发送 dictEntry *de = dictFind(server.pubsub_channels, channel); if (de) { list *list = dictGetVal(de); receivers += dictSize(list); // 遍历所有订阅该频道的客户端 dictIterator *di = dictGetIterator(list); dictEntry *de; while ((de = dictNext(di)) != NULL) { client *c = dictGetKey(de); addReplyPubsubMessage(c, channel, message); } dictReleaseIterator(di); } // 2. 向模式订阅者发送 if (listLength(server.pubsub_patterns)) { listIter li; listNode *ln; listRewind(server.pubsub_patterns, &li); while ((ln = listNext(&li)) != NULL) { pubsubPattern *pat = ln->value; // 检查频道是否匹配模式 if (stringmatchlen(pat->pattern->ptr, sdslen(pat->pattern->ptr), channel->ptr, sdslen(channel->ptr),0)) { addReplyPubsubPatMessage(pat->client, pat->pattern, channel, message); receivers++; } } } return receivers; }🔍订阅机制详解
1. 普通订阅流程
// 客户端订阅频道的处理 void subscribeCommand(client *c) { for (int j = 1; j < c->argc; j++) { pubsubSubscribeChannel(c, c->argv[j]); } } void pubsubSubscribeChannel(client *c, robj *channel) { // 1. 将频道添加到客户端的订阅集合 if (dictAdd(c->pubsub_channels, channel, NULL) == DICT_OK) { incrRefCount(channel); // 2. 将客户端添加到服务器的频道订阅列表 dictEntry *de = dictFind(server.pubsub_channels, channel); if (de == NULL) { // 频道不存在,创建新的客户端列表 clients = dictCreate(&pubsubDictType); dictAdd(server.pubsub_channels, channel, clients); incrRefCount(channel); } else { clients = dictGetVal(de); } // 3. 将客户端添加到频道的订阅者列表 dictAdd(clients, c, NULL); } }2. 模式订阅流程
// 客户端模式订阅的处理 void psubscribeCommand(client *c) { for (int j = 1; j < c->argc; j++) { pubsubSubscribePattern(c, c->argv[j]); } } void pubsubSubscribePattern(client *c, robj *pattern) { // 1. 检查客户端是否已经订阅了此模式 listIter li; listNode *ln; listRewind(c->pubsub_patterns, &li); while ((ln = listNext(&li)) != NULL) { pubsubPattern *pat = ln->value; if (equalStringObjects(pattern, pat->pattern)) { return; // 已订阅 } } // 2. 创建新的模式订阅节点 pubsubPattern *pat; pat = zmalloc(sizeof(*pat)); pat->pattern = getDecodedObject(pattern); pat->client = c; // 3. 添加到客户端的模式列表 listAddNodeTail(c->pubsub_patterns, pat); // 4. 添加到服务器的模式列表 listAddNodeTail(server.pubsub_patterns, pat); }💾内存数据结构图示
普通订阅数据结构
server.pubsub_channels (字典) | ├── "news" → dict(clients) │ ├── client1 │ ├── client2 │ └── client3 │ ├── "sports" → dict(clients) │ ├── client1 │ └── client4 │ └── "weather" → dict(clients) └── client2 client.pubsub_channels (字典,客户端视角) | ├── client1: {"news", "sports"} ├── client2: {"news", "weather"} ├── client3: {"news"} └── client4: {"sports"}模式订阅数据结构
server.pubsub_patterns (链表) | ├── node1: {pattern="news.*", client=client1} ├── node2: {pattern="*.sports", client=client2} ├── node3: {pattern="weather.*", client=client3} └── node4: {pattern="news.*", client=client4} client.pubsub_patterns (链表,客户端视角) | ├── client1: ["news.*"] ├── client2: ["*.sports"] ├── client3: ["weather.*"] └── client4: ["news.*"]🚀高性能设计要点
1. 零拷贝消息传递
// Redis 使用引用计数,避免消息内容的复制 void addReplyPubsubMessage(client *c, robj *channel, robj *message) { // 增加引用计数,而不是复制消息内容 incrRefCount(channel); incrRefCount(message); // 构建响应但不复制数据 addReply(c, shared.mbulkhdr[3]); addReply(c, shared.messagebulk); addReplyBulk(c, channel); addReplyBulk(c, message); // 减少引用计数 decrRefCount(channel); decrRefCount(message); }2. 批量消息发送优化
// 当多个客户端订阅同一频道时,优化网络发送 for (每个订阅者客户端) { if (clientHasPendingReplies(c)) { // 如果客户端输出缓冲区已有数据,先发送 writeToClient(c, 0); } // 添加新消息到缓冲区 addReplyPubsubMessage(c, channel, message); // 非阻塞发送尝试 if (clientHasPendingReplies(c)) { writeToClient(c, 0); } }📊内存与性能统计
PUBSUB 命令实现
// PUBSUB CHANNELS [pattern] - 获取活跃频道 void pubsubChannelsCommand(client *c) { dictIterator *di; dictEntry *de; long mblen = 0; void *replylen = NULL; // 遍历所有频道 di = dictGetIterator(server.pubsub_channels); while((de = dictNext(di)) != NULL) { robj *cobj = dictGetKey(de); char *channel = cobj->ptr; // 如果有模式参数,进行匹配 if (pattern && !stringmatchlen(pattern, patlen, channel, clen, 0)) continue; addReplyBulk(c, cobj); } dictReleaseIterator(di); } // PUBSUB NUMSUB [channel ...] - 获取订阅者数量 // PUBSUB NUMPAT - 获取模式订阅数量⚠️重要限制与边界条件
1. 消息不持久化
// Redis 不会将发布的消息写入磁盘 // 消息只在内存中传递 // 服务器重启或崩溃会导致消息丢失
2. 无消息确认机制
发布者 → Redis → 订阅者 发送 接收 ↓ ↓ 无确认 ←───── 无ACK
3. 客户端缓冲区限制
// 每个客户端有输出缓冲区限制 #define PROTO_REPLY_CHUNK_BYTES (16*1024) // 16KB 块 // 如果订阅者处理速度慢,缓冲区可能满 // Redis 会断开慢客户端连接 if (client->bufpos >= client->buf_soft_limit) { freeClient(client); }4. 订阅数量限制
// 理论上无限制,但受内存限制 // 每个订阅在内存中占用: // - 频道名:共享字符串对象 // - 客户端指针:8字节 // - 字典条目开销:~24字节
🔄与 Redis Streams 的对比
| 特性 | Pub/Sub | Streams |
|---|---|---|
| 消息持久化 | ❌ 不持久化 | ✅ 可持久化 |
| 消费者组 | ❌ 不支持 | ✅ 支持 |
| 消息确认 | ❌ 不支持 | ✅ 支持 |
| 历史消息 | ❌ 不保留 | ✅ 可回溯 |
| 性能 | ⭐⭐⭐⭐⭐ 极高 | ⭐⭐⭐⭐ 高 |
| 内存使用 | 临时内存 | 持久内存 |
🎯设计哲学
1. 简单性优于复杂性
// Redis 选择简单的广播模型,而不是复杂的消息队列 // 设计目标:高吞吐量、低延迟
2. 内存速度优先
// 所有操作在内存中完成 // 使用高效的数据结构(字典、链表) // 避免磁盘I/O和复杂的事务
3. 客户端驱动模型
// Redis 不主动推送,而是响应客户端命令 // 但 Pub/Sub 是例外,服务器主动向订阅者推送