Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/user/ppl/cmd/stats.rst
Original file line number Diff line number Diff line change
Expand Up @@ -505,3 +505,7 @@ PPL query::
| 36 | 30 | M |
+-----+----------+--------+


Limitation
==========
From 3.1.0, the ``stats`` command can perform a `nested aggregation <https://docs.opensearch.org/docs/latest/aggregations/bucket/nested/>`_ only when ``plugins.calcite.enabled`` is true.
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@

package org.opensearch.sql.calcite.remote;

import static org.opensearch.sql.util.MatcherUtils.assertJsonEqualsIgnoreId;

import java.io.IOException;
import org.junit.Ignore;
import org.junit.jupiter.api.Test;
import org.opensearch.sql.ppl.ExplainIT;

public class CalciteExplainIT extends ExplainIT {
Expand Down Expand Up @@ -43,6 +46,18 @@ public void testTrendlineWithSortPushDownExplain() throws Exception {
"https://github.com/opensearch-project/sql/issues/3466");
}

/**
 * Compares the explain output of a {@code stats} query over the nested field {@code address.area}
 * (grouped by {@code name}) with the plan stored in {@code explain_nested_agg_push.json}, using
 * {@code assertJsonEqualsIgnoreId}.
 */
@Test
public void testNestedAggPushDownExplain() throws Exception {
  final String ppl =
      "source=opensearch-sql_test_index_nested_simple| stats count(address.area) as"
          + " count_area, min(address.area) as min_area, max(address.area) as max_area,"
          + " avg(address.area) as avg_area, avg(age) as avg_age by name";

  // Load the expected plan first, then run the explain request, as the assertion needs both.
  final String expectedPlan = loadFromFile("expectedOutput/calcite/explain_nested_agg_push.json");
  assertJsonEqualsIgnoreId(expectedPlan, explainQueryToString(ppl));
}

// Overrides the inherited test with an empty body and marks it ignored; per the annotation
// message, the inherited case is meant to be exercised only in v2.
@Override
@Ignore("test only in v2")
public void testExplainModeUnsupportedInV2() throws IOException {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,16 @@

package org.opensearch.sql.calcite.remote;

import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_NESTED_SIMPLE;
import static org.opensearch.sql.util.MatcherUtils.rows;
import static org.opensearch.sql.util.MatcherUtils.schema;
import static org.opensearch.sql.util.MatcherUtils.verifyDataRows;
import static org.opensearch.sql.util.MatcherUtils.verifySchemaInOrder;

import java.io.IOException;
import org.json.JSONObject;
import org.junit.Ignore;
import org.junit.jupiter.api.Test;
import org.opensearch.sql.ppl.StatsCommandIT;

public class CalciteStatsCommandIT extends StatsCommandIT {
Expand All @@ -13,5 +23,63 @@ public void init() throws Exception {
super.init();
enableCalcite();
disallowCalciteFallback();

loadIndex(Index.NESTED_SIMPLE);
}

/**
 * Aggregates the nested field {@code address.area} (count/min/max/avg) together with {@code
 * avg(age)} over the whole index, then checks the column schema in order and the single result
 * row.
 */
@Test
public void testNestedAggregation() throws IOException {
  final String ppl =
      String.format(
          "source=%s | stats count(address.area) as count_area, min(address.area) as"
              + " min_area, max(address.area) as max_area, avg(address.area) as avg_area,"
              + " avg(age) as avg_age",
          TEST_INDEX_NESTED_SIMPLE);
  final JSONObject response = executeQuery(ppl);

  // The expected type of the count column depends on whether Calcite is enabled.
  final String countType = isCalciteEnabled() ? "bigint" : "int";
  verifySchemaInOrder(
      response,
      schema("count_area", countType),
      schema("min_area", "double"),
      schema("max_area", "double"),
      schema("avg_area", "double"),
      schema("avg_age", "double"));

  verifyDataRows(response, rows(9, 9.99, 1000.99, 300.11555555555555, 25.2));
}

/**
 * Same aggregations as the ungrouped case, but grouped by {@code name}; checks the column schema
 * in order and one expected row per name.
 */
@Test
public void testNestedAggregationBy() throws IOException {
  final String ppl =
      String.format(
          "source=%s | stats count(address.area) as count_area, min(address.area) as"
              + " min_area, max(address.area) as max_area, avg(address.area) as avg_area,"
              + " avg(age) as avg_age by name",
          TEST_INDEX_NESTED_SIMPLE);
  final JSONObject response = executeQuery(ppl);

  // The expected type of the count column depends on whether Calcite is enabled.
  final String countType = isCalciteEnabled() ? "bigint" : "int";
  verifySchemaInOrder(
      response,
      schema("count_area", countType),
      schema("min_area", "double"),
      schema("max_area", "double"),
      schema("avg_area", "double"),
      schema("avg_age", "double"),
      schema("name", "string"));

  // "andy" has no address.area values, hence a zero count and null min/max/avg.
  verifyDataRows(
      response,
      rows(4, 10.24, 400.99, 209.69, 24, "abbas"),
      rows(0, null, null, null, 19, "andy"),
      rows(2, 9.99, 1000.99, 505.49, 32, "chen"),
      rows(1, 190.5, 190.5, 190.5, 25, "david"),
      rows(2, 231.01, 429.79, 330.4, 26, "peng"));
}

// Placeholder for nested aggregation grouped by span(age, 10); ignored pending the linked issue.
// The query is executed but no assertions are made on the response yet.
@Ignore("https://github.com/opensearch-project/sql/issues/3384")
public void testNestedAggregationBySpan() throws IOException {
  final String ppl =
      String.format(
          "source=%s | stats count(address.area) as count_area, min(address.area) as"
              + " min_area, max(address.area) as max_area, avg(address.area) as avg_area,"
              + " avg(age) as avg_age by span(age, 10)",
          TEST_INDEX_NESTED_SIMPLE);
  JSONObject response = executeQuery(ppl);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class ExplainIT extends PPLIntegTestCase {
// Test setup: runs the parent class setup first, then loads the two indices this class needs.
public void init() throws Exception {
super.init();
loadIndex(Index.ACCOUNT);
// Nested-document index; presumably the one queried by the nested aggregation explain test.
loadIndex(Index.NESTED_SIMPLE);
}

@Test
Expand Down Expand Up @@ -247,7 +248,7 @@ public void testTrendlineWithSortPushDownExplain() throws Exception {
+ "| fields ageTrend"));
}

String loadFromFile(String filename) throws Exception {
/**
 * Reads the resource located by {@code Resources.getResource(filename)} and returns its whole
 * content as a string.
 *
 * @param filename resource name to look up
 * @return the resource content, decoded as UTF-8
 * @throws Exception if the resource URI cannot be built or the file cannot be read
 */
protected String loadFromFile(String filename) throws Exception {
  URI uri = Resources.getResource(filename).toURI();
  // Decode explicitly as UTF-8 so the result does not depend on the JVM's default charset.
  return new String(Files.readAllBytes(Paths.get(uri)), java.nio.charset.StandardCharsets.UTF_8);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"calcite": {
"logical": "LogicalProject(count_area=[$1], min_area=[$2], max_area=[$3], avg_area=[$4], avg_age=[$5], name=[$0])\n LogicalAggregate(group=[{0}], count_area=[COUNT($1)], min_area=[MIN($1)], max_area=[MAX($1)], avg_area=[AVG($1)], avg_age=[AVG($2)])\n LogicalProject(name=[$0], address.area=[$2], age=[$8])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_nested_simple]])\n",
"physical": "EnumerableCalc(expr#0..5=[{inputs}], count_area=[$t1], min_area=[$t2], max_area=[$t3], avg_area=[$t4], avg_age=[$t5], name=[$t0])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_nested_simple]], PushDownContext=[[PROJECT->[name, address.area, age], AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},count_area=COUNT($1),min_area=MIN($1),max_area=MAX($1),avg_area=AVG($1),avg_age=AVG($2))], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"name\",\"address.area\",\"age\"],\"excludes\":[]},\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"name\":{\"terms\":{\"field\":\"name.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"nested_count_area\":{\"nested\":{\"path\":\"address\"},\"aggregations\":{\"count_area\":{\"value_count\":{\"field\":\"address.area\"}}}},\"nested_min_area\":{\"nested\":{\"path\":\"address\"},\"aggregations\":{\"min_area\":{\"min\":{\"field\":\"address.area\"}}}},\"nested_max_area\":{\"nested\":{\"path\":\"address\"},\"aggregations\":{\"max_area\":{\"max\":{\"field\":\"address.area\"}}}},\"nested_avg_area\":{\"nested\":{\"path\":\"address\"},\"aggregations\":{\"avg_area\":{\"avg\":{\"field\":\"address.area\"}}}},\"avg_age\":{\"avg\":{\"field\":\"age\"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
"format": "basic_date_time"
}
}
},
"area" : {
"type": "double"
}
}
},
Expand Down
8 changes: 4 additions & 4 deletions integ-test/src/test/resources/nested_simple.json
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
{"index":{"_id":"1"}}
{"name":"abbas","age":24,"address":[{"city":"New york city","state":"NY","moveInDate":{"dateAndTime":"19840412T090742.000Z"}},{"city":"bellevue","state":"WA","moveInDate":[{"dateAndTime":"20230503T080742.000Z"},{"dateAndTime":"20011111T040744.000Z"}]},{"city":"seattle","state":"WA","moveInDate":{"dateAndTime":"19660319T030455.000Z"}},{"city":"chicago","state":"IL","moveInDate":{"dateAndTime":"20110601T010142.000Z"}}]}
{"name":"abbas","age":24,"address":[{"city":"New york city","state":"NY","moveInDate":{"dateAndTime":"19840412T090742.000Z"},"area":300.13},{"city":"bellevue","state":"WA","moveInDate":[{"dateAndTime":"20230503T080742.000Z"},{"dateAndTime":"20011111T040744.000Z"}],"area":400.99},{"city":"seattle","state":"WA","moveInDate":{"dateAndTime":"19660319T030455.000Z"},"area":127.4},{"city":"chicago","state":"IL","moveInDate":{"dateAndTime":"20110601T010142.000Z"},"area":10.24}]}
{"index":{"_id":"2"}}
{"name":"chen","age":32,"address":[{"city":"Miami","state":"Florida","moveInDate":{"dateAndTime":"19010811T040333.000Z"}},{"city":"los angeles","state":"CA","moveInDate":{"dateAndTime":"20230503T080742.000Z"}}]}
{"name":"chen","age":32,"address":[{"city":"Miami","state":"Florida","moveInDate":{"dateAndTime":"19010811T040333.000Z"},"area":1000.99},{"city":"los angeles","state":"CA","moveInDate":{"dateAndTime":"20230503T080742.000Z"},"area":9.99}]}
{"index":{"_id":"3"}}
{"name":"peng","age":26,"address":[{"city":"san diego","state":"CA","moveInDate":{"dateAndTime":"20011111T040744.000Z"}},{"city":"austin","state":"TX","moveInDate":{"dateAndTime":"19770713T090441.000Z"}}]}
{"name":"peng","age":26,"address":[{"city":"san diego","state":"CA","moveInDate":{"dateAndTime":"20011111T040744.000Z"},"area":231.01},{"city":"austin","state":"TX","moveInDate":{"dateAndTime":"19770713T090441.000Z"},"area":429.79}]}
{"index":{"_id":"4"}}
{"name":"andy","age":19,"id":4,"address":[{"city":"houston","state":"TX","moveInDate":{"dateAndTime":"19331212T050545.000Z"}}]}
{"index":{"_id":"5"}}
{"name":"david","age":25,"address":[{"city":"raleigh","state":"NC","moveInDate":{"dateAndTime":"19090617T010421.000Z"}},{"city":"charlotte","state":"SC","moveInDate":[{"dateAndTime":"20011111T040744.000Z"}]}]}
{"name":"david","age":25,"address":[{"city":"raleigh","state":"NC","moveInDate":{"dateAndTime":"19090617T010421.000Z"},"area":190.5},{"city":"charlotte","state":"SC","moveInDate":[{"dateAndTime":"20011111T040744.000Z"}]}]}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
package org.opensearch.sql.opensearch.request;

import static java.util.Objects.requireNonNull;
import static org.opensearch.sql.data.type.ExprCoreType.ARRAY;
import static org.opensearch.sql.data.type.ExprCoreType.DATE;
import static org.opensearch.sql.data.type.ExprCoreType.TIME;
import static org.opensearch.sql.data.type.ExprCoreType.TIMESTAMP;
Expand All @@ -41,6 +42,7 @@
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.sql.SqlKind;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.search.aggregations.AggregationBuilder;
import org.opensearch.search.aggregations.AggregationBuilders;
Expand Down Expand Up @@ -118,7 +120,11 @@ public static Pair<List<AggregationBuilder>, OpenSearchAggregationResponseParser
// Process all aggregate calls
Pair<Builder, List<MetricParser>> builderAndParser =
processAggregateCalls(
groupList.size(), aggregate.getAggCallList(), fieldExpressionCreator, outputFields);
groupList.size(),
aggregate.getAggCallList(),
fieldExpressionCreator,
outputFields,
fieldTypes);
Builder metricBuilder = builderAndParser.getLeft();
List<MetricParser> metricParserList = builderAndParser.getRight();

Expand Down Expand Up @@ -146,7 +152,8 @@ private static Pair<Builder, List<MetricParser>> processAggregateCalls(
int groupOffset,
List<AggregateCall> aggCalls,
FieldExpressionCreator fieldExpressionCreator,
List<String> outputFields) {
List<String> outputFields,
Map<String, ExprType> fieldTypes) {
assert aggCalls.size() + groupOffset == outputFields.size()
: "groups size and agg calls size should match with output fields";
Builder metricBuilder = new AggregatorFactories.Builder();
Expand All @@ -164,7 +171,17 @@ private static Pair<Builder, List<MetricParser>> processAggregateCalls(

Pair<ValuesSourceAggregationBuilder<?>, MetricParser> builderAndParser =
createAggregationBuilderAndParser(aggCall, argStr, aggField);
metricBuilder.addAggregator(builderAndParser.getLeft());
// Nested aggregation (https://docs.opensearch.org/docs/latest/aggregations/bucket/nested/)
// works as expected only when pushdown is triggered. Aggregating a nested field without
// pushdown could produce incorrect results. TODO fix it later.
String rootStr = StringUtils.substringBefore(argStr, ".");
if (fieldTypes.get(rootStr) != null && fieldTypes.get(rootStr) == ARRAY) {
metricBuilder.addAggregator(
AggregationBuilders.nested(String.format("nested_%s", aggCall.getName()), rootStr)
.subAggregation(builderAndParser.getLeft()));
} else {
metricBuilder.addAggregator(builderAndParser.getLeft());
}
metricParserList.add(builderAndParser.getRight());
}
return Pair.of(metricBuilder, metricParserList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import lombok.RequiredArgsConstructor;
import org.opensearch.search.aggregations.Aggregation;
import org.opensearch.search.aggregations.Aggregations;
import org.opensearch.search.aggregations.bucket.nested.Nested;
import org.opensearch.sql.common.utils.StringUtils;

/** Parse multiple metrics in one bucket. */
Expand All @@ -46,6 +47,9 @@ public MetricParserHelper(List<MetricParser> metricParserList) {
public Map<String, Object> parse(Aggregations aggregations) {
Map<String, Object> resultMap = new HashMap<>();
for (Aggregation aggregation : aggregations) {
if (aggregation instanceof Nested) {
aggregation = ((Nested) aggregation).getAggregations().asList().getFirst();
}
if (metricParserMap.containsKey(aggregation.getName())) {
resultMap.putAll(metricParserMap.get(aggregation.getName()).parse(aggregation));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.opensearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
import org.opensearch.search.aggregations.bucket.histogram.ParsedDateHistogram;
import org.opensearch.search.aggregations.bucket.histogram.ParsedHistogram;
import org.opensearch.search.aggregations.bucket.nested.NestedAggregationBuilder;
import org.opensearch.search.aggregations.bucket.nested.ParsedNested;
import org.opensearch.search.aggregations.bucket.terms.DoubleTerms;
import org.opensearch.search.aggregations.bucket.terms.LongTerms;
import org.opensearch.search.aggregations.bucket.terms.ParsedDoubleTerms;
Expand Down Expand Up @@ -87,6 +89,8 @@ public class AggregationResponseUtils {
.put(
TopHitsAggregationBuilder.NAME,
(p, c) -> ParsedTopHits.fromXContent(p, (String) c))
.put(
NestedAggregationBuilder.NAME, (p, c) -> ParsedNested.fromXContent(p, (String) c))
.build()
.entrySet()
.stream()
Expand Down
Loading