第34章:视频任务管理
34.1 概述
视频任务管理系统是剪映小助手的核心组件,负责管理视频生成任务的提交、执行、状态跟踪和结果获取。该系统采用异步任务队列架构,支持任务的并发处理、状态监控和错误处理,确保视频生成过程的可靠性和高效性。
34.2 核心组件设计
34.2.1 任务状态枚举
系统定义了完整的任务状态生命周期:
classTaskStatus(Enum):PENDING="pending"# 等待处理PROCESSING="processing"# 正在处理COMPLETED="completed"# 处理完成FAILED="failed"# 处理失败状态转换流程:
- PENDING → PROCESSING:任务开始执行
- PROCESSING → COMPLETED:任务成功完成
- PROCESSING → FAILED:任务执行失败
34.2.2 视频生成任务数据类
VideoGenTask类封装了视频生成任务的所有信息:
@dataclassclassVideoGenTask:task_id:str# 任务唯一标识draft_url:str# 草稿文件URLstatus:TaskStatus# 任务状态result:Optional[dict]=None# 任务结果error:Optional[str]=None# 错误信息created_at:datetime=field(default_factory=datetime.now)updated_at:datetime=field(default_factory=datetime.now)34.2.3 任务管理器单例类
VideoGenTaskManager采用单例模式,确保系统中只有一个任务管理器实例:
classVideoGenTaskManager:_instance=None_lock=threading.Lock()def__new__(cls):ifcls._instanceisNone:withcls._lock:ifcls._instanceisNone:cls._instance=super().__new__(cls)returncls._instance34.3 任务生命周期管理
34.3.1 任务提交
任务提交接口负责接收新的视频生成请求:
defsubmit_task(self,draft_url:str)->str:"""提交视频生成任务"""task_id=str(uuid.uuid4())task=VideoGenTask(task_id=task_id,draft_url=draft_url,status=TaskStatus.PENDING)withself._lock:self._tasks[task_id]=task self._task_queue.put(task_id)logger.info(f"提交视频生成任务:{task_id}, 草稿URL:{draft_url}")returntask_id34.3.2 任务状态查询
提供任务状态查询接口:
defget_task_status(self,task_id:str)->Optional[dict]:"""获取任务状态"""withself._lock:task=self._tasks.get(task_id)ifnottask:returnNonereturn{"task_id":task.task_id,"status":task.status.value,"result":task.result,"error":task.error,"created_at":task.created_at.isoformat(),"updated_at":task.updated_at.isoformat()}34.3.3 工作线程循环
工作线程负责处理任务队列中的任务:
def_worker_loop(self):"""工作线程主循环"""logger.info("视频生成工作线程启动")whileself._running:try:# 从队列获取任务ID,超时1秒task_id=self._task_queue.get(timeout=1)self._process_task(task_id)self._task_queue.task_done()exceptqueue.Empty:continueexceptExceptionase:logger.error(f"工作线程异常:{e}",exc_info=True)34.4 视频生成核心逻辑
34.4.1 任务处理流程
任务处理是系统的核心功能:
def_process_task(self,task_id:str):"""处理单个任务"""withself._lock:task=self._tasks.get(task_id)ifnottaskortask.status!=TaskStatus.PENDING:return# 更新状态为处理中task.status=TaskStatus.PROCESSING task.updated_at=datetime.now()logger.info(f"开始处理任务:{task_id}")try:# 执行视频生成result=self._generate_video(task.draft_url)# 更新任务状态withself._lock:task.status=TaskStatus.COMPLETED task.result=result task.updated_at=datetime.now()logger.info(f"任务处理完成:{task_id}")exceptExceptionase:logger.error(f"任务处理失败:{task_id}, 错误:{e}",exc_info=True)# 更新任务状态为失败withself._lock:task.status=TaskStatus.FAILED task.error=str(e)task.updated_at=datetime.now()34.4.2 视频生成实现
视频生成逻辑调用剪映的导出功能:
def_generate_video(self,draft_url:str)->dict:"""生成视频"""logger.info(f"开始生成视频,草稿URL:{draft_url}")# 提取草稿IDdraft_id=self._extract_draft_id(draft_url)ifnotdraft_id:raiseValueError(f"无法从URL提取草稿ID:{draft_url}")logger.info(f"提取到草稿ID:{draft_id}")# 调用剪映导出功能# 这里模拟实际的视频生成过程export_result=self._export_video(draft_id)ifnotexport_result.get("success"):raiseRuntimeError(f"视频导出失败:{export_result.get('error','未知错误')}")return{"video_url":export_result.get("video_url"),"duration":export_result.get("duration"),"file_size":export_result.get("file_size"),"draft_id":draft_id}34.4.3 草稿ID提取
从URL中提取草稿ID:
def_extract_draft_id(self,draft_url:str)->Optional[str]:"""从草稿URL提取草稿ID"""try:# 解析URLparsed=urlparse(draft_url)# 尝试从查询参数获取query_params=parse_qs(parsed.query)if'draft_id'inquery_params:returnquery_params['draft_id'][0]# 尝试从路径获取path_parts=parsed.path.strip('/').split('/')iflen(path_parts)>=2andpath_parts[-2]=='draft':returnpath_parts[-1]# 尝试从文件名获取ifparsed.path.endswith('.json'):filename=os.path.basename(parsed.path)returnfilename[:-5]# 移除.json后缀returnNoneexceptExceptionase:logger.error(f"提取草稿ID失败:{e}")returnNone34.5 错误处理与重试机制
34.5.1 异常分类处理
def_process_task_with_retry(self,task_id:str,max_retries:int=3):"""带重试的任务处理"""forattemptinrange(max_retries):try:self._process_task(task_id)return# 成功则返回exceptNetworkErrorase:logger.warning(f"网络错误,尝试重试{attempt+1}/{max_retries}:{e}")ifattempt<max_retries-1:time.sleep(2**attempt)# 指数退避else:raiseexceptExceptionase:logger.error(f"任务处理失败,不再重试:{e}")raise34.5.2 任务超时处理
def_process_task_with_timeout(self,task_id:str,timeout:int=300):"""带超时的任务处理"""deftimeout_handler(signum,frame):raiseTimeoutError("任务处理超时")# 设置超时信号signal.signal(signal.SIGALRM,timeout_handler)signal.alarm(timeout)try:self._process_task(task_id)finally:signal.alarm(0)# 取消超时34.6 并发控制与资源管理
34.6.1 并发任务限制
classVideoGenTaskManager:def__init__(self,max_concurrent_tasks:int=5):self.max_concurrent_tasks=max_concurrent_tasks self.active_tasks=0self.task_semaphore=threading.Semaphore(max_concurrent_tasks)def_process_task(self,task_id:str):"""处理任务(带并发控制)"""withself.task_semaphore:# 实际的业务处理逻辑self._do_process_task(task_id)34.6.2 资源清理
defcleanup_completed_tasks(self,max_age_hours:int=24):"""清理已完成的任务"""current_time=datetime.now()cutoff_time=current_time-timedelta(hours=max_age_hours)withself._lock:completed_tasks=[task_idfortask_id,taskinself._tasks.items()iftask.statusin[TaskStatus.COMPLETED,TaskStatus.FAILED]andtask.updated_at<cutoff_time]fortask_idincompleted_tasks:delself._tasks[task_id]logger.info(f"清理完成任务:{task_id}")34.7 监控与统计
34.7.1 任务统计
defget_task_statistics(self)->dict:"""获取任务统计信息"""withself._lock:total_tasks=len(self._tasks)pending_tasks=sum(1fortaskinself._tasks.values()iftask.status==TaskStatus.PENDING)processing_tasks=sum(1fortaskinself._tasks.values()iftask.status==TaskStatus.PROCESSING)completed_tasks=sum(1fortaskinself._tasks.values()iftask.status==TaskStatus.COMPLETED)failed_tasks=sum(1fortaskinself._tasks.values()iftask.status==TaskStatus.FAILED)return{"total_tasks":total_tasks,"pending_tasks":pending_tasks,"processing_tasks":processing_tasks,"completed_tasks":completed_tasks,"failed_tasks":failed_tasks,"queue_size":self._task_queue.qsize()}34.7.2 性能指标
defget_performance_metrics(self)->dict:"""获取性能指标"""stats=self.get_task_statistics()# 计算成功率total_completed=stats["completed_tasks"]+stats["failed_tasks"]success_rate=(stats["completed_tasks"]/total_completed*100iftotal_completed>0else0)# 计算平均处理时间completed_tasks=[taskfortaskinself._tasks.values()iftask.status==TaskStatus.COMPLETED]avg_processing_time=0ifcompleted_tasks:processing_times=[(task.updated_at-task.created_at).total_seconds()fortaskincompleted_tasks]avg_processing_time=sum(processing_times)/len(processing_times)return{"success_rate":round(success_rate,2),"average_processing_time_seconds":round(avg_processing_time,2),"active_worker_threads":threading.active_count()-1# 减去主线程}34.8 扩展性设计
34.8.1 插件化架构
支持自定义任务处理器:
classTaskProcessor(ABC):"""任务处理器抽象类"""@abstractmethoddefcan_process(self,task:VideoGenTask)->bool:"""判断是否可处理该任务"""pass@abstractmethoddefprocess(self,task:VideoGenTask)->dict:"""处理任务"""passclassDefaultVideoProcessor(TaskProcessor):"""默认视频处理器"""defcan_process(self,task:VideoGenTask)->bool:returntask.draft_url.endswith('.json')defprocess(self,task:VideoGenTask)->dict:returnself._generate_video(task.draft_url)34.8.2 分布式任务处理
支持多节点分布式处理:
classDistributedTaskManager:"""分布式任务管理器"""def__init__(self,redis_client,node_id:str):self.redis=redis_client self.node_id=node_id self.task_queue_key="video_gen_tasks"defsubmit_task(self,draft_url:str)->str:"""提交任务到分布式队列"""task_id=str(uuid.uuid4())task_data={"task_id":task_id,"draft_url":draft_url,"node_id":None,# 未分配节点"status":TaskStatus.PENDING.value}# 将任务添加到Redis队列self.redis.lpush(self.task_queue_key,json.dumps(task_data))returntask_id附录
代码仓库地址:
- GitHub:
https://github.com/Hommy-master/capcut-mate - Gitee:
https://gitee.com/taohongmin-gitee/capcut-mate
接口文档地址:
- API文档地址:
https://docs.jcaigc.cn