Merged
core/src/main/java/org/opensearch/sql/data/model/ExprTupleValue.java
@@ -28,6 +28,11 @@ public static ExprTupleValue fromExprValueMap(Map<String, ExprValue> map) {
return new ExprTupleValue(linkedHashMap);
}

public static ExprTupleValue empty() {
LinkedHashMap<String, ExprValue> linkedHashMap = new LinkedHashMap<>();
return new ExprTupleValue(linkedHashMap);
}

@Override
public Object value() {
LinkedHashMap<String, Object> resultMap = new LinkedHashMap<>();
@@ -107,4 +112,17 @@ public int compare(ExprValue other)
public int hashCode() {
return Objects.hashCode(valueMap);
}

/** Implements mergeTo by merging the two tuple values deeply. */
@Override
public ExprTupleValue mergeTo(ExprValue base) {

Collaborator: add UT

Author: Done

if (base instanceof ExprTupleValue) {
base.tupleValue()
.forEach((key, value) -> this.tupleValue().merge(key, value, ExprValue::mergeTo));
} else {
throw new IllegalArgumentException(
String.format("Cannot merge ExprTupleValue to %s", base.getClass().getSimpleName()));
}
return this;
}
}
core/src/main/java/org/opensearch/sql/data/model/ExprValue.java (+13, -0)
@@ -166,4 +166,17 @@ default List<ExprValue> collectionValue()
default ExprValue keyValue(String key) {
return ExprMissingValue.of();
}

/**
* Merge this value into the base value. By default, it overrides the base value with the
* current one.
*
* <p>This method will be called when a key conflict happens in the process of populating an
* ExprTupleValue. See {@link OpenSearchExprValueFactory#populateValueRecursive}.
*
* @param base the target value to merge into
* @return the merged value
*/
default ExprValue mergeTo(ExprValue base) {

Collaborator: what if ExprIpValue mergeTo ExprIntValue?

Member: Seems it is a replace. Could you comment in the code what the purpose of adding this mergeTo interface is?

Author (@qianheng-aws, Apr 24, 2025): As said in the doc, it should override the base under the current implementation. Such a case will only happen when there is a duplicate value for a field.

1. In normal cases, only ExprTupleValue::mergeTo should be invoked. Cases like:

   {"log.json.time": 100, "log.json": { "status": "SUCCESS" }}

   For "log.json.time": 100, it will finally be populated into the ExprValue ExprTupleValue(Map.of("log", ExprTupleValue(Map.of("json", ExprTupleValue(Map.of("time", 100)))))).

   For "log.json": { "status": "SUCCESS" }, it will be parsed into path log.json with value ExprTupleValue(Map.of("status", "SUCCESS")).

   So after populating the first field, there is a key conflict when populating the second field. In this case, we need to invoke ExprTupleValue::mergeTo to merge the two ExprTupleValues deeply.

2. In some edge cases there is a conflicting value in the customer's document, like:

   {"log.json.status": "FAILED", "log.json": { "status": "SUCCESS" }}

   There is a value conflict for the field log.json.status, so I want the value added most recently to override any previously added values; in that case ExprValue::mergeTo will be invoked.

   But since the order of fields at the same level in JSON is actually unstable, we cannot assert which value will be put first. Maybe throwing an exception to tell the customer there is an incompatible value conflict would also be an option. What's your opinion on this edge case? @LantaoJin @penghuo

Member: Regarding "Maybe throw exception to tell customer there is incompatible value conflict will also be an option": do you mean throwing the exception during PPL querying or during data ingestion? Data ingestion seems out of the scope here.

Member (@LantaoJin, Apr 25, 2025): IMO, if there are conflicting fields, displaying any one of them would be fine as long as the field populated out is fixed. For example:

{"id": 1, "log.json.status": "FAILED", "log.json": { "status": "SUCCESS" }}
{"id": 2, "log.json.status": "FAILED", "log.json": { "status": "SUCCESS" }}
{"id": 3, "log.json.status": "FAILED", "log.json": { "status": "SUCCESS" }}

PPL source=t | fields id, log.json.status returning either

1, FAILED
2, FAILED
3, FAILED

or

1, SUCCESS
2, SUCCESS
3, SUCCESS

is acceptable. This could be treated as a data quality issue.

Member: Could you add a test for the key conflict case?

Author (@qianheng-aws, Apr 25, 2025): The problem is that a value conflict case will be flaky, because field order is not guaranteed in JSON. We cannot assert which value will be put in the end.

Author: Added in the UT testPopulateValueRecursive, where I can control the populate order.

Collaborator: Regarding "Maybe throw exception to tell customer there is incompatible value conflict will also be an option": do not throw an exception; the current approach looks good. OpenSearch also returns the LATEST field.

###
POST {{baseUrl}}/test00001/_doc
Content-Type: application/x-ndjson

{
    "log.json.status": "failed1",
    "log.json": {
      "status": "failed2"
    },
    "log": {
      "json": {
        "status": "failed3"
      }
    }
}

###
GET {{baseUrl}}/test00001/_search
Content-Type: application/x-ndjson

{
    "fields": [
        "log.json.status"
    ],
    "_source": false
}

### Return 
        "fields": {
          "log.json.status": [
            "failed3"
          ]
        }

return this;
}
}

core/src/test/java/org/opensearch/sql/data/model/ExprTupleValueTest.java
@@ -55,4 +55,21 @@ public void comparabilityTest() {
assertThrows(ExpressionEvaluationException.class, () -> compare(tupleValue, tupleValue));
assertEquals("ExprTupleValue instances are not comparable", exception.getMessage());
}

@Test
public void testMergeTo() {
ExprValue tupleValue1 =
ExprValueUtils.tupleValue(
ImmutableMap.of("v1", 1, "inner_tuple", ImmutableMap.of("inner_v1", 1)));
ExprValue tupleValue2 =
ExprValueUtils.tupleValue(
ImmutableMap.of("v2", 2, "inner_tuple", ImmutableMap.of("inner_v2", 2)));
ExprValue expectedMergedValue =
ExprValueUtils.tupleValue(
ImmutableMap.of(
"v1", 1,
"inner_tuple", ImmutableMap.of("inner_v1", 1, "inner_v2", 2),
"v2", 2));
assertEquals(expectedMergedValue, tupleValue1.mergeTo(tupleValue2));
}
}
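The deep-merge semantics this test exercises can also be sketched standalone with plain Java maps. This is a hedged illustration of the idea, not the actual ExprValue code: the class name MergeSketch and the use of plain Map<String, Object> in place of ExprValue are assumptions made for the sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Standalone sketch: maps merge recursively; on a scalar conflict the merging side wins. */
public class MergeSketch {

  @SuppressWarnings("unchecked")
  static Object mergeTo(Object self, Object base) {
    if (self instanceof Map && base instanceof Map) {
      // Two nested objects: start from the base and fold the merging side in, key by key.
      Map<String, Object> merged = new LinkedHashMap<>((Map<String, Object>) base);
      ((Map<String, Object>) self)
          .forEach((k, v) -> merged.merge(k, v, (baseV, selfV) -> mergeTo(selfV, baseV)));
      return merged;
    }
    // Scalars (or mismatched shapes): the value being merged overrides the base,
    // mirroring the default override behavior discussed in the review thread.
    return self;
  }

  public static void main(String[] args) {
    Map<String, Object> first = new LinkedHashMap<>();
    first.put("v1", 1);
    first.put("inner", new LinkedHashMap<>(Map.of("a", 1)));

    Map<String, Object> second = new LinkedHashMap<>();
    second.put("v2", 2);
    second.put("inner", new LinkedHashMap<>(Map.of("b", 2)));

    // "inner" exists on both sides, so it merges deeply; v1 and v2 simply coexist.
    System.out.println(mergeTo(first, second));
  }
}
```

Note how Map.merge passes the existing (base) value as the first remapping argument, which is why the sketch flips the arguments back into mergeTo(selfV, baseV).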
integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteFlattenDocValueIT.java
@@ -0,0 +1,17 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite.remote;

import org.opensearch.sql.ppl.FlattenDocValueIT;

public class CalciteFlattenDocValueIT extends FlattenDocValueIT {
@Override
public void init() throws Exception {
super.init();
enableCalcite();
disallowCalciteFallback();
}
}
integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java
@@ -838,6 +838,11 @@ public enum Index {
"alias",
getAliasIndexMapping(),
"src/test/resources/alias.json"),
FLATTENED_VALUE(
TestsConstants.TEST_INDEX_FLATTENED_VALUE,
"flattened_value",
null,
"src/test/resources/flattened_value.json"),
DUPLICATION_NULLABLE(
TestsConstants.TEST_INDEX_DUPLICATION_NULLABLE,
"duplication_nullable",

integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java
@@ -62,6 +62,7 @@ public class TestsConstants {
public static final String TEST_INDEX_GEOPOINT = TEST_INDEX + "_geopoint";
public static final String TEST_INDEX_JSON_TEST = TEST_INDEX + "_json_test";
public static final String TEST_INDEX_ALIAS = TEST_INDEX + "_alias";
public static final String TEST_INDEX_FLATTENED_VALUE = TEST_INDEX + "_flattened_value";
public static final String TEST_INDEX_GEOIP = TEST_INDEX + "_geoip";
public static final String DATASOURCES = ".ql-datasources";
public static final String TEST_INDEX_STATE_COUNTRY = TEST_INDEX + "_state_country";

integ-test/src/test/java/org/opensearch/sql/ppl/FlattenDocValueIT.java
@@ -0,0 +1,58 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ppl;

import static org.opensearch.sql.legacy.SQLIntegTestCase.Index.FLATTENED_VALUE;
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_FLATTENED_VALUE;
import static org.opensearch.sql.util.MatcherUtils.rows;
import static org.opensearch.sql.util.MatcherUtils.schema;
import static org.opensearch.sql.util.MatcherUtils.verifyDataRows;
import static org.opensearch.sql.util.MatcherUtils.verifySchema;

import java.io.IOException;
import org.hamcrest.TypeSafeMatcher;
import org.json.JSONArray;
import org.json.JSONObject;
import org.junit.jupiter.api.Test;

public class FlattenDocValueIT extends PPLIntegTestCase {
@Override
public void init() throws Exception {
super.init();
loadIndex(FLATTENED_VALUE);
}

@Test
public void testFlattenDocValue() throws IOException {
JSONObject result = executeQuery(String.format("source=%s", TEST_INDEX_FLATTENED_VALUE));
verifySchema(result, schema("log", "struct"));
TypeSafeMatcher<JSONArray> expectedRow =
rows(new JSONObject("{ \"json\" : { \"status\": \"SUCCESS\", \"time\": 100} }"));
verifyDataRows(result, expectedRow, expectedRow, expectedRow, expectedRow, expectedRow);
}

@Test
public void testFlattenDocValueWithFields() throws IOException {
JSONObject result =
executeQuery(
String.format(
"source=%s | fields log, log.json, log.json.status, log.json.time",
TEST_INDEX_FLATTENED_VALUE));
verifySchema(
result,
schema("log", "struct"),
schema("log.json", "struct"),
schema("log.json.status", "string"),
schema("log.json.time", "bigint"));
TypeSafeMatcher<JSONArray> expectedRow =
rows(
new JSONObject("{ \"json\" : { \"status\": \"SUCCESS\", \"time\": 100} }"),
new JSONObject("{ \"status\": \"SUCCESS\", \"time\": 100}"),
"SUCCESS",
100);
verifyDataRows(result, expectedRow, expectedRow, expectedRow, expectedRow, expectedRow);
}
}
integ-test/src/test/resources/flattened_value.json (+10, -0)
@@ -0,0 +1,10 @@
{"index": {"_id":"1"}}
{"log": { "json" : { "status": "SUCCESS", "time": 100} } }
{"index": {"_id":"2"}}
{"log.json": { "status": "SUCCESS", "time": 100} }
{"index": {"_id":"3"}}
{"log.json.status": "SUCCESS", "log.json.time": 100 }
{"index": {"_id":"4"}}
{"log.json": { "status": "SUCCESS" }, "log.json.time": 100 }
{"index": {"_id":"5"}}
{"log": { "json" : {} }, "log.json": { "status": "SUCCESS" }, "log.json.time": 100 }
@@ -0,0 +1,56 @@
setup:
- do:
query.settings:
body:
transient:
plugins.calcite.enabled : true
plugins.calcite.fallback.allowed : false

---
teardown:
- do:
query.settings:
body:
transient:
plugins.calcite.enabled : false
plugins.calcite.fallback.allowed : true

---
"Handle flattened document value":
- skip:
features:
- headers
- do:
bulk:
index: test
refresh: true
body:
- '{"index": {}}'
- '{"log": { "json" : { "status": "SUCCESS", "time": 100} } }'
- '{"index": {}}'
- '{"log.json": { "status": "SUCCESS", "time": 100} }'
- '{"index": {}}'
- '{"log.json.status": "SUCCESS", "log.json.time": 100 }'
- '{"index": {}}'
- '{"log.json": { "status": "SUCCESS" }, "log.json.time": 100 }'
- '{"index": {}}'
- '{"log": { "json" : {} }, "log.json": { "status": "SUCCESS" }, "log.json.time": 100 }'
- do:
headers:
Content-Type: 'application/json'
ppl:
body:
query: 'source=test'
- match: {"total": 5}
- match: {"schema": [{"name": "log", "type": "struct"}]}
- match: {"datarows": [[{ "json" : { "status": "SUCCESS", "time": 100} }], [{ "json" : { "status": "SUCCESS", "time": 100} }], [{ "json" : { "status": "SUCCESS", "time": 100} }], [{ "json" : { "status": "SUCCESS", "time": 100} }], [{ "json" : { "status": "SUCCESS", "time": 100} }]]}

- do:
headers:
Content-Type: 'application/json'
ppl:
body:
query: 'source=test | fields log, log.json, log.json.status, log.json.time'
- match: {"total": 5}
- match: {"schema": [{"name": "log", "type": "struct"}, {"name": "log.json", "type": "struct"}, {"name": "log.json.status", "type": "string"}, {"name": "log.json.time", "type": "bigint"}]}
- match: {"datarows": [[{ "json" : { "status": "SUCCESS", "time": 100} }, { "status": "SUCCESS", "time": 100}, "SUCCESS", 100], [{ "json" : { "status": "SUCCESS", "time": 100} }, { "status": "SUCCESS", "time": 100}, "SUCCESS", 100], [{ "json" : { "status": "SUCCESS", "time": 100} }, { "status": "SUCCESS", "time": 100}, "SUCCESS", 100], [{ "json" : { "status": "SUCCESS", "time": 100} }, { "status": "SUCCESS", "time": 100}, "SUCCESS", 100], [{ "json" : { "status": "SUCCESS", "time": 100} }, { "status": "SUCCESS", "time": 100}, "SUCCESS", 100]]}
opensearch/src/main/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactory.java
@@ -32,7 +32,6 @@
import java.time.format.DateTimeParseException;
import java.time.temporal.TemporalAccessor;
import java.util.ArrayList;
- import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -317,19 +316,63 @@ private static ExprValue createOpenSearchDateType(Content value, ExprType type)
* @return Value parsed from content.
*/
private ExprValue parseStruct(Content content, String prefix, boolean supportArrays) {
- LinkedHashMap<String, ExprValue> result = new LinkedHashMap<>();
+ ExprTupleValue result = ExprTupleValue.empty();
content
.map()
.forEachRemaining(
entry ->
- result.put(
- entry.getKey(),
+ populateValueRecursive(
+ result,
+ new JsonPath(entry.getKey()),
parse(
entry.getValue(),
makeField(prefix, entry.getKey()),
type(makeField(prefix, entry.getKey())),
supportArrays)));
- return new ExprTupleValue(result);
+ return result;
}

/**
* Populate the current ExprTupleValue recursively.
*
* <p>If the JsonPath is not a root path (i.e. its raw path contains a dot), the update descends
* into its children recursively until the leaf node.
*
* <p>If there is an existing value for the JsonPath, the new value is merged into the old one.
*/
static void populateValueRecursive(ExprTupleValue result, JsonPath path, ExprValue value) {
if (path.getPaths().size() == 1) {
// Update the current ExprValue by using mergeTo if exists
result
.tupleValue()
.computeIfPresent(path.getRootPath(), (key, oldValue) -> value.mergeTo(oldValue));

Collaborator: what case is covered by mergeTo? The same field with a different value?

Author: See the comment [here](#3577 (comment)).

result.tupleValue().putIfAbsent(path.getRootPath(), value);
} else {
result.tupleValue().putIfAbsent(path.getRootPath(), ExprTupleValue.empty());
populateValueRecursive(
(ExprTupleValue) result.tupleValue().get(path.getRootPath()), path.getChildPath(), value);
}
}

@Getter
static class JsonPath {
private final List<String> paths;

public JsonPath(String rawPath) {
this.paths = List.of(rawPath.split("\\."));
}

public JsonPath(List<String> paths) {
this.paths = paths;
}

public String getRootPath() {
return paths.getFirst();
}

public JsonPath getChildPath() {
return new JsonPath(paths.subList(1, paths.size()));
}
}
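The JsonPath recursion above can be mirrored in a standalone sketch over plain maps. This is an illustration under stated assumptions, not the plugin's code: the class name PathPopulateSketch is hypothetical, and scalar conflicts are simplified to last-write-wins instead of the mergeTo call used in populateValueRecursive.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch: populate a dotted field name like "log.json.time" into nested maps. */
public class PathPopulateSketch {

  @SuppressWarnings("unchecked")
  static void populate(Map<String, Object> result, List<String> path, Object value) {
    String head = path.get(0);
    if (path.size() == 1) {
      // Leaf segment: simplified to last-write-wins (the real code delegates to mergeTo).
      result.put(head, value);
    } else {
      // Intermediate segment: descend, creating the nested object on first sight.
      Map<String, Object> child =
          (Map<String, Object>)
              result.computeIfAbsent(head, k -> new LinkedHashMap<String, Object>());
      populate(child, path.subList(1, path.size()), value);
    }
  }

  public static void main(String[] args) {
    Map<String, Object> doc = new LinkedHashMap<>();
    populate(doc, List.of("log.json.time".split("\\.")), 100);
    populate(doc, List.of("log.json.status".split("\\.")), "SUCCESS");
    // Both dotted fields land under the same nested "log.json" object.
    System.out.println(doc);
  }
}
```

Splitting on "\\." mirrors the JsonPath(String rawPath) constructor, and the subList descent mirrors getChildPath.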

@@ -125,7 +125,7 @@ public static ExprValue getExprValueFromSqlType(
return ExprValueUtils.fromObjectValue(array);

default:
- LOG.warn(
+ LOG.debug(
"Unchecked sql type: {}, return Object type {}",
sqlType,
value.getClass().getTypeName());

opensearch/src/test/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactoryTest.java
@@ -57,10 +57,12 @@
import org.opensearch.sql.data.model.ExprTimestampValue;
import org.opensearch.sql.data.model.ExprTupleValue;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.model.ExprValueUtils;
import org.opensearch.sql.opensearch.data.type.OpenSearchDataType;
import org.opensearch.sql.opensearch.data.type.OpenSearchDateType;
import org.opensearch.sql.opensearch.data.type.OpenSearchTextType;
import org.opensearch.sql.opensearch.data.utils.OpenSearchJsonContent;
import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory.JsonPath;

class OpenSearchExprValueFactoryTest {

@@ -992,6 +994,53 @@ public void factoryMappingsAreExtendableWithoutOverWrite()
() -> assertEquals(OpenSearchDataType.of(DATE), mapping.get("agg")));
}

@Test
public void testPopulateValueRecursive() {
ExprTupleValue tupleValue = ExprTupleValue.empty();

OpenSearchExprValueFactory.populateValueRecursive(
tupleValue, new JsonPath("log.json.time"), ExprValueUtils.integerValue(100));
ExprValue expectedValue =
ExprValueUtils.tupleValue(
Map.of("log", Map.of("json", new LinkedHashMap<>(Map.of("time", 100)))));
assertEquals(expectedValue, tupleValue);

OpenSearchExprValueFactory.populateValueRecursive(
tupleValue,
new JsonPath("log.json"),
ExprValueUtils.tupleValue(new LinkedHashMap<>(Map.of("status", "SUCCESS"))));
expectedValue =
ExprValueUtils.tupleValue(
Map.of(
"log",
Map.of(
"json",
new LinkedHashMap<>() {
{
put("status", "SUCCESS");
put("time", 100);
}
})));
assertEquals(expectedValue, tupleValue);

// update the conflict value with the latest
OpenSearchExprValueFactory.populateValueRecursive(
tupleValue, new JsonPath("log.json.status"), ExprValueUtils.stringValue("FAILED"));
expectedValue =
ExprValueUtils.tupleValue(
Map.of(
"log",
Map.of(
"json",
new LinkedHashMap<>() {
{
put("status", "FAILED");
put("time", 100);
}
})));
assertEquals(expectedValue, tupleValue);
}

public Map<String, ExprValue> tupleValue(String jsonString) {
final ExprValue construct = exprValueFactory.construct(jsonString, false);
return construct.tupleValue();