Gatling中进行数据库性能关联测试将JDBC协议和HTTP协议集成在同一场景中,建立前端用户压力和后端数据库负载之间的因果关系,精准定位从应用层到数据层的性能瓶颈。
Gatling JDBC协议集成
Gatling JDBC模块允许您直接对数据库进行负载测试,和HTTP请求混合编排,模拟真实业务中应用服务器和数据库的交互。
1. 基础环境配置
在build.sbt或pom.xml中必须显式引入JDBC依赖:
// build.sbt libraryDependencies += "io.gatling" % "gatling-jdbc" % "3.9.5"在Scala测试类中导入:
import io.gatling.jdbc.Predef._ import java.sql.PreparedStatement2. JDBC连接池配置
连接池配置直接决定测试的真实性和数据库压力模式:
val jdbcConfig = jdbc .url("jdbc:mysql://${DB_HOST}:3306/${DB_NAME}") .username("${DB_USER}") .password("${DB_PASSWORD}") .driver("com.mysql.cj.jdbc.Driver") // 连接池核心配置(建议和生产环境对齐) .maximumPoolSize(50) // 最大连接数 .minimumIdle(10) // 最小空闲连接 .connectionTimeout(30000) // 连接超时(ms) .idleTimeout(600000) // 空闲连接超时 .maxLifetime(1800000) // 连接最大生命周期 .validationQuery("SELECT 1") // 连接健康检查SQLSQL执行时间监控的实现方式
1. 基础SQL执行和计时
Gatling会自动记录每条SQL的执行时间,但需要写好检查点:
val scn = scenario("数据库性能测试") .exec( jdbc("查询用户订单") .select("SELECT * FROM orders WHERE user_id = ?") .params("${userId}") // 参数化查询 .check( // 检查执行结果 jdbcResponseTime.mean.lt(50), // 平均响应时间<50ms jdbcResponseTime.max.lt(200), // 最大响应时间<200ms jdbcResponseTime.percentile4.lt(100), // P95<100ms // 验证数据正确性 jdbcColumn("order_id").count.gt(0) ) )2. 监控SQL执行
要实现更详细的监控,需要捕获执行计划:
.exec( jdbc("复杂查询剖析") .select(""" EXPLAIN ANALYZE SELECT o.*, u.username FROM orders o JOIN users u ON o.user_id = u.id WHERE o.created_at > ? ORDER BY o.total_amount DESC LIMIT 100 """) .params("2024-01-01") .check( jdbcColumn("Execution Time").findAll.saveAs("execution_plan") ) ) .exec { session => // 解析执行计划中的主要指标 val plan = session("execution_plan").as[Seq[String]] val planningTime = extractTime(plan, "Planning Time") val executionTime = extractTime(plan, "Execution Time") println(s"SQL剖析结果 - 规划时间: ${planningTime}ms, 执行时间: ${executionTime}ms") session }数据库和HTTP请求的关联测试
1. 混合场景设计模拟完整事务
val mixedScenario = scenario("完整事务流程") .feed(userIds.feeder) .exec( http("用户登录API") .post("/api/login") .body(StringBody("""{"username":"${username}"}""")) .check(jsonPath("$.userId").saveAs("loggedInUserId")) ) .pause(1.second) // 关键:验证API调用是否触发预期数据库操作 .exec( jdbc("验证登录记录") .select("SELECT COUNT(*) as cnt FROM login_log WHERE user_id = ?") .params("${loggedInUserId}") .check(jdbcColumn("cnt").is(1)) // 断言恰好一条记录 ) .exec( http("提交订单API") .post("/api/order") .body(ElFileBody("templates/order.json")) .check(jsonPath("$.orderId").saveAs("apiOrderId")) ) // 验证订单是否持久化且数据一致 .exec( jdbc("验证订单数据一致性") .select(""" SELECT o.status, o.total_amount, oi.item_count FROM orders o JOIN order_items oi ON o.id = oi.order_id WHERE o.id = ? """) .params("${apiOrderId}") .check( jdbcColumn("status").is("PENDING"), jdbcColumn("total_amount").notNull ) )2. 竞态条件和并发问题测试
val concurrencyScenario = scenario("库存扣减并发测试") .feed(productIds.feeder) .exec( jdbc("读取初始库存") .select("SELECT stock FROM products WHERE id = ?") .params("${productId}") .check(jdbcColumn("stock").saveAs("initialStock")) ) // 模拟100个并发用户同时扣减库存 .exec( jdbc("并发扣减库存") .update(""" UPDATE products SET stock = stock - 1 WHERE id = ? AND stock > 0 """) .params("${productId}") .check(jdbcRowsUpdated.is(1)) // 断言只影响一行 ) .exec( jdbc("验证最终库存") .select("SELECT stock FROM products WHERE id = ?") .params("${productId}") .check(jdbcColumn("stock").is(session => { val initial = session("initialStock").as[Int] initial - 1 // 应正好减少1 })) )文章来源:卓码软件测评
精彩推荐:点击蓝字即可
▲软件负载测试▲API自动化测试▲软件测试▲第三方软件测试▲软件性能测试▲软件测试机构
监控诊断高级配置
1. 慢SQL捕获和告警
val slowSqlThreshold = 100 // 定义慢SQL阈值(ms) val scn = scenario("慢SQL监控") .exec( jdbc("潜在慢查询") .select(""" SELECT * FROM large_table WHERE created_at BETWEEN ? AND ? ORDER BY complex_calculation(column) """) .params("${startDate}", "${endDate}") .check( jdbcResponseTime.max.saveAs("sqlDuration") ) ) .doIf(session => session("sqlDuration").as[Int] > slowSqlThreshold) { exec(session => { val duration = session("sqlDuration").as[Int] // 触发告警逻辑 println(s"慢SQL告警: 执行时间 ${duration}ms") // 可集成到外部监控系统 sendToMonitoringSystem(session) session }) }2. 连接池性能监控
.exec(session => { // 定期采样连接池状态 val jdbcStats = jdbcConfig.connectionPool.getStats println(s""" |连接池状态报告: |活跃连接: ${jdbcStats.getActiveConnections} |空闲连接: ${jdbcStats.getIdleConnections} |等待线程: ${jdbcStats.getThreadsAwaitingConnection} |连接获取平均等待时间: ${jdbcStats.getConnectionTimeout} """.stripMargin) // 连接泄漏检测 if (jdbcStats.getActiveConnections > jdbcStats.getMaxConnections * 0.8) { println("连接池接近饱和,可能存在连接泄漏") } session })3. 和APM工具集成
通过自定义检查点将数据发送到New Relic、Datadog等APM:
.check( jdbcResponseTime.max.transform(duration => { // 发送自定义指标到APM val tags = Map( "sql_operation" -> "select_user_orders", "test_scenario" -> "checkout_flow" ) apmClient.sendMetric("database.query.duration", duration, tags) duration }) )数据库压力测试示例
class EcommerceDbTest extends Simulation { val jdbcConfig = jdbc .url("jdbc:mysql://localhost:3306/ecommerce") .username("perf_test") .password("test123") .maximumPoolSize(30) val httpProtocol = http.baseUrl("http://localhost:8080") val dbFeeder = csv("data/product_ids.csv").circular val orderCheckoutScenario = scenario("数据库密集型下单流程") .feed(dbFeeder) .exec( http("浏览商品") .get("/api/products/${productId}") .check(jsonPath("$.price").saveAs("productPrice")) ) .exec( jdbc("获取实时库存") .select(""" SELECT available_stock, warehouse_id FROM inventory WHERE product_id = ? FOR UPDATE NOWAIT """) .params("${productId}") .check( jdbcColumn("available_stock").gt(0), jdbcColumn("warehouse_id").saveAs("warehouseId") ) ) .tryMax(3) { // 库存更新重试机制 exec( jdbc("原子性扣减库存") .update(""" UPDATE inventory SET available_stock = available_stock - 1, locked_stock = locked_stock + 1 WHERE product_id = ? AND available_stock > 0 RETURNING updated_rows """) .params("${productId}") .check(jdbcRowsUpdated.is(1)) ) } .exec( http("确认下单") .post("/api/orders") .body(StringBody( """{"productId":${productId},"quantity":1}""" )) .check(status.is(201)) ) .exec( jdbc("事务完整性验证") .select(""" SELECT (SELECT available_stock FROM inventory WHERE product_id = ?) as final_stock, (SELECT COUNT(*) FROM orders WHERE product_id = ?) as order_count FROM dual """) .params("${productId}", "${productId}") .check( jdbcColumn("final_stock").transform(_ == session("initialStock").as[Int] - 1), jdbcColumn("order_count").is(1) ) ) // 注入策略:重点测试数据库并发处理能力 setUp( orderCheckoutScenario.inject( rampUsersPerSec(10).to(100).during(5.minutes) ) ).protocols(jdbcConfig, httpProtocol) .assertions( global.jdbcResponseTime.percentile4.lt(150), // P95<150ms jdbcAllRequests.percentile4.lt(200), // 所有SQL的P95<200ms jdbcFailedRequests.percent.is(0) // 数据库零失败 ) }指标诊断
数据库指标:
SQL响应时间分布:P50/P95/P99值,识别长尾效应
连接池使用率:活跃连接数/最大连接数,>80%需告警
事务成功率:提交和回滚比例
锁等待时间:特别是FOR UPDATE查询
关联分析:
比较HTTP P95响应时间和对应SQL的P95执行时间
识别N+1查询问题:单个HTTP请求触发的SQL数量
验证数据库操作是否和业务事务边界一致
这种测试方法能精确暴露连接池配置不当、缺失索引、事务隔离问题、锁竞争等数据库层瓶颈,实现真正的全链路性能可见。