Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Use mapping format for reading date values #169

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,21 @@ public LocalTime timeValue() {
return time;
}

@Override
public LocalDate dateValue() {
return LocalDate.now();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What was this required for?

It would need to account for query start time.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. We return TIMESTAMP for OpenSearch date type => we return ExprTimestampValue
  2. An index can store time in date field => parser returns LocalTime

To satisfy both 1 and 2, we have to build ExprTimestampValue from LocalTime, which calls LocalDate.now().
I see a few ways to fix/avoid this:

  1. Don't return ExprTimestampValue where it is possible. It is nice to have, but still we would have to build ExprTimestampValue from LocalTime.
  2. Change how we build ExprTimestampValue from LocalTime - use a fixed date (e.g. Epoch) instead of today.
  3. Deliver queryContext to OpenSearchExprValueFactory in opensearch module.

}

/**
 * Builds a date-time by pairing the result of {@link #dateValue()} with the
 * result of {@link #timeValue()}.
 *
 * @return the combined local date-time
 */
@Override
public LocalDateTime datetimeValue() {
  LocalDate datePart = dateValue();
  LocalTime timePart = timeValue();
  return LocalDateTime.of(datePart, timePart);
}

/**
 * Converts this value to an instant by combining {@link #dateValue()} and
 * {@link #timeValue()} in the zone given by {@code ExprTimestampValue.ZONE}.
 *
 * @return the instant corresponding to the combined date and time in that zone
 */
@Override
public Instant timestampValue() {
  LocalDate datePart = dateValue();
  LocalTime timePart = timeValue();
  ZonedDateTime zoned = ZonedDateTime.of(datePart, timePart, ExprTimestampValue.ZONE);
  return zoned.toInstant();
}

@Override
public String toString() {
return String.format("TIME '%s'", value());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class ExprTimestampValue extends AbstractExprValue {
/**
* todo. only support UTC now.
*/
private static final ZoneId ZONE = ZoneId.of("UTC");
public static final ZoneId ZONE = ZoneId.of("UTC");

private final Instant timestamp;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,22 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import java.time.DateTimeException;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeParseException;
import java.time.temporal.TemporalAccessor;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.BiFunction;
import lombok.Getter;
import lombok.Setter;
import org.apache.logging.log4j.LogManager;
import org.opensearch.common.time.DateFormatters;
import org.opensearch.sql.data.model.ExprBooleanValue;
import org.opensearch.sql.data.model.ExprByteValue;
Expand All @@ -61,6 +67,7 @@
import org.opensearch.sql.opensearch.data.utils.Content;
import org.opensearch.sql.opensearch.data.utils.ObjectContent;
import org.opensearch.sql.opensearch.data.utils.OpenSearchJsonContent;
import org.opensearch.sql.opensearch.mapping.MappingEntry;
import org.opensearch.sql.opensearch.response.agg.OpenSearchAggregationResponseParser;

/**
Expand All @@ -71,7 +78,7 @@ public class OpenSearchExprValueFactory {
* The Mapping of Field and ExprType.
*/
@Setter
private Map<String, ExprType> typeMapping;
private Map<String, MappingEntry> typeMapping;

@Getter
@Setter
Expand All @@ -81,33 +88,32 @@ public class OpenSearchExprValueFactory {

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

private final Map<ExprType, Function<Content, ExprValue>> typeActionMap =
new ImmutableMap.Builder<ExprType, Function<Content, ExprValue>>()
.put(INTEGER, c -> new ExprIntegerValue(c.intValue()))
.put(LONG, c -> new ExprLongValue(c.longValue()))
.put(SHORT, c -> new ExprShortValue(c.shortValue()))
.put(BYTE, c -> new ExprByteValue(c.byteValue()))
.put(FLOAT, c -> new ExprFloatValue(c.floatValue()))
.put(DOUBLE, c -> new ExprDoubleValue(c.doubleValue()))
.put(STRING, c -> new ExprStringValue(c.stringValue()))
.put(BOOLEAN, c -> ExprBooleanValue.of(c.booleanValue()))
private final Map<ExprType, BiFunction<Content, MappingEntry, ExprValue>> typeActionMap =
new ImmutableMap.Builder<ExprType, BiFunction<Content, MappingEntry, ExprValue>>()
Comment on lines +91 to +92

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because MappingEntry is only used for date/time values, I'm thinking: can we add this mapping info to Content instead?

.put(INTEGER, (c, m) -> new ExprIntegerValue(c.intValue()))
.put(LONG, (c, m) -> new ExprLongValue(c.longValue()))
.put(SHORT, (c, m) -> new ExprShortValue(c.shortValue()))
.put(BYTE, (c, m) -> new ExprByteValue(c.byteValue()))
.put(FLOAT, (c, m) -> new ExprFloatValue(c.floatValue()))
.put(DOUBLE, (c, m) -> new ExprDoubleValue(c.doubleValue()))
.put(STRING, (c, m) -> new ExprStringValue(c.stringValue()))
.put(BOOLEAN, (c, m) -> ExprBooleanValue.of(c.booleanValue()))
.put(TIMESTAMP, this::parseTimestamp)
.put(DATE, c -> new ExprDateValue(parseTimestamp(c).dateValue().toString()))
.put(TIME, c -> new ExprTimeValue(parseTimestamp(c).timeValue().toString()))
.put(DATETIME, c -> new ExprDatetimeValue(parseTimestamp(c).datetimeValue()))
.put(OPENSEARCH_TEXT, c -> new OpenSearchExprTextValue(c.stringValue()))
.put(OPENSEARCH_TEXT_KEYWORD, c -> new OpenSearchExprTextKeywordValue(c.stringValue()))
.put(OPENSEARCH_IP, c -> new OpenSearchExprIpValue(c.stringValue()))
.put(OPENSEARCH_GEO_POINT, c -> new OpenSearchExprGeoPointValue(c.geoValue().getLeft(),
.put(DATE, (c, m) -> new ExprDateValue(parseTimestamp(c, m).dateValue().toString()))
.put(TIME, (c, m) -> new ExprTimeValue(parseTimestamp(c, m).timeValue().toString()))
.put(DATETIME, (c, m) -> new ExprDatetimeValue(parseTimestamp(c, m).datetimeValue()))
.put(OPENSEARCH_TEXT, (c, m) -> new OpenSearchExprTextValue(c.stringValue()))
.put(OPENSEARCH_TEXT_KEYWORD, (c, m) -> new OpenSearchExprTextKeywordValue(c.stringValue()))
.put(OPENSEARCH_IP, (c, m) -> new OpenSearchExprIpValue(c.stringValue()))
.put(OPENSEARCH_GEO_POINT, (c, m) -> new OpenSearchExprGeoPointValue(c.geoValue().getLeft(),
c.geoValue().getRight()))
.put(OPENSEARCH_BINARY, c -> new OpenSearchExprBinaryValue(c.stringValue()))
.put(OPENSEARCH_BINARY, (c, m) -> new OpenSearchExprBinaryValue(c.stringValue()))
.build();

/**
* Constructor of OpenSearchExprValueFactory.
*/
public OpenSearchExprValueFactory(
Map<String, ExprType> typeMapping) {
public OpenSearchExprValueFactory(Map<String, MappingEntry> typeMapping) {
this.typeMapping = typeMapping;
}

Expand Down Expand Up @@ -151,7 +157,7 @@ private ExprValue parse(Content content, String field, Optional<ExprType> fieldT
return parseArray(content, field);
} else {
if (typeActionMap.containsKey(type)) {
return typeActionMap.get(type).apply(content);
return typeActionMap.get(type).apply(content, typeMapping.getOrDefault(field, null));
} else {
throw new IllegalStateException(
String.format(
Expand All @@ -165,7 +171,7 @@ private ExprValue parse(Content content, String field, Optional<ExprType> fieldT
* but has empty value. For example, {"empty_field": []}.
*/
private Optional<ExprType> type(String field) {
  // A field may be absent from the mapping; wrap the entry itself so that a
  // missing entry yields Optional.empty() instead of a NullPointerException.
  // Optional.map also yields empty when the entry has no data type set.
  return Optional.ofNullable(typeMapping.get(field)).map(MappingEntry::getDataType);
}

/**
Expand All @@ -188,11 +194,61 @@ private ExprValue constructTimestamp(String value) {
}
}

private ExprValue parseTimestamp(Content value) {
/**
 * Tries to parse {@code value} with the formats carried by {@code mapping}.
 *
 * <p>The formatters from {@code getRegularFormatters()} are tried first, then the
 * ones from {@code getNamedFormatters()}; the first successful parse wins. Any
 * exception thrown by a formatter is treated as "this format does not match".
 *
 * @param value   text to parse
 * @param mapping mapping entry supplying the formatters, may be {@code null}
 * @return the parsed temporal (per the original note, a {@code java.time.format.Parsed}),
 *         or {@code null} when no mapping was given or no formatter accepted the value
 */
private TemporalAccessor parseTimestampString(String value, MappingEntry mapping) {
  if (mapping != null) {
    for (var regular : mapping.getRegularFormatters()) {
      try {
        return regular.parse(value);
      } catch (Exception ignored) {
        // this format does not match, fall through to the next one
      }
    }
    for (var named : mapping.getNamedFormatters()) {
      try {
        return named.parse(value);
      } catch (Exception ignored) {
        // this format does not match, fall through to the next one
      }
    }
  }
  return null;
}

private ExprValue parseTimestamp(Content value, MappingEntry mapping) {
if (value.isNumber()) {
return new ExprTimestampValue(Instant.ofEpochMilli(value.longValue()));
} else if (value.isString()) {
return constructTimestamp(value.stringValue());
TemporalAccessor parsed = parseTimestampString(value.stringValue(), mapping);
if (parsed == null) { // failed to parse or no formats given
return constructTimestamp(value.stringValue());
}
try {
return new ExprTimestampValue(Instant.from(parsed));
} catch (DateTimeException ignored) {
// nothing to do, try another type
}
// TODO return not ExprTimestampValue
try {
return new ExprTimestampValue(new ExprDateValue(LocalDate.from(parsed)).timestampValue());
} catch (DateTimeException ignored) {
// nothing to do, try another type
}
try {
return new ExprTimestampValue(new ExprDatetimeValue(LocalDateTime.from(parsed)).timestampValue());
} catch (DateTimeException ignored) {
// nothing to do, try another type
}
try {
return new ExprTimestampValue(new ExprTimeValue(LocalTime.from(parsed)).timestampValue());
} catch (DateTimeException ignored) {
// nothing to do, try another type
}
// TODO throw exception
LogManager.getLogger(OpenSearchExprValueFactory.class).error(
String.format("Can't recognize parsed value: %s, %s", parsed, parsed.getClass()));
return new ExprStringValue(value.stringValue());
} else {
return new ExprTimestampValue((Instant) value.objectValue());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@ public class IndexMapping {
/** Field mappings from field name to field type in OpenSearch date type system. */
private final Map<String, String> fieldMappings;

public Map<String, MappingEntry> mapping2;

/**
 * Creates an index mapping from a field name to field type map.
 *
 * <p>{@code mapping2} is populated as well, with one {@link MappingEntry} per field
 * carrying only the field type (no formats, no data type yet). Leaving it
 * {@code null} would make callers that iterate {@code mapping2} fail with a
 * {@link NullPointerException}.
 *
 * @param fieldMappings field name to OpenSearch field type; stored as given
 */
public IndexMapping(Map<String, String> fieldMappings) {
  this.fieldMappings = fieldMappings;
  if (fieldMappings == null) {
    this.mapping2 = emptyMap();
  } else {
    this.mapping2 = fieldMappings.entrySet().stream()
        .collect(Collectors.toMap(Map.Entry::getKey, e -> new MappingEntry(e.getValue())));
  }
}

/**
 * Creates an index mapping from mapping metadata.
 *
 * <p>The metadata's source map is read twice: once by {@code flat2} to fill
 * {@code mapping2}, and once by {@code flatMappings} to fill {@code fieldMappings}.
 *
 * @param metaData mapping metadata whose {@code getSourceAsMap()} supplies the raw mapping
 */
public IndexMapping(MappingMetadata metaData) {
this.mapping2 = flat2(metaData.getSourceAsMap());
this.fieldMappings = flatMappings(metaData.getSourceAsMap());
}

Expand Down Expand Up @@ -65,6 +68,19 @@ public <T> Map<String, T> getAllFieldTypes(Function<String, T> transform) {
.collect(Collectors.toMap(Map.Entry::getKey, e -> transform.apply(e.getValue())));
}

/**
 * Converts the top-level {@code "properties"} section of a raw index mapping into
 * one {@link MappingEntry} per field. Only the first level is visited.
 *
 * @param indexMapping raw mapping; a missing {@code "properties"} key yields an empty result
 * @return field name to mapping entry
 */
// TODO nested, consider recursive call
@SuppressWarnings("unchecked")
private Map<String, MappingEntry> flat2(Map<String, Object> indexMapping) {
  Map<String, Object> properties =
      (Map<String, Object>) indexMapping.getOrDefault("properties", emptyMap());
  return properties.entrySet().stream()
      .collect(Collectors.toMap(Map.Entry::getKey, field -> toMappingEntry(field.getValue())));
}

/**
 * Builds a mapping entry from one field's raw definition: {@code "type"} (defaulting to
 * {@code "object"}) and {@code "format"} (may be absent). The data type is left unset.
 */
@SuppressWarnings("unchecked")
private static MappingEntry toMappingEntry(Object rawFieldMapping) {
  Map<String, Object> fieldMapping = (Map<String, Object>) rawFieldMapping;
  String fieldType = (String) fieldMapping.getOrDefault("type", "object");
  String formats = (String) fieldMapping.get("format");
  return new MappingEntry(fieldType, formats, null);
}


@SuppressWarnings("unchecked")
private Map<String, String> flatMappings(Map<String, Object> indexMapping) {
ImmutableMap.Builder<String, String> builder = new ImmutableMap.Builder<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/


package org.opensearch.sql.opensearch.mapping;

import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import org.opensearch.common.time.DateFormatter;
import org.opensearch.sql.data.type.ExprType;

@AllArgsConstructor
public class MappingEntry {
Comment on lines +20 to +21

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may need to give more thought to this new abstraction. For now it seems to be used only for OpenSearch date field mappings. Can we extend it to carry mapping info for all OpenSearch fields? For example, multi-fields could use this abstraction to get the inner field name and type.

{
  "mappings": {
    "properties": {
      "city": {
        "type": "text",
        "fields": {
          "raw": { 
            "type":  "keyword"
          }
        }
      }
    }
  }
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the current implementation doesn't work with keyword and nested types.


/**
* Data type stored in index mapping.
*/
@Getter
private String fieldType;

/**
* Date formats stored in index mapping.
*/
@Getter
private String formats;

/**
* ExprType calculated for given `fieldType`.
*/
@Getter
@Setter
private ExprType dataType;

/**
 * Creates an entry that carries only the field type; {@code formats} and
 * {@code dataType} are passed as {@code null}.
 *
 * @param fieldType data type name as stored in the index mapping
 */
public MappingEntry(String fieldType) {
this(fieldType, null, null);
}

/**
 * Splits the stored {@code formats} string on the {@code ||} separator.
 *
 * @return the individual format strings, each trimmed; an empty list when
 *         {@code formats} is {@code null} or empty
 */
public List<String> getFormatList() {
  boolean hasFormats = formats != null && !formats.isEmpty();
  if (!hasFormats) {
    return List.of();
  }
  String[] rawFormats = formats.split("\\|\\|");
  return Arrays.stream(rawFormats)
      .map(String::trim)
      .collect(Collectors.toList());
}

/**
 * Returns OpenSearch {@link DateFormatter}s for every configured format that
 * {@link DateTimeFormatter#ofPattern(String)} rejects.
 *
 * <p>NOTE(review): presumably these are OpenSearch's built-in named formats
 * (hence the method name) — confirm against the mapping documentation.
 *
 * @return formatters created with {@code DateFormatter.forPattern}, in format-list order
 */
public List<DateFormatter> getNamedFormatters() {
  return getFormatList().stream()
      .filter(MappingEntry::isRejectedByJavaPattern)
      .map(DateFormatter::forPattern)
      .collect(Collectors.toList());
}

/** Tells whether {@code DateTimeFormatter.ofPattern} fails for the given format. */
private static boolean isRejectedByJavaPattern(String format) {
  try {
    DateTimeFormatter.ofPattern(format);
  } catch (Exception e) {
    return true;
  }
  return false;
}

/**
 * Returns a {@link DateTimeFormatter} for every configured format that
 * {@link DateTimeFormatter#ofPattern(String)} accepts; formats it rejects are skipped.
 *
 * @return the compiled formatters, in format-list order
 */
public List<DateTimeFormatter> getRegularFormatters() {
  return getFormatList().stream()
      .map(MappingEntry::compileJavaPatternOrNull)
      .filter(Objects::nonNull)
      .collect(Collectors.toList());
}

/** Compiles the format as a java.time pattern, or returns {@code null} when that fails. */
private static DateTimeFormatter compileJavaPatternOrNull(String format) {
  try {
    return DateTimeFormatter.ofPattern(format);
  } catch (Exception e) {
    return null;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.opensearch.sql.exception.SemanticCheckException;
import org.opensearch.sql.expression.ReferenceExpression;
import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory;
import org.opensearch.sql.opensearch.mapping.MappingEntry;
import org.opensearch.sql.opensearch.response.agg.OpenSearchAggregationResponseParser;

/**
Expand Down Expand Up @@ -213,7 +214,7 @@ public void pushDownProjects(Set<ReferenceExpression> projects) {
sourceBuilder.fetchSource(projectsSet.toArray(new String[0]), new String[0]);
}

public void pushTypeMapping(Map<String, ExprType> typeMapping) {
public void pushTypeMapping(Map<String, MappingEntry> typeMapping) {
exprValueFactory.setTypeMapping(typeMapping);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.opensearch.sql.opensearch.client.OpenSearchClient;
import org.opensearch.sql.opensearch.data.type.OpenSearchDataType;
import org.opensearch.sql.opensearch.mapping.IndexMapping;
import org.opensearch.sql.opensearch.mapping.MappingEntry;
import org.opensearch.sql.opensearch.request.OpenSearchRequest;

/**
Expand Down Expand Up @@ -121,6 +122,20 @@ public Map<String, ExprType> getFieldTypes() {
return fieldTypes;
}

/**
 * Collects the mapping entries of every index behind this table into one map,
 * setting each entry's data type from its field type on the way.
 *
 * <p>Entries from later indices overwrite earlier ones with the same field name.
 *
 * @return field name to mapping entry, merged across all indices
 */
// TODO possible collision if two indices have fields with same names
public Map<String, MappingEntry> getFieldMappings() {
  Map<String, MappingEntry> merged = new HashMap<>();
  Map<String, IndexMapping> mappingsByIndex =
      client.getIndexMappings(indexName.getIndexNames());

  for (IndexMapping perIndex : mappingsByIndex.values()) {
    for (Map.Entry<String, MappingEntry> field : perIndex.mapping2.entrySet()) {
      MappingEntry entry = field.getValue();
      entry.setDataType(transformESTypeToExprType(entry.getFieldType()));
      merged.put(field.getKey(), entry);
    }
  }
  return merged;
}

/**
* Get the minimum of the max result windows of the indices.
*
Expand Down
Loading