
DREAMVFIA TestMaster 自动化测试平台 - 完整开源项目数据代码包(部分一/三)

作者:张小明(前端开发工程师)

🚀 TestMaster 自动化测试平台 - 完整开源项目

DREAMVFIA 开源编程大师 V1.1 - 项目生成系统(TESTMASTER PROJECT GENERATION)

🎯 项目定位:端到端测试自动化解决方案
🔧 技术栈:React + TypeScript + Node.js + Python
🎨 架构模式:微服务 + 事件驱动
🔐 安全级别:QUANTUM-SSS(企业级)

第一部分:项目架构设计与技术选型

1.1 项目概述

1.1.1 项目背景

TestMaster 是一个企业级的端到端自动化测试平台,旨在解决现代软件开发中测试自动化的痛点:

核心痛点:

  1. 测试用例编写门槛高:需要编程知识
  2. 跨平台测试复杂:浏览器/设备兼容性难以保证
  3. 测试场景覆盖不足:人工设计测试用例容易遗漏
  4. CI/CD 集成困难:测试与开发流程脱节
  5. 性能测试专业性强:需要专门的性能测试工具

TestMaster 解决方案:

  • 无代码测试创建:可视化拖拽式测试用例设计
  • AI 智能生成:基于页面结构自动生成测试场景
  • 跨平台支持:一次编写,多端运行
  • 性能测试集成:内置负载测试和性能监控
  • 无缝 CI/CD:与主流 CI/CD 工具深度集成

1.1.2 技术架构图

```text
TestMaster 架构(自上而下)

前端层 (Frontend)
  ├─ React UI Dashboard
  ├─ 可视化编辑器 (No-Code)
  └─ 测试报告 Dashboard
        ↕
API 网关层 (Gateway)
  └─ GraphQL / REST API Gateway(认证 / 授权 / 限流)
        ↕
微服务层 (Microservices)
  ├─ 测试引擎 Service
  ├─ AI 生成器 Service
  ├─ 执行器 Service
  ├─ 报告服务 Service
  ├─ 性能测试 Service
  ├─ 设备管理 Service
  ├─ CI/CD 集成 Service
  └─ 用户管理 Service
        ↕
数据层 (Data Layer)
  ├─ PostgreSQL(关系数据)
  ├─ MongoDB(测试数据)
  ├─ Redis(缓存)
  └─ S3/MinIO(文件存储)
        ↕
基础设施层 (Infrastructure)
  ├─ Kubernetes(编排)
  ├─ Docker(容器)
  ├─ Selenium Grid
  └─ Playwright(浏览器)
```

1.1.3 技术栈选型

前端技术栈
{ "framework": "React 18.2", "language": "TypeScript 5.0", "stateManagement": "Zustand + React Query", "ui": "Tailwind CSS + shadcn/ui", "visualization": "Recharts + D3.js", "editor": "Monaco Editor (VS Code)", "dragDrop": "React DnD", "testing": "Vitest + React Testing Library" }
后端技术栈
{ "runtime": "Node.js 20 LTS", "framework": "NestJS (TypeScript)", "api": "GraphQL (Apollo) + REST", "testEngine": "Python 3.11 (Pytest + Selenium)", "aiEngine": "Python (OpenAI GPT-4 / Local LLM)", "database": { "relational": "PostgreSQL 15", "document": "MongoDB 6.0", "cache": "Redis 7.0", "search": "Elasticsearch 8.0" }, "messageQueue": "RabbitMQ / Apache Kafka", "storage": "MinIO (S3-compatible)" }
DevOps 技术栈
```yaml
containerization: Docker
orchestration: Kubernetes
ci_cd:
  - GitHub Actions
  - GitLab CI
  - Jenkins
monitoring:
  - Prometheus
  - Grafana
  - ELK Stack
testing_tools:
  - Selenium Grid
  - Playwright
  - Appium (移动端)
  - JMeter (性能测试)
```

1.2 项目目录结构

```text
testmaster/
├── README.md                  # 项目说明
├── LICENSE                    # MIT 许可证
├── .gitignore                 # Git 忽略文件
├── docker-compose.yml         # Docker 编排
├── kubernetes/                # K8s 部署配置
│   ├── namespace.yaml
│   ├── deployments/
│   ├── services/
│   └── ingress.yaml
│
├── docs/                      # 文档目录
│   ├── architecture.md        # 架构文档
│   ├── api-reference.md       # API 文档
│   ├── user-guide.md          # 用户指南
│   └── development.md         # 开发指南
│
├── frontend/                  # 前端应用
│   ├── package.json
│   ├── tsconfig.json
│   ├── vite.config.ts
│   ├── tailwind.config.js
│   ├── src/
│   │   ├── main.tsx           # 入口文件
│   │   ├── App.tsx            # 根组件
│   │   ├── components/        # 组件库
│   │   │   ├── ui/            # UI 组件
│   │   │   ├── editor/        # 可视化编辑器
│   │   │   ├── dashboard/     # 仪表盘
│   │   │   └── reports/       # 报告组件
│   │   ├── pages/             # 页面组件
│   │   │   ├── Dashboard.tsx
│   │   │   ├── TestEditor.tsx
│   │   │   ├── TestRunner.tsx
│   │   │   └── Reports.tsx
│   │   ├── hooks/             # 自定义 Hooks
│   │   ├── store/             # 状态管理
│   │   ├── services/          # API 服务
│   │   ├── types/             # TypeScript 类型
│   │   └── utils/             # 工具函数
│   └── public/                # 静态资源
│
├── backend/                   # 后端服务
│   ├── gateway/               # API 网关
│   │   ├── package.json
│   │   ├── src/
│   │   │   ├── main.ts
│   │   │   ├── app.module.ts
│   │   │   ├── auth/          # 认证模块
│   │   │   ├── graphql/       # GraphQL
│   │   │   └── rest/          # REST API
│   │   └── test/
│   │
│   ├── services/              # 微服务
│   │   ├── test-engine/       # 测试引擎服务
│   │   │   ├── package.json
│   │   │   └── src/
│   │   │       ├── main.ts
│   │   │       ├── test-executor/
│   │   │       ├── test-parser/
│   │   │       └── test-validator/
│   │   │
│   │   ├── ai-generator/      # AI 生成服务
│   │   │   ├── requirements.txt
│   │   │   └── src/
│   │   │       ├── main.py
│   │   │       ├── llm_service.py
│   │   │       ├── scenario_generator.py
│   │   │       └── page_analyzer.py
│   │   │
│   │   ├── executor/          # 执行器服务
│   │   │   ├── requirements.txt
│   │   │   └── src/
│   │   │       ├── main.py
│   │   │       ├── selenium_runner.py
│   │   │       ├── playwright_runner.py
│   │   │       └── appium_runner.py
│   │   │
│   │   ├── performance/       # 性能测试服务
│   │   │   ├── requirements.txt
│   │   │   └── src/
│   │   │       ├── main.py
│   │   │       ├── load_tester.py
│   │   │       └── metrics_collector.py
│   │   │
│   │   ├── device-manager/    # 设备管理服务
│   │   │   ├── package.json
│   │   │   └── src/
│   │   │       ├── main.ts
│   │   │       ├── browser-pool/
│   │   │       └── device-pool/
│   │   │
│   │   ├── report/            # 报告服务
│   │   │   ├── package.json
│   │   │   └── src/
│   │   │       ├── main.ts
│   │   │       ├── report-generator/
│   │   │       └── analytics/
│   │   │
│   │   └── cicd-integration/  # CI/CD 集成服务
│   │       ├── package.json
│   │       └── src/
│   │           ├── main.ts
│   │           ├── github-actions/
│   │           ├── gitlab-ci/
│   │           └── jenkins/
│   │
│   └── shared/                # 共享库
│       ├── types/             # 共享类型
│       ├── utils/             # 工具函数
│       └── constants/         # 常量定义
│
├── database/                  # 数据库脚本
│   ├── migrations/            # 数据库迁移
│   ├── seeds/                 # 种子数据
│   └── schemas/               # 数据库模式
│
├── scripts/                   # 脚本工具
│   ├── setup.sh               # 环境搭建
│   ├── deploy.sh              # 部署脚本
│   └── test.sh                # 测试脚本
│
└── tests/                     # 测试文件
    ├── e2e/                   # 端到端测试
    ├── integration/           # 集成测试
    └── unit/                  # 单元测试
```

1.3 数据库设计

1.3.1 PostgreSQL 关系型数据库设计

-- ============================================ -- TestMaster 数据库设计 -- ============================================ -- 用户表 CREATE TABLE users ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), email VARCHAR(255) UNIQUE NOT NULL, username VARCHAR(100) UNIQUE NOT NULL, password_hash VARCHAR(255) NOT NULL, full_name VARCHAR(255), avatar_url TEXT, role VARCHAR(50) DEFAULT 'user', is_active BOOLEAN DEFAULT true, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, last_login_at TIMESTAMP ); -- 项目表 CREATE TABLE projects ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), name VARCHAR(255) NOT NULL, description TEXT, owner_id UUID REFERENCES users(id) ON DELETE CASCADE, settings JSONB DEFAULT '{}', is_archived BOOLEAN DEFAULT false, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- 测试套件表 CREATE TABLE test_suites ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), project_id UUID REFERENCES projects(id) ON DELETE CASCADE, name VARCHAR(255) NOT NULL, description TEXT, tags TEXT[], created_by UUID REFERENCES users(id), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- 测试用例表 CREATE TABLE test_cases ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), suite_id UUID REFERENCES test_suites(id) ON DELETE CASCADE, name VARCHAR(255) NOT NULL, description TEXT, priority VARCHAR(20) DEFAULT 'medium', status VARCHAR(50) DEFAULT 'draft', test_data JSONB NOT NULL, expected_results JSONB, tags TEXT[], created_by UUID REFERENCES users(id), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- 测试步骤表 CREATE TABLE test_steps ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), test_case_id UUID REFERENCES test_cases(id) ON DELETE CASCADE, step_order INTEGER NOT NULL, action_type VARCHAR(100) NOT NULL, selector VARCHAR(500), value TEXT, wait_condition JSONB, screenshot BOOLEAN DEFAULT false, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- 测试执行表 CREATE TABLE test_executions ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), test_case_id UUID REFERENCES test_cases(id), suite_id UUID REFERENCES test_suites(id), environment VARCHAR(100), browser VARCHAR(50), device VARCHAR(100), status VARCHAR(50) DEFAULT 'pending', started_at TIMESTAMP, completed_at TIMESTAMP, duration_ms INTEGER, triggered_by UUID REFERENCES users(id), ci_build_id VARCHAR(255), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- 测试结果表 CREATE TABLE test_results ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), execution_id UUID REFERENCES test_executions(id) ON DELETE CASCADE, step_id UUID REFERENCES test_steps(id), status VARCHAR(50) NOT NULL, error_message TEXT, screenshot_url TEXT, video_url TEXT, logs JSONB, metrics JSONB, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- 性能测试表 CREATE TABLE performance_tests ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), project_id UUID REFERENCES projects(id) ON DELETE CASCADE, name VARCHAR(255) NOT NULL, url TEXT NOT NULL, method VARCHAR(10) DEFAULT 'GET', headers JSONB, body TEXT, virtual_users INTEGER DEFAULT 10, duration_seconds INTEGER DEFAULT 60, ramp_up_seconds INTEGER DEFAULT 10, created_by UUID REFERENCES users(id), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- 性能测试结果表 CREATE TABLE performance_results ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), test_id UUID REFERENCES performance_tests(id) ON DELETE CASCADE, execution_id UUID, total_requests INTEGER, successful_requests INTEGER, failed_requests INTEGER, 
avg_response_time_ms DECIMAL(10,2), min_response_time_ms DECIMAL(10,2), max_response_time_ms DECIMAL(10,2), p95_response_time_ms DECIMAL(10,2), p99_response_time_ms DECIMAL(10,2), requests_per_second DECIMAL(10,2), error_rate DECIMAL(5,2), started_at TIMESTAMP, completed_at TIMESTAMP, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- 设备/浏览器配置表 CREATE TABLE device_configs ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), project_id UUID REFERENCES projects(id) ON DELETE CASCADE, name VARCHAR(255) NOT NULL, type VARCHAR(50) NOT NULL, -- 'browser', 'mobile', 'tablet' browser VARCHAR(50), browser_version VARCHAR(50), os VARCHAR(100), os_version VARCHAR(50), screen_resolution VARCHAR(50), user_agent TEXT, capabilities JSONB, is_active BOOLEAN DEFAULT true, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- CI/CD 集成配置表 CREATE TABLE cicd_integrations ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), project_id UUID REFERENCES projects(id) ON DELETE CASCADE, platform VARCHAR(50) NOT NULL, -- 'github', 'gitlab', 'jenkins' config JSONB NOT NULL, webhook_url TEXT, is_active BOOLEAN DEFAULT true, created_by UUID REFERENCES users(id), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- 索引优化 CREATE INDEX idx_test_cases_suite_id ON test_cases(suite_id); CREATE INDEX idx_test_steps_case_id ON test_steps(test_case_id); CREATE INDEX idx_test_executions_case_id ON test_executions(test_case_id); CREATE INDEX idx_test_results_execution_id ON test_results(execution_id); CREATE INDEX idx_projects_owner_id ON projects(owner_id); CREATE INDEX idx_test_executions_status ON test_executions(status); CREATE INDEX idx_test_executions_created_at ON test_executions(created_at DESC);
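基于上述表结构,常见的报表查询可以直接落在 test_executions 等表上。下面给出一个使用 node-postgres 的示意查询(连接配置为假设值,仅作参考),统计最近 7 天各状态的执行次数,可以利用上面的 idx_test_executions_created_at 索引:

```typescript
// 一个最小查询示例(假设通过 DATABASE_URL 连接,表结构沿用上文 SQL 设计)
import { Pool } from 'pg';

const pool = new Pool({ connectionString: process.env.DATABASE_URL });

// 统计最近 7 天各状态的执行次数
export async function getExecutionSummary(): Promise<{ status: string; count: string }[]> {
  const { rows } = await pool.query(
    `SELECT status, COUNT(*) AS count
       FROM test_executions
      WHERE created_at >= NOW() - INTERVAL '7 days'
      GROUP BY status
      ORDER BY count DESC`,
  );
  return rows;
}
```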

1.3.2 MongoDB 文档数据库设计

// ============================================ // TestMaster MongoDB 集合设计 // ============================================ // AI 生成的测试场景集合 db.createCollection("ai_test_scenarios", { validator: { $jsonSchema: { bsonType: "object", required: ["projectId", "url", "scenarios", "generatedAt"], properties: { projectId: { bsonType: "string" }, url: { bsonType: "string" }, pageStructure: { bsonType: "object", properties: { title: { bsonType: "string" }, elements: { bsonType: "array" }, forms: { bsonType: "array" }, links: { bsonType: "array" } } }, scenarios: { bsonType: "array", items: { bsonType: "object", properties: { name: { bsonType: "string" }, description: { bsonType: "string" }, steps: { bsonType: "array" }, priority: { bsonType: "string" }, confidence: { bsonType: "double" } } } }, generatedBy: { bsonType: "string" }, generatedAt: { bsonType: "date" }, model: { bsonType: "string" } } } } }); // 测试执行日志集合 db.createCollection("execution_logs", { validator: { $jsonSchema: { bsonType: "object", required: ["executionId", "timestamp", "level", "message"], properties: { executionId: { bsonType: "string" }, timestamp: { bsonType: "date" }, level: { enum: ["debug", "info", "warn", "error"] }, message: { bsonType: "string" }, context: { bsonType: "object" }, stackTrace: { bsonType: "string" } } } } }); // 性能指标时序数据集合 db.createCollection("performance_metrics", { validator: { $jsonSchema: { bsonType: "object", required: ["executionId", "timestamp", "metrics"], properties: { executionId: { bsonType: "string" }, timestamp: { bsonType: "date" }, metrics: { bsonType: "object", properties: { responseTime: { bsonType: "double" }, requestsPerSecond: { bsonType: "double" }, errorRate: { bsonType: "double" }, cpuUsage: { bsonType: "double" }, memoryUsage: { bsonType: "double" }, networkLatency: { bsonType: "double" } } }, labels: { bsonType: "object" } } } } }); // 创建索引 db.ai_test_scenarios.createIndex({ projectId: 1, generatedAt: -1 }); db.execution_logs.createIndex({ executionId: 1, timestamp: -1 }); db.performance_metrics.createIndex({ executionId: 1, timestamp: -1 }); db.execution_logs.createIndex({ timestamp: 1 }, { expireAfterSeconds: 2592000 }); // 30天过期
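在服务端读取这些集合时,可以借助官方 Node.js 驱动做聚合统计。下面是一个最小示例(数据库名 testmaster 与连接串均为假设,字段沿用上文 performance_metrics 集合的设计),按 executionId 计算平均响应时间:

```typescript
// 聚合 performance_metrics 的一个示例(连接串与库名为假设)
import { MongoClient } from 'mongodb';

export async function getAvgResponseTime(executionId: string): Promise<number | null> {
  const client = new MongoClient(process.env.MONGO_URL ?? 'mongodb://localhost:27017');
  try {
    await client.connect();
    const metrics = client.db('testmaster').collection('performance_metrics');

    // 按 executionId 分组,对 metrics.responseTime 求平均
    const [result] = await metrics
      .aggregate<{ avgResponseTime: number }>([
        { $match: { executionId } },
        { $group: { _id: '$executionId', avgResponseTime: { $avg: '$metrics.responseTime' } } },
        { $project: { _id: 0, avgResponseTime: 1 } },
      ])
      .toArray();

    return result?.avgResponseTime ?? null;
  } finally {
    await client.close();
  }
}
```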

接下来进入第二部分:核心代码实现。由于内容非常庞大,核心代码将分多个部分依次给出。

第二部分:核心代码实现

2.1 前端核心代码

2.1.1 主入口文件 `frontend/src/main.tsx`
import React from 'react'; import ReactDOM from 'react-dom/client'; import App from './App'; import './index.css'; // 错误边界 import { ErrorBoundary } from './components/ErrorBoundary'; ReactDOM.createRoot(document.getElementById('root')!).render( <React.StrictMode> <ErrorBoundary> <App /> </ErrorBoundary> </React.StrictMode> );
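入口文件引用的 ErrorBoundary 组件在本文中没有贴出源码,下面给出一个常见写法的简化示例(降级 UI 与上报方式均为假设,仅供参考):

```tsx
// components/ErrorBoundary.tsx 的一个可能实现(假设性示例)
import React from 'react';

interface Props { children: React.ReactNode; }
interface State { hasError: boolean; }

export class ErrorBoundary extends React.Component<Props, State> {
  state: State = { hasError: false };

  // 渲染阶段抛错时切换到降级 UI
  static getDerivedStateFromError(_error: Error): State {
    return { hasError: true };
  }

  // 记录错误信息,实际项目中可在此接入监控服务
  componentDidCatch(error: Error, info: React.ErrorInfo) {
    console.error('Uncaught error:', error, info.componentStack);
  }

  render() {
    if (this.state.hasError) {
      return <div className="p-8 text-center">Something went wrong. Please refresh the page.</div>;
    }
    return this.props.children;
  }
}
```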
2.1.2 根组件 `frontend/src/App.tsx`
import React from 'react'; import { BrowserRouter, Routes, Route, Navigate } from 'react-router-dom'; import { QueryClient, QueryClientProvider } from '@tanstack/react-query'; import { Toaster } from './components/ui/toaster'; // 页面组件 import Dashboard from './pages/Dashboard'; import TestEditor from './pages/TestEditor'; import TestRunner from './pages/TestRunner'; import Reports from './pages/Reports'; import PerformanceTest from './pages/PerformanceTest'; import Settings from './pages/Settings'; import Login from './pages/Login'; // 布局组件 import MainLayout from './components/layout/MainLayout'; import AuthGuard from './components/auth/AuthGuard'; // 创建 React Query 客户端 const queryClient = new QueryClient({ defaultOptions: { queries: { refetchOnWindowFocus: false, retry: 1, staleTime: 5 * 60 * 1000, // 5分钟 }, }, }); function App() { return ( <QueryClientProvider client={queryClient}> <BrowserRouter> <Routes> {/* 公开路由 */} <Route path="/login" element={<Login />} /> {/* 受保护路由 */} <Route element={<AuthGuard><MainLayout /></AuthGuard>}> <Route path="/" element={<Navigate to="/dashboard" replace />} /> <Route path="/dashboard" element={<Dashboard />} /> <Route path="/test-editor" element={<TestEditor />} /> <Route path="/test-editor/:id" element={<TestEditor />} /> <Route path="/test-runner" element={<TestRunner />} /> <Route path="/reports" element={<Reports />} /> <Route path="/performance" element={<PerformanceTest />} /> <Route path="/settings" element={<Settings />} /> </Route> </Routes> <Toaster /> </BrowserRouter> </QueryClientProvider> ); } export default App;
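App.tsx 中用于保护路由的 AuthGuard 同样未附源码,这里给出一个按上述路由用法反推的简化示例(token 的存储位置与键名是假设值):

```tsx
// components/auth/AuthGuard.tsx 的一个简化示例(假设 token 存在 localStorage)
import React from 'react';
import { Navigate, useLocation } from 'react-router-dom';

const AuthGuard: React.FC<{ children: React.ReactNode }> = ({ children }) => {
  const location = useLocation();
  const token = localStorage.getItem('accessToken'); // 假设的存储键名

  // 未登录则重定向到 /login,并记录来源路径,便于登录后跳回
  if (!token) {
    return <Navigate to="/login" state={{ from: location }} replace />;
  }
  return <>{children}</>;
};

export default AuthGuard;
```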
2.1.3 可视化测试编辑器核心组件 `frontend/src/components/editor/TestEditor.tsx`
import React, { useState, useCallback } from 'react'; import { DndContext, DragEndEvent, closestCenter } from '@dnd-kit/core'; import { SortableContext, verticalListSortingStrategy, arrayMove } from '@dnd-kit/sortable'; import { useMutation, useQuery } from '@tanstack/react-query'; import { Plus, Play, Save, Wand2 } from 'lucide-react'; import { Button } from '../ui/button'; import { Card } from '../ui/card'; import TestStepItem from './TestStepItem'; import ActionPalette from './ActionPalette'; import AIScenarioGenerator from './AIScenarioGenerator'; import { TestStep, TestCase } from '../../types/test'; import { testCaseService } from '../../services/testCaseService'; import { useToast } from '../../hooks/use-toast'; interface TestEditorProps { testCaseId?: string; projectId: string; } const TestEditor: React.FC<TestEditorProps> = ({ testCaseId, projectId }) => { const { toast } = useToast(); const [testCase, setTestCase] = useState<TestCase>({ id: testCaseId || '', name: 'New Test Case', description: '', steps: [], projectId, }); const [showAIGenerator, setShowAIGenerator] = useState(false); // 加载现有测试用例 const { isLoading } = useQuery({ queryKey: ['testCase', testCaseId], queryFn: () => testCaseService.getTestCase(testCaseId!), enabled: !!testCaseId, onSuccess: (data) => setTestCase(data), }); // 保存测试用例 const saveMutation = useMutation({ mutationFn: () => testCaseService.saveTestCase(testCase), onSuccess: () => { toast({ title: 'Success', description: 'Test case saved successfully', }); }, onError: () => { toast({ title: 'Error', description: 'Failed to save test case', variant: 'destructive', }); }, }); // 添加测试步骤 const handleAddStep = useCallback((actionType: string) => { const newStep: TestStep = { id: `step-${Date.now()}`, order: testCase.steps.length, actionType, selector: '', value: '', waitCondition: null, screenshot: false, }; setTestCase(prev => ({ ...prev, steps: [...prev.steps, newStep], })); }, [testCase.steps.length]); // 更新测试步骤 const handleUpdateStep = useCallback((stepId: string, updates: Partial<TestStep>) => { setTestCase(prev => ({ ...prev, steps: prev.steps.map(step => step.id === stepId ? 
{ ...step, ...updates } : step ), })); }, []); // 删除测试步骤 const handleDeleteStep = useCallback((stepId: string) => { setTestCase(prev => ({ ...prev, steps: prev.steps.filter(step => step.id !== stepId), })); }, []); // 拖拽排序 const handleDragEnd = useCallback((event: DragEndEvent) => { const { active, over } = event; if (over && active.id !== over.id) { setTestCase(prev => { const oldIndex = prev.steps.findIndex(step => step.id === active.id); const newIndex = prev.steps.findIndex(step => step.id === over.id); const newSteps = arrayMove(prev.steps, oldIndex, newIndex).map((step, index) => ({ ...step, order: index, })); return { ...prev, steps: newSteps }; }); } }, []); // AI 生成测试场景回调 const handleAIGenerated = useCallback((scenarios: TestCase[]) => { if (scenarios.length > 0) { setTestCase(prev => ({ ...prev, steps: scenarios[0].steps, })); setShowAIGenerator(false); toast({ title: 'AI Generation Complete', description: `Generated ${scenarios.length} test scenarios`, }); } }, [toast]); if (isLoading) { return <div className="flex items-center justify-center h-full">Loading...</div>; } return ( <div className="h-full flex flex-col gap-4 p-6"> {/* 头部工具栏 */} <div className="flex items-center justify-between"> <div className="flex-1"> <input type="text" value={testCase.name} onChange={(e) => setTestCase(prev => ({ ...prev, name: e.target.value }))} className="text-2xl font-bold bg-transparent border-none focus:outline-none focus:ring-2 focus:ring-blue-500 rounded px-2" placeholder="Test Case Name" /> <textarea value={testCase.description} onChange={(e) => setTestCase(prev => ({ ...prev, description: e.target.value }))} className="mt-2 w-full bg-transparent border-none focus:outline-none focus:ring-2 focus:ring-blue-500 rounded px-2 text-gray-600" placeholder="Description (optional)" rows={2} /> </div> <div className="flex gap-2"> <Button variant="outline" onClick={() => setShowAIGenerator(true)} className="gap-2" > <Wand2 className="w-4 h-4" /> AI Generate </Button> <Button variant="outline" onClick={() => saveMutation.mutate()} disabled={saveMutation.isLoading} className="gap-2" > <Save className="w-4 h-4" /> Save </Button> <Button className="gap-2"> <Play className="w-4 h-4" /> Run Test </Button> </div> </div> {/* 主编辑区域 */} <div className="flex-1 flex gap-4 overflow-hidden"> {/* 左侧:动作面板 */} <Card className="w-64 p-4 overflow-y-auto"> <h3 className="font-semibold mb-4">Actions</h3> <ActionPalette onActionSelect={handleAddStep} /> </Card> {/* 中间:测试步骤列表 */} <Card className="flex-1 p-4 overflow-y-auto"> <div className="flex items-center justify-between mb-4"> <h3 className="font-semibold">Test Steps</h3> <span className="text-sm text-gray-500"> {testCase.steps.length} steps </span> </div> {testCase.steps.length === 0 ? ( <div className="flex flex-col items-center justify-center h-64 text-gray-400"> <Plus className="w-12 h-12 mb-2" /> <p>No steps yet. 
Drag actions from the left panel or use AI to generate.</p> </div> ) : ( <DndContext collisionDetection={closestCenter} onDragEnd={handleDragEnd}> <SortableContext items={testCase.steps.map(s => s.id)} strategy={verticalListSortingStrategy} > <div className="space-y-2"> {testCase.steps.map((step) => ( <TestStepItem key={step.id} step={step} onUpdate={handleUpdateStep} onDelete={handleDeleteStep} /> ))} </div> </SortableContext> </DndContext> )} </Card> {/* 右侧:预览和配置 */} <Card className="w-80 p-4 overflow-y-auto"> <h3 className="font-semibold mb-4">Configuration</h3> <div className="space-y-4"> <div> <label className="block text-sm font-medium mb-1">Priority</label> <select value={testCase.priority || 'medium'} onChange={(e) => setTestCase(prev => ({ ...prev, priority: e.target.value }))} className="w-full border rounded px-3 py-2" > <option value="low">Low</option> <option value="medium">Medium</option> <option value="high">High</option> <option value="critical">Critical</option> </select> </div> <div> <label className="block text-sm font-medium mb-1">Tags</label> <input type="text" placeholder="smoke, regression, ..." className="w-full border rounded px-3 py-2" /> </div> </div> </Card> </div> {/* AI 生成器模态框 */} {showAIGenerator && ( <AIScenarioGenerator projectId={projectId} onGenerated={handleAIGenerated} onClose={() => setShowAIGenerator(false)} /> )} </div> ); }; export default TestEditor;
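编辑器组件从 `../../types/test` 导入了 TestStep 与 TestCase 类型。下面是按组件内用法(并参照后端 test_steps 表字段)反推的一份参考定义,具体字段命名为假设:

```typescript
// frontend/src/types/test.ts 的一个参考定义(按用法反推,仅供参考)
export interface WaitCondition {
  type: 'element' | 'time' | 'url' | 'custom';
  value: string | number;
  timeout?: number;
}

export interface TestStep {
  id: string;
  order: number;
  actionType: string;                 // navigate / click / type / assert ...
  selector: string;
  value: string;
  waitCondition: WaitCondition | null;
  screenshot: boolean;
}

export interface TestCase {
  id: string;
  projectId: string;
  name: string;
  description: string;
  priority?: string;
  tags?: string[];
  steps: TestStep[];
}
```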

TestMaster 自动化测试平台 - 第二部分:后端核心服务代码

2.2 后端核心服务实现

2.2.1 API 网关服务 `backend/gateway/src/main.ts`

import { NestFactory } from '@nestjs/core'; import { ValidationPipe } from '@nestjs/common'; import { SwaggerModule, DocumentBuilder } from '@nestjs/swagger'; import { AppModule } from './app.module'; import helmet from 'helmet'; import * as compression from 'compression'; async function bootstrap() { const app = await NestFactory.create(AppModule, { logger: ['error', 'warn', 'log', 'debug', 'verbose'], }); // 安全中间件 app.use(helmet()); // 压缩响应 app.use(compression()); // 全局验证管道 app.useGlobalPipes( new ValidationPipe({ whitelist: true, forbidNonWhitelisted: true, transform: true, transformOptions: { enableImplicitConversion: true, }, }), ); // CORS 配置 app.enableCors({ origin: process.env.FRONTEND_URL || 'http://localhost:5173', credentials: true, }); // Swagger API 文档 const config = new DocumentBuilder() .setTitle('TestMaster API') .setDescription('TestMaster 自动化测试平台 API 文档') .setVersion('1.0') .addBearerAuth() .addTag('auth', '认证相关接口') .addTag('projects', '项目管理接口') .addTag('test-cases', '测试用例接口') .addTag('executions', '测试执行接口') .addTag('reports', '测试报告接口') .build(); const document = SwaggerModule.createDocument(app, config); SwaggerModule.setup('api-docs', app, document); const port = process.env.PORT || 3000; await app.listen(port); console.log(` ╔════════════════════════════════════════════════════════════════╗ ║ ║ ║ 🚀 TestMaster API Gateway Started ║ ║ ║ ║ Server: http://localhost:${port} ║ ║ API Docs: http://localhost:${port}/api-docs ║ ║ GraphQL: http://localhost:${port}/graphql ║ ║ ║ ║ Environment: ${process.env.NODE_ENV || 'development'} ║ ║ Version: 1.0.0 ║ ║ ║ ╚════════════════════════════════════════════════════════════════╝ `); } bootstrap();

2.2.2 应用模块 `backend/gateway/src/app.module.ts`

import { Module } from '@nestjs/common'; import { ConfigModule, ConfigService } from '@nestjs/config'; import { TypeOrmModule } from '@nestjs/typeorm'; import { GraphQLModule } from '@nestjs/graphql'; import { ApolloDriver, ApolloDriverConfig } from '@nestjs/apollo'; import { BullModule } from '@nestjs/bull'; import { CacheModule } from '@nestjs/cache-manager'; import { redisStore } from 'cache-manager-redis-store'; import { join } from 'path'; // 模块导入 import { AuthModule } from './auth/auth.module'; import { UsersModule } from './users/users.module'; import { ProjectsModule } from './projects/projects.module'; import { TestCasesModule } from './test-cases/test-cases.module'; import { ExecutionsModule } from './executions/executions.module'; import { ReportsModule } from './reports/reports.module'; import { PerformanceModule } from './performance/performance.module'; import { DevicesModule } from './devices/devices.module'; import { CicdModule } from './cicd/cicd.module'; import { AiModule } from './ai/ai.module'; @Module({ imports: [ // 配置模块 ConfigModule.forRoot({ isGlobal: true, envFilePath: `.env.${process.env.NODE_ENV || 'development'}`, }), // 数据库连接 TypeOrmModule.forRootAsync({ imports: [ConfigModule], useFactory: (configService: ConfigService) => ({ type: 'postgres', host: configService.get('DB_HOST'), port: configService.get('DB_PORT'), username: configService.get('DB_USERNAME'), password: configService.get('DB_PASSWORD'), database: configService.get('DB_DATABASE'), entities: [__dirname + '/**/*.entity{.ts,.js}'], synchronize: configService.get('NODE_ENV') === 'development', logging: configService.get('NODE_ENV') === 'development', ssl: configService.get('DB_SSL') === 'true' ? { rejectUnauthorized: false } : false, }), inject: [ConfigService], }), // GraphQL 配置 GraphQLModule.forRoot<ApolloDriverConfig>({ driver: ApolloDriver, autoSchemaFile: join(process.cwd(), 'src/schema.gql'), sortSchema: true, playground: true, context: ({ req, res }) => ({ req, res }), formatError: (error) => { return { message: error.message, code: error.extensions?.code, path: error.path, }; }, }), // Redis 缓存 CacheModule.registerAsync({ isGlobal: true, imports: [ConfigModule], useFactory: async (configService: ConfigService) => ({ store: redisStore as any, host: configService.get('REDIS_HOST'), port: configService.get('REDIS_PORT'), password: configService.get('REDIS_PASSWORD'), ttl: 300, // 5分钟默认缓存 }), inject: [ConfigService], }), // 消息队列 BullModule.forRootAsync({ imports: [ConfigModule], useFactory: async (configService: ConfigService) => ({ redis: { host: configService.get('REDIS_HOST'), port: configService.get('REDIS_PORT'), password: configService.get('REDIS_PASSWORD'), }, }), inject: [ConfigService], }), // 业务模块 AuthModule, UsersModule, ProjectsModule, TestCasesModule, ExecutionsModule, ReportsModule, PerformanceModule, DevicesModule, CicdModule, AiModule, ], }) export class AppModule {}

2.2.3 认证模块 `backend/gateway/src/auth/auth.service.ts`

import { Injectable, UnauthorizedException } from '@nestjs/common'; import { JwtService } from '@nestjs/jwt'; import { InjectRepository } from '@nestjs/typeorm'; import { Repository } from 'typeorm'; import * as bcrypt from 'bcrypt'; import { User } from '../users/entities/user.entity'; import { LoginDto, RegisterDto } from './dto/auth.dto'; @Injectable() export class AuthService { constructor( @InjectRepository(User) private usersRepository: Repository<User>, private jwtService: JwtService, ) {} /** * 用户注册 */ async register(registerDto: RegisterDto) { const { email, username, password, fullName } = registerDto; // 检查用户是否已存在 const existingUser = await this.usersRepository.findOne({ where: [{ email }, { username }], }); if (existingUser) { throw new UnauthorizedException('Email or username already exists'); } // 密码加密 const salt = await bcrypt.genSalt(10); const passwordHash = await bcrypt.hash(password, salt); // 创建用户 const user = this.usersRepository.create({ email, username, passwordHash, fullName, role: 'user', isActive: true, }); await this.usersRepository.save(user); // 生成 JWT const tokens = await this.generateTokens(user); return { user: this.sanitizeUser(user), ...tokens, }; } /** * 用户登录 */ async login(loginDto: LoginDto) { const { email, password } = loginDto; // 查找用户 const user = await this.usersRepository.findOne({ where: { email }, }); if (!user) { throw new UnauthorizedException('Invalid credentials'); } // 验证密码 const isPasswordValid = await bcrypt.compare(password, user.passwordHash); if (!isPasswordValid) { throw new UnauthorizedException('Invalid credentials'); } // 检查用户是否激活 if (!user.isActive) { throw new UnauthorizedException('Account is deactivated'); } // 更新最后登录时间 user.lastLoginAt = new Date(); await this.usersRepository.save(user); // 生成 JWT const tokens = await this.generateTokens(user); return { user: this.sanitizeUser(user), ...tokens, }; } /** * 刷新令牌 */ async refreshToken(refreshToken: string) { try { const payload = this.jwtService.verify(refreshToken, { secret: process.env.JWT_REFRESH_SECRET, }); const user = await this.usersRepository.findOne({ where: { id: payload.sub }, }); if (!user || !user.isActive) { throw new UnauthorizedException('Invalid refresh token'); } return this.generateTokens(user); } catch (error) { throw new UnauthorizedException('Invalid refresh token'); } } /** * 验证用户 */ async validateUser(userId: string): Promise<User> { const user = await this.usersRepository.findOne({ where: { id: userId }, }); if (!user || !user.isActive) { throw new UnauthorizedException('User not found or inactive'); } return user; } /** * 生成访问令牌和刷新令牌 */ private async generateTokens(user: User) { const payload = { sub: user.id, email: user.email, username: user.username, role: user.role, }; const [accessToken, refreshToken] = await Promise.all([ this.jwtService.signAsync(payload, { secret: process.env.JWT_SECRET, expiresIn: '15m', }), this.jwtService.signAsync(payload, { secret: process.env.JWT_REFRESH_SECRET, expiresIn: '7d', }), ]); return { accessToken, refreshToken, expiresIn: 900, // 15分钟 }; } /** * 清理用户敏感信息 */ private sanitizeUser(user: User) { const { passwordHash, ...sanitized } = user; return sanitized; } }
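上面的 validateUser 通常由 JWT 策略在每次请求时调用。下面是 passport-jwt 策略的一个常见写法示例(文件名与具体配置为假设),payload.sub 即 generateTokens 中签发的用户 ID:

```typescript
// auth/jwt.strategy.ts 的一个示意写法(假设性示例)
import { Injectable } from '@nestjs/common';
import { PassportStrategy } from '@nestjs/passport';
import { ExtractJwt, Strategy } from 'passport-jwt';
import { AuthService } from './auth.service';

@Injectable()
export class JwtStrategy extends PassportStrategy(Strategy) {
  constructor(private authService: AuthService) {
    super({
      // 从 Authorization: Bearer <token> 头中提取令牌
      jwtFromRequest: ExtractJwt.fromAuthHeaderAsBearerToken(),
      ignoreExpiration: false,
      secretOrKey: process.env.JWT_SECRET as string,
    });
  }

  // 校验通过后返回的用户对象会被挂到 request.user 上
  async validate(payload: { sub: string }) {
    return this.authService.validateUser(payload.sub);
  }
}
```

该策略需要在 AuthModule 中与 PassportModule、JwtModule 一起注册,才能配合 `AuthGuard('jwt')` 使用。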

2.2.4 测试用例服务 `backend/gateway/src/test-cases/test-cases.service.ts`

import { Injectable, NotFoundException, ForbiddenException } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; import { Repository } from 'typeorm'; import { InjectQueue } from '@nestjs/bull'; import { Queue } from 'bull'; import { TestCase } from './entities/test-case.entity'; import { TestStep } from './entities/test-step.entity'; import { CreateTestCaseDto, UpdateTestCaseDto } from './dto/test-case.dto'; import { User } from '../users/entities/user.entity'; @Injectable() export class TestCasesService { constructor( @InjectRepository(TestCase) private testCasesRepository: Repository<TestCase>, @InjectRepository(TestStep) private testStepsRepository: Repository<TestStep>, @InjectQueue('test-execution') private executionQueue: Queue, ) {} /** * 创建测试用例 */ async create(createTestCaseDto: CreateTestCaseDto, user: User): Promise<TestCase> { const { steps, ...testCaseData } = createTestCaseDto; // 创建测试用例 const testCase = this.testCasesRepository.create({ ...testCaseData, createdBy: user, }); const savedTestCase = await this.testCasesRepository.save(testCase); // 创建测试步骤 if (steps && steps.length > 0) { const testSteps = steps.map((step, index) => this.testStepsRepository.create({ ...step, stepOrder: index, testCase: savedTestCase, }), ); await this.testStepsRepository.save(testSteps); } return this.findOne(savedTestCase.id); } /** * 查询所有测试用例 */ async findAll( suiteId?: string, projectId?: string, tags?: string[], status?: string, ): Promise<TestCase[]> { const query = this.testCasesRepository .createQueryBuilder('testCase') .leftJoinAndSelect('testCase.steps', 'steps') .leftJoinAndSelect('testCase.suite', 'suite') .leftJoinAndSelect('testCase.createdBy', 'createdBy') .orderBy('steps.stepOrder', 'ASC'); if (suiteId) { query.andWhere('testCase.suiteId = :suiteId', { suiteId }); } if (projectId) { query.andWhere('suite.projectId = :projectId', { projectId }); } if (tags && tags.length > 0) { query.andWhere('testCase.tags && :tags', { tags }); } if (status) { query.andWhere('testCase.status = :status', { status }); } return query.getMany(); } /** * 查询单个测试用例 */ async findOne(id: string): Promise<TestCase> { const testCase = await this.testCasesRepository.findOne({ where: { id }, relations: ['steps', 'suite', 'createdBy'], order: { steps: { stepOrder: 'ASC', }, }, }); if (!testCase) { throw new NotFoundException(`Test case with ID ${id} not found`); } return testCase; } /** * 更新测试用例 */ async update( id: string, updateTestCaseDto: UpdateTestCaseDto, user: User, ): Promise<TestCase> { const testCase = await this.findOne(id); // 权限检查 if (testCase.createdBy.id !== user.id && user.role !== 'admin') { throw new ForbiddenException('You do not have permission to update this test case'); } const { steps, ...testCaseData } = updateTestCaseDto; // 更新测试用例基本信息 Object.assign(testCase, testCaseData); await this.testCasesRepository.save(testCase); // 更新测试步骤 if (steps) { // 删除旧步骤 await this.testStepsRepository.delete({ testCase: { id } }); // 创建新步骤 if (steps.length > 0) { const testSteps = steps.map((step, index) => this.testStepsRepository.create({ ...step, stepOrder: index, testCase, }), ); await this.testStepsRepository.save(testSteps); } } return this.findOne(id); } /** * 删除测试用例 */ async remove(id: string, user: User): Promise<void> { const testCase = await this.findOne(id); // 权限检查 if (testCase.createdBy.id !== user.id && user.role !== 'admin') { throw new ForbiddenException('You do not have permission to delete this test case'); } await this.testCasesRepository.remove(testCase); } /** * 
执行测试用例 */ async execute( id: string, environment: string, browser: string, device?: string, ): Promise<{ executionId: string }> { const testCase = await this.findOne(id); // 创建执行任务 const job = await this.executionQueue.add('execute-test', { testCaseId: testCase.id, environment, browser, device, timestamp: new Date(), }); return { executionId: job.id.toString(), }; } /** * 批量执行测试用例 */ async executeBatch( testCaseIds: string[], environment: string, browser: string, device?: string, ): Promise<{ executionIds: string[] }> { const jobs = await Promise.all( testCaseIds.map((id) => this.executionQueue.add('execute-test', { testCaseId: id, environment, browser, device, timestamp: new Date(), }), ), ); return { executionIds: jobs.map((job) => job.id.toString()), }; } /** * 复制测试用例 */ async duplicate(id: string, user: User): Promise<TestCase> { const originalTestCase = await this.findOne(id); const duplicatedTestCase = this.testCasesRepository.create({ name: `${originalTestCase.name} (Copy)`, description: originalTestCase.description, priority: originalTestCase.priority, tags: originalTestCase.tags, suite: originalTestCase.suite, createdBy: user, }); const savedTestCase = await this.testCasesRepository.save(duplicatedTestCase); // 复制测试步骤 if (originalTestCase.steps && originalTestCase.steps.length > 0) { const duplicatedSteps = originalTestCase.steps.map((step) => this.testStepsRepository.create({ stepOrder: step.stepOrder, actionType: step.actionType, selector: step.selector, value: step.value, waitCondition: step.waitCondition, screenshot: step.screenshot, testCase: savedTestCase, }), ); await this.testStepsRepository.save(duplicatedSteps); } return this.findOne(savedTestCase.id); } /** * 获取测试用例统计信息 */ async getStatistics(projectId: string) { const query = this.testCasesRepository .createQueryBuilder('testCase') .leftJoin('testCase.suite', 'suite') .where('suite.projectId = :projectId', { projectId }); const [total, byStatus, byPriority] = await Promise.all([ query.getCount(), query .select('testCase.status', 'status') .addSelect('COUNT(*)', 'count') .groupBy('testCase.status') .getRawMany(), query .select('testCase.priority', 'priority') .addSelect('COUNT(*)', 'count') .groupBy('testCase.priority') .getRawMany(), ]); return { total, byStatus: byStatus.reduce((acc, item) => { acc[item.status] = parseInt(item.count); return acc; }, {}), byPriority: byPriority.reduce((acc, item) => { acc[item.priority] = parseInt(item.count); return acc; }, {}), }; } }
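服务中引用的 CreateTestCaseDto 未在文中给出,这里按 create/update 的用法反推一份基于 class-validator 的参考定义(字段与嵌套结构为假设,需配合网关里的全局 ValidationPipe 使用):

```typescript
// dto/test-case.dto.ts 的一个参考实现(字段按服务用法反推,仅供参考)
import { Type } from 'class-transformer';
import { IsArray, IsBoolean, IsOptional, IsString, ValidateNested } from 'class-validator';

export class TestStepDto {
  @IsString()
  actionType: string;

  @IsOptional() @IsString()
  selector?: string;

  @IsOptional() @IsString()
  value?: string;

  @IsOptional()
  waitCondition?: Record<string, any>;

  @IsOptional() @IsBoolean()
  screenshot?: boolean;
}

export class CreateTestCaseDto {
  @IsString()
  name: string;

  @IsOptional() @IsString()
  description?: string;

  @IsOptional() @IsString()
  priority?: string;

  @IsOptional() @IsArray()
  tags?: string[];

  // 所属测试套件 ID(假设字段)
  @IsString()
  suiteId: string;

  @IsOptional() @IsArray() @ValidateNested({ each: true }) @Type(() => TestStepDto)
  steps?: TestStepDto[];
}
```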

2.2.5 测试用例实体 `backend/gateway/src/test-cases/entities/test-case.entity.ts`

import { Entity, PrimaryGeneratedColumn, Column, ManyToOne, OneToMany, CreateDateColumn, UpdateDateColumn, JoinColumn, } from 'typeorm'; import { TestSuite } from '../../test-suites/entities/test-suite.entity'; import { TestStep } from './test-step.entity'; import { User } from '../../users/entities/user.entity'; import { TestExecution } from '../../executions/entities/test-execution.entity'; @Entity('test_cases') export class TestCase { @PrimaryGeneratedColumn('uuid') id: string; @ManyToOne(() => TestSuite, (suite) => suite.testCases, { onDelete: 'CASCADE' }) @JoinColumn({ name: 'suite_id' }) suite: TestSuite; @Column({ length: 255 }) name: string; @Column({ type: 'text', nullable: true }) description: string; @Column({ length: 20, default: 'medium' }) priority: string; @Column({ length: 50, default: 'draft' }) status: string; @Column({ type: 'jsonb', nullable: true }) testData: any; @Column({ type: 'jsonb', nullable: true }) expectedResults: any; @Column({ type: 'text', array: true, default: [] }) tags: string[]; @OneToMany(() => TestStep, (step) => step.testCase, { cascade: true }) steps: TestStep[]; @OneToMany(() => TestExecution, (execution) => execution.testCase) executions: TestExecution[]; @ManyToOne(() => User) @JoinColumn({ name: 'created_by' }) createdBy: User; @CreateDateColumn({ name: 'created_at' }) createdAt: Date; @UpdateDateColumn({ name: 'updated_at' }) updatedAt: Date; }

2.2.6 测试步骤实体 `backend/gateway/src/test-cases/entities/test-step.entity.ts`

import { Entity, PrimaryGeneratedColumn, Column, ManyToOne, CreateDateColumn, JoinColumn, } from 'typeorm'; import { TestCase } from './test-case.entity'; @Entity('test_steps') export class TestStep { @PrimaryGeneratedColumn('uuid') id: string; @ManyToOne(() => TestCase, (testCase) => testCase.steps, { onDelete: 'CASCADE' }) @JoinColumn({ name: 'test_case_id' }) testCase: TestCase; @Column({ name: 'step_order' }) stepOrder: number; @Column({ name: 'action_type', length: 100 }) actionType: string; @Column({ length: 500, nullable: true }) selector: string; @Column({ type: 'text', nullable: true }) value: string; @Column({ name: 'wait_condition', type: 'jsonb', nullable: true }) waitCondition: { type: 'element' | 'time' | 'url' | 'custom'; value: string | number; timeout?: number; }; @Column({ default: false }) screenshot: boolean; @CreateDateColumn({ name: 'created_at' }) createdAt: Date; }

2.2.7 测试执行服务 `backend/gateway/src/executions/executions.service.ts`

import { Injectable, NotFoundException } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; import { Repository, In, Between } from 'typeorm'; import { TestExecution } from './entities/test-execution.entity'; import { TestResult } from './entities/test-result.entity'; import { TestCasesService } from '../test-cases/test-cases.service'; @Injectable() export class ExecutionsService { constructor( @InjectRepository(TestExecution) private executionsRepository: Repository<TestExecution>, @InjectRepository(TestResult) private resultsRepository: Repository<TestResult>, private testCasesService: TestCasesService, ) {} /** * 创建测试执行记录 */ async create( testCaseId: string, environment: string, browser: string, device?: string, triggeredBy?: string, ciBuildId?: string, ): Promise<TestExecution> { const testCase = await this.testCasesService.findOne(testCaseId); const execution = this.executionsRepository.create({ testCase, environment, browser, device, status: 'pending', triggeredBy: triggeredBy ? { id: triggeredBy } as any : null, ciBuildId, }); return this.executionsRepository.save(execution); } /** * 查询所有执行记录 */ async findAll(filters: { projectId?: string; suiteId?: string; testCaseId?: string; status?: string; environment?: string; browser?: string; startDate?: Date; endDate?: Date; limit?: number; offset?: number; }): Promise<{ executions: TestExecution[]; total: number }> { const query = this.executionsRepository .createQueryBuilder('execution') .leftJoinAndSelect('execution.testCase', 'testCase') .leftJoinAndSelect('execution.suite', 'suite') .leftJoinAndSelect('execution.results', 'results') .leftJoinAndSelect('execution.triggeredBy', 'triggeredBy') .orderBy('execution.createdAt', 'DESC'); if (filters.testCaseId) { query.andWhere('execution.testCaseId = :testCaseId', { testCaseId: filters.testCaseId, }); } if (filters.suiteId) { query.andWhere('execution.suiteId = :suiteId', { suiteId: filters.suiteId, }); } if (filters.status) { query.andWhere('execution.status = :status', { status: filters.status }); } if (filters.environment) { query.andWhere('execution.environment = :environment', { environment: filters.environment, }); } if (filters.browser) { query.andWhere('execution.browser = :browser', { browser: filters.browser }); } if (filters.startDate && filters.endDate) { query.andWhere('execution.createdAt BETWEEN :startDate AND :endDate', { startDate: filters.startDate, endDate: filters.endDate, }); } const total = await query.getCount(); if (filters.limit) { query.take(filters.limit); } if (filters.offset) { query.skip(filters.offset); } const executions = await query.getMany(); return { executions, total }; } /** * 查询单个执行记录 */ async findOne(id: string): Promise<TestExecution> { const execution = await this.executionsRepository.findOne({ where: { id }, relations: ['testCase', 'testCase.steps', 'suite', 'results', 'triggeredBy'], order: { results: { createdAt: 'ASC', }, }, }); if (!execution) { throw new NotFoundException(`Execution with ID ${id} not found`); } return execution; } /** * 更新执行状态 */ async updateStatus( id: string, status: 'running' | 'passed' | 'failed' | 'skipped' | 'error', startedAt?: Date, completedAt?: Date, ): Promise<TestExecution> { const execution = await this.findOne(id); execution.status = status; if (startedAt) { execution.startedAt = startedAt; } if (completedAt) { execution.completedAt = completedAt; if (execution.startedAt) { execution.durationMs = completedAt.getTime() - execution.startedAt.getTime(); } } return 
this.executionsRepository.save(execution); } /** * 添加测试结果 */ async addResult( executionId: string, stepId: string, status: 'passed' | 'failed' | 'skipped' | 'error', errorMessage?: string, screenshotUrl?: string, videoUrl?: string, logs?: any, metrics?: any, ): Promise<TestResult> { const execution = await this.findOne(executionId); const result = this.resultsRepository.create({ execution, step: { id: stepId } as any, status, errorMessage, screenshotUrl, videoUrl, logs, metrics, }); return this.resultsRepository.save(result); } /** * 获取执行统计 */ async getStatistics( projectId?: string, startDate?: Date, endDate?: Date, ): Promise<any> { const query = this.executionsRepository .createQueryBuilder('execution') .leftJoin('execution.testCase', 'testCase') .leftJoin('testCase.suite', 'suite'); if (projectId) { query.where('suite.projectId = :projectId', { projectId }); } if (startDate && endDate) { query.andWhere('execution.createdAt BETWEEN :startDate AND :endDate', { startDate, endDate, }); } const [ total, byStatus, byEnvironment, byBrowser, avgDuration, successRate, ] = await Promise.all([ query.getCount(), query .select('execution.status', 'status') .addSelect('COUNT(*)', 'count') .groupBy('execution.status') .getRawMany(), query .select('execution.environment', 'environment') .addSelect('COUNT(*)', 'count') .groupBy('execution.environment') .getRawMany(), query .select('execution.browser', 'browser') .addSelect('COUNT(*)', 'count') .groupBy('execution.browser') .getRawMany(), query .select('AVG(execution.durationMs)', 'avgDuration') .where('execution.durationMs IS NOT NULL') .getRawOne(), query .select( `(COUNT(CASE WHEN execution.status = 'passed' THEN 1 END)::float / COUNT(*)::float * 100)`, 'successRate', ) .getRawOne(), ]); return { total, byStatus: byStatus.reduce((acc, item) => { acc[item.status] = parseInt(item.count); return acc; }, {}), byEnvironment: byEnvironment.reduce((acc, item) => { acc[item.environment] = parseInt(item.count); return acc; }, {}), byBrowser: byBrowser.reduce((acc, item) => { acc[item.browser] = parseInt(item.count); return acc; }, {}), avgDuration: parseFloat(avgDuration?.avgDuration || '0'), successRate: parseFloat(successRate?.successRate || '0'), }; } /** * 获取趋势数据 */ async getTrends( projectId: string, days: number = 30, ): Promise<any[]> { const startDate = new Date(); startDate.setDate(startDate.getDate() - days); const executions = await this.executionsRepository .createQueryBuilder('execution') .leftJoin('execution.testCase', 'testCase') .leftJoin('testCase.suite', 'suite') .where('suite.projectId = :projectId', { projectId }) .andWhere('execution.createdAt >= :startDate', { startDate }) .select([ `DATE(execution.createdAt) as date`, `COUNT(*) as total`, `COUNT(CASE WHEN execution.status = 'passed' THEN 1 END) as passed`, `COUNT(CASE WHEN execution.status = 'failed' THEN 1 END) as failed`, `AVG(execution.durationMs) as avgDuration`, ]) .groupBy('DATE(execution.createdAt)') .orderBy('date', 'ASC') .getRawMany(); return executions.map((item) => ({ date: item.date, total: parseInt(item.total), passed: parseInt(item.passed), failed: parseInt(item.failed), avgDuration: parseFloat(item.avgDuration || '0'), successRate: (parseInt(item.passed) / parseInt(item.total)) * 100, })); } }
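这些查询方法最终会通过控制器暴露为 REST 接口。下面是一个精简的控制器示意(路由与查询参数名为假设),展示 findAll 的过滤条件如何从 query string 映射过来:

```typescript
// executions.controller.ts 的一个精简示意(假设性示例)
import { Controller, Get, Param, Query, UseGuards } from '@nestjs/common';
import { AuthGuard } from '@nestjs/passport';
import { ExecutionsService } from './executions.service';

@Controller('executions')
@UseGuards(AuthGuard('jwt'))
export class ExecutionsController {
  constructor(private readonly executionsService: ExecutionsService) {}

  // GET /executions?status=failed&browser=chrome&limit=20&offset=0
  @Get()
  findAll(
    @Query('projectId') projectId?: string,
    @Query('status') status?: string,
    @Query('browser') browser?: string,
    @Query('limit') limit?: string,
    @Query('offset') offset?: string,
  ) {
    return this.executionsService.findAll({
      projectId,
      status,
      browser,
      limit: limit ? parseInt(limit, 10) : undefined,
      offset: offset ? parseInt(offset, 10) : undefined,
    });
  }

  // GET /executions/:id
  @Get(':id')
  findOne(@Param('id') id: string) {
    return this.executionsService.findOne(id);
  }
}
```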

继续下一部分...

2.2.8 测试执行消费者 `backend/gateway/src/executions/execution.processor.ts`

import { Process, Processor } from '@nestjs/bull'; import { Logger } from '@nestjs/common'; import { Job } from 'bull'; import { ExecutionsService } from './executions.service'; import { TestEngineClient } from '../clients/test-engine.client'; @Processor('test-execution') export class ExecutionProcessor { private readonly logger = new Logger(ExecutionProcessor.name); constructor( private executionsService: ExecutionsService, private testEngineClient: TestEngineClient, ) {} @Process('execute-test') async handleTestExecution(job: Job) { const { testCaseId, environment, browser, device, triggeredBy, ciBuildId } = job.data; this.logger.log( `Starting test execution for test case ${testCaseId} on ${browser}`, ); try { // 创建执行记录 const execution = await this.executionsService.create( testCaseId, environment, browser, device, triggeredBy, ciBuildId, ); // 更新状态为运行中 await this.executionsService.updateStatus( execution.id, 'running', new Date(), ); // 调用测试引擎执行测试 const result = await this.testEngineClient.executeTest({ executionId: execution.id, testCaseId, environment, browser, device, }); // 更新执行结果 await this.executionsService.updateStatus( execution.id, result.status, result.startedAt, result.completedAt, ); // 保存每个步骤的结果 for (const stepResult of result.stepResults) { await this.executionsService.addResult( execution.id, stepResult.stepId, stepResult.status, stepResult.errorMessage, stepResult.screenshotUrl, stepResult.videoUrl, stepResult.logs, stepResult.metrics, ); } this.logger.log( `Test execution completed for ${testCaseId}: ${result.status}`, ); return { executionId: execution.id, status: result.status }; } catch (error) { this.logger.error( `Test execution failed for ${testCaseId}: ${error.message}`, error.stack, ); throw error; } } @Process('execute-suite') async handleSuiteExecution(job: Job) { const { suiteId, environment, browser, device, triggeredBy, ciBuildId } = job.data; this.logger.log(`Starting suite execution for suite ${suiteId}`); // 实现套件执行逻辑 // 获取套件中的所有测试用例,依次执行 return { suiteId, status: 'completed' }; } }
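处理器中注入的 TestEngineClient 文中未给出实现,它本质上是对执行器服务的一次 HTTP 调用。下面是一个假设性的示意实现(服务地址、路径与返回字段均为假设,实际以执行器服务的接口为准,且所在模块需要引入 HttpModule):

```typescript
// clients/test-engine.client.ts 的一个假设性示意
import { Injectable } from '@nestjs/common';
import { HttpService } from '@nestjs/axios';
import { firstValueFrom } from 'rxjs';

export interface ExecuteTestRequest {
  executionId: string;
  testCaseId: string;
  environment: string;
  browser: string;
  device?: string;
}

@Injectable()
export class TestEngineClient {
  constructor(private readonly httpService: HttpService) {}

  async executeTest(request: ExecuteTestRequest) {
    // 执行器服务地址为假设值,可通过环境变量覆盖
    const baseUrl = process.env.TEST_ENGINE_URL ?? 'http://localhost:8002';
    const { data } = await firstValueFrom(
      this.httpService.post(`${baseUrl}/execute`, request),
    );
    // 处理器期望返回:status / startedAt / completedAt / stepResults
    return data;
  }
}
```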

下面继续给出 AI 生成器服务和 Python 测试引擎的实现。

2.3 AI 生成器服务(Python)

2.3.1 AI 服务主文件 `backend/services/ai-generator/src/main.py`
""" TestMaster AI 测试场景生成服务 使用 GPT-4 或本地 LLM 生成测试场景 """ from fastapi import FastAPI, HTTPException, Depends from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel, HttpUrl from typing import List, Optional, Dict, Any import logging import asyncio from datetime import datetime from llm_service import LLMService from scenario_generator import ScenarioGenerator from page_analyzer import PageAnalyzer # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # 创建 FastAPI 应用 app = FastAPI( title="TestMaster AI Generator", description="AI-powered test scenario generation service", version="1.0.0" ) # CORS 配置 app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # 请求模型 class GenerateRequest(BaseModel): url: HttpUrl project_id: str user_id: str options: Optional[Dict[str, Any]] = { "max_scenarios": 10, "include_edge_cases": True, "include_negative_tests": True, "complexity": "medium" # low, medium, high } class TestScenario(BaseModel): name: str description: str priority: str steps: List[Dict[str, Any]] confidence: float tags: List[str] class GenerateResponse(BaseModel): project_id: str url: str page_structure: Dict[str, Any] scenarios: List[TestScenario] generated_at: datetime model: str total_scenarios: int # 依赖注入 def get_llm_service(): return LLMService() def get_page_analyzer(): return PageAnalyzer() def get_scenario_generator( llm_service: LLMService = Depends(get_llm_service) ): return ScenarioGenerator(llm_service) # API 端点 @app.get("/health") async def health_check(): """健康检查""" return { "status": "healthy", "service": "ai-generator", "version": "1.0.0", "timestamp": datetime.now().isoformat() } @app.post("/generate", response_model=GenerateResponse) async def generate_test_scenarios( request: GenerateRequest, page_analyzer: PageAnalyzer = Depends(get_page_analyzer), scenario_generator: ScenarioGenerator = Depends(get_scenario_generator) ): """ 生成测试场景 Args: request: 生成请求,包含 URL 和选项 Returns: 生成的测试场景列表 """ try: logger.info(f"Generating test scenarios for URL: {request.url}") # 1. 分析页面结构 logger.info("Analyzing page structure...") page_structure = await page_analyzer.analyze(str(request.url)) # 2. 生成测试场景 logger.info("Generating test scenarios with AI...") scenarios = await scenario_generator.generate( url=str(request.url), page_structure=page_structure, options=request.options ) # 3. 
构建响应 response = GenerateResponse( project_id=request.project_id, url=str(request.url), page_structure=page_structure, scenarios=scenarios, generated_at=datetime.now(), model=scenario_generator.model_name, total_scenarios=len(scenarios) ) logger.info(f"Successfully generated {len(scenarios)} test scenarios") return response except Exception as e: logger.error(f"Error generating test scenarios: {str(e)}", exc_info=True) raise HTTPException( status_code=500, detail=f"Failed to generate test scenarios: {str(e)}" ) @app.post("/analyze-page") async def analyze_page( url: HttpUrl, page_analyzer: PageAnalyzer = Depends(get_page_analyzer) ): """ 分析页面结构 Args: url: 要分析的页面 URL Returns: 页面结构分析结果 """ try: logger.info(f"Analyzing page: {url}") page_structure = await page_analyzer.analyze(str(url)) return { "url": str(url), "structure": page_structure, "analyzed_at": datetime.now().isoformat() } except Exception as e: logger.error(f"Error analyzing page: {str(e)}", exc_info=True) raise HTTPException( status_code=500, detail=f"Failed to analyze page: {str(e)}" ) if __name__ == "__main__": import uvicorn print(""" ╔════════════════════════════════════════════════════════════════╗ ║ ║ ║ 🤖 TestMaster AI Generator Service ║ ║ ║ ║ Server: http://localhost:8001 ║ ║ Docs: http://localhost:8001/docs ║ ║ ║ ║ Status: Starting... ║ ║ Version: 1.0.0 ║ ║ ║ ╚════════════════════════════════════════════════════════════════╝ """) uvicorn.run( "main:app", host="0.0.0.0", port=8001, reload=True, log_level="info" )
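网关或前端调用该服务时,就是向 /generate 发送一个符合 GenerateRequest 结构的 POST 请求。下面用 TypeScript 的 fetch 写一个调用示例(端口取自上文 uvicorn 配置,其余封装为假设):

```typescript
// 调用 AI 生成服务 /generate 接口的一个示例(封装方式为假设)
interface GenerateOptions {
  max_scenarios?: number;
  include_edge_cases?: boolean;
  include_negative_tests?: boolean;
  complexity?: 'low' | 'medium' | 'high';
}

export async function generateScenarios(
  url: string,
  projectId: string,
  userId: string,
  options: GenerateOptions = {},
) {
  const response = await fetch('http://localhost:8001/generate', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ url, project_id: projectId, user_id: userId, options }),
  });
  if (!response.ok) {
    throw new Error(`AI generator returned ${response.status}`);
  }
  // 返回值对应上文的 GenerateResponse:scenarios、page_structure 等
  return response.json();
}
```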
2.3.2 LLM 服务 `backend/services/ai-generator/src/llm_service.py`
""" LLM 服务 - 支持 OpenAI GPT-4 和本地 LLM """ import os import json import logging from typing import List, Dict, Any, Optional from openai import AsyncOpenAI import anthropic logger = logging.getLogger(__name__) class LLMService: """大语言模型服务""" def __init__(self): self.provider = os.getenv("LLM_PROVIDER", "openai") # openai, anthropic, local self.model_name = os.getenv("LLM_MODEL", "gpt-4-turbo-preview") if self.provider == "openai": self.client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY")) elif self.provider == "anthropic": self.client = anthropic.AsyncAnthropic(api_key=os.getenv("ANTHROPIC_API_KEY")) else: # 本地 LLM (使用 Ollama 或其他) self.client = None logger.info("Using local LLM") async def generate_test_scenarios( self, url: str, page_structure: Dict[str, Any], options: Dict[str, Any] ) -> List[Dict[str, Any]]: """ 使用 LLM 生成测试场景 Args: url: 页面 URL page_structure: 页面结构分析结果 options: 生成选项 Returns: 测试场景列表 """ # 构建提示词 prompt = self._build_prompt(url, page_structure, options) # 调用 LLM if self.provider == "openai": response = await self._call_openai(prompt) elif self.provider == "anthropic": response = await self._call_anthropic(prompt) else: response = await self._call_local_llm(prompt) # 解析响应 scenarios = self._parse_response(response) return scenarios def _build_prompt( self, url: str, page_structure: Dict[str, Any], options: Dict[str, Any] ) -> str: """构建 LLM 提示词""" max_scenarios = options.get("max_scenarios", 10) include_edge_cases = options.get("include_edge_cases", True) include_negative_tests = options.get("include_negative_tests", True) complexity = options.get("complexity", "medium") prompt = f"""You are an expert QA automation engineer. Analyze the following web page and generate comprehensive test scenarios. **Page URL:** {url} **Page Structure:** ```json {json.dumps(page_structure, indent=2)}

Requirements:

  • Generate up to {max_scenarios} test scenarios
  • Include edge cases: {include_edge_cases}
  • Include negative tests: {include_negative_tests}
  • Complexity level: {complexity}

Output Format:
Return a JSON array of test scenarios. Each scenario should have:

  • name: Clear, descriptive test name
  • description: What the test validates
  • priority: "critical", "high", "medium", or "low"
  • steps: Array of test steps with:
    • action: Action type (navigate, click, type, select, wait, assert, etc.)
    • selector: CSS selector or XPath
    • value: Input value (if applicable)
    • description: Human-readable step description
  • confidence: Your confidence in this test (0.0 to 1.0)
  • tags: Relevant tags (e.g., ["smoke", "regression", "ui"])

Example:

[ {{ "name": "User Login - Valid Credentials", "description": "Verify that a user can successfully log in with valid credentials", "priority": "critical", "steps": [ {{ "action": "navigate", "selector": "", "value": "{url}", "description": "Navigate to login page" }}, {{ "action": "type", "selector": "#email", "value": "test@example.com", "description": "Enter email address" }}, {{ "action": "type", "selector": "#password", "value": "password123", "description": "Enter password" }}, {{ "action": "click", "selector": "button[type='submit']", "value": "", "description": "Click login button" }}, {{ "action": "assert", "selector": ".dashboard", "value": "visible", "description": "Verify dashboard is displayed" }} ], "confidence": 0.95, "tags": ["smoke", "authentication", "critical"] }} ]

Generate test scenarios now:"""

return prompt async def _call_openai(self, prompt: str) -> str: """调用 OpenAI API""" try: response = await self.client.chat.completions.create( model=self.model_name, messages=[ { "role": "system", "content": "You are an expert QA automation engineer specializing in web testing. Always respond with valid JSON." }, { "role": "user", "content": prompt } ], temperature=0.7, max_tokens=4000, response_format={"type": "json_object"} ) return response.choices[0].message.content except Exception as e: logger.error(f"OpenAI API error: {str(e)}") raise async def _call_anthropic(self, prompt: str) -> str: """调用 Anthropic Claude API""" try: response = await self.client.messages.create( model=self.model_name, max_tokens=4000, messages=[ { "role": "user", "content": prompt } ] ) return response.content[0].text except Exception as e: logger.error(f"Anthropic API error: {str(e)}") raise async def _call_local_llm(self, prompt: str) -> str: """调用本地 LLM (Ollama)""" import aiohttp try: async with aiohttp.ClientSession() as session: async with session.post( "http://localhost:11434/api/generate", json={ "model": "llama2", "prompt": prompt, "stream": False } ) as response: result = await response.json() return result["response"] except Exception as e: logger.error(f"Local LLM error: {str(e)}") raise def _parse_response(self, response: str) -> List[Dict[str, Any]]: """解析 LLM 响应""" try: # 尝试直接解析 JSON data = json.loads(response) # 如果返回的是对象而不是数组,尝试提取数组 if isinstance(data, dict): if "scenarios" in data: scenarios = data["scenarios"] elif "test_scenarios" in data: scenarios = data["test_scenarios"] else: # 假设整个对象就是一个场景 scenarios = [data] else: scenarios = data # 验证和清理场景 validated_scenarios = [] for scenario in scenarios: if self._validate_scenario(scenario): validated_scenarios.append(scenario) else: logger.warning(f"Invalid scenario skipped: {scenario.get('name', 'Unknown')}") return validated_scenarios except json.JSONDecodeError as e: logger.error(f"Failed to parse LLM response: {str(e)}") logger.error(f"Response: {response}") # 尝试提取 JSON 代码块 import re json_match = re.search(r'```json\s*(.*?)\s*```', response, re.DOTALL) if json_match: try: data = json.loads(json_match.group(1)) return self._parse_response(json.dumps(data)) except: pass return [] def _validate_scenario(self, scenario: Dict[str, Any]) -> bool: """验证测试场景格式""" required_fields = ["name", "description", "steps"] # 检查必需字段 for field in required_fields: if field not in scenario: return False # 检查步骤 if not isinstance(scenario["steps"], list) or len(scenario["steps"]) == 0: return False # 验证每个步骤 for step in scenario["steps"]: if not isinstance(step, dict): return False if "action" not in step: return False return True
继续下一部分... #### 2.3.3 页面分析器 `backend/services/ai-generator/src/page_analyzer.py` ```python """ 页面分析器 - 使用 Playwright 分析页面结构 """ import logging from typing import Dict, Any, List from playwright.async_api import async_playwright, Page import asyncio logger = logging.getLogger(__name__) class PageAnalyzer: """页面结构分析器""" def __init__(self): self.timeout = 30000 # 30秒超时 async def analyze(self, url: str) -> Dict[str, Any]: """ 分析页面结构 Args: url: 页面 URL Returns: 页面结构信息 """ async with async_playwright() as p: browser = await p.chromium.launch(headless=True) context = await browser.new_context( viewport={'width': 1920, 'height': 1080}, user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' ) page = await context.new_page() try: # 导航到页面 logger.info(f"Navigating to {url}") await page.goto(url, wait_until='networkidle', timeout=self.timeout) # 等待页面加载完成 await page.wait_for_load_state('domcontentloaded') # 提取页面信息 structure = await self._extract_page_structure(page) return structure except Exception as e: logger.error(f"Error analyzing page {url}: {str(e)}") raise finally: await browser.close() async def _extract_page_structure(self, page: Page) -> Dict[str, Any]: """提取页面结构""" # 执行 JavaScript 提取页面元素 structure = await page.evaluate(""" () => { const result = { title: document.title, url: window.location.href, meta: {}, forms: [], inputs: [], buttons: [], links: [], headings: [], images: [], tables: [], interactiveElements: [] }; // Meta 信息 document.querySelectorAll('meta').forEach(meta => { const name = meta.getAttribute('name') || meta.getAttribute('property'); const content = meta.getAttribute('content'); if (name && content) { result.meta[name] = content; } }); // 表单 document.querySelectorAll('form').forEach((form, index) => { const formData = { id: form.id || `form-${index}`, name: form.name, action: form.action, method: form.method, inputs: [] }; form.querySelectorAll('input, select, textarea').forEach(input => { formData.inputs.push({ type: input.type || input.tagName.toLowerCase(), name: input.name, id: input.id, placeholder: input.placeholder, required: input.required, selector: `#${input.id}` || `[name="${input.name}"]` }); }); result.forms.push(formData); }); // 输入框 document.querySelectorAll('input, textarea, select').forEach(input => { result.inputs.push({ type: input.type || input.tagName.toLowerCase(), name: input.name, id: input.id, placeholder: input.placeholder, required: input.required, selector: input.id ? `#${input.id}` : `[name="${input.name}"]`, label: input.labels?.[0]?.textContent?.trim() }); }); // 按钮 document.querySelectorAll('button, input[type="submit"], input[type="button"]').forEach((btn, index) => { result.buttons.push({ text: btn.textContent?.trim() || btn.value, type: btn.type, id: btn.id || `button-${index}`, selector: btn.id ? `#${btn.id}` : `button:nth-of-type(${index + 1})`, disabled: btn.disabled }); }); // 链接 document.querySelectorAll('a[href]').forEach((link, index) => { result.links.push({ text: link.textContent?.trim(), href: link.href, id: link.id, selector: link.id ? `#${link.id}` : `a:nth-of-type(${index + 1})` }); }); // 标题 ['h1', 'h2', 'h3', 'h4', 'h5', 'h6'].forEach(tag => { document.querySelectorAll(tag).forEach((heading, index) => { result.headings.push({ level: tag, text: heading.textContent?.trim(), id: heading.id, selector: heading.id ? 
`#${heading.id}` : `${tag}:nth-of-type(${index + 1})` }); }); }); // 图片 document.querySelectorAll('img').forEach((img, index) => { result.images.push({ src: img.src, alt: img.alt, id: img.id, selector: img.id ? `#${img.id}` : `img:nth-of-type(${index + 1})` }); }); // 表格 document.querySelectorAll('table').forEach((table, index) => { result.tables.push({ id: table.id || `table-${index}`, rows: table.rows.length, columns: table.rows[0]?.cells.length || 0, selector: table.id ? `#${table.id}` : `table:nth-of-type(${index + 1})` }); }); // 交互元素 document.querySelectorAll('[onclick], [data-action], .clickable, [role="button"]').forEach((el, index) => { result.interactiveElements.push({ tag: el.tagName.toLowerCase(), text: el.textContent?.trim().substring(0, 50), id: el.id, className: el.className, selector: el.id ? `#${el.id}` : `.${el.className.split(' ')[0]}` }); }); return result; } """) # 添加截图 screenshot = await page.screenshot(full_page=True, type='png') structure['screenshot'] = screenshot.hex() # 添加页面性能指标 performance = await page.evaluate(""" () => { const perfData = performance.getEntriesByType('navigation')[0]; return { loadTime: perfData.loadEventEnd - perfData.fetchStart, domContentLoaded: perfData.domContentLoadedEventEnd - perfData.fetchStart, responseTime: perfData.responseEnd - perfData.requestStart }; } """) structure['performance'] = performance logger.info(f"Extracted {len(structure['forms'])} forms, " f"{len(structure['inputs'])} inputs, " f"{len(structure['buttons'])} buttons, " f"{len(structure['links'])} links") return structure
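A minimal usage sketch for the analyzer is shown below; the target URL is only an example, and it assumes the Playwright browsers have been installed (`playwright install chromium`):

```python
import asyncio

from page_analyzer import PageAnalyzer

async def main():
    analyzer = PageAnalyzer()
    # Crawl and summarize the structure of a public demo page (example URL)
    structure = await analyzer.analyze("https://example.com")
    print(structure["title"])
    print(f"forms={len(structure['forms'])}, inputs={len(structure['inputs'])}, "
          f"buttons={len(structure['buttons'])}, links={len(structure['links'])}")
    # Navigation timing collected by _extract_page_structure
    print(structure["performance"])

if __name__ == "__main__":
    asyncio.run(main())
```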
#### 2.3.4 Scenario Generator `backend/services/ai-generator/src/scenario_generator.py`
""" 测试场景生成器 """ import logging from typing import List, Dict, Any from llm_service import LLMService logger = logging.getLogger(__name__) class ScenarioGenerator: """测试场景生成器""" def __init__(self, llm_service: LLMService): self.llm_service = llm_service self.model_name = llm_service.model_name async def generate( self, url: str, page_structure: Dict[str, Any], options: Dict[str, Any] ) -> List[Dict[str, Any]]: """ 生成测试场景 Args: url: 页面 URL page_structure: 页面结构 options: 生成选项 Returns: 测试场景列表 """ logger.info(f"Generating test scenarios for {url}") # 使用 LLM 生成场景 scenarios = await self.llm_service.generate_test_scenarios( url, page_structure, options ) # 后处理场景 processed_scenarios = self._post_process_scenarios(scenarios, page_structure) # 排序(按优先级和置信度) sorted_scenarios = self._sort_scenarios(processed_scenarios) # 限制数量 max_scenarios = options.get("max_scenarios", 10) return sorted_scenarios[:max_scenarios] def _post_process_scenarios( self, scenarios: List[Dict[str, Any]], page_structure: Dict[str, Any] ) -> List[Dict[str, Any]]: """后处理场景""" processed = [] for scenario in scenarios: # 验证选择器 scenario = self._validate_selectors(scenario, page_structure) # 添加等待条件 scenario = self._add_wait_conditions(scenario) # 添加断言 scenario = self._enhance_assertions(scenario) processed.append(scenario) return processed def _validate_selectors( self, scenario: Dict[str, Any], page_structure: Dict[str, Any] ) -> Dict[str, Any]: """验证和优化选择器""" # 提取所有可用的选择器 available_selectors = set() for input_elem in page_structure.get('inputs', []): if input_elem.get('selector'): available_selectors.add(input_elem['selector']) for button in page_structure.get('buttons', []): if button.get('selector'): available_selectors.add(button['selector']) # 验证步骤中的选择器 for step in scenario.get('steps', []): selector = step.get('selector', '') # 如果选择器不在可用列表中,尝试修复 if selector and selector not in available_selectors: # 尝试通过文本匹配找到更好的选择器 if step['action'] in ['click', 'type']: better_selector = self._find_better_selector( step, page_structure ) if better_selector: step['selector'] = better_selector logger.info(f"Updated selector from {selector} to {better_selector}") return scenario def _find_better_selector( self, step: Dict[str, Any], page_structure: Dict[str, Any] ) -> str: """查找更好的选择器""" action = step['action'] description = step.get('description', '').lower() if action == 'type': # 查找匹配的输入框 for input_elem in page_structure.get('inputs', []): label = input_elem.get('label', '').lower() placeholder = input_elem.get('placeholder', '').lower() if label in description or placeholder in description: return input_elem['selector'] elif action == 'click': # 查找匹配的按钮 for button in page_structure.get('buttons', []): text = button.get('text', '').lower() if text in description: return button['selector'] return '' def _add_wait_conditions(self, scenario: Dict[str, Any]) -> Dict[str, Any]: """添加等待条件""" for i, step in enumerate(scenario.get('steps', [])): # 在导航后添加等待 if step['action'] == 'navigate': if i + 1 < len(scenario['steps']): scenario['steps'].insert(i + 1, { 'action': 'wait', 'selector': 'body', 'value': 'load', 'description': 'Wait for page to load' }) # 在点击后添加短暂等待 elif step['action'] == 'click': step['waitAfter'] = 500 # 500ms return scenario def _enhance_assertions(self, scenario: Dict[str, Any]) -> Dict[str, Any]: """增强断言""" steps = scenario.get('steps', []) # 如果最后一步不是断言,添加一个 if steps and steps[-1]['action'] != 'assert': steps.append({ 'action': 'assert', 'selector': 'body', 'value': 'visible', 'description': 'Verify page is loaded successfully' }) 
        return scenario

    def _sort_scenarios(self, scenarios: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Sort scenarios by priority, then by descending confidence"""
        priority_order = {'critical': 0, 'high': 1, 'medium': 2, 'low': 3}

        return sorted(
            scenarios,
            key=lambda s: (
                priority_order.get(s.get('priority', 'medium'), 2),
                -s.get('confidence', 0.5)
            )
        )
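The ordering produced by `_sort_scenarios` (priority bucket first, then descending confidence) can be checked with a quick standalone sketch; the scenario dicts below are made up for illustration:

```python
priority_order = {"critical": 0, "high": 1, "medium": 2, "low": 3}

scenarios = [
    {"name": "Search filter", "priority": "medium", "confidence": 0.9},
    {"name": "Login happy path", "priority": "critical", "confidence": 0.95},
    {"name": "Login wrong password", "priority": "critical", "confidence": 0.8},
    {"name": "Footer links", "priority": "low", "confidence": 0.99},
]

ordered = sorted(
    scenarios,
    key=lambda s: (priority_order.get(s.get("priority", "medium"), 2), -s.get("confidence", 0.5)),
)
for s in ordered:
    print(s["priority"], s["confidence"], s["name"])
# critical 0.95 -> critical 0.8 -> medium 0.9 -> low 0.99
```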

Next up: the implementation of the test execution engine.

2.4 Test Execution Engine (Python + Selenium/Playwright)

#### 2.4.1 Executor Entry Point `backend/services/executor/src/main.py`
""" TestMaster 测试执行引擎 支持 Selenium 和 Playwright """ from fastapi import FastAPI, HTTPException, BackgroundTasks from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from typing import List, Dict, Any, Optional import logging from datetime import datetime import asyncio from selenium_runner import SeleniumRunner from playwright_runner import PlaywrightRunner from appium_runner import AppiumRunner # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # 创建 FastAPI 应用 app = FastAPI( title="TestMaster Executor", description="Test execution engine service", version="1.0.0" ) # CORS 配置 app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # 请求模型 class TestStep(BaseModel): id: str action: str selector: Optional[str] = None value: Optional[str] = None wait_condition: Optional[Dict[str, Any]] = None screenshot: bool = False class ExecuteRequest(BaseModel): execution_id: str test_case_id: str steps: List[TestStep] environment: str browser: str = "chrome" device: Optional[str] = None headless: bool = True video: bool = False class ExecuteResponse(BaseModel): execution_id: str status: str started_at: datetime completed_at: datetime duration_ms: int step_results: List[Dict[str, Any]] # 全局执行器实例 selenium_runner = SeleniumRunner() playwright_runner = PlaywrightRunner() appium_runner = AppiumRunner() # API 端点 @app.get("/health") async def health_check(): """健康检查""" return { "status": "healthy", "service": "executor", "version": "1.0.0", "runners": { "selenium": selenium_runner.is_available(), "playwright": playwright_runner.is_available(), "appium": appium_runner.is_available() } } @app.post("/execute", response_model=ExecuteResponse) async def execute_test(request: ExecuteRequest, background_tasks: BackgroundTasks): """ 执行测试 Args: request: 执行请求 background_tasks: 后台任务 Returns: 执行结果 """ logger.info(f"Executing test {request.test_case_id} on {request.browser}") started_at = datetime.now() try: # 选择执行器 if request.device: # 移动设备测试 runner = appium_runner elif request.browser in ['chrome', 'firefox', 'edge', 'safari']: # Web 浏览器测试 - 使用 Playwright (更快更稳定) runner = playwright_runner else: # 默认使用 Selenium runner = selenium_runner # 执行测试 step_results = await runner.execute( execution_id=request.execution_id, steps=[step.dict() for step in request.steps], browser=request.browser, device=request.device, headless=request.headless, video=request.video ) completed_at = datetime.now() duration_ms = int((completed_at - started_at).total_seconds() * 1000) # 判断整体状态 status = "passed" for result in step_results: if result['status'] in ['failed', 'error']: status = "failed" break response = ExecuteResponse( execution_id=request.execution_id, status=status, started_at=started_at, completed_at=completed_at, duration_ms=duration_ms, step_results=step_results ) logger.info(f"Test execution completed: {status} in {duration_ms}ms") return response except Exception as e: logger.error(f"Test execution failed: {str(e)}", exc_info=True) completed_at = datetime.now() duration_ms = int((completed_at - started_at).total_seconds() * 1000) return ExecuteResponse( execution_id=request.execution_id, status="error", started_at=started_at, completed_at=completed_at, duration_ms=duration_ms, step_results=[{ "step_id": "error", "status": "error", "error_message": str(e) }] ) @app.get("/browsers") async def get_available_browsers(): """获取可用的浏览器""" return { 
"browsers": [ {"name": "chrome", "version": "latest", "available": True}, {"name": "firefox", "version": "latest", "available": True}, {"name": "edge", "version": "latest", "available": True}, {"name": "safari", "version": "latest", "available": False} ] } @app.get("/devices") async def get_available_devices(): """获取可用的移动设备""" devices = await appium_runner.get_available_devices() return {"devices": devices} if __name__ == "__main__": import uvicorn print(""" ╔════════════════════════════════════════════════════════════════╗ ║ ║ ║ 🎯 TestMaster Executor Service ║ ║ ║ ║ Server: http://localhost:8002 ║ ║ Docs: http://localhost:8002/docs ║ ║ ║ ║ Runners: ║ ║ - Selenium Grid ║ ║ - Playwright ║ ║ - Appium ║ ║ ║ ╚════════════════════════════════════════════════════════════════╝ """) uvicorn.run( "main:app", host="0.0.0.0", port=8002, reload=True, log_level="info" )

TestMaster Automated Testing Platform - Part Three: Playwright Executor Implementation

#### 2.4.2 Playwright Executor `backend/services/executor/src/playwright_runner.py`

""" Playwright 测试执行器 支持 Chromium, Firefox, WebKit """ import logging import asyncio from typing import List, Dict, Any, Optional from datetime import datetime import os import json from pathlib import Path from playwright.async_api import async_playwright, Browser, BrowserContext, Page, Error import aiofiles logger = logging.getLogger(__name__) class PlaywrightRunner: """Playwright 测试执行器""" def __init__(self): self.screenshots_dir = Path("./screenshots") self.videos_dir = Path("./videos") self.logs_dir = Path("./logs") # 创建目录 self.screenshots_dir.mkdir(exist_ok=True) self.videos_dir.mkdir(exist_ok=True) self.logs_dir.mkdir(exist_ok=True) self.default_timeout = 30000 # 30秒 self.action_timeout = 10000 # 10秒 logger.info("PlaywrightRunner initialized") def is_available(self) -> bool: """检查 Playwright 是否可用""" try: import playwright return True except ImportError: return False async def execute( self, execution_id: str, steps: List[Dict[str, Any]], browser: str = "chromium", device: Optional[str] = None, headless: bool = True, video: bool = False ) -> List[Dict[str, Any]]: """ 执行测试步骤 Args: execution_id: 执行ID steps: 测试步骤列表 browser: 浏览器类型 (chromium, firefox, webkit) device: 设备模拟 (可选) headless: 是否无头模式 video: 是否录制视频 Returns: 步骤执行结果列表 """ logger.info(f"Starting Playwright execution {execution_id} on {browser}") step_results = [] playwright = None browser_instance = None context = None page = None try: # 启动 Playwright playwright = await async_playwright().start() # 选择浏览器 if browser == "firefox": browser_instance = await playwright.firefox.launch(headless=headless) elif browser == "webkit": browser_instance = await playwright.webkit.launch(headless=headless) else: # chromium/chrome browser_instance = await playwright.chromium.launch( headless=headless, args=[ '--disable-blink-features=AutomationControlled', '--disable-dev-shm-usage', '--no-sandbox' ] ) # 创建上下文 context_options = { 'viewport': {'width': 1920, 'height': 1080}, 'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'locale': 'zh-CN', 'timezone_id': 'Asia/Shanghai', 'permissions': ['geolocation', 'notifications'], 'ignore_https_errors': True } # 设备模拟 if device: device_descriptor = playwright.devices.get(device) if device_descriptor: context_options.update(device_descriptor) # 视频录制 if video: context_options['record_video_dir'] = str(self.videos_dir / execution_id) context_options['record_video_size'] = {'width': 1920, 'height': 1080} context = await browser_instance.new_context(**context_options) # 设置默认超时 context.set_default_timeout(self.default_timeout) context.set_default_navigation_timeout(self.default_timeout) # 创建页面 page = await context.new_page() # 设置控制台日志监听 page.on("console", lambda msg: logger.info(f"Console: {msg.text}")) page.on("pageerror", lambda err: logger.error(f"Page error: {err}")) # 执行每个步骤 for i, step in enumerate(steps): step_id = step.get('id', f'step-{i}') logger.info(f"Executing step {i+1}/{len(steps)}: {step['action']}") try: result = await self._execute_step( page=page, step=step, execution_id=execution_id, step_index=i ) result['step_id'] = step_id step_results.append(result) # 如果步骤失败,停止执行 if result['status'] in ['failed', 'error']: logger.warning(f"Step {i+1} failed, stopping execution") break except Exception as e: logger.error(f"Step {i+1} error: {str(e)}", exc_info=True) # 截图保存错误状态 screenshot_path = await self._take_screenshot( page, execution_id, f"error-step-{i}" ) step_results.append({ 'step_id': step_id, 'status': 'error', 'error_message': str(e), 'screenshot_url': screenshot_path, 
'timestamp': datetime.now().isoformat() }) break # 保存最终截图 if page: await self._take_screenshot(page, execution_id, "final") logger.info(f"Execution {execution_id} completed with {len(step_results)} steps") return step_results except Exception as e: logger.error(f"Execution {execution_id} failed: {str(e)}", exc_info=True) raise finally: # 清理资源 if page: await page.close() if context: await context.close() if browser_instance: await browser_instance.close() if playwright: await playwright.stop() async def _execute_step( self, page: Page, step: Dict[str, Any], execution_id: str, step_index: int ) -> Dict[str, Any]: """ 执行单个测试步骤 Args: page: Playwright 页面对象 step: 步骤配置 execution_id: 执行ID step_index: 步骤索引 Returns: 步骤执行结果 """ action = step['action'] selector = step.get('selector', '') value = step.get('value', '') wait_condition = step.get('wait_condition', {}) screenshot = step.get('screenshot', False) start_time = datetime.now() result = { 'action': action, 'selector': selector, 'status': 'passed', 'timestamp': start_time.isoformat() } try: # 执行前等待 if wait_condition: await self._handle_wait(page, wait_condition) # 执行动作 if action == 'navigate': await page.goto(value, wait_until='networkidle', timeout=self.default_timeout) result['url'] = page.url elif action == 'click': element = page.locator(selector) await element.wait_for(state='visible', timeout=self.action_timeout) await element.click(timeout=self.action_timeout) elif action == 'type' or action == 'fill': element = page.locator(selector) await element.wait_for(state='visible', timeout=self.action_timeout) await element.clear() await element.fill(value, timeout=self.action_timeout) elif action == 'select': element = page.locator(selector) await element.wait_for(state='visible', timeout=self.action_timeout) await element.select_option(value, timeout=self.action_timeout) elif action == 'check' or action == 'uncheck': element = page.locator(selector) await element.wait_for(state='visible', timeout=self.action_timeout) if action == 'check': await element.check(timeout=self.action_timeout) else: await element.uncheck(timeout=self.action_timeout) elif action == 'hover': element = page.locator(selector) await element.wait_for(state='visible', timeout=self.action_timeout) await element.hover(timeout=self.action_timeout) elif action == 'scroll': if selector: element = page.locator(selector) await element.scroll_into_view_if_needed(timeout=self.action_timeout) else: # 滚动到页面底部 await page.evaluate(f'window.scrollTo(0, {value or "document.body.scrollHeight"})') elif action == 'wait': wait_type = wait_condition.get('type', 'time') wait_value = wait_condition.get('value', 1000) if wait_type == 'time': await asyncio.sleep(int(wait_value) / 1000) elif wait_type == 'element': await page.wait_for_selector(selector, timeout=int(wait_value)) elif wait_type == 'url': await page.wait_for_url(value, timeout=int(wait_value)) elif wait_type == 'load': await page.wait_for_load_state('networkidle', timeout=int(wait_value)) elif action == 'assert': await self._handle_assertion(page, selector, value, result) elif action == 'execute_script': script_result = await page.evaluate(value) result['script_result'] = script_result elif action == 'upload': element = page.locator(selector) await element.set_input_files(value, timeout=self.action_timeout) elif action == 'press': await page.keyboard.press(value) elif action == 'switch_frame': frame = page.frame(selector) if frame: # 在 Playwright 中,frame 操作是自动的 result['frame'] = selector else: raise Exception(f"Frame not found: 
{selector}") elif action == 'switch_window': # 等待新窗口 async with page.expect_popup() as popup_info: page = await popup_info.value result['window'] = page.url elif action == 'accept_alert': page.on("dialog", lambda dialog: dialog.accept()) elif action == 'dismiss_alert': page.on("dialog", lambda dialog: dialog.dismiss()) elif action == 'get_text': element = page.locator(selector) text = await element.text_content(timeout=self.action_timeout) result['text'] = text elif action == 'get_attribute': element = page.locator(selector) attr_value = await element.get_attribute(value, timeout=self.action_timeout) result['attribute_value'] = attr_value else: raise Exception(f"Unknown action: {action}") # 截图 if screenshot or action == 'assert': screenshot_path = await self._take_screenshot( page, execution_id, f"step-{step_index}" ) result['screenshot_url'] = screenshot_path # 收集性能指标 if action == 'navigate': metrics = await self._collect_performance_metrics(page) result['metrics'] = metrics # 计算执行时间 end_time = datetime.now() result['duration_ms'] = int((end_time - start_time).total_seconds() * 1000) logger.info(f"Step {action} completed successfully in {result['duration_ms']}ms") except Error as e: result['status'] = 'failed' result['error_message'] = str(e) logger.error(f"Step {action} failed: {str(e)}") # 错误截图 try: screenshot_path = await self._take_screenshot( page, execution_id, f"error-step-{step_index}" ) result['screenshot_url'] = screenshot_path except: pass except Exception as e: result['status'] = 'error' result['error_message'] = str(e) logger.error(f"Step {action} error: {str(e)}", exc_info=True) return result async def _handle_wait(self, page: Page, wait_condition: Dict[str, Any]): """处理等待条件""" wait_type = wait_condition.get('type', 'time') wait_value = wait_condition.get('value', 1000) timeout = wait_condition.get('timeout', self.action_timeout) if wait_type == 'time': await asyncio.sleep(int(wait_value) / 1000) elif wait_type == 'element': await page.wait_for_selector(str(wait_value), timeout=timeout) elif wait_type == 'url': await page.wait_for_url(str(wait_value), timeout=timeout) elif wait_type == 'load': await page.wait_for_load_state('networkidle', timeout=timeout) elif wait_type == 'custom': # 自定义 JavaScript 条件 await page.wait_for_function(str(wait_value), timeout=timeout) async def _handle_assertion( self, page: Page, selector: str, expected_value: str, result: Dict[str, Any] ): """处理断言""" assertion_type, assertion_value = expected_value.split(':', 1) if ':' in expected_value else ('visible', expected_value) try: if assertion_type == 'visible': element = page.locator(selector) await element.wait_for(state='visible', timeout=self.action_timeout) is_visible = await element.is_visible() if not is_visible: raise AssertionError(f"Element {selector} is not visible") elif assertion_type == 'hidden': element = page.locator(selector) is_hidden = await element.is_hidden() if not is_hidden: raise AssertionError(f"Element {selector} is not hidden") elif assertion_type == 'text': element = page.locator(selector) text = await element.text_content(timeout=self.action_timeout) if assertion_value not in text: raise AssertionError(f"Expected text '{assertion_value}' not found in '{text}'") elif assertion_type == 'value': element = page.locator(selector) actual_value = await element.input_value(timeout=self.action_timeout) if actual_value != assertion_value: raise AssertionError(f"Expected value '{assertion_value}', got '{actual_value}'") elif assertion_type == 'count': elements = 
page.locator(selector) count = await elements.count() expected_count = int(assertion_value) if count != expected_count: raise AssertionError(f"Expected {expected_count} elements, found {count}") elif assertion_type == 'url': current_url = page.url if assertion_value not in current_url: raise AssertionError(f"Expected URL to contain '{assertion_value}', got '{current_url}'") elif assertion_type == 'title': title = await page.title() if assertion_value not in title: raise AssertionError(f"Expected title to contain '{assertion_value}', got '{title}'") elif assertion_type == 'attribute': attr_name, expected_attr_value = assertion_value.split('=', 1) element = page.locator(selector) actual_attr_value = await element.get_attribute(attr_name, timeout=self.action_timeout) if actual_attr_value != expected_attr_value: raise AssertionError( f"Expected attribute {attr_name}='{expected_attr_value}', " f"got '{actual_attr_value}'" ) result['assertion_passed'] = True except AssertionError as e: result['status'] = 'failed' result['error_message'] = str(e) result['assertion_passed'] = False raise async def _take_screenshot( self, page: Page, execution_id: str, name: str ) -> str: """截图""" try: screenshot_path = self.screenshots_dir / execution_id / f"{name}.png" screenshot_path.parent.mkdir(parents=True, exist_ok=True) await page.screenshot( path=str(screenshot_path), full_page=True, type='png' ) # 返回相对路径 return f"/screenshots/{execution_id}/{name}.png" except Exception as e: logger.error(f"Failed to take screenshot: {str(e)}") return "" async def _collect_performance_metrics(self, page: Page) -> Dict[str, Any]: """收集性能指标""" try: # 使用 Performance API 收集指标 metrics = await page.evaluate(""" () => { const perfData = performance.getEntriesByType('navigation')[0]; const paintData = performance.getEntriesByType('paint'); return { // 导航时间 dns_lookup: perfData.domainLookupEnd - perfData.domainLookupStart, tcp_connection: perfData.connectEnd - perfData.connectStart, request_time: perfData.responseStart - perfData.requestStart, response_time: perfData.responseEnd - perfData.responseStart, dom_loading: perfData.domInteractive - perfData.domLoading, dom_interactive: perfData.domInteractive - perfData.fetchStart, dom_complete: perfData.domComplete - perfData.fetchStart, load_event: perfData.loadEventEnd - perfData.loadEventStart, // 总时间 total_load_time: perfData.loadEventEnd - perfData.fetchStart, // 绘制时间 first_paint: paintData.find(p => p.name === 'first-paint')?.startTime || 0, first_contentful_paint: paintData.find(p => p.name === 'first-contentful-paint')?.startTime || 0, // 资源 transfer_size: perfData.transferSize, encoded_body_size: perfData.encodedBodySize, decoded_body_size: perfData.decodedBodySize }; } """) # 获取内存使用(如果可用) try: memory = await page.evaluate("() => performance.memory") metrics['memory'] = { 'used_js_heap_size': memory.get('usedJSHeapSize', 0), 'total_js_heap_size': memory.get('totalJSHeapSize', 0), 'js_heap_size_limit': memory.get('jsHeapSizeLimit', 0) } except: pass return metrics except Exception as e: logger.error(f"Failed to collect performance metrics: {str(e)}") return {} async def get_available_devices(self) -> List[Dict[str, str]]: """获取可用的设备模拟列表""" async with async_playwright() as p: devices = [] # Playwright 内置设备 for device_name in [ 'iPhone 12', 'iPhone 12 Pro', 'iPhone 13', 'iPhone 13 Pro', 'iPhone 14', 'iPhone 14 Pro', 'Pixel 5', 'Pixel 6', 'Galaxy S9+', 'Galaxy S21', 'iPad Pro', 'iPad Mini', 'Desktop Chrome', 'Desktop Firefox', 'Desktop Safari' ]: if device_name in p.devices: device 
= p.devices[device_name] devices.append({ 'name': device_name, 'viewport': f"{device['viewport']['width']}x{device['viewport']['height']}", 'user_agent': device.get('user_agent', '')[:50] + '...' }) return devices
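The runner can also be driven directly without going through the FastAPI layer. The sketch below is a hypothetical local debugging script (the URL and expected heading text are examples) that exercises the `navigate`/`assert` actions, including the `type:value` assertion syntax handled by `_handle_assertion`:

```python
import asyncio

from playwright_runner import PlaywrightRunner

async def main():
    runner = PlaywrightRunner()
    steps = [
        {"id": "s1", "action": "navigate", "value": "https://example.com"},
        # "text:<expected>" is split by _handle_assertion into (assertion_type, assertion_value)
        {"id": "s2", "action": "assert", "selector": "h1", "value": "text:Example Domain"},
        {"id": "s3", "action": "assert", "selector": "h1", "value": "visible", "screenshot": True},
    ]
    results = await runner.execute(
        execution_id="local-debug-001",
        steps=steps,
        browser="chromium",
        headless=True,
    )
    for r in results:
        print(r["step_id"], r["status"], r.get("duration_ms"), r.get("screenshot_url"))

if __name__ == "__main__":
    asyncio.run(main())
```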

#### 2.4.3 Selenium Executor `backend/services/executor/src/selenium_runner.py`

""" Selenium WebDriver 测试执行器 支持 Chrome, Firefox, Edge, Safari """ import logging from typing import List, Dict, Any, Optional from datetime import datetime import os from pathlib import Path import time from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.common.action_chains import ActionChains from selenium.webdriver.common.keys import Keys from selenium.webdriver.support.select import Select from selenium.common.exceptions import ( TimeoutException, NoSuchElementException, WebDriverException ) logger = logging.getLogger(__name__) class SeleniumRunner: """Selenium 测试执行器""" def __init__(self): self.screenshots_dir = Path("./screenshots") self.logs_dir = Path("./logs") # 创建目录 self.screenshots_dir.mkdir(exist_ok=True) self.logs_dir.mkdir(exist_ok=True) self.default_timeout = 30 self.action_timeout = 10 logger.info("SeleniumRunner initialized") def is_available(self) -> bool: """检查 Selenium 是否可用""" try: import selenium return True except ImportError: return False async def execute( self, execution_id: str, steps: List[Dict[str, Any]], browser: str = "chrome", device: Optional[str] = None, headless: bool = True, video: bool = False ) -> List[Dict[str, Any]]: """ 执行测试步骤 Args: execution_id: 执行ID steps: 测试步骤列表 browser: 浏览器类型 device: 设备模拟 headless: 是否无头模式 video: 是否录制视频 Returns: 步骤执行结果列表 """ logger.info(f"Starting Selenium execution {execution_id} on {browser}") step_results = [] driver = None try: # 创建 WebDriver driver = self._create_driver(browser, headless, device) # 设置隐式等待 driver.implicitly_wait(self.action_timeout) # 执行每个步骤 for i, step in enumerate(steps): step_id = step.get('id', f'step-{i}') logger.info(f"Executing step {i+1}/{len(steps)}: {step['action']}") try: result = self._execute_step( driver=driver, step=step, execution_id=execution_id, step_index=i ) result['step_id'] = step_id step_results.append(result) # 如果步骤失败,停止执行 if result['status'] in ['failed', 'error']: logger.warning(f"Step {i+1} failed, stopping execution") break except Exception as e: logger.error(f"Step {i+1} error: {str(e)}", exc_info=True) # 截图保存错误状态 screenshot_path = self._take_screenshot( driver, execution_id, f"error-step-{i}" ) step_results.append({ 'step_id': step_id, 'status': 'error', 'error_message': str(e), 'screenshot_url': screenshot_path, 'timestamp': datetime.now().isoformat() }) break # 保存最终截图 if driver: self._take_screenshot(driver, execution_id, "final") logger.info(f"Execution {execution_id} completed with {len(step_results)} steps") return step_results except Exception as e: logger.error(f"Execution {execution_id} failed: {str(e)}", exc_info=True) raise finally: # 清理资源 if driver: driver.quit() def _create_driver( self, browser: str, headless: bool, device: Optional[str] ) -> webdriver.Remote: """创建 WebDriver 实例""" if browser == "chrome": options = webdriver.ChromeOptions() if headless: options.add_argument('--headless=new') options.add_argument('--no-sandbox') options.add_argument('--disable-dev-shm-usage') options.add_argument('--disable-blink-features=AutomationControlled') options.add_experimental_option("excludeSwitches", ["enable-automation"]) options.add_experimental_option('useAutomationExtension', False) # 设备模拟 if device: mobile_emulation = self._get_mobile_emulation(device) if mobile_emulation: options.add_experimental_option("mobileEmulation", mobile_emulation) driver = webdriver.Chrome(options=options) elif browser == 
"firefox": options = webdriver.FirefoxOptions() if headless: options.add_argument('--headless') driver = webdriver.Firefox(options=options) elif browser == "edge": options = webdriver.EdgeOptions() if headless: options.add_argument('--headless=new') driver = webdriver.Edge(options=options) else: raise ValueError(f"Unsupported browser: {browser}") # 设置窗口大小 if not device: driver.set_window_size(1920, 1080) return driver def _get_mobile_emulation(self, device: str) -> Optional[Dict[str, Any]]: """获取移动设备模拟配置""" devices = { 'iPhone 12': { 'deviceMetrics': {'width': 390, 'height': 844, 'pixelRatio': 3.0}, 'userAgent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X)' }, 'iPhone 13': { 'deviceMetrics': {'width': 390, 'height': 844, 'pixelRatio': 3.0}, 'userAgent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 15_0 like Mac OS X)' }, 'Pixel 5': { 'deviceMetrics': {'width': 393, 'height': 851, 'pixelRatio': 2.75}, 'userAgent': 'Mozilla/5.0 (Linux; Android 11; Pixel 5)' }, 'iPad Pro': { 'deviceMetrics': {'width': 1024, 'height': 1366, 'pixelRatio': 2.0}, 'userAgent': 'Mozilla/5.0 (iPad; CPU OS 14_0 like Mac OS X)' } } return devices.get(device) def _execute_step( self, driver: webdriver.Remote, step: Dict[str, Any], execution_id: str, step_index: int ) -> Dict[str, Any]: """执行单个测试步骤""" action = step['action'] selector = step.get('selector', '') value = step.get('value', '') wait_condition = step.get('wait_condition', {}) screenshot = step.get('screenshot', False) start_time = datetime.now() result = { 'action': action, 'selector': selector, 'status': 'passed', 'timestamp': start_time.isoformat() } try: # 执行前等待 if wait_condition: self._handle_wait(driver, wait_condition) # 执行动作 if action == 'navigate': driver.get(value) result['url'] = driver.current_url elif action == 'click': element = self._find_element(driver, selector) element.click() elif action == 'type' or action == 'fill': element = self._find_element(driver, selector) element.clear() element.send_keys(value) elif action == 'select': element = self._find_element(driver, selector) select = Select(element) select.select_by_visible_text(value) elif action == 'check': element = self._find_element(driver, selector) if not element.is_selected(): element.click() elif action == 'uncheck': element = self._find_element(driver, selector) if element.is_selected(): element.click() elif action == 'hover': element = self._find_element(driver, selector) ActionChains(driver).move_to_element(element).perform() elif action == 'scroll': if selector: element = self._find_element(driver, selector) driver.execute_script("arguments[0].scrollIntoView(true);", element) else: driver.execute_script(f"window.scrollTo(0, {value or 'document.body.scrollHeight'});") elif action == 'wait': wait_type = wait_condition.get('type', 'time') wait_value = wait_condition.get('value', 1000) if wait_type == 'time': time.sleep(int(wait_value) / 1000) elif wait_type == 'element': WebDriverWait(driver, int(wait_value) / 1000).until( EC.presence_of_element_located(self._parse_selector(selector)) ) elif action == 'assert': self._handle_assertion(driver, selector, value, result) elif action == 'execute_script': script_result = driver.execute_script(value) result['script_result'] = script_result elif action == 'upload': element = self._find_element(driver, selector) element.send_keys(value) elif action == 'press': ActionChains(driver).send_keys(getattr(Keys, value.upper())).perform() elif action == 'switch_frame': driver.switch_to.frame(selector) elif action == 'switch_window': 
driver.switch_to.window(driver.window_handles[-1]) elif action == 'accept_alert': driver.switch_to.alert.accept() elif action == 'dismiss_alert': driver.switch_to.alert.dismiss() elif action == 'get_text': element = self._find_element(driver, selector) result['text'] = element.text elif action == 'get_attribute': element = self._find_element(driver, selector) result['attribute_value'] = element.get_attribute(value) else: raise Exception(f"Unknown action: {action}") # 截图 if screenshot or action == 'assert': screenshot_path = self._take_screenshot( driver, execution_id, f"step-{step_index}" ) result['screenshot_url'] = screenshot_path # 计算执行时间 end_time = datetime.now() result['duration_ms'] = int((end_time - start_time).total_seconds() * 1000) logger.info(f"Step {action} completed successfully in {result['duration_ms']}ms") except (TimeoutException, NoSuchElementException) as e: result['status'] = 'failed' result['error_message'] = str(e) logger.error(f"Step {action} failed: {str(e)}") # 错误截图 try: screenshot_path = self._take_screenshot( driver, execution_id, f"error-step-{step_index}" ) result['screenshot_url'] = screenshot_path except: pass except Exception as e: result['status'] = 'error' result['error_message'] = str(e) logger.error(f"Step {action} error: {str(e)}", exc_info=True) return result def _find_element(self, driver: webdriver.Remote, selector: str): """查找元素""" by, value = self._parse_selector(selector) wait = WebDriverWait(driver, self.action_timeout) element = wait.until(EC.presence_of_element_located((by, value))) return element def _parse_selector(self, selector: str) -> tuple: """解析选择器""" if selector.startswith('//'): return (By.XPATH, selector) elif selector.startswith('#'): return (By.ID, selector[1:]) elif selector.startswith('.'): return (By.CLASS_NAME, selector[1:]) elif selector.startswith('['): return (By.CSS_SELECTOR, selector) else: return (By.CSS_SELECTOR, selector) def _handle_wait(self, driver: webdriver.Remote, wait_condition: Dict[str, Any]): """处理等待条件""" wait_type = wait_condition.get('type', 'time') wait_value = wait_condition.get('value', 1000) timeout = wait_condition.get('timeout', self.action_timeout) if wait_type == 'time': time.sleep(int(wait_value) / 1000) elif wait_type == 'element': by, value = self._parse_selector(str(wait_value)) WebDriverWait(driver, timeout).until( EC.presence_of_element_located((by, value)) ) elif wait_type == 'url': WebDriverWait(driver, timeout).until( EC.url_contains(str(wait_value)) ) def _handle_assertion( self, driver: webdriver.Remote, selector: str, expected_value: str, result: Dict[str, Any] ): """处理断言""" assertion_type, assertion_value = expected_value.split(':', 1) if ':' in expected_value else ('visible', expected_value) try: if assertion_type == 'visible': element = self._find_element(driver, selector) if not element.is_displayed(): raise AssertionError(f"Element {selector} is not visible") elif assertion_type == 'text': element = self._find_element(driver, selector) if assertion_value not in element.text: raise AssertionError( f"Expected text '{assertion_value}' not found in '{element.text}'" ) elif assertion_type == 'value': element = self._find_element(driver, selector) if element.get_attribute('value') != assertion_value: raise AssertionError( f"Expected value '{assertion_value}', " f"got '{element.get_attribute('value')}'" ) elif assertion_type == 'url': if assertion_value not in driver.current_url: raise AssertionError( f"Expected URL to contain '{assertion_value}', " f"got '{driver.current_url}'" ) elif 
assertion_type == 'title': if assertion_value not in driver.title: raise AssertionError( f"Expected title to contain '{assertion_value}', " f"got '{driver.title}'" ) result['assertion_passed'] = True except AssertionError as e: result['status'] = 'failed' result['error_message'] = str(e) result['assertion_passed'] = False raise def _take_screenshot( self, driver: webdriver.Remote, execution_id: str, name: str ) -> str: """截图""" try: screenshot_path = self.screenshots_dir / execution_id / f"{name}.png" screenshot_path.parent.mkdir(parents=True, exist_ok=True) driver.save_screenshot(str(screenshot_path)) # 返回相对路径 return f"/screenshots/{execution_id}/{name}.png" except Exception as e: logger.error(f"Failed to take screenshot: {str(e)}") return ""
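The selector convention used by `_find_element` is easiest to see by calling `_parse_selector` directly. The sketch below is for illustration only and assumes the `selenium` package is installed so the module imports cleanly:

```python
from selenium_runner import SeleniumRunner

runner = SeleniumRunner()

# How raw selectors map onto Selenium locator strategies:
#   //...  -> XPath,  #id -> By.ID,  .cls -> By.CLASS_NAME,
#   [attr] -> CSS selector,  anything else -> CSS selector
for selector in ["#email", ".btn-primary", "//button[@type='submit']", "[name='q']", "form input.search"]:
    by, value = runner._parse_selector(selector)
    print(f"{selector!r:28} -> ({by}, {value!r})")
```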

TestMaster Automated Testing Platform - Part Four: Appium Mobile Executor Implementation

#### 2.4.4 Appium Mobile Executor `backend/services/executor/src/appium_runner.py`

""" Appium 移动端测试执行器 支持 iOS 和 Android 原生应用、混合应用和移动 Web 测试 """ import logging from typing import List, Dict, Any, Optional from datetime import datetime import os from pathlib import Path import time import asyncio import subprocess import json from appium import webdriver from appium.webdriver.common.appiumby import AppiumBy from appium.options.android import UiAutomator2Options from appium.options.ios import XCUITestOptions from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import ( TimeoutException, NoSuchElementException, WebDriverException ) logger = logging.getLogger(__name__) class AppiumRunner: """Appium 移动端测试执行器""" def __init__(self): self.screenshots_dir = Path("./screenshots") self.videos_dir = Path("./videos") self.logs_dir = Path("./logs") # 创建目录 self.screenshots_dir.mkdir(exist_ok=True) self.videos_dir.mkdir(exist_ok=True) self.logs_dir.mkdir(exist_ok=True) self.default_timeout = 30 self.action_timeout = 10 # Appium 服务器配置 self.appium_server_url = os.getenv('APPIUM_SERVER_URL', 'http://localhost:4723') logger.info("AppiumRunner initialized") def is_available(self) -> bool: """检查 Appium 是否可用""" try: import appium # 检查 Appium 服务器是否运行 import requests response = requests.get(f"{self.appium_server_url}/status", timeout=5) return response.status_code == 200 except: return False async def execute( self, execution_id: str, steps: List[Dict[str, Any]], browser: str = "chrome", device: Optional[str] = None, headless: bool = True, video: bool = False ) -> List[Dict[str, Any]]: """ 执行移动端测试步骤 Args: execution_id: 执行ID steps: 测试步骤列表 browser: 浏览器类型(移动端忽略) device: 设备配置(必需) headless: 是否无头模式(移动端忽略) video: 是否录制视频 Returns: 步骤执行结果列表 """ if not device: raise ValueError("Device configuration is required for mobile testing") logger.info(f"Starting Appium execution {execution_id} on device: {device}") step_results = [] driver = None try: # 解析设备配置 device_config = self._parse_device_config(device) # 创建 Appium Driver driver = await self._create_driver(device_config, video) # 设置隐式等待 driver.implicitly_wait(self.action_timeout) # 执行每个步骤 for i, step in enumerate(steps): step_id = step.get('id', f'step-{i}') logger.info(f"Executing step {i+1}/{len(steps)}: {step['action']}") try: result = await self._execute_step( driver=driver, step=step, execution_id=execution_id, step_index=i, device_config=device_config ) result['step_id'] = step_id step_results.append(result) # 如果步骤失败,停止执行 if result['status'] in ['failed', 'error']: logger.warning(f"Step {i+1} failed, stopping execution") break except Exception as e: logger.error(f"Step {i+1} error: {str(e)}", exc_info=True) # 截图保存错误状态 screenshot_path = self._take_screenshot( driver, execution_id, f"error-step-{i}" ) step_results.append({ 'step_id': step_id, 'status': 'error', 'error_message': str(e), 'screenshot_url': screenshot_path, 'timestamp': datetime.now().isoformat() }) break # 保存最终截图 if driver: self._take_screenshot(driver, execution_id, "final") logger.info(f"Execution {execution_id} completed with {len(step_results)} steps") return step_results except Exception as e: logger.error(f"Execution {execution_id} failed: {str(e)}", exc_info=True) raise finally: # 清理资源 if driver: try: driver.quit() except: pass def _parse_device_config(self, device: str) -> Dict[str, Any]: """ 解析设备配置 支持格式: - "android:emulator-5554" - Android 模拟器 - "android:real:device-id" - Android 真机 - "ios:simulator:iPhone 14" - iOS 模拟器 - "ios:real:udid" - iOS 真机 - JSON 字符串配置 """ # 尝试解析 JSON try: 
config = json.loads(device) return config except: pass # 解析简单格式 parts = device.split(':') if len(parts) < 2: raise ValueError(f"Invalid device format: {device}") platform = parts[0].lower() device_type = parts[1].lower() config = { 'platform': platform, 'deviceType': device_type } if platform == 'android': config['platformName'] = 'Android' if device_type == 'emulator': config['deviceName'] = parts[2] if len(parts) > 2 else 'emulator-5554' config['avd'] = parts[3] if len(parts) > 3 else None else: config['deviceName'] = parts[2] if len(parts) > 2 else 'Android Device' config['udid'] = parts[2] if len(parts) > 2 else None elif platform == 'ios': config['platformName'] = 'iOS' if device_type == 'simulator': config['deviceName'] = parts[2] if len(parts) > 2 else 'iPhone 14' config['platformVersion'] = parts[3] if len(parts) > 3 else '16.0' else: config['deviceName'] = 'iPhone' config['udid'] = parts[2] if len(parts) > 2 else None return config async def _create_driver( self, device_config: Dict[str, Any], video: bool ) -> webdriver.Remote: """创建 Appium Driver 实例""" platform = device_config['platform'] if platform == 'android': return await self._create_android_driver(device_config, video) elif platform == 'ios': return await self._create_ios_driver(device_config, video) else: raise ValueError(f"Unsupported platform: {platform}") async def _create_android_driver( self, device_config: Dict[str, Any], video: bool ) -> webdriver.Remote: """创建 Android Driver""" options = UiAutomator2Options() # 基础配置 options.platform_name = 'Android' options.automation_name = 'UiAutomator2' options.device_name = device_config.get('deviceName', 'Android Device') # 设备 ID if device_config.get('udid'): options.udid = device_config['udid'] # AVD(模拟器) if device_config.get('avd'): options.avd = device_config['avd'] options.avd_launch_timeout = 120000 # 应用配置 if device_config.get('app'): options.app = device_config['app'] elif device_config.get('appPackage') and device_config.get('appActivity'): options.app_package = device_config['appPackage'] options.app_activity = device_config['appActivity'] # 浏览器测试 if device_config.get('browserName'): options.browser_name = device_config['browserName'] # 性能配置 options.new_command_timeout = 300 options.no_reset = device_config.get('noReset', False) options.full_reset = device_config.get('fullReset', False) options.auto_grant_permissions = True # 视频录制 if video: options.video_quality = 'medium' options.video_fps = 30 # 其他配置 options.unicode_keyboard = True options.reset_keyboard = True options.disable_window_animation = True logger.info(f"Creating Android driver with options: {options.to_capabilities()}") # 创建 driver driver = webdriver.Remote( command_executor=self.appium_server_url, options=options ) return driver async def _create_ios_driver( self, device_config: Dict[str, Any], video: bool ) -> webdriver.Remote: """创建 iOS Driver""" options = XCUITestOptions() # 基础配置 options.platform_name = 'iOS' options.automation_name = 'XCUITest' options.device_name = device_config.get('deviceName', 'iPhone') options.platform_version = device_config.get('platformVersion', '16.0') # 设备 UDID if device_config.get('udid'): options.udid = device_config['udid'] # 应用配置 if device_config.get('app'): options.app = device_config['app'] elif device_config.get('bundleId'): options.bundle_id = device_config['bundleId'] # 浏览器测试 if device_config.get('browserName'): options.browser_name = device_config['browserName'] # 性能配置 options.new_command_timeout = 300 options.no_reset = device_config.get('noReset', False) 
options.full_reset = device_config.get('fullReset', False) # WebDriverAgent 配置 options.wda_launch_timeout = 120000 options.wda_connection_timeout = 60000 # 视频录制 if video: options.video_quality = 'medium' options.video_fps = 30 logger.info(f"Creating iOS driver with options: {options.to_capabilities()}") # 创建 driver driver = webdriver.Remote( command_executor=self.appium_server_url, options=options ) return driver async def _execute_step( self, driver: webdriver.Remote, step: Dict[str, Any], execution_id: str, step_index: int, device_config: Dict[str, Any] ) -> Dict[str, Any]: """执行单个测试步骤""" action = step['action'] selector = step.get('selector', '') value = step.get('value', '') wait_condition = step.get('wait_condition', {}) screenshot = step.get('screenshot', False) start_time = datetime.now() result = { 'action': action, 'selector': selector, 'status': 'passed', 'timestamp': start_time.isoformat() } try: # 执行前等待 if wait_condition: self._handle_wait(driver, wait_condition) # 执行动作 if action == 'navigate': # 移动端导航(打开 URL 或启动应用) if value.startswith('http'): driver.get(value) else: # 启动应用 if device_config['platform'] == 'android': driver.start_activity( app_package=value.split('/')[0], app_activity=value.split('/')[1] if '/' in value else '.MainActivity' ) else: driver.activate_app(value) elif action == 'click' or action == 'tap': element = self._find_element(driver, selector, device_config['platform']) element.click() elif action == 'type' or action == 'fill': element = self._find_element(driver, selector, device_config['platform']) element.clear() element.send_keys(value) elif action == 'swipe': # 滑动操作 # value 格式: "direction" 或 "x1,y1,x2,y2" if value in ['up', 'down', 'left', 'right']: self._swipe_direction(driver, value) else: coords = [int(c) for c in value.split(',')] driver.swipe(coords[0], coords[1], coords[2], coords[3], 500) elif action == 'scroll': # 滚动到元素 if selector: element = self._find_element(driver, selector, device_config['platform']) driver.execute_script('mobile: scroll', {'element': element, 'toVisible': True}) else: # 滚动方向 self._swipe_direction(driver, value or 'down') elif action == 'long_press': element = self._find_element(driver, selector, device_config['platform']) from appium.webdriver.common.touch_action import TouchAction TouchAction(driver).long_press(element).perform() elif action == 'double_tap': element = self._find_element(driver, selector, device_config['platform']) from appium.webdriver.common.multi_action import MultiAction from appium.webdriver.common.touch_action import TouchAction action1 = TouchAction(driver).tap(element) action2 = TouchAction(driver).tap(element) MultiAction(driver).add(action1, action2).perform() elif action == 'pinch' or action == 'zoom': # 缩放手势 element = self._find_element(driver, selector, device_config['platform']) if action == 'pinch': driver.pinch(element=element) else: driver.zoom(element=element) elif action == 'hide_keyboard': if driver.is_keyboard_shown(): driver.hide_keyboard() elif action == 'rotate': # 旋转设备 orientation = value.upper() # PORTRAIT, LANDSCAPE driver.orientation = orientation elif action == 'shake': # 摇晃设备(仅 iOS) if device_config['platform'] == 'ios': driver.shake() elif action == 'background_app': # 将应用置于后台 seconds = int(value) if value else 3 driver.background_app(seconds) elif action == 'install_app': driver.install_app(value) elif action == 'remove_app': driver.remove_app(value) elif action == 'launch_app': driver.launch_app() elif action == 'close_app': driver.close_app() elif action == 'reset_app': 
driver.reset() elif action == 'wait': wait_type = wait_condition.get('type', 'time') wait_value = wait_condition.get('value', 1000) if wait_type == 'time': await asyncio.sleep(int(wait_value) / 1000) elif wait_type == 'element': WebDriverWait(driver, int(wait_value) / 1000).until( EC.presence_of_element_located( self._parse_selector(selector, device_config['platform']) ) ) elif action == 'assert': self._handle_assertion(driver, selector, value, result, device_config['platform']) elif action == 'execute_script': script_result = driver.execute_script(value) result['script_result'] = script_result elif action == 'get_text': element = self._find_element(driver, selector, device_config['platform']) result['text'] = element.text elif action == 'get_attribute': element = self._find_element(driver, selector, device_config['platform']) result['attribute_value'] = element.get_attribute(value) elif action == 'switch_context': # 切换上下文(Native/WebView) if value: driver.switch_to.context(value) else: # 自动切换到 WebView contexts = driver.contexts for context in contexts: if 'WEBVIEW' in context: driver.switch_to.context(context) break result['context'] = driver.current_context elif action == 'get_contexts': result['contexts'] = driver.contexts elif action == 'set_network': # 设置网络状态(仅 Android) if device_config['platform'] == 'android': # value: wifi, data, airplane, none driver.set_network_connection(self._get_network_type(value)) elif action == 'set_location': # 设置地理位置 # value 格式: "latitude,longitude" lat, lon = value.split(',') driver.set_location(float(lat), float(lon), 0) else: raise Exception(f"Unknown action: {action}") # 截图 if screenshot or action == 'assert': screenshot_path = self._take_screenshot( driver, execution_id, f"step-{step_index}" ) result['screenshot_url'] = screenshot_path # 计算执行时间 end_time = datetime.now() result['duration_ms'] = int((end_time - start_time).total_seconds() * 1000) logger.info(f"Step {action} completed successfully in {result['duration_ms']}ms") except (TimeoutException, NoSuchElementException) as e: result['status'] = 'failed' result['error_message'] = str(e) logger.error(f"Step {action} failed: {str(e)}") # 错误截图 try: screenshot_path = self._take_screenshot( driver, execution_id, f"error-step-{step_index}" ) result['screenshot_url'] = screenshot_path except: pass except Exception as e: result['status'] = 'error' result['error_message'] = str(e) logger.error(f"Step {action} error: {str(e)}", exc_info=True) return result def _find_element( self, driver: webdriver.Remote, selector: str, platform: str ): """查找元素""" by, value = self._parse_selector(selector, platform) wait = WebDriverWait(driver, self.action_timeout) element = wait.until(EC.presence_of_element_located((by, value))) return element def _parse_selector(self, selector: str, platform: str) -> tuple: """ 解析选择器 支持的选择器类型: - id:element_id - 通过 ID - xpath://path - 通过 XPath - class:class_name - 通过 Class Name - text:visible_text - 通过文本 - accessibility:accessibility_id - 通过 Accessibility ID - android:uiautomator - Android UiAutomator (仅 Android) - ios:predicate - iOS Predicate (仅 iOS) - ios:chain - iOS Class Chain (仅 iOS) """ if ':' in selector: selector_type, selector_value = selector.split(':', 1) if selector_type == 'id': return (AppiumBy.ID, selector_value) elif selector_type == 'xpath': return (AppiumBy.XPATH, selector_value) elif selector_type == 'class': return (AppiumBy.CLASS_NAME, selector_value) elif selector_type == 'text': if platform == 'android': return (AppiumBy.ANDROID_UIAUTOMATOR, f'new 
UiSelector().text("{selector_value}")') else: return (AppiumBy.IOS_PREDICATE, f'label == "{selector_value}"') elif selector_type == 'accessibility': return (AppiumBy.ACCESSIBILITY_ID, selector_value) elif selector_type == 'android' and platform == 'android': return (AppiumBy.ANDROID_UIAUTOMATOR, selector_value) elif selector_type == 'ios' and platform == 'ios': return (AppiumBy.IOS_PREDICATE, selector_value) elif selector_type == 'chain' and platform == 'ios': return (AppiumBy.IOS_CLASS_CHAIN, selector_value) # 默认使用 XPath if selector.startswith('//'): return (AppiumBy.XPATH, selector) # 默认使用 ID return (AppiumBy.ID, selector) def _swipe_direction(self, driver: webdriver.Remote, direction: str): """按方向滑动""" size = driver.get_window_size() width = size['width'] height = size['height'] # 计算滑动坐标(从屏幕中心开始) start_x = width // 2 start_y = height // 2 if direction == 'up': end_x = start_x end_y = height // 4 elif direction == 'down': end_x = start_x end_y = height * 3 // 4 elif direction == 'left': end_x = width // 4 end_y = start_y elif direction == 'right': end_x = width * 3 // 4 end_y = start_y else: raise ValueError(f"Invalid swipe direction: {direction}") driver.swipe(start_x, start_y, end_x, end_y, 500) def _get_network_type(self, network: str) -> int: """获取网络类型常量""" network_types = { 'none': 0, 'airplane': 1, 'wifi': 2, 'data': 4, 'all': 6 } return network_types.get(network.lower(), 6) def _handle_wait(self, driver: webdriver.Remote, wait_condition: Dict[str, Any]): """处理等待条件""" wait_type = wait_condition.get('type', 'time') wait_value = wait_condition.get('value', 1000) timeout = wait_condition.get('timeout', self.action_timeout) if wait_type == 'time': time.sleep(int(wait_value) / 1000) elif wait_type == 'element': platform = driver.capabilities.get('platformName', '').lower() by, value = self._parse_selector(str(wait_value), platform) WebDriverWait(driver, timeout).until( EC.presence_of_element_located((by, value)) ) def _handle_assertion( self, driver: webdriver.Remote, selector: str, expected_value: str, result: Dict[str, Any], platform: str ): """处理断言""" assertion_type, assertion_value = expected_value.split(':', 1) if ':' in expected_value else ('visible', expected_value) try: if assertion_type == 'visible': element = self._find_element(driver, selector, platform) if not element.is_displayed(): raise AssertionError(f"Element {selector} is not visible") elif assertion_type == 'text': element = self._find_element(driver, selector, platform) if assertion_value not in element.text: raise AssertionError( f"Expected text '{assertion_value}' not found in '{element.text}'" ) elif assertion_type == 'enabled': element = self._find_element(driver, selector, platform) if not element.is_enabled(): raise AssertionError(f"Element {selector} is not enabled") elif assertion_type == 'selected': element = self._find_element(driver, selector, platform) if not element.is_selected(): raise AssertionError(f"Element {selector} is not selected") elif assertion_type == 'count': by, value = self._parse_selector(selector, platform) elements = driver.find_elements(by, value) expected_count = int(assertion_value) if len(elements) != expected_count: raise AssertionError( f"Expected {expected_count} elements, found {len(elements)}" ) result['assertion_passed'] = True except AssertionError as e: result['status'] = 'failed' result['error_message'] = str(e) result['assertion_passed'] = False raise def _take_screenshot( self, driver: webdriver.Remote, execution_id: str, name: str ) -> str: """截图""" try: screenshot_path = 
self.screenshots_dir / execution_id / f"{name}.png" screenshot_path.parent.mkdir(parents=True, exist_ok=True) driver.save_screenshot(str(screenshot_path)) # 返回相对路径 return f"/screenshots/{execution_id}/{name}.png" except Exception as e: logger.error(f"Failed to take screenshot: {str(e)}") return "" async def get_available_devices(self) -> List[Dict[str, Any]]: """获取可用的移动设备列表""" devices = [] # 获取 Android 设备 try: android_devices = await self._get_android_devices() devices.extend(android_devices) except Exception as e: logger.error(f"Failed to get Android devices: {str(e)}") # 获取 iOS 设备 try: ios_devices = await self._get_ios_devices() devices.extend(ios_devices) except Exception as e: logger.error(f"Failed to get iOS devices: {str(e)}") return devices async def _get_android_devices(self) -> List[Dict[str, Any]]: """获取 Android 设备列表""" devices = [] try: # 使用 adb 获取设备列表 result = subprocess.run( ['adb', 'devices', '-l'], capture_output=True, text=True, timeout=10 ) lines = result.stdout.strip().split('\n')[1:] # 跳过标题行 for line in lines: if line.strip(): parts = line.split() device_id = parts[0] # 获取设备详细信息 device_info = await self._get_android_device_info(device_id) devices.append({ 'id': device_id, 'name': device_info.get('model', 'Unknown Android Device'), 'platform': 'android', 'type': 'emulator' if 'emulator' in device_id else 'real', 'os_version': device_info.get('version', 'Unknown'), 'manufacturer': device_info.get('manufacturer', 'Unknown'), 'config': f"android:{'emulator' if 'emulator' in device_id else 'real'}:{device_id}" }) except FileNotFoundError: logger.warning("adb not found, skipping Android devices") except Exception as e: logger.error(f"Error getting Android devices: {str(e)}") return devices async def _get_android_device_info(self, device_id: str) -> Dict[str, str]: """获取 Android 设备详细信息""" info = {} try: # 获取设备型号 result = subprocess.run( ['adb', '-s', device_id, 'shell', 'getprop', 'ro.product.model'], capture_output=True, text=True, timeout=5 ) info['model'] = result.stdout.strip() # 获取 Android 版本 result = subprocess.run( ['adb', '-s', device_id, 'shell', 'getprop', 'ro.build.version.release'], capture_output=True, text=True, timeout=5 ) info['version'] = result.stdout.strip() # 获取制造商 result = subprocess.run( ['adb', '-s', device_id, 'shell', 'getprop', 'ro.product.manufacturer'], capture_output=True, text=True, timeout=5 ) info['manufacturer'] = result.stdout.strip() except Exception as e: logger.error(f"Error getting device info for {device_id}: {str(e)}") return info async def _get_ios_devices(self) -> List[Dict[str, Any]]: """获取 iOS 设备列表""" devices = [] try: # 使用 xcrun simctl 获取模拟器列表 result = subprocess.run( ['xcrun', 'simctl', 'list', 'devices', 'available', '--json'], capture_output=True, text=True, timeout=10 ) data = json.loads(result.stdout) for runtime, device_list in data.get('devices', {}).items(): for device in device_list: if device.get('isAvailable', False): devices.append({ 'id': device['udid'], 'name': device['name'], 'platform': 'ios', 'type': 'simulator', 'os_version': runtime.split('.')[-1], 'state': device.get('state', 'Unknown'), 'config': f"ios:simulator:{device['name']}" }) # 获取真机列表 result = subprocess.run( ['idevice_id', '-l'], capture_output=True, text=True, timeout=10 ) for udid in result.stdout.strip().split('\n'): if udid: # 获取设备名称 name_result = subprocess.run( ['ideviceinfo', '-u', udid, '-k', 'DeviceName'], capture_output=True, text=True, timeout=5 ) devices.append({ 'id': udid, 'name': name_result.stdout.strip() or 'iPhone', 'platform': 'ios', 
'type': 'real', 'os_version': 'Unknown', 'config': f"ios:real:{udid}" }) except FileNotFoundError: logger.warning("iOS tools not found, skipping iOS devices") except Exception as e: logger.error(f"Error getting iOS devices: {str(e)}") return devices
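上面的移动端执行器把元素选择器和断言期望值都表示成「类型:值」的字符串。下面是一个纯 Python 的速查小示例,把可见代码中出现的几种写法整理在一起(其中的控件 ID、文本均为假设值,仅作示意,并非平台固定接口):

# 「类型:值」字符串约定速查(纯示意;控件 ID、文本均为假设值)
SELECTOR_EXAMPLES = {
    "id:login_button": "按资源 ID 定位",
    "accessibility:login_btn": "按无障碍 ID 定位",
    "text:登录": "按可见文本定位(Android 走 UiAutomator,iOS 走 Predicate)",
    "xpath://android.widget.Button[1]": "显式 XPath",
    "//android.widget.EditText": "以 // 开头时按 XPath 处理",
    "username_input": "无前缀时默认按 ID 处理",
}

ASSERTION_EXAMPLES = {
    "visible": "不含冒号时一律按可见性断言处理",
    "text:欢迎回来": "断言元素文本包含指定内容",
    "enabled:true": "断言元素处于可用状态",
    "count:3": "断言匹配到的元素数量等于 3",
}

if __name__ == "__main__":
    for sel, note in SELECTOR_EXAMPLES.items():
        print(f"{sel!r}  ->  {note}")
    for exp, note in ASSERTION_EXAMPLES.items():
        print(f"{exp!r}  ->  {note}")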

2.4.5 移动端测试辅助工具(backend/services/executor/src/mobile_utils.py)

""" 移动端测试辅助工具 提供常用的移动端测试功能 """ import logging from typing import Dict, Any, Optional, List from appium import webdriver from appium.webdriver.common.appiumby import AppiumBy import time logger = logging.getLogger(__name__) class MobileTestUtils: """移动端测试工具类""" @staticmethod def wait_for_element( driver: webdriver.Remote, selector: str, platform: str, timeout: int = 10 ): """等待元素出现""" from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC by, value = MobileTestUtils._parse_selector(selector, platform) wait = WebDriverWait(driver, timeout) return wait.until(EC.presence_of_element_located((by, value))) @staticmethod def scroll_to_element( driver: webdriver.Remote, selector: str, platform: str, max_scrolls: int = 10 ): """滚动到元素可见""" for i in range(max_scrolls): try: element = driver.find_element( *MobileTestUtils._parse_selector(selector, platform) ) if element.is_displayed(): return element except: pass # 向下滑动 size = driver.get_window_size() driver.swipe( size['width'] // 2, size['height'] * 3 // 4, size['width'] // 2, size['height'] // 4, 500 ) time.sleep(0.5) raise Exception(f"Element {selector} not found after {max_scrolls} scrolls") @staticmethod def get_toast_message(driver: webdriver.Remote, timeout: int = 5) -> Optional[str]: """获取 Toast 消息(仅 Android)""" try: from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC toast_element = WebDriverWait(driver, timeout).until( EC.presence_of_element_located(( AppiumBy.XPATH, "//android.widget.Toast" )) ) return toast_element.text except: return None @staticmethod def switch_to_webview(driver: webdriver.Remote) -> bool: """切换到 WebView 上下文""" try: contexts = driver.contexts for context in contexts: if 'WEBVIEW' in context: driver.switch_to.context(context) logger.info(f"Switched to context: {context}") return True return False except Exception as e: logger.error(f"Failed to switch to WebView: {str(e)}") return False @staticmethod def switch_to_native(driver: webdriver.Remote) -> bool: """切换到 Native 上下文""" try: driver.switch_to.context('NATIVE_APP') logger.info("Switched to NATIVE_APP context") return True except Exception as e: logger.error(f"Failed to switch to Native: {str(e)}") return False @staticmethod def take_element_screenshot( driver: webdriver.Remote, selector: str, platform: str, filepath: str ): """截取元素截图""" element = driver.find_element( *MobileTestUtils._parse_selector(selector, platform) ) element.screenshot(filepath) @staticmethod def get_device_info(driver: webdriver.Remote) -> Dict[str, Any]: """获取设备信息""" capabilities = driver.capabilities return { 'platform': capabilities.get('platformName'), 'platform_version': capabilities.get('platformVersion'), 'device_name': capabilities.get('deviceName'), 'device_udid': capabilities.get('udid'), 'automation_name': capabilities.get('automationName'), 'app': capabilities.get('app'), 'browser': capabilities.get('browserName') } @staticmethod def set_clipboard(driver: webdriver.Remote, text: str): """设置剪贴板内容""" import base64 encoded_text = base64.b64encode(text.encode()).decode() driver.set_clipboard(encoded_text, 'plaintext') @staticmethod def get_clipboard(driver: webdriver.Remote) -> str: """获取剪贴板内容""" import base64 encoded_text = driver.get_clipboard('plaintext') return base64.b64decode(encoded_text).decode() @staticmethod def _parse_selector(selector: str, platform: str) -> tuple: """解析选择器""" if ':' in selector: selector_type, selector_value = 
selector.split(':', 1) if selector_type == 'id': return (AppiumBy.ID, selector_value) elif selector_type == 'xpath': return (AppiumBy.XPATH, selector_value) elif selector_type == 'class': return (AppiumBy.CLASS_NAME, selector_value) elif selector_type == 'accessibility': return (AppiumBy.ACCESSIBILITY_ID, selector_value) elif selector_type == 'android' and platform == 'android': return (AppiumBy.ANDROID_UIAUTOMATOR, selector_value) elif selector_type == 'ios' and platform == 'ios': return (AppiumBy.IOS_PREDICATE, selector_value) if selector.startswith('//'): return (AppiumBy.XPATH, selector) return (AppiumBy.ID, selector) class AndroidUtils: """Android 专用工具""" @staticmethod def press_back(driver: webdriver.Remote): """按返回键""" driver.press_keycode(4) @staticmethod def press_home(driver: webdriver.Remote): """按 Home 键""" driver.press_keycode(3) @staticmethod def press_menu(driver: webdriver.Remote): """按菜单键""" driver.press_keycode(82) @staticmethod def open_notifications(driver: webdriver.Remote): """打开通知栏""" driver.open_notifications() @staticmethod def get_network_connection(driver: webdriver.Remote) -> int: """获取网络连接状态""" return driver.network_connection @staticmethod def set_network_connection(driver: webdriver.Remote, connection_type: int): """设置网络连接状态""" driver.set_network_connection(connection_type) @staticmethod def start_activity( driver: webdriver.Remote, app_package: str, app_activity: str, **kwargs ): """启动 Activity""" driver.start_activity(app_package, app_activity, **kwargs) @staticmethod def get_current_activity(driver: webdriver.Remote) -> str: """获取当前 Activity""" return driver.current_activity @staticmethod def get_current_package(driver: webdriver.Remote) -> str: """获取当前包名""" return driver.current_package class IOSUtils: """iOS 专用工具""" @staticmethod def shake(driver: webdriver.Remote): """摇晃设备""" driver.shake() @staticmethod def lock(driver: webdriver.Remote, seconds: int = 0): """锁定设备""" driver.lock(seconds) @staticmethod def unlock(driver: webdriver.Remote): """解锁设备""" driver.unlock() @staticmethod def is_locked(driver: webdriver.Remote) -> bool: """检查设备是否锁定""" return driver.is_locked() @staticmethod def touch_id(driver: webdriver.Remote, match: bool = True): """模拟 Touch ID""" driver.touch_id(match) @staticmethod def toggle_touch_id_enrollment(driver: webdriver.Remote, enabled: bool = True): """切换 Touch ID 注册状态""" driver.toggle_touch_id_enrollment(enabled)
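下面给出 MobileTestUtils 与 AndroidUtils 的一个组合用法示意。假设 Appium 会话已按所用客户端版本的方式创建完毕、mobile_utils 模块在 PYTHONPATH 中;示例中的控件选择器均为假设值,仅演示调用方式:

# 示意用法:组合调用 MobileTestUtils / AndroidUtils(假设 driver 会话已建立,选择器均为假设值)
from appium import webdriver
from mobile_utils import MobileTestUtils, AndroidUtils

def demo_login_flow(driver: webdriver.Remote):
    """在一个已有的 Android 会话上演示常用工具方法(示意)。"""
    # 1. 等待登录按钮出现并点击
    login_btn = MobileTestUtils.wait_for_element(driver, "id:login_button", "android", timeout=15)
    login_btn.click()

    # 2. 向下滚动,直到协议复选框可见
    MobileTestUtils.scroll_to_element(driver, "accessibility:agree_checkbox", "android")

    # 3. 读取 Toast 提示(仅 Android 可用)
    toast = MobileTestUtils.get_toast_message(driver)
    if toast:
        print(f"Toast: {toast}")

    # 4. 页面内嵌 H5 时,先切到 WebView 上下文,操作完再切回原生
    if MobileTestUtils.switch_to_webview(driver):
        # ... 在 WebView 中的操作 ...
        MobileTestUtils.switch_to_native(driver)

    # 5. Android 物理返回键
    AndroidUtils.press_back(driver)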

TestMaster 自动化测试平台 - 第五部分:性能测试模块实现

2.5 性能测试模块

2.5.1 性能测试服务主文件(backend/services/performance/src/main.py)

""" TestMaster 性能测试服务 支持 HTTP/HTTPS 压力测试、负载测试、并发测试 集成 Locust 和 JMeter """ from fastapi import FastAPI, HTTPException, BackgroundTasks, WebSocket, WebSocketDisconnect from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel, HttpUrl from typing import List, Dict, Any, Optional import logging import asyncio from datetime import datetime import json import uuid from locust_runner import LocustRunner from jmeter_runner import JMeterRunner from performance_analyzer import PerformanceAnalyzer from websocket_manager import WebSocketManager # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # 创建 FastAPI 应用 app = FastAPI( title="TestMaster Performance Testing", description="Performance testing service with Locust and JMeter integration", version="1.0.0" ) # CORS 配置 app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # 请求模型 class PerformanceTestRequest(BaseModel): test_id: str name: str target_url: HttpUrl test_type: str # load, stress, spike, endurance, scalability duration: int # 测试持续时间(秒) users: int # 并发用户数 spawn_rate: int # 每秒启动的用户数 scenarios: List[Dict[str, Any]] runner: str = "locust" # locust 或 jmeter options: Optional[Dict[str, Any]] = {} class LoadPattern(BaseModel): pattern_type: str # constant, ramp_up, step, spike min_users: int max_users: int duration: int steps: Optional[int] = None class PerformanceTestResponse(BaseModel): test_id: str status: str started_at: datetime completed_at: Optional[datetime] = None duration_seconds: Optional[int] = None metrics: Optional[Dict[str, Any]] = None report_url: Optional[str] = None # 全局实例 locust_runner = LocustRunner() jmeter_runner = JMeterRunner() performance_analyzer = PerformanceAnalyzer() ws_manager = WebSocketManager() # 存储运行中的测试 running_tests: Dict[str, Dict[str, Any]] = {} # API 端点 @app.get("/health") async def health_check(): """健康检查""" return { "status": "healthy", "service": "performance-testing", "version": "1.0.0", "runners": { "locust": locust_runner.is_available(), "jmeter": jmeter_runner.is_available() } } @app.post("/tests/start", response_model=PerformanceTestResponse) async def start_performance_test( request: PerformanceTestRequest, background_tasks: BackgroundTasks ): """ 启动性能测试 Args: request: 性能测试请求 background_tasks: 后台任务 Returns: 测试响应 """ logger.info(f"Starting performance test: {request.name}") # 选择执行器 if request.runner == "jmeter": runner = jmeter_runner else: runner = locust_runner # 创建测试配置 test_config = { "test_id": request.test_id, "name": request.name, "target_url": str(request.target_url), "test_type": request.test_type, "duration": request.duration, "users": request.users, "spawn_rate": request.spawn_rate, "scenarios": request.scenarios, "options": request.options, "started_at": datetime.now() } # 保存到运行中的测试 running_tests[request.test_id] = { "config": test_config, "status": "starting", "runner": request.runner } # 后台执行测试 background_tasks.add_task( run_performance_test, test_id=request.test_id, runner=runner, config=test_config ) return PerformanceTestResponse( test_id=request.test_id, status="starting", started_at=test_config["started_at"] ) @app.get("/tests/{test_id}/status") async def get_test_status(test_id: str): """获取测试状态""" if test_id not in running_tests: raise HTTPException(status_code=404, detail="Test not found") test_info = running_tests[test_id] return { "test_id": test_id, "status": test_info["status"], "config": 
test_info["config"], "metrics": test_info.get("metrics"), "started_at": test_info["config"]["started_at"].isoformat(), "completed_at": test_info.get("completed_at").isoformat() if test_info.get("completed_at") else None } @app.post("/tests/{test_id}/stop") async def stop_test(test_id: str): """停止测试""" if test_id not in running_tests: raise HTTPException(status_code=404, detail="Test not found") test_info = running_tests[test_id] runner_type = test_info["runner"] if runner_type == "jmeter": await jmeter_runner.stop_test(test_id) else: await locust_runner.stop_test(test_id) test_info["status"] = "stopped" test_info["completed_at"] = datetime.now() return {"message": "Test stopped successfully"} @app.get("/tests/{test_id}/metrics") async def get_test_metrics(test_id: str): """获取测试实时指标""" if test_id not in running_tests: raise HTTPException(status_code=404, detail="Test not found") test_info = running_tests[test_id] runner_type = test_info["runner"] if runner_type == "jmeter": metrics = await jmeter_runner.get_metrics(test_id) else: metrics = await locust_runner.get_metrics(test_id) return metrics @app.get("/tests/{test_id}/report") async def get_test_report(test_id: str): """获取测试报告""" if test_id not in running_tests: raise HTTPException(status_code=404, detail="Test not found") test_info = running_tests[test_id] # 生成详细报告 report = await performance_analyzer.generate_report( test_id=test_id, test_info=test_info ) return report @app.websocket("/ws/tests/{test_id}") async def websocket_endpoint(websocket: WebSocket, test_id: str): """WebSocket 实时推送测试指标""" await ws_manager.connect(websocket, test_id) try: while True: # 接收客户端消息(保持连接) data = await websocket.receive_text() # 可以处理客户端发送的控制命令 if data == "ping": await websocket.send_text("pong") except WebSocketDisconnect: ws_manager.disconnect(websocket, test_id) @app.get("/templates") async def get_test_templates(): """获取性能测试模板""" return { "templates": [ { "id": "api_load_test", "name": "API 负载测试", "description": "测试 API 在正常负载下的性能", "test_type": "load", "default_users": 100, "default_duration": 300, "scenarios": [ { "name": "GET Request", "method": "GET", "path": "/api/endpoint", "weight": 70 }, { "name": "POST Request", "method": "POST", "path": "/api/endpoint", "body": {"key": "value"}, "weight": 30 } ] }, { "id": "stress_test", "name": "压力测试", "description": "测试系统在极限负载下的表现", "test_type": "stress", "default_users": 1000, "default_duration": 600, "load_pattern": { "pattern_type": "ramp_up", "min_users": 100, "max_users": 1000, "duration": 600 } }, { "id": "spike_test", "name": "尖峰测试", "description": "测试系统应对突发流量的能力", "test_type": "spike", "default_users": 500, "default_duration": 300, "load_pattern": { "pattern_type": "spike", "min_users": 10, "max_users": 500, "duration": 300 } }, { "id": "endurance_test", "name": "耐久测试", "description": "测试系统长时间运行的稳定性", "test_type": "endurance", "default_users": 200, "default_duration": 7200, "load_pattern": { "pattern_type": "constant", "min_users": 200, "max_users": 200, "duration": 7200 } } ] } async def run_performance_test( test_id: str, runner: Any, config: Dict[str, Any] ): """后台执行性能测试""" try: logger.info(f"Running performance test {test_id}") # 更新状态 running_tests[test_id]["status"] = "running" # 执行测试 result = await runner.run_test(config) # 更新结果 running_tests[test_id]["status"] = "completed" running_tests[test_id]["completed_at"] = datetime.now() running_tests[test_id]["metrics"] = result["metrics"] running_tests[test_id]["report_url"] = result.get("report_url") # 分析结果 analysis = await 
performance_analyzer.analyze_results(result) running_tests[test_id]["analysis"] = analysis # 通过 WebSocket 推送完成通知 await ws_manager.broadcast( test_id, { "type": "test_completed", "test_id": test_id, "status": "completed", "metrics": result["metrics"] } ) logger.info(f"Performance test {test_id} completed successfully") except Exception as e: logger.error(f"Performance test {test_id} failed: {str(e)}", exc_info=True) running_tests[test_id]["status"] = "failed" running_tests[test_id]["completed_at"] = datetime.now() running_tests[test_id]["error"] = str(e) # 推送错误通知 await ws_manager.broadcast( test_id, { "type": "test_failed", "test_id": test_id, "error": str(e) } ) if __name__ == "__main__": import uvicorn print(""" ╔════════════════════════════════════════════════════════════════╗ ║ ║ ║ 📊 TestMaster Performance Testing Service ║ ║ ║ ║ Server: http://localhost:8003 ║ ║ Docs: http://localhost:8003/docs ║ ║ WebSocket: ws://localhost:8003/ws/tests/{test_id} ║ ║ ║ ║ Runners: ║ ║ - Locust (Python-based) ║ ║ - JMeter (Java-based) ║ ║ ║ ╚════════════════════════════════════════════════════════════════╝ """) uvicorn.run( "main:app", host="0.0.0.0", port=8003, reload=True, log_level="info" )
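基于上面的接口定义,可以用任意 HTTP 客户端驱动该服务。下面是一个示意客户端(假设服务已在本机 8003 端口启动、已安装 requests 库,target_url 与 test_id 均为假设值):

# 示意客户端:提交一个负载测试并轮询结果
import time
import requests

BASE = "http://localhost:8003"
TEST_ID = "demo-load-001"

payload = {
    "test_id": TEST_ID,
    "name": "Homepage load test",
    "target_url": "https://example.com",
    "test_type": "load",
    "duration": 60,        # 测试时长(秒)
    "users": 50,           # 并发用户数
    "spawn_rate": 5,       # 每秒启动的用户数
    "runner": "locust",
    "scenarios": [
        {"name": "GET Home", "method": "GET", "path": "/", "weight": 100},
    ],
}

resp = requests.post(f"{BASE}/tests/start", json=payload, timeout=10)
resp.raise_for_status()
print("started:", resp.json())

# 轮询状态,直到测试结束
while True:
    status = requests.get(f"{BASE}/tests/{TEST_ID}/status", timeout=10).json()
    print("status:", status["status"])
    if status["status"] in ("completed", "failed", "stopped"):
        break
    time.sleep(5)

# 拉取分析报告
report = requests.get(f"{BASE}/tests/{TEST_ID}/report", timeout=10).json()
print("grade:", report.get("analysis", {}).get("performance_grade"))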

2.5.2 Locust 执行器(backend/services/performance/src/locust_runner.py)

""" Locust 性能测试执行器 基于 Python 的分布式负载测试工具 """ import logging from typing import Dict, Any, List, Optional import asyncio import subprocess import os from pathlib import Path import json import tempfile import signal import psutil logger = logging.getLogger(__name__) class LocustRunner: """Locust 测试执行器""" def __init__(self): self.results_dir = Path("./performance_results/locust") self.results_dir.mkdir(parents=True, exist_ok=True) self.running_processes: Dict[str, subprocess.Popen] = {} logger.info("LocustRunner initialized") def is_available(self) -> bool: """检查 Locust 是否可用""" try: result = subprocess.run( ["locust", "--version"], capture_output=True, text=True, timeout=5 ) return result.returncode == 0 except: return False async def run_test(self, config: Dict[str, Any]) -> Dict[str, Any]: """ 运行 Locust 测试 Args: config: 测试配置 Returns: 测试结果 """ test_id = config["test_id"] logger.info(f"Starting Locust test {test_id}") # 生成 Locust 文件 locustfile_path = await self._generate_locustfile(config) # 准备输出文件 results_file = self.results_dir / f"{test_id}_results.json" html_report = self.results_dir / f"{test_id}_report.html" csv_prefix = self.results_dir / f"{test_id}" # 构建 Locust 命令 cmd = [ "locust", "-f", str(locustfile_path), "--headless", "--users", str(config["users"]), "--spawn-rate", str(config["spawn_rate"]), "--run-time", f"{config['duration']}s", "--host", config["target_url"], "--json", "--html", str(html_report), "--csv", str(csv_prefix), "--loglevel", "INFO" ] # 添加额外选项 if config.get("options", {}).get("stop_on_error"): cmd.append("--stop-on-error") if config.get("options", {}).get("expect_workers"): cmd.extend(["--expect-workers", str(config["options"]["expect_workers"])]) logger.info(f"Locust command: {' '.join(cmd)}") # 启动 Locust 进程 process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) self.running_processes[test_id] = process # 等待测试完成 try: stdout, stderr = await asyncio.wait_for( asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ).communicate(), timeout=config["duration"] + 60 ) # 解析结果 metrics = await self._parse_results(test_id, csv_prefix) return { "test_id": test_id, "status": "completed", "metrics": metrics, "report_url": f"/performance_results/locust/{test_id}_report.html" } except asyncio.TimeoutError: logger.error(f"Locust test {test_id} timed out") await self.stop_test(test_id) raise except Exception as e: logger.error(f"Locust test {test_id} failed: {str(e)}", exc_info=True) raise finally: if test_id in self.running_processes: del self.running_processes[test_id] async def _generate_locustfile(self, config: Dict[str, Any]) -> Path: """生成 Locustfile""" test_id = config["test_id"] scenarios = config["scenarios"] # 构建 Locustfile 内容 locustfile_content = f""" from locust import HttpUser, task, between, constant import json class PerformanceTestUser(HttpUser): wait_time = between(1, 3) host = "{config['target_url']}" """ # 添加场景任务 for i, scenario in enumerate(scenarios): method = scenario.get("method", "GET").upper() path = scenario.get("path", "/") weight = scenario.get("weight", 1) headers = scenario.get("headers", {}) body = scenario.get("body") task_name = scenario.get("name", f"task_{i}").replace(" ", "_") locustfile_content += f""" @task({weight}) def {task_name}(self): headers = {json.dumps(headers)} """ if method == "GET": locustfile_content += f""" with self.client.get("{path}", headers=headers, catch_response=True) as response: if response.status_code != 200: response.failure(f"Got status 
code {{response.status_code}}") """ elif method == "POST": body_str = json.dumps(body) if body else "{}" locustfile_content += f""" data = {body_str} with self.client.post("{path}", json=data, headers=headers, catch_response=True) as response: if response.status_code not in [200, 201]: response.failure(f"Got status code {{response.status_code}}") """ elif method == "PUT": body_str = json.dumps(body) if body else "{}" locustfile_content += f""" data = {body_str} with self.client.put("{path}", json=data, headers=headers, catch_response=True) as response: if response.status_code != 200: response.failure(f"Got status code {{response.status_code}}") """ elif method == "DELETE": locustfile_content += f""" with self.client.delete("{path}", headers=headers, catch_response=True) as response: if response.status_code not in [200, 204]: response.failure(f"Got status code {{response.status_code}}") """ # 保存 Locustfile locustfile_path = self.results_dir / f"{test_id}_locustfile.py" with open(locustfile_path, 'w') as f: f.write(locustfile_content) logger.info(f"Generated Locustfile: {locustfile_path}") return locustfile_path async def _parse_results(self, test_id: str, csv_prefix: Path) -> Dict[str, Any]: """解析 Locust 结果""" metrics = { "requests": {}, "failures": [], "summary": {} } # 解析请求统计 stats_file = Path(f"{csv_prefix}_stats.csv") if stats_file.exists(): import csv with open(stats_file, 'r') as f: reader = csv.DictReader(f) rows = list(reader) for row in rows: if row['Type'] == 'Aggregated': metrics["summary"] = { "total_requests": int(row['Request Count']), "total_failures": int(row['Failure Count']), "median_response_time": float(row['Median Response Time']), "average_response_time": float(row['Average Response Time']), "min_response_time": float(row['Min Response Time']), "max_response_time": float(row['Max Response Time']), "requests_per_second": float(row['Requests/s']), "failures_per_second": float(row['Failures/s']) } else: metrics["requests"][row['Name']] = { "method": row['Type'], "count": int(row['Request Count']), "failures": int(row['Failure Count']), "median_response_time": float(row['Median Response Time']), "average_response_time": float(row['Average Response Time']), "min_response_time": float(row['Min Response Time']), "max_response_time": float(row['Max Response Time']), "requests_per_second": float(row['Requests/s']) } # 解析失败记录 failures_file = Path(f"{csv_prefix}_failures.csv") if failures_file.exists(): import csv with open(failures_file, 'r') as f: reader = csv.DictReader(f) for row in reader: metrics["failures"].append({ "method": row['Method'], "name": row['Name'], "error": row['Error'], "occurrences": int(row['Occurrences']) }) return metrics async def stop_test(self, test_id: str): """停止测试""" if test_id in self.running_processes: process = self.running_processes[test_id] try: # 发送 SIGTERM 信号 process.terminate() # 等待进程结束 try: process.wait(timeout=10) except subprocess.TimeoutExpired: # 强制杀死进程 process.kill() process.wait() logger.info(f"Stopped Locust test {test_id}") except Exception as e: logger.error(f"Error stopping test {test_id}: {str(e)}") finally: del self.running_processes[test_id] async def get_metrics(self, test_id: str) -> Dict[str, Any]: """获取实时指标""" # Locust 提供了 Web UI 和 REST API # 这里可以通过 HTTP 请求获取实时指标 try: import aiohttp async with aiohttp.ClientSession() as session: async with session.get(f"http://localhost:8089/stats/requests") as response: if response.status == 200: data = await response.json() return data except Exception as e: logger.error(f"Error getting 
metrics: {str(e)}") return {}
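为了更直观地说明 _generate_locustfile 产出的脚本形态,下面给出一个简化、整理过缩进的等价 Locustfile 示例(host、路径和权重均为假设值,且省略了自定义请求头):

# 与上文模板等价的简化 Locustfile 示例(host、路径、权重均为假设值)
from locust import HttpUser, task, between

class PerformanceTestUser(HttpUser):
    wait_time = between(1, 3)
    host = "https://example.com"

    @task(70)
    def GET_Request(self):
        # 对应 weight=70 的 GET 场景
        with self.client.get("/api/endpoint", catch_response=True) as response:
            if response.status_code != 200:
                response.failure(f"Got status code {response.status_code}")

    @task(30)
    def POST_Request(self):
        # 对应 weight=30 的 POST 场景
        with self.client.post("/api/endpoint", json={"key": "value"}, catch_response=True) as response:
            if response.status_code not in (200, 201):
                response.failure(f"Got status code {response.status_code}")

运行方式与执行器拼出的命令一致,例如:locust -f demo_locustfile.py --headless --users 100 --spawn-rate 10 --run-time 300s --host https://example.com --csv demo --html demo_report.html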

2.5.3 JMeter 执行器(backend/services/performance/src/jmeter_runner.py)

""" JMeter 性能测试执行器 基于 Java 的成熟负载测试工具 """ import logging from typing import Dict, Any, List, Optional import asyncio import subprocess import os from pathlib import Path import json import xml.etree.ElementTree as ET from xml.dom import minidom import shutil logger = logging.getLogger(__name__) class JMeterRunner: """JMeter 测试执行器""" def __init__(self): self.results_dir = Path("./performance_results/jmeter") self.results_dir.mkdir(parents=True, exist_ok=True) self.jmeter_home = os.getenv("JMETER_HOME", "/opt/apache-jmeter") self.jmeter_bin = Path(self.jmeter_home) / "bin" / "jmeter" self.running_processes: Dict[str, subprocess.Popen] = {} logger.info("JMeterRunner initialized") def is_available(self) -> bool: """检查 JMeter 是否可用""" try: if not self.jmeter_bin.exists(): return False result = subprocess.run( [str(self.jmeter_bin), "--version"], capture_output=True, text=True, timeout=5 ) return result.returncode == 0 except: return False async def run_test(self, config: Dict[str, Any]) -> Dict[str, Any]: """ 运行 JMeter 测试 Args: config: 测试配置 Returns: 测试结果 """ test_id = config["test_id"] logger.info(f"Starting JMeter test {test_id}") # 生成 JMX 文件 jmx_path = await self._generate_jmx(config) # 准备输出文件 results_file = self.results_dir / f"{test_id}_results.jtl" html_report_dir = self.results_dir / f"{test_id}_report" log_file = self.results_dir / f"{test_id}.log" # 构建 JMeter 命令 cmd = [ str(self.jmeter_bin), "-n", # 非 GUI 模式 "-t", str(jmx_path), # 测试计划文件 "-l", str(results_file), # 结果文件 "-j", str(log_file), # 日志文件 "-e", # 生成报告 "-o", str(html_report_dir) # 报告输出目录 ] # 添加系统属性 if config.get("options", {}).get("java_opts"): java_opts = config["options"]["java_opts"] for key, value in java_opts.items(): cmd.extend(["-J", f"{key}={value}"]) logger.info(f"JMeter command: {' '.join(cmd)}") # 启动 JMeter 进程 process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) self.running_processes[test_id] = process # 等待测试完成 try: stdout, stderr = process.communicate(timeout=config["duration"] + 120) if process.returncode != 0: logger.error(f"JMeter test failed: {stderr}") raise Exception(f"JMeter test failed: {stderr}") # 解析结果 metrics = await self._parse_results(results_file) return { "test_id": test_id, "status": "completed", "metrics": metrics, "report_url": f"/performance_results/jmeter/{test_id}_report/index.html" } except subprocess.TimeoutExpired: logger.error(f"JMeter test {test_id} timed out") await self.stop_test(test_id) raise except Exception as e: logger.error(f"JMeter test {test_id} failed: {str(e)}", exc_info=True) raise finally: if test_id in self.running_processes: del self.running_processes[test_id] async def _generate_jmx(self, config: Dict[str, Any]) -> Path: """生成 JMX 测试计划文件""" test_id = config["test_id"] # 创建根元素 root = ET.Element("jmeterTestPlan", { "version": "1.2", "properties": "5.0", "jmeter": "5.5" }) # 添加哈希树 hash_tree = ET.SubElement(root, "hashTree") # 测试计划 test_plan = ET.SubElement(hash_tree, "TestPlan", { "guiclass": "TestPlanGui", "testclass": "TestPlan", "testname": config["name"], "enabled": "true" }) # 测试计划属性 ET.SubElement(test_plan, "stringProp", {"name": "TestPlan.comments"}).text = "" ET.SubElement(test_plan, "boolProp", {"name": "TestPlan.functional_mode"}).text = "false" ET.SubElement(test_plan, "boolProp", {"name": "TestPlan.serialize_threadgroups"}).text = "false" # 用户定义变量 arguments = ET.SubElement(test_plan, "elementProp", { "name": "TestPlan.user_defined_variables", "elementType": "Arguments", "guiclass": "ArgumentsPanel", "testclass": "Arguments", 
"enabled": "true" }) ET.SubElement(arguments, "collectionProp", {"name": "Arguments.arguments"}) test_plan_tree = ET.SubElement(hash_tree, "hashTree") # 线程组 thread_group = ET.SubElement(test_plan_tree, "ThreadGroup", { "guiclass": "ThreadGroupGui", "testclass": "ThreadGroup", "testname": "Thread Group", "enabled": "true" }) ET.SubElement(thread_group, "stringProp", {"name": "ThreadGroup.on_sample_error"}).text = "continue" # 线程属性 thread_props = ET.SubElement(thread_group, "elementProp", { "name": "ThreadGroup.main_controller", "elementType": "LoopController", "guiclass": "LoopControlPanel", "testclass": "LoopController", "enabled": "true" }) ET.SubElement(thread_props, "boolProp", {"name": "LoopController.continue_forever"}).text = "false" ET.SubElement(thread_props, "intProp", {"name": "LoopController.loops"}).text = "-1" ET.SubElement(thread_group, "stringProp", {"name": "ThreadGroup.num_threads"}).text = str(config["users"]) ET.SubElement(thread_group, "stringProp", {"name": "ThreadGroup.ramp_time"}).text = str(config["users"] // config["spawn_rate"]) ET.SubElement(thread_group, "boolProp", {"name": "ThreadGroup.scheduler"}).text = "true" ET.SubElement(thread_group, "stringProp", {"name": "ThreadGroup.duration"}).text = str(config["duration"]) ET.SubElement(thread_group, "stringProp", {"name": "ThreadGroup.delay"}).text = "0" thread_group_tree = ET.SubElement(test_plan_tree, "hashTree") # 添加场景(HTTP 采样器) for scenario in config["scenarios"]: sampler = self._create_http_sampler(scenario, config["target_url"]) thread_group_tree.append(sampler) thread_group_tree.append(ET.Element("hashTree")) # 添加监听器 # 结果树 result_tree = ET.SubElement(thread_group_tree, "ResultCollector", { "guiclass": "ViewResultsFullVisualizer", "testclass": "ResultCollector", "testname": "View Results Tree", "enabled": "true" }) ET.SubElement(result_tree, "boolProp", {"name": "ResultCollector.error_logging"}).text = "false" thread_group_tree.append(ET.Element("hashTree")) # 聚合报告 aggregate = ET.SubElement(thread_group_tree, "ResultCollector", { "guiclass": "StatVisualizer", "testclass": "ResultCollector", "testname": "Aggregate Report", "enabled": "true" }) ET.SubElement(aggregate, "boolProp", {"name": "ResultCollector.error_logging"}).text = "false" thread_group_tree.append(ET.Element("hashTree")) # 格式化 XML xml_str = minidom.parseString(ET.tostring(root)).toprettyxml(indent=" ") # 保存 JMX 文件 jmx_path = self.results_dir / f"{test_id}.jmx" with open(jmx_path, 'w') as f: f.write(xml_str) logger.info(f"Generated JMX file: {jmx_path}") return jmx_path def _create_http_sampler(self, scenario: Dict[str, Any], base_url: str) -> ET.Element: """创建 HTTP 采样器""" from urllib.parse import urlparse parsed_url = urlparse(base_url) sampler = ET.Element("HTTPSamplerProxy", { "guiclass": "HttpTestSampleGui", "testclass": "HTTPSamplerProxy", "testname": scenario.get("name", "HTTP Request"), "enabled": "true" }) # 基本属性 ET.SubElement(sampler, "stringProp", {"name": "HTTPSampler.domain"}).text = parsed_url.netloc ET.SubElement(sampler, "stringProp", {"name": "HTTPSampler.port"}).text = str(parsed_url.port or (443 if parsed_url.scheme == 'https' else 80)) ET.SubElement(sampler, "stringProp", {"name": "HTTPSampler.protocol"}).text = parsed_url.scheme ET.SubElement(sampler, "stringProp", {"name": "HTTPSampler.path"}).text = scenario.get("path", "/") ET.SubElement(sampler, "stringProp", {"name": "HTTPSampler.method"}).text = scenario.get("method", "GET") # 请求体 if scenario.get("body"): body_data = json.dumps(scenario["body"]) if 
isinstance(scenario["body"], dict) else scenario["body"] ET.SubElement(sampler, "boolProp", {"name": "HTTPSampler.postBodyRaw"}).text = "true" arguments = ET.SubElement(sampler, "elementProp", { "name": "HTTPsampler.Arguments", "elementType": "Arguments" }) collection = ET.SubElement(arguments, "collectionProp", {"name": "Arguments.arguments"}) argument = ET.SubElement(collection, "elementProp", { "name": "", "elementType": "HTTPArgument" }) ET.SubElement(argument, "boolProp", {"name": "HTTPArgument.always_encode"}).text = "false" ET.SubElement(argument, "stringProp", {"name": "Argument.value"}).text = body_data ET.SubElement(argument, "stringProp", {"name": "Argument.metadata"}).text = "=" # 请求头 if scenario.get("headers"): header_manager = ET.SubElement(sampler, "HeaderManager", { "guiclass": "HeaderPanel", "testclass": "HeaderManager", "testname": "HTTP Header Manager", "enabled": "true" }) collection = ET.SubElement(header_manager, "collectionProp", {"name": "HeaderManager.headers"}) for key, value in scenario["headers"].items(): header = ET.SubElement(collection, "elementProp", { "name": "", "elementType": "Header" }) ET.SubElement(header, "stringProp", {"name": "Header.name"}).text = key ET.SubElement(header, "stringProp", {"name": "Header.value"}).text = value return sampler async def _parse_results(self, results_file: Path) -> Dict[str, Any]: """解析 JMeter 结果文件 (JTL)""" metrics = { "requests": {}, "summary": { "total_requests": 0, "total_failures": 0, "total_bytes": 0, "total_time": 0 } } if not results_file.exists(): return metrics try: # 解析 XML 格式的 JTL 文件 tree = ET.parse(results_file) root = tree.getroot() response_times = [] for sample in root.findall('.//httpSample') or root.findall('.//sample'): label = sample.get('lb', 'Unknown') success = sample.get('s', 'true') == 'true' time_ms = int(sample.get('t', '0')) bytes_received = int(sample.get('by', '0')) response_times.append(time_ms) if label not in metrics["requests"]: metrics["requests"][label] = { "count": 0, "failures": 0, "total_time": 0, "total_bytes": 0, "response_times": [] } metrics["requests"][label]["count"] += 1 metrics["requests"][label]["total_time"] += time_ms metrics["requests"][label]["total_bytes"] += bytes_received metrics["requests"][label]["response_times"].append(time_ms) if not success: metrics["requests"][label]["failures"] += 1 metrics["summary"]["total_requests"] += 1 metrics["summary"]["total_time"] += time_ms metrics["summary"]["total_bytes"] += bytes_received if not success: metrics["summary"]["total_failures"] += 1 # 计算统计指标 for label, data in metrics["requests"].items(): times = sorted(data["response_times"]) count = len(times) if count > 0: data["min_response_time"] = times[0] data["max_response_time"] = times[-1] data["average_response_time"] = sum(times) / count data["median_response_time"] = times[count // 2] data["percentile_90"] = times[int(count * 0.9)] data["percentile_95"] = times[int(count * 0.95)] data["percentile_99"] = times[int(count * 0.99)] # 删除原始响应时间列表(太大) del data["response_times"] # 计算总体统计 if response_times: times = sorted(response_times) count = len(times) metrics["summary"]["min_response_time"] = times[0] metrics["summary"]["max_response_time"] = times[-1] metrics["summary"]["average_response_time"] = sum(times) / count metrics["summary"]["median_response_time"] = times[count // 2] metrics["summary"]["percentile_90"] = times[int(count * 0.9)] metrics["summary"]["percentile_95"] = times[int(count * 0.95)] metrics["summary"]["percentile_99"] = times[int(count * 0.99)] except 
Exception as e: logger.error(f"Error parsing JMeter results: {str(e)}", exc_info=True) return metrics async def stop_test(self, test_id: str): """停止测试""" if test_id in self.running_processes: process = self.running_processes[test_id] try: # JMeter 使用 shutdown.sh 脚本优雅停止 shutdown_script = Path(self.jmeter_home) / "bin" / "shutdown.sh" if shutdown_script.exists(): subprocess.run([str(shutdown_script)], timeout=10) else: process.terminate() # 等待进程结束 try: process.wait(timeout=30) except subprocess.TimeoutExpired: process.kill() process.wait() logger.info(f"Stopped JMeter test {test_id}") except Exception as e: logger.error(f"Error stopping test {test_id}: {str(e)}") finally: del self.running_processes[test_id] async def get_metrics(self, test_id: str) -> Dict[str, Any]: """获取实时指标""" # JMeter 在运行时不提供实时指标 API # 可以通过解析日志文件或使用 Backend Listener 插件 return {}
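JMeterRunner 本质上是在拼装 JMeter 的非 GUI 命令行。下面是一个等价的最小调用示意(JMETER_HOME、JMX 路径均为假设值;-o 指定的报告目录需为空或不存在):

# 示意:与 JMeterRunner 拼装的命令等价的最小调用
import subprocess
from pathlib import Path

JMETER_BIN = Path("/opt/apache-jmeter/bin/jmeter")                # 假设的安装路径(对应 JMETER_HOME 默认值)
jmx_path = Path("./performance_results/jmeter/demo.jmx")          # 由 _generate_jmx 生成的测试计划(假设值)
results_file = Path("./performance_results/jmeter/demo_results.jtl")
report_dir = Path("./performance_results/jmeter/demo_report")

cmd = [
    str(JMETER_BIN),
    "-n",                                              # 非 GUI 模式
    "-t", str(jmx_path),                               # 测试计划文件
    "-l", str(results_file),                           # 结果文件(JTL)
    "-e",                                              # 测试结束后生成 HTML 报告
    "-o", str(report_dir),                             # 报告输出目录(必须为空或不存在)
    "-Jjmeter.save.saveservice.output_format=xml",     # JMeter 属性按 -Jkey=value 传入;XML 便于上文的结果解析
]

completed = subprocess.run(cmd, capture_output=True, text=True, timeout=600)
print("exit code:", completed.returncode)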

继续下一部分...

2.5.4 性能分析器(backend/services/performance/src/performance_analyzer.py)

""" 性能测试结果分析器 提供智能分析和建议 """ import logging from typing import Dict, Any, List import statistics from datetime import datetime logger = logging.getLogger(__name__) class PerformanceAnalyzer: """性能分析器""" def __init__(self): # 性能阈值配置 self.thresholds = { "response_time": { "excellent": 100, # ms "good": 500, "acceptable": 1000, "poor": 3000 }, "error_rate": { "excellent": 0.1, # % "good": 1.0, "acceptable": 5.0, "poor": 10.0 }, "throughput": { "excellent": 1000, # requests/s "good": 500, "acceptable": 100, "poor": 50 } } async def analyze_results(self, result: Dict[str, Any]) -> Dict[str, Any]: """ 分析性能测试结果 Args: result: 测试结果 Returns: 分析报告 """ metrics = result.get("metrics", {}) summary = metrics.get("summary", {}) analysis = { "overall_score": 0, "performance_grade": "N/A", "bottlenecks": [], "recommendations": [], "strengths": [], "issues": [] } # 分析响应时间 response_time_analysis = self._analyze_response_time(summary) analysis["response_time"] = response_time_analysis # 分析错误率 error_rate_analysis = self._analyze_error_rate(summary) analysis["error_rate"] = error_rate_analysis # 分析吞吐量 throughput_analysis = self._analyze_throughput(summary) analysis["throughput"] = throughput_analysis # 识别瓶颈 bottlenecks = self._identify_bottlenecks(metrics) analysis["bottlenecks"] = bottlenecks # 生成建议 recommendations = self._generate_recommendations(analysis) analysis["recommendations"] = recommendations # 计算总体评分 overall_score = self._calculate_overall_score(analysis) analysis["overall_score"] = overall_score analysis["performance_grade"] = self._get_performance_grade(overall_score) return analysis def _analyze_response_time(self, summary: Dict[str, Any]) -> Dict[str, Any]: """分析响应时间""" avg_response_time = summary.get("average_response_time", 0) median_response_time = summary.get("median_response_time", 0) percentile_95 = summary.get("percentile_95", 0) percentile_99 = summary.get("percentile_99", 0) analysis = { "average": avg_response_time, "median": median_response_time, "p95": percentile_95, "p99": percentile_99, "rating": "N/A", "score": 0 } # 评级(基于 P95) if percentile_95 < self.thresholds["response_time"]["excellent"]: analysis["rating"] = "Excellent" analysis["score"] = 100 elif percentile_95 < self.thresholds["response_time"]["good"]: analysis["rating"] = "Good" analysis["score"] = 80 elif percentile_95 < self.thresholds["response_time"]["acceptable"]: analysis["rating"] = "Acceptable" analysis["score"] = 60 elif percentile_95 < self.thresholds["response_time"]["poor"]: analysis["rating"] = "Poor" analysis["score"] = 40 else: analysis["rating"] = "Very Poor" analysis["score"] = 20 return analysis def _analyze_error_rate(self, summary: Dict[str, Any]) -> Dict[str, Any]: """分析错误率""" total_requests = summary.get("total_requests", 0) total_failures = summary.get("total_failures", 0) error_rate = (total_failures / total_requests * 100) if total_requests > 0 else 0 analysis = { "total_requests": total_requests, "total_failures": total_failures, "error_rate_percent": round(error_rate, 2), "rating": "N/A", "score": 0 } # 评级 if error_rate < self.thresholds["error_rate"]["excellent"]: analysis["rating"] = "Excellent" analysis["score"] = 100 elif error_rate < self.thresholds["error_rate"]["good"]: analysis["rating"] = "Good" analysis["score"] = 80 elif error_rate < self.thresholds["error_rate"]["acceptable"]: analysis["rating"] = "Acceptable" analysis["score"] = 60 elif error_rate < self.thresholds["error_rate"]["poor"]: analysis["rating"] = "Poor" analysis["score"] = 40 else: analysis["rating"] = "Very Poor" 
analysis["score"] = 20 return analysis def _analyze_throughput(self, summary: Dict[str, Any]) -> Dict[str, Any]: """分析吞吐量""" requests_per_second = summary.get("requests_per_second", 0) analysis = { "requests_per_second": round(requests_per_second, 2), "rating": "N/A", "score": 0 } # 评级 if requests_per_second > self.thresholds["throughput"]["excellent"]: analysis["rating"] = "Excellent" analysis["score"] = 100 elif requests_per_second > self.thresholds["throughput"]["good"]: analysis["rating"] = "Good" analysis["score"] = 80 elif requests_per_second > self.thresholds["throughput"]["acceptable"]: analysis["rating"] = "Acceptable" analysis["score"] = 60 elif requests_per_second > self.thresholds["throughput"]["poor"]: analysis["rating"] = "Poor" analysis["score"] = 40 else: analysis["rating"] = "Very Poor" analysis["score"] = 20 return analysis def _identify_bottlenecks(self, metrics: Dict[str, Any]) -> List[Dict[str, Any]]: """识别性能瓶颈""" bottlenecks = [] requests = metrics.get("requests", {}) # 找出最慢的请求 slowest_requests = sorted( requests.items(), key=lambda x: x[1].get("average_response_time", 0), reverse=True )[:5] for name, data in slowest_requests: avg_time = data.get("average_response_time", 0) if avg_time > self.thresholds["response_time"]["acceptable"]: bottlenecks.append({ "type": "slow_request", "name": name, "average_response_time": avg_time, "severity": "high" if avg_time > self.thresholds["response_time"]["poor"] else "medium", "description": f"Request '{name}' has high average response time: {avg_time}ms" }) # 找出错误率最高的请求 error_requests = [ (name, data) for name, data in requests.items() if data.get("failures", 0) > 0 ] error_requests = sorted( error_requests, key=lambda x: x[1].get("failures", 0) / x[1].get("count", 1), reverse=True )[:5] for name, data in error_requests: error_rate = (data.get("failures", 0) / data.get("count", 1)) * 100 if error_rate > self.thresholds["error_rate"]["acceptable"]: bottlenecks.append({ "type": "high_error_rate", "name": name, "error_rate": round(error_rate, 2), "severity": "high" if error_rate > self.thresholds["error_rate"]["poor"] else "medium", "description": f"Request '{name}' has high error rate: {error_rate}%" }) return bottlenecks def _generate_recommendations(self, analysis: Dict[str, Any]) -> List[str]: """生成优化建议""" recommendations = [] # 响应时间建议 response_time = analysis.get("response_time", {}) if response_time.get("score", 0) < 60: recommendations.append( "⚠️ 响应时间较慢,建议:\n" " • 优化数据库查询,添加索引\n" " • 启用缓存机制(Redis/Memcached)\n" " • 使用 CDN 加速静态资源\n" " • 优化代码逻辑,减少不必要的计算" ) # 错误率建议 error_rate = analysis.get("error_rate", {}) if error_rate.get("score", 0) < 60: recommendations.append( "⚠️ 错误率较高,建议:\n" " • 检查应用日志,定位错误原因\n" " • 增加错误处理和重试机制\n" " • 优化资源配置(CPU/内存)\n" " • 检查第三方服务依赖" ) # 吞吐量建议 throughput = analysis.get("throughput", {}) if throughput.get("score", 0) < 60: recommendations.append( "⚠️ 吞吐量较低,建议:\n" " • 增加服务器实例(水平扩展)\n" " • 优化应用性能(垂直扩展)\n" " • 使用负载均衡\n" " • 启用异步处理" ) # 瓶颈建议 bottlenecks = analysis.get("bottlenecks", []) if bottlenecks: slow_requests = [b for b in bottlenecks if b["type"] == "slow_request"] if slow_requests: recommendations.append( f"⚠️ 发现 {len(slow_requests)} 个慢请求,建议优先优化:\n" + "\n".join([f" • {b['name']} ({b['average_response_time']}ms)" for b in slow_requests[:3]]) ) error_requests = [b for b in bottlenecks if b["type"] == "high_error_rate"] if error_requests: recommendations.append( f"⚠️ 发现 {len(error_requests)} 个高错误率请求,建议优先修复:\n" + "\n".join([f" • {b['name']} ({b['error_rate']}%)" for b in error_requests[:3]]) ) # 
如果性能良好,给出肯定 if not recommendations: recommendations.append( "✅ 性能表现良好!继续保持:\n" " • 定期进行性能测试\n" " • 监控生产环境指标\n" " • 持续优化代码质量" ) return recommendations def _calculate_overall_score(self, analysis: Dict[str, Any]) -> int: """计算总体评分""" scores = [] if "response_time" in analysis: scores.append(analysis["response_time"].get("score", 0)) if "error_rate" in analysis: scores.append(analysis["error_rate"].get("score", 0)) if "throughput" in analysis: scores.append(analysis["throughput"].get("score", 0)) if scores: return int(sum(scores) / len(scores)) return 0 def _get_performance_grade(self, score: int) -> str: """获取性能等级""" if score >= 90: return "A+" elif score >= 80: return "A" elif score >= 70: return "B+" elif score >= 60: return "B" elif score >= 50: return "C" else: return "D" async def generate_report( self, test_id: str, test_info: Dict[str, Any] ) -> Dict[str, Any]: """生成完整的性能测试报告""" config = test_info.get("config", {}) metrics = test_info.get("metrics", {}) analysis = test_info.get("analysis", {}) report = { "test_id": test_id, "test_name": config.get("name", "Unknown"), "test_type": config.get("test_type", "load"), "target_url": config.get("target_url", ""), "configuration": { "users": config.get("users", 0), "spawn_rate": config.get("spawn_rate", 0), "duration": config.get("duration", 0) }, "execution": { "started_at": config.get("started_at").isoformat() if config.get("started_at") else None, "completed_at": test_info.get("completed_at").isoformat() if test_info.get("completed_at") else None, "status": test_info.get("status", "unknown") }, "metrics": metrics, "analysis": analysis, "generated_at": datetime.now().isoformat() } return report
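下面是 PerformanceAnalyzer 的一个用法示意,用一组虚构的汇总指标驱动分析流程(数值仅用于说明,不代表任何真实测试结果;假设 performance_analyzer 模块在 PYTHONPATH 中):

# 示意用法:用虚构指标驱动分析器
import asyncio
from performance_analyzer import PerformanceAnalyzer

sample_result = {
    "metrics": {
        "summary": {
            "total_requests": 12000,
            "total_failures": 36,
            "average_response_time": 220.5,
            "median_response_time": 180.0,
            "percentile_95": 450.0,
            "percentile_99": 900.0,
            "requests_per_second": 400.0,
        },
        "requests": {
            "GET /api/products": {
                "count": 8000, "failures": 4,
                "average_response_time": 150.0,
            },
            "POST /api/orders": {
                "count": 4000, "failures": 32,
                "average_response_time": 1800.0,   # 刻意设置一个偏慢的接口,便于观察瓶颈识别
            },
        },
    }
}

analyzer = PerformanceAnalyzer()
analysis = asyncio.run(analyzer.analyze_results(sample_result))
print("grade:", analysis["performance_grade"], "score:", analysis["overall_score"])
for item in analysis["bottlenecks"]:
    print("bottleneck:", item["description"])
for tip in analysis["recommendations"]:
    print(tip)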

2.5.5 WebSocket 管理器(backend/services/performance/src/websocket_manager.py)

"""
WebSocket 连接管理器
用于实时推送性能测试指标
"""
import logging
from typing import Dict, List
from fastapi import WebSocket
import json
import asyncio

logger = logging.getLogger(__name__)


class WebSocketManager:
    """WebSocket 连接管理器"""

    def __init__(self):
        # 存储活动连接 {test_id: [websocket1, websocket2, ...]}
        self.active_connections: Dict[str, List[WebSocket]] = {}

    async def connect(self, websocket: WebSocket, test_id: str):
        """接受 WebSocket 连接"""
        await websocket.accept()
        if test_id not in self.active_connections:
            self.active_connections[test_id] = []
        self.active_connections[test_id].append(websocket)
        logger.info(
            f"WebSocket connected for test {test_id}. "
            f"Total connections: {len(self.active_connections[test_id])}"
        )
        # 发送欢迎消息
        await websocket.send_json({
            "type": "connected",
            "test_id": test_id,
            "message": "Connected to performance test stream"
        })

    def disconnect(self, websocket: WebSocket, test_id: str):
        """断开 WebSocket 连接"""
        if test_id in self.active_connections:
            if websocket in self.active_connections[test_id]:
                self.active_connections[test_id].remove(websocket)
                logger.info(
                    f"WebSocket disconnected for test {test_id}. "
                    f"Remaining connections: {len(self.active_connections[test_id])}"
                )
            # 如果没有连接了,删除键
            if not self.active_connections[test_id]:
                del self.active_connections[test_id]

    async def broadcast(self, test_id: str, message: Dict):
        """广播消息到所有连接"""
        if test_id not in self.active_connections:
            return
        # 逐个发送,并记录已断开的连接
        disconnected = []
        for websocket in self.active_connections[test_id]:
            try:
                await websocket.send_json(message)
            except Exception as e:
                logger.error(f"Error sending message: {str(e)}")
                disconnected.append(websocket)
        # 清理断开的连接
        for websocket in disconnected:
            self.disconnect(websocket, test_id)

    async def send_metrics_update(self, test_id: str, metrics: Dict):
        """发送指标更新"""
        await self.broadcast(test_id, {
            "type": "metrics_update",
            "test_id": test_id,
            "metrics": metrics,
            "timestamp": asyncio.get_event_loop().time()
        })

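前端或脚本可以直接订阅上面 main.py 暴露的 WebSocket 端点来接收实时指标。下面是一个示意客户端(假设服务运行在本机 8003 端口、已安装 websockets 库,test_id 为假设值):

# 示意客户端:订阅某个测试的实时指标推送
import asyncio
import json
import websockets

async def watch_test(test_id: str):
    url = f"ws://localhost:8003/ws/tests/{test_id}"
    async with websockets.connect(url) as ws:
        async for raw in ws:
            if raw == "pong":          # 对应服务端对 "ping" 文本的回应
                continue
            msg = json.loads(raw)
            if msg.get("type") == "metrics_update":
                print("metrics:", msg["metrics"])
            elif msg.get("type") in ("test_completed", "test_failed"):
                print("final:", msg)
                break

if __name__ == "__main__":
    asyncio.run(watch_test("demo-load-001"))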
后续请阅读:DREAMVFIA Test Master 自动化测试平台 - 完整开源项目完整数据代码包 ——(部分二)
