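When we need to extend the functionality of ES (Elasticsearch), we have to develop a plugin ourselves.
Final directory structure
The directory structure of a finished plugin is: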
plugins/
└── abeffect
    ├── abeffect.jar
    └── plugin-descriptor.properties
The plugin-descriptor.properties file holds the information needed at runtime, for example:
description=abeffect for ElasticSearch
version=1.0
name=abeffect
site=${elasticsearch.plugin.site}
jvm=true
classname=com.abeffect.es.plugin.MyPlugin
java.version=1.8
elasticsearch.version=5.4.0
isolated=${elasticsearch.plugin.isolated}
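Since the descriptor is an ordinary Java properties file, its fields can be inspected with java.util.Properties. The following is only a minimal sketch and not Elasticsearch's own loading code; the class name DescriptorSketch and the method parse are made up for illustration, and the sample text leaves out the ${...} placeholders, which a build tool would normally fill in.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper for illustration; not part of Elasticsearch.
final class DescriptorSketch {

    // Parses descriptor text written in java.util.Properties format.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            // Not expected when reading from an in-memory string.
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        String descriptor = String.join("\n",
                "description=abeffect for ElasticSearch",
                "version=1.0",
                "name=abeffect",
                "classname=com.abeffect.es.plugin.MyPlugin",
                "java.version=1.8",
                "elasticsearch.version=5.4.0");
        Properties props = parse(descriptor);
        // Print the plugin name and the entry class that would be loaded.
        System.out.println(props.getProperty("name") + " -> " + props.getProperty("classname"));
    }
}
```

The classname value read this way is the entry point that the loading step relies on.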
Loading process
Elasticsearch first loads the class named by classname in plugin-descriptor.properties, which here is
com.abeffect.es.plugin.MyPlugin
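To make "loading a class by name" concrete, here is a small reflection sketch. It is an assumption-laden illustration, not the code Elasticsearch actually runs: the real loader uses its own class loader for the plugin jar, while this sketch uses the current class loader and a stand-in class called DemoPlugin. LoaderSketch, DemoPlugin and instantiate are hypothetical names.

```java
// Hypothetical illustration of instantiating a class from its name.
final class LoaderSketch {

    // Stand-in for a plugin entry class with a public no-argument constructor.
    public static class DemoPlugin {
        public DemoPlugin() {
        }
    }

    // Looks the class up by its binary name and calls its no-argument constructor.
    static Object instantiate(String className) {
        try {
            Class<?> clazz = Class.forName(className);
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot load " + className, e);
        }
    }

    public static void main(String[] args) {
        // In the real case the name would come from the classname property.
        Object plugin = instantiate(DemoPlugin.class.getName());
        System.out.println(plugin.getClass().getSimpleName());
    }
}
```

A misspelled classname fails at this step, so the value in the descriptor has to match the fully qualified class name exactly.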
Contents of MyPlugin
MyPlugin registers the MyRestAction implementation. Its sample content is:
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;

public class MyPlugin extends Plugin implements ActionPlugin {

    private final static Logger logger = LogManager.getLogger(MyPlugin.class);

    public MyPlugin() {
        super();
        // Logged once at node startup, when the plugin class is instantiated.
        logger.info("Load MyPlugin");
    }

    // Returns the REST handlers this plugin contributes to the node.
    @Override
    public List<RestHandler> getRestHandlers(Settings settings, RestController restController,
            ClusterSettings clusterSettings, IndexScopedSettings indexScopedSettings,
            SettingsFilter settingsFilter, IndexNameExpressionResolver indexNameExpressionResolver,
            Supplier<DiscoveryNodes> nodesInCluster) {
        RestHandler handler = new MyRestAction(settings, restController);
        return Collections.singletonList(handler);
    }
}
Contents of MyRestAction
import java.util.Locale;

import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Table;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestRequest.Method;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.action.RestResponseListener;
import org.elasticsearch.rest.action.cat.AbstractCatAction;
import org.elasticsearch.rest.action.cat.RestTable;

public class MyRestAction extends AbstractCatAction implements RestHandler {

    public MyRestAction(Settings settings, RestController restController) {
        super(settings);
        // Serve GET /_mycat/health with this handler.
        restController.registerHandler(Method.GET, "/_mycat/health", this);
    }

    @Override
    protected void documentation(StringBuilder sb) {
        sb.append("/_mycat/health\n");
    }

    // Requests the cluster health and renders the response as a cat-style table.
    @Override
    public RestChannelConsumer doCatRequest(final RestRequest request, final NodeClient client) {
        ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest();
        return channel -> client.admin().cluster().health(clusterHealthRequest,
                new RestResponseListener<ClusterHealthResponse>(channel) {
                    @Override
                    public RestResponse buildResponse(final ClusterHealthResponse health) throws Exception {
                        return RestTable.buildResponse(buildTable(health, request), channel);
                    }
                });
    }

    // Declares the table columns with their aliases, alignment and descriptions.
    @Override
    protected Table getTableWithHeader(final RestRequest request) {
        Table t = new Table();
        t.startHeadersWithTimestamp();
        t.addCell("cluster", "alias:cl;desc:cluster name");
        t.addCell("status", "alias:st;desc:health status");
        t.addCell("node.total", "alias:nt,nodeTotal;text-align:right;desc:total number of nodes");
        t.addCell("node.data", "alias:nd,nodeData;text-align:right;desc:number of nodes that can store data");
        t.addCell("shards", "alias:t,sh,shards.total,shardsTotal;text-align:right;desc:total number of shards");
        t.addCell("pri", "alias:p,shards.primary,shardsPrimary;text-align:right;desc:number of primary shards");
        t.addCell("relo", "alias:r,shards.relocating,shardsRelocating;text-align:right;desc:number of relocating nodes");
        t.addCell("init", "alias:i,shards.initializing,shardsInitializing;text-align:right;desc:number of initializing nodes");
        t.addCell("unassign", "alias:u,shards.unassigned,shardsUnassigned;text-align:right;desc:number of unassigned shards");
        t.addCell("pending_tasks", "alias:pt,pendingTasks;text-align:right;desc:number of pending tasks");
        t.addCell("max_task_wait_time", "alias:mtwt,maxTaskWaitTime;text-align:right;desc:wait time of longest task pending");
        t.addCell("active_shards_percent", "alias:asp,activeShardsPercent;text-align:right;desc:active number of shards in percent");
        t.endHeaders();
        return t;
    }

    // Fills a single row from the health response, in the same order as the headers.
    private Table buildTable(final ClusterHealthResponse health, final RestRequest request) {
        Table t = getTableWithHeader(request);
        t.startRow();
        t.addCell(health.getClusterName());
        t.addCell(health.getStatus().name().toLowerCase(Locale.ROOT));
        t.addCell(health.getNumberOfNodes());
        t.addCell(health.getNumberOfDataNodes());
        t.addCell(health.getActiveShards());
        t.addCell(health.getActivePrimaryShards());
        t.addCell(health.getRelocatingShards());
        t.addCell(health.getInitializingShards());
        t.addCell(health.getUnassignedShards());
        t.addCell(health.getNumberOfPendingTasks());
        t.addCell(health.getTaskMaxWaitingTime().millis() == 0 ? "-" : health.getTaskMaxWaitingTime());
        t.addCell(String.format(Locale.ROOT, "%1.1f%%", health.getActiveShardsPercent()));
        t.endRow();
        return t;
    }
}
The contents of MyRestAction are based on reading material 2.
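One detail in buildTable is worth a closer look: both the status and the percentage are formatted with Locale.ROOT, so the output does not depend on the default locale of the machine running the node. The standalone sketch below shows the effect using only the JDK; PercentFormatSketch and its methods are hypothetical names, and the values are made-up inputs rather than real cluster data.

```java
import java.util.Locale;

// Hypothetical illustration of locale-independent formatting.
final class PercentFormatSketch {

    // Same format string as in buildTable: one decimal place plus a literal percent sign.
    static String formatPercent(Locale locale, double percent) {
        return String.format(locale, "%1.1f%%", percent);
    }

    // Lowercases an enum-style name without locale-specific case rules.
    static String statusName(String enumName) {
        return enumName.toLowerCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        System.out.println(formatPercent(Locale.ROOT, 100.0)); // prints 100.0%
        System.out.println(statusName("GREEN"));               // prints green
        // A locale with a different decimal separator changes the number format.
        System.out.println(formatPercent(Locale.GERMANY, 100.0));
    }
}
```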
Packaging
mvn package
This produces the abeffect.jar file.
Running and results
Startup
[2017-06-20T15:20:09,719][INFO ][c.u.e.p.MyPlugin ] Load MyPlugin
[2017-06-20T15:20:09,728][INFO ][o.e.p.PluginsService ] [MwHlebI] loaded module [transport-netty4]
[2017-06-20T15:20:09,729][INFO ][o.e.p.PluginsService ] [MwHlebI] loaded plugin [abeffect]
[2017-06-20T15:20:12,940][INFO ][o.e.d.DiscoveryModule ] [MwHlebI] using discovery type [zen]
[2017-06-20T15:20:14,292][INFO ][o.e.n.Node ] initialized
[2017-06-20T15:20:14,301][INFO ][o.e.n.Node ] [MwHlebI] starting ...
Example API call
curl -s "localhost:9200/_mycat/health?v"
epoch      timestamp cluster       status node.total node.data shards pri relo init unassign pending_tasks max_task_wait_time active_shards_percent
1497943235 15:20:35  elasticsearch green           1         1      0   0    0    0        0             0                  -                100.0%
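If the output needs to be consumed by a program rather than read by eye, the header line and the value line can be paired up column by column. The sketch below is one simple way to do that under the assumption that no cell is empty or contains spaces, which holds for the response shown above but not for every cat-style table. CatOutputSketch and parse are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that pairs cat-style headers with one row of values.
final class CatOutputSketch {

    // Splits both lines on runs of whitespace and maps each header to its value.
    static Map<String, String> parse(String headerLine, String valueLine) {
        String[] headers = headerLine.trim().split("\\s+");
        String[] values = valueLine.trim().split("\\s+");
        if (headers.length != values.length) {
            throw new IllegalArgumentException(
                    "Expected " + headers.length + " values but found " + values.length);
        }
        Map<String, String> row = new LinkedHashMap<>(); // keeps column order
        for (int i = 0; i < headers.length; i++) {
            row.put(headers[i], values[i]);
        }
        return row;
    }

    public static void main(String[] args) {
        String header = "epoch timestamp cluster status node.total node.data shards pri relo init "
                + "unassign pending_tasks max_task_wait_time active_shards_percent";
        String values = "1497943235 15:20:35 elasticsearch green 1 1 0 0 0 0 0 0 - 100.0%";
        Map<String, String> row = parse(header, values);
        System.out.println(row.get("cluster") + " is " + row.get("status"));
    }
}
```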
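Reading materials
1. ElasticSearch 5.3 plugin development (1): printing information to the console
2. ElasticSearch 5.3 plugin development (2): getting cluster health information
3. Creating a Plugin for Elasticsearch 5.0 Using Maven (Updated for GA). Recommended reading; it is clearer than this article and packages with Maven.
4. Creating an Ingest Plugin for Elasticsearch (Updated for GA). Recommended reading; it is clearer than this article and packages with Gradle.
5. https://github.com/jolicode/emoji-search/tree/master/esplugin. An example ES plugin that can be used as a reference.
6. Building a plugin with Gradle and Elasticsearch 5.0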
Appendix
1. Changes in elasticsearch 5.0.0: allocation changes, and changes to the HTTP protocol and REST interface