
AI Application Monitoring and Observability in 2026: A Complete Engineering Blueprint for Tracing Every LLM Inference


张小明

Front-end Development Engineer


Your LLM application is running in production, but do you actually know what it is doing? This article is a systematic walkthrough of observability engineering for AI applications, from basic logging to end-to-end tracing, to help you build a monitoring system that is genuinely useful.

## 1. Why Observability for AI Applications Is Different

Traditional applications already have a mature observability stack: Prometheus for metrics, ELK for logs, Jaeger for distributed tracing. AI applications, however, face challenges of their own:

- **Non-deterministic output**: the same input yields different outputs at different temperatures, so what counts as "anomalous"?
- **Quality is hard to quantify**: an HTTP 200 does not mean the answer was correct, so how do you measure output quality?
- **Token cost tracking**: every call costs money, and cost must be attributed to individual calls.
- **Prompt version management**: a prompt change affects every output, so traces need to be correlated with prompt versions.
- **Hallucination detection**: an LLM can confidently give a wrong answer, which traditional error-rate metrics cannot capture.

## 2. Core Observability Dimensions

### 2.1 The Four Golden Signals, Adapted for AI

Start from the four golden signals of classic SRE practice:

- **Latency**: time to first token (TTFT) plus total completion time
- **Throughput**: requests per second and tokens generated per second
- **Error rate**: the ratio of API errors, parsing errors, and timeouts
- **Saturation**: token-limit utilization and concurrent-request backlog

Then add the AI-specific signals:

- **Quality score**: output quality from human or automated evaluation
- **Cost**: token consumption and dollar cost per request
- **Hallucination rate**: the proportion of detected factual errors

### 2.2 A Basic Trace Data Structure

```python
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import List, Optional
import uuid

class LLMProvider(str, Enum):
    OPENAI = "openai"
    ANTHROPIC = "anthropic"
    GEMINI = "gemini"
    VLLM = "vllm_local"

@dataclass
class TokenUsage:
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int
    cached_tokens: int = 0
    reasoning_tokens: int = 0  # thinking tokens for reasoning models

    @property
    def cost_usd(self) -> float:
        """Estimated cost (gpt-4o pricing as an example)."""
        # Pricing: $2.5/M input, $10/M output
        input_cost = (self.prompt_tokens - self.cached_tokens) * 2.5 / 1_000_000
        cached_cost = self.cached_tokens * 1.25 / 1_000_000  # cached input is cheaper
        output_cost = self.completion_tokens * 10 / 1_000_000
        return input_cost + cached_cost + output_cost

@dataclass
class LLMTrace:
    """Complete trace record for a single LLM call."""
    trace_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    parent_trace_id: Optional[str] = None  # for distributed tracing
    session_id: Optional[str] = None
    user_id: Optional[str] = None

    # Request info
    provider: LLMProvider = LLMProvider.OPENAI
    model: str = ""
    prompt_version: Optional[str] = None
    messages: List[dict] = field(default_factory=list)
    parameters: dict = field(default_factory=dict)

    # Response info
    output: Optional[str] = None
    finish_reason: Optional[str] = None

    # Timing
    request_time: datetime = field(default_factory=datetime.now)
    first_token_time: Optional[datetime] = None
    end_time: Optional[datetime] = None

    # Tokens & cost
    token_usage: Optional[TokenUsage] = None

    # Quality
    quality_score: Optional[float] = None
    feedback: Optional[str] = None  # thumbs_up / thumbs_down / None

    # Errors
    error: Optional[str] = None
    error_type: Optional[str] = None

    # Custom tags
    tags: dict = field(default_factory=dict)

    @property
    def ttft_ms(self) -> Optional[float]:
        """Time to first token, in milliseconds."""
        if self.first_token_time and self.request_time:
            return (self.first_token_time - self.request_time).total_seconds() * 1000
        return None

    @property
    def total_latency_ms(self) -> Optional[float]:
        if self.end_time and self.request_time:
            return (self.end_time - self.request_time).total_seconds() * 1000
        return None

    @property
    def tokens_per_second(self) -> Optional[float]:
        if self.token_usage and self.total_latency_ms:
            return self.token_usage.completion_tokens / (self.total_latency_ms / 1000)
        return None
```
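To make the timing fields concrete, here is a minimal sketch of populating an `LLMTrace` around a streamed call. It assumes the official `openai` package's `AsyncOpenAI` client and its `stream_options={"include_usage": True}` flag, which delivers token usage on the final chunk; setting `first_token_time` on the first content chunk is what drives the `ttft_ms` property above.

```python
from datetime import datetime
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def traced_stream(messages: List[dict]) -> LLMTrace:
    """Stream a completion while recording TTFT, latency, and token usage."""
    trace = LLMTrace(model="gpt-4o", messages=messages)
    parts: List[str] = []
    stream = await client.chat.completions.create(
        model=trace.model,
        messages=messages,
        stream=True,
        stream_options={"include_usage": True},
    )
    async for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            if trace.first_token_time is None:
                trace.first_token_time = datetime.now()  # feeds trace.ttft_ms
            parts.append(chunk.choices[0].delta.content)
        if chunk.usage:  # present only on the final chunk with include_usage
            trace.token_usage = TokenUsage(
                prompt_tokens=chunk.usage.prompt_tokens,
                completion_tokens=chunk.usage.completion_tokens,
                total_tokens=chunk.usage.total_tokens,
            )
    trace.end_time = datetime.now()
    trace.output = "".join(parts)
    return trace
```

A trace built this way can be handed straight to the Prometheus bridge in Section 6 or sampled by the evaluator in Section 4.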
## 3. Implementing End-to-End Tracing

### 3.1 Tracing with OpenTelemetry

```python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.trace.status import Status, StatusCode
import functools

# Initialize OpenTelemetry
def setup_tracing(service_name: str, otlp_endpoint: str = "http://localhost:4317"):
    provider = TracerProvider()
    exporter = OTLPSpanExporter(endpoint=otlp_endpoint)
    processor = BatchSpanProcessor(exporter)
    provider.add_span_processor(processor)
    trace.set_tracer_provider(provider)
    return trace.get_tracer(service_name)

tracer = setup_tracing("ai-service")

def trace_llm_call(operation_name: str = None):
    """Decorator that traces an LLM call."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            span_name = operation_name or f"llm.{func.__name__}"
            with tracer.start_as_current_span(span_name) as span:
                # Record request attributes
                if 'model' in kwargs:
                    span.set_attribute("llm.model", kwargs['model'])
                if 'messages' in kwargs:
                    span.set_attribute("llm.message_count", len(kwargs['messages']))
                    # Record only the last user message (avoids logging PII)
                    last_user = next(
                        (m for m in reversed(kwargs['messages']) if m['role'] == 'user'),
                        None
                    )
                    if last_user:
                        span.set_attribute("llm.last_user_message",
                                           last_user['content'][:200])
                try:
                    result = await func(*args, **kwargs)
                    # Record response metrics
                    if hasattr(result, 'usage'):
                        span.set_attribute("llm.prompt_tokens", result.usage.prompt_tokens)
                        span.set_attribute("llm.completion_tokens", result.usage.completion_tokens)
                        span.set_attribute("llm.total_tokens", result.usage.total_tokens)
                    span.set_status(Status(StatusCode.OK))
                    return result
                except Exception as e:
                    span.set_status(Status(StatusCode.ERROR, str(e)))
                    span.record_exception(e)
                    raise
        return wrapper
    return decorator

# Usage example
@trace_llm_call("rag.retrieval_and_generation")
async def rag_query(question: str, context_docs: list) -> str:
    # RAG pipeline: every sub-step is automatically linked to the parent span
    with tracer.start_as_current_span("embedding.encode") as embed_span:
        embeddings = await encode_query(question)
        embed_span.set_attribute("embedding.dimension", len(embeddings))

    with tracer.start_as_current_span("vector_search") as search_span:
        docs = await vector_search(embeddings, top_k=5)
        search_span.set_attribute("search.result_count", len(docs))

    with tracer.start_as_current_span("llm.generate"):
        response = await generate_answer(question, docs)

    return response
```

### 3.2 Langfuse Integration (an Observability Platform Built for LLMs)

```python
from langfuse import Langfuse
from langfuse.decorators import observe, langfuse_context
from openai import OpenAI

langfuse = Langfuse(
    public_key="pk-lf-xxx",
    secret_key="sk-lf-xxx",
    host="https://cloud.langfuse.com"  # or your self-hosted address
)

openai_client = OpenAI()

# Automatically trace all LLM calls
@observe()
def chat_with_rag(user_question: str, session_id: str) -> str:
    # Session id and tags live on the trace; per-step metadata on the observation
    langfuse_context.update_current_trace(
        name="rag_chat",
        session_id=session_id,
        tags=["rag", "production"],
        metadata={"question_length": len(user_question)}
    )

    # Retrieve relevant documents
    docs = retrieve_documents(user_question)
    langfuse_context.update_current_observation(
        metadata={"retrieved_docs": len(docs)}
    )

    # LLM generation
    response = openai_client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": f"Answer based on these documents:\n{docs}"},
            {"role": "user", "content": user_question}
        ]
    )
    answer = response.choices[0].message.content

    # Record a score (if one is available)
    langfuse_context.score_current_observation(
        name="relevance",
        value=0.9,  # can be an automated evaluation score
        comment="Auto-evaluated by relevance model"
    )
    return answer

# Record user feedback
def record_user_feedback(trace_id: str, is_positive: bool, comment: str = None):
    langfuse.score(
        trace_id=trace_id,
        name="user_feedback",
        value=1 if is_positive else 0,
        comment=comment,
        data_type="BOOLEAN"
    )
```
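One question the snippet above leaves open is where `record_user_feedback` gets its `trace_id`. A minimal sketch, assuming the v2 decorator API's `langfuse_context.get_current_trace_id()` helper (`chat_endpoint` here is a hypothetical handler): the observed function returns its trace id to the caller, and the client echoes it back alongside the user's rating.

```python
from langfuse.decorators import observe, langfuse_context

@observe()
def chat_endpoint(user_question: str, session_id: str) -> dict:
    """Hypothetical HTTP handler: returns the answer plus its trace id."""
    answer = chat_with_rag(user_question, session_id)
    return {
        "answer": answer,
        "trace_id": langfuse_context.get_current_trace_id(),
    }

# When the user later clicks thumbs-up/down, the client sends the id back:
# record_user_feedback(trace_id=payload["trace_id"], is_positive=True)
```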
## 4. Automated Quality Evaluation

### 4.1 LLM-as-Judge Evaluation

```python
from typing import List, Optional
from openai import AsyncOpenAI
from pydantic import BaseModel
import random

client = AsyncOpenAI()

class QualityEvaluation(BaseModel):
    overall_score: float   # 0-10
    accuracy: float        # factual accuracy
    relevance: float       # relevance to the question
    completeness: float    # completeness
    clarity: float         # clarity
    issues: List[str]      # problems found
    reasoning: str         # evaluation rationale

async def evaluate_response_quality(
    question: str,
    context: str,
    response: str,
    ground_truth: str = None
) -> QualityEvaluation:
    """Automatically evaluate response quality with an LLM."""
    eval_prompt = f"""Evaluate the quality of the following AI response.

User question: {question}

Reference context: {context[:2000]}

AI response: {response}

{"Reference answer: " + ground_truth if ground_truth else ""}

Score each dimension from 0-10:
1. Accuracy: is the information correct, and are there hallucinations?
2. Relevance: does it actually answer the user's question?
3. Completeness: does it cover every important aspect?
4. Clarity: is it clearly expressed and easy to understand?

Give an overall score and list the specific issues found."""

    response_obj = await client.beta.chat.completions.parse(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "You are a professional evaluator of AI response quality. Evaluate objectively and strictly."},
            {"role": "user", "content": eval_prompt}
        ],
        response_format=QualityEvaluation,
    )
    return response_obj.choices[0].message.parsed

class ContinuousEvaluator:
    """Continuous evaluation pipeline."""

    def __init__(self, sample_rate: float = 0.1):
        """sample_rate: sampling ratio; production traffic need not all be evaluated."""
        self.sample_rate = sample_rate
        self.results = []

    async def maybe_evaluate(self, trace: LLMTrace) -> Optional[QualityEvaluation]:
        if random.random() > self.sample_rate:
            return None
        if not trace.output:
            return None

        question = next(
            (m['content'] for m in reversed(trace.messages) if m['role'] == 'user'),
            ""
        )
        context = " ".join(
            m['content'] for m in trace.messages if m['role'] == 'system'
        )

        eval_result = await evaluate_response_quality(
            question=question,
            context=context,
            response=trace.output
        )
        self.results.append(eval_result)
        return eval_result

    def get_stats(self) -> dict:
        if not self.results:
            return {}
        scores = [r.overall_score for r in self.results]
        return {
            "avg_score": sum(scores) / len(scores),
            "min_score": min(scores),
            "below_threshold": sum(1 for s in scores if s < 6) / len(scores),
            "evaluated_count": len(self.results)
        }
```

## 5. Cost Monitoring and Alerting

```python
from collections import defaultdict
from datetime import datetime, timedelta

class CostTracker:
    """LLM cost tracking and alerting."""

    # Per-model prices (USD per 1M tokens)
    MODEL_PRICES = {
        "gpt-4o": {"input": 2.5, "output": 10.0},
        "gpt-4o-mini": {"input": 0.15, "output": 0.6},
        "o3": {"input": 15.0, "output": 60.0},
        "o4-mini": {"input": 1.1, "output": 4.4},
        "claude-3-5-sonnet": {"input": 3.0, "output": 15.0},
    }

    def __init__(self, daily_budget_usd: float = 100.0):
        self.daily_budget = daily_budget_usd
        self.daily_cost: dict[str, float] = defaultdict(float)  # date -> cost
        self.alerts_sent = set()

    def record_call(self, model: str, prompt_tokens: int,
                    completion_tokens: int, date: str = None) -> float:
        if date is None:
            date = datetime.now().strftime("%Y-%m-%d")
        prices = self.MODEL_PRICES.get(model, {"input": 5.0, "output": 15.0})
        cost = (prompt_tokens * prices["input"] +
                completion_tokens * prices["output"]) / 1_000_000
        self.daily_cost[date] += cost
        # Check whether an alert is due
        self._check_budget_alert(date)
        return cost

    def _check_budget_alert(self, date: str):
        usage_ratio = self.daily_cost[date] / self.daily_budget
        thresholds = [0.5, 0.8, 1.0]
        for threshold in thresholds:
            alert_key = f"{date}_{threshold}"
            if usage_ratio >= threshold and alert_key not in self.alerts_sent:
                self.alerts_sent.add(alert_key)
                self._send_alert(
                    f"🚨 LLM cost alert: {threshold*100:.0f}% of today's budget used",
                    f"Today's cost: ${self.daily_cost[date]:.2f} / budget: ${self.daily_budget:.2f}"
                )

    def _send_alert(self, title: str, message: str):
        # Send to Slack / DingTalk / WeCom, etc.
        print(f"[ALERT] {title}: {message}")
        # webhook_notify(title, message)

    def get_report(self, days: int = 7) -> dict:
        today = datetime.now()
        report = {}
        for i in range(days):
            date = (today - timedelta(days=i)).strftime("%Y-%m-%d")
            report[date] = {
                "cost_usd": round(self.daily_cost.get(date, 0), 4),
                "budget_usage": self.daily_cost.get(date, 0) / self.daily_budget
            }
        return report
```
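A minimal usage sketch for the tracker above; every name comes from the class itself, and the token counts are invented for illustration.

```python
tracker = CostTracker(daily_budget_usd=50.0)

# Record each completed call, e.g. straight from an LLMTrace's token_usage:
cost = tracker.record_call(
    model="gpt-4o-mini",
    prompt_tokens=1_200,
    completion_tokens=350,
)
print(f"this call: ${cost:.6f}")  # budget alerts fire inside record_call

# A 7-day summary for a daily report or cron job:
for date, row in sorted(tracker.get_report(days=7).items()):
    print(date, f"${row['cost_usd']:.2f}", f"{row['budget_usage']:.1%}")
```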
## 6. Prometheus Metrics Integration

```python
from prometheus_client import Counter, Histogram, start_http_server

# Metric definitions
llm_request_total = Counter(
    'llm_requests_total', 'Total LLM API calls',
    ['model', 'status']
)
llm_latency_seconds = Histogram(
    'llm_latency_seconds', 'LLM response latency in seconds',
    ['model'],
    buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0]
)
llm_tokens_total = Counter(
    'llm_tokens_total', 'Total tokens consumed',
    ['model', 'token_type']  # prompt / completion
)
llm_cost_usd_total = Counter(
    'llm_cost_usd_total', 'Total LLM API cost in USD',
    ['model']
)
llm_quality_score = Histogram(
    'llm_quality_score', 'LLM output quality score distribution',
    ['model'],
    buckets=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
)

def record_llm_metrics(trace: LLMTrace):
    """Convert a trace record into Prometheus metrics."""
    status = "error" if trace.error else "success"
    llm_request_total.labels(model=trace.model, status=status).inc()

    if trace.total_latency_ms:
        llm_latency_seconds.labels(model=trace.model).observe(
            trace.total_latency_ms / 1000
        )

    if trace.token_usage:
        llm_tokens_total.labels(model=trace.model, token_type="prompt").inc(
            trace.token_usage.prompt_tokens
        )
        llm_tokens_total.labels(model=trace.model, token_type="completion").inc(
            trace.token_usage.completion_tokens
        )
        llm_cost_usd_total.labels(model=trace.model).inc(
            trace.token_usage.cost_usd
        )

    if trace.quality_score:
        llm_quality_score.labels(model=trace.model).observe(trace.quality_score)

# Start the metrics endpoint (for Prometheus to scrape)
# start_http_server(9090)
```

## 7. Dashboard Design Recommendations

Real-time dashboard (Grafana):

- Request QPS, error rate, P50/P95 latency (last hour)
- Token consumption rate (tokens per minute)
- Today's cost vs. budget (progress bar)
- Model distribution pie chart

Daily-report dashboard:

- Daily cost trend (7 days)
- Quality score distribution
- Top 10 highest-latency requests
- Error breakdown by category

Alert rules:

```yaml
# Prometheus alerting rule examples
groups:
  - name: llm_alerts
    rules:
      - alert: LLMHighErrorRate
        expr: rate(llm_requests_total{status="error"}[5m]) / rate(llm_requests_total[5m]) > 0.05
        for: 2m
        annotations:
          summary: "LLM error rate above 5%"
      - alert: LLMHighLatency
        expr: histogram_quantile(0.95, sum(rate(llm_latency_seconds_bucket[5m])) by (le)) > 10
        for: 5m
        annotations:
          summary: "LLM P95 latency above 10 seconds"
```

## Summary

Observability is not optional for an AI application; it is a precondition for a production-grade product. Core recommendations:

1. Start with structured logging: use a unified trace data structure and store every call.
2. TTFT is the most important user-experience metric: it is what users feel most directly.
3. Cost tracking must be precise to the call level: only per-feature grouping enables optimization decisions.
4. Automate quality evaluation: 10% sampling plus LLM-as-Judge.
5. Langfuse is the first choice among purpose-built tools: designed for LLMs and usable out of the box.
6. Prometheus remains the infrastructure staple: integrate with the monitoring you already run.

Building observability in from day one costs a fraction of what retrofitting it after the fact does.
