背景痛点:同步调用把 CPU 干成了“摸鱼王”
去年做有声书项目时,我们先用最省事的同步方式调 CosyVoice:for loop一条条推文本,平均 320 字/条,返回 16 kHz WAV。压测结果惨不忍睹:
- 平均延迟 1.8 s(P99 2.4 s)
- 单核 QPS ≈ 0.55
- CPU 利用率 8%,网卡 3%,机器几乎在“睡觉”
瓶颈不在 CosyVoice 本身,而是阻塞 IO 把线程钉死——线程等回包时不能干别的,并发一高就排队,线程上下文切换还把耗时再抬高 15%。一句话:同步模型下,资源闲置比浪费更贵。
技术对比:三种调用模式的硬数据
我们在 4C8G 容器里跑同样 5 k 条文本,统计总耗时与吞吐量(音频时长 / 总耗时)。结果如下:
| 模式 | 并发数 | 总耗时(s) | 吞吐(×RTF) | 内存峰值 | 选择建议 |
|---|---|---|---|---|---|
| 同步 | 1×64 线程 | 9 018 | 0.52 | 0.9 GB | 快速原型,别上生产 |
| 异步单条 | 64 协程 | 1 204 | 3.9 | 1.1 GB | 延迟敏感、小流量 |
| 异步批处理 | 32 协程×batch=8 | 312 | 15.1 | 2.3 GB | 高吞吐场景首选 |
RTF(Real-Time Factor)>1 代表合成比实时快,15 倍 RTF 意味着 1 h 音频 4 min 搞定。
结论:批处理+异步能把吞吐提升 300% 以上,代价是内存翻倍;只要内存够,就是真香。
核心实现:Python 异步批处理最佳实践
下面给出可直接落地的asyncio + aiohttp模板,已在线上稳定跑 3 个月,日合成 20 万条。
1. 环境准备
pip install aiohttp==3.9.1 aiofiles==23.2.1 tenacity==8.2.32. 代码骨架(PEP8 + 类型注解)
import asyncio, aiohttp, json, time from typing import List, Tuple from tenacity import retry, wait_exponential, stop_after_attempt API_BASE = "https://api.cosyvoice.example.com/v2" BATCH_SIZE = 8 # 实测 8 条时 RTF 最高 MAX_CONCURRENCY = 32 # 受容器端口数/FD 限制 TOKEN_QPS = 50 # 官方限流 60,留 10 做缓冲 class CosyVoiceClient: def __init__(self): # 1. 连接池:保持长连接,避免 SSL 握手 connector = aiohttp.TCPConnector( limit=MAX_CONCURRENCY, limit_per_host=MAX_CONCURRENCY, keepalive_timeout=30, enable_cleanup_closed=True, ) timeout = aiohttp.ClientTimeout(total=25) self.session = aiohttp.ClientSession( connector=connector, timeout=timeout ) async def close(self): await self.session.close() @retry(wait=wait_exponential(multiplier=1, min=2, max=20), stop=stop_after_attempt(5)) async def _post_batch(self, texts: List[str]) -> List[bytes]: """单次批处理请求,返回 wav 字节列表""" async with self.session.post( f"{API_BASE}/synthesize/batch", json={"texts": texts, "voice_id": "zh_female_001"}, headers={"Authorization": "Bearer YOUR_KEY"}, ) as resp: resp.raise_for_status() data = await resp.read() # CosyVoice 返回 zip 包,内含 batch 个 wav return self._unpack_zip(data) async def _worker(self, queue: asyncio.Queue, results: List[bytes]): while True: batch = await queue.get() if batch is None: # 毒丸 break wavs = await self._post_batch(batch) results.extend(wavs) queue.task_done() async def run(self, texts: List[str]) -> List[bytes]: queue: asyncio.Queue = asyncio.Queue(maxsize=MAX_CONCURRENCY) results: List[bytes] = [] # 2. 启动 worker 协程 workers = [ asyncio.create_task(self._worker(queue, results)) for _ in range(MAX_CONCURRENCY) ] # 3. 按 BATCH_SIZE 切片入队 for i in range(0, len(texts), BATCH_SIZE): await queue.put(texts[i:i+BATCH_SIZE]) # 4. 毒丸,通知结束 for _ in workers: await queue.put(None) await queue.join() await asyncio.gather(*workers) return results3. 调用示例
async def main(): client = CosyVoiceClient() try: texts = open("chapters.txt").read().splitlines() t0 = time.time() wavs = await client.run(texts) print(f"合成完成,共 {len(wavs)} 条,耗时 {time.time()-t0:.2f}s") finally: await client.close() if __name__ == "__main__": asyncio.run(main())4. 关键注释
- 连接池:
limit与limit_per_host保持一致,防止单域名连接数被内核打满。 - 错误重试:指数退避
wait_exponential,第一次 2 s,第二次 4 s… 最大 20 s,兼顾快速恢复与避免雪崩。 - 负载均衡:官方提供 3 个 endpoint,可在
_post_batch里随机挑选,或按响应时间动态选择(代码略)。
性能优化:batch size 与限流
1. batch size 曲线
保持并发 32 不变,仅改 batch size,得到 RTF-内存曲线:
| batch | 平均 RTF | 内存峰值 | 备注 |
|---|---|---|---|
| 1 | 3.9 | 1.1 GB | 延迟最低 |
| 4 | 9.7 | 1.5 GB | 折中 |
| 8 | 15.1 | 2.3 GB | 最佳吞吐 |
| 16 | 14.9 | 3.6 GB | 内存陡增,RTF 反而下降 |
原因:batch 过大时,服务端 GPU 并行度受限,首包等待时间变长,协程也会把大量音频缓存在内存,导致“协程泄漏式内存膨胀”。
2. 令牌桶限流
官方文档给的是 60 QPS,但突发会 429。本地用令牌桶做提前限流,比被重试更省时间:
import asyncio, time from typing import Optional class TokenBucket: def __init__(self, rate: int, capacity: int): self._rate = rate self._cap = capacity self._tokens = capacity self._last = time.monotonic() self._lock = asyncio.Lock() async def acquire(self, need: int = 1) -> None: async with self._lock: while self._tokens < need: await asyncio.sleep(0.05) now = time.monotonic() added = int((now - self._last) * self._rate) self._tokens = min(self._cap, self._tokens + added) self._last = now self._tokens -= need在_post_batch前调用await bucket.acquire(),可把 429 率压到 <0.3%。
避坑指南:生产环境 3 大血泪教训
连接泄漏
现象:跑 6 h 后lsof看到 20 k 条CLOSE_WAIT。
根因:异常时未resp.release()。
解决:用async with包裹所有请求;在ClientSession加enable_cleanup_closed=True。SSL 握手超时
现象:高并发下偶发aiohttp.ClientConnectorSSLError。
根因:默认timeout.total包含握手,网络抖动即超时。
解决:把ClientTimeout(total=25, connect=5)拆开,给连接阶段更小值,握手失败次数下降 90%。协程堆积导致 OOM
现象:batch size 16 + 高网络抖动,内存飙到 6 GB 被 OOMKill。
根因:结果队列无背压,协程不停接收新任务。
解决:asyncio.Queue(maxsize=MAX_CONCURRENCY)做天然背压;queue 满时await queue.put()会阻塞上游,防止无限堆积。
延伸思考:用 Kafka 做异步任务队列
当合成量涨到每天 50 万条,单机跑脚本已难维护,可拆成“生产 + 消费 + 结果存储”三段:
- 文本生产者把章节拆段 → 推 Kafka
voice-raw主题,key=book_id,保证顺序消费。 - 消费者用本文的
CosyVoiceClient批量拉取 → 合成后上传 OSS → 把音频 URL 回写voice-done。 - 背压策略:Kafka 的
max.poll.records=8,与 batch size 对齐;消费端限流仍用令牌桶,防止 429 导致频繁重平衡。
背压控制还可结合协程池 + 信号量,当下游 API 抖动时自动降低并发,把压力留在应用层而不是消息层,避免 Kafka 反复重平衡造成消费停滞。
把同步改成异步批处理后,我们线上 4 台 4C8G 容器就把日 20 万条吃得死死的,CPU 利用率拉到 55%,RTF 稳定在 15 左右,比之前省了一半机器。代码上线当天,运维同事还怀疑是不是少发了几台节点——省机器就是省钱,真香定律诚不我欺。下一步我准备把 Kafka 方案搬上舞台,让合成任务像水流一样顺畅,也欢迎你来试试,然后把踩到的新坑分享给我。