news 2026/7/2 1:46:37

SSE客户端C++实现(使用libcurl)

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
SSE客户端C++实现(使用libcurl)

SSE协议 的全称是 Server-Sent Events(服务器发送事件),本质是基于 HTTP 协议的 “单向实时推送技术”——只有服务器能主动给客户端发消息,除了发送订阅请求外,客户端只能接收数据。SSE消息是纯文本格式,SSE标准支持自动重连。

2、SSE与WebSocket

底层协议:SSE底层使用http协议(http/1.1 chunked编码传输或http/2DATA帧序列),其消息格式为纯文本。WebSocket握手(建立连接)通过http实现,连接建立后使用WebSocket协议(底层使用tcp协议),WebSocket协议帧头比http/1.1要小,而且SSE消息使用文本格式,WebSocket则可以为文本 / 二进制,所以WebSocket通信效率要高。

通信特点:WebSocket支持双向通信,SSE是单向通信。SSE标准支持自动重连,WebSocket则需要自己实现自动重连功能。

浏览器支持:SSE、WebSocket都属于JS原生标准,可以直接在浏览器里使用。

开发运维:SSE支持自动重连,WebSocket则需要自己实现心跳、连接状态判断等,SSE开发要比WebSocket简单。SSE使用http协议,所以可以复用http生态(端口、配置等),而WebSocket需要单独配置ws/wss协议、Nginx 转发。

适用场景:SSE——系统通知、实时文本数据推送。WebSocket——实时聊天、实时请求-应答(保持连接的请求-应答)。

3、SSE服务实现

实现SSE服务的话,推荐使用Spring MVC的SseEmitter(适用于传统的 Spring Boot Web 项目)或Spring WebFlux的Flux<ServerSentEvent>(响应式非阻塞编程,适合高并发的场景)。

  • 安全考虑

    • 使用HTTPS防止中间人攻击

    • 验证连接权限(如JWT Token)

    • 防止CSRF攻击

  • 监控与日志

    • 记录连接数、消息发送成功率

4、nginx与SSE

Nginx可能会开启缓冲(Buffering),比如试图收集足够多的数据(凑满4KB 或 8KB)后再进行转发,这就破坏了 SSE 的实时性,使用 Nginx 反向代理 SSE 服务,必须显式关闭缓冲:

使用 Nginx 反向代理SSE服务的话,调整read_timeout和proxy_read_timeout以禁用读取超时,read_timeout、send_timeout是nginx读取、写入客户的超时,proxy_read_timeout、proxy_send_timeout是nginx读取、写入后端服务的超时,proxy_connect_timeout是nginx连接后端服务的超时。

5、SSE数据包格式

SSE消息由一个或多个message组成,每个message由"\n\n"分隔。一个message可以由一个或多个filed组成,每个filed由\n分隔,filed有data、id、event、retry四种类型。

data表示消息数据,如"data:value\n"。

id表示数据包编号,当连接断开重连的时候,客户端应该发送一个 HTTP 头,里面包含一个特殊的Last-Event-ID头信息,值为收到的最后一条数据的id。

event表示自定义的事件类型。

retry表示浏览器重新发起连接的间隔时间,当时间间隔到期,客户端应该重新发起连接。

还可以有仅以冒号开头的行,表示注释。通常服务器每隔一段时间就会向浏览器发送一个注释,保持连接不中断。

包含一条message的消息:

data: {"username": "bobby", "time": "02:33:48"}

包含两条message的消息:

data:this is message A data:this is data:message B

包含三条message的消息:

:explan text data:this is message A data:this is data:message B

6、SSE客户端实现

SSE向服务端发送订阅推送请求的话,使用GET方式,如果需要发送参数的话,通过URL 查询参数或设置http请求头实现。

#include <thread> #include <functional> size_t dataCallback(void* ptr, size_t size, size_t nmemb, void* param) { if (ptr == NULL || size == 0 || param == NULL) return 0; int s = size * nmemb; std::string strMsg((char*)ptr, s); CSSEClient* client = static_cast<CSSEClient*>(param); if (client && client->onMessage) client->onMessage(strMsg); return s; } int progressCallback(void* param, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow) { CSSEClient* client = static_cast<CSSEClient*>(param); return client->isExit() ? 1 : 0; // 返回非0值会中断curl_easy_perform() } class CSSEClient { public: CSSEClient(const std::string& strURL) { curl_global_init(CURL_GLOBAL_DEFAULT); _strURL = strURL; _thread = std::thread(&CSSEClient::run, this); } virtual ~CSSEClient() { _bExit = true; _thread.join(); curl_global_cleanup(); } bool isExit() { return _bExit; } std::function<void(const std::string&)> onMessage; private: void run() { while (!_bExit) { subscribeSSE(); if (_bExit) break; std::this_thread::sleep_for(std::chrono::seconds(1)); } } void subscribeSSE() { CURL* curl = curl_easy_init(); if (!curl) return; struct curl_slist* headers = NULL; headers = curl_slist_append(headers, "Accept: text/event-stream"); headers = curl_slist_append(headers, "Cache-Control: no-cache"); curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); curl_easy_setopt(curl, CURLOPT_URL, _strURL.c_str()); curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, dataCallback); curl_easy_setopt(curl, CURLOPT_WRITEDATA, this); curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, 10L); curl_easy_setopt(curl, CURLOPT_TIMEOUT, 0L); curl_easy_setopt(curl, CURLOPT_TCP_KEEPALIVE, 1L); //开启TCP心跳 curl_easy_setopt(curl, CURLOPT_TCP_KEEPIDLE, 60L); //空闲60秒后探测 curl_easy_setopt(curl, CURLOPT_TCP_KEEPINTVL, 10L); //探测间隔10秒 curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 0L); // 开启传输进度(已下载/上传数据量、总数据量等)回调 curl_easy_setopt(curl, CURLOPT_XFERINFOFUNCTION, progressCallback); //设置传输进度回调方法,即使没有数据传输该方法也会被调用(通常每秒1次),所以可以在该方法中判断是否需要退出 curl_easy_setopt(curl, CURLOPT_XFERINFODATA, this); //设置传输回调方法的第一个参数 CURLcode res = curl_easy_perform(curl); if (res == CURLE_ABORTED_BY_CALLBACK) { //SSE connection aborted by user; } else { //error }
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/7/2 1:46:10

Qt问题记录002:QMap的erase陷阱,正常运行与调试模式结果不同

Qt的QMap循环删除元素&#xff08;erase&#xff09;&#xff0c;在运行时正常&#xff0c;在调试模式下报错&#xff0c;提供解决代码。关键词&#xff1a;QMap、erase、迭代器、遍历与删除问题描述&#xff1a;在使用 Qt 的QMap 容器时&#xff0c;尝试在遍历过程中删除元素&…

作者头像 李华
网站建设 2026/7/2 1:42:03

大模型评测与AI产品质量保障:第7篇 机器学习的三种学习范式

IT策士 10余年一线大厂经验&#xff0c;专注大模型测试、AI产品质量保障与职场进阶。我会在各个平台持续发布最新文章&#xff0c;助你少走弯路。 上一篇文章我们拆解了AI的六块技术拼图。但无论哪块拼图&#xff0c;背后驱动模型学习的都是三种核心范式——有监督学习、无监督…

作者头像 李华
网站建设 2026/7/2 1:39:44

turn_on_wheeltec_robot.launch

<launch><!-- Arguments参数 --><arg name"lidar_is_cx" default"false"/><arg name"car_mode" default"mini_4wd" doc"opt: mini_akm,senior_akm,top_akm_bs,top_akm_dl,mini_mec,senior_mec_bs,senior_m…

作者头像 李华
网站建设 2026/7/2 1:37:44

Skill安全系列之Skill基础

前言 在总结Skill中的相关风险前&#xff0c;我们先通过一篇的内容来看下什么是Skill&#xff0c;对其有一个初步的认识。 什么是Skill Skill翻译为技能的意思&#xff0c;在AI邻域中&#xff0c;Skill就是一个技能&#xff0c;比如给Agent一个A技能&#xff0c;那我们就需要拥…

作者头像 李华
网站建设 2026/7/2 1:34:17

跨架构容器化恶意软件动态分析沙箱设计与实现

1. 项目概述&#xff1a;跨架构容器化恶意软件动态分析沙箱在网络安全领域&#xff0c;动态恶意软件分析一直是识别和对抗新型威胁的重要手段。传统分析方案通常依赖于笨重的虚拟机管理程序或专用物理设备&#xff0c;这不仅限制了分析的灵活性和自动化程度&#xff0c;也难以适…

作者头像 李华