news 2026/5/12 20:15:30

基于 mmh3 的 Elasticsearch 合并更新方案

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
基于 mmh3 的 Elasticsearch 合并更新方案

在数据同步、日志聚合或用户行为追踪等场景中,常常需要实现一种“幂等写入 + 智能合并”的逻辑:

  • 使用一个唯一标识(如 mmh3 哈希值)作为文档 ID;
  • 若文档不存在,则插入新文档;
  • 若文档已存在,则合并部分字段(如标签数组去重),同时保留某些字段的原始值(如创建时间)。

本文将详细介绍如何在 Elasticsearch 中通过 Painless 脚本和_update/_bulkAPI 实现这一需求。假设输入的 JSON 数据中已包含预计算好的doc_id(即 mmh3 哈希值),无需在 ES 内部计算。

这篇文章中

  • 目标:将外部传入的新数据与已有文档进行智能合并。
  • 数据来源:一部分来自 ES 中已存在的文档(ctx._source),另一部分来自客户端传入的新数据。
  • 必须通过params传入外部数据 → 所以 必须有params

1. 创建索引

首先定义索引结构,确保字段类型满足后续操作需求:

curl-uelastic:123456-XPUT"http://localhost:9200/user"-H'Content-Type: application/json'-d' { "mappings": { "properties": { "doc_id": { "type": "keyword" }, "name": { "type": "keyword" }, "age": { "type": "integer" }, "salary": { "type": "double" }, "tags": { "type": "keyword" }, "created_at": { "type": "date" } } } } '

说明:

  • doc_id用于存储 mmh3 值,类型为keyword
  • tags为关键词数组,便于精确匹配和去重;
  • created_at为日期类型,仅在首次插入时设置。

2. 单条文档的合并更新

使用_updateAPI 配合upsert和 Painless 脚本实现条件更新。

首次写入(文档不存在)

curl-uelastic:123456-XPOST"http://localhost:9200/user/_update/a1b2c3d4"\-H'Content-Type: application/json'-d' { "script": { "lang": "painless", "source": "Set mergedTags = new HashSet(); if (ctx._source.tags != null) { for (t in ctx._source.tags) mergedTags.add(t); } if (params.tags != null) { for (t in params.tags) mergedTags.add(t); } ctx._source.tags = new ArrayList(mergedTags); ctx._source.name = params.name; ctx._source.age = params.age; ctx._source.salary = params.salary; ctx._source.doc_id = params.doc_id;", "params": { "doc_id": "a1b2c3d4", "name": "Alice", "age": 25, "salary": 5000, "tags": ["engineer", "python"] } }, "upsert": { "doc_id": "a1b2c3d4", "name": "Alice", "age": 25, "salary": 5000, "tags": ["engineer", "python"], "created_at": "2026-02-27T10:00:00Z" } }'

由于文档a1b2c3d4不存在,ES 将使用upsert中的内容创建新文档。

后续更新(文档已存在)

curl-uelastic:123456-XPOST"http://localhost:9200/user/_update/a1b2c3d4"\-H'Content-Type: application/json'-d' { "script": { "lang": "painless", "source": "Set mergedTags = new HashSet(); if (ctx._source.tags != null) { for (t in ctx._source.tags) mergedTags.add(t); } if (params.tags != null) { for (t in params.tags) mergedTags.add(t); } ctx._source.tags = new ArrayList(mergedTags); ctx._source.name = params.name; ctx._source.age = params.age; ctx._source.salary = params.salary; ctx._source.doc_id = params.doc_id;", "params": { "doc_id": "a1b2c3d4", "name": "Alice", "age": 25, "salary": 5500, "tags": ["python", "elasticsearch"] } }, "upsert": { "doc_id": "a1b2c3d4", "name": "Alice", "age": 25, "salary": 5500, "tags": ["python", "elasticsearch"], "created_at": "2026-02-28T00:00:00Z" } }'

此时文档已存在,ES 执行脚本:

  • 将新旧tags合并并去重;
  • 更新salary等可变字段;
  • 不修改created_at(脚本中未赋值,故保留原值);
  • upsert内容被忽略。

验证结果

curl-sS-uelastic:123456"http://localhost:9200/user/_doc/a1b2c3d4"|jq.

输出应包含:

  • tags:["engineer", "python", "elasticsearch"]
  • salary:5500
  • created_at:"2026-02-27T10:00:00Z"(首次写入的时间)

3. 批量合并更新(_bulk API)

对于大量数据,应使用_bulkAPI 提升性能。

构造批量请求

curl-uelastic:123456-XPOST"http://localhost:9200/user/_bulk"\-H'Content-Type: application/json'-d' {"update":{"_id":"a1b2c3d4"}} {"script":{"lang":"painless","source":"Set mergedTags = new HashSet(); if (ctx._source.tags != null) { for (t in ctx._source.tags) mergedTags.add(t); } if (params.tags != null) { for (t in params.tags) mergedTags.add(t); } ctx._source.tags = new ArrayList(mergedTags); ctx._source.name = params.name; ctx._source.age = params.age; ctx._source.salary = params.salary; ctx._source.doc_id = params.doc_id;","params":{"doc_id":"a1b2c3d4","name":"Alice","age":25,"salary":5500,"tags":["python","elasticsearch"]}},"upsert":{"doc_id":"a1b2c3d4","name":"Alice","age":25,"salary":5500,"tags":["python","elasticsearch"],"created_at":"2026-02-28T00:00:00Z"}} {"update":{"_id":"e5f6g7h8"}} {"script":{"lang":"painless","source":"Set mergedTags = new HashSet(); if (ctx._source.tags != null) { for (t in ctx._source.tags) mergedTags.add(t); } if (params.tags != null) { for (t in params.tags) mergedTags.add(t); } ctx._source.tags = new ArrayList(mergedTags); ctx._source.name = params.name; ctx._source.age = params.age; ctx._source.salary = params.salary; ctx._source.doc_id = params.doc_id;","params":{"doc_id":"e5f6g7h8","name":"Bob","age":30,"salary":7000,"tags":["java","spring"]}},"upsert":{"doc_id":"e5f6g7h8","name":"Bob","age":30,"salary":7000,"tags":["java","spring"],"created_at":"2026-02-28T08:00:00Z"}} {"update":{"_id":"i9j0k1l2"}} {"script":{"lang":"painless","source":"Set mergedTags = new HashSet(); if (ctx._source.tags != null) { for (t in ctx._source.tags) mergedTags.add(t); } if (params.tags != null) { for (t in params.tags) mergedTags.add(t); } ctx._source.tags = new ArrayList(mergedTags); ctx._source.name = params.name; ctx._source.age = params.age; ctx._source.salary = params.salary; ctx._source.doc_id = params.doc_id;","params":{"doc_id":"i9j0k1l2","name":"Carol","age":28,"salary":6500,"tags":["go","docker"]}},"upsert":{"doc_id":"i9j0k1l2","name":"Carol","age":28,"salary":6500,"tags":["go","docker"],"created_at":"2026-02-28T09:00:00Z"}} '

格式要求:

  • 每两行为一组:第一行为 action(指定_id),第二行为 data(含scriptupsert);
  • 所有 JSON 必须为单行,无注释、无换行;
  • 整个 payload 以换行符结尾(推荐)。

验证批量结果

curl-sS-uelastic:123456"http://localhost:9200/user/_search"|jq.

预期:

  • 已存在文档(如a1b2c3d4):tags合并,created_at不变;
  • 新文档(如e5f6g7h8i9j0k1l2):按upsert内容完整插入。

4. 清理(可选)

curl-uelastic:123456-XDELETE"http://localhost:9200/user"

5. 总结

通过结合_update/_bulkAPI、upsert机制和 Painless 脚本,可以高效实现基于 mmh3 的合并更新逻辑。该方案具备以下优势:

  • 幂等性:相同doc_id多次写入结果一致;
  • 字段级控制:灵活定义哪些字段合并、哪些覆盖、哪些保留;
  • 高性能:批量操作适用于大规模数据同步。

此模式广泛适用于用户画像更新、设备状态同步、行为日志聚合等场景,是 Elasticsearch 高级写入模式的典型应用。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/18 20:26:35

5个革命性特性重新定义启动器体验:PCL2开源项目深度解析

5个革命性特性重新定义启动器体验:PCL2开源项目深度解析 【免费下载链接】PCL 项目地址: https://gitcode.com/gh_mirrors/pc/PCL 打造专属启动体验:重新定义轻量级启动器定位 PCL2(Personal Configurable Lightweight Launcher 2&a…

作者头像 李华
网站建设 2026/4/18 20:26:21

墨语灵犀跨平台开发体验:在Windows与macOS上的部署对比

墨语灵犀跨平台开发体验:在Windows与macOS上的部署对比 最近在折腾一个叫“墨语灵犀”的AI开发环境,想看看它在不同电脑上跑起来到底怎么样。我手头正好有一台Windows 11的笔记本和一台搭载M1芯片的MacBook Pro,于是干脆把两边都装了一遍&am…

作者头像 李华
网站建设 2026/4/25 2:33:30

5分钟搞定Nanbeige4.1-3B:Chainlit前端交互,零代码体验大模型

5分钟搞定Nanbeige4.1-3B:Chainlit前端交互,零代码体验大模型 想体验最新的大语言模型,但又不想折腾复杂的命令行和代码?今天给大家介绍一个超级简单的方法:用Chainlit前端,零代码直接和Nanbeige4.1-3B模型…

作者头像 李华
网站建设 2026/4/18 20:26:32

深入解析CosyVoice与vLLM的协同优化:提升语音模型推理效率

最近在折腾语音相关的项目,发现一个挺普遍的问题:很多语音模型,比如TTS(文本转语音)或者ASR(语音识别),推理起来又慢又吃资源。尤其是在需要实时交互或者批量处理的场景下&#xff0…

作者头像 李华
网站建设 2026/4/18 20:26:34

3步告别乱码烦恼!GBKtoUTF-8编码转换工具让跨平台协作更顺畅

3步告别乱码烦恼!GBKtoUTF-8编码转换工具让跨平台协作更顺畅 【免费下载链接】GBKtoUTF-8 To transcode text files from GBK to UTF-8 项目地址: https://gitcode.com/gh_mirrors/gb/GBKtoUTF-8 当你从Windows系统发送文本文件给Mac同事时,是否遇…

作者头像 李华
网站建设 2026/4/18 20:26:30

办公自动化利器:QAnything PDF解析模型应用案例

办公自动化利器:QAnything PDF解析模型应用案例 1. 引言:PDF文档处理的痛点与解决方案 在日常办公中,PDF文档处理是一个常见但令人头疼的问题。无论是合同文件、技术文档还是财务报表,PDF格式的文档往往包含复杂的表格、图片和排…

作者头像 李华