你的 AI 应用生成速度很快,但用户盯着空白屏幕等了 8 秒才看到第一个字——这不是模型慢,是你没开流式输出。这篇文章把流式接入的坑全踩一遍,让你少走弯路。
为什么要用流式输出?
对话模型生成文本是逐 token 输出的,默认的非流式接口会等模型把整段回复都生成完,才一次性返回给你。生成 500 个 token 的回复,你就要等 5-10 秒的白屏。
流式输出(Streaming)的改变是:模型生成一个 token,立刻发给你一个,前端看到文字一个字一个字地蹦出来,这就是 ChatGPT 那种打字机效果。
核心收益:
- 首 token 延迟从 5-10 秒降到 0.5-1 秒:用户几乎立刻看到响应,心理等待感大幅降低
- 长文本体验完全不同:2000 字的文章,流式是"看着它写出来",非流式是"等 20 秒然后突然全出来"
- 提前中止:用户可以看到方向不对就立刻停止,不用等模型把无用内容全部生成完
SSE 协议基础
流式输出底层用的是 SSE(Server-Sent Events),这是一个基于 HTTP 的单向推送协议,比 WebSocket 轻得多,用普通 GET/POST 请求就能建立持久连接。
响应格式长这样:
HTTP/1.1 200 OK Content-Type: text/event-stream Cache-Control: no-cache data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"你"},"index":0}]} data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"好"},"index":0}]} data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"!"},"index":0}]} data: [DONE]几个关键点:
data:前缀:每一行数据都以data:开头(注意有个空格)- 空行分隔:每条消息后面跟一个空行(
\n\n) [DONE]结束标记:流结束时服务端发一条data: [DONE],客户端收到就知道该收工了Content-Type: text/event-stream:响应头必须是这个,否则浏览器不会识别为 SSE 流
Python 流式调用
用openai库,只需加一个stream=True:
fromopenaiimportOpenAI client=OpenAI(base_url="https://api.therouter.ai/v1",api_key="你的 API Key",)stream=client.chat.completions.create(model="anthropic/claude-sonnet-4",messages=[{"role":"user","content":"写一篇 500 字的短文,介绍量子计算的基本原理"}],max_tokens=1024,stream=True,# 就这一行)forchunkinstream:delta=chunk.choices[0].deltaifdelta.content:print(delta.content,end="",flush=True)print()# 最后换行flush=True很重要——Python 的 stdout 有缓冲,不加这个,你会看到文字攒够一批才一起输出,流式效果打折。
带异常处理的完整版
fromopenaiimportOpenAI,APIStatusError,APIConnectionErrorimporttime client=OpenAI(base_url="https://api.therouter.ai/v1",api_key="你的 API Key",)defstream_with_retry(messages,model="anthropic/claude-sonnet-4",max_retries=3):forattemptinrange(max_retries):try:stream=client.chat.completions.create(model=model,messages=messages,max_tokens=2048,stream=True,)full_content=""forchunkinstream:delta=chunk.choices[0].deltaifdelta.content:print(delta.content,end="",flush=True)full_content+=delta.contentprint()returnfull_contentexceptAPIConnectionErrorase:# 网络断了,等一下重试wait=2**attemptprint(f"\n[连接中断,{wait}s 后重试 ({attempt+1}/{max_retries})]")time.sleep(wait)exceptAPIStatusErrorase:ife.status_codein(429,500,502,503):# 限流或服务端临时错误,退避重试wait=2**attemptprint(f"\n[{e.status_code}错误,{wait}s 后重试]")time.sleep(wait)else:# 4xx 客户端错误,不重试raiseraiseRuntimeError(f"超过最大重试次数{max_retries}")stream_with_retry([{"role":"user","content":"介绍一下 Rust 的所有权模型"}])Node.js 流式调用
importOpenAIfrom"openai";constclient=newOpenAI({baseURL:"https://api.therouter.ai/v1",apiKey:"你的 API Key",});asyncfunctionstreamChat(prompt:string){conststream=awaitclient.chat.completions.create({model:"anthropic/claude-sonnet-4",messages:[{role:"user",content:prompt}],max_tokens:1024,stream:true,});letfullContent="";forawait(constchunkofstream){constcontent=chunk.choices[0]?.delta?.content??"";if(content){process.stdout.write(content);fullContent+=content;}// 检查结束原因constfinishReason=chunk.choices[0]?.finish_reason;if(finishReason==="length"){console.warn("\n[警告] 输出因 max_tokens 限制被截断");}}console.log();returnfullContent;}streamChat("用 Node.js 实现一个简单的事件总线");前端接收 SSE:fetch + ReadableStream
浏览器端不要用EventSource——它只支持 GET 请求,而 AI API 用 POST。用fetch+ReadableStream来手动解析:
asyncfunctionstreamToUI(prompt:string,onToken:(token:string)=>void){constresponse=awaitfetch("https://api.therouter.ai/v1/chat/completions",{method:"POST",headers:{"Content-Type":"application/json",Authorization:`Bearer${API_KEY}`,},body:JSON.stringify({model:"anthropic/claude-sonnet-4",messages:[{role:"user",content:prompt}],stream:true,}),});if(!response.ok){consterr=awaitresponse.json();thrownewError(`API 错误${response.status}:${err.error?.message}`);}constreader=response.body!.getReader();constdecoder=newTextDecoder();letbuffer="";while(true){const{done,value}=awaitreader.read();if(done)break;// 将新数据追加到缓冲区(chunk 可能在消息边界被截断)buffer+=decoder.decode(value,{stream:true});// 按行解析constlines=buffer.split("\n");// 最后一行可能不完整,留到下次处理buffer=lines.pop()??"";for(constlineoflines){consttrimmed=line.trim();if(!trimmed||!trimmed.startsWith("data:"))continue;constdata=trimmed.slice(5).trim();if(data==="[DONE]")return;try{constparsed=JSON.parse(data);consttoken=parsed.choices?.[0]?.delta?.content;if(token)onToken(token);}catch{// 忽略解析失败的行(心跳包等)}}}}// React 中使用:// streamToUI(prompt, (token) => setContent(prev => prev + token))这里有两个细节值得注意:
- buffer 机制:网络传输不保证一条 SSE 消息整块到达,
chunk可能在data:和 JSON 之间被截断。必须用 buffer 拼接后再按\n切割。 TextDecoder的stream: true:告诉解码器这不是最后一块,遇到多字节 UTF-8 字符(比如中文)在边界被截断时不会乱码。
常见问题排查
问题一:断流——生成到一半就停了
现象:模型开始输出了,但在某个位置突然没了,[DONE]也没来。
原因:
- 代理/VPN 超时:很多代理默认 60 秒超时,AI 生成长文本可能超过这个时间
- Nginx 反代超时:默认
proxy_read_timeout 60s,流式请求必须调大 - 客户端代码没处理
done: true:reader.read()正常结束但你的循环没退出
Nginx 配置修复:
location /v1/ { proxy_pass http://backend; proxy_read_timeout 300s; # 调大到 5 分钟 proxy_buffering off; # 关闭缓冲,否则不是真流式 proxy_cache off; # SSE 必需的响应头 proxy_set_header Connection ''; proxy_http_version 1.1; chunked_transfer_encoding on; }问题二:HTTP 200 开始,但中途收到错误
现象:流正常开始,但某个 chunk 里收到的不是 token 而是错误信息。
这是流式接口的一个特殊情况:SSE 连接建立成功(200),但在流传输过程中模型服务出错了,错误会被编码进流里发过来,而不是通过 HTTP 状态码。
典型格式:
data: {"error":{"message":"Rate limit exceeded","type":"rate_limit_error","code":429}} data: [DONE]处理方式:
forchunkinstream:# 检查 chunk 里是否有错误字段ifhasattr(chunk,'error')andchunk.error:raiseException(f"流中错误:{chunk.error}")delta=chunk.choices[0].delta# 正常处理...前端版本:
constparsed=JSON.parse(data);// 先检查是否是错误包if(parsed.error){thrownewError(`流错误:${parsed.error.message}`);}consttoken=parsed.choices?.[0]?.delta?.content;问题三:Nginx / Cloudflare 缓冲,“一次性输出”
现象:后端明明在流式输出,前端却看到等了很久然后全部内容一次性出现——和非流式效果一样。
根因:中间层缓冲了响应,等缓冲区满了才一次性刷出。
Nginx 端:如上面配置,proxy_buffering off是关键。
Cloudflare 端:在 Cloudflare 控制台把对应路由的Response Buffering关掉;或者在响应头加X-Accel-Buffering: no(Nginx 识别这个头也会关闭缓冲)。
在代码里设置响应头:
# FastAPI/后端示例fromfastapi.responsesimportStreamingResponseasyncdefstream_endpoint():returnStreamingResponse(generate_stream(),media_type="text/event-stream",headers={"X-Accel-Buffering":"no",# 告诉 Nginx 不缓冲"Cache-Control":"no-cache",})问题四:finish_reason: length,内容被截断
模型还没写完,但达到了max_tokens上限,流就结束了,最后一个 chunk 的finish_reason是"length"而不是"stop"。
务必在代码里检测并提示用户,或者自动增加max_tokens重试。
重试策略
流式请求的重试比普通请求复杂,因为你不知道已经接收了多少内容:
简单场景:全量重试
适合生成内容较短(< 500 tokens)或不需要保留已生成内容时:
defretry_stream(messages,max_retries=3):foriinrange(max_retries):try:returndo_stream(messages)except(ConnectionError,TimeoutError)ase:ifi==max_retries-1:raisetime.sleep(2**i)# 指数退避:1s, 2s, 4s复杂场景:断点续传思路
如果已经流出了一半内容,重试时把已收到的内容作为上下文传回去,让模型从断点继续:
defstream_with_resume(user_message,max_retries=3):accumulated=""forattemptinrange(max_retries):try:ifaccumulated:# 把已有内容作为 assistant 消息,让模型续写messages=[{"role":"user","content":user_message},{"role":"assistant","content":accumulated+"(以下继续)\n"},{"role":"user","content":"请继续"},]else:messages=[{"role":"user","content":user_message}]stream=client.chat.completions.create(model="anthropic/claude-sonnet-4",messages=messages,stream=True,)forchunkinstream:content=chunk.choices[0].delta.contentor""accumulated+=contentyieldcontentreturn# 正常结束exceptExceptionase:ifattempt<max_retries-1:time.sleep(2**attempt)else:raise注意:续写效果依赖模型理解上下文,不是所有场景都适用。对于需要完整性的内容(代码、JSON),续写后可能需要人工拼接验证。
TheRouter 的流式稳定性机制
直接打 Anthropic / OpenAI 官方 API,流式响应有几个不稳定点:网络抖动断流、上游过载导致流中断、长文本生成超时。
TheRouter 在网关层做了几件事来保证稳定性:
- 连接保活:网关和上游之间维持长连接,减少建立连接的开销
- 上游切换:当主用上游不稳定时,自动路由到备用通道,对客户端透明
- 超时宽松:流式请求的超时配置远高于普通请求,避免长文本生成被误杀
- 心跳注释行:对于生成速度慢的模型,网关每隔 15 秒发一个
: keepalive注释行,防止代理层因为无数据而关闭连接(注释行不会被 SSE 解析器当作数据处理)
调用方式和普通请求完全一样,加stream=True即可:
client=OpenAI(base_url="https://api.therouter.ai/v1",api_key="你的 TheRouter Key",)stream=client.chat.completions.create(model="openai/gpt-4o",# 换模型也是一行的事messages=[...],stream=True,)小结
| 问题 | 原因 | 修复 |
|---|---|---|
| 断流 | 代理/Nginx 超时 | proxy_read_timeout 300s |
| 一次性输出 | 响应缓冲 | proxy_buffering off+X-Accel-Buffering: no |
| 流中错误 | 200 后出错 | 解析每个 chunk 检查error字段 |
| 内容截断 | max_tokens 不够 | 检查finish_reason === "length" |
| chunk 解析乱码 | 多字节字符被截断 | TextDecoder加{ stream: true } |
流式输出看起来只是加了一个参数,但落地到生产环境,中间件配置、错误处理、重试逻辑一个都不能少。希望这篇文章能帮你把这些坑一次性都填掉。