Skip to content

[7.x] Geo-Match Enrich Processor (#47243) #47701

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public void testDefaultConfiguration() throws IOException {
GeoShapeFieldMapper geoShapeFieldMapper = (GeoShapeFieldMapper) fieldMapper;
assertThat(geoShapeFieldMapper.fieldType().orientation(),
equalTo(GeoShapeFieldMapper.Defaults.ORIENTATION.value()));
assertThat(geoShapeFieldMapper.fieldType.hasDocValues(), equalTo(false));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ public final class EnrichPolicy implements Writeable, ToXContentFragment {
public static final String ENRICH_INDEX_NAME_BASE = ".enrich-";

public static final String MATCH_TYPE = "match";
public static final String[] SUPPORTED_POLICY_TYPES = new String[]{MATCH_TYPE};
public static final String GEO_MATCH_TYPE = "geo_match";
public static final String[] SUPPORTED_POLICY_TYPES = new String[]{
MATCH_TYPE,
GEO_MATCH_TYPE
};

private static final ParseField QUERY = new ParseField("query");
private static final ParseField INDICES = new ParseField("indices");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,158 @@
*/
package org.elasticsearch.xpack.enrich;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.routing.Preference;
import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

public abstract class AbstractEnrichProcessor extends AbstractProcessor {

private final String policyName;
private final BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner;
private final String field;
private final String targetField;
private final boolean ignoreMissing;
private final boolean overrideEnabled;
protected final String matchField;
protected final int maxMatches;

protected AbstractEnrichProcessor(String tag, Client client, String policyName, String field, String targetField,
boolean ignoreMissing, boolean overrideEnabled, String matchField, int maxMatches) {
this(tag, createSearchRunner(client), policyName, field, targetField, ignoreMissing, overrideEnabled, matchField, maxMatches);
}

protected AbstractEnrichProcessor(String tag, String policyName) {
protected AbstractEnrichProcessor(String tag,
BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner,
String policyName, String field, String targetField, boolean ignoreMissing, boolean overrideEnabled,
String matchField, int maxMatches) {
super(tag);
this.policyName = policyName;
this.searchRunner = searchRunner;
this.field = field;
this.targetField = targetField;
this.ignoreMissing = ignoreMissing;
this.overrideEnabled = overrideEnabled;
this.matchField = matchField;
this.maxMatches = maxMatches;
}

public abstract QueryBuilder getQueryBuilder(Object fieldValue);

@Override
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
try {
// If a document does not have the enrich key, return the unchanged document
final Object value = ingestDocument.getFieldValue(field, Object.class, ignoreMissing);
if (value == null) {
handler.accept(ingestDocument, null);
return;
}

QueryBuilder queryBuilder = getQueryBuilder(value);
ConstantScoreQueryBuilder constantScore = new ConstantScoreQueryBuilder(queryBuilder);
SearchSourceBuilder searchBuilder = new SearchSourceBuilder();
searchBuilder.from(0);
searchBuilder.size(maxMatches);
searchBuilder.trackScores(false);
searchBuilder.fetchSource(true);
searchBuilder.query(constantScore);
SearchRequest req = new SearchRequest();
req.indices(EnrichPolicy.getBaseName(getPolicyName()));
req.preference(Preference.LOCAL.type());
req.source(searchBuilder);

searchRunner.accept(req, (searchResponse, e) -> {
if (e != null) {
handler.accept(null, e);
return;
}

// If the index is empty, return the unchanged document
// If the enrich key does not exist in the index, throw an error
// If no documents match the key, return the unchanged document
SearchHit[] searchHits = searchResponse.getHits().getHits();
if (searchHits.length < 1) {
handler.accept(ingestDocument, null);
return;
}

if (overrideEnabled || ingestDocument.hasField(targetField) == false) {
List<Map<String, Object>> enrichDocuments = new ArrayList<>(searchHits.length);
for (SearchHit searchHit : searchHits) {
Map<String, Object> enrichDocument = searchHit.getSourceAsMap();
enrichDocuments.add(enrichDocument);
}
ingestDocument.setFieldValue(targetField, enrichDocuments);
}
handler.accept(ingestDocument, null);
});
} catch (Exception e) {
handler.accept(null, e);
}
}

@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
throw new UnsupportedOperationException("this method should not get executed");
}

public String getPolicyName() {
return policyName;
}

@Override
public String getType() {
return EnrichProcessorFactory.TYPE;
}

String getField() {
return field;
}

public String getTargetField() {
return targetField;
}

boolean isIgnoreMissing() {
return ignoreMissing;
}

boolean isOverrideEnabled() {
return overrideEnabled;
}

public String getMatchField() {
return matchField;
}

int getMaxMatches() {
return maxMatches;
}

private static BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> createSearchRunner(Client client) {
return (req, handler) -> {
client.execute(EnrichCoordinatorProxyAction.INSTANCE, req, ActionListener.wrap(
resp -> {
handler.accept(resp, null);
},
e -> {
handler.accept(null, e);
}));
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.ImmutableOpenMap;
Expand Down Expand Up @@ -196,29 +197,29 @@ private void validateField(Map<?, ?> properties, String fieldName, boolean field

private XContentBuilder resolveEnrichMapping(final EnrichPolicy policy) {
// Currently the only supported policy type is EnrichPolicy.MATCH_TYPE, which is a keyword type
String keyType;
final String keyType;
final CheckedFunction<XContentBuilder, XContentBuilder, IOException> matchFieldMapping;
if (EnrichPolicy.MATCH_TYPE.equals(policy.getType())) {
keyType = "keyword";
matchFieldMapping = (builder) -> builder.field("type", "keyword").field("doc_values", false);
// No need to also configure index_options, because keyword type defaults to 'docs'.
} else if (EnrichPolicy.GEO_MATCH_TYPE.equals(policy.getType())) {
matchFieldMapping = (builder) -> builder.field("type", "geo_shape");
} else {
throw new ElasticsearchException("Unrecognized enrich policy type [{}]", policy.getType());
}

// Enable _source on enrich index. Explicitly mark key mapping type.
try {
XContentBuilder builder = JsonXContent.contentBuilder();
builder.startObject()
builder = builder.startObject()
.startObject(MapperService.SINGLE_MAPPING_NAME)
.field("dynamic", false)
.startObject("_source")
.field("enabled", true)
.endObject()
.startObject("properties")
.startObject(policy.getMatchField())
.field("type", keyType)
.field("doc_values", false)
.endObject()
.endObject()
.startObject(policy.getMatchField());
builder = matchFieldMapping.apply(builder).endObject().endObject()
.startObject("_meta")
.field(ENRICH_README_FIELD_NAME, ENRICH_INDEX_README_TEXT)
.field(ENRICH_POLICY_NAME_FIELD_NAME, policyName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.cluster.metadata.AliasOrIndex;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.geo.ShapeRelation;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.Processor;
Expand Down Expand Up @@ -57,8 +58,13 @@ public Processor create(Map<String, Processor.Factory> processorFactories, Strin

switch (policyType) {
case EnrichPolicy.MATCH_TYPE:
return new MatchProcessor(tag, client, policyName, field, targetField, matchField,
ignoreMissing, overrideEnabled, maxMatches);
return new MatchProcessor(tag, client, policyName, field, targetField, overrideEnabled, ignoreMissing, matchField,
maxMatches);
case EnrichPolicy.GEO_MATCH_TYPE:
String relationStr = ConfigurationUtils.readStringProperty(TYPE, tag, config, "shape_relation", "intersects");
ShapeRelation shapeRelation = ShapeRelation.getRelationByName(relationStr);
return new GeoMatchProcessor(tag, client, policyName, field, targetField, overrideEnabled, ignoreMissing, matchField,
maxMatches, shapeRelation);
default:
throw new IllegalArgumentException("unsupported policy type [" + policyType + "]");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.enrich;

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.geo.GeoUtils;
import org.elasticsearch.common.geo.ShapeRelation;
import org.elasticsearch.geometry.Geometry;
import org.elasticsearch.geometry.MultiPoint;
import org.elasticsearch.geometry.Point;
import org.elasticsearch.index.query.GeoShapeQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;

import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

public final class GeoMatchProcessor extends AbstractEnrichProcessor {

private ShapeRelation shapeRelation;

GeoMatchProcessor(String tag,
Client client,
String policyName,
String field,
String targetField,
boolean overrideEnabled,
boolean ignoreMissing,
String matchField,
int maxMatches,
ShapeRelation shapeRelation) {
super(tag, client, policyName, field, targetField, ignoreMissing, overrideEnabled, matchField, maxMatches);
this.shapeRelation = shapeRelation;
}

/** used in tests **/
GeoMatchProcessor(String tag,
BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner,
String policyName,
String field,
String targetField,
boolean overrideEnabled,
boolean ignoreMissing,
String matchField,
int maxMatches, ShapeRelation shapeRelation) {
super(tag, searchRunner, policyName, field, targetField, ignoreMissing, overrideEnabled, matchField, maxMatches);
this.shapeRelation = shapeRelation;
}

@SuppressWarnings("unchecked")
@Override
public QueryBuilder getQueryBuilder(Object fieldValue) {
List<Point> points = new ArrayList<>();
if (fieldValue instanceof List) {
List<Object> values = (List<Object>) fieldValue;
if (values.size() == 2 && values.get(0) instanceof Number) {
GeoPoint geoPoint = GeoUtils.parseGeoPoint(values, true);
points.add(new Point(geoPoint.lon(), geoPoint.lat()));
} else {
for (Object value : values) {
GeoPoint geoPoint = GeoUtils.parseGeoPoint(value, true);
points.add(new Point(geoPoint.lon(), geoPoint.lat()));
}
}
} else {
GeoPoint geoPoint = GeoUtils.parseGeoPoint(fieldValue, true);
points.add(new Point(geoPoint.lon(), geoPoint.lat()));
}
final Geometry queryGeometry;
if (points.isEmpty()) {
throw new IllegalArgumentException("no geopoints found");
} else if (points.size() == 1) {
queryGeometry = points.get(0);
} else {
queryGeometry = new MultiPoint(points);
}
GeoShapeQueryBuilder shapeQuery = new GeoShapeQueryBuilder(matchField, queryGeometry);
shapeQuery.relation(shapeRelation);
return shapeQuery;
}

public ShapeRelation getShapeRelation() {
return shapeRelation;
}
}
Loading