news 2026/5/24 6:17:24

流式响应 (SSE) 的坑与最佳实践:处理断流、重试和错误码

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
流式响应 (SSE) 的坑与最佳实践:处理断流、重试和错误码

你的 AI 应用生成速度很快,但用户盯着空白屏幕等了 8 秒才看到第一个字——这不是模型慢,是你没开流式输出。这篇文章把流式接入的坑全踩一遍,让你少走弯路。

为什么要用流式输出?

对话模型生成文本是逐 token 输出的,默认的非流式接口会等模型把整段回复都生成完,才一次性返回给你。生成 500 个 token 的回复,你就要等 5-10 秒的白屏。

流式输出(Streaming)的改变是:模型生成一个 token,立刻发给你一个,前端看到文字一个字一个字地蹦出来,这就是 ChatGPT 那种打字机效果。

核心收益:

  • 首 token 延迟从 5-10 秒降到 0.5-1 秒:用户几乎立刻看到响应,心理等待感大幅降低
  • 长文本体验完全不同:2000 字的文章,流式是"看着它写出来",非流式是"等 20 秒然后突然全出来"
  • 提前中止:用户可以看到方向不对就立刻停止,不用等模型把无用内容全部生成完

SSE 协议基础

流式输出底层用的是 SSE(Server-Sent Events),这是一个基于 HTTP 的单向推送协议,比 WebSocket 轻得多,用普通 GET/POST 请求就能建立持久连接。

响应格式长这样:

HTTP/1.1 200 OK Content-Type: text/event-stream Cache-Control: no-cache data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"你"},"index":0}]} data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"好"},"index":0}]} data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"!"},"index":0}]} data: [DONE]

几个关键点:

  • data:前缀:每一行数据都以data:开头(注意有个空格)
  • 空行分隔:每条消息后面跟一个空行(\n\n
  • [DONE]结束标记:流结束时服务端发一条data: [DONE],客户端收到就知道该收工了
  • Content-Type: text/event-stream:响应头必须是这个,否则浏览器不会识别为 SSE 流

Python 流式调用

openai库,只需加一个stream=True

fromopenaiimportOpenAI client=OpenAI(base_url="https://api.therouter.ai/v1",api_key="你的 API Key",)stream=client.chat.completions.create(model="anthropic/claude-sonnet-4",messages=[{"role":"user","content":"写一篇 500 字的短文,介绍量子计算的基本原理"}],max_tokens=1024,stream=True,# 就这一行)forchunkinstream:delta=chunk.choices[0].deltaifdelta.content:print(delta.content,end="",flush=True)print()# 最后换行

flush=True很重要——Python 的 stdout 有缓冲,不加这个,你会看到文字攒够一批才一起输出,流式效果打折。

带异常处理的完整版

fromopenaiimportOpenAI,APIStatusError,APIConnectionErrorimporttime client=OpenAI(base_url="https://api.therouter.ai/v1",api_key="你的 API Key",)defstream_with_retry(messages,model="anthropic/claude-sonnet-4",max_retries=3):forattemptinrange(max_retries):try:stream=client.chat.completions.create(model=model,messages=messages,max_tokens=2048,stream=True,)full_content=""forchunkinstream:delta=chunk.choices[0].deltaifdelta.content:print(delta.content,end="",flush=True)full_content+=delta.contentprint()returnfull_contentexceptAPIConnectionErrorase:# 网络断了,等一下重试wait=2**attemptprint(f"\n[连接中断,{wait}s 后重试 ({attempt+1}/{max_retries})]")time.sleep(wait)exceptAPIStatusErrorase:ife.status_codein(429,500,502,503):# 限流或服务端临时错误,退避重试wait=2**attemptprint(f"\n[{e.status_code}错误,{wait}s 后重试]")time.sleep(wait)else:# 4xx 客户端错误,不重试raiseraiseRuntimeError(f"超过最大重试次数{max_retries}")stream_with_retry([{"role":"user","content":"介绍一下 Rust 的所有权模型"}])

Node.js 流式调用

importOpenAIfrom"openai";constclient=newOpenAI({baseURL:"https://api.therouter.ai/v1",apiKey:"你的 API Key",});asyncfunctionstreamChat(prompt:string){conststream=awaitclient.chat.completions.create({model:"anthropic/claude-sonnet-4",messages:[{role:"user",content:prompt}],max_tokens:1024,stream:true,});letfullContent="";forawait(constchunkofstream){constcontent=chunk.choices[0]?.delta?.content??"";if(content){process.stdout.write(content);fullContent+=content;}// 检查结束原因constfinishReason=chunk.choices[0]?.finish_reason;if(finishReason==="length"){console.warn("\n[警告] 输出因 max_tokens 限制被截断");}}console.log();returnfullContent;}streamChat("用 Node.js 实现一个简单的事件总线");

前端接收 SSE:fetch + ReadableStream

浏览器端不要用EventSource——它只支持 GET 请求,而 AI API 用 POST。用fetch+ReadableStream来手动解析:

asyncfunctionstreamToUI(prompt:string,onToken:(token:string)=>void){constresponse=awaitfetch("https://api.therouter.ai/v1/chat/completions",{method:"POST",headers:{"Content-Type":"application/json",Authorization:`Bearer${API_KEY}`,},body:JSON.stringify({model:"anthropic/claude-sonnet-4",messages:[{role:"user",content:prompt}],stream:true,}),});if(!response.ok){consterr=awaitresponse.json();thrownewError(`API 错误${response.status}:${err.error?.message}`);}constreader=response.body!.getReader();constdecoder=newTextDecoder();letbuffer="";while(true){const{done,value}=awaitreader.read();if(done)break;// 将新数据追加到缓冲区(chunk 可能在消息边界被截断)buffer+=decoder.decode(value,{stream:true});// 按行解析constlines=buffer.split("\n");// 最后一行可能不完整,留到下次处理buffer=lines.pop()??"";for(constlineoflines){consttrimmed=line.trim();if(!trimmed||!trimmed.startsWith("data:"))continue;constdata=trimmed.slice(5).trim();if(data==="[DONE]")return;try{constparsed=JSON.parse(data);consttoken=parsed.choices?.[0]?.delta?.content;if(token)onToken(token);}catch{// 忽略解析失败的行(心跳包等)}}}}// React 中使用:// streamToUI(prompt, (token) => setContent(prev => prev + token))

这里有两个细节值得注意:

  1. buffer 机制:网络传输不保证一条 SSE 消息整块到达,chunk可能在data:和 JSON 之间被截断。必须用 buffer 拼接后再按\n切割。
  2. TextDecoderstream: true:告诉解码器这不是最后一块,遇到多字节 UTF-8 字符(比如中文)在边界被截断时不会乱码。

常见问题排查

问题一:断流——生成到一半就停了

现象:模型开始输出了,但在某个位置突然没了,[DONE]也没来。

原因

  • 代理/VPN 超时:很多代理默认 60 秒超时,AI 生成长文本可能超过这个时间
  • Nginx 反代超时:默认proxy_read_timeout 60s,流式请求必须调大
  • 客户端代码没处理done: truereader.read()正常结束但你的循环没退出

Nginx 配置修复

location /v1/ { proxy_pass http://backend; proxy_read_timeout 300s; # 调大到 5 分钟 proxy_buffering off; # 关闭缓冲,否则不是真流式 proxy_cache off; # SSE 必需的响应头 proxy_set_header Connection ''; proxy_http_version 1.1; chunked_transfer_encoding on; }

问题二:HTTP 200 开始,但中途收到错误

现象:流正常开始,但某个 chunk 里收到的不是 token 而是错误信息。

这是流式接口的一个特殊情况:SSE 连接建立成功(200),但在流传输过程中模型服务出错了,错误会被编码进流里发过来,而不是通过 HTTP 状态码。

典型格式:

data: {"error":{"message":"Rate limit exceeded","type":"rate_limit_error","code":429}} data: [DONE]

处理方式:

forchunkinstream:# 检查 chunk 里是否有错误字段ifhasattr(chunk,'error')andchunk.error:raiseException(f"流中错误:{chunk.error}")delta=chunk.choices[0].delta# 正常处理...

前端版本:

constparsed=JSON.parse(data);// 先检查是否是错误包if(parsed.error){thrownewError(`流错误:${parsed.error.message}`);}consttoken=parsed.choices?.[0]?.delta?.content;

问题三:Nginx / Cloudflare 缓冲,“一次性输出”

现象:后端明明在流式输出,前端却看到等了很久然后全部内容一次性出现——和非流式效果一样。

根因:中间层缓冲了响应,等缓冲区满了才一次性刷出。

Nginx 端:如上面配置,proxy_buffering off是关键。

Cloudflare 端:在 Cloudflare 控制台把对应路由的Response Buffering关掉;或者在响应头加X-Accel-Buffering: no(Nginx 识别这个头也会关闭缓冲)。

在代码里设置响应头:

# FastAPI/后端示例fromfastapi.responsesimportStreamingResponseasyncdefstream_endpoint():returnStreamingResponse(generate_stream(),media_type="text/event-stream",headers={"X-Accel-Buffering":"no",# 告诉 Nginx 不缓冲"Cache-Control":"no-cache",})

问题四:finish_reason: length,内容被截断

模型还没写完,但达到了max_tokens上限,流就结束了,最后一个 chunk 的finish_reason"length"而不是"stop"

务必在代码里检测并提示用户,或者自动增加max_tokens重试。


重试策略

流式请求的重试比普通请求复杂,因为你不知道已经接收了多少内容:

简单场景:全量重试

适合生成内容较短(< 500 tokens)或不需要保留已生成内容时:

defretry_stream(messages,max_retries=3):foriinrange(max_retries):try:returndo_stream(messages)except(ConnectionError,TimeoutError)ase:ifi==max_retries-1:raisetime.sleep(2**i)# 指数退避:1s, 2s, 4s

复杂场景:断点续传思路

如果已经流出了一半内容,重试时把已收到的内容作为上下文传回去,让模型从断点继续:

defstream_with_resume(user_message,max_retries=3):accumulated=""forattemptinrange(max_retries):try:ifaccumulated:# 把已有内容作为 assistant 消息,让模型续写messages=[{"role":"user","content":user_message},{"role":"assistant","content":accumulated+"(以下继续)\n"},{"role":"user","content":"请继续"},]else:messages=[{"role":"user","content":user_message}]stream=client.chat.completions.create(model="anthropic/claude-sonnet-4",messages=messages,stream=True,)forchunkinstream:content=chunk.choices[0].delta.contentor""accumulated+=contentyieldcontentreturn# 正常结束exceptExceptionase:ifattempt<max_retries-1:time.sleep(2**attempt)else:raise

注意:续写效果依赖模型理解上下文,不是所有场景都适用。对于需要完整性的内容(代码、JSON),续写后可能需要人工拼接验证。


TheRouter 的流式稳定性机制

直接打 Anthropic / OpenAI 官方 API,流式响应有几个不稳定点:网络抖动断流、上游过载导致流中断、长文本生成超时。

TheRouter 在网关层做了几件事来保证稳定性:

  • 连接保活:网关和上游之间维持长连接,减少建立连接的开销
  • 上游切换:当主用上游不稳定时,自动路由到备用通道,对客户端透明
  • 超时宽松:流式请求的超时配置远高于普通请求,避免长文本生成被误杀
  • 心跳注释行:对于生成速度慢的模型,网关每隔 15 秒发一个: keepalive注释行,防止代理层因为无数据而关闭连接(注释行不会被 SSE 解析器当作数据处理)

调用方式和普通请求完全一样,加stream=True即可:

client=OpenAI(base_url="https://api.therouter.ai/v1",api_key="你的 TheRouter Key",)stream=client.chat.completions.create(model="openai/gpt-4o",# 换模型也是一行的事messages=[...],stream=True,)

小结

问题原因修复
断流代理/Nginx 超时proxy_read_timeout 300s
一次性输出响应缓冲proxy_buffering off+X-Accel-Buffering: no
流中错误200 后出错解析每个 chunk 检查error字段
内容截断max_tokens 不够检查finish_reason === "length"
chunk 解析乱码多字节字符被截断TextDecoder{ stream: true }

流式输出看起来只是加了一个参数,但落地到生产环境,中间件配置、错误处理、重试逻辑一个都不能少。希望这篇文章能帮你把这些坑一次性都填掉。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/24 6:17:09

一键搞定完整网页截图:Chrome扩展终极解决方案

一键搞定完整网页截图&#xff1a;Chrome扩展终极解决方案 【免费下载链接】full-page-screen-capture-chrome-extension One-click full page screen captures in Google Chrome 项目地址: https://gitcode.com/gh_mirrors/fu/full-page-screen-capture-chrome-extension …

作者头像 李华
网站建设 2026/5/24 6:17:09

WarcraftHelper:全面优化魔兽争霸III体验的实用工具

WarcraftHelper&#xff1a;全面优化魔兽争霸III体验的实用工具 【免费下载链接】WarcraftHelper Warcraft III Helper , support 1.20e, 1.24e, 1.26a, 1.27a, 1.27b 项目地址: https://gitcode.com/gh_mirrors/wa/WarcraftHelper WarcraftHelper是一款专为魔兽争霸III…

作者头像 李华
网站建设 2026/5/23 1:36:32

化疗对女性的杀伤力

化疗人群普遍存在体质虚弱、免疫力下降、食欲差、体力透支、肠胃不适等问题&#xff0c;白之品海参凭借其天然温和的营养特点&#xff0c;能在多个方面提供针对性支持&#xff1a;温和滋补&#xff0c;不刺激肠胃海参性平温和&#xff0c;不燥热、不寒凉&#xff0c;对化疗后敏…

作者头像 李华
网站建设 2026/5/23 1:36:30

告别重复造轮子:用快马AI高效生成openclaw启动项目核心工具模块

最近在开发一个机械爪控制项目&#xff08;代号openclaw&#xff09;时&#xff0c;发现每次切换硬件或调整动作流程都要重写大量底层代码。经过摸索&#xff0c;我总结出一套用Python构建高效开发工具集的方法&#xff0c;特别适合需要快速迭代的硬件控制类项目。这个方案最棒…

作者头像 李华
网站建设 2026/5/23 1:36:30

文沥:以经销商数据管理平台,赋能高露洁渠道数字化腾飞

在当今竞争激烈且瞬息万变的全球商业格局中&#xff0c;快速消费品行业正经历着前所未有的变革。对于众多企业而言&#xff0c;如何精准掌控渠道和终端&#xff0c;实现营销管理的数字化升级&#xff0c;已成为在市场中脱颖而出、保持持续竞争力的关键所在。文沥&#xff0c;作…

作者头像 李华
网站建设 2026/5/23 1:36:31

OpenMS终极指南:如何快速掌握专业质谱数据分析的完整方案

OpenMS终极指南&#xff1a;如何快速掌握专业质谱数据分析的完整方案 【免费下载链接】OpenMS The codebase of the OpenMS project 项目地址: https://gitcode.com/gh_mirrors/op/OpenMS 蛋白质组学、代谢组学、质谱数据分析、OpenMS开源平台、生物信息学工具 在生命科…

作者头像 李华