Skip to content

Conversation

ishaoxy
Copy link
Contributor

@ishaoxy ishaoxy commented Sep 15, 2025

Description

Support Streamstats command with arguments below:

streamstats
  [current=<bool>]
  [window=<int>]
  [global=<bool>]
  [reset_before="("<eval-expression>")"]
  [reset_after="("<eval-expression>")"]
  stats-agg-term
  [by-clause]

Implementation Details

The implementation handles three distinct execution paths, depending on the combination of window, global, group, and reset arguments:

Why This Design

  1. Default path can rely on native SQL OVER because there is no global/window-with-reset complexity.

  2. Specific SQL limitations:

  • Native SQL OVER clauses cannot implement per-group sliding windows over the entire stream . However, we want to combine a global sequence with group-level partitioning. In SQL, a window is either global without a BY clause or partitioned by a group with a BY clause; you cannot have a “global sequence plus per-group sliding frame” in one OVER.

  • ROWS BETWEEN ... PRECEDING cannot take a variable (it only supports constants like 1 PRECEDING, 1+1 PRECEDING).

  1. Global + window + group want "per-group sliding windows over entire stream," but SQL window functions do not allow fully flexible frame boundaries combined with lateral joins. Hence, we simulate it via ROW_NUMBER() + correlated join + aggregate.

  2. Reset path introduces segment semantics (seg_id) that cannot be represented natively in SQL OVER clauses. Each reset creates a new frame partition. By default, reset behaves like a global window, but when grouping exists, it applies per-group aggregation within each reset segment. So I use helper columns (before_flag, after_flag, seg_id) and a correlated join ensures correctness.

1. Default Path (No global in use / no reset)

  • Window functions are translated directly using visitWindowFunction().
  • Calcite OVER clauses are generated for each aggregate.
  • SQL-like plan:
SELECT *, <window_function_over(...)>
FROM source

2. global=true + window > 0 + group exists

To support sliding windows over the entire stream with optional grouping:

  • A global sequence column (ROW_NUMBER() OVER (ORDER BY ...) AS seq) is added.
  • Correlated LEFT JOINs simulate the sliding window using seq and by-clause equality filters.
  • Each window function is converted into a standard aggregate (AggregateFunction) and executed within the correlated subquery.
  • SQL-like plan:
WITH t AS (
  SELECT x.*,
         ROW_NUMBER() OVER (ORDER BY /* default ordering */) AS seq
  FROM source x
)
SELECT t.*, agg.*
FROM t
LEFT JOIN LATERAL (
  SELECT SUM(age) AS sum_age
  FROM t r
  WHERE r.seq BETWEEN t.seq - (:window - 1) AND t.seq
    AND r.gender IS NOT DISTINCT FROM t.gender
) AS agg ON TRUE;

3. Reset Path (reset_before / reset_after defined)

When reset_before or reset_after exist:

  • Helper columns are added:
  1. stream_seq: global row number.
  2. reset_before_flag / reset_after_flag: flags for reset conditions.
  3. seg_id: segment ID, computed via SUM over flags to identify partitions.
  • Correlated LEFT JOIN + aggregate simulates the frame while respecting segment boundaries (seg_id) and optional group filtering.
  • SQL-like plan:
WITH base AS (
  SELECT s.*,
         ROW_NUMBER() OVER (ORDER BY /* default */) AS seq,
         CASE WHEN (/reset_before predicate/) THEN 1 ELSE 0 END AS before_flag,
         CASE WHEN (/reset_after  predicate/) THEN 1 ELSE 0 END AS after_flag
  FROM source s
),
seg AS (
  SELECT b.*,
         COALESCE(SUM(before_flag) OVER (ORDER BY seq ROWS UNBOUNDED PRECEDING), 0)
       + COALESCE(SUM(after_flag)  OVER (ORDER BY seq ROWS UNBOUNDED PRECEDING
                                        RANGE BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0)
       AS seg_id
  FROM base b
)
SELECT t.*, agg.*
FROM seg t
LEFT JOIN LATERAL (
  SELECT /* window aggregates: SUM(age), AVG(salary), ... */
  FROM seg r
  WHERE (
    CASE
      WHEN :window = 0 AND :current  THEN r.seq <= t.seq
      WHEN :window = 0 AND NOT :current THEN r.seq <  t.seq
      WHEN :current  THEN r.seq BETWEEN t.seq - (:window - 1) AND t.seq
      ELSE               r.seq BETWEEN t.seq - :window AND t.seq - 1
    END
  )
  AND r.seg_id = t.seg_id
  AND (r.gender IS NOT DISTINCT FROM t.gender) -- optional by-clause
) AS agg ON TRUE;

Related Issues

Resolves #4207

Check List

  • New functionality includes testing.
  • New functionality has been documented.
  • New functionality has javadoc added.
  • New functionality has a user manual doc added.
  • New PPL command checklist all confirmed.
  • API changes companion pull request created.
  • Commits are signed per the DCO using --signoff or -s.
  • Public documentation issue/PR created.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>
Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>
Copy link
Collaborator

@yuancu yuancu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me. Thanks for your contribution. I left some comments for minor suggestions.

Also, please resolve the conflicts

Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>
Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>
Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>
@LantaoJin LantaoJin added feature PPL Piped processing language labels Sep 18, 2025
Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>
@ishaoxy ishaoxy requested a review from RyanL1997 as a code owner September 19, 2025 09:42
Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>
Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>
Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>
Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>
Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>
Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>
Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>
Signed-off-by: Xinyu Hao <haoxinyu@amazon.com>
Copy link
Member

@LantaoJin LantaoJin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

haven't review the code details, just need to refactor the user doc and confirm the behaviour

Copy link
Member

@LantaoJin LantaoJin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add some testinCalciteExplainIT.java`


- `stats command <cmd/stats.rst>`_

- `streamstats command <cmd/streamstats.rst>`_
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add streamstats.rst to category.json as well

Comment on lines +116 to +118
GLOBAL: 'GLOBAL';
RESET_BEFORE: 'RESET_BEFORE';
RESET_AFTER: 'RESET_AFTER';
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these three seems missing in keywordsCanBeId

import org.opensearch.client.Request;
import org.opensearch.sql.ppl.PPLIntegTestCase;

public class CalciteStreamstatsCommandIT extends PPLIntegTestCase {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add some tests with other command
eventstats + streamstats:

source = t | eventstats ... | streamstats ...

sort + streamstats:

source = t | sort ... | streamstats ...

left join with streamstats:

source = t | left join left=l right=r on l.key=r.key [ source = tt | streamstats ... ] 

streamstats in in-subsearch:

source = t | where a in [ source = tt | streamstats ...]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

feature PPL Piped processing language

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[RFC] Implement streamstats command in PPL

3 participants