news 2026/2/15 4:50:22

使用 Go 实现 SSE 流式推送 + 打字机效果(模拟 Coze Chat)

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
使用 Go 实现 SSE 流式推送 + 打字机效果(模拟 Coze Chat)

使用 Go 实现 SSE 流式推送 + 打字机效果(模拟 Coze Chat)

在开发实时聊天、AI 助手或者协作应用时,我们经常需要SSE(Server-Sent Events)实现服务端向前端持续推送数据。本文将分享一个Go SSE 打字机式输出实现,并附上上游模拟示例、curl 测试和前端实时渲染示例。


功能特点

  1. 使用SSE推送消息流,前端无需轮询。
  2. 对消息进行逐字符打字机式输出,模拟 AI 打字效果。
  3. 支持上游 SSE 模拟,方便本地测试。
  4. 可轻松扩展为真实 AI 聊天接口的代理服务。

技术栈

  • Go 1.21+
  • CloudWeGo Hertz 作为 HTTP 框架
  • resty 用于上游 SSE 请求
  • SSE 流式推送(使用hertz-contrib/sse

完整示例代码

packagemainimport("bufio""context""encoding/json""fmt""math/rand""strings""time""github.com/cloudwego/hertz/pkg/app""github.com/cloudwego/hertz/pkg/app/server""github.com/cloudwego/hertz/pkg/common/hlog""github.com/hertz-contrib/sse""resty.dev/v3")funcmain(){h:=server.Default(server.WithHostPorts(":8380"))h.POST("/v3/chat",CozeParseThenTypeWriter)h.GET("/upstream",MockUpstreamSSE)hlog.Info("🚀 Coze SSE parse + typewriter proxy running at :8380")h.Spin()}// 核心逻辑:解析 conversation.message.delta → 打字机式输出funcCozeParseThenTypeWriter(ctx context.Context,c*app.RequestContext){// SSE Headerc.SetStatusCode(200)h:=c.Response.Header h.Set("Content-Type","text/event-stream; charset=utf-8")h.Set("Cache-Control","no-cache, no-store, must-revalidate")h.Set("Pragma","no-cache")h.Set("Connection","keep-alive")h.Set("X-Accel-Buffering","no")stream:=sse.NewStream(c)// Resty 上游请求client:=resty.New().SetTimeout(0)resp,err:=client.R().SetContext(ctx).SetDoNotParseResponse(true).SetHeader("Accept","text/event-stream").Get("http://localhost:8380/upstream")iferr!=nil||resp.RawResponse==nil||resp.RawResponse.Body==nil{hlog.Error("upstream connect failed")return}deferresp.RawResponse.Body.Close()// 打字机准备r:=rand.New(rand.NewSource(time.Now().UnixNano()))scanner:=bufio.NewScanner(resp.RawResponse.Body)varcurrentEventstring// SSE 解析循环forscanner.Scan(){select{case<-ctx.Done():hlog.Warn("client disconnected")returndefault:}line:=scanner.Text()ifline==""{currentEvent=""continue}ifstrings.HasPrefix(line,"event:"){currentEvent=strings.TrimSpace(strings.TrimPrefix(line,"event:"))continue}ifstrings.HasPrefix(line,"data:"){payload:=strings.TrimSpace(strings.TrimPrefix(line,"data:"))ifpayload=="[DONE]"{stream.Publish(&sse.Event{Data:[]byte("[DONE]")})return}ifcurrentEvent=="conversation.message.delta"{vardstruct{Contentstring`json:"content"`}iferr:=json.Unmarshal([]byte(payload),&d);err==nil{typeWriter(stream,r,d.Content)}}ifcurrentEvent=="conversation.message.completed"{stream.Publish(&sse.Event{Event:"conversation.message.completed",Data:[]byte(`{"status":"completed"}`),})}}}}// 打字机逐字符输出functypeWriter(stream*sse.Stream,r*rand.Rand,textstring){fori,ch:=rangetext{time.Sleep(getSleepDuration(r,ch))data:=map[string]any{"id":fmt.Sprintf("char_%d_%d",i,time.Now().UnixNano()%100000),"role":"assistant","type":"answer","content":string(ch),"created_at":time.Now().UnixMilli(),}b,_:=json.Marshal(data)_=stream.Publish(&sse.Event{Event:"conversation.message.delta",ID:fmt.Sprintf("char_%d",i),Data:b,})}}// 延迟策略funcgetSleepDuration(r*rand.Rand,chrune)time.Duration{switch{casech=='\n'||ch=='。'||ch=='!'||ch=='?':returntime.Duration(300+r.Intn(200))*time.Millisecondcasech=='、'||ch==' '||ch=='-'||ch==':'||ch==',':returntime.Duration(150+r.Intn(100))*time.Millisecondcasech=='#'||ch=='*'||ch=='>':returntime.Duration(200+r.Intn(150))*time.Milliseconddefault:returntime.Duration(60+r.Intn(60))*time.Millisecond}}// 上游 Mock(模拟 Coze SSE)funcMockUpstreamSSE(ctx context.Context,c*app.RequestContext){c.SetStatusCode(200)c.Header("Content-Type","text/event-stream")c.Header("Cache-Control","no-cache")c.Header("Connection","keep-alive")c.Header("X-Accel-Buffering","no")c.Flush()send:=func(event,datastring){c.Write([]byte("event: "+event+"\n"+"data: "+data+"\n\n",))c.Flush()}deltas:=[]string{"你","好",",","这","是"," Coze"," SSE"}for_,ch:=rangedeltas{send("conversation.message.delta",`{"content":"`+ch+`"}`)time.Sleep(80*time.Millisecond)}send("conversation.message.completed",`{}`)c.Write([]byte("data: [DONE]\n\n"))c.Flush()}

使用方法

  1. 启动服务
go run main.go
  1. 访问接口
  • 上游 SSE 测试(浏览器可直接访问):
    http://localhost:8380/upstream

  • 打字机代理接口(POST):
    http://localhost:8380/v3/chat


使用curl测试 SSE

# 测试上游 SSEcurl-N http://localhost:8380/upstream# 测试打字机代理 SSEcurl-N -X POST http://localhost:8380/v3/chat

参数说明:

  • -N/--no-buffer:禁用输出缓存,实时显示流式数据。
  • -X POST:因为代理接口是 POST。

运行后,你会在终端看到类似打字机逐字符输出:

event: conversation.message.delta data: {"id":"char_0_12345","role":"assistant","type":"answer","content":"你","created_at":1700000000000} event: conversation.message.delta data: {"id":"char_1_67890","role":"assistant","type":"answer","content":"好","created_at":1700000000050} ... event: conversation.message.completed data: {"status":"completed"} data: [DONE]

前端实时渲染打字机效果示例

在前端,可以使用EventSource监听 SSE 并动态显示内容:

<divid="chat"></div><script>constchatDiv=document.getElementById("chat");constevtSource=newEventSource("http://localhost:8380/v3/chat");evtSource.addEventListener("conversation.message.delta",e=>{constdata=JSON.parse(e.data);chatDiv.innerHTML+=data.content;});evtSource.addEventListener("conversation.message.completed",e=>{console.log("消息完成");});evtSource.onopen=()=>console.log("连接已打开");evtSource.onerror=()=>console.log("连接错误或关闭");</script>

效果:消息逐字符显示,模拟 AI 打字机输出。


核心解析

  1. SSE Header 设置
h.Set("Content-Type","text/event-stream; charset=utf-8")h.Set("Cache-Control","no-cache, no-store, must-revalidate")h.Set("Connection","keep-alive")h.Set("X-Accel-Buffering","no")

保证浏览器或代理实时接收流式数据。

  1. 打字机效果

根据字符类型不同设置不同延迟:

casech=='\n'||ch=='。'||ch=='!'||ch=='?':returntime.Duration(300+r.Intn(200))*time.Millisecond
  1. 上游 SSE 模拟

方便本地测试,无需真实 AI 接口即可验证前端打字机效果。


总结

通过本文示例,你可以快速实现:

  • Go SSE 服务端代理
  • AI 聊天消息打字机式输出
  • 上游 SSE 模拟
  • curl 测试和前端实时渲染
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/2/4 6:46:51

flv怎么转换成m2ts?flv格式转m2ts格式操作技巧

M2TS是一种常用于高清视频存储的专业格式&#xff0c;广泛应用于蓝光光盘和高端摄像设备。将FLV转换为M2TS格式可以有效提升视频的兼容性和画质表现&#xff0c;特别适合需要高质量视频输出的场景。以下是使用简鹿视频格式转换器完成转换的具体步骤&#xff1a; 1.首先在电脑上…

作者头像 李华
网站建设 2026/2/4 8:21:55

BG3ModManager保姆级教程:零基础避坑安装指南

BG3ModManager保姆级教程&#xff1a;零基础避坑安装指南 【免费下载链接】BG3ModManager A mod manager for Baldurs Gate 3. 项目地址: https://gitcode.com/gh_mirrors/bg/BG3ModManager 作为《博德之门3》玩家必备的模组管理神器&#xff0c;BG3ModManager能够帮助您…

作者头像 李华
网站建设 2026/2/14 17:13:45

elsa-core工作流终极指南:从零构建企业级业务流程

elsa-core工作流终极指南&#xff1a;从零构建企业级业务流程 【免费下载链接】elsa-core A .NET workflows library 项目地址: https://gitcode.com/gh_mirrors/el/elsa-core 还在为复杂的业务流程管理而头疼吗&#xff1f;&#x1f680; elsa-core作为一款强大的.NET工…

作者头像 李华
网站建设 2026/2/8 6:01:09

【医疗 Agent 数据安全实战】:5 大隐私保护技术落地详解

第一章&#xff1a;医疗 Agent 数据安全的挑战与演进随着人工智能在医疗领域的深入应用&#xff0c;医疗 Agent 作为连接患者、医生与医疗系统的智能中介&#xff0c;承担着数据采集、分析与决策支持的关键职能。然而&#xff0c;其处理的数据高度敏感&#xff0c;涵盖个人健康…

作者头像 李华
网站建设 2026/2/10 11:08:12

2007-2020年税收调查企业匹配工商注册、专利数据

之前我们分享过2007-2020年税收调查数据 本次分享的数据在税调数据所含有指标的基础上&#xff0c;又根据企业名称和统一社会信用代码匹配了工商注册数据库和专利数据库的指标&#xff0c;形成了2007-2020年分年份版本的税调匹配工商注册和专利数据。 其中工商注册数据库所含…

作者头像 李华