一、文档写入完整流程:路由、分片写入、副本同步
1.1 文档路由机制
当用户向ES发送一个文档写入请求时,ES首先需要确定这个文档应该存储在哪个主键片上。这个过程就是路由(Routing)。
1.1.1 路由算法
ES使用以下公式计算文档应该路由到哪个主分片:
shard = hash(_routing) % number_of_primary_shards默认路由:如果未指定_routing字段,ES使用文档的_id作为路由值。
// 示例:计算文档应该路由到哪个分片// 假设有3个主分片,文档ID为 "user_123"// 1. 获取文档ID的hash值String _id="user_123";int hash=_id.hashCode();// 假设hash值为 -123456789// 2. 计算分片号int numberOfPrimaryShards=3;int shard=Math.abs(hash%numberOfPrimaryShards);// shard = 1// 结论:文档 "user_123" 会被路由到分片11.1.2 自定义路由
ES允许自定义路由字段,适用于需要将相关联的文档存储在同一分片的场景。
// 示例1:使用自定义路由写入文档PUT/orders/_doc/1001?routing=user_123{"order_id":1001,"user_id":"user_123","product":"MacBook Pro","amount":19999}// 示例2:查询时也需要指定路由GET/orders/_doc/1001?routing=user_123// 示例3:使用路由进行查询,提高查询效率GET/orders/_search?routing=user_123{"query":{"match":{"user_id":"user_123"}}}自定义路由的优势:
- ✅提高查询效率:相关文档在同一分片,避免跨分片查询
- ✅数据局部性:相关联的数据物理上存储在一起
- ⚠️风险:如果路由值分布不均匀,可能导致数据倾斜
1.1.3 路由最佳实践
// 最佳实践1:使用业务ID作为路由PUT/orders/_doc/1001?routing=user_123{"user_id":"user_123","order_id":1001}// 最佳实践2:避免热点问题// ❌ 错误:使用常量作为路由,所有数据都会到一个分片PUT/logs/_doc/1?routing=constant// ✅ 正确:使用时间+用户ID组合PUT/logs/_doc/1?routing=2024-01-01_user123// 最佳实践3:监控路由分布GET/_cat/shards/orders?v// 观察各分片的数据量是否均衡1.2 主分片写入流程
文档经过路由计算后,会被发送到对应的主分片进行写入。主分片的写入流程是一个严格的有序过程。
1.2.1 写入流程详解
┌─────────────────────────────────────────────────────────────┐ │ 文档写入完整流程 │ ├─────────────────────────────────────────────────────────────┤ │ 1. 客户端发送写入请求 │ │ ↓ │ │ 2. 协调节点接收请求 │ │ ↓ │ │ 3. 根据路由算法计算目标主分片 │ │ ↓ │ │ 4. 转发请求到主分片所在的节点 │ │ ↓ │ │ 5. 主分片节点执行写入 │ │ ├─ 5.1 检查索引是否存在 │ │ ├─ 5.2 检查Mapping是否匹配 │ │ ├─ 5.3 生成文档的Version(乐观锁) │ │ ├─ 5.4 将文档写入Index Buffer(内存缓冲区) │ │ ├─ 5.5 将操作记录到Translog(预写日志) │ │ └─ 5.6 返回写入成功响应 │ │ ↓ │ │ 6. 并行发送写入请求到所有副本分片 │ │ ↓ │ │ 7. 等待副本分片确认(根据consistency级别) │ │ ↓ │ │ 8. 向协调节点返回最终结果 │ │ ↓ │ │ 9. 协调节点向客户端返回响应 │ └─────────────────────────────────────────────────────────────┘1.2.2 主分片写入代码示例
// ES源码简化版:主分片写入核心逻辑publicclassPrimaryShardWriteOperation{publicWriteResultexecuteWriteRequest(IndexRequestrequest){// 1. 验证请求validateRequest(request);// 2. 获取或创建文档版本longversion=getOrCreateVersion(request);// 3. 检查是否违反乐观锁if(request.getVersion()!=version){thrownewVersionConflictEngineException(...);}// 4. 将文档写入Index Buffer(内存)indexBuffer.add(request.getId(),request.getSource());// 5. 写入Translog(预写日志,保证持久化)translog.add(newTranslog.Index(request.getId(),request.getSource(),version));// 6. 更新版本号version++;// 7. 返回写入结果returnnewWriteResult(true,version);}}1.2.3 写入缓冲机制
主分片写入时,文档并不是立即写入磁盘,而是先写入内存缓冲区(Index Buffer)。
// 查看Index Buffer配置GET/_cluster/settings?include_defaults=true{"defaults":{"indices.memory.index_buffer_size":"10%"// 默认占用JVM堆的10%}}// 动态调整Index Buffer大小PUT/_cluster/settings{"persistent":{"indices.memory.index_buffer_size":"20%"}}// Index Buffer的工作原理┌─────────────────────────────────────────┐ │ IndexBuffer(内存)│ ├─────────────────────────────────────────┤ │ ┌─────────┐ ┌─────────┐ ┌────────┐│ │ │ Doc1 │ │ Doc2 │ │ Doc3 ││ │ │ _id:1│ │ _id:2│ │ _id:3││ │ │...│ │...│ │...││ │ └─────────┘ └─────────┘ └────────┘│ │ │ │ 当满足以下条件之一时,数据会被刷新到磁盘:│ │ • 索引缓冲区满了 │ │ • 达到refresh_interval时间(默认1s) │ │ • 手动触发refresh │ └─────────────────────────────────────────┘1.3 副本同步机制
ES采用**推模式(Push Model)**进行副本同步:主分片写入成功后,主动将数据推送给副本分片。
1.3.1 副本同步流程
┌──────────────────────────────────────────────────────────────┐ │ 副本同步详细流程 │ ├──────────────────────────────────────────────────────────────┤ │ 主分片节点 │ │ ┌────────────────────────────────────┐ │ │ │ 1. 写入主分片成功 │ │ │ │ 2. 并行发送写入请求到所有副本分片 │ │ │ └───────────────┬────────────────────┘ │ │ ↓ │ │ ┌────────────────────────────────────┐ │ │ │ 3. 等待副本分片确认 │ │ │ │ • 根据consistency级别决定等待策略 │ │ │ └───────────────┬────────────────────┘ │ │ ↓ │ │ ┌────────────────────────────────────┐ │ │ │ 4. 所有副本确认后,返回成功响应 │ │ │ └────────────────────────────────────┘ │ │ │ │ 副本分片节点 │ │ ┌────────────────────────────────────┐ │ │ │ A. 接收主分片的写入请求 │ │ │ │ B. 执行写入操作 │ │ │ │ C. 写入Translog │ │ │ │ D. 返回确认给主分片 │ │ │ └────────────────────────────────────┘ │ └──────────────────────────────────────────────────────────────┘1.3.2 一致性级别配置
ES提供三种一致性级别,控制写入时需要等待多少个副本分片确认。
// 一致性级别配置// 1. consistency: "one" - 只要主分片写入成功就返回PUT/products/_doc/1?consistency=one{"name":"iPhone 15 Pro","price":8999}// 特点:写入速度快,但数据安全性低// 2. consistency: "quorum" (默认) - 等待多数分片写入成功PUT/products/_doc/2?consistency=quorum{"name":"MacBook Air","price":9999}// 计算quorum:int((primary + replicas) / 2) + 1// 示例:1个主分片 + 2个副本 = 3,quorum = (1+2)/2 + 1 = 2// 需要至少2个分片(主分片+1个副本)写入成功才返回// 3. consistency: "all" - 等待所有副本分片写入成功PUT/products/_doc/3?consistency=all{"name":"iPad Pro","price":6999}// 特点:数据安全性最高,但写入延迟大// 动态设置默认一致性级别PUT/products/_settings{"index.write.wait_for_active_shards":"2"}// 含义:写入时需要等待至少2个活跃分片(包括主分片)1.3.3 副本同步失败处理
// 场景:副本分片写入失败// 1. 主分片写入成功,但副本分片写入失败PUT/products/_doc/100?consistency=all{"name":"Test Product"}// 可能的结果:// ✅ 情况1:主分片写入成功,副本分片写入成功 → 返回成功// ❌ 情况2:主分片写入成功,但副本分片写入失败 → 返回失败// ⚠️ 情况3:主分片写入成功,部分副本成功 → 根据consistency决定// 2. ES的自动恢复机制// 当副本分片恢复后,主分片会自动同步数据// 查看分片状态GET/_cat/shards/products?v// 输出示例:// index shard prirep state docs store ip node// products 0 p STARTED 100 1mb 127.0.0.1 node-1 ← 主分片// products 0 r STARTED 100 1mb 127.0.0.1 node-2 ← 副本分片(正常)// products 0 r UNASSIGNED - - - - ← 副本分片(失败)// 3. 手动恢复UNASSIGNED的分片POST/_cluster/reroute{"commands":[{"allocate_empty_primary":{"index":"products","shard":0,"node":"node-3","accept_data_loss":true}}]}1.4 写入性能优化
1.4.1 批量写入
// 使用Bulk API进行批量写入,显著提升性能POST/_bulk{"index":{"_index":"products","_id":1}}{"name":"Product 1","price":100}{"index":{"_index":"products","_id":2}}{"name":"Product 2","price":200}{"index":{"_index":"products","_id":3}}{"name":"Product 3","price":300}// Java代码示例RestHighLevelClient client=newRestHighLevelClient(...);BulkRequest bulkRequest=newBulkRequest();bulkRequest.add(newIndexRequest("products").id("1").source(XContentType.JSON,"name","Product 1"));bulkRequest.add(newIndexRequest("products").id("2").source(XContentType.JSON,"name","Product 2"));bulkRequest.add(newIndexRequest("products").id("3").source(XContentType.JSON,"name","Product 3"));BulkResponse bulkResponse=client.bulk(bulkRequest,RequestOptions.DEFAULT);1.4.2 调整刷新策略
// 方法1:禁用自动刷新(适合大批量导入)PUT/products/_settings{"index":{"refresh_interval":"-1"}}// 方法2:大批量导入完成后,手动刷新POST/products/_refresh// 方法3:恢复自动刷新PUT/products/_settings{"index":{"refresh_interval":"1s"}}// 方法4:写入时指定刷新策略PUT/products/_doc/1?refresh=true{"name":"Product 1"}// refresh=true:立即刷新,文档可立即搜索(性能差)// refresh=false(默认):不立即刷新,等待自动刷新// refresh=wait_for:等待自动刷新后再返回(平衡方案)1.4.3 并行写入优化
// 使用多线程并行写入ExecutorServiceexecutor=Executors.newFixedThreadPool(10);for(inti=0;i<100000;i++){finalintdocId=i;executor.submit(()->{IndexRequestrequest=newIndexRequest("products").id(String.valueOf(docId)).source(XContentType.JSON,"name","Product "+docId);try{client.index(request,RequestOptions.DEFAULT);}catch(Exceptione){e.printStackTrace();}});}executor.shutdown();executor.awaitTermination(1,TimeUnit.HOURS);1.5 本章总结
核心要点回顾:
路由机制:
- 默认使用文档
_id进行路由 - 支持自定义路由,提高查询效率
- 避免数据倾斜和热点问题
- 默认使用文档
主分片写入流程:
- 写入Index Buffer(内存)
- 记录Translog(预写日志)
- 满足刷新条件时,数据才会写入磁盘
副本同步机制:
- 主分片主动推送数据到副本分片
- 支持三种一致性级别:
one、quorum、all - 副本失败后有自动恢复机制
性能优化:
- 使用Bulk API批量写入
- 大批量导入时禁用自动刷新
- 使用多线程并行写入
面试高频问题:
- ❓ ES的路由算法是什么?如何自定义路由?
- ❓ 主分片写入成功后,副本分片是如何同步的?
- ❓ consistency级别有哪些?如何选择?
- ❓ 为什么ES写入后不能立即搜索?如何解决?
下节预告:下一节将深入讲解检索执行流程,包括请求分发、分片查询、结果聚合返回的完整机制。