news 2026/4/16 19:36:53

Hyperf对接报表 当帆布报表的生成任务出现超时或失败时,如何借助 HyperF 的异步队列和任务重试机制设计一套可靠的报表任务调度系统?请说明死信队列的处理策略。

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Hyperf对接报表 当帆布报表的生成任务出现超时或失败时,如何借助 HyperF 的异步队列和任务重试机制设计一套可靠的报表任务调度系统?请说明死信队列的处理策略。
HyperF 报表任务调度 + 死信队列系统 选型: hyperf/async-queue + hyperf/retry + Redis Stream 死信队列 + 钉钉告警 --- 架构总览 提交任务 └─ AsyncQueue(default)# 主队列└─ ReportJob# 执行报表生成├─ 成功 → 标记完成 ├─ 超时 → hyperf/retry 自动重试(指数退避)└─ 失败3次 → 死信队列(failed)└─ DeadLetterConsumer ├─ 人工干预通知(钉钉)├─ 可手动重放 └─ 超期归档 --- 一、队列配置<?php // config/autoload/async_queue.phpreturn[// 主队列'default'=>['driver'=>Hyperf\AsyncQueue\Driver\RedisDriver::class,'channel'=>'report:queue','timeout'=>300, // 单任务最长执行5分钟'retry_seconds'=>[10,30,60], // 三次重试间隔(指数退避)'handle_timeout'=>360,'processes'=>4,'concurrent'=>['limit'=>8],], // 死信队列'failed'=>['driver'=>Hyperf\AsyncQueue\Driver\RedisDriver::class,'channel'=>'report:failed','timeout'=>60,'retry_seconds'=>[], // 死信不自动重试'handle_timeout'=>120,'processes'=>1,'concurrent'=>['limit'=>2],],];--- 二、报表 Job — 超时 + 重试<?php // app/Job/ReportJob.php namespace App\Job;use App\Export\ReportExporter;use App\Service\ReportTaskService;use Hyperf\AsyncQueue\Job;use Hyperf\Retry\Annotation\Retry;class ReportJob extends Job{public int$maxAttempts=3;// 最多执行3次 publicfunction__construct(privatereadonlystring$taskId, privatereadonlyarray$params, public int$attempt=0,){}// hyperf/retry 注解:指数退避,仅重试可恢复异常#[Retry(maxAttempts:3, base:10, strategy:\Hyperf\Retry\Policy\ExponentialBackoffRetryPolicy::class, retryThrowables:[\RuntimeException::class,\PDOException::class], ignoreThrowables:[\InvalidArgumentException::class],)]publicfunctionhandle(): void{$svc=make(ReportTaskService::class);$svc->markRunning($this->taskId, ++$this->attempt);try{// 协程超时保护\Hyperf\Coroutine\Coroutine::create(function(){\Swoole\Coroutine::defer(fn()=>null);});$path=(new ReportExporter($this->params))->run();$svc->markDone($this->taskId,$path);}catch(\Throwable$e){$svc->markFailed($this->taskId,$e->getMessage(),$this->attempt);throw$e;// 抛出让队列驱动计入失败次数}}}--- 三、任务状态服务<?php // app/Service/ReportTaskService.php namespace App\Service;use App\Job\DeadLetterJob;use Hyperf\AsyncQueue\Driver\DriverFactory;use Hyperf\DbConnection\Db;class ReportTaskService{publicfunction__construct(privatereadonlyDriverFactory$queue){}publicfunctionsubmit(array$params): string{$id=uniqid('rpt_',true);Db::table('report_tasks')->insert(['id'=>$id,'status'=>'pending','params'=>json_encode($params),'max_retry'=>3,'attempt'=>0,'created_at'=>time(),]);$this->queue->get('default')->push(new\App\Job\ReportJob($id,$params));return$id;}publicfunctionmarkRunning(string$id, int$attempt): void{Db::table('report_tasks')->where('id',$id)->update(['status'=>'running','attempt'=>$attempt,'started_at'=>time(),]);}publicfunctionmarkDone(string$id, string$path): void{Db::table('report_tasks')->where('id',$id)->update(['status'=>'done','output_path'=>$path,'finished_at'=>time(),]);}publicfunctionmarkFailed(string$id, string$error, int$attempt): void{$task=Db::table('report_tasks')->where('id',$id)->first();if($attempt>=$task->max_retry){// 超过重试上限 → 投入死信队列 Db::table('report_tasks')->where('id',$id)->update(['status'=>'dead','last_error'=>$error,'finished_at'=>time(),]);$this->queue->get('failed')->push(new DeadLetterJob($id, json_decode($task->params,true),$error));return;}Db::table('report_tasks')->where('id',$id)->update(['status'=>'retrying','last_error'=>$error,'attempt'=>$attempt,]);}// 手动重放死信任务 publicfunctionreplay(string$taskId): void{$task=Db::table('report_tasks')->where('id',$taskId)->firstOrFail();Db::table('report_tasks')->where('id',$taskId)->update(['status'=>'pending','attempt'=>0,]);$this->queue->get('default')->push(new\App\Job\ReportJob($taskId, json_decode($task->params,true)));}}--- 四、死信 Job — 告警 + 归档<?php // app/Job/DeadLetterJob.php namespace App\Job;use Hyperf\AsyncQueue\Job;use Hyperf\DbConnection\Db;class DeadLetterJob extends Job{public int$maxAttempts=1;// 死信不重试 publicfunction__construct(privatereadonlystring$taskId, privatereadonlyarray$params, privatereadonlystring$error,){}publicfunctionhandle(): void{//1. 持久化死信记录 Db::table('dead_letter_tasks')->insert(['task_id'=>$this->taskId,'params'=>json_encode($this->params),'error'=>$this->error,'created_at'=>time(),'expire_at'=>time()+86400*7, //7天后归档]);//2. 即时告警$this->alert();}privatefunctionalert(): void{$webhook=env('DINGTALK_WEBHOOK');$msg="[报表死信告警]\n"."任务ID: {$this->taskId}\n"."错误: {$this->error}\n"."时间: ".date('Y-m-d H:i:s')."\n"."操作: POST /report/task/{$this->taskId}/replay";make(\GuzzleHttp\Client::class)->post($webhook,['json'=>['msgtype'=>'text','text'=>['content'=>$msg]],]);}}--- 五、定时扫描 — 超时任务兜底<?php // app/Crontab/TimeoutScanner.php namespace App\Crontab;use App\Service\ReportTaskService;use Hyperf\Crontab\Annotation\Crontab;use Hyperf\DbConnection\Db;#[Crontab(name: 'TimeoutScanner', rule: '* * * * *', memo: '扫描超时报表任务')]class TimeoutScanner{publicfunction__construct(privatereadonlyReportTaskService$svc){}publicfunctionexecute(): void{// running 超过6分钟视为超时$timeouts=Db::table('report_tasks')->where('status','running')->where('started_at','<', time()-360)->get(['id','params','attempt','max_retry']);foreach($timeoutsas$task){$this->svc->markFailed($task->id,'timeout: exceeded 360s',$task->attempt);}}}--- 六、Controller — 任务管理<?php // app/Controller/ReportTaskController.php namespace App\Controller;use App\Service\ReportTaskService;use Hyperf\DbConnection\Db;use Hyperf\HttpServer\Annotation\{Controller, Post, Get};use Hyperf\HttpServer\Contract\RequestInterface;#[Controller(prefix: '/report/task')]class ReportTaskController{publicfunction__construct(privatereadonlyReportTaskService$svc){}#[Post('/submit')]publicfunctionsubmit(RequestInterface$req): array{return['task_id'=>$this->svc->submit($req->all())];}#[Get('/{id}/status')]publicfunctionstatus(string$id): array{return(array)Db::table('report_tasks')->where('id',$id)->first(['id','status','attempt','last_error','output_path','finished_at']);}#[Post('/{id}/replay')]publicfunctionreplay(string$id): array{$this->svc->replay($id);return['ok'=>true];}#[Get('/dead-letters')]publicfunctiondeadLetters(): array{returnDb::table('dead_letter_tasks')->where('expire_at','>', time())->orderByDesc('created_at')->limit(50)->get()->toArray();}}--- 七、任务状态机 + 死信流转 submit │ ▼ pending ──── 入主队列 ────► running │ ┌──────────┴──────────┐ 成功 失败/超时 │ │ ▼ attempt<max_retrydone│ ┌────┴────┐ 是 否 │ │ retrying dead ──► 死信队列 │ │ 指数退避重新入队 告警+持久化[10s, 30s, 60s]7天后归档 可手动replay 重试退避时间:attempt=1→ 等待 10sattempt=2→ 等待 30sattempt=3→ 等待 60s → 仍失败 → dead --- 八、数据库表 CREATE TABLE report_tasks(idVARCHAR(40)PRIMARY KEY, status VARCHAR(16)NOT NULL DEFAULT'pending', params JSON, attempt TINYINT NOT NULL DEFAULT0, max_retry TINYINT NOT NULL DEFAULT3, last_error TEXT, output_path VARCHAR(512), started_at INT, finished_at INT, created_at INT NOT NULL, INDEX idx_status(status, created_at));CREATE TABLE dead_letter_tasks(idINT UNSIGNED AUTO_INCREMENT PRIMARY KEY, task_id VARCHAR(40)NOT NULL, params JSON, error TEXT, created_at INT NOT NULL, expire_at INT NOT NULL, INDEX idx_expire(expire_at));--- 九、关键设计决策 ┌───────────────┬──────────────────────┬──────────────────────────────────┐ │ 问题 │ 决策 │ 原因 │ ├───────────────┼──────────────────────┼──────────────────────────────────┤ │ 重试间隔 │ 指数退避[10,30,60]s │ 避免瞬时故障下的惊群效应 │ ├───────────────┼──────────────────────┼──────────────────────────────────┤ │ 超时兜底 │ 每分钟 Crontab 扫描 │ 进程崩溃时队列驱动无法感知超时 │ ├───────────────┼──────────────────────┼──────────────────────────────────┤ │ 死信不重试 │maxAttempts=1│ 死信需人工确认原因再决定是否重放 │ ├───────────────┼──────────────────────┼──────────────────────────────────┤ │ 死信7天留存 │ expire_at 字段 │ 给运维足够时间处理,到期自动清理 │ ├───────────────┼──────────────────────┼──────────────────────────────────┤ │ 状态持久化 │ MySQL │ Redis 重启丢失,任务状态必须落库 │ └───────────────┴──────────────────────┴──────────────────────────────────┘
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/16 19:36:02

你的AI助手偷偷在学什么?这个浏览器仪表盘扒光了AI的脑子

你有没有过这种经历&#xff1f; 你花了一下午&#xff0c;跟你的 AI 助手反复强调&#xff1a;“我不吃香菜&#xff0c;我喜欢用 Tab 缩进&#xff0c;我下周要去上海出差”&#xff0c;结果过了三天&#xff0c;它给你推荐带香菜的菜&#xff0c;问你要不要用空格缩进&#…

作者头像 李华
网站建设 2026/4/16 19:32:59

生成式AI可观测性缺口正在吞噬ROI!2024最新Gartner评估显示:83%企业缺失Prompt-Level链路追踪能力

第一章&#xff1a;生成式AI可观测性危机的根源与影响 2026奇点智能技术大会(https://ml-summit.org) 当大语言模型在生产环境中持续输出不可追溯、不可归因、不可复现的结果时&#xff0c;可观测性不再仅是运维辅助能力&#xff0c;而成为系统可信性的底线防线。生成式AI的黑…

作者头像 李华
网站建设 2026/4/16 19:31:53

BetterGI深度解析:如何实现厘米级精度的原神智能导航与自动采集系统

BetterGI深度解析&#xff1a;如何实现厘米级精度的原神智能导航与自动采集系统 【免费下载链接】better-genshin-impact &#x1f4e6;BetterGI 更好的原神 - 自动拾取 | 自动剧情 | 全自动钓鱼(AI) | 全自动七圣召唤 | 自动伐木 | 自动刷本 | 自动采集/挖矿/锄地 | 一条龙 |…

作者头像 李华