From e5bd9ebebb6034ae56eced2abc0611e05d38fc16 Mon Sep 17 00:00:00 2001 From: Brian Olsen Date: Sat, 11 Jan 2020 23:08:56 -0500 Subject: [PATCH] Add array support using definitions in the _meta field. --- .../main/sphinx/connector/elasticsearch.rst | 46 ++++++ .../elasticsearch/ElasticsearchMetadata.java | 19 ++- .../ElasticsearchPageSource.java | 7 + .../client/ElasticsearchClient.java | 24 +++- .../elasticsearch/client/IndexMetadata.java | 9 +- .../elasticsearch/decoders/ArrayDecoder.java | 52 +++++++ ...TestElasticsearchIntegrationSmokeTest.java | 136 ++++++++++++++++++ 7 files changed, 283 insertions(+), 10 deletions(-) create mode 100644 presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/decoders/ArrayDecoder.java diff --git a/presto-docs/src/main/sphinx/connector/elasticsearch.rst b/presto-docs/src/main/sphinx/connector/elasticsearch.rst index adee3d83ede4..0dba0ea44b0d 100644 --- a/presto-docs/src/main/sphinx/connector/elasticsearch.rst +++ b/presto-docs/src/main/sphinx/connector/elasticsearch.rst @@ -185,6 +185,52 @@ Elasticsearch Presto (all others) (unsupported) ============= ============= +Array Types +^^^^^^^^^^^ + +Fields in Elasticsearch can contain `zero or more values <https://www.elastic.co/guide/en/elasticsearch/reference/current/array.html>`_ +, but there is no dedicated array type. To indicate a field contains an array, it can be annotated in a Presto-specific structure in +the `_meta <https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-meta-field.html>`_ section of the index mapping. + +For example, you can have an Elasticsearch index that contains documents with the following structure: + +.. code-block:: json + + { + "array_string_field": ["presto","is","the","besto"], + "long_field": 314159265359, + "id_field": "564e6982-88ee-4498-aa98-df9e3f6b6109", + "timestamp_field": "1987-09-17T06:22:48.000Z", + "object_field": { + "array_int_field": [86,75,309], + "int_field": 2 + } + } + +The array fields of this structure can be defined by using the following command to add the field +property definition to the ``_meta.presto`` property of the target index mapping. + +..
code-block:: shell + + curl --request PUT \ + --url localhost:9200/doc/_mapping \ + --header 'content-type: application/json' \ + --data ' + { + "_meta": { + "presto":{ + "array_string_field":{ + "isArray":true + }, + "object_field":{ + "array_int_field":{ + "isArray":true + } + } + } + } + }' + Special Columns --------------- diff --git a/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchMetadata.java b/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchMetadata.java index 0eccbbe82902..9aafbec33db7 100644 --- a/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchMetadata.java +++ b/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchMetadata.java @@ -33,6 +33,7 @@ import io.prestosql.spi.connector.SchemaTablePrefix; import io.prestosql.spi.predicate.Domain; import io.prestosql.spi.predicate.TupleDomain; +import io.prestosql.spi.type.ArrayType; import io.prestosql.spi.type.RowType; import io.prestosql.spi.type.Type; @@ -128,7 +129,7 @@ private List toColumnMetadata(IndexMetadata metadata) result.add(BuiltinColumns.SCORE.getMetadata()); for (IndexMetadata.Field field : metadata.getSchema().getFields()) { - Type type = toPrestoType(field.getType()); + Type type = toPrestoType(field); if (type == null) { continue; } @@ -162,9 +163,19 @@ private static boolean supportsPredicates(IndexMetadata.Type type) return false; } - private Type toPrestoType(IndexMetadata.Type type) + private Type toPrestoType(IndexMetadata.Field metaDataField) { - if (type instanceof PrimitiveType) { + return toPrestoType(metaDataField, metaDataField.isArray()); + } + + private Type toPrestoType(IndexMetadata.Field metaDataField, boolean isArray) + { + IndexMetadata.Type type = metaDataField.getType(); + if (isArray) { + Type elementType = toPrestoType(metaDataField, false); + return new ArrayType(elementType); + } + else if (type instanceof PrimitiveType) { switch (((PrimitiveType) type).getName()) {
case "float": return REAL; @@ -198,7 +209,7 @@ else if (type instanceof ObjectType) { ObjectType objectType = (ObjectType) type; List fields = objectType.getFields().stream() - .map(field -> RowType.field(field.getName(), toPrestoType(field.getType()))) + .map(field -> RowType.field(field.getName(), toPrestoType(field))) .collect(toImmutableList()); return RowType.from(fields); diff --git a/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchPageSource.java b/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchPageSource.java index 4f9100acfd7e..4be40155223e 100644 --- a/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchPageSource.java +++ b/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/ElasticsearchPageSource.java @@ -17,6 +17,7 @@ import com.google.common.collect.ImmutableList; import io.airlift.log.Logger; import io.prestosql.elasticsearch.client.ElasticsearchClient; +import io.prestosql.elasticsearch.decoders.ArrayDecoder; import io.prestosql.elasticsearch.decoders.BigintDecoder; import io.prestosql.elasticsearch.decoders.BooleanDecoder; import io.prestosql.elasticsearch.decoders.Decoder; @@ -38,6 +39,7 @@ import io.prestosql.spi.block.PageBuilderStatus; import io.prestosql.spi.connector.ConnectorPageSource; import io.prestosql.spi.connector.ConnectorSession; +import io.prestosql.spi.type.ArrayType; import io.prestosql.spi.type.RowType; import io.prestosql.spi.type.Type; import org.elasticsearch.action.search.SearchResponse; @@ -316,6 +318,11 @@ private Decoder createDecoder(ConnectorSession session, String path, Type type) return new RowDecoder(fieldNames, decoders); } + if (type instanceof ArrayType) { + Type elementType = ((ArrayType) type).getElementType(); + + return new ArrayDecoder(createDecoder(session, path, elementType)); + } throw new UnsupportedOperationException("Type not supported: " + type); } diff --git 
a/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/client/ElasticsearchClient.java b/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/client/ElasticsearchClient.java index b4c0b6c820b7..cfad214ce886 100644 --- a/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/client/ElasticsearchClient.java +++ b/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/client/ElasticsearchClient.java @@ -20,6 +20,7 @@ import com.amazonaws.auth.InstanceProfileCredentialsProvider; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.NullNode; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import io.airlift.json.JsonCodec; @@ -449,7 +450,9 @@ public IndexMetadata getIndexMetadata(String index) mappings = mappings.elements().next(); } - return new IndexMetadata(parseType(mappings.get("properties"))); + JsonNode metaNode = nullSafeNode(mappings, "_meta"); + + return new IndexMetadata(parseType(mappings.get("properties"), nullSafeNode(metaNode, "presto"))); } catch (IOException e) { throw new PrestoException(ELASTICSEARCH_INVALID_RESPONSE, e); @@ -457,7 +460,7 @@ public IndexMetadata getIndexMetadata(String index) }); } - private IndexMetadata.ObjectType parseType(JsonNode properties) + private IndexMetadata.ObjectType parseType(JsonNode properties, JsonNode metaProperties) { Iterator> entries = properties.fields(); @@ -473,18 +476,21 @@ private IndexMetadata.ObjectType parseType(JsonNode properties) if (value.has("type")) { type = value.get("type").asText(); } + JsonNode metaNode = nullSafeNode(metaProperties, name); + boolean isArray = !metaNode.isNull() && metaNode.has("isArray") && metaNode.get("isArray").asBoolean(); + switch (type) { case "date": List formats = ImmutableList.of(); if (value.has("format")) { formats = Arrays.asList(value.get("format").asText().split("\\|\\|")); } - result.add(new 
IndexMetadata.Field(name, new IndexMetadata.DateTimeType(formats))); + result.add(new IndexMetadata.Field(isArray, name, new IndexMetadata.DateTimeType(formats))); break; case "object": if (value.has("properties")) { - result.add(new IndexMetadata.Field(name, parseType(value.get("properties")))); + result.add(new IndexMetadata.Field(isArray, name, parseType(value.get("properties"), metaNode))); } else { LOG.debug("Ignoring empty object field: %s", name); @@ -492,13 +498,21 @@ private IndexMetadata.ObjectType parseType(JsonNode properties) break; default: - result.add(new IndexMetadata.Field(name, new IndexMetadata.PrimitiveType(type))); + result.add(new IndexMetadata.Field(isArray, name, new IndexMetadata.PrimitiveType(type))); } } return new IndexMetadata.ObjectType(result.build()); } + private JsonNode nullSafeNode(JsonNode jsonNode, String name) + { + if (jsonNode == null || jsonNode.isNull() || jsonNode.get(name) == null) { + return NullNode.getInstance(); + } + return jsonNode.get(name); + } + public SearchResponse beginSearch(String index, int shard, QueryBuilder query, Optional> fields, List documentFields) { SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource() diff --git a/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/client/IndexMetadata.java b/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/client/IndexMetadata.java index 1e5acce8cde3..a998bad8a373 100644 --- a/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/client/IndexMetadata.java +++ b/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/client/IndexMetadata.java @@ -35,15 +35,22 @@ public ObjectType getSchema() public static class Field { + private final boolean isArray; private final String name; private final Type type; - public Field(String name, Type type) + public Field(boolean isArray, String name, Type type) { + this.isArray = isArray; this.name = requireNonNull(name, "name is null"); this.type = requireNonNull(type, "type 
is null"); } + public boolean isArray() + { + return isArray; + } + public String getName() { return name; diff --git a/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/decoders/ArrayDecoder.java b/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/decoders/ArrayDecoder.java new file mode 100644 index 000000000000..b030656173ec --- /dev/null +++ b/presto-elasticsearch/src/main/java/io/prestosql/elasticsearch/decoders/ArrayDecoder.java @@ -0,0 +1,52 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.prestosql.elasticsearch.decoders; + +import io.prestosql.spi.PrestoException; +import io.prestosql.spi.block.BlockBuilder; +import org.elasticsearch.search.SearchHit; + +import java.util.List; +import java.util.function.Supplier; + +import static io.prestosql.spi.StandardErrorCode.TYPE_MISMATCH; + +public class ArrayDecoder + implements Decoder +{ + private final Decoder elementDecoder; + + public ArrayDecoder(Decoder elementDecoder) + { + this.elementDecoder = elementDecoder; + } + + @Override + public void decode(SearchHit hit, Supplier getter, BlockBuilder output) + { + Object data = getter.get(); + + if (data == null) { + output.appendNull(); + } + else if (data instanceof List) { + BlockBuilder array = output.beginBlockEntry(); + ((List) data).forEach(element -> elementDecoder.decode(hit, () -> element, array)); + output.closeEntry(); + } + else { + throw new PrestoException(TYPE_MISMATCH, "Expected list of elements for ARRAY field"); + } + } +} diff --git a/presto-elasticsearch/src/test/java/io/prestosql/elasticsearch/TestElasticsearchIntegrationSmokeTest.java b/presto-elasticsearch/src/test/java/io/prestosql/elasticsearch/TestElasticsearchIntegrationSmokeTest.java index 09b73f76afe3..8e2fb51b946b 100644 --- a/presto-elasticsearch/src/test/java/io/prestosql/elasticsearch/TestElasticsearchIntegrationSmokeTest.java +++ b/presto-elasticsearch/src/test/java/io/prestosql/elasticsearch/TestElasticsearchIntegrationSmokeTest.java @@ -130,6 +130,142 @@ public void testNestedFields() "VALUES ('nestfield', 32, 'valueb')"); } + @Test + public void testArrayFields() + { + String indexName = "test_arrays"; + + embeddedElasticsearchNode.getClient() + .admin() + .indices() + .prepareCreate(indexName) + .addMapping("doc", "" + + "{" + + " \"_meta\": {" + + " \"presto\": {" + + " \"a\": {" + + " \"b\": {" + + " \"y\": {" + + " \"isArray\": true" + + " }" + + " }" + + " }," + + " \"c\": {" + + " \"f\": {" + + " \"g\": {" + + " \"isArray\": true" + + " }," + + " 
\"isArray\": true" + + " }" + + " }," + + " \"j\": {" + + " \"isArray\": true" + + " }," + + " \"k\": {" + + " \"isArray\": true" + + " }" + + " }" + + " }," + + " \"properties\":{" + + " \"a\": {" + + " \"type\": \"object\"," + + " \"properties\": {" + + " \"b\": {" + + " \"type\": \"object\"," + + " \"properties\": {" + + " \"x\": {" + + " \"type\": \"integer\"" + + " }," + + " \"y\": {" + + " \"type\": \"keyword\"" + + " }" + + " } " + + " }" + + " }" + + " }," + + " \"c\": {" + + " \"type\": \"object\"," + + " \"properties\": {" + + " \"d\": {" + + " \"type\": \"keyword\"" + + " }," + + " \"e\": {" + + " \"type\": \"keyword\"" + + " }," + + " \"f\": {" + + " \"type\": \"object\"," + + " \"properties\": {" + + " \"g\": {" + + " \"type\": \"integer\"" + + " }," + + " \"h\": {" + + " \"type\": \"integer\"" + + " }" + + " } " + + " }" + + " }" + + " }," + + " \"i\": {" + + " \"type\": \"long\"" + + " }," + + " \"j\": {" + + " \"type\": \"long\"" + + " }," + + " \"k\": {" + + " \"type\": \"long\"" + + " }" + + " }" + + "}", + XContentType.JSON) + .get(); + + index(indexName, ImmutableMap.builder() + .put("a", ImmutableMap.builder() + .put("b", ImmutableMap.builder() + .put("x", 1) + .put("y", ImmutableList.builder() + .add("hello") + .add("world") + .build()) + .build()) + .build()) + .put("c", ImmutableMap.builder() + .put("d", "foo") + .put("e", "bar") + .put("f", ImmutableList.>builder() + .add(ImmutableMap.builder() + .put("g", ImmutableList.builder() + .add(10) + .add(20) + .build()) + .put("h", 100) + .build()) + .add(ImmutableMap.builder() + .put("g", ImmutableList.builder() + .add(30) + .add(40) + .build()) + .put("h", 200) + .build()) + .build()) + .build()) + .put("j", ImmutableList.builder() + .add(50L) + .add(60L) + .build()) + .build()); + + embeddedElasticsearchNode.getClient() + .admin() + .indices() + .refresh(refreshRequest(indexName)) + .actionGet(); + + assertQuery( + "SELECT a.b.y[1], c.f[1].g[2], c.f[2].g[1], j[2], k[1] FROM test_arrays", + 
"VALUES ('hello', 20, 30, 60, NULL)"); + } + @Test public void testEmptyObjectFields() {