Skip to content

Commit

Permalink
Add array support using definitions in the _meta field.
Browse files Browse the repository at this point in the history
  • Loading branch information
bitsondatadev authored and martint committed Jan 15, 2020
1 parent 7ae1231 commit e5bd9eb
Show file tree
Hide file tree
Showing 7 changed files with 283 additions and 10 deletions.
46 changes: 46 additions & 0 deletions presto-docs/src/main/sphinx/connector/elasticsearch.rst
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,52 @@ Elasticsearch Presto
(all others) (unsupported)
============= =============

Array Types
^^^^^^^^^^^

Fields in Elasticsearch can contain `zero or more values <https://www.elastic.co/guide/en/elasticsearch/reference/current/array.html>`_
, but there is no dedicated array type. To indicate a field contains an array, it can be annotated in a Presto-specific structure in
the `_meta <https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-meta-field.html>`_ section of the index mapping.

For example, you can have an Elasticsearch index that contains documents with the following structure:

.. code-block:: json
{
"array_string_field": ["presto","is","the","besto"],
"long_field": 314159265359,
"id_field": "564e6982-88ee-4498-aa98-df9e3f6b6109",
"timestamp_field": "1987-09-17T06:22:48.000Z",
"object_field": {
"array_int_field": [86,75,309],
"int_field": 2
}
}
The array fields of this structure can be defined by using the following command to add the field
property definition to the ``_meta.presto`` property of the target index mapping.

.. code-block:: shell
curl --request PUT \
--url localhost:9200/doc/_mapping \
--header 'content-type: application/json' \
--data '
{
"_meta": {
"presto":{
"array_string_field":{
"isArray":true
},
"object_field":{
"array_int_field":{
"isArray":true
}
},
}
}
}'
Special Columns
---------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.prestosql.spi.connector.SchemaTablePrefix;
import io.prestosql.spi.predicate.Domain;
import io.prestosql.spi.predicate.TupleDomain;
import io.prestosql.spi.type.ArrayType;
import io.prestosql.spi.type.RowType;
import io.prestosql.spi.type.Type;

Expand Down Expand Up @@ -128,7 +129,7 @@ private List<ColumnMetadata> toColumnMetadata(IndexMetadata metadata)
result.add(BuiltinColumns.SCORE.getMetadata());

for (IndexMetadata.Field field : metadata.getSchema().getFields()) {
Type type = toPrestoType(field.getType());
Type type = toPrestoType(field);
if (type == null) {
continue;
}
Expand Down Expand Up @@ -162,9 +163,19 @@ private static boolean supportsPredicates(IndexMetadata.Type type)
return false;
}

private Type toPrestoType(IndexMetadata.Type type)
private Type toPrestoType(IndexMetadata.Field metaDataField)
{
if (type instanceof PrimitiveType) {
return toPrestoType(metaDataField, metaDataField.isArray());
}

private Type toPrestoType(IndexMetadata.Field metaDataField, boolean isArray)
{
IndexMetadata.Type type = metaDataField.getType();
if (isArray) {
Type elementType = toPrestoType(metaDataField, false);
return new ArrayType(elementType);
}
else if (type instanceof PrimitiveType) {
switch (((PrimitiveType) type).getName()) {
case "float":
return REAL;
Expand Down Expand Up @@ -198,7 +209,7 @@ else if (type instanceof ObjectType) {
ObjectType objectType = (ObjectType) type;

List<RowType.Field> fields = objectType.getFields().stream()
.map(field -> RowType.field(field.getName(), toPrestoType(field.getType())))
.map(field -> RowType.field(field.getName(), toPrestoType(field)))
.collect(toImmutableList());

return RowType.from(fields);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.google.common.collect.ImmutableList;
import io.airlift.log.Logger;
import io.prestosql.elasticsearch.client.ElasticsearchClient;
import io.prestosql.elasticsearch.decoders.ArrayDecoder;
import io.prestosql.elasticsearch.decoders.BigintDecoder;
import io.prestosql.elasticsearch.decoders.BooleanDecoder;
import io.prestosql.elasticsearch.decoders.Decoder;
Expand All @@ -38,6 +39,7 @@
import io.prestosql.spi.block.PageBuilderStatus;
import io.prestosql.spi.connector.ConnectorPageSource;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.type.ArrayType;
import io.prestosql.spi.type.RowType;
import io.prestosql.spi.type.Type;
import org.elasticsearch.action.search.SearchResponse;
Expand Down Expand Up @@ -316,6 +318,11 @@ private Decoder createDecoder(ConnectorSession session, String path, Type type)

return new RowDecoder(fieldNames, decoders);
}
if (type instanceof ArrayType) {
Type elementType = ((ArrayType) type).getElementType();

return new ArrayDecoder(createDecoder(session, path, elementType));
}

throw new UnsupportedOperationException("Type not supported: " + type);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.NullNode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.airlift.json.JsonCodec;
Expand Down Expand Up @@ -449,15 +450,17 @@ public IndexMetadata getIndexMetadata(String index)
mappings = mappings.elements().next();
}

return new IndexMetadata(parseType(mappings.get("properties")));
JsonNode metaNode = nullSafeNode(mappings, "_meta");

return new IndexMetadata(parseType(mappings.get("properties"), nullSafeNode(metaNode, "presto")));
}
catch (IOException e) {
throw new PrestoException(ELASTICSEARCH_INVALID_RESPONSE, e);
}
});
}

private IndexMetadata.ObjectType parseType(JsonNode properties)
private IndexMetadata.ObjectType parseType(JsonNode properties, JsonNode metaProperties)
{
Iterator<Map.Entry<String, JsonNode>> entries = properties.fields();

Expand All @@ -473,32 +476,43 @@ private IndexMetadata.ObjectType parseType(JsonNode properties)
if (value.has("type")) {
type = value.get("type").asText();
}
JsonNode metaNode = nullSafeNode(metaProperties, name);
boolean isArray = !metaNode.isNull() && metaNode.has("isArray") && metaNode.get("isArray").asBoolean();

switch (type) {
case "date":
List<String> formats = ImmutableList.of();
if (value.has("format")) {
formats = Arrays.asList(value.get("format").asText().split("\\|\\|"));
}
result.add(new IndexMetadata.Field(name, new IndexMetadata.DateTimeType(formats)));
result.add(new IndexMetadata.Field(isArray, name, new IndexMetadata.DateTimeType(formats)));
break;

case "object":
if (value.has("properties")) {
result.add(new IndexMetadata.Field(name, parseType(value.get("properties"))));
result.add(new IndexMetadata.Field(isArray, name, parseType(value.get("properties"), metaNode)));
}
else {
LOG.debug("Ignoring empty object field: %s", name);
}
break;

default:
result.add(new IndexMetadata.Field(name, new IndexMetadata.PrimitiveType(type)));
result.add(new IndexMetadata.Field(isArray, name, new IndexMetadata.PrimitiveType(type)));
}
}

return new IndexMetadata.ObjectType(result.build());
}

private JsonNode nullSafeNode(JsonNode jsonNode, String name)
{
if (jsonNode == null || jsonNode.isNull() || jsonNode.get(name) == null) {
return NullNode.getInstance();
}
return jsonNode.get(name);
}

public SearchResponse beginSearch(String index, int shard, QueryBuilder query, Optional<List<String>> fields, List<String> documentFields)
{
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,22 @@ public ObjectType getSchema()

public static class Field
{
private final boolean isArray;
private final String name;
private final Type type;

public Field(String name, Type type)
public Field(boolean isArray, String name, Type type)
{
this.isArray = isArray;
this.name = requireNonNull(name, "name is null");
this.type = requireNonNull(type, "type is null");
}

public boolean isArray()
{
return isArray;
}

public String getName()
{
return name;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prestosql.elasticsearch.decoders;

import io.prestosql.spi.PrestoException;
import io.prestosql.spi.block.BlockBuilder;
import org.elasticsearch.search.SearchHit;

import java.util.List;
import java.util.function.Supplier;

import static io.prestosql.spi.StandardErrorCode.TYPE_MISMATCH;

public class ArrayDecoder
implements Decoder
{
private final Decoder elementDecoder;

public ArrayDecoder(Decoder elementDecoder)
{
this.elementDecoder = elementDecoder;
}

@Override
public void decode(SearchHit hit, Supplier<Object> getter, BlockBuilder output)
{
Object data = getter.get();

if (data == null) {
output.appendNull();
}
else if (data instanceof List) {
BlockBuilder array = output.beginBlockEntry();
((List) data).forEach(element -> elementDecoder.decode(hit, () -> element, array));
output.closeEntry();
}
else {
throw new PrestoException(TYPE_MISMATCH, "Expected list of elements for ARRAY field");
}
}
}
Loading

0 comments on commit e5bd9eb

Please sign in to comment.