Writing an Elasticsearch Plugin: API Calls

This post was last updated 2697 days ago; the information in it may be out of date.

When we need to extend the functionality of Elasticsearch (ES), we have to develop a plugin ourselves.


Final directory structure

The directory layout of a finished plugin is:


plugins/
└── abeffect
    ├── abeffect.jar
    └── plugin-descriptor.properties



The plugin-descriptor.properties file holds the information ES needs at runtime, for example:



description=abeffect for ElasticSearch
version=1.0
name=abeffect
site=${elasticsearch.plugin.site}
jvm=true
classname=com.abeffect.es.plugin.MyPlugin
java.version=1.8
elasticsearch.version=5.4.0
isolated=${elasticsearch.plugin.isolated}
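
Because the descriptor is an ordinary Java properties file, its keys can be read with java.util.Properties. The sketch below is only an illustration of that format and not the code Elasticsearch itself uses; the class name DescriptorSketch and the helpers parse and require are hypothetical names introduced here.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper class, not part of Elasticsearch.
public class DescriptorSketch {
    // Parse descriptor text written in the java.util.Properties format.
    public static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Return the trimmed value for a key, failing loudly if it is missing or blank.
    public static String require(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("missing descriptor property: " + key);
        }
        return value.trim();
    }

    public static void main(String[] args) {
        // A subset of the descriptor shown above.
        String text = String.join("\n",
                "description=abeffect for ElasticSearch",
                "version=1.0",
                "name=abeffect",
                "classname=com.abeffect.es.plugin.MyPlugin",
                "java.version=1.8",
                "elasticsearch.version=5.4.0");
        Properties props = parse(text);
        System.out.println(require(props, "name") + " -> " + require(props, "classname"));
    }
}
```

Failing on a missing key, instead of returning null, makes a broken descriptor visible at load time rather than later when the class name is used.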


Loading process

ES first loads the class named by the classname entry in plugin-descriptor.properties, which here is:

com.abeffect.es.plugin.MyPlugin
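
Loading a class from its name, as described above, can be sketched with plain Java reflection. This is a simplified illustration under stated assumptions and not the actual Elasticsearch loader: FakePlugin and MyFakePlugin are hypothetical stand-ins for the ES Plugin base class and for MyPlugin, and the real loader also sets up a dedicated class loader for the plugin jar.

```java
import java.lang.reflect.Constructor;

// Hypothetical demo class, not part of Elasticsearch.
public class LoadByNameSketch {
    // Stand-in for the Elasticsearch Plugin base class (assumption).
    public abstract static class FakePlugin {
        public abstract String name();
    }

    // Stand-in for MyPlugin; the loader only knows it by its class name.
    public static class MyFakePlugin extends FakePlugin {
        @Override
        public String name() {
            return "abeffect";
        }
    }

    // Resolve the class by name, check its type, and call its no-arg constructor.
    public static FakePlugin load(String classname) {
        try {
            Class<? extends FakePlugin> type =
                    Class.forName(classname).asSubclass(FakePlugin.class);
            Constructor<? extends FakePlugin> ctor = type.getDeclaredConstructor();
            return ctor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot load plugin class " + classname, e);
        }
    }

    public static void main(String[] args) {
        // In a real descriptor this string would come from the classname property.
        FakePlugin plugin = load(MyFakePlugin.class.getName());
        System.out.println("Loaded " + plugin.name());
    }
}
```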


Contents of MyPlugin


import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;

public class MyPlugin extends Plugin implements ActionPlugin {
	private final static Logger logger = LogManager.getLogger(MyPlugin.class);

	public MyPlugin() {
		super();
		logger.info("Load MyPlugin");
	}

	@Override
	public List<RestHandler> getRestHandlers(Settings settings, RestController restController,
			ClusterSettings clusterSettings, IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter,
			IndexNameExpressionResolver indexNameExpressionResolver, Supplier<DiscoveryNodes> nodesInCluster) {
		RestHandler handler = new MyRestAction(settings, restController);
		return Collections.singletonList(handler);
	}
}

MyPlugin registers the MyRestAction implementation, whose sample content is:



Contents of MyRestAction


import java.util.Locale;

import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Table;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestRequest.Method;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.action.RestResponseListener;
import org.elasticsearch.rest.action.cat.AbstractCatAction;
import org.elasticsearch.rest.action.cat.RestTable;

public class MyRestAction extends AbstractCatAction implements RestHandler {
	public MyRestAction(Settings settings, RestController restController) {
		super(settings);
		// Register this handler for GET /_mycat/health.
		restController.registerHandler(Method.GET, "/_mycat/health", this);
	}

	@Override
	protected void documentation(StringBuilder sb) {
		sb.append("/_mycat/health\n");
	}

	@Override
	public RestChannelConsumer doCatRequest(final RestRequest request, final NodeClient client) {
		ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest();

		// Request the cluster health and render the response as a cat-style table.
		return channel -> client.admin().cluster().health(clusterHealthRequest,
				new RestResponseListener<ClusterHealthResponse>(channel) {
					@Override
					public RestResponse buildResponse(final ClusterHealthResponse health) throws Exception {
						return RestTable.buildResponse(buildTable(health, request), channel);
					}
				});
	}

	// Define the table columns; each cell carries its aliases, alignment and description.
	@Override
	protected Table getTableWithHeader(final RestRequest request) {
		Table t = new Table();
		t.startHeadersWithTimestamp();
		t.addCell("cluster", "alias:cl;desc:cluster name");
		t.addCell("status", "alias:st;desc:health status");
		t.addCell("node.total", "alias:nt,nodeTotal;text-align:right;desc:total number of nodes");
		t.addCell("node.data", "alias:nd,nodeData;text-align:right;desc:number of nodes that can store data");
		t.addCell("shards", "alias:t,sh,shards.total,shardsTotal;text-align:right;desc:total number of shards");
		t.addCell("pri", "alias:p,shards.primary,shardsPrimary;text-align:right;desc:number of primary shards");
		t.addCell("relo",
				"alias:r,shards.relocating,shardsRelocating;text-align:right;desc:number of relocating nodes");
		t.addCell("init",
				"alias:i,shards.initializing,shardsInitializing;text-align:right;desc:number of initializing nodes");
		t.addCell("unassign",
				"alias:u,shards.unassigned,shardsUnassigned;text-align:right;desc:number of unassigned shards");
		t.addCell("pending_tasks", "alias:pt,pendingTasks;text-align:right;desc:number of pending tasks");
		t.addCell("max_task_wait_time",
				"alias:mtwt,maxTaskWaitTime;text-align:right;desc:wait time of longest task pending");
		t.addCell("active_shards_percent",
				"alias:asp,activeShardsPercent;text-align:right;desc:active number of shards in percent");
		t.endHeaders();

		return t;
	}

	// Fill a single row, in the same column order as the header above.
	private Table buildTable(final ClusterHealthResponse health, final RestRequest request) {
		Table t = getTableWithHeader(request);
		t.startRow();
		t.addCell(health.getClusterName());
		t.addCell(health.getStatus().name().toLowerCase(Locale.ROOT));
		t.addCell(health.getNumberOfNodes());
		t.addCell(health.getNumberOfDataNodes());
		t.addCell(health.getActiveShards());
		t.addCell(health.getActivePrimaryShards());
		t.addCell(health.getRelocatingShards());
		t.addCell(health.getInitializingShards());
		t.addCell(health.getUnassignedShards());
		t.addCell(health.getNumberOfPendingTasks());
		t.addCell(health.getTaskMaxWaitingTime().millis() == 0 ? "-" : health.getTaskMaxWaitingTime());
		t.addCell(String.format(Locale.ROOT, "%1.1f%%", health.getActiveShardsPercent()));
		t.endRow();
		return t;
	}

}




The content of MyRestAction is based on item 2 of the reading list.
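
The row-building code in MyRestAction passes Locale.ROOT both when lowercasing the status and when formatting the percentage, which keeps the output independent of the JVM's default locale. The standalone sketch below shows the same formatting using only the JDK; CellFormatSketch and its Status enum are hypothetical stand-ins, and waitCell simplifies the original, which prints a TimeValue object instead of raw milliseconds.

```java
import java.util.Locale;

// Hypothetical demo class; it does not depend on Elasticsearch.
public class CellFormatSketch {
    // Stand-in for the cluster health status enum (assumption).
    public enum Status { GREEN, YELLOW, RED }

    // Lowercase the enum name without depending on the default locale.
    public static String statusCell(Status status) {
        return status.name().toLowerCase(Locale.ROOT);
    }

    // One decimal place followed by a literal percent sign.
    public static String percentCell(double percent) {
        return String.format(Locale.ROOT, "%1.1f%%", percent);
    }

    // Simplified: show "-" when nothing is waiting, otherwise the raw milliseconds.
    public static String waitCell(long millis) {
        return millis == 0 ? "-" : millis + "ms";
    }

    public static void main(String[] args) {
        System.out.println(statusCell(Status.GREEN));  // green
        System.out.println(percentCell(100.0));        // 100.0%
        System.out.println(waitCell(0));               // -
        // With a German locale the decimal separator becomes a comma.
        System.out.println(String.format(Locale.GERMANY, "%1.1f%%", 100.0)); // 100,0%
    }
}
```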



Packaging

mvn package

This produces the abeffect.jar file.


Running the plugin and results

Startup


[2017-06-20T15:20:09,719][INFO ][c.u.e.p.MyPlugin         ] Load MyPlugin
[2017-06-20T15:20:09,728][INFO ][o.e.p.PluginsService     ] [MwHlebI] loaded module [transport-netty4]
[2017-06-20T15:20:09,729][INFO ][o.e.p.PluginsService     ] [MwHlebI] loaded plugin [abeffect]
[2017-06-20T15:20:12,940][INFO ][o.e.d.DiscoveryModule    ] [MwHlebI] using discovery type [zen]
[2017-06-20T15:20:14,292][INFO ][o.e.n.Node               ] initialized
[2017-06-20T15:20:14,301][INFO ][o.e.n.Node               ] [MwHlebI] starting ...


Example API call


curl -s "localhost:9200/_mycat/health?v"
epoch      timestamp cluster       status node.total node.data shards pri relo init unassign pending_tasks max_task_wait_time active_shards_percent
1497943235 15:20:35  elasticsearch green           1         1      0   0    0    0        0             0                  -                100.0%
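
When the ?v output has to be consumed by a program, each header column can be paired with the value at the same position in the data row. The following is a minimal sketch, assuming that no cell is empty or contains whitespace (true for the row above, not guaranteed in general); CatOutputSketch and parse are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for reading one header line and one data row of cat-style output.
public class CatOutputSketch {
    // Split both lines on runs of whitespace and pair the columns by position.
    public static Map<String, String> parse(String header, String row) {
        String[] names = header.trim().split("\\s+");
        String[] values = row.trim().split("\\s+");
        if (names.length != values.length) {
            throw new IllegalArgumentException(
                    "header has " + names.length + " columns but row has " + values.length);
        }
        // LinkedHashMap preserves the column order of the header.
        Map<String, String> cells = new LinkedHashMap<>();
        for (int i = 0; i < names.length; i++) {
            cells.put(names[i], values[i]);
        }
        return cells;
    }

    public static void main(String[] args) {
        // Header and row copied from the curl output above.
        String header = "epoch      timestamp cluster       status node.total node.data shards pri relo init "
                + "unassign pending_tasks max_task_wait_time active_shards_percent";
        String row = "1497943235 15:20:35  elasticsearch green           1         1      0   0    0    0 "
                + "       0             0                  -                100.0%";
        Map<String, String> cells = parse(header, row);
        System.out.println(cells.get("status") + " " + cells.get("active_shards_percent"));
    }
}
```

For anything beyond a quick check, asking the cat endpoint for JSON with the format=json parameter is likely more robust than splitting text columns.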


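Further reading


1. ElasticSearch 5.3 plugin development (1): printing information to the console

2. ElasticSearch 5.3 plugin development (2): getting cluster health information

3. Creating a Plugin for Elasticsearch 5.0 Using Maven (Updated for GA). Recommended; it is clearer than this post and packages with Maven.

4. Creating an Ingest Plugin for Elasticsearch (Updated for GA). Recommended; it is clearer than this post and packages with Gradle.

5. https://github.com/jolicode/emoji-search/tree/master/esplugin An example ES plugin, useful as a reference.

6. Building a plugin with Gradle and Elasticsearch 5.0

7. plugin gradle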


Appendix

1. Changes in Elasticsearch 5.0.0: allocation changes; HTTP protocol and REST API changes

