基于Linux内核链表和线程的邮箱系统完整实现
一、项目概述与架构
这是一个基于Linux内核风格双向链表实现的线程间通信邮箱系统。系统采用生产者-消费者模型,使用队列作为消息缓冲区,支持多线程间的异步消息传递。
系统架构图
┌─────────────────────────────────────────────┐ │ 邮箱系统 (MBS) │ │ ┌─────────────────────────────────────┐ │ │ │ 线程链表 (list_head) │ │ │ │ ┌─────┐ ┌─────┐ ┌─────┐ ... │ │ │ │ │线程1│←→│线程2│←→│线程3│←→... │ │ │ │ └─────┘ └─────┘ └─────┘ │ │ │ │ │ │ │ │ │ │ │ ┌──┴──┐ ┌──┴──┐ ┌──┴──┐ │ │ │ │ │队列1│ │队列2│ │队列3│ │ │ │ │ └─────┘ └─────┘ └─────┘ │ │ │ └─────────────────────────────────────┘ │ │ ↑ ↑ │ │ │ │ │ │ send_msg() recv_msg() │ │ │ │ │ │ ┌───┴──┐ ┌───┴──┐ │ │ │生产者│ │消费者│ │ │ └──────┘ └──────┘ │ └─────────────────────────────────────────────┘
二、完整代码实现
1. 链表头文件list.h
#ifndef _LINUX_LIST_H #define _LINUX_LIST_H #include <linux/stddef.h> #include <stdio.h> #define LIST_POISON1 ((void *)0) #define LIST_POISON2 ((void *)0) // 计算结构体成员的偏移量 #define offsetof(TYPE, MEMBER) ((size_t) & ((TYPE *)0)->MEMBER) // 通过成员指针获取结构体指针 #define container_of(ptr, type, member) \ ({ \ const typeof(((type *)0)->member) *__mptr = (ptr); \ (type *)((char *)__mptr - offsetof(type, member)); \ }) // 双向链表节点 struct list_head { struct list_head *next, *prev; }; // 链表头初始化宏 #define LIST_HEAD_INIT(name) { &(name), &(name) } #define LIST_HEAD(name) struct list_head name = LIST_HEAD_INIT(name) // 初始化链表头 static inline void INIT_LIST_HEAD(struct list_head *list) { list->next = list; list->prev = list; } // 内部链表添加函数 static inline void __list_add(struct list_head *new, struct list_head *prev, struct list_head *next) { next->prev = new; new->next = next; new->prev = prev; prev->next = new; } // 在头部添加节点(适合实现栈) static inline void list_add(struct list_head *new, struct list_head *head) { __list_add(new, head, head->next); } // 在尾部添加节点(适合实现队列) static inline void list_add_tail(struct list_head *new, struct list_head *head) { __list_add(new, head->prev, head); } // 内部链表删除函数 static inline void __list_del(struct list_head *prev, struct list_head *next) { next->prev = prev; prev->next = next; } // 删除节点 static inline void list_del(struct list_head *entry) { __list_del(entry->prev, entry->next); entry->next = LIST_POISON1; entry->prev = LIST_POISON2; } // 删除并重新初始化节点 static inline void list_del_init(struct list_head *entry) { __list_del(entry->prev, entry->next); INIT_LIST_HEAD(entry); } // 检查链表是否为空 static inline int list_empty(const struct list_head *head) { return head->next == head; } // 检查是否是链表的最后一个节点 static inline int list_is_last(const struct list_head *list, const struct list_head *head) { return list->next == head; } // 检查链表是否只有一个节点 static inline int list_is_singular(const struct list_head *head) { return !list_empty(head) && (head->next == head->prev); } // 替换节点 static inline void list_replace(struct list_head *old, struct list_head *new) { new->next = old->next; new->next->prev = new; new->prev = old->prev; new->prev->next = new; } // 移动节点到另一个链表的头部 static inline void list_move(struct list_head *list, struct list_head *head) { __list_del(list->prev, list->next); list_add(list, head); } // 移动节点到另一个链表的尾部 static inline void list_move_tail(struct list_head *list, struct list_head *head) { __list_del(list->prev, list->next); list_add_tail(list, head); } // 连接两个链表 static inline void __list_splice(const struct list_head *list, struct list_head *prev, struct list_head *next) { struct list_head *first = list->next; struct list_head *last = list->prev; first->prev = prev; prev->next = first; last->next = next; next->prev = last; } // 将list链表连接到head后面 static inline void list_splice(const struct list_head *list, struct list_head *head) { if (!list_empty(list)) __list_splice(list, head, head->next); } // 将list链表连接到head前面 static inline void list_splice_tail(struct list_head *list, struct list_head *head) { if (!list_empty(list)) __list_splice(list, head->prev, head); } // ================== 遍历宏 ================== // 基本遍历(不安全,遍历时不能删除节点) #define list_for_each(pos, head) \ for (pos = (head)->next; pos != (head); pos = pos->next) // 安全遍历(遍历时可以删除节点) #define list_for_each_safe(pos, n, head) \ for (pos = (head)->next, n = pos->next; pos != (head); \ pos = n, n = pos->next) // 反向安全遍历 #define list_for_each_prev_safe(pos, n, head) \ for (pos = (head)->prev, n = pos->prev; \ pos != (head); pos = n, n = pos->prev) // 通过链表节点获取包含它的结构体 #define list_entry(ptr, type, member) container_of(ptr, type, member) // 获取链表的第一个元素 #define list_first_entry(ptr, type, member) \ list_entry((ptr)->next, type, member) // 遍历包含链表节点的结构体 #define list_for_each_entry(pos, head, member) \ for (pos = list_entry((head)->next, typeof(*pos), member); \ &pos->member != (head); \ pos = list_entry(pos->member.next, typeof(*pos), member)) // 安全遍历包含链表节点的结构体(可删除) #define list_for_each_entry_safe(pos, n, head, member) \ for (pos = list_entry((head)->next, typeof(*pos), member), \ n = list_entry(pos->member.next, typeof(*pos), member); \ &pos->member != (head); \ pos = n, n = list_entry(n->member.next, typeof(*n), member)) // 从当前点继续遍历 #define list_for_each_entry_continue(pos, head, member) \ for (pos = list_entry(pos->member.next, typeof(*pos), member); \ &pos->member != (head); \ pos = list_entry(pos->member.next, typeof(*pos), member)) // 反向遍历包含链表节点的结构体 #define list_for_each_entry_reverse(pos, head, member) \ for (pos = list_entry((head)->prev, typeof(*pos), member); \ &pos->member != (head); \ pos = list_entry(pos->member.prev, typeof(*pos), member)) // ================== 哈希链表 ================== // 哈希链表头(单指针) struct hlist_head { struct hlist_node *first; }; // 哈希链表节点 struct hlist_node { struct hlist_node *next, **pprev; }; // 初始化哈希链表头 #define INIT_HLIST_HEAD(ptr) ((ptr)->first = NULL) // 初始化哈希链表节点 static inline void INIT_HLIST_NODE(struct hlist_node *h) { h->next = NULL; h->pprev = NULL; } // 检查哈希链表是否为空 static inline int hlist_empty(const struct hlist_head *h) { return !h->first; } // 检查哈希链表节点是否未链接 static inline int hlist_unhashed(const struct hlist_node *h) { return !h->pprev; } // 删除哈希链表节点 static inline void hlist_del(struct hlist_node *n) { struct hlist_node *next = n->next; struct hlist_node **pprev = n->pprev; *pprev = next; if (next) next->pprev = pprev; n->next = LIST_POISON1; n->pprev = LIST_POISON2; } // 删除并重新初始化哈希链表节点 static inline void hlist_del_init(struct hlist_node *n) { if (!hlist_unhashed(n)) { hlist_del(n); INIT_HLIST_NODE(n); } } // 在哈希链表头部添加节点 static inline void hlist_add_head(struct hlist_node *n, struct hlist_head *h) { struct hlist_node *first = h->first; n->next = first; if (first) first->pprev = &n->next; h->first = n; n->pprev = &h->first; } // 哈希链表遍历宏 #define hlist_for_each(pos, head) \ for (pos = (head)->first; pos; pos = pos->next) // 哈希链表安全遍历宏 #define hlist_for_each_safe(pos, n, head) \ for (pos = (head)->first; pos && ({ \ n = pos->next; \ 1; \ }); \ pos = n) // 遍历哈希链表获取包含结构体 #define hlist_for_each_entry(tpos, pos, head, member) \ for (pos = (head)->first; pos && ({ \ tpos = \ hlist_entry(pos, typeof(*tpos), member); \ 1; \ }); \ pos = pos->next) // 哈希链表安全遍历获取包含结构体 #define hlist_for_each_entry_safe(tpos, pos, n, head, member) \ for (pos = (head)->first; pos && ({ \ n = pos->next; \ 1; \ }) && \ ({ \ tpos = \ hlist_entry(pos, typeof(*tpos), member); \ 1; \ }); \ pos = n) #endif /* _LINUX_LIST_H */2. 队列头文件linkque.h
#ifndef _LINKQUE_H_ #define _LINKQUE_H_ #include <pthread.h> // 邮件数据结构 typedef struct mail_data { pthread_t id_of_sender; // 发送者线程ID char name_of_sender[256]; // 发送者名称 pthread_t id_of_recver; // 接收者线程ID char name_of_recver[256]; // 接收者名称 char data[256]; // 消息内容 } MAIL_DATA; typedef MAIL_DATA DATATYPE; // 队列数据类型 // 队列节点结构 typedef struct quenode { DATATYPE data; // 数据 struct quenode *next; // 下一个节点指针 } LinkQueNode; // 队列结构 typedef struct _linkque { LinkQueNode *head; // 队头指针 LinkQueNode *tail; // 队尾指针 int clen; // 当前队列长度 } LinkQue; // ================== 队列操作接口 ================== // 创建队列 LinkQue *CreateLinkQue(); // 入队操作 int EnterLinkQue(LinkQue *lq, DATATYPE *newdata); // 出队操作 int QuitLinkQue(LinkQue *lq); // 获取队头元素(不删除) DATATYPE *GetHeadLinkQue(LinkQue *lq); // 获取队列大小 int GetSizeLinkQue(LinkQue *lq); // 检查队列是否为空 int IsEmptyLinkQue(LinkQue *lq); // 销毁队列 int DestroyLinkQue(LinkQue *lq); #endif /* _LINKQUE_H_ */3. 队列实现linkque.c
#include "linkque.h" #include <stdio.h> #include <stdlib.h> #include <string.h> // 创建空队列 LinkQue *CreateLinkQue() { LinkQue *lq = (LinkQue *)malloc(sizeof(LinkQue)); if (NULL == lq) { printf("CreateLinkQue malloc error\n"); return NULL; } // 初始化队列 lq->head = NULL; lq->tail = NULL; lq->clen = 0; return lq; } // 元素入队 int EnterLinkQue(LinkQue *lq, DATATYPE *newdata) { // 创建新节点 LinkQueNode *newnode = (LinkQueNode *)malloc(sizeof(LinkQueNode)); if (NULL == newnode) { printf("EnterLinkQue malloc error\n"); return 1; } // 复制数据到新节点 memcpy(&newnode->data, newdata, sizeof(DATATYPE)); newnode->next = NULL; // 如果队列为空,新节点同时是队头和队尾 if (IsEmptyLinkQue(lq)) { lq->head = newnode; lq->tail = newnode; } else { // 否则添加到队尾 lq->tail->next = newnode; lq->tail = newnode; } lq->clen++; // 队列长度加1 return 0; } // 元素出队 int QuitLinkQue(LinkQue *lq) { if (IsEmptyLinkQue(lq)) { return 1; // 队列为空,出队失败 } LinkQueNode *tmp = lq->head; // 保存队头节点 lq->head = lq->head->next; // 队头指向下一个节点 // 如果队列变空,队尾也要置空 if (NULL == lq->head) { lq->tail = NULL; } free(tmp); // 释放原队头节点 lq->clen--; // 队列长度减1 return 0; } // 获取队头元素(不删除) DATATYPE *GetHeadLinkQue(LinkQue *lq) { if (IsEmptyLinkQue(lq)) { return NULL; // 队列为空,返回NULL } return &lq->head->data; // 返回队头数据指针 } // 获取队列当前大小 int GetSizeLinkQue(LinkQue *lq) { return lq->clen; } // 检查队列是否为空 int IsEmptyLinkQue(LinkQue *lq) { return 0 == lq->clen; } // 销毁队列 int DestroyLinkQue(LinkQue *lq) { int size = GetSizeLinkQue(lq); int i = 0; // 循环出队所有元素 for (i = 0; i < size; i++) { QuitLinkQue(lq); } free(lq); // 释放队列结构体 return 0; }4. 邮箱系统头文件mailbox.h
#ifndef _MAILBOX_H_ #define _MAILBOX_H_ #include <pthread.h> #include "linkque.h" #include "list.h" // 线程函数指针类型 typedef void* (*th_fun)(void* arg); // 线程节点结构 typedef struct thread_node { pthread_t tid; // 线程ID char name[256]; // 线程名称(必须唯一) LinkQue* lq; // 该线程对应的消息队列 th_fun th; // 线程函数 struct list_head node; // 链表节点 } LIST_DATA; // 邮箱系统结构 typedef struct mail_box_system { pthread_mutex_t mutex; // 保护邮件系统的互斥锁 struct list_head head; // 线程节点链表头 } MBS; // ================== 邮箱系统接口 ================== // 创建邮箱系统 MBS* create_mail_box_system(); // 销毁邮箱系统 void destroy_mail_box_system(MBS* mbs); // 注册线程到邮箱系统 int register_to_mail_system(MBS* mbs, char name[], th_fun th); // 等待所有线程结束 int wait_all_end(MBS* mbs); // 发送消息 int send_msg(MBS* mbs, char* recvname, MAIL_DATA* data); // 接收消息 int recv_msg(MBS* mbs, MAIL_DATA* data); #endif /* _MAILBOX_H_ */5. 邮箱系统实现mailbox.c
#include "mailbox.h" #include <pthread.h> #include <stdlib.h> #include <string.h> #include <stdio.h> #include "linkque.h" // 创建邮箱系统 MBS* create_mail_box_system() { MBS* m_mbs = malloc(sizeof(MBS)); if (NULL == m_mbs) { perror("create_mail_box_system malloc"); return NULL; } // 初始化链表头和互斥锁 INIT_LIST_HEAD(&m_mbs->head); pthread_mutex_init(&m_mbs->mutex, NULL); return m_mbs; } // 注册线程到邮箱系统 int register_to_mail_system(MBS* mbs, char name[], th_fun th) { LIST_DATA* list_node = malloc(sizeof(LIST_DATA)); if (NULL == list_node) { perror("register_to_mail_system malloc"); return 1; } // 初始化线程节点 strcpy(list_node->name, name); list_node->lq = CreateLinkQue(); // 为线程创建消息队列 list_node->th = th; // 添加到链表 list_add(&list_node->node, &mbs->head); // 创建线程 pthread_create(&list_node->tid, NULL, th, NULL); return 0; } // 等待所有注册的线程结束 int wait_all_end(MBS* mbs) { LIST_DATA *pos, *q; // 安全遍历所有线程节点 list_for_each_entry_safe(pos, q, &mbs->head, node) { pthread_join(pos->tid, NULL); // 等待线程结束 } return 0; } // 通过名称查找线程节点 LIST_DATA* find_node_byname(MBS* mbs, char* name) { LIST_DATA *pos, *q; // 遍历链表查找匹配名称的节点 list_for_each_entry_safe(pos, q, &mbs->head, node) { if (0 == strcmp(pos->name, name)) { return pos; } } return NULL; } // 通过线程ID查找线程节点 LIST_DATA* find_node_byid(MBS* mbs, pthread_t id) { LIST_DATA *pos, *q; // 遍历链表查找匹配ID的节点 list_for_each_entry_safe(pos, q, &mbs->head, node) { if (pos->tid == id) { return pos; } } return NULL; } // 发送消息给指定名称的线程 int send_msg(MBS* mbs, char* recvname, MAIL_DATA* data) { // 查找发送者自己的信息 LIST_DATA* myself = find_node_byid(mbs, pthread_self()); if (NULL == myself) { fprintf(stderr, "sendmsg error, find myself id\n"); return 1; } // 填充发送者信息到消息中 >#include <pthread.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <time.h> #include <unistd.h> #include "linkque.h" #include "list.h" #include "mailbox.h" // 全局邮箱系统指针 MBS* g_mbs; // 显示线程函数 - 接收并显示温度数据 void* show_th(void* arg) { MAIL_DATA data; while (1) { bzero(&data, sizeof(MAIL_DATA)); // 清空数据缓冲区 // 尝试接收消息 int ret = recv_msg(g_mbs, &data); if (1 == ret) { sleep(1); // 没有消息,休眠1秒 continue; } // 显示接收到的消息 printf("show_th sendname[%s]: %s\n", data.name_of_sender, data.data); } return NULL; } // socket线程函数 - 接收并显示温度数据(模拟网络发送) void* sock_th(void* arg) { MAIL_DATA data; while (1) { bzero(&data, sizeof(MAIL_DATA)); // 清空数据缓冲区 // 尝试接收消息 int ret = recv_msg(g_mbs, &data); if (1 == ret) { sleep(1); // 没有消息,休眠1秒 continue; } // 显示接收到的消息 printf("sock_th sendname[%s]: %s\n", data.name_of_sender, data.data); } return NULL; } // 数据生成线程函数 - 生成随机温度数据并发送 void* data_th(void* arg) { srand(time(NULL)); // 初始化随机数种子 MAIL_DATA data; while (1) { // 生成30.00-50.00之间的随机温度 int num = rand() % 2000 + 3000; // 3000-4999 float tmp = num / 100.0; // 30.00-49.99 bzero(&data, sizeof(data)); // 清空数据缓冲区 // 格式化温度数据 sprintf(data.data, "Temperature: %.2f°C", tmp); // 发送给显示线程 send_msg(g_mbs, "show", &data); sleep(rand() % 3); // 随机休眠0-2秒 // 发送给socket线程 send_msg(g_mbs, "sock", &data); sleep(rand() % 2); // 随机休眠0-1秒 } return NULL; } // 主函数 int main(int argc, char** argv) { // 1. 创建邮箱系统 g_mbs = create_mail_box_system(); if (NULL == g_mbs) { fprintf(stderr, "Failed to create mail box system\n"); return 1; } printf("Mailbox system created successfully!\n"); // 2. 注册三个工作线程 printf("Registering threads...\n"); register_to_mail_system(g_mbs, "show", show_th); // 显示线程 register_to_mail_system(g_mbs, "sock", sock_th); // 网络线程 register_to_mail_system(g_mbs, "data", data_th); // 数据生成线程 printf("All threads registered. Starting communication...\n"); printf("==============================================\n"); // 3. 等待所有线程结束(实际上线程不会主动结束,需要Ctrl+C终止) wait_all_end(g_mbs); // 4. 销毁邮箱系统(正常情况下不会执行到这里) destroy_mail_box_system(g_mbs); return 0; }三、编译与运行
1. 编译脚本Makefile
CC = gcc CFLAGS = -Wall -g -pthread TARGET = mailbox_system OBJS = main.o linkque.o mailbox.o all: $(TARGET) $(TARGET): $(OBJS) $(CC) $(CFLAGS) -o $(TARGET) $(OBJS) main.o: main.c mailbox.h linkque.h list.h $(CC) $(CFLAGS) -c main.c linkque.o: linkque.c linkque.h $(CC) $(CFLAGS) -c linkque.c mailbox.o: mailbox.c mailbox.h linkque.h list.h $(CC) $(CFLAGS) -c mailbox.c clean: rm -f $(OBJS) $(TARGET) run: $(TARGET) ./$(TARGET)2. 编译命令
# 方法1:使用Makefile make make run # 方法2:直接编译 gcc -o mailbox_system main.c linkque.c mailbox.c -lpthread
3. 运行输出示例
Mailbox system created successfully! Registering threads... All threads registered. Starting communication... ============================================== show_th sendname[data]: Temperature: 42.15°C sock_th sendname[data]: Temperature: 42.15°C show_th sendname[data]: Temperature: 36.78°C sock_th sendname[data]: Temperature: 36.78°C show_th sendname[data]: Temperature: 45.92°C sock_th sendname[data]: Temperature: 45.92°C ...
四、系统设计与原理
1. 设计模式
生产者-消费者模式
生产者:
data_th线程生成数据消费者:
show_th和sock_th线程消费数据缓冲区: 消息队列作为缓冲区,解耦生产者和消费者
观察者模式
一个数据源(生产者)可以通知多个观察者(消费者)
观察者通过注册自己的消息队列来接收通知
2. 线程安全机制
// 关键同步点 pthread_mutex_lock(&mbs->mutex); // 临界区操作:队列入队/出队 pthread_mutex_unlock(&mbs->mutex);
3. 内存管理
创建时分配:
邮箱系统结构体
线程节点结构体
消息队列结构体
消息节点(入队时)
销毁时释放:
逆序释放所有分配的内存
防止内存泄漏
4. 消息传递流程
1. data_th线程生成数据 2. 调用send_msg()发送给"show"和"sock" 3. send_msg()查找接收者队列 4. 消息入队到接收者队列 5. show_th/sock_th调用recv_msg() 6. recv_msg()从自己的队列出队 7. 处理接收到的消息
五、扩展与优化
1. 功能扩展建议
// 1. 消息优先级 typedef struct mail_data { // ... 原有字段 int priority; // 消息优先级 time_t timestamp; // 发送时间戳 } MAIL_DATA; // 2. 消息确认机制 typedef struct ack_data { pthread_t original_sender; char message_id[64]; int status; // 0=未确认, 1=已接收, 2=已处理 } ACK_DATA; // 3. 广播消息 int broadcast_msg(MBS* mbs, MAIL_DATA* data) { LIST_DATA *pos, *q; list_for_each_entry_safe(pos, q, &mbs->head, node) { EnterLinkQue(pos->lq, data); } return 0; }2. 性能优化建议
锁粒度优化:
// 细粒度锁:每个队列单独一把锁 typedef struct thread_node { pthread_mutex_t queue_mutex; // 队列专用锁 // ... 其他字段 } LIST_DATA;内存池优化:
// 预分配消息节点内存池 #define POOL_SIZE 1000 LinkQueNode *node_pool[POOL_SIZE];
无锁队列:
使用原子操作实现无锁队列
适用高并发场景
3. 错误处理增强
// 添加错误码枚举 typedef enum { MB_SUCCESS = 0, MB_ERR_MALLOC, MB_ERR_QUEUE_FULL, MB_ERR_THREAD_NOT_FOUND, MB_ERR_TIMEOUT, // ... 其他错误码 } MB_ERROR_CODE; // 增强的错误处理函数 MB_ERROR_CODE send_msg_ex(MBS* mbs, char* recvname, MAIL_DATA* data, int timeout_ms);六、应用场景
1. 实时数据采集系统
传感器数据采集线程 → 数据处理线程 → 显示/存储线程
2. 网络服务器
接收线程 → 业务处理线程 → 发送线程
3. 事件驱动架构
事件源 → 事件分发器 → 事件处理器
4. 插件系统
主程序 → 插件管理器 → 各个插件
七、总结
这个邮箱系统实现具有以下特点:
优点:
模块化设计:链表、队列、邮箱系统分离,易于维护
线程安全:完善的锁机制保护共享资源
灵活扩展:基于Linux内核链表,易于添加新功能
异步通信:生产者消费者解耦,提高系统响应性
跨平台:基于POSIX线程,可移植性好
注意事项:
线程函数需要正确处理退出条件
消息队列可能积压,需要监控队列长度
线程名称必须唯一,否则消息路由会出错
需要考虑消息丢失和重复的处理