Skip to content

Commit

Permalink
Merge branch 'main' into persianstem
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel (dB.) Doubrovkine <dblock@amazon.com>
  • Loading branch information
dblock authored Jul 22, 2024
2 parents 8761a01 + 5de0c8a commit f328615
Show file tree
Hide file tree
Showing 69 changed files with 3,596 additions and 798 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add matchesPluginSystemIndexPattern to SystemIndexRegistry ([#14750](https://github.com/opensearch-project/OpenSearch/pull/14750))
- Add Plugin interface for loading application based configuration templates (([#14659](https://github.com/opensearch-project/OpenSearch/issues/14659)))
- Refactor remote-routing-table service inline with remote state interfaces([#14668](https://github.com/opensearch-project/OpenSearch/pull/14668))
- Add SortResponseProcessor to Search Pipelines (([#14785](https://github.com/opensearch-project/OpenSearch/issues/14785)))
- Add prefix mode verification setting for repository verification (([#14790](https://github.com/opensearch-project/OpenSearch/pull/14790)))
- Add SplitResponseProcessor to Search Pipelines (([#14800](https://github.com/opensearch-project/OpenSearch/issues/14800)))
- Optimize TransportNodesAction to not send DiscoveryNodes for NodeStats, NodesInfo and ClusterStats call ([14749](https://github.com/opensearch-project/OpenSearch/pull/14749))
- Reduce logging in DEBUG for MasterService:run ([#14795](https://github.com/opensearch-project/OpenSearch/pull/14795))
- Enabling term version check on local state for all ClusterManager Read Transport Actions ([#14273](https://github.com/opensearch-project/OpenSearch/pull/14273))
- Add persian_stem filter (([#14847](https://github.com/opensearch-project/OpenSearch/pull/14847)))

### Dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ public Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProces
TruncateHitsResponseProcessor.TYPE,
new TruncateHitsResponseProcessor.Factory(),
CollapseResponseProcessor.TYPE,
new CollapseResponseProcessor.Factory()
new CollapseResponseProcessor.Factory(),
SortResponseProcessor.TYPE,
new SortResponseProcessor.Factory()
)
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.pipeline.common;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.document.DocumentField;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.xcontent.MediaType;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.ingest.ConfigurationUtils;
import org.opensearch.search.SearchHit;
import org.opensearch.search.pipeline.AbstractProcessor;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.SearchResponseProcessor;

import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

/**
* Processor that sorts an array of items.
* Throws exception is the specified field is not an array.
*/
public class SortResponseProcessor extends AbstractProcessor implements SearchResponseProcessor {
/** Key to reference this processor type from a search pipeline. */
public static final String TYPE = "sort";
/** Key defining the array field to be sorted. */
public static final String SORT_FIELD = "field";
/** Optional key defining the sort order. */
public static final String SORT_ORDER = "order";
/** Optional key to put the sorted values in a different field. */
public static final String TARGET_FIELD = "target_field";
/** Default sort order if not specified */
public static final String DEFAULT_ORDER = "asc";

/** Enum defining how elements will be sorted */
public enum SortOrder {
/** Sort in ascending (natural) order */
ASCENDING("asc"),
/** Sort in descending (reverse) order */
DESCENDING("desc");

private final String direction;

SortOrder(String direction) {
this.direction = direction;
}

@Override
public String toString() {
return this.direction;
}

/**
* Converts the string representation of the enum value to the enum.
* @param value A string ("asc" or "desc")
* @return the corresponding enum value
*/
public static SortOrder fromString(String value) {
if (value == null) {
throw new IllegalArgumentException("Sort direction cannot be null");
}

if (value.equals(ASCENDING.toString())) {
return ASCENDING;
} else if (value.equals(DESCENDING.toString())) {
return DESCENDING;
}
throw new IllegalArgumentException("Sort direction [" + value + "] not recognized." + " Valid values are: [asc, desc]");
}
}

private final String sortField;
private final SortOrder sortOrder;
private final String targetField;

SortResponseProcessor(
String tag,
String description,
boolean ignoreFailure,
String sortField,
SortOrder sortOrder,
String targetField
) {
super(tag, description, ignoreFailure);
this.sortField = Objects.requireNonNull(sortField);
this.sortOrder = Objects.requireNonNull(sortOrder);
this.targetField = targetField == null ? sortField : targetField;
}

/**
* Getter function for sortField
* @return sortField
*/
public String getSortField() {
return sortField;
}

/**
* Getter function for targetField
* @return targetField
*/
public String getTargetField() {
return targetField;
}

/**
* Getter function for sortOrder
* @return sortOrder
*/
public SortOrder getSortOrder() {
return sortOrder;
}

@Override
public String getType() {
return TYPE;
}

@Override
public SearchResponse processResponse(SearchRequest request, SearchResponse response) throws Exception {
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
Map<String, DocumentField> fields = hit.getFields();
if (fields.containsKey(sortField)) {
DocumentField docField = hit.getFields().get(sortField);
if (docField == null) {
throw new IllegalArgumentException("field [" + sortField + "] is null, cannot sort.");
}
hit.setDocumentField(targetField, new DocumentField(targetField, getSortedValues(docField.getValues())));
}
if (hit.hasSource()) {
BytesReference sourceRef = hit.getSourceRef();
Tuple<? extends MediaType, Map<String, Object>> typeAndSourceMap = XContentHelper.convertToMap(
sourceRef,
false,
(MediaType) null
);

Map<String, Object> sourceAsMap = typeAndSourceMap.v2();
if (sourceAsMap.containsKey(sortField)) {
Object val = sourceAsMap.get(sortField);
if (val instanceof List) {
@SuppressWarnings("unchecked")
List<Object> listVal = (List<Object>) val;
sourceAsMap.put(targetField, getSortedValues(listVal));
}
XContentBuilder builder = XContentBuilder.builder(typeAndSourceMap.v1().xContent());
builder.map(sourceAsMap);
hit.sourceRef(BytesReference.bytes(builder));
}
}
}
return response;
}

private List<Object> getSortedValues(List<Object> values) {
return values.stream()
.map(this::downcastToComparable)
.sorted(sortOrder.equals(SortOrder.ASCENDING) ? Comparator.naturalOrder() : Comparator.reverseOrder())
.collect(Collectors.toList());
}

@SuppressWarnings("unchecked")
private Comparable<Object> downcastToComparable(Object obj) {
if (obj instanceof Comparable) {
return (Comparable<Object>) obj;
} else if (obj == null) {
throw new IllegalArgumentException("field [" + sortField + "] contains a null value.]");
} else {
throw new IllegalArgumentException("field [" + sortField + "] of type [" + obj.getClass().getName() + "] is not comparable.]");
}
}

static class Factory implements Processor.Factory<SearchResponseProcessor> {

@Override
public SortResponseProcessor create(
Map<String, Processor.Factory<SearchResponseProcessor>> processorFactories,
String tag,
String description,
boolean ignoreFailure,
Map<String, Object> config,
PipelineContext pipelineContext
) {
String sortField = ConfigurationUtils.readStringProperty(TYPE, tag, config, SORT_FIELD);
String targetField = ConfigurationUtils.readStringProperty(TYPE, tag, config, TARGET_FIELD, sortField);
try {
SortOrder sortOrder = SortOrder.fromString(
ConfigurationUtils.readStringProperty(TYPE, tag, config, SORT_ORDER, DEFAULT_ORDER)
);
return new SortResponseProcessor(tag, description, ignoreFailure, sortField, sortOrder, targetField);
} catch (IllegalArgumentException e) {
throw ConfigurationUtils.newConfigurationException(TYPE, tag, SORT_ORDER, e.getMessage());
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.pipeline.common;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.document.DocumentField;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.xcontent.MediaType;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.ingest.ConfigurationUtils;
import org.opensearch.search.SearchHit;
import org.opensearch.search.pipeline.AbstractProcessor;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.SearchResponseProcessor;

import java.util.Arrays;
import java.util.Map;
import java.util.Objects;

/**
* Processor that sorts an array of items.
* Throws exception is the specified field is not an array.
*/
public class SplitResponseProcessor extends AbstractProcessor implements SearchResponseProcessor {
/** Key to reference this processor type from a search pipeline. */
public static final String TYPE = "split";
/** Key defining the string field to be split. */
public static final String SPLIT_FIELD = "field";
/** Key defining the delimiter used to split the string. This can be a regular expression pattern. */
public static final String SEPARATOR = "separator";
/** Optional key for handling empty trailing fields. */
public static final String PRESERVE_TRAILING = "preserve_trailing";
/** Optional key to put the split values in a different field. */
public static final String TARGET_FIELD = "target_field";

private final String splitField;
private final String separator;
private final boolean preserveTrailing;
private final String targetField;

SplitResponseProcessor(
String tag,
String description,
boolean ignoreFailure,
String splitField,
String separator,
boolean preserveTrailing,
String targetField
) {
super(tag, description, ignoreFailure);
this.splitField = Objects.requireNonNull(splitField);
this.separator = Objects.requireNonNull(separator);
this.preserveTrailing = preserveTrailing;
this.targetField = targetField == null ? splitField : targetField;
}

/**
* Getter function for splitField
* @return sortField
*/
public String getSplitField() {
return splitField;
}

/**
* Getter function for separator
* @return separator
*/
public String getSeparator() {
return separator;
}

/**
* Getter function for preserveTrailing
* @return preserveTrailing;
*/
public boolean isPreserveTrailing() {
return preserveTrailing;
}

/**
* Getter function for targetField
* @return targetField
*/
public String getTargetField() {
return targetField;
}

@Override
public String getType() {
return TYPE;
}

@Override
public SearchResponse processResponse(SearchRequest request, SearchResponse response) throws Exception {
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
Map<String, DocumentField> fields = hit.getFields();
if (fields.containsKey(splitField)) {
DocumentField docField = hit.getFields().get(splitField);
if (docField == null) {
throw new IllegalArgumentException("field [" + splitField + "] is null, cannot split.");
}
Object val = docField.getValue();
if (!(val instanceof String)) {
throw new IllegalArgumentException("field [" + splitField + "] is not a string, cannot split");
}
Object[] strings = ((String) val).split(separator, preserveTrailing ? -1 : 0);
hit.setDocumentField(targetField, new DocumentField(targetField, Arrays.asList(strings)));
}
if (hit.hasSource()) {
BytesReference sourceRef = hit.getSourceRef();
Tuple<? extends MediaType, Map<String, Object>> typeAndSourceMap = XContentHelper.convertToMap(
sourceRef,
false,
(MediaType) null
);

Map<String, Object> sourceAsMap = typeAndSourceMap.v2();
if (sourceAsMap.containsKey(splitField)) {
Object val = sourceAsMap.get(splitField);
if (val instanceof String) {
Object[] strings = ((String) val).split(separator, preserveTrailing ? -1 : 0);
sourceAsMap.put(targetField, Arrays.asList(strings));
}
XContentBuilder builder = XContentBuilder.builder(typeAndSourceMap.v1().xContent());
builder.map(sourceAsMap);
hit.sourceRef(BytesReference.bytes(builder));
}
}
}
return response;
}

static class Factory implements Processor.Factory<SearchResponseProcessor> {

@Override
public SplitResponseProcessor create(
Map<String, Processor.Factory<SearchResponseProcessor>> processorFactories,
String tag,
String description,
boolean ignoreFailure,
Map<String, Object> config,
PipelineContext pipelineContext
) {
String splitField = ConfigurationUtils.readStringProperty(TYPE, tag, config, SPLIT_FIELD);
String separator = ConfigurationUtils.readStringProperty(TYPE, tag, config, SEPARATOR);
boolean preserveTrailing = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, PRESERVE_TRAILING, false);
String targetField = ConfigurationUtils.readStringProperty(TYPE, tag, config, TARGET_FIELD, splitField);
return new SplitResponseProcessor(tag, description, ignoreFailure, splitField, separator, preserveTrailing, targetField);
}
}
}
Loading

0 comments on commit f328615

Please sign in to comment.