diff --git a/be/src/exprs/agg/aggregate.h b/be/src/exprs/agg/aggregate.h index c7c9e7cb913fb..7fa732a6469a8 100644 --- a/be/src/exprs/agg/aggregate.h +++ b/be/src/exprs/agg/aggregate.h @@ -195,7 +195,10 @@ class AggregateFunction { int64_t partition_start, int64_t partition_end, int64_t rows_start_offset, int64_t rows_end_offset, bool ignore_subtraction, bool ignore_addition, - [[maybe_unused]] bool has_null) const {} + [[maybe_unused]] bool has_null) const { + // Must never be called on the base class: it has no removable-cumulative implementation, so the DCHECK below flags any such call. + DCHECK(0); + } // Contains a loop with calls to "merge" function. // You can collect arguments into array "states" diff --git a/be/src/exprs/agg/maxmin.h b/be/src/exprs/agg/maxmin.h index 7cf5c749176df..c21ede7a08e4c 100644 --- a/be/src/exprs/agg/maxmin.h +++ b/be/src/exprs/agg/maxmin.h @@ -268,6 +268,44 @@ class MaxMinAggregateFunction, StringLTGuard(*columns[0]); + /* Slides the ROWS frame forward by one row. previous_frame_first_position is the first row of the frame that belonged to the previous row (it is no longer covered now); current_frame_last_position is the last row of the frame of the current row. */ + const int64_t previous_frame_first_position = current_row_position - 1 + rows_start_offset; + int64_t current_frame_last_position = current_row_position + rows_end_offset; + if (!ignore_subtraction && previous_frame_first_position >= partition_start && + previous_frame_first_position < partition_end) { + /* The row leaving the frame compares equal to the value held in the state, so the state is reset and rebuilt from the rows of the current frame, with the frame end clamped to the last row of the partition. */ if (OP::equals(this->data(state), column.get_data()[previous_frame_first_position])) { + current_frame_last_position = std::min(current_frame_last_position, partition_end - 1); + this->data(state).reset(); + int64_t frame_start = previous_frame_first_position + 1; + int64_t frame_end = current_frame_last_position + 1; + /* With has_null set, only rows whose byte in columns[1] is 0 are fed to update (presumably 0 marks a non-null row; confirm against the null column convention); otherwise the half-open range [frame_start, frame_end) is handed to the batch update in one call. */ if (has_null) { + const auto null_column = down_cast(columns[1]); + const uint8_t* f_data = null_column->raw_data(); + for (size_t i = frame_start; i < frame_end; ++i) { + if (f_data[i] == 0) { + update(ctx, columns, state, i); + } + } + } else { + update_batch_single_state_with_frame(ctx, state, columns, partition_start, partition_end, + frame_start, frame_end); + } + return; + } + } + /* Reached when no rebuild happened above: the state is still usable, so only the last row of the current frame is folded in, provided addition is not ignored and that row lies inside the partition. */ + if (!ignore_addition && current_frame_last_position >= partition_start && + current_frame_last_position < 
partition_end) { + update(ctx, columns, state, current_frame_last_position); + } + } + void merge(FunctionContext* ctx, const Column* column, AggDataPtr __restrict state, size_t row_num) const override { DCHECK(column->is_binary()); Slice value = column->get(row_num).get_slice(); diff --git a/test/sql/test_window_function/R/test_removable_cumulative_process b/test/sql/test_window_function/R/test_removable_cumulative_process index 7ed641efa38c0..c53501cc8350f 100644 --- a/test/sql/test_window_function/R/test_removable_cumulative_process +++ b/test/sql/test_window_function/R/test_removable_cumulative_process @@ -829,4 +829,55 @@ SELECT v3, AVG(v3) OVER(ORDER BY v3 ROWS BETWEEN CURRENT ROW AND 2 FOLLOWING) AS 22 23.0 23 23.5 24 24.0 +-- !result + + + + + + + +-- name: test_removable_cumulative_process_string_type +CREATE TABLE IF NOT EXISTS `t2` ( + `v1` int(11) NULL, + `v2` STRING NULL +) +DUPLICATE KEY(`v1`) +DISTRIBUTED BY HASH(`v1`) BUCKETS 10 +PROPERTIES ( + "replication_num" = "1" +); +-- result: +-- !result +INSERT INTO t2 VALUES +(1, '1'), +(2, NULL), +(3, '2'), +(4, '2'), +(5, '2'), +(6, '3'), +(7, '3'), +(8, '200'), +(9, '40'), +(10, '50'), +(11, '60'), +(12, '10'), +(13, '20'); +-- result: +-- !result +SELECT v1, v2, MIN(v2) OVER (ORDER BY v1 DESC ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS _min FROM t2 ORDER BY v1 desc; +-- result: +13 20 20 +12 10 10 +11 60 10 +10 50 10 +9 40 40 +8 200 200 +7 3 200 +6 3 200 +5 2 2 +4 2 2 +3 2 2 +2 None 2 +1 1 1 -- !result \ No newline at end of file diff --git a/test/sql/test_window_function/T/test_removable_cumulative_process b/test/sql/test_window_function/T/test_removable_cumulative_process index 977572ebe3463..9d32bff9ab012 100644 --- a/test/sql/test_window_function/T/test_removable_cumulative_process +++ b/test/sql/test_window_function/T/test_removable_cumulative_process @@ -65,3 +65,29 @@ SELECT v3, AVG(v3) OVER(ORDER BY v3 ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING) AS SELECT v3, AVG(v3) OVER(ORDER BY v3 ROWS 
BETWEEN 2 FOLLOWING AND 5 FOLLOWING) AS CNT FROM t1 ORDER BY v3; SELECT v3, AVG(v3) OVER(ORDER BY v3 ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS CNT FROM t1 ORDER BY v3; SELECT v3, AVG(v3) OVER(ORDER BY v3 ROWS BETWEEN CURRENT ROW AND 2 FOLLOWING) AS CNT FROM t1 ORDER BY v3; + +-- name: test_removable_cumulative_process_string_type +CREATE TABLE IF NOT EXISTS `t2` ( + `v1` int(11) NULL, + `v2` STRING NULL +) +DUPLICATE KEY(`v1`) +DISTRIBUTED BY HASH(`v1`) BUCKETS 10 +PROPERTIES ( + "replication_num" = "1" +); +INSERT INTO t2 VALUES +(1, '1'), +(2, NULL), +(3, '2'), +(4, '2'), +(5, '2'), +(6, '3'), +(7, '3'), +(8, '200'), +(9, '40'), +(10, '50'), +(11, '60'), +(12, '10'), +(13, '20'); +SELECT v1, v2, MIN(v2) OVER (ORDER BY v1 DESC ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS _min FROM t2 ORDER BY v1 desc;