Java + Elasticsearch 全量 & 增量同步实战:打造高性能合同搜索系统
在企业合同管理系统中,我们常常遇到以下挑战:
合同量大,文本内容多,传统数据库查询慢
搜索需求多样:全文搜索、按签署人筛选、分页排序
历史合同也要可搜索,不仅仅是新建合同
统计报表需求:合同签署量、签署人分析等
本文将分享如何使用Elasticsearch + MySQL + ClickHouse构建一个高性能合同搜索系统,并提供完整 Java 示例。
一、系统架构概览
合同系统采用“三角架构”:
┌───────────────────────┐ │ 前端 / API │ │ (创建 / 修改 / 查询) │ └─────────┬─────────────┘ │ ▼ ┌───────────────────────┐ │ MySQL 数据库 │ │ (权威业务数据源) │ └─────────┬─────────────┘ │ ┌──────────────┴───────────────┐ │ │ 【历史数据全量初始化】 【增量同步 / 实时更新】 │ │ ▼ ▼ 分页 / 批量读取历史合同 新建合同 / 修改合同 / 删除合同 │ │ ▼ ▼ 转换为 ContractDoc 转换为 ContractDoc │ │ ▼ ▼ ES Bulk API ES Index / Update / Delete API │ │ └───────────┬──────────────────┘ ▼ ┌───────────────┐ │ Elasticsearch │ │ contract │ │ index │ └───────────────┘ │ ▼ ┌───────────────┐ │ 查询接口 │ │(合同列表 / 搜索)│ └───────────────┘核心说明:
MySQL:权威数据源,存储所有合同业务信息
ES:用于搜索,支持全文搜索、筛选和排序
ClickHouse:用于统计报表,处理大规模合同分析
二、为什么使用宽表
1. 什么是宽表?
宽表 = 把多张业务表的数据提前合并到一张字段很多的表里,用“空间换时间”,减少查询时的 join。
传统 MySQL 查询可能涉及多个 join,性能差:
SELECT c.*, u.name, e.enterprise_name FROM contract c JOIN user u ON c.user_id = u.id JOIN enterprise e ON c.enterprise_id = e.id WHERE c.status = 'SIGNED';宽表设计后,所有信息在一条记录中:
{ "contractId": 10001, "contractTitle": "劳动合同", "contractStatus": "SIGNED", "signTime": "2025-12-01 10:30:00", "initiatorId": 2001, "initiatorName": "张三", "initiatorPhone": "138****", "enterpriseId": 3001, "enterpriseName": "天津数字认证有限公司", "fileHash": "xxxx", "signType": "SILENT", "source": "OPEN_API" }查询无需 join,ES 或 ClickHouse 查询极快
冗余换来性能,是搜索系统的设计常态
三、ES 全量初始化历史数据
1. Java 代码示例(全量导入)
@Service public class ContractEsService { @Autowired private ContractMapper contractMapper; @Autowired private RestHighLevelClient esClient; /** * 全量初始化合同数据到 Elasticsearch */ public void initHistoricalContracts() throws IOException { int pageSize = 500; int page = 0; while (true) { List<Contract> contracts = contractMapper.selectHistorical(page * pageSize, pageSize); if (contracts.isEmpty()) break; BulkRequest bulkRequest = new BulkRequest(); for (Contract contract : contracts) { ContractDoc doc = toContractDoc(contract); bulkRequest.add(new IndexRequest("contract_index") .id(String.valueOf(doc.getContractId())) .source(doc.toMap())); } esClient.bulk(bulkRequest, RequestOptions.DEFAULT); page++; } } private ContractDoc toContractDoc(Contract contract) { ContractDoc doc = new ContractDoc(); doc.setContractId(contract.getId()); doc.setContractTitle(contract.getTitle()); doc.setContractStatus(contract.getStatus()); doc.setSignTime(contract.getSignTime()); doc.setInitiatorId(contract.getUserId()); doc.setInitiatorName(contract.getUserName()); doc.setInitiatorPhone(contract.getUserPhone()); doc.setEnterpriseId(contract.getEnterpriseId()); doc.setEnterpriseName(contract.getEnterpriseName()); doc.setFileHash(contract.getFileHash()); doc.setSignType(contract.getSignType()); doc.setSource(contract.getSource()); return doc; } }✅说明:
分批读取,避免内存爆炸
BulkRequest提高写入性能ContractDoc为宽表结构,支持全文搜索
四、增量同步
1. 新建合同
public void saveContract(Contract contract) throws IOException { contractMapper.insert(contract); // 写 MySQL ContractDoc doc = toContractDoc(contract); esClient.index(new IndexRequest("contract_index") .id(String.valueOf(doc.getContractId())) .source(doc.toMap()), RequestOptions.DEFAULT); // 写 ES }2. 更新合同
public void updateContract(Contract contract) throws IOException { contractMapper.update(contract); ContractDoc doc = toContractDoc(contract); esClient.update(new UpdateRequest("contract_index", String.valueOf(doc.getContractId())) .doc(doc.toMap()), RequestOptions.DEFAULT); }3. 删除合同
public void deleteContract(Long contractId) throws IOException { contractMapper.delete(contractId); esClient.delete(new DeleteRequest("contract_index", String.valueOf(contractId)), RequestOptions.DEFAULT); }五、查询示例
public List<ContractDoc> searchContracts(String keyword, String status) throws IOException { SearchRequest searchRequest = new SearchRequest("contract_index"); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); if (keyword != null) { boolQuery.must(QueryBuilders.multiMatchQuery(keyword, "contractTitle", "enterpriseName")); } if (status != null) { boolQuery.filter(QueryBuilders.termQuery("contractStatus", status)); } sourceBuilder.query(boolQuery).from(0).size(20); searchRequest.source(sourceBuilder); SearchResponse response = esClient.search(searchRequest, RequestOptions.DEFAULT); List<ContractDoc> results = new ArrayList<>(); for (SearchHit hit : response.getHits()) { results.add(ContractDoc.fromMap(hit.getSourceAsMap())); } return results; }支持全文搜索和条件过滤
支持分页
支持宽表字段查询(无需 join)
六、增量 & 历史数据同步策略总结
| 数据类型 | 处理方式 |
|---|---|
| 历史合同 | 全量初始化→ 批量写入 ES |
| 新建合同 | 实时写入 ES |
| 更新合同 | 实时更新 ES |
| 删除合同 | 实时删除 ES |
建议:
增量同步可结合消息队列 + CDC,保证最终一致性
历史数据初始化建议在低峰时执行,分批写入
七、总结
宽表 + ES:提高合同搜索性能,避免 join
全量初始化历史数据:ES 支持既往合同搜索
增量同步:保证新数据实时可查
三角架构(MySQL + ES + ClickHouse):各司其职
MySQL:权威数据
ES:快速搜索
ClickHouse:报表分析【聚合极快、适合统计数据量(亿级)报表】
通过这套设计,合同系统既能秒级响应搜索,又能提供高效报表分析,满足大规模企业业务需求。