Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions docs/user/ppl/cmd/stats.rst
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ Description

Usage: MAX(expr). Returns the maximum value of expr.

For non-numeric fields, values are compared in lexicographic (dictionary) order.

Note: Non-numeric field support requires Calcite to be enabled (see `Configuration`_ section above). Available since version 3.3.0.

Example::

os> source=accounts | stats max(age);
Expand All @@ -192,6 +196,16 @@ Example::
| 36 |
+----------+

Example with text field::

os> source=accounts | stats max(firstname);
fetched rows / total rows = 1/1
+----------------+
| max(firstname) |
|----------------|
| Nanette |
+----------------+

MIN
---

Expand All @@ -200,6 +214,10 @@ Description

Usage: MIN(expr). Returns the minimum value of expr.

For non-numeric fields, values are compared in lexicographic (dictionary) order.

Note: Non-numeric field support requires Calcite to be enabled (see `Configuration`_ section above). Available since version 3.3.0.

Example::

os> source=accounts | stats min(age);
Expand All @@ -210,6 +228,16 @@ Example::
| 28 |
+----------+

Example with text field::

os> source=accounts | stats min(firstname);
fetched rows / total rows = 1/1
+----------------+
| min(firstname) |
|----------------|
| Amber |
+----------------+

VAR_SAMP
--------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,22 @@ public void testPushdownLimitIntoAggregation() throws IOException {
+ " head 100 | head 10 from 10 "));
}

@Test
public void testExplainMaxOnStringField() throws IOException {
String expected = loadExpectedPlan("explain_max_string_field.json");
assertJsonEqualsIgnoreId(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we can use assertYamlEqualsJsonIgnoreId (refer to #4274) to improve Calcite plan readability.

expected,
explainQueryToString("source=opensearch-sql_test_index_account | stats max(firstname)"));
}

@Test
public void testExplainMinOnStringField() throws IOException {
  // The explain output for MIN over a text field must match the recorded expected plan.
  String query = "source=opensearch-sql_test_index_account | stats min(firstname)";
  String expectedPlan = loadExpectedPlan("explain_min_string_field.json");
  assertJsonEqualsIgnoreId(expectedPlan, explainQueryToString(query));
}

@Test
public void testExplainSortOnMetricsNoBucketNullable() throws IOException {
// TODO enhancement later: https://github.com/opensearch-project/sql/issues/4282
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1176,4 +1176,20 @@ public void testMedian() throws IOException {
verifySchema(actual, schema("median(balance)", "bigint"));
verifyDataRows(actual, rows(32838));
}

@Test
public void testStatsMaxOnStringField() throws IOException {
  // MAX over a text field should yield the lexicographically greatest value, typed as string.
  String ppl = String.format("source=%s | stats max(firstname)", TEST_INDEX_BANK);
  JSONObject result = executeQuery(ppl);
  verifySchema(result, schema("max(firstname)", "string"));
  verifyDataRows(result, rows("Virginia"));
}

@Test
public void testStatsMinOnStringField() throws IOException {
  // MIN over a text field should yield the lexicographically smallest value, typed as string.
  String ppl = String.format("source=%s | stats min(firstname)", TEST_INDEX_BANK);
  JSONObject result = executeQuery(ppl);
  verifySchema(result, schema("min(firstname)", "string"));
  verifyDataRows(result, rows("Amber JOHnny"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"calcite": {
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalAggregate(group=[{}], max(firstname)=[MAX($0)])\n LogicalProject(firstname=[$1])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
"physical": "EnumerableLimit(fetch=[10000])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={},max(firstname)=MAX($0))], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"aggregations\":{\"max(firstname)\":{\"top_hits\":{\"from\":0,\"size\":1,\"version\":false,\"seq_no_primary_term\":false,\"explain\":false,\"_source\":{\"includes\":[\"firstname\"],\"excludes\":[]},\"sort\":[{\"firstname.keyword\":{\"order\":\"desc\"}}]}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"calcite": {
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalAggregate(group=[{}], min(firstname)=[MIN($0)])\n LogicalProject(firstname=[$1])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
"physical": "EnumerableLimit(fetch=[10000])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={},min(firstname)=MIN($0))], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"aggregations\":{\"min(firstname)\":{\"top_hits\":{\"from\":0,\"size\":1,\"version\":false,\"seq_no_primary_term\":false,\"explain\":false,\"_source\":{\"includes\":[\"firstname\"],\"excludes\":[]},\"sort\":[{\"firstname.keyword\":{\"order\":\"asc\"}}]}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"calcite": {
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalAggregate(group=[{}], max(firstname)=[MAX($0)])\n LogicalProject(firstname=[$1])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
"physical": "EnumerableLimit(fetch=[10000])\n EnumerableAggregate(group=[{}], max(firstname)=[MAX($1)])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"calcite": {
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalAggregate(group=[{}], min(firstname)=[MIN($0)])\n LogicalProject(firstname=[$1])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
"physical": "EnumerableLimit(fetch=[10000])\n EnumerableAggregate(group=[{}], min(firstname)=[MIN($1)])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,10 @@
import org.opensearch.sql.ast.expression.Argument;
import org.opensearch.sql.ast.expression.SpanUnit;
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.expression.function.BuiltinFunctionName;
import org.opensearch.sql.opensearch.data.type.OpenSearchDataType;
import org.opensearch.sql.opensearch.request.PredicateAnalyzer.NamedFieldExpression;
import org.opensearch.sql.opensearch.response.agg.ArgMaxMinParser;
import org.opensearch.sql.opensearch.response.agg.BucketAggregationParser;
Expand Down Expand Up @@ -298,12 +300,46 @@ private static Pair<AggregationBuilder, MetricParser> createRegularAggregation(
helper.build(
!args.isEmpty() ? args.getFirst() : null, AggregationBuilders.count(aggFieldName)),
new SingleValueParser(aggFieldName));
case MIN -> Pair.of(
helper.build(args.getFirst(), AggregationBuilders.min(aggFieldName)),
new SingleValueParser(aggFieldName));
case MAX -> Pair.of(
helper.build(args.getFirst(), AggregationBuilders.max(aggFieldName)),
new SingleValueParser(aggFieldName));
case MIN -> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The MIN and MAX logic seems to be the same apart from AggregationBuilders.min/max and ASC/DESC. Can we extract it into a shared method for maintainability?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, makes sense — will extract this and switch to assertYamlEqualsJsonIgnoreId in a follow-up PR.

String fieldName = helper.inferNamedField(args.getFirst()).getRootName();
ExprType fieldType = helper.fieldTypes.get(fieldName);

if (supportsMaxMinAggregation(fieldType)) {
yield Pair.of(
helper.build(args.getFirst(), AggregationBuilders.min(aggFieldName)),
new SingleValueParser(aggFieldName));
} else {
yield Pair.of(
AggregationBuilders.topHits(aggFieldName)
.fetchSource(helper.inferNamedField(args.getFirst()).getRootName(), null)
.size(1)
.from(0)
.sort(
helper.inferNamedField(args.getFirst()).getReferenceForTermQuery(),
SortOrder.ASC),
new TopHitsParser(aggFieldName, true));
}
}
case MAX -> {
String fieldName = helper.inferNamedField(args.getFirst()).getRootName();
ExprType fieldType = helper.fieldTypes.get(fieldName);

if (supportsMaxMinAggregation(fieldType)) {
yield Pair.of(
helper.build(args.getFirst(), AggregationBuilders.max(aggFieldName)),
new SingleValueParser(aggFieldName));
} else {
yield Pair.of(
AggregationBuilders.topHits(aggFieldName)
.fetchSource(helper.inferNamedField(args.getFirst()).getRootName(), null)
.size(1)
.from(0)
.sort(
helper.inferNamedField(args.getFirst()).getReferenceForTermQuery(),
SortOrder.DESC),
new TopHitsParser(aggFieldName, true));
}
}
case VAR_SAMP -> Pair.of(
helper.build(args.getFirst(), AggregationBuilders.extendedStats(aggFieldName)),
new StatsParser(ExtendedStats::getVarianceSampling, aggFieldName));
Expand Down Expand Up @@ -383,6 +419,18 @@ yield switch (functionName) {
};
}

/**
 * Returns whether the OpenSearch native {@code min}/{@code max} metric aggregations can be used
 * for the given field type. Native min/max only works on numeric and date/time fields; callers
 * fall back to a {@code top_hits}-based aggregation for other types (e.g. text fields).
 *
 * @param fieldType the resolved expression type of the aggregated field; may be an {@code
 *     OpenSearchDataType} wrapper around a core type
 * @return true if the native min/max aggregation supports this type
 */
private static boolean supportsMaxMinAggregation(ExprType fieldType) {
  // Unwrap the OpenSearch-specific wrapper to reach the underlying core type.
  ExprType coreType =
      fieldType instanceof OpenSearchDataType openSearchType
          ? openSearchType.getExprType()
          : fieldType;

  return ExprCoreType.numberTypes().contains(coreType)
      || coreType == ExprCoreType.DATE
      || coreType == ExprCoreType.TIME
      || coreType == ExprCoreType.TIMESTAMP;
}

private static ValuesSourceAggregationBuilder<?> createBucketAggregation(
Integer group, Project project, AggregateAnalyzer.AggregateBuilderHelper helper) {
return createBucket(group, project, helper);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -756,4 +756,76 @@ public void testMedian() {
"SELECT `percentile_approx`(`SAL`, 50.0, DECIMAL) `median(SAL)`\n" + "FROM `scott`.`EMP`";
verifyPPLToSparkSQL(root, expectedSparkSql);
}

@Test
public void testMaxOnStringField() {
  // MAX on a string column resolves to a plain Calcite MAX aggregate over the projected field.
  RelNode plan = getRelNode("source=EMP | stats max(ENAME) as max_name");

  verifyLogical(
      plan,
      "LogicalAggregate(group=[{}], max_name=[MAX($0)])\n"
          + " LogicalProject(ENAME=[$1])\n"
          + " LogicalTableScan(table=[[scott, EMP]])\n");
  verifyResult(plan, "max_name=WARD\n");
  verifyPPLToSparkSQL(plan, "SELECT MAX(`ENAME`) `max_name`\nFROM `scott`.`EMP`");
}

@Test
public void testMinOnStringField() {
  // MIN on a string column resolves to a plain Calcite MIN aggregate over the projected field.
  RelNode plan = getRelNode("source=EMP | stats min(ENAME) as min_name");

  verifyLogical(
      plan,
      "LogicalAggregate(group=[{}], min_name=[MIN($0)])\n"
          + " LogicalProject(ENAME=[$1])\n"
          + " LogicalTableScan(table=[[scott, EMP]])\n");
  verifyResult(plan, "min_name=ADAMS\n");
  verifyPPLToSparkSQL(plan, "SELECT MIN(`ENAME`) `min_name`\nFROM `scott`.`EMP`");
}

@Test
public void testMaxOnTimeField() {
  // MAX on a date column resolves to a plain Calcite MAX aggregate over the projected field.
  RelNode plan = getRelNode("source=EMP | stats max(HIREDATE) as max_hire_date");

  verifyLogical(
      plan,
      "LogicalAggregate(group=[{}], max_hire_date=[MAX($0)])\n"
          + " LogicalProject(HIREDATE=[$4])\n"
          + " LogicalTableScan(table=[[scott, EMP]])\n");
  verifyResult(plan, "max_hire_date=1987-05-23\n");
  verifyPPLToSparkSQL(plan, "SELECT MAX(`HIREDATE`) `max_hire_date`\nFROM `scott`.`EMP`");
}

@Test
public void testMinOnTimeField() {
  // MIN on a date column resolves to a plain Calcite MIN aggregate over the projected field.
  RelNode plan = getRelNode("source=EMP | stats min(HIREDATE) as min_hire_date");

  verifyLogical(
      plan,
      "LogicalAggregate(group=[{}], min_hire_date=[MIN($0)])\n"
          + " LogicalProject(HIREDATE=[$4])\n"
          + " LogicalTableScan(table=[[scott, EMP]])\n");
  verifyResult(plan, "min_hire_date=1980-12-17\n");
  verifyPPLToSparkSQL(plan, "SELECT MIN(`HIREDATE`) `min_hire_date`\nFROM `scott`.`EMP`");
}
}
Loading