news 2026/5/11 5:01:19

Milvus 与传统数据库(MySQL、PostgreSQL、MongoDB)的协同应用实战

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Milvus 与传统数据库(MySQL、PostgreSQL、MongoDB)的协同应用实战

1. 为什么要把 Milvus 和传统数据库“撮合”到一起?

你可能听说过 Milvus,这个在 AI 圈子里火得不行的向量数据库。它处理图片、文本、音频这些非结构化数据,搞个“以图搜图”或者“语义搜索”,那速度真是没得说。但如果你真把它往项目里一放,很快就会遇到一个现实问题:光有向量好像不太够用啊。

我打个比方,Milvus 就像是一个记忆力超群、但有点“偏科”的天才。你问它:“找出所有和这张猫图片最像的图片”,它能瞬间给你答案。但如果你接着问:“这些像的图片里,哪些是用户‘张三’上周上传的、并且打了‘宠物’标签的?” 这位天才可能就懵了。因为它只擅长记“向量”这种抽象特征,对于“谁”、“什么时候”、“什么标签”这些具体的、结构化的信息,它并不擅长管理。

这时候,就需要请出我们熟悉的老朋友了:MySQL、PostgreSQL 和 MongoDB。它们就像是严谨的档案管理员,特别擅长打理这些结构化的元数据。所以,把它们俩“撮合”到一起,就成了一个非常自然的想法。这不是简单的“1+1”,而是真正的优势互补。

在实际业务里,这种协同主要解决三大痛点:

第一,元数据管理。这是最核心的需求。你的商品图片向量存在 Milvus 里,但商品的标题、价格、库存、所属店铺、上架时间这些信息,放在 MySQL 或 PostgreSQL 里管理更合适、更可靠。你的文档内容向量化后塞进 Milvus,但文档的作者、创建日期、权限级别、分类目录,用 MongoDB 来存可能更灵活。

第二,实现复杂查询。单纯的向量相似度搜索,结果往往太“粗”了。业务上我们几乎总是需要叠加各种过滤条件。比如在电商推荐场景,你先用 Milvus 找到一批相似商品,但最终展示给用户时,很可能需要过滤掉“已下架的”、“用户所在地区不配送的”、“或者用户已经购买过的”商品。这些过滤条件,都需要通过查询传统数据库中的元数据来完成。

第三,保证数据的一致性。业务数据是活的,今天上架,明天可能就下架或者修改了价格。如果只在传统数据库里更新了下架状态,而 Milvus 里的向量还能被搜到,那就出问题了。所以,我们需要一套机制,确保当业务数据变更时,两边(向量数据和元数据)能尽可能地保持同步,至少不能出现严重的逻辑矛盾。

说白了,Milvus 负责解决“像不像”的问题,而传统数据库负责解决“是什么”、“谁干的”、“什么时候”以及“能不能用”的问题。两者结合,才能构建出一个既智能又实用的系统。

2. 与 MySQL 搭档:经典组合,稳字当头

MySQL 大家太熟了,稳如老狗的关系型数据库。当你的业务本身就已经建立在 MySQL 之上,现在想引入 AI 向量搜索能力时,Milvus + MySQL 的组合是最容易上手、也最让人放心的选择。这个组合特别适合那些业务逻辑清晰、数据结构规整的场景。

2.1 集成架构与数据流设计

怎么把它们俩连起来呢?核心思想是“ID 关联”。我们给每一条需要向量化的数据(比如一个商品、一篇文章)在 MySQL 里分配一个唯一的主键 ID。当这条数据被向量化后,我们将这个向量存入 Milvus,并且在存入时,明确地把这个 MySQL 中的主键 ID 作为向量数据的一个字段(比如叫entity_id)一起存进去

这样一来,数据流就清晰了:

  1. 写入流程:业务系统新增一条商品记录 -> 写入 MySQL,生成自增ID123-> 调用嵌入模型(如 BERT、CLIP)将商品描述或图片生成向量 -> 将向量连同entity_id=123一起插入 Milvus 集合。
  2. 查询流程:用户发起搜索(例如上传一张图片)-> 将图片转化为向量 -> 在 Milvus 中搜索最相似的 N 个向量 -> Milvus 返回结果,其中包含每个向量的entity_id(如123, 456, 789)和相似度分数 -> 业务后端用这些entity_id去 MySQL 执行一个SELECT ... WHERE id IN (123, 456, 789)的查询 -> 将 MySQL 返回的完整商品信息,与 Milvus 返回的相似度分数组合,排序后返回给前端。

这里有个关键细节:从 Milvus 查出来的 ID 列表,去 MySQL 做查询时,一定要用IN语句,并且最好给 ID 字段加上索引。否则,当返回结果很多时,频繁的单条查询会成为性能瓶颈。我踩过这个坑,一开始傻乎乎地用循环一条条查,接口延迟瞬间飙升,改成批量IN查询后,速度就正常了。

2.2 实战代码:构建一个简单的商品推荐接口

光说理论没感觉,我们写段实实在在的代码。假设我们有一个电商商品表products,我们已经把商品主图的向量存好了。现在要实现一个“找相似”的接口。

import pymysql from pymilvus import connections, Collection import numpy as np # 1. 初始化连接(这部分通常在应用启动时做一次) # 连接 MySQL mysql_conn = pymysql.connect( host="你的MySQL地址", user="用户名", password="密码", database="你的数据库", charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor # 返回字典格式,方便处理 ) # 连接 Milvus connections.connect(alias="default", host='你的Milvus地址', port='19530') # 加载 Milvus 中的集合(相当于表) product_collection = Collection("product_image_vectors") product_collection.load() # 将集合加载到内存,搜索前必须做 def search_similar_products(image_vector, top_k=10, category_filter=None, price_max=None): """ 根据图片向量搜索相似商品,并支持过滤。 :param image_vector: 图片的向量,numpy数组,例如 np.array([0.1, 0.2, ...]) :param top_k: 返回最相似的商品数量 :param category_filter: 可选的类别过滤,如 'electronics' :param price_max: 可选的价格上限过滤 :return: 商品详细信息列表 """ search_params = {"metric_type": "L2", "params": {"nprobe": 10}} # L2距离,nprobe是搜索精度参数 # 2. 在 Milvus 中执行向量搜索 results = product_collection.search( data=[image_vector], anns_field="image_vector", # 你存储向量数据的字段名 param=search_params, limit=top_k * 2, # 多查一些,因为后面可能被过滤掉一部分 output_fields=["entity_id"] # 指定要返回的字段,这里只要ID ) # results 是一个列表的列表,因为我们只搜索了一个向量 milvus_ids = [hit.entity.get('entity_id') for hit in results[0]] if not milvus_ids: return [] # 3. 构建 MySQL 查询,获取完整商品信息 with mysql_conn.cursor() as cursor: # 基础SQL sql = "SELECT id, name, price, category, image_url, stock FROM products WHERE id IN %s" params = [milvus_ids] # 动态添加过滤条件 if category_filter: sql += " AND category = %s" params.append(category_filter) if price_max is not None: sql += " AND price <= %s" params.append(price_max) # 添加排序,保证结果顺序(可选,也可以根据分数在内存中排序) sql += " ORDER BY FIELD(id, %s)" % ','.join(['%s'] * len(milvus_ids)) params.extend(milvus_ids) cursor.execute(sql, tuple(params)) mysql_products = cursor.fetchall() # 4. 将 MySQL 数据与 Milvus 分数合并 # 创建一个从ID到Milvus搜索结果的映射,方便查找分数 id_to_score = {hit.entity.get('entity_id'): hit.score for hit in results[0]} for product in mysql_products: product['similarity_score'] = id_to_score.get(product['id']) # 可以按分数再排序一次 mysql_products.sort(key=lambda x: x['similarity_score']) return mysql_products[:top_k] # 返回最终TopK结果 # 模拟使用:假设我们有一个从某张图片提取好的向量 # dummy_vector = np.random.rand(512).astype(np.float32) # 假设向量维度是512 # similar_items = search_similar_products(dummy_vector, top_k=10, price_max=1000) # print(similar_items)

这段代码就是一个非常典型的协同查询模式。它先利用 Milvus 的向量索引快速缩小范围,再通过 MySQL 进行精确的元数据过滤和丰富,最后将结果整合返回。你可以根据业务需要,轻松地在 MySQL 查询部分添加更多过滤条件,比如库存状态、商家评分等等。

3. 与 PostgreSQL 联手:当向量遇上 JSON 和全文检索

如果你觉得 MySQL 在某些复杂场景下有点“力不从心”,比如需要存储更灵活的数据结构,或者想利用强大的全文检索功能,那么PostgreSQL就是更好的搭档。PostgreSQL 本身就是一个“瑞士军刀”式的数据库,它对 JSONB 数据的原生支持,以及pg_trgmtsvector等全文检索扩展,让它和 Milvus 的协作能玩出更多花样。

3.1 利用 JSONB 存储动态元数据

很多 AI 应用中的数据,其元数据字段可能是不固定的。比如,你处理学术论文,每篇论文的元数据(作者、机构、关键词、发表期刊、影响因子)都很规整,适合用关系表。但如果你处理的是用户生成的个性化内容,比如一个商品笔记,用户可能添加了自定义标签“适合送礼”、“颜值超高”,这些属性用固定的表字段来存就很麻烦。

这时,PostgreSQL 的JSONB类型就大显身手了。你可以在表中设一个attributes字段,类型为 JSONB,把这些动态的、稀疏的元数据全部塞进去。查询时,可以用->->>操作符高效地查询 JSONB 内的字段。

集成方式和 MySQL 类似,但数据模型更灵活:

  • PostgreSQL 表articles (id SERIAL PRIMARY KEY, title TEXT, content TEXT, attributes JSONB, created_at TIMESTAMP)
  • Milvus 集合:存储content字段的文本向量,并包含article_id字段与 PostgreSQL 关联。

当你需要做混合搜索时,威力就来了。比如:“搜索与‘神经网络优化’内容相似的论文,并且要求发表在‘ICLR’会议上,且影响因子大于 5”。这个查询可以拆解为:

  1. 在 Milvus 中用“神经网络优化”的向量找到一批相似论文的 ID。
  2. 在 PostgreSQL 中用这些 ID 和条件WHERE attributes->>'conference' = 'ICLR' AND (attributes->'impact_factor')::float > 5进行过滤。

3.2 结合全文检索实现“语义+关键词”双保险

这是 PostgreSQL + Milvus 组合的一个杀手级应用。单纯向量搜索(语义搜索)有时会“跑偏”,特别是当查询词非常具体或包含专有名词时。而传统的关键词全文检索(如LIKEtsvector)虽然不能理解语义,但匹配字面意思非常精准。

我们可以让它们俩同时工作,取长补短。具体做法是,在 PostgreSQL 中,为需要文本搜索的字段(如title,content)建立GIN索引。

import psycopg2 from pymilvus import Collection, connections # 连接 PostgreSQL pg_conn = psycopg2.connect( dbname="your_db", user="postgres", password="your_pwd", host="localhost" ) pg_conn.autocommit = True # 连接 Milvus connections.connect(host="localhost", port="19530") doc_collection = Collection("document_vectors") doc_collection.load() def hybrid_search(query_text, query_vector, full_text_weight=0.5, vector_weight=0.5, top_k=20): """ 混合搜索:结合全文检索分数和向量相似度分数。 这是一个简化示例,实际排名算法可能更复杂。 """ # 1. 全文检索:使用 PostgreSQL 的 plainto_tsquery 进行关键词匹配并打分 with pg_conn.cursor() as cursor: # 这里使用 ts_rank 给全文检索结果一个分数 cursor.execute(""" SELECT id, ts_rank_cd(text_search_vector, plainto_tsquery('english', %s)) as ft_score FROM documents WHERE text_search_vector @@ plainto_tsquery('english', %s) ORDER BY ft_score DESC LIMIT %s """, (query_text, query_text, top_k * 3)) # 多取一些候选 ft_results = cursor.fetchall() ft_id_to_score = {row[0]: row[1] for row in ft_results} candidate_ids = list(ft_id_to_score.keys()) if not candidate_ids: return [] # 2. 向量检索:在 Milvus 中搜索这些候选 ID 对应的向量 # 注意:这里需要先根据 candidate_ids 从 Milvus 中拿到它们的向量,再进行搜索。 # 更常见的做法是:直接用 query_vector 在 Milvus 中搜索出 top N*3 的结果,再与全文检索结果取交集或加权融合。 # 以下是另一种更实用的“后融合”思路: # 2a. 先在 Milvus 中做纯向量搜索,拿到一批结果和分数 vector_results = doc_collection.search( data=[query_vector], anns_field="embedding", param={"metric_type": "IP", "params": {"nprobe": 20}}, limit=top_k * 3, output_fields=["doc_id"] ) vector_id_to_score = {hit.entity.get('doc_id'): hit.score for hit in vector_results[0]} # 3. 融合排序:将两个来源的分数归一化后加权 all_candidate_ids = set(ft_id_to_score.keys()) | set(vector_id_to_score.keys()) combined_scores = [] for doc_id in all_candidate_ids: ft_score = ft_id_to_score.get(doc_id, 0) vec_score = vector_id_to_score.get(doc_id, 0) # 简单归一化(假设分数范围已知或可估算,这里仅为示例) # 实际中可能需要更精细的归一化处理,如 Min-Max Scaling combined = full_text_weight * ft_score + vector_weight * vec_score combined_scores.append((doc_id, combined)) # 按综合分排序 combined_scores.sort(key=lambda x: x[1], reverse=True) final_ids = [item[0] for item in combined_scores[:top_k]] # 4. 最后去 PostgreSQL 取出这些 ID 的完整信息 with pg_conn.cursor() as cursor: cursor.execute( "SELECT id, title, content, author FROM documents WHERE id = ANY(%s)", (final_ids,) ) final_docs = cursor.fetchall() # 按 final_ids 的顺序组织返回结果(这里省略了排序逻辑) return final_docs

这种“语义搜索打底,关键词搜索纠偏”的模式,在实际的智能搜索系统中非常有效。它能确保当用户搜索一个非常明确的产品型号或专业术语时,系统不会因为语义上的轻微偏差而漏掉最关键的结果。

4. 与 MongoDB 协作:拥抱非结构化数据的灵活性

当你的数据天生就是文档型的、结构变化多端,或者你已经在使用 MongoDB 作为主数据库时,Milvus + MongoDB的组合就显得非常顺理成章。MongoDB 的灵活模式(Schema-less)让它特别适合存储用户生成内容、日志、设备传感器数据等半结构化或非结构化数据。

4.1 文档与向量的天然映射

MongoDB 里的一条文档(Document),天然地对应一个需要被向量化的实体。比如,一个用户画像文档,里面包含了用户的各类行为标签、兴趣点(都是JSON字段);一篇新闻文章文档,包含了标题、正文、作者、图片链接等。我们可以很方便地将整个文档的某部分(如正文)或组合信息生成一个向量,存入 Milvus,并在 Milvus 的记录中保存该文档在 MongoDB 中的_id

这种模式下的数据同步,有时会更简单。因为 MongoDB 的变更流(Change Streams)功能非常强大,你可以监听集合的数据变化(插入、更新、删除),并近乎实时地触发向量化任务和 Milvus 的更新。

from pymongo import MongoClient from pymilvus import Collection, connections import threading import time # 连接 mongo_client = MongoClient("mongodb://localhost:27017/") db = mongo_client["content_db"] article_col = db["articles"] milvus_col = Collection("article_vectors") milvus_col.load() def listen_to_changes(): """监听 MongoDB 文章集合的变更,同步更新 Milvus""" pipeline = [{'$match': {'operationType': {'$in': ['insert', 'update', 'delete']}}}] with article_col.watch(pipeline) as stream: for change in stream: doc_id = change['documentKey']['_id'] if change['operationType'] == 'delete': # 从 Milvus 中删除对应向量(需根据 entity_id 删除) # 注意:Milvus 删除需要提供主键,这里假设我们存的时候用了 doc_id 的字符串形式 milvus_col.delete(expr=f'entity_id == "{str(doc_id)}"') print(f"Deleted vector for document {doc_id}") else: # 对于插入或更新,获取完整文档,重新生成向量并插入/更新 full_doc = article_col.find_one({"_id": doc_id}) if full_doc and 'content' in full_doc: # 假设有一个函数 generate_vector 能生成文本向量 new_vector = generate_vector(full_doc['content']) # 准备 Milvus 数据,upsert 操作(存在则更新,不存在则插入) data = [ [str(doc_id)], # 主键字段 [new_vector], # 向量字段 # ... 其他字段 ] # 这里需要根据你的集合schema来组织数据,并使用 upsert 接口 # milvus_col.upsert(data) print(f"Upserted vector for document {doc_id}") # 可以在后台线程中启动监听 # change_listener_thread = threading.Thread(target=listen_to_changes, daemon=True) # change_listener_thread.start()

4.2 处理海量日志与实时分析的场景

这是一个非常典型的应用。假设你有一个应用,每天产生海量的日志(用户操作日志、系统错误日志),这些日志以 JSON 文档形式存入 MongoDB。每条日志都有时间戳、级别、服务名、具体的消息文本等字段。

现在,你想做的不仅仅是关键词过滤,而是想进行“语义聚类”“异常模式发现”。比如,把过去一小时内所有描述“连接超时”但具体表述各不相同的错误日志自动归为一类;或者发现一种新的、从未见过的错误模式。

这时,Milvus 就能派上大用场:

  1. 流式处理:用一个实时流处理框架(如 Apache Flink, Spark Streaming)消费日志。
  2. 向量化:对每条日志的“消息文本”字段,用轻量级的句子嵌入模型(如all-MiniLM-L6-v2)实时生成向量。
  3. 写入 Milvus:将向量和日志的 MongoDB_id、时间戳一起写入 Milvus 的一个按时间分区的集合。
  4. 实时分析:每隔一段时间,对最近几分钟的日志向量进行聚类分析(比如用 Milvus 搜索每个向量的最近邻,基于密度聚类)。发现密集的簇,就代表出现了频繁的相似错误。运维人员可以直接拿到这个簇里几个代表性的日志_id,去 MongoDB 里查询完整的上下文信息,快速定位问题。

这种组合让 MongoDB 发挥了其海量数据写入和灵活存储的优势,而 Milvus 则赋予了这些非结构化数据以“可计算”、“可聚类”的智能分析能力。它不再是简单的存储和检索,而是能够主动发现数据中的模式和关联。

5. 协同架构下的关键考量与避坑指南

把两个(甚至多个)数据库组合起来用,听起来美好,但实际落地时会遇到不少挑战。根据我过去在几个项目中实践的经验,下面这些点你必须提前想清楚,不然很容易踩坑。

第一,数据一致性问题。这是头号挑战。理想情况是,业务数据在传统数据库中的任何增删改,都能原子性地同步到 Milvus。但这在分布式系统中很难完美实现。常见的策略有几种:

  • 最终一致性(最常用):通过消息队列(如 Kafka, RabbitMQ)解耦。业务系统更新主数据库后,发送一个事件到消息队列。一个独立的消费者服务监听这个队列,负责生成向量并更新 Milvus。这种方式允许短暂的不一致,但系统容错性好。你需要考虑消息重复消费(幂等性)和消息丢失的处理。
  • 双写(谨慎使用):在应用层事务中,同时写入传统数据库和 Milvus。这要求 Milvus 的写入接口支持某种程度的事务性,或者你自己实现补偿机制(比如先写传统DB,成功后再写Milvus,失败则重试或记录异常)。双写的风险在于,一个成功一个失败时,状态会不一致。
  • 定时同步:对于一致性要求不高的场景,可以定期扫描传统数据库中更新时间晚于某个戳的数据,批量同步到 Milvus。这种方式简单,但延迟高。

第二,ID 设计与管理。关联两个数据库的“纽带”就是 ID。这个 ID 的选择有讲究:

  • 使用业务主键:如果业务本身有天然的唯一标识(如用户邮箱、商品SKU),可以直接用。但要确保它不会变。
  • 使用自增ID/UUID:更推荐。在传统数据库中生成(如 MySQL 的AUTO_INCREMENT, PostgreSQL 的SERIAL, MongoDB 的ObjectId),然后将其作为 Milvus 向量记录的一部分。强烈建议在 Milvus 中为这个 ID 字段建立标量索引,这样当你需要根据 ID 删除或查询特定向量时,速度会非常快。

第三,查询性能优化。协同查询的链路变长了,性能瓶颈可能出现在任何一环。

  • Milvus 侧:合理设置索引类型(HNSW, IVF_FLAT等)和搜索参数(如nprobe)。nprobe越大,搜索越精确,但越慢。需要在精度和速度间做权衡。对于海量数据,考虑使用分区,将不同类别的数据分到不同分区,搜索时指定分区可以大幅提速。
  • 传统数据库侧:确保用于过滤的元数据字段(如category,status)有索引。用于和 Milvus 结果集关联的 ID 字段必须有索引。
  • 应用层:尽量减少网络往返。比如从 Milvus 拿到100个ID,应该用一次IN查询去传统数据库获取数据,而不是循环100次。可以考虑引入缓存(如 Redis),缓存那些频繁被查询的元数据,减轻传统数据库压力。

第四,版本管理与数据迁移。这是一个容易被忽略但极其重要的问题。你的嵌入模型(生成向量的AI模型)可能会升级。V1 模型生成的向量和 V2 模型生成的向量,可能不在同一个向量空间,直接比较没有意义。当你升级模型时,意味着需要为所有历史数据重新生成向量,并更新 Milvus。这通常是一个离线批处理任务。在设计之初,可以在 Milvus 的集合 Schema 中增加一个model_version字段。查询时,只使用相同model_version的向量。迁移时,可以用新模型生成向量存入新集合,逐步切换流量,实现平滑升级。

最后,监控与运维。你需要同时监控两个(或更多)数据库系统的健康度:Milvus 的查询延迟、QPS、内存/GPU 使用率;传统数据库的连接数、慢查询、CPU 负载。当搜索接口变慢时,要能快速定位是向量搜索慢了,还是元数据查询慢了,或者是网络延迟高了。建立清晰的监控仪表盘和告警规则,是保证这个协同系统稳定运行的基石。

把这些点都考虑周全,你的 Milvus 与传统数据库的协同系统才能真正在生产环境中扛住压力,稳定地发挥出“1+1>2”的威力。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/11 5:01:11

串口数据可视化实战:让嵌入式调试效率提升10倍的工具指南

串口数据可视化实战&#xff1a;让嵌入式调试效率提升10倍的工具指南 【免费下载链接】serialplot Small and simple software for plotting data from serial port in realtime. 项目地址: https://gitcode.com/gh_mirrors/se/serialplot 你是否曾遇到过这样的困境&…

作者头像 李华
网站建设 2026/5/11 2:39:08

Moondream2使用测评:图片描述效果惊艳实测

Moondream2使用测评&#xff1a;图片描述效果惊艳实测 1. 引言&#xff1a;当电脑拥有"眼睛"是什么体验&#xff1f; 你有没有想过&#xff0c;给你的电脑装上一双"眼睛"&#xff0c;让它能够看懂图片、描述场景、甚至回答关于图像的各种问题&#xff1f…

作者头像 李华
网站建设 2026/4/19 1:23:33

chandra财务场景应用:发票与报销单自动识别系统

chandra财务场景应用&#xff1a;发票与报销单自动识别系统 1. 为什么财务人员需要chandra&#xff1f; 你有没有遇到过这样的情况&#xff1a;月底集中处理几十张发票和报销单&#xff0c;每张都要手动录入金额、日期、供应商名称、商品明细——光是核对一张扫描件就要花3分…

作者头像 李华
网站建设 2026/4/18 22:01:56

OFA模型部署实践:基于Kubernetes的集群部署

OFA模型部署实践&#xff1a;基于Kubernetes的集群部署 1. 引言 你是不是遇到过这样的情况&#xff1a;好不容易训练好的AI模型&#xff0c;一到实际部署就各种问题&#xff1f;单机跑起来还行&#xff0c;一旦用户量上来&#xff0c;要么响应慢&#xff0c;要么直接崩溃。特…

作者头像 李华