Skip to content

Commit

Permalink
Support Eventstats in PPL (#800)
Browse files Browse the repository at this point in the history
* Support Eventstats in PPL

Signed-off-by: Lantao Jin <ltjin@amazon.com>

* add doc

Signed-off-by: Lantao Jin <ltjin@amazon.com>

---------

Signed-off-by: Lantao Jin <ltjin@amazon.com>
Co-authored-by: YANGDB <yang.db.dev@gmail.com>
  • Loading branch information
LantaoJin and YANG-DB authored Oct 25, 2024
1 parent 2a647d4 commit 7bc0927
Show file tree
Hide file tree
Showing 12 changed files with 1,107 additions and 9 deletions.
28 changes: 28 additions & 0 deletions docs/ppl-lang/PPL-Example-Commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,34 @@ source = table | where ispresent(a) |
- `source = table | stats avg(age) as avg_state_age by country, state | stats avg(avg_state_age) as avg_country_age by country`
- `source = table | stats avg(age) as avg_city_age by country, state, city | eval new_avg_city_age = avg_city_age - 1 | stats avg(new_avg_city_age) as avg_state_age by country, state | where avg_state_age > 18 | stats avg(avg_state_age) as avg_adult_country_age by country`

#### **Event Aggregations**
[See additional command details](ppl-eventstats-command.md)

- `source = table | eventstats avg(a) `
- `source = table | where a < 50 | eventstats avg(c) `
- `source = table | eventstats max(c) by b`
- `source = table | eventstats count(c) by b | head 5`
- `source = table | eventstats stddev_samp(c)`
- `source = table | eventstats stddev_pop(c)`
- `source = table | eventstats percentile(c, 90)`
- `source = table | eventstats percentile_approx(c, 99)`

**Limitation: distinct aggregations cannot be used in `eventstats`:**
- `source = table | eventstats distinct_count(c)` (throws an exception)

**Aggregations With Span**
- `source = table | eventstats count(a) by span(a, 10) as a_span`
- `source = table | eventstats sum(age) by span(age, 5) as age_span | head 2`
- `source = table | eventstats avg(age) by span(age, 20) as age_span, country | sort - age_span | head 2`

**Aggregations With TimeWindow Span (tumble windowing function)**
- `source = table | eventstats sum(productsAmount) by span(transactionDate, 1d) as age_date | sort age_date`
- `source = table | eventstats sum(productsAmount) by span(transactionDate, 1w) as age_date, productId`

**Aggregations Group by Multiple Times**
- `source = table | eventstats avg(age) as avg_state_age by country, state | eventstats avg(avg_state_age) as avg_country_age by country`
- `source = table | eventstats avg(age) as avg_city_age by country, state, city | eval new_avg_city_age = avg_city_age - 1 | eventstats avg(new_avg_city_age) as avg_state_age by country, state | where avg_state_age > 18 | eventstats avg(avg_state_age) as avg_adult_country_age by country`

#### **Dedup**

[See additional command details](ppl-dedup-command.md)
Expand Down
2 changes: 2 additions & 0 deletions docs/ppl-lang/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ For additional examples see the next [documentation](PPL-Example-Commands.md).

- [`stats command`](ppl-stats-command.md)

- [`eventstats command`](ppl-eventstats-command.md)

- [`where command`](ppl-where-command.md)

- [`head command`](ppl-head-command.md)
Expand Down
327 changes: 327 additions & 0 deletions docs/ppl-lang/ppl-eventstats-command.md

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ WHERE: 'WHERE';
FIELDS: 'FIELDS';
RENAME: 'RENAME';
STATS: 'STATS';
EVENTSTATS: 'EVENTSTATS';
DEDUP: 'DEDUP';
SORT: 'SORT';
EVAL: 'EVAL';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ renameCommand
;

statsCommand
: STATS (PARTITIONS EQUAL partitions = integerLiteral)? (ALLNUM EQUAL allnum = booleanLiteral)? (DELIM EQUAL delim = stringLiteral)? statsAggTerm (COMMA statsAggTerm)* (statsByClause)? (DEDUP_SPLITVALUES EQUAL dedupsplit = booleanLiteral)?
: (STATS | EVENTSTATS) (PARTITIONS EQUAL partitions = integerLiteral)? (ALLNUM EQUAL allnum = booleanLiteral)? (DELIM EQUAL delim = stringLiteral)? statsAggTerm (COMMA statsAggTerm)* (statsByClause)? (DEDUP_SPLITVALUES EQUAL dedupsplit = booleanLiteral)?
;

dedupCommand
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,4 +318,8 @@ public T visitScalarSubquery(ScalarSubquery node, C context) {
public T visitExistsSubquery(ExistsSubquery node, C context) {
return visitChildren(node, context);
}

/**
 * Visits a {@link Window} plan node.
 *
 * <p>This default implementation does no node-specific work and simply delegates to
 * {@code visitChildren}; visitors that need to handle window nodes override it.
 *
 * @param node the window node being visited
 * @param context the visitor context passed along the traversal
 * @return whatever {@code visitChildren} returns for this node
 */
public T visitWindow(Window node, C context) {
    return visitChildren(node, context);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.tree;

import com.google.common.collect.ImmutableList;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.expression.UnresolvedExpression;

import java.util.List;

@Getter
@ToString
@EqualsAndHashCode(callSuper = false)
@RequiredArgsConstructor
public class Window extends UnresolvedPlan {
private final List<UnresolvedExpression> windowFunctionList;
private final List<UnresolvedExpression> partExprList;
private final List<UnresolvedExpression> sortExprList;
@Setter private UnresolvedExpression span;
private UnresolvedPlan child;

@Override
public UnresolvedPlan attach(UnresolvedPlan child) {
this.child = child;
return this;
}

@Override
public List<UnresolvedPlan> getChild() {
return ImmutableList.of(this.child);
}

@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
return nodeVisitor.visitWindow(this, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,15 @@
import org.opensearch.sql.ast.tree.SubqueryAlias;
import org.opensearch.sql.ast.tree.TopAggregation;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.ast.tree.Window;
import org.opensearch.sql.common.antlr.SyntaxCheckException;
import org.opensearch.sql.ppl.utils.AggregatorTranslator;
import org.opensearch.sql.ppl.utils.BuiltinFunctionTranslator;
import org.opensearch.sql.ppl.utils.ComparatorTransformer;
import org.opensearch.sql.ppl.utils.FieldSummaryTransformer;
import org.opensearch.sql.ppl.utils.ParseStrategy;
import org.opensearch.sql.ppl.utils.SortUtils;
import org.opensearch.sql.ppl.utils.WindowSpecTransformer;
import scala.Option;
import scala.Tuple2;
import scala.collection.IterableLike;
Expand Down Expand Up @@ -117,6 +119,7 @@
import static org.opensearch.sql.ppl.utils.RelationUtils.getTableIdentifier;
import static org.opensearch.sql.ppl.utils.RelationUtils.resolveField;
import static org.opensearch.sql.ppl.utils.WindowSpecTransformer.window;
import static scala.collection.JavaConverters.seqAsJavaList;

/**
* Utility class to traverse PPL logical plan and translate it into catalyst logical plan
Expand Down Expand Up @@ -328,6 +331,30 @@ private static LogicalPlan extractedAggregation(CatalystPlanContext context) {
return context.apply(p -> new Aggregate(groupingExpression, aggregateExpressions, p));
}

/**
 * Translates a PPL {@link Window} node into a Catalyst
 * {@code org.apache.spark.sql.catalyst.plans.logical.Window} plan.
 *
 * <p>The steps are order-sensitive because expressions are collected through the shared
 * {@code context}: the child plan is visited first, then the window functions, then the
 * partition expressions followed by the optional span. After each group,
 * {@code retainAllNamedParseExpressions} is called to obtain that group's expressions
 * (presumably draining what the preceding visits registered; confirm against
 * {@code CatalystPlanContext}).
 *
 * <p>The order specification is always empty here; the node's sort expressions are not
 * consulted by this method.
 *
 * @param node the window node to translate
 * @param context the plan-building context that accumulates expressions and plans
 * @return the result of applying the Catalyst window plan on the context
 */
@Override
public LogicalPlan visitWindow(Window node, CatalystPlanContext context) {
    // The input plan has to be on the context before this node's expressions are visited.
    node.getChild().get(0).accept(this, context);

    // Window functions: visited only for their effect on the context.
    visitExpressionList(node.getWindowFunctionList(), context);
    final Seq<Expression> namedWindowFunctions = context.retainAllNamedParseExpressions(e -> e);

    // Partitioning: the explicit partition expressions, then the span when one is present.
    visitExpressionList(node.getPartExprList(), context);
    final UnresolvedExpression spanExpression = node.getSpan();
    if (spanExpression != null) {
        visitExpression(spanExpression, context);
    }
    final Seq<Expression> partitionSpec = context.retainAllNamedParseExpressions(e -> e);
    final Seq<SortOrder> orderSpec = seq(new ArrayList<SortOrder>());

    // Wrap every collected aggregator into a window expression over the same spec.
    final List<NamedExpression> windowed = new ArrayList<>();
    for (Expression windowFunction : seqAsJavaList(namedWindowFunctions)) {
        windowed.add(
            WindowSpecTransformer.buildAggregateWindowFunction(windowFunction, partitionSpec, orderSpec));
    }
    final Seq<NamedExpression> windowExpressions = seq(windowed);

    return context.apply(input ->
        new org.apache.spark.sql.catalyst.plans.logical.Window(
            windowExpressions,
            partitionSpec,
            orderSpec,
            input));
}

@Override
public LogicalPlan visitAlias(Alias node, CatalystPlanContext context) {
expressionAnalyzer.visitAlias(node, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,14 +270,24 @@ public UnresolvedPlan visitStatsCommand(OpenSearchPPLParser.StatsCommandContext
.map(this::internalVisitExpression)
.orElse(null);

Aggregation aggregation =
new Aggregation(
aggListBuilder.build(),
emptyList(),
groupList,
span,
ArgumentFactory.getArgumentList(ctx));
return aggregation;
if (ctx.STATS() != null) {
Aggregation aggregation =
new Aggregation(
aggListBuilder.build(),
emptyList(),
groupList,
span,
ArgumentFactory.getArgumentList(ctx));
return aggregation;
} else {
Window window =
new Window(
aggListBuilder.build(),
groupList,
emptyList());
window.setSpan(span);
return window;
}
}

/** Dedup command. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.sql.ppl.utils;

import org.apache.spark.sql.catalyst.expressions.Alias;
import org.apache.spark.sql.catalyst.expressions.CurrentRow$;
import org.apache.spark.sql.catalyst.expressions.Divide;
import org.apache.spark.sql.catalyst.expressions.Expression;
Expand All @@ -16,6 +17,7 @@
import org.apache.spark.sql.catalyst.expressions.SortOrder;
import org.apache.spark.sql.catalyst.expressions.SpecifiedWindowFrame;
import org.apache.spark.sql.catalyst.expressions.TimeWindow;
import org.apache.spark.sql.catalyst.expressions.UnboundedFollowing$;
import org.apache.spark.sql.catalyst.expressions.UnboundedPreceding$;
import org.apache.spark.sql.catalyst.expressions.WindowExpression;
import org.apache.spark.sql.catalyst.expressions.WindowSpecDefinition;
Expand Down Expand Up @@ -79,4 +81,21 @@ static NamedExpression buildRowNumber(Seq<Expression> partitionSpec, Seq<SortOrd
Option.empty(),
seq(new ArrayList<String>()));
}

/**
 * Wraps an aliased aggregator into a window expression and re-aliases it under the same name.
 *
 * <p>The window uses the supplied partition and order specifications with a row frame
 * running from unbounded preceding to unbounded following. The returned alias gets a
 * freshly generated expression id.
 *
 * @param aggregator the aggregator expression; it is cast to {@link Alias}, so any other
 *     expression type fails with a {@code ClassCastException}
 * @param partitionSpec expressions the window is partitioned by
 * @param orderSpec ordering applied inside the window
 * @return an alias named like the incoming one whose child is the window expression
 */
static NamedExpression buildAggregateWindowFunction(Expression aggregator, Seq<Expression> partitionSpec, Seq<SortOrder> orderSpec) {
    final Alias namedAggregator = (Alias) aggregator;

    // Row frame: unbounded preceding .. unbounded following.
    final SpecifiedWindowFrame unboundedRows = new SpecifiedWindowFrame(
        RowFrame$.MODULE$, UnboundedPreceding$.MODULE$, UnboundedFollowing$.MODULE$);
    final WindowSpecDefinition windowSpec =
        new WindowSpecDefinition(partitionSpec, orderSpec, unboundedRows);

    // Only the aliased child goes into the window; the name is re-applied below.
    final WindowExpression overWindow = new WindowExpression(namedAggregator.child(), windowSpec);

    return org.apache.spark.sql.catalyst.expressions.Alias$.MODULE$.apply(
        overWindow,
        namedAggregator.name(),
        NamedExpression.newExprId(),
        seq(new ArrayList<String>()),
        Option.empty(),
        seq(new ArrayList<String>()));
}
}
Loading

0 comments on commit 7bc0927

Please sign in to comment.