文件3: device_manager.cpp - 设备状态管理
/** * @file device_manager.cpp * @brief IoT设备管理器实现 * @author IoT Platform Team * @version 1.0 * @date 2024 * * 实现发布-订阅模式和观察者模式 * 管理设备注册、状态跟踪和分组广播 */ #include "config.h" #include <memory> #include <unordered_map> #include <unordered_set> #include <map> #include <set> #include <vector> #include <mutex> #include <shared_mutex> #include <algorithm> #include <chrono> #include <functional> namespace iot { /** * @class DeviceState * @brief 设备状态类 * @details 封装设备运行时状态信息 * * 设计模式:备忘录模式(Memento Pattern) * - 处理突发事件:设备状态丢失、状态恢复、历史状态查询 * - 工程作用:保存和恢复设备状态,支持状态回滚 */ class DeviceState { private: DeviceInfo device_info_; ///< 设备基本信息 std::atomic<bool> online_; ///< 在线状态(原子操作保证线程安全) uint64_t last_data_timestamp_; ///< 最后数据上报时间 uint64_t last_command_timestamp_; ///< 最后命令下发时间 // 传感器数据历史(循环缓冲区) static const size_t HISTORY_SIZE = 100; ///< 历史数据保留数量 std::vector<SensorData> data_history_; ///< 数据历史记录 size_t history_index_; ///< 历史记录索引 // 设备配置 std::map<std::string, std::string> configs_; ///< 设备配置项 // 统计信息 uint64_t total_data_points_; ///< 总数据点数 uint64_t total_commands_sent_; ///< 总发送命令数 uint64_t total_errors_; ///< 总错误数 mutable std::shared_mutex state_mutex_; ///< 状态读写锁(读多写少场景) public: /** * @brief 构造函数 * @param info 设备信息 */ DeviceState(const DeviceInfo& info) : device_info_(info), online_(false), last_data_timestamp_(0), last_command_timestamp_(0), history_index_(0), total_data_points_(0), total_commands_sent_(0), total_errors_(0) { data_history_.resize(HISTORY_SIZE); ///< 预分配历史数据空间 } /** * @brief 更新设备在线状态 * @param online 是否在线 * * 性能分析:原子操作,O(1)时间复杂度 */ void setOnline(bool online) { online_ = online; if (online) { device_info_.last_seen = WebSocketConnection::getCurrentTimestamp(); } } /** * @brief 检查设备是否在线 * @return 在线状态 */ bool isOnline() const { return online_; ///< 原子读取,无需加锁 } /** * @brief 更新设备最后活跃时间 */ void updateLastSeen() { std::unique_lock<std::shared_mutex> lock(state_mutex_); device_info_.last_seen = WebSocketConnection::getCurrentTimestamp(); } /** * @brief 添加传感器数据 * @param data 传感器数据 * * 使用循环缓冲区存储历史数据,避免无限增长 */ void addSensorData(const SensorData& data) { std::unique_lock<std::shared_mutex> lock(state_mutex_); // 更新最后数据时间 last_data_timestamp_ = data.timestamp; // 存储到循环缓冲区 data_history_[history_index_ % HISTORY_SIZE] = data; history_index_++; // 更新统计 total_data_points_++; // 更新设备信号强度(如果有) // 这里假设数据中包含信号强度信息 } /** * @brief 获取最近N条传感器数据 * @param count 数据条数 * @return 传感器数据列表 */ std::vector<SensorData> getRecentData(size_t count) const { std::shared_lock<std::shared_mutex> lock(state_mutex_); ///< 共享读锁 count = std::min(count, static_cast<size_t>(history_index_)); std::vector<SensorData> result; result.reserve(count); if (history_index_ == 0) { return result; ///< 没有历史数据 } // 从最新数据开始获取 size_t start = (history_index_ - 1) % HISTORY_SIZE; for (size_t i = 0; i < count; ++i) { size_t idx = (start - i + HISTORY_SIZE) % HISTORY_SIZE; result.push_back(data_history_[idx]); if (idx == 0 && i + 1 < count) { // 已经到达缓冲区开头 break; } } return result; } /** * @brief 更新设备配置 * @param key 配置键 * @param value 配置值 */ void updateConfig(const std::string& key, const std::string& value) { std::unique_lock<std::shared_mutex> lock(state_mutex_); configs_[key] = value; } /** * @brief 获取设备配置 * @param key 配置键 * @return 配置值,不存在返回空字符串 */ std::string getConfig(const std::string& key) const { std::shared_lock<std::shared_mutex> lock(state_mutex_); auto it = configs_.find(key); if (it != configs_.end()) { return it->second; } return ""; } /** * @brief 记录命令下发 */ void recordCommandSent() { std::unique_lock<std::shared_mutex> lock(state_mutex_); last_command_timestamp_ = WebSocketConnection::getCurrentTimestamp(); total_commands_sent_++; } /** * @brief 记录设备错误 */ void recordError() { std::unique_lock<std::shared_mutex> lock(state_mutex_); total_errors_++; } /** * @brief 获取设备信息快照(备忘录模式) * @return 设备信息副本 * * 用于状态保存和恢复 */ DeviceInfo getDeviceInfoSnapshot() const { std::shared_lock<std::shared_mutex> lock(state_mutex_); return device_info_; ///< 返回副本 } /** * @brief 从快照恢复设备信息 * @param snapshot 设备信息快照 * * 用于状态恢复 */ void restoreFromSnapshot(const DeviceInfo& snapshot) { std::unique_lock<std::shared_mutex> lock(state_mutex_); device_info_ = snapshot; } /** * @brief 获取设备统计信息 * @return 统计信息字符串 */ std::string getStats() const { std::shared_lock<std::shared_mutex> lock(state_mutex_); char buffer[256]; snprintf(buffer, sizeof(buffer), "Device: %s\n" "Type: %d\n" "Online: %s\n" "Data Points: %llu\n" "Commands Sent: %llu\n" "Errors: %llu\n" "Last Seen: %llu\n", device_info_.device_id, static_cast<int>(device_info_.type), online_ ? "Yes" : "No", total_data_points_, total_commands_sent_, total_errors_, device_info_.last_seen); return std::string(buffer); } // Getter方法(使用共享锁保护) DeviceType getDeviceType() const { std::shared_lock<std::shared_mutex> lock(state_mutex_); return device_info_.type; } const char* getDeviceId() const { /// @note: 返回设备ID指针,调用者需要保证设备状态对象存活 return device_info_.device_id; } uint64_t getLastSeen() const { std::shared_lock<std::shared_mutex> lock(state_mutex_); return device_info_.last_seen; } }; /** * @class DeviceGroup * @brief 设备分组类 * @details 管理设备分组,支持组播和广播 * * 设计模式:组合模式(Composite Pattern) * - 处理突发事件:设备加入/离开组、组状态同步、组权限变更 * - 工程作用:树形结构管理设备,支持递归操作 */ class DeviceGroup { private: std::string group_id_; ///< 组ID std::string group_name_; ///< 组名称 DeviceType allowed_types_; ///< 允许的设备类型 std::unordered_set<std::string> device_ids_; ///< 组内设备ID集合 std::vector<std::shared_ptr<DeviceGroup>> subgroups_; ///< 子分组 std::weak_ptr<DeviceGroup> parent_group_; ///< 父分组 mutable std::shared_mutex group_mutex_; ///< 分组读写锁 public: /** * @brief 构造函数 * @param id 组ID * @param name 组名称 * @param types 允许的设备类型(按位掩码) */ DeviceGroup(const std::string& id, const std::string& name, DeviceType types = DeviceType::UNKNOWN) : group_id_(id), group_name_(name), allowed_types_(types) {} /** * @brief 添加设备到组 * @param device_id 设备ID * @param device_type 设备类型 * @return 成功返回true */ bool addDevice(const std::string& device_id, DeviceType device_type) { std::unique_lock<std::shared_mutex> lock(group_mutex_); // 检查设备类型是否允许 if (allowed_types_ != DeviceType::UNKNOWN && allowed_types_ != device_type) { return false; } device_ids_.insert(device_id); return true; } /** * @brief 从组中移除设备 * @param device_id 设备ID */ void removeDevice(const std::string& device_id) { std::unique_lock<std::shared_mutex> lock(group_mutex_); device_ids_.erase(device_id); } /** * @brief 检查设备是否在组中 * @param device_id 设备ID * @return 存在返回true */ bool containsDevice(const std::string& device_id) const { std::shared_lock<std::shared_mutex> lock(group_mutex_); return device_ids_.find(device_id) != device_ids_.end(); } /** * @brief 获取组内所有设备ID * @return 设备ID列表 */ std::vector<std::string> getAllDevices() const { std::shared_lock<std::shared_mutex> lock(group_mutex_); std::vector<std::string> devices; devices.reserve(device_ids_.size()); // 添加本组设备 for (const auto& device_id : device_ids_) { devices.push_back(device_id); } // 递归添加子组设备(组合模式的核心) for (const auto& subgroup : subgroups_) { auto subgroup_devices = subgroup->getAllDevices(); devices.insert(devices.end(), subgroup_devices.begin(), subgroup_devices.end()); } return devices; } /** * @brief 添加子分组 * @param subgroup 子分组指针 */ void addSubgroup(std::shared_ptr<DeviceGroup> subgroup) { std::unique_lock<std::shared_mutex> lock(group_mutex_); subgroup->setParent(shared_from_this()); subgroups_.push_back(subgroup); } /** * @brief 设置父分组 * @param parent 父分组指针 */ void setParent(std::shared_ptr<DeviceGroup> parent) { parent_group_ = parent; } /** * @brief 获取分组信息 * @return 分组信息字符串 */ std::string getGroupInfo() const { std::shared_lock<std::shared_mutex> lock(group_mutex_); char buffer[512]; snprintf(buffer, sizeof(buffer), "Group ID: %s\n" "Group Name: %s\n" "Device Count: %zu\n" "Subgroup Count: %zu\n", group_id_.c_str(), group_name_.c_str(), device_ids_.size(), subgroups_.size()); return std::string(buffer); } // Getter方法 const std::string& getGroupId() const { return group_id_; } const std::string& getGroupName() const { return group_name_; } private: /// @brief 获取指向自身的shared_ptr(用于组合模式) std::shared_ptr<DeviceGroup> shared_from_this() { return std::static_pointer_cast<DeviceGroup>( std::enable_shared_from_this<DeviceGroup>::shared_from_this()); } }; /** * @class DeviceManager * @brief 设备管理器类 * @details 管理所有设备的状态和分组 * * 设计模式:发布-订阅模式(Pub/Sub Pattern) * - 处理突发事件:设备状态变化通知、批量设备操作、设备发现 * - 工程作用:解耦设备状态管理和其他组件 */ class DeviceManager : public std::enable_shared_from_this<DeviceManager> { private: // 设备状态存储 std::unordered_map<std::string, std::shared_ptr<DeviceState>> devices_; ///< 设备ID到状态的映射 std::unordered_map<DeviceType, std::unordered_set<std::string>> devices_by_type_; ///< 按类型索引 // 设备分组 std::unordered_map<std::string, std::shared_ptr<DeviceGroup>> groups_; ///< 分组映射 std::unordered_map<std::string, std::unordered_set<std::string>> device_groups_; ///< 设备所属分组 // 连接映射(设备ID到WebSocket连接) std::unordered_map<std::string, int> device_connections_; ///< 设备ID到文件描述符的映射 std::unordered_map<int, std::string> connection_devices_; ///< 文件描述符到设备ID的映射 // 观察者模式:状态变化回调 struct DeviceEventHandler { std::function<void(const std::string&, bool)> on_online_change; ///< 在线状态变化 std::function<void(const std::string&, const SensorData&)> on_data_received; ///< 数据接收 std::function<void(const std::string&, const std::string&)> on_config_updated; ///< 配置更新 }; DeviceEventHandler event_handler_; ///< 事件处理器 // 线程安全 mutable std::shared_mutex devices_mutex_; ///< 设备数据读写锁 mutable std::shared_mutex groups_mutex_; ///< 分组数据读写锁 mutable std::shared_mutex connections_mutex_; ///< 连接数据读写锁 public: /** * @brief 构造函数 */ DeviceManager() = default; /** * @brief 注册设备 * @param device_info 设备信息 * @param connection_fd 连接文件描述符 * @return 成功返回true * * 性能分析:O(1)平均时间复杂度(哈希表操作) */ bool registerDevice(const DeviceInfo& device_info, int connection_fd) { std::string device_id(device_info.device_id); { std::unique_lock<std::shared_mutex> lock1(devices_mutex_); std::unique_lock<std::shared_mutex> lock2(connections_mutex_); // 检查设备是否已注册 if (devices_.find(device_id) != devices_.end()) { // 设备已存在,更新连接 auto it = device_connections_.find(device_id); if (it != device_connections_.end()) { // 移除旧的连接映射 connection_devices_.erase(it->second); it->second = connection_fd; } connection_devices_[connection_fd] = device_id; // 更新设备状态为在线 devices_[device_id]->setOnline(true); devices_[device_id]->updateLastSeen(); return true; } // 创建新的设备状态 auto device_state = std::make_shared<DeviceState>(device_info); device_state->setOnline(true); // 添加到设备映射 devices_[device_id] = device_state; // 添加到类型索引 devices_by_type_[device_info.type].insert(device_id); // 添加到连接映射 device_connections_[device_id] = connection_fd; connection_devices_[connection_fd] = device_id; } // 触发事件回调(发布-订阅模式) if (event_handler_.on_online_change) { event_handler_.on_online_change(device_id, true); } std::cout << "Device registered: " << device_id << " (Type: " << static_cast<int>(device_info.type) << ")" << std::endl; return true; } /** * @brief 设备注销 * @param device_id 设备ID * * 注意:不删除设备状态,只标记为离线 */ void unregisterDevice(const std::string& device_id) { { std::unique_lock<std::shared_mutex> lock(devices_mutex_); auto it = devices_.find(device_id); if (it != devices_.end()) { it->second->setOnline(false); } } // 清理连接映射 { std::unique_lock<std::shared_mutex> lock(connections_mutex_); auto conn_it = device_connections_.find(device_id); if (conn_it != device_connections_.end()) { connection_devices_.erase(conn_it->second); device_connections_.erase(conn_it); } } // 触发事件回调 if (event_handler_.on_online_change) { event_handler_.on_online_change(device_id, false); } std::cout << "Device unregistered: " << device_id << std::endl; } /** * @brief 根据连接文件描述符查找设备ID * @param connection_fd 连接文件描述符 * @return 设备ID,找不到返回空字符串 */ std::string getDeviceIdByConnection(int connection_fd) const { std::shared_lock<std::shared_mutex> lock(connections_mutex_); auto it = connection_devices_.find(connection_fd); if (it != connection_devices_.end()) { return it->second; } return ""; } /** * @brief 根据设备ID查找连接文件描述符 * @param device_id 设备ID * @return 连接文件描述符,找不到返回-1 */ int getConnectionByDeviceId(const std::string& device_id) const { std::shared_lock<std::shared_mutex> lock(connections_mutex_); auto it = device_connections_.find(device_id); if (it != device_connections_.end()) { return it->second; } return -1; } /** * @brief 处理设备数据 * @param device_id 设备ID * @param data 传感器数据 */ void handleDeviceData(const std::string& device_id, const SensorData& data) { std::shared_ptr<DeviceState> device_state; { std::shared_lock<std::shared_mutex> lock(devices_mutex_); auto it = devices_.find(device_id); if (it == devices_.end()) { std::cerr << "Unknown device: " << device_id << std::endl; return; } device_state = it->second; } // 更新设备状态 device_state->addSensorData(data); device_state->updateLastSeen(); // 触发事件回调 if (event_handler_.on_data_received) { event_handler_.on_data_received(device_id, data); } // 调试输出 std::cout << "Data from device " << device_id << " at " << data.timestamp << std::endl; } /** * @brief 创建设备分组 * @param group_id 组ID * @param group_name 组名称 * @param allowed_types 允许的设备类型 * @return 成功返回true */ bool createGroup(const std::string& group_id, const std::string& group_name, DeviceType allowed_types = DeviceType::UNKNOWN) { std::unique_lock<std::shared_mutex> lock(groups_mutex_); if (groups_.find(group_id) != groups_.end()) { return false; ///< 分组已存在 } auto group = std::make_shared<DeviceGroup>(group_id, group_name, allowed_types); groups_[group_id] = group; std::cout << "Group created: " << group_id << " (" << group_name << ")" << std::endl; return true; } /** * @brief 添加设备到分组 * @param device_id 设备ID * @param group_id 组ID * @return 成功返回true */ bool addDeviceToGroup(const std::string& device_id, const std::string& group_id) { std::shared_ptr<DeviceState> device_state; std::shared_ptr<DeviceGroup> group; { std::shared_lock<std::shared_mutex> lock1(devices_mutex_); std::shared_lock<std::shared_mutex> lock2(groups_mutex_); auto device_it = devices_.find(device_id); if (device_it == devices_.end()) { return false; ///< 设备不存在 } device_state = device_it->second; auto group_it = groups_.find(group_id); if (group_it == groups_.end()) { return false; ///< 分组不存在 } group = group_it->second; } // 添加设备到分组 if (group->addDevice(device_id, device_state->getDeviceType())) { // 更新设备分组索引 std::unique_lock<std::shared_mutex> lock(groups_mutex_); device_groups_[device_id].insert(group_id); std::cout << "Device " << device_id << " added to group " << group_id << std::endl; return true; } return false; } /** * @brief 获取分组内所有设备 * @param group_id 组ID * @return 设备ID列表 */ std::vector<std::string> getDevicesInGroup(const std::string& group_id) const { std::shared_lock<std::shared_mutex> lock(groups_mutex_); auto it = groups_.find(group_id); if (it == groups_.end()) { return {}; } return it->second->getAllDevices(); } /** * @brief 获取设备所在的所有分组 * @param device_id 设备ID * @return 分组ID列表 */ std::vector<std::string> getGroupsForDevice(const std::string& device_id) const { std::shared_lock<std::shared_mutex> lock(groups_mutex_); auto it = device_groups_.find(device_id); if (it == device_groups_.end()) { return {}; } return std::vector<std::string>(it->second.begin(), it->second.end()); } /** * @brief 获取所有在线设备 * @return 在线设备ID列表 */ std::vector<std::string> getAllOnlineDevices() const { std::vector<std::string> online_devices; { std::shared_lock<std::shared_mutex> lock(devices_mutex_); online_devices.reserve(devices_.size()); for (const auto& pair : devices_) { if (pair.second->isOnline()) { online_devices.push_back(pair.first); } } } return online_devices; } /** * @brief 获取指定类型的设备 * @param type 设备类型 * @return 设备ID列表 */ std::vector<std::string> getDevicesByType(DeviceType type) const { std::vector<std::string> devices; { std::shared_lock<std::shared_mutex> lock(devices_mutex_); auto it = devices_by_type_.find(type); if (it != devices_by_type_.end()) { devices.reserve(it->second.size()); devices.insert(devices.end(), it->second.begin(), it->second.end()); } } return devices; } /** * @brief 获取设备统计信息 * @return 统计信息字符串 */ std::string getStats() const { std::shared_lock<std::shared_mutex> lock1(devices_mutex_); std::shared_lock<std::shared_mutex> lock2(groups_mutex_); std::shared_lock<std::shared_mutex> lock3(connections_mutex_); char buffer[1024]; snprintf(buffer, sizeof(buffer), "=== Device Manager Stats ===\n" "Total Devices: %zu\n" "Online Devices: %zu\n" "Total Groups: %zu\n" "Active Connections: %zu\n" "Device Types: %zu\n", devices_.size(), std::count_if(devices_.begin(), devices_.end(), [](const auto& p) { return p.second->isOnline(); }), groups_.size(), connection_devices_.size(), devices_by_type_.size()); return std::string(buffer); } /** * @brief 设置事件处理器 * @param handler 事件处理器结构体 */ void setEventHandler(const DeviceEventHandler& handler) { event_handler_ = handler; } /** * @brief 清理过期设备(长时间离线) * @param timeout_seconds 超时时间(秒) * @return 清理的设备数量 * * 性能分析:O(n)时间复杂度,n为设备总数 */ size_t cleanupExpiredDevices(uint64_t timeout_seconds) { uint64_t now = WebSocketConnection::getCurrentTimestamp(); uint64_t timeout_ms = timeout_seconds * 1000; std::vector<std::string> expired_devices; { std::shared_lock<std::shared_mutex> lock(devices_mutex_); for (const auto& pair : devices_) { const auto& device_state = pair.second; if (!device_state->isOnline() && (now - device_state->getLastSeen()) > timeout_ms) { expired_devices.push_back(pair.first); } } } // 移除过期设备 for (const auto& device_id : expired_devices) { unregisterDevice(device_id); } return expired_devices.size(); } }; } // namespace iot设计模式分析(第三部分)
1.发布-订阅模式(Pub/Sub Pattern)
应用位置:
DeviceManager类的事件处理器处理突发事件:
设备状态变化:自动通知所有订阅者
数据到达风暴:异步回调避免阻塞
组件解耦:状态管理不依赖具体业务逻辑
工程作用:
松耦合架构,便于扩展
支持多订阅者同时处理事件
异步处理提高系统响应性
性能分析:
事件分发:O(1)直接函数调用
内存开销:每个回调函数对象约16-32字节
并发处理:回调在调用者线程执行,需要注意线程安全
2.备忘录模式(Memento Pattern)
应用位置:
DeviceState类的状态快照功能处理突发事件:
系统崩溃恢复:从快照恢复设备状态
状态回滚:操作错误时恢复之前状态
历史状态查询:提供状态变化历史
工程作用:
状态持久化支持
操作原子性保证
调试和监控支持
性能分析:
快照创建:O(1)复制固定大小结构
状态恢复:O(1)恢复操作
内存占用:快照占用额外内存,但可控制频率
3.组合模式(Composite Pattern)
应用位置:
DeviceGroup类的树形结构管理处理突发事件:
动态分组调整:支持运行时添加/删除子组
递归操作:一键操作整个组树
权限继承:子组继承父组属性
工程作用:
统一处理单个对象和对象组合
支持复杂的设备组织结构
简化客户端代码
性能分析:
遍历操作:O(n)递归遍历,n为总设备数
内存结构:树形结构增加指针开销
查找效率:哈希表辅助查找提高性能
4.观察者模式(Observer Pattern)
应用位置:
DeviceManager的事件回调机制与Pub/Sub的关系:观察者模式是Pub/Sub的特殊形式
处理突发事件:状态变化实时通知
运行性能分析
数据结构优化:
哈希表为主:O(1)平均查找时间
读写锁优化:读多写少场景使用shared_mutex
索引冗余:多种索引方式加快不同查询
内存管理:
智能指针:自动内存管理,避免泄漏
循环缓冲区:固定大小历史数据,避免无限增长
预分配:vector预分配减少重分配
并发控制:
锁粒度优化:不同数据使用不同锁,减少竞争
原子操作:在线状态使用原子变量,无需锁
读写分离:读操作使用共享锁,支持并发读
突发事件处理策略
| 突发事件 | 处理策略 | 设计模式应用 |
|---|---|---|
| 设备频繁上下线 | 状态缓存,去抖动处理 | 备忘录模式保存状态 |
| 分组结构频繁变动 | 延迟更新,批量操作 | 组合模式支持动态调整 |
| 事件处理阻塞 | 异步回调,线程池处理 | 发布-订阅模式解耦 |
| 内存泄漏 | 智能指针,定期清理 | RAII模式自动管理 |
| 数据一致性 | 读写锁,原子操作 | 事务性状态更新 |
数据流结构大小设计
1. 设备状态内存占用:
DeviceState内存占用估算: - DeviceInfo: 64字节 - 原子变量: 1字节(对齐到8字节) - 时间戳: 16字节 - 历史数据: 100 * sizeof(SensorData) ≈ 100 * 50 = 5000字节 - 配置映射: 平均10个配置,每个平均32字节 = 320字节 - 统计信息: 24字节 - 锁: 56字节(shared_mutex) - 总计: ~5500字节/设备
2. 分组管理开销:
DeviceGroup内存占用: - 字符串: 平均64字节 - 集合: 每个设备ID 40字节(字符串+节点开销) - 子组指针: 每个8字节 - 锁: 56字节 - 总计: 基础128字节 + 每个设备40字节
3. 连接映射优化:
双向映射:设备↔连接,O(1)双向查找
哈希表负载因子控制:0.75保持性能
预分配桶数量:基于最大连接数预估
第三部分总结:实现了设备状态管理和分组功能,采用发布-订阅模式处理设备事件,备忘录模式支持状态恢复,组合模式管理设备分组。为大规模IoT设备提供了高效的状态管理和组织能力。