Support parsing documents with flattened value #3577
Changes from all commits
@@ -166,4 +166,17 @@ default List<ExprValue> collectionValue() {
  default ExprValue keyValue(String key) {
    return ExprMissingValue.of();
  }

  /**
   * Merge the value to the base value. By default, it overrides the base value with the current
   * value.
   *
   * <p>This method will be called when a key conflict happens in the process of populating an
   * ExprTupleValue. See {@link OpenSearchExprValueFactory::populateValueRecursive}.
   *
   * @param base the target value to merge into
   * @return the merged value
   */
  default ExprValue mergeTo(ExprValue base) {

Collaborator:
What if ExprIpValue mergeTo ExprIntValue?

Member:
Seems it is

Collaborator (Author):
As said in the doc, it should override the base under the current implementation. Such a case will only happen when there are duplicate values for a field.
For example, after populating the first field there is a key conflict when populating the second field; in this case we need to invoke ExprTupleValue::mergeTo to merge the two ExprTupleValues deeply.
When there is a value conflict for the field, since the order of fields at the same level in JSON is actually not stable, we cannot assert which value will be put first. Throwing an exception to tell the customer there is an incompatible value conflict would also be an option. What's your opinion on this edge case? @LantaoJin @penghuo

Member:
Do you mean throw an exception during PPL querying or data ingesting? Data ingesting seems out of scope here.

Member:
IMO, if there are conflicting fields, displaying any of them would be fine as long as the field that gets populated is deterministic. Either PPL result would be acceptable. This could be treated as a data quality issue.

Member:
Could you add a test for the key conflict case?

Collaborator (Author):
The problem is that the value conflict case will be flaky because field order is not guaranteed in JSON. We cannot assert which value will end up being kept.

Collaborator (Author):
Added in the unit tests.

Collaborator:
Do not throw an exception; the current approach looks good. OpenSearch also returns the LATEST field.
    return this;
  }
}
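Note: the default mergeTo above simply returns this, so for scalar conflicts the latest parsed value wins. The deep merge for nested structs discussed in the thread is expected to live in ExprTupleValue's override, which is not part of this hunk. The snippet below is only a self-contained sketch of that idea using simplified stand-in types; MergeToSketch, Value, Scalar and Tuple are illustrative names, not classes from the plugin.

import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative stand-in types only; not part of the OpenSearch SQL codebase. */
public class MergeToSketch {

  /** Simplified stand-in for ExprValue: either a scalar or a nested "tuple" of fields. */
  interface Value {
    /** Default behaviour mirrors the hunk above: the current value overrides the base. */
    default Value mergeTo(Value base) {
      return this;
    }
  }

  record Scalar(Object raw) implements Value {}

  record Tuple(Map<String, Value> fields) implements Value {
    @Override
    public Value mergeTo(Value base) {
      if (base instanceof Tuple baseTuple) {
        // Deep merge: recurse field by field, keeping fields that exist only in the base.
        fields.forEach(
            (key, value) ->
                baseTuple.fields().merge(key, value, (oldVal, newVal) -> newVal.mergeTo(oldVal)));
        return baseTuple;
      }
      // Struct vs. scalar conflict: fall back to "current value wins".
      return this;
    }
  }

  public static void main(String[] args) {
    Map<String, Value> base = new LinkedHashMap<>();
    base.put("status", new Scalar("SUCCESS"));
    Map<String, Value> update = new LinkedHashMap<>();
    update.put("time", new Scalar(100));

    // Merging {"time": 100} into {"status": "SUCCESS"} yields a tuple with both fields.
    Value merged = new Tuple(update).mergeTo(new Tuple(base));
    System.out.println(merged);
  }
}

With the default mergeTo, the later of two conflicting scalars replaces the earlier one, matching the "latest field wins" behaviour mentioned in the review.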
@@ -0,0 +1,17 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.sql.calcite.remote;

import org.opensearch.sql.ppl.FlattenDocValueIT;

public class CalciteFlattenDocValueIT extends FlattenDocValueIT {
  @Override
  public void init() throws Exception {
    super.init();
    enableCalcite();
    disallowCalciteFallback();
  }
}
@@ -0,0 +1,58 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.sql.ppl;

import static org.opensearch.sql.legacy.SQLIntegTestCase.Index.FLATTENED_VALUE;
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_FLATTENED_VALUE;
import static org.opensearch.sql.util.MatcherUtils.rows;
import static org.opensearch.sql.util.MatcherUtils.schema;
import static org.opensearch.sql.util.MatcherUtils.verifyDataRows;
import static org.opensearch.sql.util.MatcherUtils.verifySchema;

import java.io.IOException;
import org.hamcrest.TypeSafeMatcher;
import org.json.JSONArray;
import org.json.JSONObject;
import org.junit.jupiter.api.Test;

public class FlattenDocValueIT extends PPLIntegTestCase {
  @Override
  public void init() throws Exception {
    super.init();
    loadIndex(FLATTENED_VALUE);
  }

  @Test
  public void testFlattenDocValue() throws IOException {
    JSONObject result = executeQuery(String.format("source=%s", TEST_INDEX_FLATTENED_VALUE));
    verifySchema(result, schema("log", "struct"));
    TypeSafeMatcher<JSONArray> expectedRow =
        rows(new JSONObject("{ \"json\" : { \"status\": \"SUCCESS\", \"time\": 100} }"));
    verifyDataRows(result, expectedRow, expectedRow, expectedRow, expectedRow, expectedRow);
  }

  @Test
  public void testFlattenDocValueWithFields() throws IOException {
    JSONObject result =
        executeQuery(
            String.format(
                "source=%s | fields log, log.json, log.json.status, log.json.time",
                TEST_INDEX_FLATTENED_VALUE));
    verifySchema(
        result,
        schema("log", "struct"),
        schema("log.json", "struct"),
        schema("log.json.status", "string"),
        schema("log.json.time", "bigint"));
    TypeSafeMatcher<JSONArray> expectedRow =
        rows(
            new JSONObject("{ \"json\" : { \"status\": \"SUCCESS\", \"time\": 100} }"),
            new JSONObject("{ \"status\": \"SUCCESS\", \"time\": 100}"),
            "SUCCESS",
            100);
    verifyDataRows(result, expectedRow, expectedRow, expectedRow, expectedRow, expectedRow);
  }
}
@@ -0,0 +1,10 @@
{"index": {"_id":"1"}}
{"log": { "json" : { "status": "SUCCESS", "time": 100} } }
{"index": {"_id":"2"}}
{"log.json": { "status": "SUCCESS", "time": 100} }
{"index": {"_id":"3"}}
{"log.json.status": "SUCCESS", "log.json.time": 100 }
{"index": {"_id":"4"}}
{"log.json": { "status": "SUCCESS" }, "log.json.time": 100 }
{"index": {"_id":"5"}}
{"log": { "json" : {} }, "log.json": { "status": "SUCCESS" }, "log.json.time": 100 }
@@ -0,0 +1,56 @@
setup:
  - do:
      query.settings:
        body:
          transient:
            plugins.calcite.enabled : true
            plugins.calcite.fallback.allowed : false

---
teardown:
  - do:
      query.settings:
        body:
          transient:
            plugins.calcite.enabled : false
            plugins.calcite.fallback.allowed : true

---
"Handle flattened document value":
  - skip:
      features:
        - headers
  - do:
      bulk:
        index: test
        refresh: true
        body:
          - '{"index": {}}'
          - '{"log": { "json" : { "status": "SUCCESS", "time": 100} } }'
          - '{"index": {}}'
          - '{"log.json": { "status": "SUCCESS", "time": 100} }'
          - '{"index": {}}'
          - '{"log.json.status": "SUCCESS", "log.json.time": 100 }'
          - '{"index": {}}'
          - '{"log.json": { "status": "SUCCESS" }, "log.json.time": 100 }'
          - '{"index": {}}'
          - '{"log": { "json" : {} }, "log.json": { "status": "SUCCESS" }, "log.json.time": 100 }'
  - do:
      headers:
        Content-Type: 'application/json'
      ppl:
        body:
          query: 'source=test'
  - match: {"total": 5}
  - match: {"schema": [{"name": "log", "type": "struct"}]}
  - match: {"datarows": [[{ "json" : { "status": "SUCCESS", "time": 100} }], [{ "json" : { "status": "SUCCESS", "time": 100} }], [{ "json" : { "status": "SUCCESS", "time": 100} }], [{ "json" : { "status": "SUCCESS", "time": 100} }], [{ "json" : { "status": "SUCCESS", "time": 100} }]]}

  - do:
      headers:
        Content-Type: 'application/json'
      ppl:
        body:
          query: 'source=test | fields log, log.json, log.json.status, log.json.time'
  - match: {"total": 5}
  - match: {"schema": [{"name": "log", "type": "struct"}, {"name": "log.json", "type": "struct"}, {"name": "log.json.status", "type": "string"}, {"name": "log.json.time", "type": "bigint"}]}
  - match: {"datarows": [[{ "json" : { "status": "SUCCESS", "time": 100} }, { "status": "SUCCESS", "time": 100}, "SUCCESS", 100], [{ "json" : { "status": "SUCCESS", "time": 100} }, { "status": "SUCCESS", "time": 100}, "SUCCESS", 100], [{ "json" : { "status": "SUCCESS", "time": 100} }, { "status": "SUCCESS", "time": 100}, "SUCCESS", 100], [{ "json" : { "status": "SUCCESS", "time": 100} }, { "status": "SUCCESS", "time": 100}, "SUCCESS", 100], [{ "json" : { "status": "SUCCESS", "time": 100} }, { "status": "SUCCESS", "time": 100}, "SUCCESS", 100]]}
@@ -32,7 +32,6 @@
 import java.time.format.DateTimeParseException;
 import java.time.temporal.TemporalAccessor;
 import java.util.ArrayList;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;

@@ -317,19 +316,63 @@ private static ExprValue createOpenSearchDateType(Content value, ExprType type)
    * @return Value parsed from content.
    */
   private ExprValue parseStruct(Content content, String prefix, boolean supportArrays) {
-    LinkedHashMap<String, ExprValue> result = new LinkedHashMap<>();
+    ExprTupleValue result = ExprTupleValue.empty();
     content
         .map()
         .forEachRemaining(
             entry ->
-                result.put(
-                    entry.getKey(),
+                populateValueRecursive(
+                    result,
+                    new JsonPath(entry.getKey()),
                     parse(
                         entry.getValue(),
                         makeField(prefix, entry.getKey()),
                         type(makeField(prefix, entry.getKey())),
                         supportArrays)));
-    return new ExprTupleValue(result);
+    return result;
   }

  /**
   * Populate the current ExprTupleValue recursively.
   *
   * <p>If the JsonPath is not a root path (i.e. it has a dot in its raw path), it needs to update
   * its children recursively until the leaf node.
   *
   * <p>If there is an existing value for the JsonPath, we need to merge the new value into the old
   * one.
   */
  static void populateValueRecursive(ExprTupleValue result, JsonPath path, ExprValue value) {
    if (path.getPaths().size() == 1) {
      // Update the current ExprValue by using mergeTo if it exists
      result
          .tupleValue()
          .computeIfPresent(path.getRootPath(), (key, oldValue) -> value.mergeTo(oldValue));

Collaborator:
What case is covered by mergeTo? The same field with a different value?

Collaborator (Author):
See the comment [here](#3577 (comment))
      result.tupleValue().putIfAbsent(path.getRootPath(), value);
    } else {
      result.tupleValue().putIfAbsent(path.getRootPath(), ExprTupleValue.empty());
      populateValueRecursive(
          (ExprTupleValue) result.tupleValue().get(path.getRootPath()), path.getChildPath(), value);
    }
  }

  @Getter
  static class JsonPath {
    private final List<String> paths;

    public JsonPath(String rawPath) {
      this.paths = List.of(rawPath.split("\\."));
    }

    public JsonPath(List<String> paths) {
      this.paths = paths;
    }

    public String getRootPath() {
      return paths.getFirst();
    }

    public JsonPath getChildPath() {
      return new JsonPath(paths.subList(1, paths.size()));
    }
  }

  /**

Reviewer:
add UT

Author:
Done
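
As a reading aid for populateValueRecursive and JsonPath above, here is a small self-contained sketch of the same recursion over dotted keys using plain java.util maps. The class and method names (FlattenedKeySketch, populate, path) are illustrative only; the real implementation additionally calls mergeTo at the leaf so that two structs are merged deeply rather than replaced.

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only; mirrors populateValueRecursive/JsonPath with plain maps. */
public class FlattenedKeySketch {

  /** Splits "log.json.time" into ["log", "json", "time"], like the JsonPath helper. */
  static List<String> path(String rawPath) {
    return Arrays.asList(rawPath.split("\\."));
  }

  @SuppressWarnings("unchecked")
  static void populate(Map<String, Object> result, List<String> path, Object value) {
    String root = path.get(0);
    if (path.size() == 1) {
      // Leaf: keep the latest value. The real code calls value.mergeTo(oldValue) here,
      // so nested structs are merged deeply instead of being replaced.
      result.put(root, value);
    } else {
      // Intermediate segment: create the nested map if absent, then recurse into the child path.
      Map<String, Object> child =
          (Map<String, Object>) result.computeIfAbsent(root, k -> new LinkedHashMap<>());
      populate(child, path.subList(1, path.size()), value);
    }
  }

  public static void main(String[] args) {
    // Document 4 from the test data: {"log.json": {"status": "SUCCESS"}, "log.json.time": 100}
    Map<String, Object> doc = new LinkedHashMap<>();
    populate(doc, path("log.json"), new LinkedHashMap<>(Map.of("status", "SUCCESS")));
    populate(doc, path("log.json.time"), 100);
    // Prints {log={json={status=SUCCESS, time=100}}}, the same shape as document 1 in the test data.
    System.out.println(doc);
  }
}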