From cae66f2e2afaa60a1f656610d350a825263c5966 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Fri, 2 Jun 2023 12:42:42 +0800 Subject: [PATCH] Support offset frame type (#7514) ref pingcap/tiflash#7376 --- .../DataStreams/WindowBlockInputStream.cpp | 137 +++++++++++++++++- dbms/src/DataStreams/WindowBlockInputStream.h | 10 ++ dbms/src/Debug/MockExecutor/WindowBinder.h | 1 + .../src/Flash/tests/gtest_window_executor.cpp | 6 +- dbms/src/Interpreters/WindowDescription.h | 4 +- .../tests/gtest_first_value.cpp | 79 ++++++++-- .../tests/gtest_last_value.cpp | 82 ++++++++--- tests/fullstack-test/mpp/window.test | 76 ++++++++++ 8 files changed, 346 insertions(+), 49 deletions(-) diff --git a/dbms/src/DataStreams/WindowBlockInputStream.cpp b/dbms/src/DataStreams/WindowBlockInputStream.cpp index b9f1ceee68e..a3d773d3cc4 100644 --- a/dbms/src/DataStreams/WindowBlockInputStream.cpp +++ b/dbms/src/DataStreams/WindowBlockInputStream.cpp @@ -16,6 +16,8 @@ #include #include +#include +#include namespace DB { @@ -271,12 +273,121 @@ Int64 WindowTransformAction::getPartitionEndRow(size_t block_rows) return left; } +RowNumber WindowTransformAction::stepToFrameStart(const RowNumber & current_row, const WindowFrame & frame) +{ + auto step_num = frame.begin_offset; + auto dist = distance(current_row, partition_start); + + if (dist <= step_num) + return partition_start; + + RowNumber result_row = current_row; + + // The step happens only in a block + if (result_row.row >= step_num) + { + result_row.row -= step_num; + return result_row; + } + + // The step happens between blocks + step_num -= (result_row.row + 1); + --result_row.block; + result_row.row = blockAt(result_row).rows - 1; + while (step_num > 0) + { + auto & block = blockAt(result_row); + if (block.rows > step_num) + { + result_row.row = block.rows - step_num - 1; // index, so we need to -1 + break; + } + step_num -= block.rows; + --result_row.block; + result_row.row = blockAt(result_row).rows - 1; + } + return result_row; +} + +std::tuple WindowTransformAction::stepToFrameEnd(const RowNumber & current_row, const WindowFrame & frame) +{ + auto step_num = frame.end_offset; + if (!partition_ended) + return std::make_tuple(RowNumber(), false); + + // Range of rows is [frame_start, frame_end), + // and frame_end position is behind the position of the last frame row. + // So we need to ++n. + ++step_num; + + auto dist = distance(partition_end, current_row); + RUNTIME_CHECK(dist >= 1); + + // Offset is too large and the partition_end is the longest position we can reach + if (dist <= step_num) + return std::make_tuple(partition_end, true); + + // Now, frame_end is impossible to reach to partition_end. + RowNumber frame_end_row = current_row; + auto & block = blockAt(frame_end_row); + + // The step happens only in a block + if ((block.rows - frame_end_row.row - 1) >= step_num) + { + frame_end_row.row += step_num; + return std::make_tuple(frame_end_row, true); + } + + // The step happens between blocks + step_num -= block.rows - frame_end_row.row; + ++frame_end_row.block; + frame_end_row.row = 0; + while (step_num > 0) + { + auto block_rows = blockAt(frame_end_row).rows; + if (step_num >= block_rows) + { + frame_end_row.row = 0; + ++frame_end_row.block; + step_num -= block_rows; + continue; + } + + frame_end_row.row += step_num; + step_num = 0; + } + + return std::make_tuple(frame_end_row, true); +} + +UInt64 WindowTransformAction::distance(RowNumber left, RowNumber right) +{ + if (left.block == right.block) + { + RUNTIME_CHECK_MSG(left.row >= right.row, "left should always be bigger than right"); + return left.row - right.row; + } + + RUNTIME_CHECK_MSG(left.block > right.block, "left should always be bigger than right"); + + Int64 dist = left.row; + RowNumber tmp = left; + --tmp.block; + while (tmp.block > right.block) + { + dist += blockAt(tmp).rows; + --tmp.block; + } + + dist += blockAt(right).rows - right.row; + + return dist; +} + void WindowTransformAction::advanceFrameStart() { if (frame_started) - { return; - } switch (window_description.frame.begin_type) { @@ -292,6 +403,15 @@ void WindowTransformAction::advanceFrameStart() break; } case WindowFrame::BoundaryType::Offset: + if (window_description.frame.type == WindowFrame::FrameType::Rows) + frame_start = stepToFrameStart(current_row, window_description.frame); + else + throw Exception( + ErrorCodes::NOT_IMPLEMENTED, + fmt::format("Frame type {}'s Offset BoundaryType is not implemented", + magic_enum::enum_name(window_description.frame.type))); + frame_started = true; + break; default: throw Exception( ErrorCodes::NOT_IMPLEMENTED, @@ -388,6 +508,16 @@ void WindowTransformAction::advanceFrameEnd() break; } case WindowFrame::BoundaryType::Offset: + { + if (window_description.frame.type == WindowFrame::FrameType::Rows) + std::tie(frame_end, frame_ended) = stepToFrameEnd(current_row, window_description.frame); + else + throw Exception( + ErrorCodes::NOT_IMPLEMENTED, + fmt::format("Frame type {}'s Offset BoundaryType is not implemented", + magic_enum::enum_name(window_description.frame.type))); + break; + } default: throw Exception(ErrorCodes::NOT_IMPLEMENTED, "The frame end type '{}' is not implemented", @@ -594,7 +724,6 @@ void WindowTransformAction::tryCalculate() partition_start = partition_end; advanceRowNumber(partition_end); partition_ended = false; - // We have to reset the frame and other pointers when the new partition starts. frame_start = partition_start; frame_end = partition_start; @@ -660,7 +789,7 @@ RowNumber WindowTransformAction::getPreviousRowNumber(const RowNumber & row_num) } --prev_row_num.block; - assert(prev_row_num.block - first_block_number < window_blocks.size()); + assert(prev_row_num.block < window_blocks.size() + first_block_number); const auto new_block_rows = blockAt(prev_row_num).rows; prev_row_num.row = new_block_rows - 1; return prev_row_num; diff --git a/dbms/src/DataStreams/WindowBlockInputStream.h b/dbms/src/DataStreams/WindowBlockInputStream.h index c6fd314f3a6..fca4fa7ea0e 100644 --- a/dbms/src/DataStreams/WindowBlockInputStream.h +++ b/dbms/src/DataStreams/WindowBlockInputStream.h @@ -71,6 +71,16 @@ struct RowNumber /* Implementation details.*/ struct WindowTransformAction { +private: + // Used for calculating the frame start + RowNumber stepToFrameStart(const RowNumber & current_row, const WindowFrame & frame); + // Used for calculating the frame end + std::tuple stepToFrameEnd(const RowNumber & current_row, const WindowFrame & frame); + + // distance is left - right. + UInt64 distance(RowNumber left, RowNumber right); + +public: WindowTransformAction(const Block & input_header, const WindowDescription & window_description_, const String & req_id); void cleanUp(); diff --git a/dbms/src/Debug/MockExecutor/WindowBinder.h b/dbms/src/Debug/MockExecutor/WindowBinder.h index b9745d3358b..97e272c0ebe 100644 --- a/dbms/src/Debug/MockExecutor/WindowBinder.h +++ b/dbms/src/Debug/MockExecutor/WindowBinder.h @@ -19,6 +19,7 @@ namespace DB::mock { +// true: unbounded, false: not unbounded using MockWindowFrameBound = std::tuple; struct MockWindowFrame { diff --git a/dbms/src/Flash/tests/gtest_window_executor.cpp b/dbms/src/Flash/tests/gtest_window_executor.cpp index 5fdcc802cff..3976ee98270 100644 --- a/dbms/src/Flash/tests/gtest_window_executor.cpp +++ b/dbms/src/Flash/tests/gtest_window_executor.cpp @@ -330,7 +330,7 @@ try /* select count(1) from ( - SELECT + SELECT ROW_NUMBER() OVER (PARTITION BY `partition` ORDER BY `order`), ROW_NUMBER() OVER (PARTITION BY `partition` ORDER BY `order` DESC) FROM `test_db`.`test_table` @@ -347,7 +347,7 @@ try /* select count(1) from ( - SELECT + SELECT ROW_NUMBER() OVER (PARTITION BY `partition` ORDER BY `order`), ROW_NUMBER() OVER (PARTITION BY `partition` ORDER BY `order`) FROM `test_db`.`test_table` @@ -370,7 +370,7 @@ try /* select count(1) from ( - SELECT + SELECT Rank() OVER (PARTITION BY `partition` ORDER BY `order`), DenseRank() OVER (PARTITION BY `partition` ORDER BY `order`) FROM `test_db`.`test_table` diff --git a/dbms/src/Interpreters/WindowDescription.h b/dbms/src/Interpreters/WindowDescription.h index c51be3cd224..e986f920f65 100644 --- a/dbms/src/Interpreters/WindowDescription.h +++ b/dbms/src/Interpreters/WindowDescription.h @@ -61,11 +61,11 @@ struct WindowFrame FrameType type = FrameType::Ranges; BoundaryType begin_type = BoundaryType::Unbounded; - Field begin_offset = Field(UInt64(0)); + UInt64 begin_offset = 0; bool begin_preceding = true; BoundaryType end_type = BoundaryType::Unbounded; - Field end_offset = Field(UInt64(0)); + UInt64 end_offset = 0; bool end_preceding = false; bool operator==(const WindowFrame & other) const diff --git a/dbms/src/WindowFunctions/tests/gtest_first_value.cpp b/dbms/src/WindowFunctions/tests/gtest_first_value.cpp index 2c33a4a6830..402af1c8e0c 100644 --- a/dbms/src/WindowFunctions/tests/gtest_first_value.cpp +++ b/dbms/src/WindowFunctions/tests/gtest_first_value.cpp @@ -14,10 +14,13 @@ #include #include +#include +#include + +#include namespace DB::tests { -// TODO Tests with frame should be added class FirstValue : public DB::tests::ExecutorTest { static const size_t max_concurrency_level = 10; @@ -46,7 +49,8 @@ class FirstValue : public DB::tests::ExecutorTest void executeFunctionAndAssert( const ColumnWithTypeAndName & result, const ASTPtr & function, - const ColumnsWithTypeAndName & input) + const ColumnsWithTypeAndName & input, + MockWindowFrame mock_frame = MockWindowFrame()) { ColumnsWithTypeAndName actual_input = input; assert(actual_input.size() == 3); @@ -65,7 +69,7 @@ class FirstValue : public DB::tests::ExecutorTest auto request = context .scan("test_db", "test_table_for_first_value") .sort({{"partition", false}, {"order", false}}, true) - .window(function, {"order", false}, {"partition", false}, MockWindowFrame{}) + .window(function, {"order", false}, {"partition", false}, mock_frame) .build(context); ColumnsWithTypeAndName expect = input; @@ -113,19 +117,62 @@ class FirstValue : public DB::tests::ExecutorTest TEST_F(FirstValue, firstValue) try { - executeFunctionAndAssert( - toVec({"1", "2", "2", "2", "2", "6", "6", "6", "6", "6", "11", "11", "11"}), - FirstValue(value_col), - {toVec(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}), - toVec(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}), - toVec(/*value*/ {"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13"})}); - - executeFunctionAndAssert( - toNullableVec({{}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}}), - FirstValue(value_col), - {toNullableVec(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}), - toNullableVec(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}), - toNullableVec(/*value*/ {{}, {}, "3", "4", "5", {}, "7", "8", "9", "10", {}, "12", "13"})}); + { + // frame type: unbounded + executeFunctionAndAssert( + toVec({"1", "2", "2", "2", "2", "6", "6", "6", "6", "6", "11", "11", "11"}), + FirstValue(value_col), + {toVec(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}), + toVec(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}), + toVec(/*value*/ {"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13"})}); + + executeFunctionAndAssert( + toNullableVec({{}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}}), + FirstValue(value_col), + {toNullableVec(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}), + toNullableVec(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}), + toNullableVec(/*value*/ {{}, {}, "3", "4", "5", {}, "7", "8", "9", "10", {}, "12", "13"})}); + } + + { + // frame type: offset + MockWindowFrame frame; + frame.type = tipb::WindowFrameType::Rows; + frame.start = std::make_tuple(tipb::WindowBoundType::Following, false, 0); + + std::vector frame_start_offset{0, 1, 3, 10}; + std::vector> res_not_null{ + {"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13"}, + {"1", "2", "2", "3", "4", "6", "6", "7", "8", "9", "11", "11", "12"}, + {"1", "2", "2", "2", "2", "6", "6", "6", "6", "7", "11", "11", "11"}, + {"1", "2", "2", "2", "2", "6", "6", "6", "6", "6", "11", "11", "11"}}; + std::vector>> res_null{ + {{}, {}, "3", "4", "5", {}, "7", "8", "9", "10", {}, "12", "13"}, + {{}, {}, {}, "3", "4", {}, {}, "7", "8", "9", {}, {}, "12"}, + {{}, {}, {}, {}, {}, {}, {}, {}, {}, "7", {}, {}, {}}, + {{}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}}}; + + for (size_t i = 0; i < frame_start_offset.size(); ++i) + { + frame.start = std::make_tuple(tipb::WindowBoundType::Preceding, false, frame_start_offset[i]); + + executeFunctionAndAssert( + toVec(res_not_null[i]), + FirstValue(value_col), + {toVec(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}), + toVec(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}), + toVec(/*value*/ {"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13"})}, + frame); + + executeFunctionAndAssert( + toNullableVec(res_null[i]), + FirstValue(value_col), + {toNullableVec(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}), + toNullableVec(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}), + toNullableVec(/*value*/ {{}, {}, "3", "4", "5", {}, "7", "8", "9", "10", {}, "12", "13"})}, + frame); + } + } // TODO support unsigned int. testInt(); diff --git a/dbms/src/WindowFunctions/tests/gtest_last_value.cpp b/dbms/src/WindowFunctions/tests/gtest_last_value.cpp index 58d25def848..585efa70324 100644 --- a/dbms/src/WindowFunctions/tests/gtest_last_value.cpp +++ b/dbms/src/WindowFunctions/tests/gtest_last_value.cpp @@ -51,7 +51,7 @@ class LastValue : public DB::tests::ExecutorTest const ColumnWithTypeAndName & result, const ASTPtr & function, const ColumnsWithTypeAndName & input, - const MockWindowFrame & frame) + MockWindowFrame frame = MockWindowFrame()) { ColumnsWithTypeAndName actual_input = input; assert(actual_input.size() == 3); @@ -81,7 +81,6 @@ class LastValue : public DB::tests::ExecutorTest template void testInt() { - // TODO test with bounded_type_frame MockWindowFrame unbounded_type_frame{ tipb::WindowFrameType::Rows, std::make_tuple(tipb::WindowBoundType::Preceding, true, 0), @@ -107,7 +106,6 @@ class LastValue : public DB::tests::ExecutorTest template void testFloat() { - // TODO test with bounded_type_frame MockWindowFrame unbounded_type_frame{ tipb::WindowFrameType::Rows, std::make_tuple(tipb::WindowBoundType::Preceding, true, 0), @@ -134,27 +132,63 @@ class LastValue : public DB::tests::ExecutorTest TEST_F(LastValue, lastValue) try { - // TODO test with bounded_type_frame - MockWindowFrame unbounded_type_frame{ - tipb::WindowFrameType::Rows, - std::make_tuple(tipb::WindowBoundType::Preceding, true, 0), - std::make_tuple(tipb::WindowBoundType::Following, true, 0)}; - - executeFunctionAndAssert( - toVec({"1", "5", "5", "5", "5", "10", "10", "10", "10", "10", "13", "13", "13"}), - LastValue(value_col), - {toVec(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}), - toVec(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}), - toVec(/*value*/ {"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13"})}, - unbounded_type_frame); - - executeFunctionAndAssert( - toNullableVec({{}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}}), - LastValue(value_col), - {toNullableVec(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}), - toNullableVec(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}), - toNullableVec(/*value*/ {{}, "2", "3", "4", {}, "6", "7", "8", "9", {}, "11", "12", {}})}, - unbounded_type_frame); + { + // frame type: unbounded + executeFunctionAndAssert( + toVec({"1", "5", "5", "5", "5", "10", "10", "10", "10", "10", "13", "13", "13"}), + LastValue(value_col), + {toVec(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}), + toVec(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}), + toVec(/*value*/ {"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13"})}); + + executeFunctionAndAssert( + toNullableVec({{}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}}), + LastValue(value_col), + {toNullableVec(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}), + toNullableVec(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}), + toNullableVec(/*value*/ {{}, "2", "3", "4", {}, "6", "7", "8", "9", {}, "11", "12", {}})}); + } + + { + // frame type: offset + MockWindowFrame frame; + frame.type = tipb::WindowFrameType::Rows; + frame.end = std::make_tuple(tipb::WindowBoundType::Following, false, 0); + + std::vector frame_start_offset{0, 1, 3, 10}; + std::vector> res_not_null{ + {"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13"}, + {"1", "3", "4", "5", "5", "7", "8", "9", "10", "10", "12", "13", "13"}, + {"1", "5", "5", "5", "5", "9", "10", "10", "10", "10", "13", "13", "13"}, + {"1", "5", "5", "5", "5", "10", "10", "10", "10", "10", "13", "13", "13"}, + }; + std::vector>> res_null{ + {{}, "2", "3", "4", {}, "6", "7", "8", "9", {}, "11", "12", {}}, + {{}, "3", "4", {}, {}, "7", "8", "9", {}, {}, "12", {}, {}}, + {{}, {}, {}, {}, {}, "9", {}, {}, {}, {}, {}, {}, {}}, + {{}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}}, + }; + + for (size_t i = 0; i < frame_start_offset.size(); ++i) + { + frame.end = std::make_tuple(tipb::WindowBoundType::Following, false, frame_start_offset[i]); + executeFunctionAndAssert( + toVec(res_not_null[i]), + LastValue(value_col), + {toVec(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}), + toVec(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}), + toVec(/*value*/ {"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13"})}, + frame); + + executeFunctionAndAssert( + toNullableVec(res_null[i]), + LastValue(value_col), + {toNullableVec(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}), + toNullableVec(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}), + toNullableVec(/*value*/ {{}, "2", "3", "4", {}, "6", "7", "8", "9", {}, "11", "12", {}})}, + frame); + } + } // TODO support unsigned int. testInt(); diff --git a/tests/fullstack-test/mpp/window.test b/tests/fullstack-test/mpp/window.test index ed636a54c06..f417d50a69a 100644 --- a/tests/fullstack-test/mpp/window.test +++ b/tests/fullstack-test/mpp/window.test @@ -89,6 +89,44 @@ mysql> use test; set tidb_enforce_mpp=1; select *, first_value(v) over (partitio | 2 | 9 | 10 | 6 | +---+----+------+------+ +mysql> use test; set tidb_enforce_mpp=1; select *, first_value(v) over (partition by p order by o asc rows between 2 preceding and 2 following) as a from test.first; ++---+----+------+------+ +| p | o | v | a | ++---+----+------+------+ +| 0 | 0 | 1 | 1 | +| 1 | 1 | 2 | 2 | +| 1 | 2 | 3 | 2 | +| 1 | 3 | 4 | 2 | +| 1 | 4 | 5 | 3 | +| 2 | 5 | 6 | 6 | +| 2 | 6 | 7 | 6 | +| 2 | 7 | 8 | 6 | +| 2 | 8 | 9 | 7 | +| 2 | 9 | 10 | 8 | +| 3 | 10 | 11 | 11 | +| 3 | 11 | 12 | 11 | +| 3 | 12 | 13 | 11 | ++---+----+------+------+ + +mysql> use test; set tidb_enforce_mpp=1; select *, first_value(v) over (partition by p order by o asc rows between 0 preceding and 0 following) as a from test.first; ++---+----+------+------+ +| p | o | v | a | ++---+----+------+------+ +| 0 | 0 | 1 | 1 | +| 1 | 1 | 2 | 2 | +| 1 | 2 | 3 | 3 | +| 1 | 3 | 4 | 4 | +| 1 | 4 | 5 | 5 | +| 2 | 5 | 6 | 6 | +| 2 | 6 | 7 | 7 | +| 2 | 7 | 8 | 8 | +| 2 | 8 | 9 | 9 | +| 2 | 9 | 10 | 10 | +| 3 | 10 | 11 | 11 | +| 3 | 11 | 12 | 12 | +| 3 | 12 | 13 | 13 | ++---+----+------+------+ + mysql> use test; set tidb_enforce_mpp=1; select *, first_value(v) over (partition by p order by o asc) as a from test.first1; +---+----+------+------+ | p | o | v | a | @@ -179,6 +217,44 @@ mysql> use test; set tidb_enforce_mpp=1; select *, last_value(v) over (partition | 3 | 12 | 13 | 13 | +---+----+------+------+ +mysql> use test; set tidb_enforce_mpp=1; select *, last_value(v) over (partition by p order by o asc rows between 2 preceding and 2 following) as a from test.last; ++---+----+------+------+ +| p | o | v | a | ++---+----+------+------+ +| 0 | 0 | 1 | 1 | +| 1 | 1 | 2 | 4 | +| 1 | 2 | 3 | 5 | +| 1 | 3 | 4 | 5 | +| 1 | 4 | 5 | 5 | +| 2 | 5 | 6 | 8 | +| 2 | 6 | 7 | 9 | +| 2 | 7 | 8 | 10 | +| 2 | 8 | 9 | 10 | +| 2 | 9 | 10 | 10 | +| 3 | 10 | 11 | 13 | +| 3 | 11 | 12 | 13 | +| 3 | 12 | 13 | 13 | ++---+----+------+------+ + +mysql> use test; set tidb_enforce_mpp=1; select *, last_value(v) over (partition by p order by o asc rows between 0 preceding and 0 following) as a from test.last; ++---+----+------+------+ +| p | o | v | a | ++---+----+------+------+ +| 0 | 0 | 1 | 1 | +| 1 | 1 | 2 | 2 | +| 1 | 2 | 3 | 3 | +| 1 | 3 | 4 | 4 | +| 1 | 4 | 5 | 5 | +| 2 | 5 | 6 | 6 | +| 2 | 6 | 7 | 7 | +| 2 | 7 | 8 | 8 | +| 2 | 8 | 9 | 9 | +| 2 | 9 | 10 | 10 | +| 3 | 10 | 11 | 11 | +| 3 | 11 | 12 | 12 | +| 3 | 12 | 13 | 13 | ++---+----+------+------+ + mysql> use test; set tidb_enforce_mpp=1; select *, last_value(v) over (partition by p order by o asc) as a from test.last1; +---+----+------+------+ | p | o | v | a |