Skip to content

Commit

Permalink
Support offset frame type (#7514)
Browse files Browse the repository at this point in the history
ref #7376
  • Loading branch information
xzhangxian1008 authored Jun 2, 2023
1 parent fbfb4cb commit cae66f2
Show file tree
Hide file tree
Showing 8 changed files with 346 additions and 49 deletions.
137 changes: 133 additions & 4 deletions dbms/src/DataStreams/WindowBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
#include <Interpreters/WindowDescription.h>

#include <magic_enum.hpp>
#include <tuple>
#include <type_traits>

namespace DB
{
Expand Down Expand Up @@ -271,12 +273,121 @@ Int64 WindowTransformAction::getPartitionEndRow(size_t block_rows)
return left;
}

RowNumber WindowTransformAction::stepToFrameStart(const RowNumber & current_row, const WindowFrame & frame)
{
auto step_num = frame.begin_offset;
auto dist = distance(current_row, partition_start);

if (dist <= step_num)
return partition_start;

RowNumber result_row = current_row;

// The step happens only in a block
if (result_row.row >= step_num)
{
result_row.row -= step_num;
return result_row;
}

// The step happens between blocks
step_num -= (result_row.row + 1);
--result_row.block;
result_row.row = blockAt(result_row).rows - 1;
while (step_num > 0)
{
auto & block = blockAt(result_row);
if (block.rows > step_num)
{
result_row.row = block.rows - step_num - 1; // index, so we need to -1
break;
}
step_num -= block.rows;
--result_row.block;
result_row.row = blockAt(result_row).rows - 1;
}
return result_row;
}

std::tuple<RowNumber, bool> WindowTransformAction::stepToFrameEnd(const RowNumber & current_row, const WindowFrame & frame)
{
auto step_num = frame.end_offset;
if (!partition_ended)
return std::make_tuple(RowNumber(), false);

// Range of rows is [frame_start, frame_end),
// and frame_end position is behind the position of the last frame row.
// So we need to ++n.
++step_num;

auto dist = distance(partition_end, current_row);
RUNTIME_CHECK(dist >= 1);

// Offset is too large and the partition_end is the longest position we can reach
if (dist <= step_num)
return std::make_tuple(partition_end, true);

// Now, frame_end is impossible to reach to partition_end.
RowNumber frame_end_row = current_row;
auto & block = blockAt(frame_end_row);

// The step happens only in a block
if ((block.rows - frame_end_row.row - 1) >= step_num)
{
frame_end_row.row += step_num;
return std::make_tuple(frame_end_row, true);
}

// The step happens between blocks
step_num -= block.rows - frame_end_row.row;
++frame_end_row.block;
frame_end_row.row = 0;
while (step_num > 0)
{
auto block_rows = blockAt(frame_end_row).rows;
if (step_num >= block_rows)
{
frame_end_row.row = 0;
++frame_end_row.block;
step_num -= block_rows;
continue;
}

frame_end_row.row += step_num;
step_num = 0;
}

return std::make_tuple(frame_end_row, true);
}

UInt64 WindowTransformAction::distance(RowNumber left, RowNumber right)
{
if (left.block == right.block)
{
RUNTIME_CHECK_MSG(left.row >= right.row, "left should always be bigger than right");
return left.row - right.row;
}

RUNTIME_CHECK_MSG(left.block > right.block, "left should always be bigger than right");

Int64 dist = left.row;
RowNumber tmp = left;
--tmp.block;
while (tmp.block > right.block)
{
dist += blockAt(tmp).rows;
--tmp.block;
}

dist += blockAt(right).rows - right.row;

return dist;
}

void WindowTransformAction::advanceFrameStart()
{
if (frame_started)
{
return;
}

switch (window_description.frame.begin_type)
{
Expand All @@ -292,6 +403,15 @@ void WindowTransformAction::advanceFrameStart()
break;
}
case WindowFrame::BoundaryType::Offset:
if (window_description.frame.type == WindowFrame::FrameType::Rows)
frame_start = stepToFrameStart(current_row, window_description.frame);
else
throw Exception(
ErrorCodes::NOT_IMPLEMENTED,
fmt::format("Frame type {}'s Offset BoundaryType is not implemented",
magic_enum::enum_name(window_description.frame.type)));
frame_started = true;
break;
default:
throw Exception(
ErrorCodes::NOT_IMPLEMENTED,
Expand Down Expand Up @@ -388,6 +508,16 @@ void WindowTransformAction::advanceFrameEnd()
break;
}
case WindowFrame::BoundaryType::Offset:
{
if (window_description.frame.type == WindowFrame::FrameType::Rows)
std::tie(frame_end, frame_ended) = stepToFrameEnd(current_row, window_description.frame);
else
throw Exception(
ErrorCodes::NOT_IMPLEMENTED,
fmt::format("Frame type {}'s Offset BoundaryType is not implemented",
magic_enum::enum_name(window_description.frame.type)));
break;
}
default:
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"The frame end type '{}' is not implemented",
Expand Down Expand Up @@ -594,7 +724,6 @@ void WindowTransformAction::tryCalculate()
partition_start = partition_end;
advanceRowNumber(partition_end);
partition_ended = false;

// We have to reset the frame and other pointers when the new partition starts.
frame_start = partition_start;
frame_end = partition_start;
Expand Down Expand Up @@ -660,7 +789,7 @@ RowNumber WindowTransformAction::getPreviousRowNumber(const RowNumber & row_num)
}

--prev_row_num.block;
assert(prev_row_num.block - first_block_number < window_blocks.size());
assert(prev_row_num.block < window_blocks.size() + first_block_number);
const auto new_block_rows = blockAt(prev_row_num).rows;
prev_row_num.row = new_block_rows - 1;
return prev_row_num;
Expand Down
10 changes: 10 additions & 0 deletions dbms/src/DataStreams/WindowBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,16 @@ struct RowNumber
/* Implementation details.*/
struct WindowTransformAction
{
private:
// Used for calculating the frame start
RowNumber stepToFrameStart(const RowNumber & current_row, const WindowFrame & frame);
// Used for calculating the frame end
std::tuple<RowNumber, bool> stepToFrameEnd(const RowNumber & current_row, const WindowFrame & frame);

// distance is left - right.
UInt64 distance(RowNumber left, RowNumber right);

public:
WindowTransformAction(const Block & input_header, const WindowDescription & window_description_, const String & req_id);

void cleanUp();
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Debug/MockExecutor/WindowBinder.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

namespace DB::mock
{
// true: unbounded, false: not unbounded
using MockWindowFrameBound = std::tuple<tipb::WindowBoundType, bool, UInt64>;
struct MockWindowFrame
{
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Flash/tests/gtest_window_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ try

/*
select count(1) from (
SELECT
SELECT
ROW_NUMBER() OVER (PARTITION BY `partition` ORDER BY `order`),
ROW_NUMBER() OVER (PARTITION BY `partition` ORDER BY `order` DESC)
FROM `test_db`.`test_table`
Expand All @@ -347,7 +347,7 @@ try

/*
select count(1) from (
SELECT
SELECT
ROW_NUMBER() OVER (PARTITION BY `partition` ORDER BY `order`),
ROW_NUMBER() OVER (PARTITION BY `partition` ORDER BY `order`)
FROM `test_db`.`test_table`
Expand All @@ -370,7 +370,7 @@ try

/*
select count(1) from (
SELECT
SELECT
Rank() OVER (PARTITION BY `partition` ORDER BY `order`),
DenseRank() OVER (PARTITION BY `partition` ORDER BY `order`)
FROM `test_db`.`test_table`
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Interpreters/WindowDescription.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ struct WindowFrame
FrameType type = FrameType::Ranges;

BoundaryType begin_type = BoundaryType::Unbounded;
Field begin_offset = Field(UInt64(0));
UInt64 begin_offset = 0;
bool begin_preceding = true;

BoundaryType end_type = BoundaryType::Unbounded;
Field end_offset = Field(UInt64(0));
UInt64 end_offset = 0;
bool end_preceding = false;

bool operator==(const WindowFrame & other) const
Expand Down
79 changes: 63 additions & 16 deletions dbms/src/WindowFunctions/tests/gtest_first_value.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@

#include <Interpreters/Context.h>
#include <TestUtils/ExecutorTestUtils.h>
#include <TestUtils/mockExecutor.h>
#include <tipb/executor.pb.h>

#include <optional>

namespace DB::tests
{
// TODO Tests with frame should be added
class FirstValue : public DB::tests::ExecutorTest
{
static const size_t max_concurrency_level = 10;
Expand Down Expand Up @@ -46,7 +49,8 @@ class FirstValue : public DB::tests::ExecutorTest
void executeFunctionAndAssert(
const ColumnWithTypeAndName & result,
const ASTPtr & function,
const ColumnsWithTypeAndName & input)
const ColumnsWithTypeAndName & input,
MockWindowFrame mock_frame = MockWindowFrame())
{
ColumnsWithTypeAndName actual_input = input;
assert(actual_input.size() == 3);
Expand All @@ -65,7 +69,7 @@ class FirstValue : public DB::tests::ExecutorTest
auto request = context
.scan("test_db", "test_table_for_first_value")
.sort({{"partition", false}, {"order", false}}, true)
.window(function, {"order", false}, {"partition", false}, MockWindowFrame{})
.window(function, {"order", false}, {"partition", false}, mock_frame)
.build(context);

ColumnsWithTypeAndName expect = input;
Expand Down Expand Up @@ -113,19 +117,62 @@ class FirstValue : public DB::tests::ExecutorTest
TEST_F(FirstValue, firstValue)
try
{
executeFunctionAndAssert(
toVec<String>({"1", "2", "2", "2", "2", "6", "6", "6", "6", "6", "11", "11", "11"}),
FirstValue(value_col),
{toVec<Int64>(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}),
toVec<Int64>(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}),
toVec<String>(/*value*/ {"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13"})});

executeFunctionAndAssert(
toNullableVec<String>({{}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}}),
FirstValue(value_col),
{toNullableVec<Int64>(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}),
toNullableVec<Int64>(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}),
toNullableVec<String>(/*value*/ {{}, {}, "3", "4", "5", {}, "7", "8", "9", "10", {}, "12", "13"})});
{
// frame type: unbounded
executeFunctionAndAssert(
toVec<String>({"1", "2", "2", "2", "2", "6", "6", "6", "6", "6", "11", "11", "11"}),
FirstValue(value_col),
{toVec<Int64>(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}),
toVec<Int64>(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}),
toVec<String>(/*value*/ {"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13"})});

executeFunctionAndAssert(
toNullableVec<String>({{}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}}),
FirstValue(value_col),
{toNullableVec<Int64>(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}),
toNullableVec<Int64>(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}),
toNullableVec<String>(/*value*/ {{}, {}, "3", "4", "5", {}, "7", "8", "9", "10", {}, "12", "13"})});
}

{
// frame type: offset
MockWindowFrame frame;
frame.type = tipb::WindowFrameType::Rows;
frame.start = std::make_tuple(tipb::WindowBoundType::Following, false, 0);

std::vector<Int64> frame_start_offset{0, 1, 3, 10};
std::vector<std::vector<String>> res_not_null{
{"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13"},
{"1", "2", "2", "3", "4", "6", "6", "7", "8", "9", "11", "11", "12"},
{"1", "2", "2", "2", "2", "6", "6", "6", "6", "7", "11", "11", "11"},
{"1", "2", "2", "2", "2", "6", "6", "6", "6", "6", "11", "11", "11"}};
std::vector<std::vector<std::optional<String>>> res_null{
{{}, {}, "3", "4", "5", {}, "7", "8", "9", "10", {}, "12", "13"},
{{}, {}, {}, "3", "4", {}, {}, "7", "8", "9", {}, {}, "12"},
{{}, {}, {}, {}, {}, {}, {}, {}, {}, "7", {}, {}, {}},
{{}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}}};

for (size_t i = 0; i < frame_start_offset.size(); ++i)
{
frame.start = std::make_tuple(tipb::WindowBoundType::Preceding, false, frame_start_offset[i]);

executeFunctionAndAssert(
toVec<String>(res_not_null[i]),
FirstValue(value_col),
{toVec<Int64>(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}),
toVec<Int64>(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}),
toVec<String>(/*value*/ {"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13"})},
frame);

executeFunctionAndAssert(
toNullableVec<String>(res_null[i]),
FirstValue(value_col),
{toNullableVec<Int64>(/*partition*/ {0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3}),
toNullableVec<Int64>(/*order*/ {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}),
toNullableVec<String>(/*value*/ {{}, {}, "3", "4", "5", {}, "7", "8", "9", "10", {}, "12", "13"})},
frame);
}
}

// TODO support unsigned int.
testInt<Int8>();
Expand Down
Loading

0 comments on commit cae66f2

Please sign in to comment.