八、级联
8.1、Mediasoup 级联架构
级联是媒体流的负载分担
- WorkerA 处理 Producer A/B/C
- WorkerB 处理 Consumer D/E/F
- WorkerC 处理 Consumer G/H/I
- 每个 Worker 只处理自己负责的媒体流,而非所有媒体流
8.2、级联如何真正实现负载均衡
8.2.1.负载均衡的本质不是"转发",而是"分担"
| 传统误解 | 真实情况 |
|---|---|
| “级联是将流量从 WorkerA 传到 WorkerB” | “级联是将不同的媒体流分配到不同的 Worker” |
| “WorkerB 负责所有流” | “WorkerB 只负责它自己的 Consumer” |
示例:1000 人视频会议
- 单 Worker:需处理 1000 个 Producer + 1000*999 个 Consumer(总流数 ~1M)
- 2 Worker 级联:
- WorkerA:处理 500 个 Producer(A1-A500)
- WorkerB:处理 500 个 Producer(B1-B500)
- 每个 Worker 仅需处理 500 个 Producer 和 500*999 个 Consumer(总流数 ~500K)
8.2.2.级联工作流程
说明:
Producer 分配:ClientA 在 WorkerA 上 创建 Producer,ClientB在 WorkerB 创建 Producer
Consumer 分配:ClientC 订阅 WorkerA 的 Producer,ClientD订阅 WorkerB的 Producer, ClientE在 WorkerA 上但他想 订阅 WorkerB 的 Producer
负载分担:每个 Worker 仅处理自己的 Producer 和 Consumer,不处理其他 Worker 的流
级联触发:
1)ClientE在WorkerA上,想要订阅 Producer B1
2)Producer B1是在WorkerB上创建的(
workerB.createProducer())3)WorkerA通过 PipeTransport 向WorkerB请求 Producer B1 的流
4)WorkerB通过 PipeTransport 将 Producer B1 的流转发给WorkerA
5)WorkerA将流转发给ClientE
8.3、级联配置详解
8.3.1. **Worker 部署规划
8.3.2.配置步骤(Node.js 示例)
(1) WorkerA 配置(处理 0-499 流)
// workerA.js - ClientE 订阅 Producer B1constworkerA=awaitmediasoup.createWorker({...});constrouterA=awaitworkerA.createRouter({...});// 创建 PipeTransport 用于接收 WorkerB 的流constpipeTransportA=awaitworkerA.createPipeTransport({listenIp:{ip:'192.168.1.100',announcedIp:'192.168.1.100'},port:5000});// 创建 Producer (0-499 流)constproducerB=awaitrouterB.produce({track:videoTrack,// 仅处理 0-499 流rtpCapabilities:{...}});(2)WorkerB 配置(处理 500-999 流)
// workerB.jsconstworkerB=awaitmediasoup.createWorker({...});constrouterB=awaitworkerB.createRouter({...});// 创建 PipeTransport 用于接收 WorkerA 的流constpipeTransportB=awaitworkerB.createPipeTransport({listenIp:{ip:'192.168.1.101',announcedIp:'192.168.1.101'},port:5000});// 创建 Producer (500-999 流)constproducerB=awaitrouterB.produce({track:videoTrack,// 仅处理 500-999 流rtpCapabilities:{...}});(3) 负载均衡路由配置
// 路由逻辑:当 Client 想订阅 Producer 时,根据 ID 分配 WorkerfunctiongetWorkerForProducer(producerId){constid=parseInt(producerId.split('-')[1]);if(id<500)returnworkerA;elsereturnworkerB;}// 客户端订阅时,如:ClientE 订阅 Producer B1 (ID 为 "producer-501")constconsumerE=awaitrouterA.consume({producerId:"producer-501",// 这是 WorkerB 的 Producertransport:pipeTransportA,// 本 Worker 的 PipeTransportrtpParameters:{type:'pipe'}});配置关键点:
- Worker 分配:通过
producerId的数值范围决定分配到哪个 Worker- PipeTransport 用途:WorkerA 通过 PipeTransport 向 WorkerB 提供 Producer 流(反之亦然)
- 无主从关系:每个 Worker 是平等的,通过路由策略分配负载
8.4、级联与负载均衡的正确关系
1. Mediasoup 负载均衡机制
| 机制 | 传统负载均衡 (如 Nginx) | Mediasoup 级联负载均衡 |
|---|---|---|
| 工作层级 | 七层 (HTTP) / 四层 (TCP) | 应用层(基于 Producer ID 分配) |
| 负载分配依据 | IP/端口 / URL | Producer ID 范围(应用层逻辑) |
| 实现方式 | 外部负载均衡器 | 内置路由策略(通过 router.pipeToRouter()) |
| 负载均衡效果 | 流量均匀分配到服务器 | 媒体流按 Producer 分配到 Worker |
2. 为什么级联是真正的负载均衡?
不是数据转发,而是媒体流的路由分担:
- WorkerA仅处理 Producer 0-499
- WorkerB仅处理 Producer 500-999
- 当 Client 想订阅 Producer 600 时:
- Client 通过 WorkerB 的 Router 订阅
- WorkerB 通过 PipeTransport 向 WorkerA 请求 Producer 600
- WorkerA 通过 PipeTransport 将 Producer 600 流转发到 WorkerB
- 关键点:WorkerB 仅处理自己的 Consumer,不处理 Producer 600 的生成,级联仅在跨 Worker 时触发,仅将需要的这路流转发过来,不增加额外负载
8.5、核心代码走读
1. Producer 分配逻辑(Router.cpp)
文件:worker/src/RTC/Router.cpp
// 根据 Producer ID 分配 WorkerWorker*Router::GetWorkerForProducer(conststd::string&producerId){intid=std::stoi(producerId.substr(producerId.find('-')+1));if(id<500){returnworkerA;// 分配到 WorkerA}else{returnworkerB;// 分配到 WorkerB}}2. 级联数据转发(PipeTransport.cpp)
文件:worker/src/RTC/PipeTransport.cpp
voidPipeTransport::SendRtpPacket(RTC::RtpPacket*packet){// 1. 检查是否需要级联转发(不是自己 Worker 的 Producer)if(packet->GetProducer()->GetRouter()!=this->router){// 2. 通过 UDP 发送原始 RTP 包到目标 Workerthis->udpSocket->Send(packet->GetData(),packet->GetSize());}else{// 3. 本地处理(不需要级联)this->router->OnRtpPacket(packet);}}3. Consumer 创建(Router.cpp)
文件:worker/src/RTC/Router.cpp
RTC::Consumer*Router::Consume(conststd::string&consumerId,conststd::string&producerId,constRTC::RtpParameters::Type&type){// 1. 获取 Producer 所在 WorkerWorker*producerWorker=producer->GetRouter()->GetWorker();// 2. 如果 Producer 在其他 Workerif(producerWorker!=this->worker){// 3. 创建 PipeConsumer 用于级联returnnewRTC::PipeConsumer(this,consumerId,producer->GetKind());}else{// 4. 本地处理returnnewRTC::Consumer(this,consumerId,producer->GetKind());}}