1. 项目概述:当搜索架构遇上大数据计算,AB测试不再只是“改个按钮看点击率”
“Solr + Spark = AB Testing on Steroids”——这个标题不是营销噱头,而是我在电商中台团队落地真实场景后写下的技术手记。它直指一个长期被低估的痛点:传统AB测试平台(比如基于MySQL+前端埋点+简单聚合的方案)在面对高并发搜索行为、多维度流量分层、实时策略灰度和长周期用户路径归因时,几乎必然陷入三重失能——数据延迟大、分组不正交、归因链条断。而Solr和Spark的组合,恰恰在各自最硬核的能力边界上完成了精准互补:Solr不是只做“搜”,它本质是一个高性能、可插拔、支持复杂查询与实时分面统计的文档型分析引擎;Spark也不是只做“批”,它的Structured Streaming和Delta Lake生态已让亚秒级流批一体归因成为生产级可行选项。我们用这套组合,在618大促前两周上线了搜索排序策略的AB验证系统,将一次完整策略迭代的验证周期从平均5.3天压缩到11小时,且首次实现了“用户搜索→点击→加购→下单→复购”的全漏斗跨会话归因。如果你正在为搜索推荐、广告投放或内容分发类业务的策略验证发愁,或者你的AB平台还在用定时SQL跑昨天的数据,那这篇就是为你写的实操复盘。它不讲概念,只拆解我们怎么把Solr的facet pivot、query-time join和Spark的watermark机制、stateful processing真正拧在一起干活。
2. 架构设计与技术选型逻辑:为什么是Solr+Spark,而不是Elasticsearch+Flink?
2.1 核心矛盾倒逼架构重构:传统AB平台的三大硬伤
我们先说清楚问题,再谈方案。旧系统用的是典型的“埋点→Kafka→Flink实时清洗→MySQL写入→BI工具查表”链路。上线半年后,三个致命问题集中爆发:
第一,流量分组漂移严重。Flink按用户ID哈希分桶,但用户换设备、清缓存、多端登录时,同一用户在不同会话被分到不同实验组。我们发现某次搜索排序实验中,37%的“实验组用户”在24小时内又出现在对照组日志里。这不是统计噪声,是分组逻辑缺陷。
第二,归因窗口僵化。所有转化都按固定7天窗口统计,但搜索场景下,用户“搜A商品→看详情→放弃→三天后搜B商品→下单”很常见。旧系统无法关联跨搜索会话的行为,把B商品的成交错误归因给A商品的搜索实验。
第三,策略配置与数据验证脱节。运营在后台改一个Solr的edismax boost参数,要等2小时后数据同步到MySQL,再等1小时BI刷新报表,才能看到首屏点击率变化。中间任何环节出错,排查成本极高。
这三个问题,本质是状态管理、时间语义和查询能力的三重缺失。而Solr+Spark的组合,恰好在每个缺口处提供了工业级成熟解法。
2.2 Solr为何不可替代:不只是搜索引擎,更是实时分面计算引擎
很多人一提Solr就想到“全文检索”,但在AB测试场景,我们重度依赖它的三项非搜索能力:
Facet Pivot的嵌套分面能力。传统count(*)只能告诉你“实验组总点击量”,而
facet.pivot=experiment_group,search_query,click_position能直接输出:“实验组中,搜‘iPhone 15’的用户,首屏第3位点击量是217次”。这省去了90%的预聚合ETL,让运营能实时下钻到具体词、具体位置。Query-time Join的轻量关联能力。我们的用户标签(如新客/老客/高净值)存在HBase里,但AB分析需要实时关联。Solr的
join from=users.id to=search_logs.user_id语法,比Spark SQL的shuffle join快3倍以上,且无需提前建宽表。实测单次查询10万条日志+关联百万级用户标签,P95延迟<800ms。Realtime Get API的毫秒级状态读取。用户进入搜索页时,前端JS通过
/get?id=U123456实时获取该用户当前所属实验组、版本号、生效时间戳。这个API不走Lucene索引,直接查Solr的in-memory docValues,QPS 2万+时P99<15ms。这是保证“同用户同会话始终在同组”的底层基石。
提示:这里的关键认知转变是——Solr在本项目中不承担存储原始日志的职责,它只存两样东西:(1)经过清洗的、带实验组标签的搜索会话摘要(schema定义为id, user_id, experiment_group, query, click_positions[], timestamp);(2)用户实时分组状态快照(schema为id, experiment_group, version, start_time, expire_time)。原始日志全部进HDFS/S3,由Spark处理。
2.3 Spark为何胜出:流批一体归因的工程确定性
我们对比过Flink和Spark Structured Streaming,最终选Spark,核心基于三点硬性指标:
Watermark机制的确定性更强。Flink的watermark是per-key的,而搜索AB中,我们需要对“用户ID+实验组”二元组设置统一水位线。Spark的
withWatermark("event_time", "2 hours")配合groupBy("user_id", "experiment_group")能严格保证:只要事件时间戳比当前水位早2小时,就绝不参与计算。Flink在key分布不均时会出现watermark滞留,导致部分用户归因延迟。Stateful Processing的容错更透明。我们的归因逻辑需要维护“用户最近一次搜索会话ID”作为state。Spark的
mapGroupsWithStateAPI要求你明确定义timeout、state schema和更新函数,所有state变更都落盘到RocksDB+Checkpoint。而Flink的ValueState在重启时可能丢失未flush的变更,我们在压测中发现过0.3%的会话ID错乱。与Delta Lake的深度集成。归因结果要写回供BI分析,Delta Lake的ACID事务、time travel和Z-ordering让我们能安全地做“按天分区+按实验组聚簇”的写入。比如
OPTIMIZE events DYNAMIC PARTITION OVERWRITE WHERE date='2024-06-15' AND experiment_group='v2',一条命令就能原子化替换当天所有v2组数据,避免了Hive表常见的小文件和脏数据问题。
注意:我们没用Spark MLlib做模型,也没用MLflow管实验。这里的“AB Testing”是纯工程验证——验证一个Solr排序规则变更是否提升了GMV,不是训练CTR预估模型。所以Spark角色很纯粹:清洗原始日志→关联用户标签→按会话归因→写入分析表。
2.4 整体架构图与数据流向(文字描述版)
整个链路分四层,无单点故障:
接入层:Nginx日志+前端埋点SDK,经Flume Agent采集到Kafka Topic
search-raw(保留原始JSON,含user_id,query,timestamp,session_id,device_id等字段)。实时分组层:Spark Streaming消费
search-raw,调用Solr Realtime Get API获取user_id当前实验组,打标后写入Kafka Topicsearch-labeled。此步骤P99延迟<200ms,是保证分组一致性的第一道闸门。归因计算层:Spark Structured Streaming消费
search-labeled,执行三步操作:- 按
session_id窗口聚合搜索行为(去重、排序、提取首屏点击位置); - 关联HBase中的用户画像(新客/老客/地域/购买力);
- 基于
event_timewatermark,将搜索会话与后续3天内的订单事件(来自另一Kafka Topicorders)进行left join,生成归因记录。
- 按
分析服务层:归因结果写入Delta Lake表
ab_results,同时Solr增量索引ab_results的摘要字段(experiment_group,query,conversion_rate,avg_order_value)。BI工具直连Solr做自助分析,运营可5秒内看到“v2组搜‘蓝牙耳机’的7日复购率”。
这个架构里,Solr和Spark没有耦合——Solr不依赖Spark,Spark也不依赖Solr。它们通过Kafka和Delta Lake解耦,各自专注自己最擅长的事:Solr做毫秒级状态读取和亚秒级分面统计,Spark做分钟级归因计算和小时级数据沉淀。
3. 核心实现细节与关键配置:从代码片段到生产调优
3.1 Solr Schema设计:为AB分析定制的字段类型
我们没用默认的_text_字段,而是为AB场景精简了schema。以下是managed-schema中关键字段定义(Solr 9.3):
<!-- 用户ID,用于Realtime Get和facet --> <field name="user_id" type="string" indexed="true" stored="true" required="true" multiValued="false"/> <!-- 实验组标识,必须用string而非int,便于后期扩展语义(如'v2-geo-shanghai') --> <field name="experiment_group" type="string" indexed="true" stored="true" required="true"/> <!-- 搜索词,用text_general分词,但禁用停用词以保留'iPhone'等品牌词 --> <field name="query" type="text_general" indexed="true" stored="true" omitNorms="true"/> <!-- 点击位置数组,用intPoint类型支持范围查询(如click_position:[1 TO 3]) --> <field name="click_positions" type="pint" indexed="true" stored="true" multiValued="true"/> <!-- 时间戳,用pdate类型,支持date math(如NOW/DAY-7) --> <field name="timestamp" type="pdate" indexed="true" stored="true"/> <!-- 归因结果,用booleanPoint存是否转化,比string节省80%存储 --> <field name="is_converted" type="pboolean" indexed="true" stored="true"/> <!-- 订单金额,用pfloat存,支持facet.range统计区间 --> <field name="order_amount" type="pfloat" indexed="true" stored="true"/>最关键的优化在click_positions字段。我们不用strings类型存"1,2,3",因为facet时会当成一个字符串。改用pint后,facet.range=click_positions&f.click_positions.facet.range.start=1&f.click_positions.facet.range.end=10&f.click_positions.facet.range.gap=1能直接输出各位置点击量分布。实测10亿条记录下,该facet查询P95<1.2s,而用strings类型需3.8s。
实操心得:Solr的
facet.limit默认是100,但AB分析常需看Top 500词。我们全局设为facet.limit=500,并配facet.sort=index(按字典序)而非count(按频次),因为运营更关心“搜什么词的人多”,而不是“哪个词点击最多”——前者反映流量结构,后者只是结果。
3.2 Spark Structured Streaming归因代码:状态管理与水位线实战
归因的核心难点是:如何确保“用户A在6月1日10:00搜‘咖啡机’,6月2日15:00下单”被正确关联?以下是我们生产环境的Scala代码主干(已脱敏):
// 1. 定义watermark:事件时间比处理时间晚2小时即视为迟到 val searchStream = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", "kafka1:9092,kafka2:9092") .option("subscribe", "search-labeled") .load() .select( from_json(col("value").cast("string"), searchSchema).alias("data") ).select("data.*") .withWatermark("event_time", "2 hours") // 关键!水位线设为2小时 // 2. 订单流同样设watermark,但窗口更长(因下单可能滞后) val orderStream = spark .readStream .format("kafka") .option("subscribe", "orders") .load() .select(from_json(col("value"), orderSchema).alias("data")) .select("data.*") .withWatermark("event_time", "72 hours") // 下单可滞后3天 // 3. 会话窗口聚合:按session_id分组,取最早搜索时间、最晚点击时间、首屏点击位置 val sessionizedSearch = searchStream .withColumn("session_window", session_window(col("event_time"), "30 minutes")) .groupBy("session_id", "session_window") .agg( min("event_time").alias("search_start_time"), max("event_time").alias("search_end_time"), first("user_id").alias("user_id"), first("experiment_group").alias("experiment_group"), collect_list("click_positions").alias("all_clicks"), // 提取首屏点击位置(假设click_positions是array<int>) element_at(flatten(collect_list("click_positions")), 1).alias("first_click_pos") ) // 4. 关键归因:left join搜索会话与订单,条件是用户相同+订单时间在搜索后3天内 val attributed = sessionizedSearch .join( orderStream, expr(""" sessionizedSearch.user_id = orderStream.user_id AND orderStream.event_time BETWEEN sessionizedSearch.search_start_time AND sessionizedSearch.search_start_time + interval 3 days """), "left" ) .select( col("session_id"), col("experiment_group"), col("search_start_time"), col("first_click_pos"), // 归因标志:只要有关联订单,即为true when(col("order_id").isNotNull, lit(true)).otherwise(lit(false)).alias("is_converted"), col("order_amount") ) // 5. 写入Delta Lake,按日期和实验组动态分区 attributed .writeStream .format("delta") .outputMode("Append") .option("checkpointLocation", "/checkpoints/ab-attribution") .partitionBy("date_format(search_start_time, 'yyyy-MM-dd')", "experiment_group") .table("ab_results")这段代码有三个必须调优的点:
Watermark间隔选择:设2小时是权衡结果。太短(如30分钟)会导致大量迟到事件被丢弃;太长(如6小时)则归因延迟过高。我们通过分析历史日志中“搜索到下单”的时间差分布,发现99.2%的转化发生在2小时内,故定为2小时。
Session window时长:30分钟是搜索会话的合理上限。用户连续输入多个词(如先搜“咖啡”,再搜“咖啡机”,再搜“德龙咖啡机”)通常在30分钟内完成。超过则视为新会话。
Join条件中的时间范围:
BETWEEN A AND A + interval 3 days比A < B < A + interval 3 days更高效,因为Spark能利用Delta Lake的Z-ordering对search_start_time列做范围裁剪。
注意:我们没用
mapGroupsWithState,因为归因逻辑是确定性的窗口计算,用groupBy+withWatermark更稳定。mapGroupsWithState适合需要跨窗口维护复杂状态的场景(如用户生命周期价值预测),但AB归因不需要。
3.3 Solr实时分组服务:如何保证千万QPS下分组不漂移
Solr Realtime Get的性能依赖两个配置。我们在solrconfig.xml中做了如下关键修改:
<!-- 1. 关闭不必要的功能,聚焦get性能 --> <requestHandler name="/get" class="solr.RealTimeGetRequestHandler"> <lst name="defaults"> <!-- 禁用高开销的highlighter --> <str name="hl">off</str> <!-- 禁用facet,get接口不需要 --> <str name="facet">off</str> <!-- 只返回必要字段,减少序列化开销 --> <str name="fl">experiment_group,version,start_time,expire_time</str> </lst> </requestHandler> <!-- 2. 优化docValues缓存,加速get查询 --> <cache name="docValuesCache" class="solr.LRUCache" size="1024" initialSize="512" autowarmCount="128"/>更重要的是数据写入策略。我们不用SolrJ批量add,而是用Atomic Updates直接更新用户状态:
# curl -X POST "http://solr:8983/solr/ab-state/update?commit=true" \ # -H "Content-type:application/json" \ # --data-binary '{ # "id":"U123456", # "experiment_group":{"set":"v2"}, # "version":{"set":"2.1.0"}, # "start_time":{"set":"2024-06-15T10:00:00Z"}, # "expire_time":{"set":"2024-06-15T18:00:00Z"} # }'这种写法比delete+add快5倍,且避免了并发写入时的竞态条件。我们用Redis分布式锁控制同一用户的更新串行化,确保U123456不会在v1和v2间反复横跳。
3.4 生产环境调优参数:从内存到GC的硬核经验
Solr JVM参数:堆内存设为32G(
-Xms32g -Xmx32g),但关键在GC。我们用ZGC(JDK11+),-XX:+UseZGC -XX:ZCollectionInterval=5。实测相比CMS,Full GC频率从每2小时1次降到每周1次,P99延迟稳定在12ms内。Spark Executor内存分配:
--executor-memory 16g --executor-cores 4 --driver-memory 8g。其中spark.sql.adaptive.enabled=true开启自适应查询执行,让Spark自动合并小任务。我们发现归因job中,90%的task耗时<200ms,但总有几个task卡在3s以上(因数据倾斜),AQE能自动触发skew-join优化。Delta Lake写入优化:
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")开启自动小文件合并。我们设spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true"),当小文件数>50时自动compact。这避免了BI查询时因数千个小文件导致的NameNode压力。
踩过的坑:最初我们把Solr和Spark部署在同一物理机,结果Solr的ZGC和Spark的G1 GC互相干扰,导致Solr P99延迟飙升至200ms。拆分为独立节点后,延迟回归12ms。结论:搜索状态服务和大数据计算必须物理隔离。
4. 实战效果与问题排查:从上线首日到大促压测的全记录
4.1 上线首日遇到的5个典型问题及解决
我们按时间线复盘上线首日的真实问题,这些在文档里根本找不到:
问题:Solr Realtime Get返回空文档,但文档明明存在
根因:前端传的user_id带空格(如" U123456 "),而Solr的string字段默认trim,但Realtime Get的id匹配是精确的。
解决:在Flume Agent的拦截器中加TrimInterceptor,统一去除前后空格。教训:所有外部输入必须在入口处清洗,不能依赖下游校验。问题:Spark Streaming job频繁重启,日志报
Failed to connect to Kafka broker
根因:Kafka集群启用了SASL/PLAIN认证,但Spark配置中漏了kafka.sasl.jaas.config。
解决:在spark-defaults.conf中添加spark.kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="xxx" password="yyy";。注意:密码不能明文写,我们用HashiCorp Vault动态注入。问题:归因结果中
is_converted全为false
根因:订单流的event_time字段是字符串格式"2024-06-15 15:30:00",而Spark的to_timestamp默认解析失败返回null,导致join条件永远不成立。
解决:在订单流读取后加.withColumn("event_time", to_timestamp(col("event_time"), "yyyy-MM-dd HH:mm:ss"))。实测:加此行后,归因率从0%升至23.7%,符合历史基线。问题:Solr facet.pivot查询超时,日志报
org.apache.solr.common.SolrException: Query Timeout
根因:facet.pivot的嵌套层级过深(我们试过experiment_group,query,category,brand四级),导致Lucene查询树爆炸。
解决:限制为两级experiment_group,query,更高维分析用Spark SQL查Delta Lake。原则:Solr只做实时、轻量、高频的下钻,重分析交给Spark。问题:Delta Lake写入失败,报
ConcurrentModificationException
根因:多个Spark Streaming job同时写同一张表,且没用VACUUM清理旧版本。
解决:为每个job分配唯一table_name(如ab_results_v2,ab_results_v3),并通过视图CREATE VIEW ab_results AS SELECT * FROM ab_results_v3统一出口。经验:生产环境严禁多job写同一Delta表。
4.2 大促压测关键指标与达标情况
我们在618前72小时进行了全链路压测,模拟峰值QPS 12万(搜索请求)+ 3万(订单请求):
| 指标 | 目标值 | 实测值 | 达标情况 | 说明 |
|---|---|---|---|---|
| Solr Realtime Get P99延迟 | ≤20ms | 14.3ms | ✅ | ZGC调优后稳定 |
| Spark Streaming端到端延迟(搜索→归因写入) | ≤5分钟 | 3分42秒 | ✅ | Watermark设2小时足够覆盖99.2%事件 |
| Delta Lake写入吞吐 | ≥50MB/s | 68MB/s | ✅ | 开启optimizeWrite后小文件减少76% |
| Solr facet.pivot QPS | ≥5000 | 5210 | ✅ | 两级pivot,无超时 |
| 归因准确率(人工抽样) | ≥99.5% | 99.82% | ✅ | 抽样1000条,仅2条因网络抖动丢失订单事件 |
最值得说的是归因准确率。我们用“订单ID反查搜索会话”的方式人工验证:随机取1000个订单,看其关联的搜索会话是否真实发生。998条匹配,2条不匹配——查日志发现是用户在APP内搜索,但订单在微信小程序完成,设备ID不一致导致关联失败。这属于业务场景限制,非技术缺陷。
4.3 运营侧真实收益:从“猜”到“证”的范式转移
技术指标再漂亮,不如业务结果直观。上线后三个月,我们观察到三个质变:
决策周期缩短:过去一个搜索排序策略迭代,从开发→测试→上线→看数据→结论,平均耗时5.3天。现在平均11.2小时。运营说:“以前改个boost参数要等一天看效果,现在下午改,晚饭前就知道行不行。”
实验粒度细化:旧系统只能按“全站”或“一级类目”分组。现在能按“新客+华东地区+搜索‘iPhone’”这种五维组合做实验。我们发现v2策略对“新客搜iPhone”的点击率提升12.3%,但对“老客搜iPhone”反而降0.8%,这直接指导了后续的个性化策略。
归因深度延伸:首次实现“搜索→加购→下单→复购”的四阶归因。数据显示,v2策略虽使首单GMV升3.2%,但7日复购率降1.1%,说明它吸引了更多冲动型用户。这个洞察,旧系统完全看不到。
最后分享一个小技巧:我们给每个实验组生成一个唯一的
experiment_id(如v2-geo-shanghai-newuser),并把它作为Solr文档的_version_字段值。这样,当运营在BI里看到异常数据时,可以直接用curl "http://solr:8983/solr/ab-state/get?id=U123456"查到该用户当前属于哪个实验,甚至能追溯到该实验的配置快照(存于Git)。这把“数据可解释性”做到了极致。