Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Add IndexMapping Tool #1934

Merged
merged 1 commit into from
Jan 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,7 @@ public <T> void run(Map<String, String> parameters, ActionListener<T> listener)
final IndicesOptions indicesOptions = IndicesOptions.strictExpand();
final boolean local = parameters.containsKey("local") ? Boolean.parseBoolean("local") : false;
final TimeValue clusterManagerNodeTimeout = DEFAULT_CLUSTER_MANAGER_NODE_TIMEOUT;
final boolean includeUnloadedSegments = parameters.containsKey("include_unloaded_segments")
? Boolean.parseBoolean(parameters.get("include_unloaded_segments"))
: false;
final boolean includeUnloadedSegments = Boolean.parseBoolean(parameters.get("include_unloaded_segments"));

final ActionListener<Table> internalListener = ActionListener.notifyOnce(ActionListener.wrap(table -> {
// Handle empty table
Expand Down Expand Up @@ -297,10 +295,7 @@ public void onFailure(final Exception e) {

@Override
public boolean validate(Map<String, String> parameters) {
if (parameters == null || parameters.size() == 0) {
return false;
}
return true;
return parameters != null && !parameters.isEmpty();
}

/**
Expand Down Expand Up @@ -388,7 +383,7 @@ private Table buildTable(
final Table table = getTableWithHeader();

indicesSettings.forEach((indexName, settings) -> {
if (indicesMetadatas.containsKey(indexName) == false) {
if (!indicesMetadatas.containsKey(indexName)) {
// the index exists in the Get Indices response but is not present in the cluster state:
// it is likely that the index was deleted in the meanwhile, so we ignore it.
return;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.ml.engine.tools;

import static org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest.DEFAULT_CLUSTER_MANAGER_NODE_TIMEOUT;
import static org.opensearch.ml.common.utils.StringUtils.gson;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.logging.log4j.util.Strings;
import org.opensearch.action.admin.indices.get.GetIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexResponse;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.ml.common.output.model.ModelTensors;
import org.opensearch.ml.common.spi.tools.Parser;
import org.opensearch.ml.common.spi.tools.Tool;
import org.opensearch.ml.common.spi.tools.ToolAnnotation;

import lombok.Getter;
import lombok.Setter;

@ToolAnnotation(IndexMappingTool.TYPE)
public class IndexMappingTool implements Tool {
public static final String TYPE = "IndexMappingTool";
private static final String DEFAULT_DESCRIPTION = String
.join(
" ",
"This tool gets index mapping information from a certain index.",
"It takes 1 required argument named `index` which is a comma-delimited list of one or more indices to get mapping information from, which expands wildcards.",
"It takes 1 optional argument named `local` which means whether to return information from the local node only instead of the cluster manager node (Default is false).",
"The tool returns the index mapping information, which is about how documents and their fields are stored and indexed, and also returns the index settings."
);

@Setter
@Getter
private String name = IndexMappingTool.TYPE;
@Getter
@Setter
private String description = DEFAULT_DESCRIPTION;
@Getter
private String version;

private Client client;
@Setter
private Parser<?, ?> inputParser;
@Setter
private Parser<?, ?> outputParser;

public IndexMappingTool(Client client) {
this.client = client;

outputParser = new Parser<>() {
@Override
public Object parse(Object o) {
@SuppressWarnings("unchecked")
List<ModelTensors> mlModelOutputs = (List<ModelTensors>) o;
return mlModelOutputs.get(0).getMlModelTensors().get(0).getDataAsMap().get("response");
}
};
}

@Override
public <T> void run(Map<String, String> parameters, ActionListener<T> listener) {
@SuppressWarnings("unchecked")
List<String> indexList = parameters.containsKey("index")
? gson.fromJson(parameters.get("index"), List.class)
: Collections.emptyList();
if (indexList.isEmpty()) {
@SuppressWarnings("unchecked")
T empty = (T) ("There were no results searching the index parameter [" + parameters.get("index") + "].");
listener.onResponse(empty);
return;
}

final String[] indices = indexList.toArray(Strings.EMPTY_ARRAY);

final IndicesOptions indicesOptions = IndicesOptions.strictExpand();
final boolean local = Boolean.parseBoolean(parameters.get("local"));
final TimeValue clusterManagerNodeTimeout = DEFAULT_CLUSTER_MANAGER_NODE_TIMEOUT;

ActionListener<GetIndexResponse> internalListener = new ActionListener<GetIndexResponse>() {

@Override
public void onResponse(GetIndexResponse getIndexResponse) {
try {
// Handle empty response
if (getIndexResponse.indices().length == 0) {
@SuppressWarnings("unchecked")
T empty = (T) ("There were no results searching the index parameter [" + parameters.get("index") + "].");
listener.onResponse(empty);
return;
}
StringBuilder sb = new StringBuilder();
for (String index : getIndexResponse.indices()) {
sb.append("index: ").append(index).append("\n\n");

MappingMetadata mapping = getIndexResponse.mappings().get(index);
if (mapping != null) {
sb.append("mappings:\n");
for (Entry<String, Object> entry : mapping.sourceAsMap().entrySet()) {
sb.append(entry.getKey()).append("=").append(entry.getValue()).append('\n');
}
sb.append("\n\n");
}

Settings settings = getIndexResponse.settings().get(index);
if (settings != null) {
sb.append("settings:\n").append(settings.toDelimitedString('\n')).append("\n\n");
}
}

@SuppressWarnings("unchecked")
T response = (T) sb.toString();
listener.onResponse(response);
} catch (Exception e) {
onFailure(e);
}
}

@Override
public void onFailure(final Exception e) {
listener.onFailure(e);
}

};
final GetIndexRequest getIndexRequest = new GetIndexRequest()
.indices(indices)
.indicesOptions(indicesOptions)
.local(local)
.clusterManagerNodeTimeout(clusterManagerNodeTimeout);

client.admin().indices().getIndex(getIndexRequest, internalListener);
}

@Override
public String getType() {
return TYPE;
}

@Override
public boolean validate(Map<String, String> parameters) {
return parameters != null && parameters.containsKey("index");
}

/**
* Factory for the {@link IndexMappingTool}
*/
public static class Factory implements Tool.Factory<IndexMappingTool> {
private Client client;

private static Factory INSTANCE;

/**
* Create or return the singleton factory instance
*/
public static Factory getInstance() {
if (INSTANCE != null) {
return INSTANCE;
}
synchronized (IndexMappingTool.class) {
if (INSTANCE != null) {
return INSTANCE;
}
INSTANCE = new Factory();
return INSTANCE;
}
}

/**
* Initialize this factory
* @param client The OpenSearch client
*/
public void init(Client client) {
this.client = client;
}

@Override
public IndexMappingTool create(Map<String, Object> map) {
return new IndexMappingTool(client);
}

@Override
public String getDefaultDescription() {
return DEFAULT_DESCRIPTION;
}

@Override
public String getDefaultType() {
return TYPE;
}

@Override
public String getDefaultVersion() {
return null;
}
}
}
Loading
Loading