业务背景
报文按五元组(源/目的 IP、源/目的端口、VPN 等)识别业务,并为每条业务维护一个流对象。不同协议通常分表存放(如 TCP、UDP、ICMP)。idle 时间可按协议区分:本文约定TCP、UDP 为 3 分钟,ICMP 为 15 秒(可按现网调整);
现有实现与痛点
定时(例如每5 秒)做一次全表扫描:遍历各流表,将判定为老化的流送入老化队列。在数十万级流表规模下,全量遍历的 CPU 与缓存开销明显;扫描周期与表规模耦合,扩展性一般。
LRU 算法概要
LRU(Least Recently Used)在容量受限时,用访问顺序近似“冷热”:每次读/写某个键,即视为该键最近被使用;若需腾出位置,则淘汰最久未被访问的条目。
经典实现为std::unordered_map+std::list:哈希表做 O(1) 定位,双向链表维护全序;命中时用splice调整位置,均摊O(1)(哈希冲突在平均意义下)。
作用
在有容量上限或需要决定淘汰顺序时,优先保留“近期更可能再被访问”的条目。LRU 解决的是淘汰策略,不替代业务上的是否应失效(例如协议状态机、绝对超时),二者常组合使用。
优点
- 热点友好:高频访问的键停留在链表“热端”,不易被误淘汰。
- 均摊 O(1):查找、调整次序、在已知迭代器下删除,代价可控。
- 实现直观:顺序由链表显式表达,不必维护全表排序或复杂堆结构。
- 决策局部化:淘汰时机可与“插入已满”或定时扫描结合,避免无差别全表扫描。
下面给出定长缓存的最小示例(键值为int,未命中时get返回-1),便于与后续流表实现对照。
#include<list>#include<unordered_map>classLruCache{public:explicitLruCache(intcapacity):cap_(capacity){}intget(intkey){autoit=idx_.find(key);if(it==idx_.end())return-1;lst_.splice(lst_.begin(),lst_,it->second);returnit->second->second;}voidput(intkey,intvalue){autoit=idx_.find(key);if(it!=idx_.end()){it->second->second=value;lst_.splice(lst_.begin(),lst_,it->second);return;}if(static_cast<int>(lst_.size())==cap_){idx_.erase(lst_.back().first);lst_.pop_back();}lst_.emplace_front(key,value);idx_[key]=lst_.begin();}private:intcap_{};std::list<std::pair<int,int>>lst_;std::unordered_map<int,std::list<std::pair<int,int>>::iterator>idx_;};基于 LRU 的流老化实现(单表示例:UDP)
将「五元组 → 流」与 LRU 结合时,典型做法是:哈希表按 key 查找,双向链表按最近访问时间排序。与上一节LruCache(最近用在表头)的演示不同,下面按老化扫描习惯采用:
- 链表头(
front):最久未被命中的流(优先做超时判断)。 - 链表尾:最近一次命中的流(MRU 端)。
报文命中时更新lastHitSec,并把结点splice到表尾。若全表 idle相同(此处 UDP 为3 分钟),从表头扫描时:一旦表头未超时,其后结点更“新”,可提前结束(与定时全表扫描相比,显著减少无效遍历)。
#include<cstdint>#include<functional>#include<list>#include<unordered_map>#include<utility>structFlowKey{std::uint32_tsrcIp{};std::uint32_tdstIp{};std::uint16_tsrcPort{};std::uint16_tdstPort{};std::uint32_tvpnId{};booloperator==(constFlowKey&o)const{returnsrcIp==o.srcIp&&dstIp==o.dstIp&&srcPort==o.srcPort&&dstPort==o.dstPort&&vpnId==o.vpnId;}};structFlowKeyHash{std::size_toperator()(constFlowKey&k)constnoexcept{std::size_t h=k.srcIp;h^=std::hash<std::uint32_t>{}(k.dstIp)+0x9e3779b9+(h<<6)+(h>>2);h^=std::hash<std::uint32_t>{}(k.srcPort)+0x9e3779b9+(h<<6)+(h>>2);h^=std::hash<std::uint32_t>{}(k.dstPort)+0x9e3779b9+(h<<6)+(h>>2);h^=std::hash<std::uint32_t>{}(k.vpnId)+0x9e3779b9+(h<<6)+(h>>2);returnh;}};structFlowEntry{std::uint64_tlastHitSec{};};classUdpFlowTable{public:staticconstexprstd::uint64_tkUdpTtlSec=3*60;voidonHit(constFlowKey&key,std::uint64_tnowSec){autoit=idx_.find(key);if(it!=idx_.end()){it->second->second.lastHitSec=nowSec;lru_.splice(lru_.end(),lru_,it->second);return;}lru_.emplace_back(key,FlowEntry{nowSec});idx_.emplace(key,std::prev(lru_.end()));}voidagingScan(std::uint64_tnowSec){while(!lru_.empty()){constauto&head=lru_.front();if(nowSec-head.second.lastHitSec<=kUdpTtlSec)break;idx_.erase(head.first);lru_.pop_front();}}std::size_tsize()const{returnlru_.size();}private:usingNode=std::pair<FlowKey,FlowEntry>;std::list<Node>lru_;std::unordered_map<FlowKey,std::list<Node>::iterator,FlowKeyHash>idx_;};说明:agingScan中表头一旦未超时即break,依赖不变量「表头 = 全局最久未更新」;若存在不经过onHit更新时间的路径,需自行保证链表顺序不被破坏。
跨线程场景下的 LRU 更新消息与散列
问题背景
- 多线程并发收发报文;命中某条业务流后,需向流管理线程发送LRU 结点更新消息。
- 为抑制抖动,对每条流设置了发送等待时间(节流)。
- 流量突增时,大量流在同一时刻满足发送条件,更新消息在时间上扎堆,易造成队列拥塞、丢包,进而导致 LRU 链与真实访问顺序不一致。
思路
- 将“发送更新消息”的时刻在时间上摊开:为等待时间设定一个区间
[base, base + range)(具体单位与实现一致,如毫秒)。 - 利用每条流唯一的
lru_index(流对象内保存的 LRU 结点索引)对range取模,把不同流映射到区间内不同偏移。 - 实际等待时间取:
base + (lru_index % range)(若与现有节流字段叠加,注意上限与溢出)。
这样同一突发窗口内,各流的通知时刻在range上大致均匀分布,降低瞬时洪峰;不改变 LRU 的语义,只改变何时把“命中”同步到管理线程。
总结
- 问题:大规模流表下,周期性全表扫描老化成本高;多线程命中后集中上报 LRU 更新,易造成消息突发与丢包。
- LRU 与流表:用哈希 + 双向链表维护「五元组 → 流」与访问顺序;表头最久未命中、命中移尾时,在统一 idle下可从表头做超时判断并提前结束,避免每次扫全表。
- 跨线程:在节流基础上对发送时刻做按
lru_index的散列,把更新打散到[base, base + range),缓解拥塞、利于链表与真实流量一致(lru_index可为你在流对象中自定义的 LRU 结点索引或等价标识)。 - 落地注意:**idle(是否超时)**与 **LRU(先后顺序)**是两层逻辑