在数据同步、日志聚合或用户行为追踪等场景中,常常需要实现一种“幂等写入 + 智能合并”的逻辑:
- 使用一个唯一标识(如 mmh3 哈希值)作为文档 ID;
- 若文档不存在,则插入新文档;
- 若文档已存在,则合并部分字段(如标签数组去重),同时保留某些字段的原始值(如创建时间)。
本文将详细介绍如何在 Elasticsearch 中通过 Painless 脚本和_update/_bulkAPI 实现这一需求。假设输入的 JSON 数据中已包含预计算好的doc_id(即 mmh3 哈希值),无需在 ES 内部计算。
这篇文章中
- 目标:将外部传入的新数据与已有文档进行智能合并。
- 数据来源:一部分来自 ES 中已存在的文档(
ctx._source),另一部分来自客户端传入的新数据。- 必须通过
params传入外部数据 → 所以 必须有params。
1. 创建索引
首先定义索引结构,确保字段类型满足后续操作需求:
curl-uelastic:123456-XPUT"http://localhost:9200/user"-H'Content-Type: application/json'-d' { "mappings": { "properties": { "doc_id": { "type": "keyword" }, "name": { "type": "keyword" }, "age": { "type": "integer" }, "salary": { "type": "double" }, "tags": { "type": "keyword" }, "created_at": { "type": "date" } } } } '说明:
doc_id用于存储 mmh3 值,类型为keyword;tags为关键词数组,便于精确匹配和去重;created_at为日期类型,仅在首次插入时设置。
2. 单条文档的合并更新
使用_updateAPI 配合upsert和 Painless 脚本实现条件更新。
首次写入(文档不存在)
curl-uelastic:123456-XPOST"http://localhost:9200/user/_update/a1b2c3d4"\-H'Content-Type: application/json'-d' { "script": { "lang": "painless", "source": "Set mergedTags = new HashSet(); if (ctx._source.tags != null) { for (t in ctx._source.tags) mergedTags.add(t); } if (params.tags != null) { for (t in params.tags) mergedTags.add(t); } ctx._source.tags = new ArrayList(mergedTags); ctx._source.name = params.name; ctx._source.age = params.age; ctx._source.salary = params.salary; ctx._source.doc_id = params.doc_id;", "params": { "doc_id": "a1b2c3d4", "name": "Alice", "age": 25, "salary": 5000, "tags": ["engineer", "python"] } }, "upsert": { "doc_id": "a1b2c3d4", "name": "Alice", "age": 25, "salary": 5000, "tags": ["engineer", "python"], "created_at": "2026-02-27T10:00:00Z" } }'由于文档a1b2c3d4不存在,ES 将使用upsert中的内容创建新文档。
后续更新(文档已存在)
curl-uelastic:123456-XPOST"http://localhost:9200/user/_update/a1b2c3d4"\-H'Content-Type: application/json'-d' { "script": { "lang": "painless", "source": "Set mergedTags = new HashSet(); if (ctx._source.tags != null) { for (t in ctx._source.tags) mergedTags.add(t); } if (params.tags != null) { for (t in params.tags) mergedTags.add(t); } ctx._source.tags = new ArrayList(mergedTags); ctx._source.name = params.name; ctx._source.age = params.age; ctx._source.salary = params.salary; ctx._source.doc_id = params.doc_id;", "params": { "doc_id": "a1b2c3d4", "name": "Alice", "age": 25, "salary": 5500, "tags": ["python", "elasticsearch"] } }, "upsert": { "doc_id": "a1b2c3d4", "name": "Alice", "age": 25, "salary": 5500, "tags": ["python", "elasticsearch"], "created_at": "2026-02-28T00:00:00Z" } }'此时文档已存在,ES 执行脚本:
- 将新旧
tags合并并去重; - 更新
salary等可变字段; - 不修改
created_at(脚本中未赋值,故保留原值); upsert内容被忽略。
验证结果
curl-sS-uelastic:123456"http://localhost:9200/user/_doc/a1b2c3d4"|jq.输出应包含:
tags:["engineer", "python", "elasticsearch"]salary:5500created_at:"2026-02-27T10:00:00Z"(首次写入的时间)
3. 批量合并更新(_bulk API)
对于大量数据,应使用_bulkAPI 提升性能。
构造批量请求
curl-uelastic:123456-XPOST"http://localhost:9200/user/_bulk"\-H'Content-Type: application/json'-d' {"update":{"_id":"a1b2c3d4"}} {"script":{"lang":"painless","source":"Set mergedTags = new HashSet(); if (ctx._source.tags != null) { for (t in ctx._source.tags) mergedTags.add(t); } if (params.tags != null) { for (t in params.tags) mergedTags.add(t); } ctx._source.tags = new ArrayList(mergedTags); ctx._source.name = params.name; ctx._source.age = params.age; ctx._source.salary = params.salary; ctx._source.doc_id = params.doc_id;","params":{"doc_id":"a1b2c3d4","name":"Alice","age":25,"salary":5500,"tags":["python","elasticsearch"]}},"upsert":{"doc_id":"a1b2c3d4","name":"Alice","age":25,"salary":5500,"tags":["python","elasticsearch"],"created_at":"2026-02-28T00:00:00Z"}} {"update":{"_id":"e5f6g7h8"}} {"script":{"lang":"painless","source":"Set mergedTags = new HashSet(); if (ctx._source.tags != null) { for (t in ctx._source.tags) mergedTags.add(t); } if (params.tags != null) { for (t in params.tags) mergedTags.add(t); } ctx._source.tags = new ArrayList(mergedTags); ctx._source.name = params.name; ctx._source.age = params.age; ctx._source.salary = params.salary; ctx._source.doc_id = params.doc_id;","params":{"doc_id":"e5f6g7h8","name":"Bob","age":30,"salary":7000,"tags":["java","spring"]}},"upsert":{"doc_id":"e5f6g7h8","name":"Bob","age":30,"salary":7000,"tags":["java","spring"],"created_at":"2026-02-28T08:00:00Z"}} {"update":{"_id":"i9j0k1l2"}} {"script":{"lang":"painless","source":"Set mergedTags = new HashSet(); if (ctx._source.tags != null) { for (t in ctx._source.tags) mergedTags.add(t); } if (params.tags != null) { for (t in params.tags) mergedTags.add(t); } ctx._source.tags = new ArrayList(mergedTags); ctx._source.name = params.name; ctx._source.age = params.age; ctx._source.salary = params.salary; ctx._source.doc_id = params.doc_id;","params":{"doc_id":"i9j0k1l2","name":"Carol","age":28,"salary":6500,"tags":["go","docker"]}},"upsert":{"doc_id":"i9j0k1l2","name":"Carol","age":28,"salary":6500,"tags":["go","docker"],"created_at":"2026-02-28T09:00:00Z"}} '格式要求:
- 每两行为一组:第一行为 action(指定
_id),第二行为 data(含script和upsert); - 所有 JSON 必须为单行,无注释、无换行;
- 整个 payload 以换行符结尾(推荐)。
验证批量结果
curl-sS-uelastic:123456"http://localhost:9200/user/_search"|jq.预期:
- 已存在文档(如
a1b2c3d4):tags合并,created_at不变; - 新文档(如
e5f6g7h8、i9j0k1l2):按upsert内容完整插入。
4. 清理(可选)
curl-uelastic:123456-XDELETE"http://localhost:9200/user"5. 总结
通过结合_update/_bulkAPI、upsert机制和 Painless 脚本,可以高效实现基于 mmh3 的合并更新逻辑。该方案具备以下优势:
- 幂等性:相同
doc_id多次写入结果一致; - 字段级控制:灵活定义哪些字段合并、哪些覆盖、哪些保留;
- 高性能:批量操作适用于大规模数据同步。
此模式广泛适用于用户画像更新、设备状态同步、行为日志聚合等场景,是 Elasticsearch 高级写入模式的典型应用。