news 2026/4/15 5:52:55

MedGemma云端部署:基于FastAPI的高性能服务架构

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
MedGemma云端部署:基于FastAPI的高性能服务架构

MedGemma云端部署:基于FastAPI的高性能服务架构

如果你正在考虑把MedGemma这样的医学AI模型投入实际应用,那么云端部署就是绕不开的一步。直接跑在本地机器上做实验没问题,但真要给医生、研究人员或者医院系统用,就得考虑怎么让它稳定、高效、安全地跑在云端。

今天咱们就来聊聊,怎么用FastAPI给MedGemma搭建一个真正能用的云端服务。这不是那种“hello world”级别的教程,而是从实际生产角度出发,看看怎么设计一个能扛住真实流量的服务架构。

1. 为什么选择FastAPI来部署MedGemma?

先说说为什么是FastAPI。你可能知道Flask,也听说过Django,但FastAPI在AI模型部署这块确实有它的优势。

FastAPI最大的特点就是快,不是一般的快。它底层用的是Starlette和Pydantic,异步处理能力很强。对于MedGemma这种需要处理图像和文本的模型来说,请求往往不是瞬间就能完成的,模型推理需要时间。这时候异步处理就特别重要——一个请求在等模型推理的时候,服务器还能处理其他请求,不会干等着。

另一个好处是自动生成API文档。FastAPI会根据你的代码自动生成交互式API文档(Swagger UI和ReDoc),这对于团队协作和后续维护来说太方便了。医生或者研究人员想用你的服务,直接打开文档页面就能看到所有接口说明,还能在线测试。

还有类型提示和自动验证。你用Python的类型提示写代码,FastAPI会自动验证请求数据,不对就直接返回错误,不用你手动写一堆if-else去检查。这对于医疗应用特别重要——输入数据不对,模型输出就可能有问题。

# 一个简单的FastAPI应用示例 from fastapi import FastAPI, File, UploadFile from pydantic import BaseModel from typing import Optional app = FastAPI(title="MedGemma API服务", version="1.0.0") class AnalysisRequest(BaseModel): """分析请求的数据结构""" image_url: Optional[str] = None question: str system_prompt: str = "你是一位经验丰富的放射科医生。" @app.post("/analyze") async def analyze_image(request: AnalysisRequest, image_file: UploadFile = File(None)): """ 分析医学图像 - **image_url**: 图像URL(可选) - **image_file**: 上传的图像文件(可选) - **question**: 要问的问题 - **system_prompt**: 系统提示词,定义模型角色 至少需要提供image_url或image_file中的一个 """ # 这里会实现实际的图像分析和模型调用 return {"status": "success", "message": "分析请求已接收"}

上面这个例子虽然简单,但已经能看到FastAPI的好处了。清晰的类型定义、自动文档生成、异步支持,这些都是生产环境需要的。

2. 设计一个能扛住流量的异步接口

MedGemma服务最核心的部分就是接口设计。医生上传一张CT图像,问“有没有发现异常”,这个请求怎么处理?怎么保证同时来十个、一百个这样的请求时,服务还能正常响应?

2.1 异步处理的核心思路

关键是要把IO等待时间和CPU计算时间分开。模型加载图像、推理计算这些是CPU/GPU密集型任务,而网络传输、磁盘读写这些是IO密集型任务。在Python里,如果用同步的方式,一个请求在等模型推理的时候,整个线程就被占用了,其他请求只能排队。

异步的方式就不一样了。当模型在GPU上推理时,Python的异步任务可以“让出”控制权,去处理其他请求的准备工作,比如解析下一个请求的JSON数据、读取文件等。

import asyncio from concurrent.futures import ThreadPoolExecutor import aiofiles from PIL import Image import io # 创建一个线程池来处理CPU密集型任务 thread_pool = ThreadPoolExecutor(max_workers=4) @app.post("/analyze_async") async def analyze_image_async( image_file: UploadFile = File(...), question: str = "请分析这张图像" ): """ 异步处理图像分析请求 这个接口演示了如何把文件读取、图像预处理、模型推理等 不同性质的任务合理地安排到异步流程中 """ # 1. 异步读取上传的文件(IO操作,适合异步) contents = await image_file.read() # 2. 把CPU密集型的图像预处理放到线程池中执行 loop = asyncio.get_event_loop() pil_image = await loop.run_in_executor( thread_pool, process_image, # 这是一个同步函数 contents ) # 3. 模型推理(假设model_inference是同步的) # 如果模型推理本身支持异步更好,如果不支持,也放到线程池 analysis_result = await loop.run_in_executor( thread_pool, model_inference, pil_image, question ) # 4. 结果处理和后处理可以继续用异步 formatted_result = format_result(analysis_result) return { "image_id": image_file.filename, "question": question, "analysis": formatted_result, "processing_time": "异步处理完成" } def process_image(image_bytes: bytes) -> Image.Image: """同步的图像预处理函数""" image = Image.open(io.BytesIO(image_bytes)) # 这里可以添加图像预处理逻辑:调整大小、归一化等 return image def model_inference(image: Image.Image, question: str) -> dict: """同步的模型推理函数""" # 这里调用MedGemma模型进行推理 # 实际项目中,这里会是模型的实际调用代码 return {"finding": "检测到疑似病灶", "confidence": 0.87}

2.2 请求队列和限流机制

实际生产环境中,你不可能让所有请求都直接打到模型上。特别是MedGemma这种大模型,GPU内存有限,同时处理太多请求会爆内存。

这时候就需要请求队列。新来的请求先放到队列里,系统按顺序处理。还可以实现优先级队列——急诊的请求优先处理,科研用的请求可以排队等。

from fastapi import BackgroundTasks import queue import uuid from datetime import datetime # 内存中的请求队列(生产环境建议用Redis等外部队列) request_queue = queue.Queue() results_cache = {} # 用于存储处理结果 class AnalysisTask: """分析任务类""" def __init__(self, image_data: bytes, question: str, priority: int = 0): self.task_id = str(uuid.uuid4()) self.image_data = image_data self.question = question self.priority = priority # 优先级,数值越小优先级越高 self.status = "pending" self.created_at = datetime.now() self.started_at = None self.completed_at = None self.result = None @app.post("/submit_analysis") async def submit_analysis( background_tasks: BackgroundTasks, image_file: UploadFile = File(...), question: str = "请分析这张图像", priority: int = 0 ): """提交分析请求到队列""" contents = await image_file.read() task = AnalysisTask(contents, question, priority) # 将任务放入队列 request_queue.put(task) # 记录任务信息 results_cache[task.task_id] = { "status": "pending", "created_at": task.created_at.isoformat() } # 启动后台任务处理队列(如果还没启动的话) background_tasks.add_task(process_queue) return { "task_id": task.task_id, "status": "submitted", "message": "分析请求已提交到队列", "queue_position": request_queue.qsize() } @app.get("/task_status/{task_id}") async def get_task_status(task_id: str): """查询任务状态""" if task_id not in results_cache: return {"error": "任务不存在"} task_info = results_cache[task_id] return { "task_id": task_id, "status": task_info["status"], "created_at": task_info["created_at"], "started_at": task_info.get("started_at"), "completed_at": task_info.get("completed_at"), "result": task_info.get("result") } async def process_queue(): """处理队列中的任务""" while not request_queue.empty(): # 这里可以按优先级获取任务 task = request_queue.get() # 更新任务状态 results_cache[task.task_id]["status"] = "processing" results_cache[task.task_id]["started_at"] = datetime.now().isoformat() # 实际处理任务 try: # 这里调用模型处理 result = await process_analysis_task(task) # 更新结果 results_cache[task.task_id].update({ "status": "completed", "completed_at": datetime.now().isoformat(), "result": result }) except Exception as e: results_cache[task.task_id].update({ "status": "failed", "error": str(e) }) request_queue.task_done()

2.3 支持多种输入方式

在实际医疗场景中,图像来源可能多种多样。有的医生直接上传文件,有的医院系统通过URL提供图像,还有的可能已经上传到云存储了。好的API应该支持所有这些方式。

@app.post("/analyze_flexible") async def analyze_flexible( request: AnalysisRequest, image_file: UploadFile = File(None), background_tasks: BackgroundTasks = None ): """ 灵活的图像分析接口 支持多种输入方式: 1. 直接上传文件 2. 提供图像URL 3. 提供云存储路径(如S3、GCS) """ image_data = None # 方式1:上传文件 if image_file: image_data = await image_file.read() # 方式2:图像URL elif request.image_url: import aiohttp async with aiohttp.ClientSession() as session: async with session.get(request.image_url) as response: if response.status == 200: image_data = await response.read() else: return {"error": f"无法获取图像: {response.status}"} # 方式3:云存储路径(这里以S3为例) elif hasattr(request, 's3_path') and request.s3_path: # 实际项目中这里会调用AWS SDK # image_data = await download_from_s3(request.s3_path) pass else: return {"error": "请提供图像文件、URL或云存储路径"} if not image_data: return {"error": "无法获取图像数据"} # 处理图像 loop = asyncio.get_event_loop() pil_image = await loop.run_in_executor( thread_pool, process_image, image_data ) # 调用模型 result = await loop.run_in_executor( thread_pool, model_inference, pil_image, request.question ) # 如果需要长时间处理,可以放到后台任务 if background_tasks and request.background_mode: task_id = str(uuid.uuid4()) background_tasks.add_task( process_in_background, task_id, pil_image, request.question ) return {"task_id": task_id, "status": "processing_in_background"} return { "analysis": result, "input_type": "file" if image_file else "url" if request.image_url else "cloud" }

3. 负载均衡:让多个实例一起工作

单个服务实例能处理的请求总是有限的。特别是MedGemma这种模型,GPU内存就那么多,同时能处理的请求数有限。想要服务更多用户,就需要多个实例一起工作。

3.1 基于Nginx的负载均衡

最简单的方式是用Nginx做负载均衡。你启动多个MedGemma服务实例,每个实例在不同的端口,然后让Nginx把请求分发给它们。

# nginx.conf 负载均衡配置示例 http { upstream medgemma_backend { # 这里列出所有后端服务实例 server 127.0.0.1:8001; server 127.0.0.1:8002; server 127.0.0.1:8003; # 负载均衡策略 least_conn; # 最少连接数策略 # 或者用 ip_hash; # 同一个IP的请求总是发到同一个后端 } server { listen 80; server_name medgemma.yourdomain.com; location / { proxy_pass http://medgemma_backend; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; # 增加超时时间,因为模型推理可能需要较长时间 proxy_read_timeout 300s; proxy_connect_timeout 75s; } # 健康检查端点 location /health { proxy_pass http://medgemma_backend/health; } } }

3.2 服务发现和动态扩缩容

在云环境中,服务实例可能随时创建或销毁。这时候就需要服务发现机制。新的实例启动后,自动注册到负载均衡器;实例下线时,自动从负载均衡器移除。

# 服务注册示例(简化版) import requests import time from threading import Thread class ServiceRegistry: """简单的服务注册中心""" def __init__(self, registry_url: str): self.registry_url = registry_url self.service_id = None def register(self, service_name: str, service_url: str): """注册服务""" payload = { "name": service_name, "url": service_url, "metadata": { "started_at": time.time(), "version": "1.0.0" } } response = requests.post( f"{self.registry_url}/register", json=payload ) if response.status_code == 200: data = response.json() self.service_id = data["service_id"] print(f"服务注册成功: {self.service_id}") # 启动心跳线程 heartbeat_thread = Thread( target=self._send_heartbeat, daemon=True ) heartbeat_thread.start() return response def _send_heartbeat(self): """发送心跳,保持服务活跃状态""" while True: time.sleep(30) # 每30秒发送一次心跳 if not self.service_id: continue try: requests.post( f"{self.registry_url}/heartbeat/{self.service_id}", timeout=5 ) except: print("心跳发送失败,尝试重新注册...") # 这里可以添加重新注册逻辑 def deregister(self): """注销服务""" if self.service_id: requests.post(f"{self.registry_url}/deregister/{self.service_id}") # 在FastAPI应用启动时注册服务 import contextlib from fastapi import FastAPI app = FastAPI() registry = ServiceRegistry("http://registry-service:8080") @app.on_event("startup") async def startup_event(): """应用启动时注册服务""" service_url = "http://localhost:8000" # 实际应该是可访问的URL registry.register("medgemma-service", service_url) @app.on_event("shutdown") async def shutdown_event(): """应用关闭时注销服务""" registry.deregister()

3.3 基于请求类型的路由

在医疗场景中,不同类型的请求可能需要不同的处理方式。比如,简单的图像分类请求可以用轻量级模型快速响应,而详细的诊断报告生成可能需要更复杂的模型,耗时也更长。

from fastapi import APIRouter # 创建不同的路由器处理不同类型的请求 quick_router = APIRouter() detailed_router = APIRouter() @quick_router.post("/classify") async def quick_classify(image_file: UploadFile = File(...)): """ 快速分类接口 适用于只需要知道图像类别(如正常/异常)的场景 响应速度快,适合实时应用 """ # 这里可能使用轻量级模型或缓存机制 return {"category": "abnormal", "confidence": 0.92} @detailed_router.post("/analyze_detail") async def detailed_analysis( image_file: UploadFile = File(...), generate_report: bool = True ): """ 详细分析接口 生成完整的诊断报告,包括病灶描述、位置、建议等 耗时较长,但信息更全面 """ # 这里使用完整的MedGemma模型 # 可能包括多个步骤:检测、分割、描述生成等 return { "findings": [ { "type": "nodule", "location": "right upper lobe", "size": "8mm", "characteristics": "well-defined, solid" } ], "impression": "建议3个月后复查CT", "report": "完整的放射学报告文本..." } # 将路由器挂载到主应用 app.include_router(quick_router, prefix="/quick", tags=["快速分析"]) app.include_router(detailed_router, prefix="/detailed", tags=["详细分析"]) # 负载均衡器可以根据路径前缀将请求路由到不同的实例组 # /quick/* 请求 -> 快速处理实例组 # /detailed/* 请求 -> 详细分析实例组

4. 自动扩缩容:根据流量自动调整

医疗服务的流量可能有明显的波峰波谷。白天工作时间请求多,晚上请求少。如果一直保持最大数量的实例,成本就太高了。自动扩缩容就是根据实际流量,自动增加或减少服务实例。

4.1 基于CPU/GPU利用率的扩缩容

最简单的扩缩容策略就是看资源使用率。GPU利用率持续高于80%?那就加实例。利用率低于30%持续一段时间?那就减实例。

# 监控和扩缩容逻辑示例(简化版) import psutil import GPUtil import time from typing import List class AutoScaler: """自动扩缩容管理器""" def __init__( self, min_instances: int = 1, max_instances: int = 10, scale_up_threshold: float = 0.8, scale_down_threshold: float = 0.3 ): self.min_instances = min_instances self.max_instances = max_instances self.scale_up_threshold = scale_up_threshold self.scale_down_threshold = scale_down_threshold self.current_instances = min_instances def check_and_scale(self): """检查资源使用情况并决定是否扩缩容""" # 获取GPU使用率(如果有GPU的话) gpu_usage = 0 try: gpus = GPUtil.getGPUs() if gpus: gpu_usage = max([gpu.load for gpu in gpus]) except: # 如果没有GPU或获取失败,使用CPU使用率 gpu_usage = psutil.cpu_percent() / 100 # 获取内存使用率 memory_usage = psutil.virtual_memory().percent / 100 # 使用较高的那个作为决策依据 resource_usage = max(gpu_usage, memory_usage) print(f"当前资源使用率: {resource_usage:.2%}") # 决策逻辑 if resource_usage > self.scale_up_threshold: if self.current_instances < self.max_instances: self.scale_up() elif resource_usage < self.scale_down_threshold: if self.current_instances > self.min_instances: self.scale_down() def scale_up(self): """扩容:增加一个实例""" print("资源使用率过高,正在扩容...") # 实际项目中,这里会调用云平台的API创建新实例 # 例如:aws.ec2.create_instance() # 或者:gcp.compute.create_instance() self.current_instances += 1 print(f"实例数增加到: {self.current_instances}") # 通知负载均衡器有新实例加入 self.register_new_instance() def scale_down(self): """缩容:减少一个实例""" print("资源使用率过低,正在缩容...") # 选择一个实例下线(比如最空闲的那个) instance_to_remove = self.select_instance_to_remove() # 实际项目中,这里会优雅地关闭实例 # 1. 先告诉负载均衡器停止向该实例发送新请求 # 2. 等待现有请求处理完成 # 3. 关闭实例 self.current_instances -= 1 print(f"实例数减少到: {self.current_instances}") def register_new_instance(self): """注册新实例到负载均衡器""" # 这里调用负载均衡器的API添加新后端 pass def select_instance_to_remove(self) -> str: """选择要移除的实例""" # 简单的策略:选择最近最空闲的实例 return "instance-3" # 后台运行自动扩缩容检查 def run_autoscaler(): scaler = AutoScaler() while True: scaler.check_and_scale() time.sleep(60) # 每分钟检查一次 # 在单独的线程中运行 import threading autoscaler_thread = threading.Thread(target=run_autoscaler, daemon=True) autoscaler_thread.start()

4.2 基于请求队列长度的扩缩容

对于有请求队列的系统,队列长度是更好的扩缩容指标。队列越来越长,说明处理不过来,需要加实例。队列一直很短,说明实例太多,可以减一些。

class QueueBasedScaler: """基于队列长度的扩缩容""" def __init__( self, queue_monitor_url: str, scale_up_queue_length: int = 20, scale_down_queue_length: int = 5 ): self.queue_monitor_url = queue_monitor_url self.scale_up_queue_length = scale_up_queue_length self.scale_down_queue_length = scale_down_queue_length def get_queue_length(self) -> int: """获取当前队列长度""" try: response = requests.get(f"{self.queue_monitor_url}/queue_stats") if response.status_code == 200: return response.json()["pending_tasks"] except: pass return 0 def check_and_scale(self): """基于队列长度检查并扩缩容""" queue_length = self.get_queue_length() print(f"当前队列长度: {queue_length}") if queue_length > self.scale_up_queue_length: print(f"队列过长({queue_length} > {self.scale_up_queue_length}),触发扩容") self.scale_up() elif queue_length < self.scale_down_queue_length: print(f"队列过短({queue_length} < {self.scale_down_queue_length}),触发缩容") self.scale_down() def scale_up(self): """扩容逻辑""" # 实际实现... pass def scale_down(self): """缩容逻辑""" # 实际实现... pass

4.3 基于预测的智能扩缩容

更高级的做法是基于历史数据进行预测。比如,你知道每周一上午9点到11点是请求高峰,那就提前准备好足够的实例。

import pandas as pd from datetime import datetime, timedelta class PredictiveScaler: """基于预测的智能扩缩容""" def __init__(self, history_data_path: str = "traffic_history.csv"): self.history_data = self.load_history_data(history_data_path) def load_history_data(self, path: str) -> pd.DataFrame: """加载历史流量数据""" try: df = pd.read_csv(path) df['timestamp'] = pd.to_datetime(df['timestamp']) return df except: # 如果没有历史数据,返回空DataFrame return pd.DataFrame(columns=['timestamp', 'request_count']) def predict_traffic(self, target_time: datetime) -> int: """预测目标时间的流量""" if self.history_data.empty: return 0 # 简单的预测:取相同星期几、相同时段的历史平均值 weekday = target_time.weekday() hour = target_time.hour historical = self.history_data[ (self.history_data['timestamp'].dt.weekday == weekday) & (self.history_data['timestamp'].dt.hour == hour) ] if len(historical) > 0: return int(historical['request_count'].mean()) else: return 0 def calculate_required_instances(self, predicted_traffic: int) -> int: """根据预测流量计算需要的实例数""" # 假设每个实例每分钟能处理10个请求 requests_per_instance_per_minute = 10 # 需要的实例数 = 预测流量 / 每个实例的处理能力 required = max(1, predicted_traffic // requests_per_instance_per_minute + 1) # 加上一些缓冲 required = min(required * 1.2, 20) # 最多20个实例 return int(required) def schedule_scaling(self): """安排扩缩容计划""" now = datetime.now() # 预测未来1小时的流量 future_time = now + timedelta(hours=1) predicted_traffic = self.predict_traffic(future_time) required_instances = self.calculate_required_instances(predicted_traffic) print(f"预测{future_time.strftime('%H:%M')}的流量: {predicted_traffic} 请求/分钟") print(f"建议实例数: {required_instances}") # 这里可以触发实际的扩缩容操作 return required_instances

5. 监控告警:知道服务发生了什么

服务部署好了,实例也能自动扩缩容了,但你怎么知道一切正常?有没有请求失败?响应时间是不是变慢了?GPU温度是不是太高了?这就需要监控告警系统。

5.1 关键指标监控

对于MedGemma这样的服务,有几个关键指标需要特别关注:

  1. 响应时间:从收到请求到返回响应的时间
  2. 错误率:失败请求的比例
  3. GPU使用率:模型推理主要用GPU
  4. 队列长度:如果有请求队列的话
  5. 并发请求数:同时处理的请求数
# 监控指标收集示例 import time from dataclasses import dataclass from typing import Dict, List import statistics @dataclass class RequestMetrics: """请求指标""" request_id: str endpoint: str start_time: float end_time: float success: bool error_message: str = "" @property def duration(self) -> float: return self.end_time - self.start_time class MetricsCollector: """指标收集器""" def __init__(self): self.request_history: List[RequestMetrics] = [] self.max_history = 1000 # 最多保存1000条记录 def record_request(self, metrics: RequestMetrics): """记录请求指标""" self.request_history.append(metrics) # 保持历史记录不超过最大值 if len(self.request_history) > self.max_history: self.request_history = self.request_history[-self.max_history:] def get_summary(self) -> Dict: """获取指标摘要""" if not self.request_history: return {} recent_metrics = self.request_history[-100:] # 最近100个请求 durations = [m.duration for m in recent_metrics] success_count = sum(1 for m in recent_metrics if m.success) total_count = len(recent_metrics) return { "avg_response_time": statistics.mean(durations) if durations else 0, "p95_response_time": statistics.quantiles(durations, n=20)[18] if len(durations) >= 20 else 0, "success_rate": success_count / total_count if total_count > 0 else 0, "total_requests": total_count, "endpoint_distribution": self.get_endpoint_distribution(recent_metrics) } def get_endpoint_distribution(self, metrics: List[RequestMetrics]) -> Dict: """获取端点分布""" distribution = {} for m in metrics: distribution[m.endpoint] = distribution.get(m.endpoint, 0) + 1 return distribution # 在FastAPI中间件中收集指标 from fastapi import Request import uuid metrics_collector = MetricsCollector() @app.middleware("http") async def collect_metrics(request: Request, call_next): """收集请求指标的中间件""" request_id = str(uuid.uuid4()) start_time = time.time() try: response = await call_next(request) end_time = time.time() metrics = RequestMetrics( request_id=request_id, endpoint=request.url.path, start_time=start_time, end_time=end_time, success=response.status_code < 400 ) metrics_collector.record_request(metrics) # 在响应头中添加请求ID response.headers["X-Request-ID"] = request_id response.headers["X-Processing-Time"] = str(end_time - start_time) return response except Exception as e: end_time = time.time() metrics = RequestMetrics( request_id=request_id, endpoint=request.url.path, start_time=start_time, end_time=end_time, success=False, error_message=str(e) ) metrics_collector.record_request(metrics) raise @app.get("/metrics") async def get_metrics(): """获取当前指标""" return metrics_collector.get_summary()

5.2 集成Prometheus和Grafana

对于生产环境,通常会用专业的监控系统,比如Prometheus收集指标,Grafana展示仪表盘。

# Prometheus指标导出示例 from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST from fastapi import Response # 定义Prometheus指标 REQUEST_COUNT = Counter( 'medgemma_requests_total', 'Total number of requests', ['method', 'endpoint', 'status'] ) REQUEST_LATENCY = Histogram( 'medgemma_request_duration_seconds', 'Request latency in seconds', ['method', 'endpoint'] ) @app.middleware("http") async def prometheus_middleware(request: Request, call_next): """Prometheus指标收集中间件""" method = request.method endpoint = request.url.path # 记录开始时间 start_time = time.time() try: response = await call_next(request) # 记录请求计数 REQUEST_COUNT.labels( method=method, endpoint=endpoint, status=response.status_code ).inc() # 记录延迟 latency = time.time() - start_time REQUEST_LATENCY.labels( method=method, endpoint=endpoint ).observe(latency) return response except Exception as e: # 记录错误请求 REQUEST_COUNT.labels( method=method, endpoint=endpoint, status=500 ).inc() raise @app.get("/metrics/prometheus") async def prometheus_metrics(): """Prometheus指标端点""" return Response( content=generate_latest(), media_type=CONTENT_TYPE_LATEST )

5.3 告警规则配置

监控数据有了,还需要在出现问题时及时告警。

# alert_rules.yaml - 告警规则配置示例 groups: - name: medgemma_alerts rules: # 高错误率告警 - alert: HighErrorRate expr: rate(medgemma_requests_total{status=~"5.."}[5m]) / rate(medgemma_requests_total[5m]) > 0.05 for: 2m labels: severity: critical annotations: summary: "高错误率检测到" description: "错误率超过5%,当前值为 {{ $value }}" # 高延迟告警 - alert: HighLatency expr: histogram_quantile(0.95, rate(medgemma_request_duration_seconds_bucket[5m])) > 10 for: 2m labels: severity: warning annotations: summary: "高延迟检测到" description: "95分位响应时间超过10秒,当前值为 {{ $value }}秒" # GPU温度过高告警 - alert: GPUTemperatureHigh expr: gpu_temperature_celsius > 85 for: 1m labels: severity: critical annotations: summary: "GPU温度过高" description: "GPU温度达到 {{ $value }}°C"

6. 实际部署建议和注意事项

聊了这么多技术细节,最后说说实际部署时的一些建议。

6.1 容器化部署

现在部署服务,容器化几乎是标配。用Docker把MedGemma服务打包成镜像,部署、迁移、扩展都方便。

# Dockerfile示例 FROM python:3.9-slim # 安装系统依赖 RUN apt-get update && apt-get install -y \ build-essential \ libgl1-mesa-glx \ libglib2.0-0 \ && rm -rf /var/lib/apt/lists/* # 设置工作目录 WORKDIR /app # 复制依赖文件 COPY requirements.txt . # 安装Python依赖 RUN pip install --no-cache-dir -r requirements.txt # 复制应用代码 COPY . . # 创建非root用户 RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app USER appuser # 暴露端口 EXPOSE 8000 # 启动命令 CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

6.2 健康检查

一定要实现健康检查接口,让负载均衡器知道服务是否正常。

@app.get("/health") async def health_check(): """健康检查接口""" # 检查关键依赖 checks = { "api": "healthy", "model": check_model_health(), "gpu": check_gpu_health(), "database": check_database_connection() } # 判断整体健康状态 all_healthy = all(status == "healthy" for status in checks.values()) return { "status": "healthy" if all_healthy else "unhealthy", "timestamp": datetime.now().isoformat(), "checks": checks } def check_model_health() -> str: """检查模型健康状态""" try: # 尝试一个简单的推理,确保模型能正常工作 test_input = "健康检查" # 这里调用模型进行简单测试 return "healthy" except Exception as e: print(f"模型健康检查失败: {e}") return "unhealthy"

6.3 优雅关闭

服务需要重启或下线时,要优雅地关闭,不能直接杀掉进程,否则正在处理的请求会失败。

import signal import asyncio from contextlib import asynccontextmanager # 全局变量,标记服务是否正在关闭 is_shutting_down = False @asynccontextmanager async def lifespan(app: FastAPI): """应用生命周期管理""" # 启动时 print("服务启动中...") # 设置信号处理 loop = asyncio.get_event_loop() for sig in (signal.SIGTERM, signal.SIGINT): loop.add_signal_handler(sig, lambda: shutdown_handler(sig)) yield # 应用运行 # 关闭时 print("服务关闭中...") await graceful_shutdown() def shutdown_handler(signum): """信号处理函数""" global is_shutting_down print(f"收到信号 {signum},开始优雅关闭...") is_shutting_down = True async def graceful_shutdown(): """优雅关闭""" global is_shutting_down # 1. 停止接受新请求 is_shutting_down = True print("已停止接受新请求") # 2. 等待正在处理的请求完成 # 这里可以根据实际情况实现,比如等待一段时间 await asyncio.sleep(10) # 3. 清理资源 await cleanup_resources() print("服务已完全关闭") @app.middleware("http") async def check_shutdown(request: Request, call_next): """检查服务是否正在关闭""" global is_shutting_down if is_shutting_down: return JSONResponse( status_code=503, content={"error": "服务正在关闭,请稍后重试"} ) return await call_next(request) # 创建应用时使用lifespan app = FastAPI(lifespan=lifespan)

6.4 安全考虑

医疗数据特别敏感,安全一定要重视。

  1. HTTPS:一定要用HTTPS,不能用HTTP
  2. 认证授权:谁可以访问服务?不同的用户可能有不同的权限
  3. 数据加密:传输中的数据要加密,存储的数据也要加密
  4. 访问日志:谁在什么时候访问了什么,都要有记录
  5. 输入验证:用户上传的数据一定要严格验证
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from fastapi import Depends, HTTPException, status security = HTTPBearer() async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)): """验证访问令牌""" token = credentials.credentials # 这里实现实际的令牌验证逻辑 # 可以检查JWT令牌、API密钥等 if not is_valid_token(token): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="无效的访问令牌", headers={"WWW-Authenticate": "Bearer"}, ) return token @app.post("/secure_analyze") async def secure_analyze( image_file: UploadFile = File(...), token: str = Depends(verify_token) ): """需要认证的分析接口""" # 只有通过认证的用户才能访问 user_info = get_user_info_from_token(token) # 记录访问日志 log_access(user_info, "analyze", image_file.filename) # 处理请求... return {"status": "success"}

获取更多AI镜像

想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/3/17 9:29:54

【YOLOv12多模态创新改进】全网独家首发创新篇| CVPR 2025 | 引入 MEPF掩膜增强像素级融合模块,高效融合 RGB 与红外信息,适合可见光与红外图像融合目标检测、多模态遥感小目标检测

一、本文介绍 🔥本文给大家介绍使用 MEPF掩膜增强像素级融合模块改进 YOLOv12 多模态目标检测模型,可在网络输入阶段以像素级方式高效融合 RGB 与红外信息,通过掩膜引导机制突出跨模态一致的目标区域并抑制背景冗余,从而显著增强小目标和弱目标的可见性。MEPF 在保持极低…

作者头像 李华
网站建设 2026/3/28 13:21:13

OneAPI SDK集成指南:Python/Java/Go多语言客户端快速接入

OneAPI SDK集成指南&#xff1a;Python/Java/Go多语言客户端快速接入 1. 为什么你需要一个统一的AI模型接入层 你有没有遇到过这样的情况&#xff1a;项目里要同时调用ChatGLM、通义千问和Claude&#xff0c;结果每个模型都要写一套鉴权逻辑、重试机制、错误处理和流式响应解…

作者头像 李华
网站建设 2026/4/8 19:59:06

PP-DocLayoutV3应用场景:制造业BOM表、电路图、设备说明书布局理解

PP-DocLayoutV3应用场景&#xff1a;制造业BOM表、电路图、设备说明书布局理解 在制造业数字化转型过程中&#xff0c;工程师每天要处理大量非标准格式的技术文档——歪斜扫描的BOM表、带折痕的电路原理图、卷曲边缘的设备说明书。这些文档往往存在透视变形、光照不均、纸张褶…

作者头像 李华
网站建设 2026/4/13 12:10:38

KOOK真实幻想艺术馆本地部署:Mac M2/M3芯片Metal加速适配方案

KOOK真实幻想艺术馆本地部署&#xff1a;Mac M2/M3芯片Metal加速适配方案 1. 为什么Mac用户需要专属部署方案 你可能已经试过在Mac上运行主流AI绘画工具&#xff0c;结果不是卡在模型加载阶段&#xff0c;就是生成一张图要等三分钟&#xff0c;还经常遇到显存不足的报错。更尴…

作者头像 李华