一、限流算法理论基础与实现
1.1 计数器法:最简单直接的限流实现
计数器法是最基础也是最容易实现的限流算法,其核心思想是在固定时间窗口内统计请求次数,超过阈值则拒绝后续请求。
算法原理图解:
text
时间窗口:1分钟,阈值:100请求 ┌─────────────────────────────────┐ │ 0:00 0:30 0:59 │ │ ├──────┬──────┬──────┬───────┤│ │ │ 45 │ 30 │ 25 │ ││ │ └──────┴──────┴──────┴───────┘│ └─────────────────────────────────┘ 当0:59时,统计0:00-0:59的请求总数=100,新请求被拒绝
伪代码实现:
java
public class CounterLimiter { private long timeStamp = System.currentTimeMillis(); // 当前时间 private long limitCount = 100; // 限流阈值 private long interval = 60 * 1000; // 时间窗口(毫秒) private long requestCount = 0; // 当前窗口请求数 public synchronized boolean tryAcquire() { long now = System.currentTimeMillis(); // 判断是否进入下一个时间窗口 if (now - timeStamp < interval) { // 仍在当前窗口 if (requestCount < limitCount) { requestCount++; return true; } else { return false; // 限流 } } else { // 进入新的时间窗口 timeStamp = now; requestCount = 1; // 重置计数器 return true; } } }计数器法的缺陷:
临界问题:在时间窗口切换瞬间可能出现流量突刺
精度问题:固定窗口导致统计不够精确
无法应对突发流量:窗口内前半段空闲,后半段突发流量仍会被拒绝
1.2 滑动时间窗口算法:解决计数器法的精度问题
滑动窗口算法是对计数器法的改进,将固定时间窗口划分为多个小格子,每次滑动一个格子,使统计更加精确平滑。
算法原理图解:
text
时间窗口:1分钟,划分为6个格子(每格10秒) 初始状态:格子0-5 [ 0-9s:15 ][10-19s:20][20-29s:25][30-39s:18][40-49s:22][50-59s:30] 总请求=130 滑动后:格子1-6 [10-19s:20][20-29s:25][30-39s:18][40-49s:22][50-59s:30][60-69s:0 ] 总请求=115 ↑新增格子 ↑移除最旧格子
Sentinel滑动窗口源码实现分析:
核心类:LeapArray和WindowWrap
java
// Sentinel中的滑动窗口核心数据结构 public abstract class LeapArray<T> { // 窗口长度(毫秒) protected int windowLengthInMs; // 采样窗口数量 protected int sampleCount; // 总的时间窗口长度(毫秒)= windowLengthInMs * sampleCount protected int intervalInMs; // 窗口数组 protected final AtomicReferenceArray<WindowWrap<T>> array; // 获取当前时间窗口 public WindowWrap<T> currentWindow(long timeMillis) { if (timeMillis < 0) { return null; } // 计算当前时间对应的窗口索引 int idx = calculateTimeIdx(timeMillis); // 计算窗口开始时间 long windowStart = calculateWindowStart(timeMillis); while (true) { WindowWrap<T> old = array.get(idx); if (old == null) { // 创建新窗口 WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); if (array.compareAndSet(idx, null, window)) { return window; } else { Thread.yield(); } } else if (windowStart == old.windowStart()) { // 正好是当前窗口 return old; } else if (windowStart > old.windowStart()) { // 需要创建新窗口,替换旧窗口 if (updateLock.tryLock()) { try { return resetWindowTo(old, windowStart); } finally { updateLock.unlock(); } } else { Thread.yield(); } } else if (windowStart < old.windowStart()) { // 不应该发生,说明时钟回拨 return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); } } } }窗口包装类WindowWrap:
java
public class WindowWrap<T> { // 窗口长度(毫秒) private final long windowLengthInMs; // 窗口开始时间(毫秒) private long windowStart; // 窗口存储的数据(如统计值) private T value; // 添加值到窗口 public void addValue(T item) { // 具体实现由子类决定 } // 重置窗口 public void resetTo(long startTime) { this.windowStart = startTime; // 重置存储的数据 } }滑动窗口统计实现BucketLeapArray:
java
public class BucketLeapArray extends LeapArray<MetricBucket> { public BucketLeapArray(int sampleCount, int intervalInMs) { // 默认每个窗口500ms,2个窗口组成1秒的时间窗口 super(sampleCount, intervalInMs); } @Override public MetricBucket newEmptyBucket(long time) { return new MetricBucket(); } @Override protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> windowWrap, long startTime) { windowWrap.resetTo(startTime); return windowWrap; } }统计值类MetricBucket:
java
public class MetricBucket { // 使用LongAdder保证并发安全和高性能 private final LongAdder[] counters; // 统计维度 private static final int PASS = 0; // 通过 private static final int BLOCK = 1; // 阻塞 private static final int EXCEPTION = 2; // 异常 private static final int SUCCESS = 3; // 成功 private static final int RT = 4; // 响应时间 public MetricBucket() { counters = new LongAdder[MetricEvent.values().length]; for (int i = 0; i < counters.length; i++) { counters[i] = new LongAdder(); } } // 增加统计值 public void add(MetricEvent event, long count) { counters[event.ordinal()].add(count); } // 获取统计值 public long get(MetricEvent event) { return counters[event.ordinal()].sum(); } }滑动窗口算法的优势:
更精确的统计:通过多格子细分,统计更接近真实流量
平滑过渡:窗口滑动而非跳跃,避免临界问题
可配置精度:格子数越多,统计越精确(但内存消耗越大)
1.3 漏桶算法:恒定速率处理请求
漏桶算法模拟一个固定容量的漏桶,请求以任意速率进入桶内,但以恒定速率流出,当桶满时新请求被拒绝。
算法原理图解:
text
请求流入 → [ 漏桶(容量100) ] → 恒定速率流出(10请求/秒) ↑ ↑ 任意速率 恒定速率
伪代码实现:
java
public class LeakyBucketLimiter { private long capacity; // 桶容量 private long rate; // 流出速率(请求/秒) private long water; // 当前水量(当前请求数) private long lastLeakTime; // 上次漏水时间 public LeakyBucketLimiter(long capacity, long rate) { this.capacity = capacity; this.rate = rate; this.water = 0; this.lastLeakTime = System.currentTimeMillis(); } public synchronized boolean tryAcquire() { leakWater(); // 先漏水 if (water < capacity) { water++; // 加水 return true; } else { return false; // 桶满,拒绝 } } private void leakWater() { long now = System.currentTimeMillis(); long elapsedTime = now - lastLeakTime; // 计算这段时间应该漏掉的水量 long leakAmount = elapsedTime * rate / 1000; if (leakAmount > 0) { water = Math.max(0, water - leakAmount); lastLeakTime = now; } } }Sentinel中的漏桶算法实现:
Sentinel中的匀速排队(Rate Limiter)模式就是基于漏桶算法实现的:
java
// 匀速排队控制器 public class RateLimiterController implements TrafficShapingController { private final int maxQueueingTimeMs; // 最大排队时间 private final double count; // 阈值 private final AtomicLong latestPassedTime = new AtomicLong(-1); public RateLimiterController(int timeOut, double count) { this.maxQueueingTimeMs = timeOut; this.count = count; } @Override public boolean canPass(Node node, int acquireCount, boolean prioritized) { // 当请求来到的时间点,最近一次请求通过的时间点 long currentTime = TimeUtil.currentTimeMillis(); // 计算每个请求的理论通过时间间隔 long costTime = Math.round(1.0 * (acquireCount) / count * 1000); // 理论通过时间 long expectedTime = costTime + latestPassedTime.get(); if (expectedTime <= currentTime) { // 可以通过,更新最后通过时间 latestPassedTime.set(currentTime); return true; } else { // 计算需要等待的时间 long waitTime = costTime + latestPassedTime.get() - currentTime; if (waitTime > maxQueueingTimeMs) { // 等待时间超过最大排队时间,拒绝 return false; } else { // 更新最后通过时间 long oldTime = latestPassedTime.addAndGet(costTime); try { // 等待 waitTime = oldTime - TimeUtil.currentTimeMillis(); if (waitTime > 0) { Thread.sleep(waitTime); } return true; } catch (InterruptedException e) { // 处理中断 return false; } } } } }1.4 令牌桶算法:允许突发流量
令牌桶算法以恒定速率生成令牌放入桶中,请求需要获取令牌才能通过,当桶满时新令牌被丢弃,桶空时请求被拒绝。
算法原理图解:
text
令牌生成(10令牌/秒)→ [ 令牌桶(容量100) ] → 请求获取令牌 ↑ ↓ 恒定速率 突发获取
伪代码实现:
java
public class TokenBucketLimiter { private long capacity; // 桶容量 private long rate; // 令牌生成速率(令牌/秒) private long tokens; // 当前令牌数 private long lastRefillTime; // 上次补充令牌时间 public TokenBucketLimiter(long capacity, long rate) { this.capacity = capacity; this.rate = rate; this.tokens = capacity; // 初始时桶满 this.lastRefillTime = System.currentTimeMillis(); } public synchronized boolean tryAcquire() { refillTokens(); // 先补充令牌 if (tokens > 0) { tokens--; // 消耗令牌 return true; } else { return false; // 令牌不足,拒绝 } } private void refillTokens() { long now = System.currentTimeMillis(); long elapsedTime = now - lastRefillTime; // 计算这段时间应该补充的令牌数 long refillAmount = elapsedTime * rate / 1000; if (refillAmount > 0) { tokens = Math.min(capacity, tokens + refillAmount); lastRefillTime = now; } } }Sentinel中的Warm Up预热模式:
Sentinel的Warm Up模式基于令牌桶算法实现,在冷启动阶段缓慢提升流量:
java
// 预热控制器 public class WarmUpController implements TrafficShapingController { protected double count; // 阈值 private int coldFactor; // 冷启动因子,默认3 private int warningToken = 0; // 预警令牌数 private int maxToken; // 最大令牌数 private double slope; // 斜率 private AtomicLong storedTokens = new AtomicLong(0); // 当前存储的令牌数 private AtomicLong lastFilledTime = new AtomicLong(0); // 上次填充时间 public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) { construct(count, warmUpPeriodInSec, coldFactor); } private void construct(double count, int warmUpPeriodInSec, int coldFactor) { if (coldFactor <= 1) { throw new IllegalArgumentException("Cold factor should be larger than 1"); } this.count = count; this.coldFactor = coldFactor; // 阈值除以冷启动因子,得到预热期间的阈值 warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1); maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor)); // 斜率 = (coldFactor - 1) / count / (maxToken - warningToken) slope = (coldFactor - 1.0) / count / (maxToken - warningToken); } @Override public boolean canPass(Node node, int acquireCount, boolean prioritized) { long passQps = (long) node.passQps(); // 当前通过的QPS long previousQps = (long) node.previousPassQps(); // 上一秒的QPS syncToken(previousQps); // 同步令牌 // 开始计算当前消耗的令牌数 long restToken = storedTokens.get(); if (restToken >= warningToken) { // 当前令牌数高于预警值,说明还在预热期 long aboveToken = restToken - warningToken; // 警告区间消耗令牌的速度更快 double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count)); if (passQps + acquireCount <= warningQps) { return true; } } else { // 已经过了预热期,使用正常的阈值 if (passQps + acquireCount <= count) { return true; } } return false; } // 同步令牌,根据上一秒的QPS计算当前应存储的令牌数 protected void syncToken(long passQps) { long currentTime = TimeUtil.currentTimeMillis(); currentTime = currentTime - currentTime % 1000; long oldLastFillTime = lastFilledTime.get(); if (currentTime <= oldLastFillTime) { return; } long oldValue = storedTokens.get(); long newValue = coolDownTokens(currentTime, passQps); if (storedTokens.compareAndSet(oldValue, newValue)) { // 从当前存储的令牌中减去上一秒的QPS long currentValue = storedTokens.addAndGet(0 - passQps); if (currentValue < 0) { storedTokens.set(0L); } lastFilledTime.set(currentTime); } } // 冷却(计算当前应存储的令牌数) private long coolDownTokens(long currentTime, long passQps) { long oldValue = storedTokens.get(); long newValue = oldValue; // 添加令牌的前提:当前令牌数低于预警值 if (oldValue < warningToken) { newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000); } else if (oldValue > warningToken) { if (passQps < (int)count / coldFactor) { // 当前QPS低于阈值/冷启动因子,说明系统压力不大,可以增加令牌 newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000); } } return Math.min(newValue, maxToken); } }1.5 限流算法对比总结
| 算法 | 原理 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|---|
| 计数器法 | 固定时间窗口计数 | 实现简单,内存消耗小 | 临界问题,精度低 | 简单限流场景 |
| 滑动窗口 | 多个小时间窗口滑动统计 | 统计更精确,解决临界问题 | 内存消耗较大 | 需要精确统计的场景 |
| 漏桶算法 | 恒定速率处理,桶满则溢 | 输出流量恒定,平滑突发 | 无法应对突发流量 | 需要恒定速率处理的场景 |
| 令牌桶算法 | 恒定速率生成令牌 | 允许突发流量,灵活性高 | 实现较复杂 | 需要应对突发流量的场景 |
二、Sentinel核心架构:Slot责任链模式
2.1 Sentinel整体架构设计
Sentinel采用Slot责任链模式处理每个请求,每个Slot负责不同的功能,形成完整的处理流水线:
text
请求入口 ↓ NodeSelectorSlot(节点选择) ↓ ClusterBuilderSlot(集群构建) ↓ LogSlot(日志记录) ↓ StatisticSlot(统计) ↓ AuthoritySlot(授权) ↓ SystemSlot(系统保护) ↓ FlowSlot(流量控制) ↓ DegradeSlot(熔断降级) ↓ 业务逻辑执行/返回结果
2.2 核心Slot源码分析
2.2.1 NodeSelectorSlot:资源调用树构建
java
// NodeSelectorSlot负责构建调用树,记录资源之间的调用关系 public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<DefaultNode> { // 线程本地变量,存储当前上下文 private ThreadLocal<Context> contextThreadLocal = new ThreadLocal<>(); @Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { // 获取或创建默认节点 DefaultNode node = context.getCurNode(); if (node == null) { // 这里可能会初始化入口节点 node = new EntranceNode(new StringResourceWrapper(resourceWrapper.getName(), EntryType.IN)); } // 设置当前节点 context.setCurNode(node); // 触发下一个Slot fireEntry(context, resourceWrapper, node, count, prioritized, args); } @Override public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { // 清理上下文 context.setCurNode(null); // 触发下一个Slot fireExit(context, resourceWrapper, count, args); } }2.2.2 StatisticSlot:指标统计核心
java
// StatisticSlot负责统计各项指标,是限流和熔断的基础 public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> { @Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { try { // 触发下一个Slot fireEntry(context, resourceWrapper, node, count, prioritized, args); // 请求通过,增加通过计数 node.increaseThreadNum(); node.addPassRequest(count); // 调用成功,增加成功计数 if (context.getCurEntry().getError() == null) { node.increaseSuccess(count); } } catch (BlockException e) { // 被限流或降级,增加阻塞计数 node.increaseBlockQps(count); throw e; } catch (Throwable e) { // 出现业务异常,增加异常计数 node.increaseExceptionQps(count); throw e; } } @Override public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { // 减少线程数 node.decreaseThreadNum(); // 记录响应时间 Entry entry = context.getCurEntry(); if (entry.getError() == null) { long rt = TimeUtil.currentTimeMillis() - entry.getCreateTime(); if (rt > TimeUtil.currentTimeMillis()) { rt = 0L; } node.addRt(rt); } // 触发下一个Slot fireExit(context, resourceWrapper, count, args); } }2.2.3 FlowSlot:流量控制实现
java
// FlowSlot负责流量控制,检查是否超过阈值 public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> { private final FlowRuleChecker checker; public FlowSlot() { this(new FlowRuleChecker()); } @Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { // 检查流量规则 checker.checkFlow(resourceWrapper, context, node, count, prioritized); // 触发下一个Slot fireEntry(context, resourceWrapper, node, count, prioritized, args); } } // 流量规则检查器 class FlowRuleChecker { public void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException { // 检查所有流量规则 List<FlowRule> rules = FlowRuleManager.getRulesForResource(resource.getName()); if (rules != null) { for (FlowRule rule : rules) { if (!canPassCheck(rule, context, node, count, prioritized)) { throw new FlowException(rule.getLimitApp(), rule); } } } } private boolean canPassCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) { // 根据规则类型选择不同的检查策略 switch (rule.getGrade()) { case RuleConstant.FLOW_GRADE_QPS: // QPS限流检查 return passQpsCheck(rule, context, node, acquireCount, prioritized); case RuleConstant.FLOW_GRADE_THREAD: // 线程数限流检查 return passThreadCheck(rule, context, node); default: return true; } } private boolean passQpsCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) { // 获取限流控制器 TrafficShapingController controller = rule.getRater(); if (controller == null) { // 根据策略创建对应的控制器 switch (rule.getControlBehavior()) { case RuleConstant.CONTROL_BEHAVIOR_WARM_UP: // 预热模式 controller = new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(), 3); break; case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER: // 匀速排队模式 controller = new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount()); break; case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER: // 预热+匀速排队模式 controller = new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(), rule.getMaxQueueingTimeMs(), rule.getColdFactor()); break; default: // 默认快速失败模式 controller = new DefaultController(rule.getCount(), rule.getGrade()); } rule.setRater(controller); } // 检查是否可以通过 return controller.canPass(node, acquireCount, prioritized); } }2.2.4 DegradeSlot:熔断降级实现
java
// DegradeSlot负责熔断降级检查 public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> { @Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { // 检查熔断规则 performChecking(context, resourceWrapper); // 触发下一个Slot fireEntry(context, resourceWrapper, node, count, prioritized, args); } private void performChecking(Context context, ResourceWrapper resource) throws BlockException { // 获取所有熔断规则 List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(resource.getName()); if (circuitBreakers == null || circuitBreakers.isEmpty()) { return; } for (CircuitBreaker cb : circuitBreakers) { // 检查每个熔断器 if (!cb.tryPass(context)) { throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule()); } } } } // 抽象熔断器 public abstract class AbstractCircuitBreaker implements CircuitBreaker { protected final DegradeRule rule; protected final int recoveryTimeoutMs; // 恢复超时时间 protected volatile long nextRetryTimestamp; // 下次重试时间 // 熔断器状态 protected final AtomicReference<State> state = new AtomicReference<>(State.CLOSED); enum State { CLOSED, // 关闭状态(正常) OPEN, // 打开状态(熔断) HALF_OPEN // 半开状态(探测恢复) } @Override public boolean tryPass(Context context) { // 处理特殊情况 if (currentState() == State.CLOSED) { return true; } if (currentState() == State.OPEN) { // 对于熔断状态,如果已超过恢复时间,则切换到半开状态 if (retryTimeoutArrived()) { return fromOpenToHalfOpen(context); } return false; } // 半开状态 return false; } @Override public void onRequestComplete(Context context) { // 请求完成时的回调,更新统计信息 Entry entry = context.getCurEntry(); if (entry == null) { return; } Throwable error = entry.getError(); // 更新成功/失败计数 // 根据规则判断是否需要熔断 if (currentState() == State.CLOSED) { // 检查是否需要触发熔断 if (shouldPassCheck()) { // 触发熔断 transformToOpen(); } } else if (currentState() == State.HALF_OPEN) { // 半开状态下,根据请求结果决定切换到关闭还是打开状态 if (error == null) { // 请求成功,切换到关闭状态 fromHalfOpenToClose(); } else { // 请求失败,重新切换到打开状态 fromHalfOpenToOpen(); } } } }2.2.5 SystemSlot:系统保护实现
java
// SystemSlot负责系统级别的保护 public class SystemSlot extends AbstractLinkedProcessorSlot<DefaultNode> { private final SystemRuleManager systemRuleManager = SystemRuleManager.getInstance(); @Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { // 检查系统规则 SystemRuleManager.checkSystem(resourceWrapper); // 触发下一个Slot fireEntry(context, resourceWrapper, node, count, prioritized, args); } } // 系统规则管理器 public class SystemRuleManager { private static volatile Map<String, List<SystemRule>> systemRules = new HashMap<>(); private static final SystemStatusListener statusListener = new SystemStatusListener(); // 检查系统规则 public static void checkSystem(ResourceWrapper resourceWrapper) throws BlockException { // 如果系统规则为空,直接返回 if (systemRules == null) { return; } // 检查每个系统规则 for (SystemRule rule : systemRules.values()) { // 根据规则类型进行检查 switch (rule.getGrade()) { case RuleConstant.DEGRADE_GRADE_RT: // 检查平均RT if (rule.getCount() < getCurrentRt()) { throw new SystemBlockException(rule.getLimitApp(), rule); } break; case RuleConstant.DEGRADE_GRADE_THREAD: // 检查线程数 if (rule.getCount() < getCurrentThreadNum()) { throw new SystemBlockException(rule.getLimitApp(), rule); } break; case RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO: // 检查异常比例 if (rule.getCount() < getCurrentExceptionRatio()) { throw new SystemBlockException(rule.getLimitApp(), rule); } break; case RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT: // 检查异常数 if (rule.getCount() < getCurrentExceptionCount()) { throw new SystemBlockException(rule.getLimitApp(), rule); } break; } } } // 获取当前平均RT private static double getCurrentRt() { // 从系统指标统计中获取 return ENTRY_NODE.avgRt(); } // 获取当前线程数 private static double getCurrentThreadNum() { // 从系统指标统计中获取 return ENTRY_NODE.curThreadNum(); } }2.3 Slot责任链构建过程
Sentinel通过CtSph类构建Slot责任链:
java
public class CtSph implements Sph { private static final Object lock = new Object(); // Slot责任链 private static volatile ProcessorSlot<DefaultNode> chain; @Override public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException { // 获取或创建Slot责任链 ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper); // 创建上下文 Context context = ContextUtil.getContext(); if (context == null) { context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", type); } // 执行Slot链 try { chain.entry(context, resourceWrapper, null, count, args); } catch (BlockException e1) { throw e1; } catch (Throwable e1) { throw new SystemBlockException(e1.getMessage()); } // 返回Entry return new CtEntry(resourceWrapper, chain, context); } // 查找或创建Slot链 ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) { ProcessorSlotChain chain = chainMap.get(resourceWrapper); if (chain == null) { synchronized (LOCK) { chain = chainMap.get(resourceWrapper); if (chain == null) { // 创建Slot链 chain = SlotChainProvider.newSlotChain(); chainMap.put(resourceWrapper, chain); } } } return chain; } } // Slot链构建器 public class DefaultSlotChainBuilder implements SlotChainBuilder { @Override public ProcessorSlotChain build() { ProcessorSlotChain chain = new DefaultProcessorSlotChain(); // 按照顺序添加Slot chain.addLast(new NodeSelectorSlot()); chain.addLast(new ClusterBuilderSlot()); chain.addLast(new LogSlot()); chain.addLast(new StatisticSlot()); chain.addLast(new AuthoritySlot()); chain.addLast(new SystemSlot()); chain.addLast(new FlowSlot()); chain.addLast(new DegradeSlot()); return chain; } }三、Sentinel性能优化与设计模式
3.1 高性能统计设计
Sentinel使用LongAdder和Striped64实现高性能并发计数:
java
// 使用LongAdder替代AtomicLong提高并发性能 public class MetricBucket { private final LongAdder[] counters; public MetricBucket() { counters = new LongAdder[MetricEvent.values().length]; for (int i = 0; i < counters.length; i++) { counters[i] = new LongAdder(); } } public void add(MetricEvent event, long count) { counters[event.ordinal()].add(count); } }3.2 缓存优化
Sentinel使用缓存机制减少规则查找开销:
java
// 规则缓存 public class FlowRuleUtil { private static final ConcurrentMap<String, List<FlowRule>> flowRules = new ConcurrentHashMap<>(); // 带缓存的规则获取 public static List<FlowRule> getRules(String resourceName) { List<FlowRule> rules = flowRules.get(resourceName); if (rules == null) { synchronized (flowRules) { rules = flowRules.get(resourceName); if (rules == null) { // 从规则管理器获取 rules = FlowRuleManager.getRulesForResource(resourceName); flowRules.put(resourceName, rules); } } } return rules; } }3.3 设计模式应用
3.3.1 责任链模式(Chain of Responsibility)
java
// 抽象的Slot接口 public interface ProcessorSlot<T> { void entry(Context context, ResourceWrapper resourceWrapper, T param, int count, boolean prioritized, Object... args) throws Throwable; void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args); } // Slot链实现 public class DefaultProcessorSlotChain extends ProcessorSlotChain { private AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() { @Override public void entry(Context context, ResourceWrapper resourceWrapper, Object param, int count, boolean prioritized, Object... args) throws Throwable { super.fireEntry(context, resourceWrapper, param, count, prioritized, args); } @Override public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { super.fireExit(context, resourceWrapper, count, args); } }; private AbstractLinkedProcessorSlot<?> end = first; @Override public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) { protocolProcessor.setNext(first.getNext()); first.setNext(protocolProcessor); if (end == first) { end = protocolProcessor; } } @Override public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) { end.setNext(protocolProcessor); end = protocolProcessor; } }3.3.2 工厂模式(Factory Pattern)
java
// Slot链工厂 public final class SlotChainProvider { private static volatile SlotChainBuilder slotChainBuilder = null; public static ProcessorSlotChain newSlotChain() { if (slotChainBuilder == null) { synchronized (SlotChainProvider.class) { if (slotChainBuilder == null) { // 使用SPI机制加载SlotChainBuilder slotChainBuilder = new DefaultSlotChainBuilder(); } } } return slotChainBuilder.build(); } }3.3.3 策略模式(Strategy Pattern)
java
// 流量控制策略接口 public interface TrafficShapingController { boolean canPass(Node node, int acquireCount, boolean prioritized); } // 不同的策略实现 class DefaultController implements TrafficShapingController { /* 快速失败 */ } class WarmUpController implements TrafficShapingController { /* 预热 */ } class RateLimiterController implements TrafficShapingController { /* 匀速排队 */ }四、总结与最佳实践
4.1 Sentinel核心设计要点
模块化设计:通过Slot责任链实现功能解耦,每个Slot职责单一
高性能统计:使用LongAdder和滑动窗口实现高并发下的精确统计
灵活的规则引擎:支持多种限流算法和熔断策略
可扩展架构:通过SPI机制支持自定义扩展
4.2 限流算法选择建议
| 场景 | 推荐算法 | 理由 |
|---|---|---|
| API网关入口限流 | 令牌桶算法 | 允许突发流量,适合网关场景 |
| 内部服务保护 | 滑动窗口 | 统计精确,避免临界问题 |
| 数据库访问限流 | 漏桶算法 | 恒定速率,保护数据库 |
| 秒杀场景 | 预热+令牌桶 | 冷启动保护+允许合理突发 |
4.3 性能调优建议
合理设置滑动窗口参数:
java
// 窗口越小统计越精确,但内存消耗越大 // 推荐:500ms窗口,2个窗口组成1秒统计 LeapArray<MetricBucket> data = new BucketLeapArray(2, 1000);
监控关键指标:
java
// 监控内存使用 Runtime runtime = Runtime.getRuntime(); long usedMemory = runtime.totalMemory() - runtime.freeMemory(); // 监控规则数量 int ruleCount = FlowRuleManager.getRules().size();
合理配置JVM参数:
bash
# Sentinel对内存和GC比较敏感 -Xms2g -Xmx2g # 堆内存 -XX:+UseG1GC # 使用G1垃圾收集器 -XX:MaxGCPauseMillis=100 # 最大GC停顿时间
4.4 生产环境部署建议
Sentinel Dashboard高可用:
bash
# 多节点部署,使用Nginx负载均衡 upstream sentinel_dashboard { server 192.168.1.101:8080; server 192.168.1.102:8080; server 192.168.1.103:8080; }客户端配置优化:
yaml
spring: cloud: sentinel: transport: dashboard: sentinel-dashboard.com:8080 port: 8719 client-ip: ${spring.cloud.client.ip-address} eager: true # 立即初始化 log: dir: /var/log/sentinel规则持久化方案:
java
// 使用Nacos作为规则配置中心 @Configuration public class SentinelRuleConfig { @PostConstruct public void initRules() { // 从Nacos加载规则 List<FlowRule> rules = loadRulesFromNacos(); FlowRuleManager.loadRules(rules); } }