news 2026/4/15 10:27:15

6.8 Elasticsearch-写插件:RestHandler、ActionPlugin、ClusterPlugin 全套模板

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
6.8 Elasticsearch-写插件:RestHandler、ActionPlugin、ClusterPlugin 全套模板

6.8 Elasticsearch-写插件:RestHandler、ActionPlugin、ClusterPlugin 全套模板

(基于 8.11 源码,可直接拷贝到org.example.es包下跑通)


0. 目标

给出一个“开箱即用”的 Maven 模块,一次性把下面三件事全部做完:

  1. 暴露自定义 REST 端点(RestHandler)。
  2. 注册 TransportAction,让协调节点→数据节点走内部 RPC(ActionPlugin)。
  3. 在集群状态里持久化自己的配置(ClusterPluginPersistentTasksExecutor)。

代码全部单文件即可编译,无额外依赖(除org.elasticsearch.plugin:elasticsearch8.11.0)。


1. 模块骨架
es-write-plugin ├── pom.xml └── src └── main ├── java │ └── org │ └── example │ └── es │ ├── WritePlugin.java │ ├── RestWriteAction.java │ ├── WriteTransportAction.java │ ├── WriteClusterService.java │ └── WritePersistentTaskExecutor.java └── resources └── META-INF └── plugin-descriptor.properties

pom.xml 关键片段

<properties><elasticsearch.version>8.11.0</elasticsearch.version></properties><dependencies><dependency><groupId>org.elasticsearch.plugin</groupId><artifactId>elasticsearch</artifactId><version>${elasticsearch.version}</version><scope>provided</scope></dependency></dependencies>

plugin-descriptor.properties

description=Demo write plugin with REST + Transport + Cluster state version=1.0.0 name=write-plugin classname=org.example.es.WritePlugin java.version=17 elasticsearch.version=8.11.0

2. 统一入口:WritePlugin.java
publicclassWritePluginextendsPluginimplementsActionPlugin,ClusterPlugin{@OverridepublicList<RestHandler>getRestHandlers(Settingssettings,RestControllerrestController,ClusterSettingsclusterSettings,IndexScopedSettingsindexScopedSettings,SettingsFiltersettingsFilter,IndexNameExpressionResolverindexNameExpressionResolver,Supplier<DiscoveryNodes>nodesInCluster){returnList.of(newRestWriteAction());}@OverridepublicList<ActionHandler<?extendsActionRequest,?extendsActionResponse>>getActions(){returnList.of(newActionHandler<>(WriteAction.INSTANCE,WriteTransportAction.class));}@OverridepublicList<PersistentTasksExecutor<?>>getPersistentTasksExecutor(ClusterServiceclusterService,ThreadPoolthreadPool,Clientclient,PersistentTasksServicepersistentTasksService){returnList.of(newWritePersistentTaskExecutor(clusterService,threadPool,client));}}

3. REST 层:RestWriteAction.java
publicclassRestWriteActionextendsBaseRestHandler{@OverridepublicStringgetName(){return"write_plugin_action";}@OverridepublicList<Route>routes(){returnList.of(newRoute(RestRequest.Method.POST,"/_write/{index}"),newRoute(RestRequest.Method.PUT,"/_write/{index}"));}@OverrideprotectedRestChannelConsumerprepareRequest(RestRequestrequest,NodeClientclient){Stringindex=request.param("index");Stringbody=request.content().utf8ToString();WriteRequestwriteRequest=newWriteRequest(index,body);returnchannel->client.execute(WriteAction.INSTANCE,writeRequest,newRestToXContentListener<>(channel));}}

4. 内部 RPC:WriteAction / WriteRequest / WriteResponse
publicclassWriteActionextendsActionType<WriteResponse>{publicstaticfinalWriteActionINSTANCE=newWriteAction();publicstaticfinalStringNAME="cluster:admin/write/plugin";privateWriteAction(){super(NAME);}}publicclassWriteRequestextendsActionRequest{privatefinalStringindex;privatefinalStringpayload;publicWriteRequest(Stringindex,Stringpayload){this.index=index;this.payload=payload;}publicWriteRequest(StreamInputin)throwsIOException{super(in);this.index=in.readString();this.payload=in.readString();}@OverridepublicvoidwriteTo(StreamOutputout)throwsIOException{super.writeTo(out);out.writeString(index);out.writeString(payload);}publicStringgetIndex(){returnindex;}publicStringgetPayload(){returnpayload;}}publicclassWriteResponseextendsActionResponse{privatefinalbooleanacked;publicWriteResponse(booleanacked){this.acked=acked;}publicWriteResponse(StreamInputin)throwsIOException{this.acked=in.readBoolean();}@OverridepublicvoidwriteTo(StreamOutputout)throwsIOException{out.writeBoolean(acked);}@OverridepublicXContentBuildertoXContent(XContentBuilderbuilder,Paramsparams)throwsIOException{returnbuilder.startObject().field("acked",acked).endObject();}}

5. Transport 层:WriteTransportAction.java
publicclassWriteTransportActionextendsTransportMasterNodeAction<WriteRequest,WriteResponse>{@InjectpublicWriteTransportAction(TransportServicetransportService,ClusterServiceclusterService,ThreadPoolthreadPool,ActionFiltersactionFilters,IndexNameExpressionResolverindexNameExpressionResolver){super(WriteAction.NAME,transportService,clusterService,threadPool,actionFilters,WriteRequest::new,indexNameExpressionResolver);}@OverrideprotectedvoidmasterOperation(Tasktask,WriteRequestrequest,ClusterStatestate,ActionListener<WriteResponse>listener){// 1. 持久化任务到 cluster statePersistentTasksServicepersistentTasksService=newPersistentTasksService(clusterService,transportService,null);persistentTasksService.sendStartRequest(UUIDs.base64UUID(),"write_task",newWriteTaskParams(request.getIndex(),request.getPayload()),ActionListener.wrap(r->listener.onResponse(newWriteResponse(true)),listener::onFailure));}@OverrideprotectedClusterBlockExceptioncheckBlock(WriteRequestrequest,ClusterStatestate){returnstate.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);}}

6. 集群状态持久化:WriteClusterService + WritePersistentTaskExecutor
publicclassWriteTaskParamsimplementsPersistentTaskParams{privatefinalStringindex;privatefinalStringpayload;publicWriteTaskParams(Stringindex,Stringpayload){this.index=index;this.payload=payload;}publicWriteTaskParams(StreamInputin)throwsIOException{this.index=in.readString();this.payload=in.readString();}@OverridepublicStringgetWriteableName(){return"write_task";}@OverridepublicvoidwriteTo(StreamOutputout)throwsIOException{out.writeString(index);out.writeString(payload);}@OverridepublicXContentBuildertoXContent(XContentBuilderbuilder,Paramsparams)throwsIOException{returnbuilder.startObject().field("index",index).field("payload",payload).endObject();}}publicclassWritePersistentTaskExecutorextendsPersistentTasksExecutor<WriteTaskParams>{privatefinalClientclient;privatefinalThreadPoolthreadPool;publicWritePersistentTaskExecutor(ClusterServiceclusterService,ThreadPoolthreadPool,Clientclient){super("write_task",ThreadPool.Names.GENERIC);this.client=client;this.threadPool=threadPool;}@OverrideprotectedvoidnodeOperation(PersistentTask<WriteTaskParams>task,WriteTaskParamsparams,PersistentTaskStatestate){// 真正写数据:这里演示异步索引文档IndexRequestindexRequest=newIndexRequest(params.index).source("payload",params.payload,"timestamp",System.currentTimeMillis()).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);client.index(indexRequest,ActionListener.wrap(r->logger.info("Write task {} done, docId={}",task.getId(),r.getId()),e->logger.warn("Write task "+task.getId()+" failed",e)));}@OverrideprotectedAssignmentgetAssignment(WriteTaskParamsparams,ClusterStateclusterState){// 简单策略:随便挑一个 data 节点DiscoveryNodesnodes=clusterState.nodes();List<DiscoveryNode>dataNodes=nodes.getDataNodes().values().stream().toList();returndataNodes.isEmpty()?Assignment.NO_VALID_NODE_ASSIGNMENT:newAssignment(dataNodes.get(Randomness.get().nextInt(dataNodes.size())).getId(),"ok");}}

7. 安装 & 验证
mvn clean package# 得到 target/write-plugin-1.0.0.zipbin/elasticsearch-plugininstallfile:///full/path/write-plugin-1.0.0.zip# 重启节点
# 1. 调 RESTcurl-XPOST localhost:9200/_write/my_index -d'{"msg":"hello plugin"}'-H"Content-Type: application/json"# 返回 {"acked":true}# 2. 看任务curl-XGET localhost:9200/_cluster/pending_tasks# 3. 看结果curllocalhost:9200/my_index/_search?q=*:*

8. 可继续扩展的 5 个方向
  1. NamedXContentRegistryWriteTaskParams注册成 JSON,支持_cluster/state直接可读。
  2. WritePersistentTaskExecutor里捕获IndexNotFoundException,自动创建索引并写入模板。
  3. WriteTaskParams做成AckedRequest,实现POST /_write/{index}?wait_for_active_shards=2语义。
  4. 通过Plugin.createComponents注入自定义线程池,让大批量写任务走独立队列。
  5. PersistentTaskState存储重试次数,结合BackoffPolicy实现断点续写。

至此,一套“REST → Transport → ClusterState → PersistentTask → 数据节点执行”的完整写插件模板就闭环了。直接复制即可编译,二次开发只需替换WriteTaskParamsnodeOperation里的业务逻辑。```
推荐阅读:
PyCharm 2018–2024使用指南

更多技术文章见公众号: 大城市小农民

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/9 18:31:21

springboot-java热点新闻投稿快搜稿费网站vue

目录Spring Boot与Vue整合的热点新闻投稿系统稿费结算与快搜功能实现技术亮点与性能优化开发技术源码文档获取/同行可拿货,招校园代理 &#xff1a;文章底部获取博主联系方式&#xff01;Spring Boot与Vue整合的热点新闻投稿系统 该系统采用前后端分离架构&#xff0c;后端基于…

作者头像 李华
网站建设 2026/4/10 7:25:28

基于贾子智慧“势‑道‑术”框架的AI战略

智权革命&#xff1a;基于贾子智慧“势‑道‑术”框架的AI时代生存战略与中国规则制定之路摘要&#xff1a; 本报告以贾子智慧“势‑道‑术”为核心分析轴&#xff0c;系统解构AI对职业、经济、技术、能源及社会五大领域的颠覆性影响。报告指出&#xff0c;职业替代遵循“白领先…

作者头像 李华
网站建设 2026/4/13 6:29:42

基于多目标遗传算法的分布式电源选址定容探索

基于多目标遗传算法的分布式电源选址定容研究 关键词&#xff1a;分布式电源 选址定容 多目标遗传算法 参考文档&#xff1a;《店主自写文档》基本复现&#xff1b; 仿真软件&#xff1a;MATLAB 研究内容&#xff1a;代码主要做的是基于多目标遗传算法的分布式电源选址定容…

作者头像 李华
网站建设 2026/4/13 22:09:39

30行PHP,利用硅基流动API,网页客服瞬间上线

欢迎来到小灰灰的博客空间&#xff01;Weclome you&#xff01; 博客主页&#xff1a;IT小灰灰 爱发电&#xff1a;小灰灰的爱发电 热爱领域&#xff1a;前端&#xff08;HTML&#xff09;、后端&#xff08;PHP&#xff09;、人工智能、云服务 目录 一、硅基流动平台准备 二…

作者头像 李华
网站建设 2026/4/13 6:25:45

“PLC立体车库智能仿真:博途V15 3×2车库仿真系统”

PLC立体车库智能仿真 博途V15 32立体车库 西门子1200PLC 触摸屏仿真 不需要实物 自带人机界面 小车上下行有电梯效果 每一个程序段都有注释 FC块标准化编写 自带变频器输出也可以仿真 现在拥有自动出入仓库的功能 IO表已列出最近在搞的32立体车库仿真项目挺有意思&#xff0c;用…

作者头像 李华
网站建设 2026/4/13 23:20:45

京东验证码

声明 本文章中所有内容仅供学习交流使用&#xff0c;不用于其他任何目的&#xff0c;抓包内容、敏感网址、数据接口等均已做脱敏处理&#xff0c;严禁用于商业用途和非法用途&#xff0c;否则由此产生的一切后果均与作者无关&#xff01;部分Python代码cookies.update(dict(res…

作者头像 李华