news 2025/12/27 9:04:00

【AI 面试题】大模型训练技术

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
【AI 面试题】大模型训练技术

【AI 面试题】大模型训练技术

文章目录

  • 【AI 面试题】大模型训练技术
        • 题目 10:QLoRA 原理与实现
        • 题目 11:大模型训练稳定性优化
        • 题目 12:持续预训练与增量微调
      • 第三部分:推理优化与部署
        • 题目 13:大模型推理加速(KV Cache/量化/算子融合)
        • 题目 14:模型量化技术(INT8/INT4/FP4)
        • 题目 15:大模型部署框架与实践
        • 题目 16:动态批处理与PagedAttention
        • 题目 17:多模态模型推理优化
        • 题目 18:边缘端大模型部署
      • 第四部分:Agent架构设计
        • 题目 19:Agent核心架构设计
题目 10:QLoRA 原理与实现

📋 面试题
请解释 QLoRA (Quantized LoRA) 的核心原理,相比 LoRA 有哪些优势?并实现核心代码。

✅ 标准答案

┌─────────────────────────────────────────────────────────────┐ │ QLoRA 核心原理 │ ├─────────────────────────────────────────────────────────────┤ │ 1. 核心思想:4-bit 量化预训练模型 + LoRA 低秩适配 │ │ 2. 关键技术: │ │ - NF4 量化:针对正态分布权重优化的 4-bit 量化格式 │ │ - Double Quantization:量化量化常数,进一步节省显存 │ │ - Paged Optimizers:分页优化器,减少峰值显存占用 │ │ 3. 显存对比: │ │ - 全量微调 (FP16):70B 模型 ≈ 140GB 显存 │ │ - LoRA (FP16):70B 模型 ≈ 70GB 显存 │ │ - QLoRA (4-bit):70B 模型 ≈ 12GB 显存 │ └─────────────────────────────────────────────────────────────┘

📖 详细解析

importtorchimporttorch.nnasnnimportbitsandbytesasbnb# QLoRA核心依赖importtransformersclassQLoRALayer(nn.Module):"""QLoRA 核心层实现"""def__init__(self,in_features:int,out_features:int,rank:int=8,alpha:float=16,quantized:bool=True,quant_type:str="nf4"# 可选: nf4/fp4/int4):super().__init__()self.rank=rank self.alpha=alpha self.scaling=alpha/rank# 1. 量化预训练权重(4-bit)ifquantized:# 使用 bitsandbytes 实现 4-bit 量化self.weight=bnb.nn.Params4bit(torch.empty(out_features,in_features,dtype=torch.float16),requires_grad=False,quant_type=quant_type,fp32_weight=False# 不保存FP32副本)else:self.weight=nn.Parameter(torch.empty(out_features,in_features))self.weight.requires_grad=False# 2. LoRA 低秩矩阵(FP16,保证微调精度)self.lora_A=nn.Parameter(torch.empty(rank,in_features,dtype=torch.float16))self.lora_B=nn.Parameter(torch.empty(out_features,rank,dtype=torch.float16))# 初始化nn.init.kaiming_uniform_(self.lora_A,a=math.sqrt(5))nn.init.zeros_(self.lora_B)defforward(self,x):# 量化权重前向(bitsandbytes 自动处理反量化)result=nn.functional.linear(x,self.weight)# LoRA 增量(FP16计算)lora_out=nn.functional.linear(nn.functional.linear(x,self.lora_A),self.lora_B)returnresult+self.scaling*lora_out# QLoRA 训练配置(实战示例)defsetup_qlora_training(model_name:str):"""配置QLoRA训练参数"""bnb_config=transformers.BitsAndBytesConfig(load_in_4bit=True,# 4-bit量化加载bnb_4bit_use_double_quant=True,# 双重量化bnb_4bit_quant_type="nf4",# NF4量化格式bnb_4bit_compute_dtype=torch.float16,# 计算用FP16)# 加载量化模型model=transformers.AutoModelForCausalLM.from_pretrained(model_name,quantization_config=bnb_config,device_map="auto",# 自动设备映射torch_dtype=torch.float16,)# 配置LoRApeft_config=transformers.LoraConfig(r=8,# 低秩维度lora_alpha=16,# 缩放系数target_modules=["q_proj","v_proj"],# 目标模块lora_dropout=0.05,bias="none",task_type="CAUSAL_LM",)model=transformers.get_peft_model(model,peft_config)model.print_trainable_parameters()# 打印可训练参数(通常<1%)returnmodel# 核心优势总结:# 1. 显存占用降低 5-10 倍,70B模型可在单张A100(40GB)训练# 2. 几乎无精度损失(NF4适配大模型权重分布)# 3. 训练速度与LoRA相当,无需额外硬件
题目 11:大模型训练稳定性优化

📋 面试题
大模型训练中常出现梯度爆炸、训练崩溃、收敛缓慢等问题,有哪些核心优化策略?

✅ 标准答案

┌─────────────────────────────────────────────────────────────┐ │ 大模型训练稳定性优化策略 │ ├─────────────────────────────────────────────────────────────┤ │ 1. 梯度优化: │ │ - 梯度裁剪(Gradient Clipping):限制梯度范数≤1.0 │ │ - 梯度检查点(Gradient Checkpointing):以时间换显存 │ │ 2. 学习率策略: │ │ - Cosine Annealing LR:余弦退火学习率 │ │ - Warmup:前1000步线性升温,避免初始大学习率震荡 │ │ 3. 数值稳定性: │ │ - 混合精度训练(FP16/FP8):减少显存+加速,保留FP32主权重 │ │ - 权重衰减(Weight Decay):L2正则,防止过拟合 │ │ 4. 优化器选择: │ │ - AdamW:修正Adam的权重衰减偏差 │ │ - Lion:无动量的符号优化器,显存更省 │ └─────────────────────────────────────────────────────────────┘

📖 详细解析

importtorchimporttorch.nnasnnimportmathfromtorch.optimimportAdamWfromtorch.optim.lr_schedulerimportCosineAnnealingWarmRestartsclassStableTrainer:"""大模型稳定训练器"""def__init__(self,model,lr=2e-5,max_steps=100000):self.model=model self.max_steps=max_steps# 1. 优化器配置(AdamW + 权重衰减)self.optimizer=AdamW(model.parameters(),lr=lr,weight_decay=0.01,# 权重衰减betas=(0.9,0.95),# 适配大模型的beta值eps=1e-8,)# 2. 学习率调度(Warmup + Cosine Annealing)self.lr_scheduler=self._get_lr_scheduler()# 3. 混合精度训练self.scaler=torch.cuda.amp.GradScaler()# 4. 梯度裁剪阈值self.clip_norm=1.0def_get_lr_scheduler(self):"""构建学习率调度器"""# 第一步:Warmup调度器warmup_steps=int(0.05*self.max_steps)# 前5%步warmupdeflr_lambda(step):ifstep<warmup_steps:returnstep/warmup_steps# 线性升温else:# 余弦退火progress=(step-warmup_steps)/(self.max_steps-warmup_steps)return0.5*(1+math.cos(math.pi*progress))returntorch.optim.lr_scheduler.LambdaLR(self.optimizer,lr_lambda)deftrain_step(self,batch):"""稳定的训练步骤"""input_ids,labels=batch["input_ids"],batch["labels"]# 混合精度前向withtorch.cuda.amp.autocast(dtype=torch.float16):outputs=self.model(input_ids=input_ids,labels=labels)loss=outputs.loss# 梯度缩放反向self.scaler.scale(loss).backward()# 梯度裁剪self.scaler.unscale_(self.optimizer)torch.nn.utils.clip_grad_norm_(self.model.parameters(),max_norm=self.clip_norm)# 优化器步进self.scaler.step(self.optimizer)self.scaler.update()self.optimizer.zero_grad()# 更新学习率self.lr_scheduler.step()return{"loss":loss.item(),"lr":self.lr_scheduler.get_last_lr()[0],"scale":self.scaler.get_scale()}# 额外稳定性技巧defenable_gradient_checkpointing(model):"""启用梯度检查点,节省50%显存"""model.gradient_checkpointing_enable(gradient_checkpointing_kwargs={"use_reentrant":False})deffix_layer_norm(model):"""修复LayerNorm数值稳定性"""formoduleinmodel.modules():ifisinstance(module,nn.LayerNorm):module.float()# LayerNorm保持FP32
题目 12:持续预训练与增量微调

📋 面试题
什么是大模型的持续预训练和增量微调?如何缓解灾难性遗忘问题?

✅ 标准答案

┌─────────────────────────────────────────────────────────────┐ │ 持续预训练 vs 增量微调 │ ├───────────────┬──────────────────┬──────────────────────────┤ │ 维度 │ 持续预训练 │ 增量微调 │ ├───────────────┼──────────────────┼──────────────────────────┤ │ 目标 │ 扩展通用能力 │ 适配特定领域/任务 │ │ 数据 │ 通用语料 │ 领域专属数据 │ │ 学习率 │ 极低(1e-6~1e-5)│ 低(1e-5~2e-5) │ │ 训练策略 │ 全量参数更新 │ 部分参数更新(LoRA) │ ├───────────────┴──────────────────┴──────────────────────────┤ │ 灾难性遗忘缓解策略: │ │ 1. 弹性权重整合(EWC):对核心参数添加正则保护 │ │ 2. 增量参数(LoRA):新增参数适配新任务,不修改原参数 │ │ 3. 混合数据:微调时混入10%通用数据,保留基础能力 │ │ 4. 模型蒸馏:将旧能力蒸馏到新模型 │ └─────────────────────────────────────────────────────────────┘

📖 详细解析

importtorchimporttorch.nnasnnimporttorch.nn.functionalasF# 1. EWC 实现(缓解灾难性遗忘)classEWCTrainer:"""弹性权重整合训练器"""def__init__(self,model,ewc_lambda=100):self.model=model self.ewc_lambda=ewc_lambda self.fisher_matrix=None# 费雪矩阵,衡量参数重要性self.params_init={}# 初始参数值defcompute_fisher_matrix(self,dataloader):"""计算费雪矩阵(参数重要性)"""fisher={n:torch.zeros_like(p)forn,pinself.model.named_parameters()}self.model.eval()forbatchindataloader:self.model.zero_grad()outputs=self.model(**batch)loss=outputs.loss loss.backward()# 累积梯度平方(费雪矩阵近似)forn,pinself.model.named_parameters():ifp.gradisnotNone:fisher[n]+=(p.grad**2)/len(dataloader)self.fisher_matrix=fisher# 保存初始参数self.params_init={n:p.detach().clone()forn,pinself.model.named_parameters()}defewc_loss(self):"""计算EWC正则损失"""loss=0.0forn,pinself.model.named_parameters():ifninself.fisher_matrix:loss+=(self.fisher_matrix[n]*(p-self.params_init[n])**2).sum()returnself.ewc_lambda*lossdeftrain_step(self,batch,use_ewc=True):"""带EWC的训练步骤"""outputs=self.model(**batch)task_loss=outputs.lossifuse_ewcandself.fisher_matrixisnotNone:ewc_loss=self.ewc_loss()total_loss=task_loss+ewc_losselse:total_loss=task_loss total_loss.backward()self.optimizer.step()self.optimizer.zero_grad()return{"task_loss":task_loss.item(),"ewc_loss":ewc_loss.item()ifuse_ewcelse0,"total_loss":total_loss.item()}# 2. 持续预训练配置defsetup_continual_pretraining(model,dataloader_general,dataloader_domain):""" 持续预训练策略: - 通用数据 + 领域数据混合 - 低学习率 + 慢衰减 """# 混合数据加载器defmixed_dataloader():gen_iter=iter(dataloader_general)dom_iter=iter(dataloader_domain)whileTrue:try:gen_batch=next(gen_iter)dom_batch=next(dom_iter)# 80%领域数据 + 20%通用数据iftorch.rand(1)<0.8:yielddom_batchelse:yieldgen_batchexceptStopIteration:break# 极低学习率optimizer=AdamW(model.parameters(),lr=1e-6,weight_decay=0.01)returnmixed_dataloader(),optimizer

第三部分:推理优化与部署

题目 13:大模型推理加速(KV Cache/量化/算子融合)

📋 面试题
请详细解释大模型推理加速的核心技术(KV Cache、算子融合、量化),并实现KV Cache核心逻辑。

✅ 标准答案

┌─────────────────────────────────────────────────────────────┐ │ 大模型推理加速技术 │ ├─────────────────────────────────────────────────────────────┤ │ 1. KV Cache(键值缓存): │ │ - 原理:缓存已计算的 K/V 向量,避免重复计算 │ │ - 加速比:生成N个token,复杂度从O(N²)→O(N) │ │ - 显存开销:额外存储N×d_model的缓存 │ │ 2. 算子融合(Operator Fusion): │ │ - 原理:将多个小算子合并为单个核函数,减少GPU调度开销 │ │ - 示例:Attention(Q,K,V) → 单算子实现 │ │ 3. 量化推理: │ │ - 原理:FP16→INT8/INT4,减少计算量和显存 │ │ - 精度损失:通过校准集优化,精度损失<1% │ └─────────────────────────────────────────────────────────────┘

📖 详细解析

importtorchimporttorch.nnasnnimportmath# KV Cache 核心实现classKVCacheAttention(nn.Module):"""带KV Cache的Attention层"""def__init__(self,d_model,n_heads):super().__init__()self.d_model=d_model self.n_heads=n_heads self.d_k=d_model//n_heads self.q_proj=nn.Linear(d_model,d_model)self.k_proj=nn.Linear(d_model,d_model)self.v_proj=nn.Linear(d_model,d_model)self.o_proj=nn.Linear(d_model,d_model)# KV Cache 存储self.cache_k=Noneself.cache_v=Nonedefreset_cache(self):"""重置KV Cache"""self.cache_k=Noneself.cache_v=Nonedefforward(self,x,use_cache=True):batch_size,seq_len,_=x.shape# 投影Q/K/Vq=self.q_proj(x).view(batch_size,seq_len,self.n_heads,self.d_k).transpose(1,2)k=self.k_proj(x).view(batch_size,seq_len,self.n_heads,self.d_k).transpose(1,2)v=self.v_proj(x).view(batch_size,seq_len,self.n_heads,self.d_k).transpose(1,2)# 使用KV Cacheifuse_cache:ifself.cache_kisnotNone:# 拼接缓存的K/V([batch, heads, cache_len, d_k])k=torch.cat([self.cache_k,k],dim=2)v=torch.cat([self.cache_v,v],dim=2)# 更新缓存(仅缓存最新的K/V)self.cache_k=k self.cache_v=v# 注意力计算scores=torch.matmul(q,k.transpose(-2,-1))/math.sqrt(self.d_k)attn=F.softmax(scores,dim=-1)output=torch.matmul(attn,v)# 拼接输出output=output.transpose(1,2).contiguous().view(batch_size,seq_len,self.d_model)returnself.o_proj(output)# 推理加速测试deftest_kv_cache_speed(model,prompt,max_new_tokens=100):"""测试KV Cache加速效果"""model.reset_cache()tokens=tokenizer.encode(prompt,return_tensors="pt").to("cuda")# 预热withtorch.no_grad():# 第一遍:计算prompt的K/V(无缓存)start_time=torch.cuda.Event(enable_timing=True)end_time=torch.cuda.Event(enable_timing=True)start_time.record()output=model(tokens,use_cache=True)end_time.record()torch.cuda.synchronize()prompt_time=start_time.elapsed_time(end_time)# 生成新token(使用缓存)gen_times=[]for_inrange(max_new_tokens):start_time.record()# 取最后一个token的输出next_token=torch.argmax(output[:,-1,:],dim=-1,keepdim=True)output=model(next_token,use_cache=True)end_time.record()torch.cuda.synchronize()gen_times.append(start_time.elapsed_time(end_time))print(f"Prompt处理时间:{prompt_time:.2f}ms")print(f"平均token生成时间:{sum(gen_times)/len(gen_times):.2f}ms")print(f"总加速比:{prompt_time/(sum(gen_times)/len(gen_times)):.2f}x")# 算子融合示例(使用TorchScript)@torch.jit.scriptdeffused_attention(q,k,v,mask=None):"""融合的注意力计算算子"""scores=torch.matmul(q,k.transpose(-2,-1))/math.sqrt(q.size(-1))ifmaskisnotNone:scores=scores+mask attn=torch.softmax(scores,dim=-1)output=torch.matmul(attn,v)returnoutput# 算子融合优势:减少GPU kernel启动次数,加速~20-30%
题目 14:模型量化技术(INT8/INT4/FP4)

📋 面试题
请解释大模型量化的核心原理,不同量化精度(INT8/INT4/FP4)的适用场景,并实现INT8量化推理。

✅ 标准答案

┌─────────────────────────────────────────────────────────────┐ │ 模型量化技术对比 │ ├───────────┬──────────┬──────────┬──────────┬────────────────┤ │ 量化精度 │ 显存节省 │ 速度提升 │ 精度损失 │ 适用场景 │ ├───────────┼──────────┼──────────┼──────────┼────────────────┤ │ FP16 │ 0% │ 0% │ 0% │ 高精度推理 │ │ INT8 │ 50% │ 2-3x │ <1% │ 通用推理 │ │ INT4 │ 75% │ 3-4x │ 1-3% │ 低精度场景 │ │ FP4 │ 75% │ 3-4x │ 2-4% │ 移动端/边缘端 │ ├───────────┴──────────┴──────────┴──────────┴────────────────┤ │ 量化核心原理: │ │ 1. 对称量化:x_quant = round(x / scale),scale=max(|x|)/127 │ │ 2. 非对称量化:x_quant = round((x - zero_point)/scale) │ │ 3. 量化感知训练(QAT):训练时模拟量化误差,提升精度 │ └─────────────────────────────────────────────────────────────┘

📖 详细解析

importtorchimporttorch.nnasnnimporttorch.nn.functionalasFclassInt8Linear(nn.Module):"""INT8量化线性层"""def__init__(self,in_features,out_features):super().__init__()# 量化参数self.scale=Noneself.zero_point=None# 量化权重(INT8)self.weight_int8=nn.Parameter(torch.empty(out_features,in_features,dtype=torch.int8),requires_grad=False)# 原始权重(FP16,用于校准)self.weight_fp16=nn.Parameter(torch.empty(out_features,in_features,dtype=torch.float16),requires_grad=False)defcalibrate(self,calib_data):"""校准量化参数(使用校准集)"""# 1. 计算权重的量化参数(对称量化)weight_max=torch.max(torch.abs(self.weight_fp16))self.scale=weight_max/127.0self.zero_point=0# 2. 量化权重self.weight_int8.data=torch.clamp(torch.round(self.weight_fp16/self.scale),-128,127).to(torch.int8)# 3. 计算输入的量化参数(动态量化)input_max=torch.max(torch.abs(calib_data))self.input_scale=input_max/127.0defforward(self,x):"""INT8推理"""# 1. 输入量化(FP16→INT8)x_int8=torch.clamp(torch.round(x/self.input_scale),-128,127).to(torch.int8)# 2. INT8矩阵乘法(使用torch.ops.aten)output_int8=torch.ops.aten.matmul_int8(x_int8,self.weight_int8.t(),self.input_scale,self.scale,self.zero_point,self.zero_point)# 3. 反量化(INT32→FP16)output_fp16=output_int8.to(torch.float16)returnoutput_fp16# 量化模型封装classQuantizedModel(nn.Module):"""INT8量化模型"""def__init__(self,original_model):super().__init__()self.original_model=original_model self.quantized_layers=nn.ModuleDict()# 替换所有线性层为INT8线性层forname,moduleinoriginal_model.named_modules():ifisinstance(module,nn.Linear):quant_layer=Int8Linear(module.in_features,module.out_features)quant_layer.weight_fp16.data=module.weight.data.to(torch.float16)self.quantized_layers[name]=quant_layerdefcalibrate(self,calib_dataloader):"""校准整个模型"""calib_data=[]forbatchincalib_dataloader:calib_data.append(batch["input_ids"])iflen(calib_data)>=100:# 取100个校准样本breakcalib_tensor=torch.cat(calib_data,dim=0)forname,layerinself.quantized_layers.items():layer.calibrate(calib_tensor)defforward(self,x):"""量化推理"""# 替换原始模型的线性层输出defreplace_linear(module,input,output):forname,quant_layerinself.quantized_layers.items():ifmodule==self.original_model.get_submodule(name):returnquant_layer(input[0])# 注册前向钩子hooks=[]fornameinself.quantized_layers.keys():module=self.original_model.get_submodule(name)hook=module.register_forward_hook(replace_linear)hooks.append(hook)# 前向推理output=self.original_model(x)# 移除钩子forhookinhooks:hook.remove()returnoutput# HuggingFace 量化推理示例defload_quantized_model(model_name):"""加载INT8量化模型(使用transformers)"""fromtransformersimportAutoModelForCausalLM,AutoTokenizer model=AutoModelForCausalLM.from_pretrained(model_name,load_in_8bit=True,# 加载INT8量化模型device_map="auto",)tokenizer=AutoTokenizer.from_pretrained(model_name)# 推理inputs=tokenizer("Hello, my name is",return_tensors="pt").to("cuda")outputs=model.generate(**inputs,max_new_tokens=10)print(tokenizer.decode(outputs[0]))returnmodel,tokenizer
题目 15:大模型部署框架与实践

📋 面试题
常用的大模型部署框架有哪些(vLLM/TensorRT/ONNX Runtime)?各自的优势和适用场景是什么?

✅ 标准答案

┌─────────────────────────────────────────────────────────────┐ │ 大模型部署框架对比 │ ├─────────────┬──────────┬──────────┬──────────┬──────────────┤ │ 框架 │ 核心优势 │ 速度提升 │ 显存效率 │ 适用场景 │ ├─────────────┼──────────┼──────────┼──────────┼──────────────┤ │ vLLM │ PagedAttention │ 4-8x │ 高(分页KV) │ 高吞吐推理 │ │ TensorRT │ 算子优化/编译 │ 2-3x │ 中 │ 低延迟推理 │ │ ONNX Runtime │ 跨平台 │ 1.5-2x │ 中 │ 多框架兼容 │ │ TGI │ 易用性/开源 │ 2-3x │ 中 │ 快速部署 │ └─────────────────────────────────────────────────────────────┘

📖 详细解析

# 1. vLLM 部署示例(高吞吐)defdeploy_with_vllm(model_name:str,port:int=8000):"""使用vLLM部署大模型"""fromvllmimportLLM,SamplingParamsfromvllm.entrypoints.openaiimportapi_server# 初始化vLLMllm=LLM(model=model_name,tensor_parallel_size=2,# 张量并行gpu_memory_utilization=0.9,# 显存利用率max_num_batched_tokens=8192,# 最大批处理token数)# 采样参数sampling_params=SamplingParams(temperature=0.7,top_p=0.9,max_tokens=1024,)# 启动OpenAI兼容API服务api_server.serve(llm=llm,model_name=model_name,host="0.0.0.0",port=port,)# 2. TensorRT-LLM 部署示例(低延迟)defdeploy_with_tensorrt_llm(model_name:str):"""使用TensorRT-LLM编译并部署模型"""importtensorrt_llmfromtensorrt_llm.runtimeimportGenerationSession# 1. 转换模型为TensorRT格式tensorrt_llm.logger.set_level('error')build_dir=f"./{model_name}_trt"# 编译模型cmd=f""" python -m tensorrt_llm.builder \ --model_dir{model_name}\ --dtype float16 \ --enable_context_fmha \ --output_dir{build_dir}"""importsubprocess subprocess.run(cmd,shell=True)# 2. 加载编译后的模型session=GenerationSession(build_dir,"cuda:0")# 3. 推理input_text="What is AI?"input_ids=tokenizer.encode(input_text,return_tensors="pt").to("cuda")output_ids=session.generate(input_ids,max_new_tokens=100,temperature=0.7,)output_text=tokenizer.decode(output_ids[0])print(output_text)# 3. ONNX Runtime 部署示例(跨平台)defexport_and_deploy_onnx(model_name:str):"""导出为ONNX并部署"""fromtransformersimportAutoModelForCausalLM,AutoTokenizerimportonnxruntimeasort# 1. 加载模型并导出ONNXmodel=AutoModelForCausalLM.from_pretrained(model_name)tokenizer=AutoTokenizer.from_pretrained(model_name)# 导出ONNXdummy_input=tokenizer("Hello",return_tensors="pt")onnx_path=f"{model_name}.onnx"torch.onnx.export(model,(dummy_input["input_ids"],dummy_input["attention_mask"]),onnx_path,input_names=["input_ids","attention_mask"],output_names=["logits"],dynamic_axes={"input_ids":{0:"batch_size",1:"seq_len"},"attention_mask":{0:"batch_size",1:"seq_len"},},opset_version=17,)# 2. ONNX Runtime推理ort_session=ort.InferenceSession(onnx_path,providers=["CUDAExecutionProvider","CPUExecutionProvider"])# 推理inputs=tokenizer("What is machine learning?",return_tensors="np")outputs=ort_session.run(None,{"input_ids":inputs["input_ids"],"attention_mask":inputs["attention_mask"],},)returnoutputs# 部署性能优化建议DEPLOYMENT_TIPS={"高吞吐场景":"使用vLLM + 动态批处理 + 分页KV Cache","低延迟场景":"使用TensorRT-LLM + INT8量化 + 算子融合","跨平台场景":"使用ONNX Runtime + FP16精度","边缘端场景":"使用TinyChat + 模型裁剪 + INT4量化",}
题目 16:动态批处理与PagedAttention

📋 面试题
请解释 vLLM 的核心技术 PagedAttention(分页注意力)和动态批处理的原理,以及如何提升推理吞吐。

✅ 标准答案

┌─────────────────────────────────────────────────────────────┐ │ PagedAttention 原理 │ ├─────────────────────────────────────────────────────────────┤ │ 1. 传统KV Cache问题: │ │ - 每个请求独占连续的KV Cache空间,显存碎片严重 │ │ - 长请求占用大量显存,短请求浪费空间 │ │ 2. PagedAttention解决方案: │ │ - 将KV Cache划分为固定大小的Page(如16/32 tokens) │ │ - 每个请求的KV Cache分散存储在多个Page中 │ │ - 页表记录Page位置,支持非连续存储 │ │ 3. 动态批处理优势: │ │ - 按token生成阶段批处理,而非按请求 │ │ - 高吞吐:批处理大小提升3-5倍 │ │ - 低延迟:减少请求等待时间 │ └─────────────────────────────────────────────────────────────┘

📖 详细解析

importtorchimporttorch.nnasnnimportmathfromcollectionsimportdefaultdict# PagedAttention 核心模拟实现classPagedKVCache:"""分页KV Cache实现"""def__init__(self,page_size=16,max_pages=1024):self.page_size=page_size# 每页token数self.max_pages=max_pages# 最大页数# K/V Page存储([page_id, head, page_size, d_k])self.k_pages=defaultdict(lambda:torch.zeros(max_pages,n_heads,page_size,d_k,dtype=torch.float16,device="cuda"))self.v_pages=defaultdict(lambda:torch.zeros(max_pages,n_heads,page_size,d_k,dtype=torch.float16,device="cuda"))# 页表:req_id → list of page_idsself.page_table=defaultdict(list)# 空闲页队列self.free_pages=list(range(max_pages))defallocate_pages(self,req_id,num_tokens):"""为请求分配Page"""num_pages=math.ceil(num_tokens/self.page_size)# 分配空闲页allocated_pages=[]for_inrange(num_pages):ifself.free_pages:allocated_pages.append(self.free_pages.pop(0))else:raiseRuntimeError("Out of cache memory")self.page_table[req_id]=allocated_pagesreturnallocated_pagesdefstore(self,req_id,positions,k,v):"""存储K/V到Page"""pages=self.page_table[req_id]batch_size,n_heads,seq_len,d_k=k.shapeforposinrange(seq_len):# 计算页号和页内偏移page_idx=pos//self.page_size offset=pos%self.page_size page_id=pages[page_idx]# 存储到对应Pageself.k_pages[page_id][:,:,offset,:]=k[:,:,pos,:]self.v_pages[page_id][:,:,offset,:]=v[:,:,pos,:]defretrieve(self,req_id,start_pos=0,end_pos=None):"""从Page中检索K/V"""pages=self.page_table[req_id]ifend_posisNone:end_pos=len(pages)*self.page_size# 收集所有相关Page的K/Vk_list=[]v_list=[]forposinrange(start_pos,end_pos):page_idx=pos//self.page_size offset=pos%self.page_size page_id=pages[page_idx]k_list.append(self.k_pages[page_id][:,:,offset,:])v_list.append(self.v_pages[page_id][:,:,offset,:])# 拼接成连续张量k=torch.stack(k_list,dim=2)v=torch.stack(v_list,dim=2)returnk,vdeffree_pages(self,req_id):"""释放请求的Page"""pages=self.page_table.pop(req_id,[])self.free_pages.extend(pages)# 动态批处理调度器classDynamicBatcher:"""动态批处理调度器"""def__init__(self,max_batch_size=64,max_tokens=8192):self.max_batch_size=max_batch_size self.max_tokens=max_tokens self.pending_requests=[]# 待处理请求self.running_batches=[]# 运行中的批处理defadd_request(self,req_id,input_ids,max_new_tokens):"""添加新请求"""self.pending_requests.append({"req_id":req_id,"input_ids":input_ids,"max_new_tokens":max_new_tokens,"generated_tokens":0,"state":"pending"})defschedule_batch(self):"""调度批处理"""# 选择可批处理的请求(按生成阶段分组)batch=[]total_tokens=0# 优先选择生成阶段相同的请求forreqinself.pending_requests:iflen(batch)>=self.max_batch_size:breakreq_tokens=len(req["input_ids"])+req["generated_tokens"]iftotal_tokens+req_tokens<=self.max_tokens:batch.append(req)total_tokens+=req_tokens req["state"]="running"# 从待处理队列移除self.pending_requests=[rforrinself.pending_requestsifr["state"]!="running"]self.running_batches.append(batch)returnbatchdefupdate_batch(self,batch,generated_ids):"""更新批处理状态"""completed_reqs=[]fori,reqinenumerate(batch):req["generated_tokens"]+=1req["input_ids"]=torch.cat([req["input_ids"],generated_ids[i:i+1]],dim=1)# 检查是否完成ifreq["generated_tokens"]>=req["max_new_tokens"]:completed_reqs.append(req)# 移除完成的请求forreqincompleted_reqs:batch.remove(req)req["state"]="completed"# 将未完成的请求放回待处理队列self.pending_requests.extend(batch)self.running_batches.remove(batch)returncompleted_reqs# vLLM 核心优势总结:# 1. 显存利用率提升 2-3 倍(减少碎片)# 2. 吞吐提升 4-8 倍(动态批处理 + 分页Cache)# 3. 支持更长序列(百万级tokens)# 4. 兼容OpenAI API,部署成本低
题目 17:多模态模型推理优化

📋 面试题
多模态模型(如GPT-4V/LLaVA)的推理优化有哪些关键策略?如何解决视觉-文本特征对齐问题?

✅ 标准答案

┌─────────────────────────────────────────────────────────────┐ │ 多模态模型推理优化 │ ├─────────────────────────────────────────────────────────────┤ │ 1. 视觉分支优化: │ │ - 图像特征提取离线化:预提取图像特征,推理时直接加载 │ │ - 视觉塔量化:INT8量化视觉编码器,精度损失<1% │ │ - 特征降维:将视觉特征从2048→768维,减少计算量 │ │ 2. 特征对齐优化: │ │ - 跨模态注意力优化:只计算文本-图像关键区域的注意力 │ │ - 特征融合轻量化:使用Adapter代替全连接层 │ │ 3. 推理流程优化: │ │ - 异步推理:图像编码与文本生成并行 │ │ - 缓存复用:相同图像复用视觉特征 │ └─────────────────────────────────────────────────────────────┘

📖 详细解析

importtorchimporttorch.nnasnnfromPILimportImagefromtransformersimportAutoProcessor,AutoModelForCausalLM# 1. 视觉特征离线提取与缓存classImageFeatureCache:"""图像特征缓存"""def__init__(self,vision_model,cache_dir="./image_cache"):self.vision_model=vision_model self.cache_dir=cache_dir self.cache={}# img_hash → featureos.makedirs(cache_dir,exist_ok=True)defget_image_hash(self,image):"""计算图像哈希"""importhashlib img_bytes=image.tobytes()returnhashlib.md5(img_bytes).hexdigest()defextract_feature(self,image):"""提取并缓存图像特征"""img_hash=self.get_image_hash(image)# 检查缓存ifimg_hashinself.cache:returnself.cache[img_hash]# 离线提取特征pixel_values=AutoProcessor.from_pretrained("llava-hf/llava-1.5-7b-hf")(images=image,return_tensors="pt").pixel_values.to("cuda")withtorch.no_grad():feature=self.vision_model(pixel_values).last_hidden_state# 缓存特征self.cache[img_hash]=feature torch.save(feature,f"{self.cache_dir}/{img_hash}.pt")returnfeature# 2. 跨模态注意力优化classEfficientCrossAttention(nn.Module):"""高效跨模态注意力"""def__init__(self,d_model,n_heads,img_patch_num=256):super().__init__()self.d_model=d_model self.n_heads=n_heads self.img_patch_num=img_patch_num self.top_k=32# 只关注前32个图像patchdefforward(self,text_feat,img_feat):""" text_feat: [batch, text_len, d_model] img_feat: [batch, img_patch_num, d_model] """# 1. 计算文本-图像相似度similarity=torch.matmul(text_feat,img_feat.transpose(-2,-1))# [batch, text_len, img_patch_num]# 2. 选择Top-K图像patchtop_k_idx=torch.topk(similarity,self.top_k,dim=-1).indices# [batch, text_len, top_k]# 3. 收集Top-K图像特征batch_size,text_len,_=text_feat.shape img_feat_topk=torch.zeros(batch_size,text_len,self.top_k,self.d_model,device=img_feat.device)forbinrange(batch_size):fortinrange(text_len):img_feat_topk[b,t]=img_feat[b,top_k_idx[b,t]]# 4. 计算跨模态注意力(仅Top-K)q=text_feat.view(batch_size,text_len,self.n_heads,-1).transpose(1,2)k=img_feat_topk.view(batch_size,text_len,self.top_k,self.n_heads,-1).transpose(2,3)v=img_feat_topk.view(batch_size,text_len,self.top_k,self.n_heads,-1).transpose(2,3)scores=torch.matmul(q,k.transpose(-2,-1))/math.sqrt(self.d_model//self.n_heads)attn=F.softmax(scores,dim=-1)output=torch.matmul(attn,v)returnoutput.transpose(1,2).contiguous().view(batch_size,text_len,self.d_model)# 3. 多模态推理优化示例classOptimizedMultimodalInference:"""优化的多模态推理"""def__init__(self,model_name="llava-hf/llava-1.5-7b-hf"):# 加载模型(量化)self.model=AutoModelForCausalLM.from_pretrained(model_name,load_in_8bit=True,device_map="auto",)self.processor=AutoProcessor.from_pretrained(model_name)# 初始化特征缓存self.feature_cache=ImageFeatureCache(self.model.vision_tower)# 替换原始跨模态注意力为高效版本forlayerinself.model.language_model.model.layers:ifhasattr(layer,"cross_attn"):layer.cross_attn=EfficientCrossAttention(d_model=layer.cross_attn.embed_dim,n_heads=layer.cross_attn.num_heads)@torch.no_grad()definfer(self,image:Image.Image,prompt:str):"""多模态推理"""# 1. 异步提取图像特征(模拟)img_feat=self.feature_cache.extract_feature(image)# 2. 构建输入inputs=self.processor(text=prompt,images=None,# 不重复提取图像特征return_tensors="pt").to("cuda")# 注入预提取的图像特征inputs["image_features"]=img_feat# 3. 推理(使用KV Cache)outputs=self.model.generate(**inputs,max_new_tokens=100,use_cache=True,do_sample=True,temperature=0.7,)returnself.processor.decode(outputs[0],skip_special_tokens=True)# 多模态推理性能优化对比PERFORMANCE_COMPARE={"原始推理":{"图像编码时间":"200ms","文本生成时间":"50ms/token","显存占用":"16GB"},"优化后推理":{"图像编码时间":"0ms(缓存)","文本生成时间":"20ms/token","显存占用":"8GB(量化+降维)"}}
题目 18:边缘端大模型部署

📋 面试题
如何将大模型部署到边缘设备(手机/嵌入式设备)?核心优化策略有哪些?

✅ 标准答案

┌─────────────────────────────────────────────────────────────┐ │ 边缘端大模型部署策略 │ ├─────────────────────────────────────────────────────────────┤ │ 1. 模型裁剪: │ │ - 层数裁剪:保留核心层(如LLaMA-7B→4层) │ │ - 维度裁剪:d_model从4096→512,减少计算量 │ │ 2. 模型蒸馏: │ │ - 知识蒸馏:用大模型指导小模型训练 │ │ - 量化蒸馏:训练时模拟量化误差 │ │ 3. 推理优化: │ │ - INT4/FP4量化:极致显存优化 │ │ - 算子轻量化:适配移动端GPU/NPU │ │ - 内存复用:减少峰值内存占用 │ └─────────────────────────────────────────────────────────────┘

📖 详细解析

importtorchimporttorch.nnasnnimporttorch.nn.functionalasF# 1. 模型裁剪defprune_model(model,layer_keep_ratio=0.5,dim_keep_ratio=0.5):"""裁剪模型层数和维度"""# 1. 层数裁剪original_layers=model.model.layers num_keep_layers=int(len(original_layers)*layer_keep_ratio)model.model.layers=nn.ModuleList(original_layers[:num_keep_layers])# 2. 维度裁剪d_model=model.config.hidden_size new_d_model=int(d_model*dim_keep_ratio)# 替换所有线性层defreplace_linear(module):forname,childinmodule.named_children():ifisinstance(child,nn.Linear):# 裁剪输入/输出维度new_linear=nn.Linear(in_features=min(child.in_features,new_d_model),out_features=min(child.out_features,new_d_model),bias=child.biasisnotNone)# 复制权重(截断)new_linear.weight.data=child.weight.data[:new_d_model,:new_d_model]ifchild.biasisnotNone:new_linear.bias.data=child.bias.data[:new_d_model]setattr(module,name,new_linear)else:replace_linear(child)replace_linear(model)model.config.hidden_size=new_d_model model.config.n_embd=new_d_model# 兼容不同命名returnmodel# 2. 模型蒸馏classDistiller(nn.Module):"""大模型蒸馏小模型"""def__init__(self,teacher_model,student_model,temperature=2.0):super().__init__()self.teacher=teacher_model self.student=student_model self.temperature=temperature self.alpha=0.7# 蒸馏损失权重self.beta=0.3# 分类损失权重defforward(self,input_ids,labels=None):# 教师模型输出(不更新)withtorch.no_grad():teacher_logits=self.teacher(input_ids).logits/self.temperature# 学生模型输出student_logits=self.student(input_ids).logits/self.temperature# 蒸馏损失(KL散度)distill_loss=F.kl_div(F.log_softmax(student_logits,dim=-1),F.softmax(teacher_logits,dim=-1),reduction="batchmean")*(self.temperature**2)# 分类损失iflabelsisnotNone:cls_loss=F.cross_entropy(student_logits.view(-1,student_logits.size(-1)),labels.view(-1))total_loss=self.alpha*distill_loss+self.beta*cls_losselse:total_loss=distill_lossreturn{"loss":total_loss,"distill_loss":distill_loss,"cls_loss":cls_lossiflabelsisnotNoneelse0}# 3. 边缘端推理引擎(适配移动端)classEdgeInferenceEngine:"""边缘端推理引擎"""def__init__(self,model,device="cpu"):self.model=model.eval()self.device=device self.model.to(device)# 量化为INT4self.quantize_model()defquantize_model(self):"""INT4量化模型"""forname,paraminself.model.named_parameters():# INT4量化max_val=torch.max(torch.abs(param))scale=max_val/7.0# INT4范围:-8~7param_quant=torch.clamp(torch.round(param/scale),-8,7).to(torch.int8)# 存储量化参数setattr(self.model,f"{name}_quant",param_quant)setattr(self.model,f"{name}_scale",scale)# 移除原始参数delparamdefforward(self,x):"""INT4推理"""x=x.to(self.device)# 模拟INT4推理defint4_linear(x,weight_quant,weight_scale,bias=None):# 反量化权重weight=weight_quant.to(torch.float32)*weight_scale# 矩阵乘法output=F.linear(x,weight,bias)# 量化输出(减少内存)output_max=torch.max(torch.abs(output))output_scale=output_max/127.0output_quant=torch.clamp(torch.round(output/output_scale),-128,127).to(torch.int8)returnoutput_quant,output_scale# 前向推理(简化版)forlayerinself.model.model.layers:# 自注意力层INT4推理q_quant,q_scale=int4_linear(x,layer.attention.q_proj_weight_quant,layer.attention.q_proj_scale)# 反量化用于计算q=q_quant.to(torch.float32)*q_scale# 后续计算(省略)x=layer(x)returnx# 4. 导出为移动端格式(ONNX/TFLite)defexport_to_tflite(model,dummy_input,output_path="model.tflite"):"""导出为TFLite格式(适配移动端)"""# 1. 导出为ONNXonnx_path="model.onnx"torch.onnx.export(model,dummy_input,onnx_path,input_names=["input_ids"],output_names=["logits"],opset_version=16,verbose=False)# 2. 转换为TFLiteimporttensorflowastf converter=tf.lite.TFLiteConverter.from_onnx_model(onnx_path)# 启用INT8量化converter.optimizations=[tf.lite.Optimize.DEFAULT]converter.target_spec.supported_types=[tf.int8]# 转换tflite_model=converter.convert()# 保存withopen(output_path,"wb")asf:f.write(tflite_model)print(f"模型已导出为{output_path},大小:{len(tflite_model)/1024/1024:.2f}MB")# 边缘端部署示例(手机端)EDGE_DEPLOY_EXAMPLE={"模型":"LLaMA-7B → 裁剪+蒸馏+INT4量化","模型大小":"7B→128MB","推理速度":"5ms/token(手机NPU)","显存占用":"512MB","精度":"保留85%+的通用能力"}

第四部分:Agent架构设计

题目 19:Agent核心架构设计

📋 面试题
请设计一个通用的大模型Agent架构,并解释各模块的核心功能。

✅ 标准答案

┌─────────────────────────────────────────────────────────────┐ │ Agent 核心架构 │ ├─────────────────────────────────────────────────────────────┤ │ 1. 感知模块(Perception): │ │ - 输入解析:理解用户指令、提取关键信息 │ │ - 多模态处理:文本/图像/语音输入转换 │ │ 2. 记忆模块(Memory): │ │ - 短期记忆:当前会话上下文 │ │ - 长期记忆:用户历史、知识库、工具调用记录 │ │ 3. 规划模块(Planning): │ │ - 任务分解:复杂任务拆分为子任务 │ │ - 策略选择:选择执行路径(直接回答/调用工具/多轮对话) │ │ 4. 执行模块(Execution): │ │ - 工具调用:调用外部API/函数 │ │ - 动作执行:执行具体操作 │ │ 5. 反思模块(Reflection): │ │ - 结果评估:检查执行结果是否符合预期 │ │ - 错误修正:调整策略重新执行 │ └─────────────────────────────────────────────────────────────┘

📖 详细解析

importjsonimporttimefromdataclassesimportdataclassfromtypingimportList,Dict,Any,Optional# 数据结构定义@dataclassclassAgentState:"""Agent状态"""query:str# 用户查询context:List[str]=None# 上下文memory:Dict[str,Any]=None# 记忆plan:List[str]=None# 执行计划tools_used:List[str]=None# 使用的工具reflection:str=None# 反思结果final_answer:str=None# 最终答案def__post_init__(self):self.context=self.contextor[]self.memory=self.memoryor{}self.plan=self.planor[]self.tools_used=self.tools_usedor[]# 1. 感知模块classPerceptionModule:"""感知模块"""def__init__(self,llm):self.llm=llmdefparse_input(self,state:AgentState):"""解析用户输入"""prompt=f""" 解析以下用户查询,提取关键信息: - 意图:用户想要完成的任务 - 实体:涉及的关键实体 - 约束:时间/格式/精度等约束 - 所需工具:可能需要调用的工具(如计算器/搜索引擎) 查询:{state.query}输出格式:JSON """response=self.llm.generate(prompt)parsed_info=json.loads(response)# 更新状态state.memory["parsed_info"]=parsed_inforeturnstate# 2. 记忆模块classMemoryModule:"""记忆模块"""def__init__(self,vector_db=None):self.vector_db=vector_db# 向量数据库self.short_term_memory={}# 会话级记忆self.long_term_memory={}# 长期记忆defsave_short_term(self,session_id:str,state:AgentState):"""保存短期记忆"""self.short_term_memory[session_id]={"context":state.context,"last_update":time.time()}defretrieve_long_term(self,query:str,top_k=3):"""检索长期记忆"""ifnotself.vector_db:return[]# 向量检索query_embedding=self.vector_db.encode(query)results=self.vector_db.search(query_embedding,top_k=top_k)returnresultsdefupdate_memory(self,state:AgentState,key:str,value:Any):"""更新记忆"""state.memory[key]=valuereturnstate# 3. 规划模块classPlanningModule:"""规划模块"""def__init__(self,llm):self.llm=llmdefgenerate_plan(self,state:AgentState):"""生成执行计划"""parsed_info=state.memory.get("parsed_info",{})prompt=f""" 根据以下信息生成执行计划: - 用户查询:{state.query}- 解析信息:{json.dumps(parsed_info,indent=2)}- 可用工具:计算器、搜索引擎、地图、代码执行器 计划要求: 1. 拆分为具体的子任务 2. 说明每个子任务的执行方式(直接回答/调用工具) 3. 预估执行顺序和依赖关系 输出格式:按步骤列出计划 """plan=self.llm.generate(prompt).strip().split("\n")state.plan=[p.strip()forpinplanifp.strip()]returnstate# 4. 执行模块classExecutionModule:"""执行模块"""def__init__(self,tools:Dict[str,Any]):self.tools=tools# 工具注册表defexecute_step(self,state:AgentState,step:str):"""执行单个步骤"""# 判断是否需要调用工具prompt=f""" 判断以下步骤是否需要调用工具,如需调用请指定工具名称和参数: 步骤:{step}可用工具:{list(self.tools.keys())}输出格式: - 无需调用:返回"直接执行" - 需要调用:返回JSON {{'tool': '工具名', 'params': {{参数}}}} """response=self.llm.generate(prompt)if"直接执行"inresponse:# 直接执行(LLM回答)result=self.llm.generate(f"回答以下步骤:{step}")returnresultelse:# 调用工具try:tool_call=json.loads(response)tool_name=tool_call["tool"]params=tool_call["params"]iftool_namenotinself.tools:returnf"工具不存在:{tool_name}"# 执行工具tool=self.tools[tool_name]result=tool(**params)# 记录使用的工具state.tools_used.append(f"{tool_name}({json.dumps(params)})")returnresultexceptExceptionase:returnf"工具调用失败:{str(e)}"defexecute_plan(self,state:AgentState):"""执行完整计划"""results=[]fori,stepinenumerate(state.plan):result=self.execute_step(state,step)results.append(f"步骤{i+1}{step}→ 结果:{result}")# 更新上下文state.context.append(f"步骤{i+1}执行结果:{result}")state.memory["execution_results"]=resultsreturnstate# 5. 反思模块classReflectionModule:"""反思模块"""def__init__(self,llm):self.llm=llmdefreflect(self,state:AgentState):"""反思执行结果"""prompt=f""" 评估以下执行过程和结果: - 用户查询:{state.query}- 执行计划:{json.dumps(state.plan,indent=2)}- 执行结果:{json.dumps(state.memory.get('execution_results',[]),indent=2)}请回答: 1. 执行结果是否满足用户需求? 2. 是否有错误或遗漏? 3. 如何改进执行计划? 输出格式:详细说明 """reflection=self.llm.generate(prompt)state.reflection=reflection# 判断是否需要重新执行if"不满足"inreflectionor"错误"inreflection:# 生成改进计划state.plan=self._generate_improved_plan(state,reflection)returnFalse# 执行失败returnTrue# 执行成功def_generate_improved_plan(self,state:AgentState,reflection:str):"""生成改进计划"""prompt=f""" 根据反思结果改进执行计划: - 原始计划:{json.dumps(state.plan,indent=2)}- 反思结果:{reflection}输出改进后的计划(按步骤列出): """improved_plan=self.llm.generate(prompt).strip().split("\n")return[p.strip()forpinimproved_planifp.strip()]# 6. Agent主类classGeneralAgent:"""通用Agent"""def__init__(self,llm,tools=None,vector_db=None):self.llm=llm self.perception=PerceptionModule(llm)self.memory=MemoryModule(vector_db)self.planning=PlanningModule(llm)self.execution=ExecutionModule(toolsor{})self.reflection=ReflectionModule(llm)defrun(self,query:str,session_id:str="default",max_retries=2):"""运行Agent"""# 初始化状态state=AgentState(query=query)# 加载记忆long_term_mem=self.memory.retrieve_long_term(query)state.memory["long_term_context"]=long_term_mem# 感知state=self.perception.parse_input(state)# 规划state=self.planning.generate_plan(state)# 执行(带重试)retries=0success=Falsewhileretries<max_retriesandnotsuccess:state=self.execution.execute_plan(state)success=self.reflection.reflect(state)retries+=1# 生成最终答案state.final_answer=self._generate_final_answer(state)# 保存记忆self.memory.save_short_term(session_id,state)returnstatedef_generate_final_answer(self,state:AgentState):"""生成最终答案"""prompt=f""" 根据以下信息生成最终回答: - 用户查询:{state.query}- 执行过程:{json.dumps(state.memory.get('execution_results',[]),indent=2)}- 反思结果:{state.reflection}要求: 1. 回答准确、完整 2. 格式清晰 3. 说明使用的工具和步骤(如有) """returnself.llm.generate(prompt)# 示例工具defcalculator_tool(a:float,b:float,op:str):"""计算器工具"""ifop=="+":returna+belifop=="-":returna-belifop=="*":returna*belifop=="/":returna/bifb!=0else"除数不能为0"else:returnf"不支持的操作:{op}"# 使用示例defagent_demo():"""Agent演示"""# 模拟LLMclassMockLLM:defgenerate(self,prompt):# 模拟生成if"解析"inprompt:returnjson.dumps({"意图":"计算2+3","实体":{"数字1":2,"数字2":3},"约束":"","所需工具":"计算器"})elif"生成执行计划"inprompt:return"1. 调用计算器工具,计算2+3\n2. 返回计算结果"elif"判断以下步骤"inprompt:returnjson.dumps({"tool":"calculator","params":{"a":2,"b":3,"op":"+"}})elif"评估"inprompt:return"执行结果满足用户需求,无错误"else:return"2+3的计算结果是5"
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!