SseEmitter是 Spring MVC 提供的一个类,用于实现服务器向客户端的实时推送(Server-Sent Events,简称 SSE)。
一、核心概念
SSE(Server-Sent Events)是一种基于 HTTP 的单向通信机制:
- 服务器 → 浏览器(单向)
- 浏览器通过
EventSource建立长连接 - 服务器可以持续向客户端推送数据(流式返回)
二、SseEmitter 是干什么的?
简单说:
SseEmitter用于在 Spring 后端持续向前端发送“流式数据”。
典型使用场景:
- AI 对话流式输出(类似 ChatGPT)
- 实时日志推送
- 进度更新
- 消息通知
三、代码里的作用解析
你截图中的代码:
publicSseEmitterchatStream(...)说明:
- 这个接口返回的不是普通 JSON
- 而是一个流式响应(text/event-stream)
- 客户端会一直保持连接,不断接收数据
四、基本用法
1️⃣ 创建 emitter
SseEmitteremitter=newSseEmitter(30000L);// 超时时间2️⃣ 发送数据
emitter.send("hello");emitter.send(SseEmitter.event().data("流式数据"));3️⃣ 结束流
emitter.complete();4️⃣ 异常处理
emitter.completeWithError(e);五、完整示例(典型流式接口)
@GetMapping("/stream")publicSseEmitterstream(){SseEmitteremitter=newSseEmitter();newThread(()->{try{for(inti=0;i<5;i++){emitter.send("data: "+i);Thread.sleep(1000);}emitter.complete();}catch(Exceptione){emitter.completeWithError(e);}}).start();returnemitter;}六、前端如何接收?
constsource=newEventSource('/stream');source.onmessage=function(event){console.log(event.data);};七、和 WebSocket 的区别
| 特性 | SSE (SseEmitter) | WebSocket |
|---|---|---|
| 通信方向 | 单向(服务器→客户端) | 双向 |
| 协议 | HTTP | WS |
| 实现难度 | 简单 | 较复杂 |
| 适合场景 | 流式输出、通知 | 聊天、游戏 |
八、你这段代码的实际用途
你这个方法名:
chatStream结合SseEmitter,非常典型:
👉用于实现 AI 对话的流式返回(像 ChatGPT 一样一边生成一边输出)
九、常见坑(实战经验)
必须异步线程发送数据
- 否则会阻塞请求
要处理超时
emitter.onTimeout(()->emitter.complete());Nginx 需要关闭缓冲
proxy_buffering off;连接数要控制
- 每个用户一个长连接
总结一句话
SseEmitter= Spring 里实现“流式返回 / 实时推送”的核心工具(基于 SSE 协议)
例子
我给你一个**贴近你截图场景(AI 流式输出)**的完整例子,从后端到前端一步讲清楚。
🎯 场景
做一个类似 ChatGPT 的接口:
👉 用户输入:“你好”
👉 页面逐字输出:“你好呀,很高兴见到你…”
🧩 一、后端(Spring Boot + SseEmitter)
1️⃣ Controller
@RestController@RequestMapping("/api/llm")publicclassChatController{@GetMapping("/chat/stream")publicSseEmitterstreamChat(@RequestParamStringmessage){SseEmitteremitter=newSseEmitter(0L);// 不超时newThread(()->{try{// 模拟 AI 一点点返回String[]tokens={"你好","呀,","很高兴","见到你","!"};for(Stringtoken:tokens){Thread.sleep(500);// 模拟生成延迟emitter.send(token);// 👈 核心:推送数据}emitter.send("[DONE]");emitter.complete();// 👈 结束}catch(Exceptione){emitter.completeWithError(e);}}).start();returnemitter;}}🧠 这一段在干嘛?
👉 本质就是:
服务器每 0.5 秒说一句话等价于:
你好 呀, 很高兴 见到你 ! [DONE]🌐 二、前端(接收流)
方法1:用 fetch(和你图里一样)
asyncfunctionstreamChat(){constresponse=awaitfetch('/api/llm/chat/stream?message=你好');constreader=response.body.getReader();constdecoder=newTextDecoder();letresult="";while(true){const{done,value}=awaitreader.read();if(done)break;constchunk=decoder.decode(value);console.log("收到:",chunk);result+=chunk;// 👉 更新页面document.getElementById("output").innerText=result;}}页面效果(真实体验)
第1秒:你好 第2秒:你好呀, 第3秒:你好呀,很高兴 第4秒:你好呀,很高兴见到你 第5秒:你好呀,很高兴见到你!👉 这就是“流式输出”
🔁 三、对比普通接口
❌ 普通接口
等待 5 秒 一次性返回:你好呀,很高兴见到你!✅ SSE(SseEmitter)
边生成边返回: 你好 → 你好呀 → 你好呀,很高兴 → ...👉 用户体验:像 AI 在实时“打字”
🔧 四、你截图里的真实对应关系
你图里的代码可以映射成这样:
| 你图里的东西 | 这个例子里 |
|---|---|
SseEmitter | emitter |
emitter.send(...) | 推 token |
streamingModel.chat() | 模拟 tokens 数组 |
onPartialResponse | for 循环 |
[DONE] | 结束标志 |
| 前端 fetch | reader.read() |
⚠️ 五、实际项目注意点(很关键)
1️⃣ 一定要处理超时
newSseEmitter(0L);// 或设置长时间2️⃣ 要有结束标志
emitter.send("[DONE]");3️⃣ 异常要关闭连接
emitter.completeWithError(e);4️⃣ 不能阻塞主线程
👉 必须用线程 / 异步(你图里也是这样)
✅ 一句话总结
👉 这个例子本质就是:
后端每生成一点内容 → 立刻用 SseEmitter 推给前端 → 前端边收边显示
如果你下一步想更贴近你项目(比如 LangChain4j + OpenAI streaming 那种),我可以给你一个完全对标你截图代码的真实企业级写法。
实战
这段代码的作用可以概括成一句话:
它实现了一个“AI 聊天流式接口”
前端调用/chat/stream后,后端会把大模型生成的内容一边生成、一边推送给前端,而不是等全部生成完再一次性返回。
我按“整体功能 → 执行流程 → 每段代码作用”给你拆开讲。
一、这个接口是干什么的
@PostMapping(value="/chat/stream",produces=MediaType.TEXT_EVENT_STREAM_VALUE)publicSseEmitterchatStream(@RequestBodyMap<String,Object>request)这行的含义是:
- 这是一个POST 接口
- 路径是
/chat/stream - 返回类型是
text/event-stream - 返回值是
SseEmitter
通俗讲:
👉 前端发来一条聊天请求
👉 后端调用大模型
👉 大模型每吐出一点内容,就立刻发给前端
👉 前端就能看到“打字机效果”
这就是典型的SSE 流式聊天接口。
二、这个接口最终实现了哪些功能
它实际做了 6 件事:
- 拿到当前登录用户
- 读取前端传来的提问内容
- 检查用户有没有配置大模型 API Key
- 把用户消息先保存到数据库
- 调用大模型进行流式输出
- 把模型回复实时推给前端,最后再保存完整回复
三、整体执行流程
你可以把它理解成下面这条链路:
前端发起聊天请求 → 后端取用户信息和参数 → 检查 API Key → 保存用户消息 → 调用大模型流式生成 → 每生成一个 token 就发给前端 → 全部生成完后保存 assistant 消息 → 给前端发 [DONE] → 结束连接四、逐段解释代码作用
1)定义接口:返回 SSE 流
@PostMapping(value="/chat/stream",produces=MediaType.TEXT_EVENT_STREAM_VALUE)publicSseEmitterchatStream(@RequestBodyMap<String,Object>request)作用
这是一个聊天流式接口。
关键点
produces = MediaType.TEXT_EVENT_STREAM_VALUE
表示这个接口返回的不是普通 JSON,而是:
text/event-stream也就是 SSE 格式。
通俗理解
不是一次性返回完整答案,
而是“后端不断往前端推送消息”。
2)获取用户和请求参数
longuserId=StpUtil.getLoginIdAsLong();Stringprompt=(String)request.get("prompt");Stringcontext=(String)request.get("context");LongconversationId=request.get("conversationId")!=null?Long.valueOf(request.get("conversationId").toString()):null;作用
从当前请求里拿出聊天所需的数据。
分别是什么意思
userId
当前登录用户 ID。
longuserId=StpUtil.getLoginIdAsLong();说明这个接口是登录后才能调用的,系统要知道“是谁在提问”。
prompt
用户当前输入的问题。
比如:
"帮我写一份周报"context
上下文内容。
比如可能是前面几轮聊天记录,或者额外背景说明。
conversationId
会话 ID。
用于标识这条消息属于哪个聊天会话。
如果没有,就可能是新会话。
3)创建 SSE 通道和完整回复缓存
SseEmitteremitter=newSseEmitter(300000L);StringBuilderfullResponse=newStringBuilder();SseEmitter emitter
创建一个 SSE 推送对象。
newSseEmitter(300000L)表示:
- 创建一条 SSE 长连接
- 超时时间 300000 毫秒
- 也就是5 分钟
通俗理解
像开了一根“消息管道”,后面模型生成的内容都会从这根管道发给前端。
StringBuilder fullResponse
用于把模型返回的所有 token 拼起来。
因为模型是一个字一个字、一段一段返回的,最后要把完整答案拼出来,保存到数据库。
4)外层 try:整个流程的总保护
try{...}catch(Exceptione){...}作用
防止接口在初始化阶段直接崩掉。
比如:
- 取 API Key 失败
- 创建模型失败
- 参数异常
- 数据库保存失败
如果这里出错,会给前端发一个错误消息并结束连接。
5)检查用户有没有配置大模型 Key
varllmKey=llmKeyService.getDefaultLLMKey(userId);if(llmKey==null){emitter.send(SseEmitter.event().data("{\"event\":\"error\",\"data\":\"请先配置大模型 API Key\"}"));emitter.complete();returnemitter;}作用
先检查当前用户有没有配置默认的大模型 API Key。
为什么要做这个
因为后面调用大模型必须依赖:
- API Key
- Base URL
- 模型名
如果这些都没有,根本没法请求模型。
处理方式
如果没配置:
- 通过 SSE 给前端发错误信息
- 结束连接
- 直接返回
通俗理解
像系统先检查“你有没有油卡”,没有就别开车了。
6)拼接最终 prompt
StringfullPrompt=context!=null&&!context.isEmpty()?context+"\n"+prompt:prompt;作用
把上下文和当前问题拼成最终发给模型的输入。
逻辑
- 如果有
context,就:
context + 换行 + prompt- 如果没有,就只用
prompt
举个例子
假设:
context = "你是一个Java老师" prompt = "解释一下SSE"拼出来就是:
你是一个Java老师 解释一下SSE通俗理解
这是在告诉模型:
“这是背景,这是本次问题,请一起参考回答。”
7)先保存用户消息
chatMessageService.saveMessage(userId,conversationId,"user",fullPrompt,llmKey.getModelName());作用
把用户这次提问先存到数据库。
存了什么
大概率包括:
- 用户 ID
- 会话 ID
- 角色:
user - 内容:
fullPrompt - 模型名
为什么先存
因为这样聊天记录能完整保存下来,后面可以在会话列表里查看。
8)获取流式大模型对象
varstreamingModel=llmService.getStreamingChatModel(llmKey.getApiKey(),llmKey.getBaseUrl(),llmKey.getModelName());作用
根据用户配置的参数,创建一个“支持流式输出”的模型客户端。
这里用到了
- API Key
- Base URL
- 模型名
通俗理解
这一步是在“接通大模型服务”。
9)保存模型名
StringmodelName=llmKey.getModelName();作用
把模型名单独保存下来,方便在回调内部使用。
因为匿名内部类里引用外部变量时,通常希望变量明确、稳定。
10)发起流式聊天
streamingModel.chat(fullPrompt,newStreamingChatResponseHandler(){作用
调用大模型开始生成回答,并注册回调处理器。
通俗理解
你可以理解成:
把问题交给模型 然后告诉它: - 你每吐一点内容,就通知我 - 全部吐完了,也通知我 - 出错了,也通知我这个StreamingChatResponseHandler就是专门处理这些过程事件的。
五、三个核心回调方法
这是整段代码最关键的地方。
1)onPartialResponse:收到部分内容时
@OverridepublicvoidonPartialResponse(Stringtoken){try{fullResponse.append(token);emitter.send(SseEmitter.event().data(token));}catch(IOExceptione){log.error("Failed to send token: {}",e.getMessage());emitter.completeWithError(e);}}作用
每当模型生成一点内容,就会进这个方法。
这里做了两件事
第一件:拼接到完整答案里
fullResponse.append(token);比如模型依次返回:
你 好 ,很 高兴 见到你这里会慢慢拼成:
你好,很高兴见到你第二件:实时推给前端
emitter.send(SseEmitter.event().data(token));把当前 token 立刻通过 SSE 发给前端。
通俗理解
模型每“说一个字”,后端就立刻转发给页面。
所以前端看到的是“边生成边显示”。
异常处理
catch(IOExceptione){...emitter.completeWithError(e);}如果发送失败,比如:
- 前端断开了
- 网络中断
- SSE 通道出问题
那就记录日志,并把连接按异常结束。
2)onCompleteResponse:模型全部生成完成时
@OverridepublicvoidonCompleteResponse(ChatResponseresponse){try{chatMessageService.saveMessage(userId,conversationId,"assistant",fullResponse.toString(),modelName);emitter.send(SseEmitter.event().data("[DONE]"));emitter.complete();}catch(IOExceptione){log.error("Failed to send complete event: {}",e.getMessage());emitter.completeWithError(e);}}作用
当模型已经完整回答完时执行。
这里做了三件事
第一件:保存 assistant 的完整回答
chatMessageService.saveMessage(userId,conversationId,"assistant",fullResponse.toString(),modelName);把刚才拼出来的完整回复保存到数据库。
保存的是:
- 用户 ID
- 会话 ID
- 角色:
assistant - 完整内容
- 模型名
这就形成一条完整聊天记录。
第二件:给前端发送结束标志
emitter.send(SseEmitter.event().data("[DONE]"));告诉前端:
模型已经输出完了前端一般会根据[DONE]:
- 停止 loading
- 停止继续拼接流
- 把消息状态改为完成
第三件:关闭 SSE 连接
emitter.complete();正常结束。
通俗理解
这相当于:
模型说完了 → 把完整答案存档 → 通知前端“结束了” → 挂断连接3)onError:模型处理出错时
@OverridepublicvoidonError(Throwableerror){try{emitter.send(SseEmitter.event().data("[ERROR] "+error.getMessage()));emitter.complete();}catch(IOExceptione){log.error("Failed to send error event: {}",e.getMessage());emitter.completeWithError(e);}}作用
如果模型生成过程中出错,会进入这里。
比如:
- 模型接口调用失败
- API Key 无效
- 网络异常
- 上游服务超时
处理方式
- 给前端发错误信息
- 结束连接
通俗理解
不是正常说完,而是中途出故障了,于是告诉前端“报错了”。
六、外层 catch:启动阶段异常处理
}catch(Exceptione){log.error("Failed to start streaming chat: {}",e.getMessage(),e);try{emitter.send(SseEmitter.event().data("[ERROR] "+e.getMessage()));}catch(IOExceptionioException){log.error("Failed to send error event: {}",ioException.getMessage());}emitter.complete();}作用
处理“还没进入流式回调之前”的异常。
比如:
- 获取默认 Key 失败
- 拼 prompt 出错
- 保存用户消息失败
- 创建 streamingModel 失败
逻辑
- 记录错误日志
- 尝试通知前端
- 结束 SSE 连接
七、超时和完成回调
1)超时处理
emitter.onTimeout(()->{log.warn("SSE emitter timeout");emitter.complete();});作用
如果 SSE 超时了,就记日志并结束连接。
因为你前面设置的是:
newSseEmitter(300000L)所以 5 分钟没完成,就会触发超时处理。
通俗理解
为了防止连接一直挂着不结束。
2)完成回调
emitter.onCompletion(()->{log.debug("SSE emitter completed");});作用
当 SSE 连接最终结束时,打一个调试日志。
无论是:
- 正常结束
- 异常结束
- 超时结束
最后都可能触发 completion。
通俗理解
相当于留个“收尾记录”。
八、最后返回 emitter
returnemitter;作用
把这个 SSE 通道返回给前端。
返回后,前端和后端之间就建立了流式连接,后面后端就可以持续send(...)数据过去。
九、这段代码的核心价值
这段代码最核心的功能是:
1. 支持 AI 流式输出
不是等模型全答完,而是逐步返回。
2. 支持会话记录保存
用户消息和 AI 回复都会入库。
3. 支持多会话
通过conversationId区分聊天上下文。
4. 支持异常处理
没配置 Key、模型报错、发送失败,都有处理。
5. 支持超时控制
避免 SSE 长时间挂死。
十、把它翻译成大白话
如果把整段代码翻成生活语言,就是:
用户发来一个问题 → 系统先确认这个用户是谁 → 看他有没有配置大模型账号 → 把用户的问题记下来 → 去问大模型 → 模型每说一句,就立刻转发给前端 → 前端就能实时显示 → 模型说完后,把完整回答存起来 → 告诉前端“结束了” → 关闭连接十一、这段代码里最关键的几个点
你要抓住这几个核心点:
SseEmitter
负责“流式推送”
streamingModel.chat(...)
负责“调用支持流式输出的大模型”
onPartialResponse
负责“每来一点内容就发给前端”
fullResponse.append(token)
负责“把零散 token 拼成完整答案”
saveMessage(...)
负责“存聊天记录”
[DONE]
负责“告诉前端本次流结束”
十二、这段代码的一个小风险点
这段代码能跑,但从工程角度有一个值得注意的点:
streamingModel.chat(fullPrompt,newStreamingChatResponseHandler(){如果这个chat(...)底层不是异步的,而是阻塞当前请求线程,那么高并发下可能占住 Web 线程较久。
也就是说,要看streamingModel.chat的底层实现是不是异步回调模型。
如果它本身是异步的,这样写没问题。
如果它不是,生产环境一般会再做线程池隔离。
十三、一句话总结
这段代码实现了一个“带会话保存、异常处理、超时控制的 AI 流式聊天接口”,核心机制是用SseEmitter把大模型生成的内容实时推送给前端。
如果你愿意,我下一条可以继续帮你做两件事里的一个:
要么给你画这段代码的执行流程图,要么逐行解释每一行代码。