diff --git a/.github/config/uncovered_files.csv b/.github/config/uncovered_files.csv index 6ef9e393af58..60a817370c35 100644 --- a/.github/config/uncovered_files.csv +++ b/.github/config/uncovered_files.csv @@ -224,6 +224,7 @@ execution/physical_plan/plan_show_select.cpp 3 execution/physical_plan_generator.cpp 6 execution/radix_partitioned_hashtable.cpp 3 execution/reservoir_sample.cpp 60 +execution/window_executor.cpp 29 execution/window_segment_tree.cpp 12 function/aggregate/sorted_aggregate_function.cpp 85 function/built_in_functions.cpp 6 diff --git a/.github/workflows/NightlyTests.yml b/.github/workflows/NightlyTests.yml index b40d83c5aa0b..889552d179ff 100644 --- a/.github/workflows/NightlyTests.yml +++ b/.github/workflows/NightlyTests.yml @@ -26,7 +26,7 @@ concurrency: env: GH_TOKEN: ${{ secrets.GH_TOKEN }} - DUCKDB_WASM_VERSION: "a8f2c38" + DUCKDB_WASM_VERSION: "687bab9a" CCACHE_SAVE: ${{ github.repository != 'duckdb/duckdb' }} jobs: @@ -696,6 +696,7 @@ jobs: git clone --recurse-submodules https://github.com/duckdb/duckdb-wasm cd duckdb-wasm git checkout ${{ env.DUCKDB_WASM_VERSION }} + git submodule update shopt -s nullglob for filename in ../.github/patches/duckdb-wasm/*.patch; do git apply $filename diff --git a/scripts/generate_functions.py b/scripts/generate_functions.py index bfdc946b75f2..82bf6e7242a3 100644 --- a/scripts/generate_functions.py +++ b/scripts/generate_functions.py @@ -3,7 +3,7 @@ import json aggregate_functions = ['algebraic', 'distributive', 'holistic', 'nested', 'regression'] -scalar_functions = ['bit', 'blob', 'date', 'enum', 'generic', 'list', 'map', 'math', 'operators', 'random', 'string', 'struct', 'union'] +scalar_functions = ['bit', 'blob', 'date', 'enum', 'generic', 'list', 'map', 'math', 'operators', 'random', 'string', 'debug', 'struct', 'union'] header = '''//===----------------------------------------------------------------------===// // DuckDB diff --git a/src/core_functions/function_list.cpp b/src/core_functions/function_list.cpp index 1e607f15d3e2..ee346ef74e43 100644 --- a/src/core_functions/function_list.cpp +++ b/src/core_functions/function_list.cpp @@ -17,6 +17,7 @@ #include "duckdb/core_functions/scalar/string_functions.hpp" #include "duckdb/core_functions/scalar/struct_functions.hpp" #include "duckdb/core_functions/scalar/union_functions.hpp" +#include "duckdb/core_functions/scalar/debug_functions.hpp" namespace duckdb { @@ -339,6 +340,7 @@ static StaticFunctionDefinition internal_functions[] = { DUCKDB_AGGREGATE_FUNCTION(VarPopFun), DUCKDB_AGGREGATE_FUNCTION(VarSampFun), DUCKDB_AGGREGATE_FUNCTION_ALIAS(VarianceFun), + DUCKDB_SCALAR_FUNCTION(VectorTypeFun), DUCKDB_SCALAR_FUNCTION(VersionFun), DUCKDB_SCALAR_FUNCTION_SET(WeekFun), DUCKDB_SCALAR_FUNCTION_SET(WeekDayFun), diff --git a/src/core_functions/scalar/CMakeLists.txt b/src/core_functions/scalar/CMakeLists.txt index 14437f50f8f9..c02bd908ee1a 100644 --- a/src/core_functions/scalar/CMakeLists.txt +++ b/src/core_functions/scalar/CMakeLists.txt @@ -11,6 +11,7 @@ add_subdirectory(random) add_subdirectory(string) add_subdirectory(struct) add_subdirectory(union) +add_subdirectory(debug) set(ALL_OBJECT_FILES ${ALL_OBJECT_FILES} diff --git a/src/core_functions/scalar/debug/CMakeLists.txt b/src/core_functions/scalar/debug/CMakeLists.txt new file mode 100644 index 000000000000..5012faa86619 --- /dev/null +++ b/src/core_functions/scalar/debug/CMakeLists.txt @@ -0,0 +1,4 @@ +add_library_unity(duckdb_func_debug OBJECT vector_type.cpp) +set(ALL_OBJECT_FILES + ${ALL_OBJECT_FILES} $ + PARENT_SCOPE) diff --git a/src/core_functions/scalar/debug/functions.json b/src/core_functions/scalar/debug/functions.json new file mode 100644 index 000000000000..07efa4a79192 --- /dev/null +++ b/src/core_functions/scalar/debug/functions.json @@ -0,0 +1,9 @@ +[ + { + "name": "vector_type", + "parameters": "col", + "description": "Returns the VectorType of a given column", + "example": "vector_type(col)", + "type": "scalar_function" + } +] diff --git a/src/core_functions/scalar/debug/vector_type.cpp b/src/core_functions/scalar/debug/vector_type.cpp new file mode 100644 index 000000000000..0f2dc5e2ed3b --- /dev/null +++ b/src/core_functions/scalar/debug/vector_type.cpp @@ -0,0 +1,23 @@ +#include "duckdb/core_functions/scalar/debug_functions.hpp" + +#include "duckdb/common/exception.hpp" +#include "duckdb/common/vector_operations/vector_operations.hpp" +#include "duckdb/planner/expression/bound_function_expression.hpp" +#include "duckdb/common/enum_util.hpp" + +namespace duckdb { + +static void VectorTypeFunction(DataChunk &input, ExpressionState &state, Vector &result) { + result.SetVectorType(VectorType::CONSTANT_VECTOR); + auto data = ConstantVector::GetData(result); + data[0] = StringVector::AddString(result, EnumUtil::ToString(input.data[0].GetVectorType())); +} + +ScalarFunction VectorTypeFun::GetFunction() { + return ScalarFunction("vector_type", // name of the function + {LogicalType::ANY}, // argument list + LogicalType::VARCHAR, // return type + VectorTypeFunction); +} + +} // namespace duckdb diff --git a/src/execution/CMakeLists.txt b/src/execution/CMakeLists.txt index b98788924dd9..a564167260b2 100644 --- a/src/execution/CMakeLists.txt +++ b/src/execution/CMakeLists.txt @@ -19,6 +19,7 @@ add_library_unity( physical_plan_generator.cpp radix_partitioned_hashtable.cpp reservoir_sample.cpp + window_executor.cpp window_segment_tree.cpp) set(ALL_OBJECT_FILES ${ALL_OBJECT_FILES} $ diff --git a/src/execution/operator/aggregate/physical_window.cpp b/src/execution/operator/aggregate/physical_window.cpp index 16c0a9316b60..f69c178215bc 100644 --- a/src/execution/operator/aggregate/physical_window.cpp +++ b/src/execution/operator/aggregate/physical_window.cpp @@ -14,6 +14,7 @@ #include "duckdb/common/vector_operations/vector_operations.hpp" #include "duckdb/common/windows_undefs.hpp" #include "duckdb/execution/expression_executor.hpp" +#include "duckdb/execution/window_executor.hpp" #include "duckdb/execution/window_segment_tree.hpp" #include "duckdb/main/client_config.hpp" #include "duckdb/main/config.hpp" @@ -77,1187 +78,38 @@ PhysicalWindow::PhysicalWindow(vector types, vector l) and word aligned, so this will not underflow. - r -= ValidityMask::BITS_PER_VALUE; - continue; - } - - // Loop backwards over the block - // shift is probing r-1 >= l >= 0 - for (++shift; shift-- > 0; --r) { - if (mask.RowIsValid(block, shift) && --n == 0) { - return MaxValue(l, r - 1); - } - } - } - - // Didn't find a start so return the start of the range - return l; -} - -static void PrepareInputExpressions(vector> &exprs, ExpressionExecutor &executor, - DataChunk &chunk) { - if (exprs.empty()) { - return; - } - - vector types; - for (idx_t expr_idx = 0; expr_idx < exprs.size(); ++expr_idx) { - types.push_back(exprs[expr_idx]->return_type); - executor.AddExpression(*exprs[expr_idx]); - } - - if (!types.empty()) { - auto &allocator = executor.GetAllocator(); - chunk.Initialize(allocator, types); - } -} - -static void PrepareInputExpression(Expression &expr, ExpressionExecutor &executor, DataChunk &chunk) { - vector types; - types.push_back(expr.return_type); - executor.AddExpression(expr); - - auto &allocator = executor.GetAllocator(); - chunk.Initialize(allocator, types); -} - -struct WindowInputExpression { - WindowInputExpression(optional_ptr expr_p, ClientContext &context) - : expr(expr_p), ptype(PhysicalType::INVALID), scalar(true), executor(context) { - if (expr) { - PrepareInputExpression(*expr, executor, chunk); - ptype = expr->return_type.InternalType(); - scalar = expr->IsScalar(); - } - } - - void Execute(DataChunk &input_chunk) { - if (expr) { - chunk.Reset(); - executor.Execute(input_chunk, chunk); - chunk.Verify(); - } - } - - template - inline T GetCell(idx_t i) const { - D_ASSERT(!chunk.data.empty()); - const auto data = FlatVector::GetData(chunk.data[0]); - return data[scalar ? 0 : i]; - } - - inline bool CellIsNull(idx_t i) const { - D_ASSERT(!chunk.data.empty()); - if (chunk.data[0].GetVectorType() == VectorType::CONSTANT_VECTOR) { - return ConstantVector::IsNull(chunk.data[0]); - } - return FlatVector::IsNull(chunk.data[0], i); - } - - inline void CopyCell(Vector &target, idx_t target_offset) const { - D_ASSERT(!chunk.data.empty()); - auto &source = chunk.data[0]; - auto source_offset = scalar ? 0 : target_offset; - VectorOperations::Copy(source, target, source_offset + 1, source_offset, target_offset); - } - - optional_ptr expr; - PhysicalType ptype; - bool scalar; - ExpressionExecutor executor; - DataChunk chunk; -}; - -struct WindowInputColumn { - WindowInputColumn(Expression *expr_p, ClientContext &context, idx_t capacity_p) - : input_expr(expr_p, context), count(0), capacity(capacity_p) { - if (input_expr.expr) { - target = make_uniq(input_expr.chunk.data[0].GetType(), capacity); - } - } - - void Append(DataChunk &input_chunk) { - if (input_expr.expr) { - const auto source_count = input_chunk.size(); - D_ASSERT(count + source_count <= capacity); - if (!input_expr.scalar || !count) { - input_expr.Execute(input_chunk); - auto &source = input_expr.chunk.data[0]; - VectorOperations::Copy(source, *target, source_count, 0, count); - } - count += source_count; - } - } - - inline bool CellIsNull(idx_t i) { - D_ASSERT(target); - D_ASSERT(i < count); - return FlatVector::IsNull(*target, input_expr.scalar ? 0 : i); - } - - template - inline T GetCell(idx_t i) const { - D_ASSERT(target); - D_ASSERT(i < count); - const auto data = FlatVector::GetData(*target); - return data[input_expr.scalar ? 0 : i]; - } - - WindowInputExpression input_expr; - -private: - unique_ptr target; - idx_t count; - idx_t capacity; -}; - -static inline bool BoundaryNeedsPeer(const WindowBoundary &boundary) { - switch (boundary) { - case WindowBoundary::CURRENT_ROW_RANGE: - case WindowBoundary::EXPR_PRECEDING_RANGE: - case WindowBoundary::EXPR_FOLLOWING_RANGE: - return true; - default: - return false; - } -} - -enum WindowBounds : uint8_t { PARTITION_BEGIN, PARTITION_END, PEER_BEGIN, PEER_END, WINDOW_BEGIN, WINDOW_END }; - -struct WindowBoundariesState { - static inline bool IsScalar(const unique_ptr &expr) { - return expr ? expr->IsScalar() : true; - } - - WindowBoundariesState(BoundWindowExpression &wexpr, const idx_t input_size) - : type(wexpr.type), input_size(input_size), start_boundary(wexpr.start), end_boundary(wexpr.end), - partition_count(wexpr.partitions.size()), order_count(wexpr.orders.size()), - range_sense(wexpr.orders.empty() ? OrderType::INVALID : wexpr.orders[0].type), - has_preceding_range(wexpr.start == WindowBoundary::EXPR_PRECEDING_RANGE || - wexpr.end == WindowBoundary::EXPR_PRECEDING_RANGE), - has_following_range(wexpr.start == WindowBoundary::EXPR_FOLLOWING_RANGE || - wexpr.end == WindowBoundary::EXPR_FOLLOWING_RANGE), - needs_peer(BoundaryNeedsPeer(wexpr.end) || wexpr.type == ExpressionType::WINDOW_CUME_DIST) { - } - - void Update(const idx_t row_idx, WindowInputColumn &range_collection, const idx_t chunk_idx, - WindowInputExpression &boundary_start, WindowInputExpression &boundary_end, - const ValidityMask &partition_mask, const ValidityMask &order_mask); - - void Bounds(DataChunk &bounds, idx_t row_idx, WindowInputColumn &range, const idx_t count, - WindowInputExpression &boundary_start, WindowInputExpression &boundary_end, - const ValidityMask &partition_mask, const ValidityMask &order_mask); - - // Cached lookups - const ExpressionType type; - const idx_t input_size; - const WindowBoundary start_boundary; - const WindowBoundary end_boundary; - const size_t partition_count; - const size_t order_count; - const OrderType range_sense; - const bool has_preceding_range; - const bool has_following_range; - const bool needs_peer; - - idx_t partition_start = 0; - idx_t partition_end = 0; - idx_t peer_start = 0; - idx_t peer_end = 0; - idx_t valid_start = 0; - idx_t valid_end = 0; - int64_t window_start = -1; - int64_t window_end = -1; - FrameBounds prev; -}; - -template -static T GetCell(DataChunk &chunk, idx_t column, idx_t index) { - D_ASSERT(chunk.ColumnCount() > column); - auto &source = chunk.data[column]; - const auto data = FlatVector::GetData(source); - return data[index]; -} - -static bool CellIsNull(DataChunk &chunk, idx_t column, idx_t index) { - D_ASSERT(chunk.ColumnCount() > column); - auto &source = chunk.data[column]; - return FlatVector::IsNull(source, index); -} - -static void CopyCell(DataChunk &chunk, idx_t column, idx_t index, Vector &target, idx_t target_offset) { - D_ASSERT(chunk.ColumnCount() > column); - auto &source = chunk.data[column]; - VectorOperations::Copy(source, target, index + 1, index, target_offset); -} - -template -struct WindowColumnIterator { - using iterator = WindowColumnIterator; - using iterator_category = std::random_access_iterator_tag; - using difference_type = std::ptrdiff_t; - using value_type = T; - using reference = T; - using pointer = idx_t; - - explicit WindowColumnIterator(WindowInputColumn &coll_p, pointer pos_p = 0) : coll(&coll_p), pos(pos_p) { - } - - // Forward iterator - inline reference operator*() const { - return coll->GetCell(pos); - } - inline explicit operator pointer() const { - return pos; - } - - inline iterator &operator++() { - ++pos; - return *this; - } - inline iterator operator++(int) { - auto result = *this; - ++(*this); - return result; - } - - // Bidirectional iterator - inline iterator &operator--() { - --pos; - return *this; - } - inline iterator operator--(int) { - auto result = *this; - --(*this); - return result; - } - - // Random Access - inline iterator &operator+=(difference_type n) { - pos += n; - return *this; - } - inline iterator &operator-=(difference_type n) { - pos -= n; - return *this; - } - - inline reference operator[](difference_type m) const { - return coll->GetCell(pos + m); - } - - friend inline iterator operator+(const iterator &a, difference_type n) { - return iterator(a.coll, a.pos + n); - } - - friend inline iterator operator-(const iterator &a, difference_type n) { - return iterator(a.coll, a.pos - n); - } - - friend inline iterator operator+(difference_type n, const iterator &a) { - return a + n; - } - friend inline difference_type operator-(const iterator &a, const iterator &b) { - return difference_type(a.pos - b.pos); - } - - friend inline bool operator==(const iterator &a, const iterator &b) { - return a.pos == b.pos; - } - friend inline bool operator!=(const iterator &a, const iterator &b) { - return a.pos != b.pos; - } - friend inline bool operator<(const iterator &a, const iterator &b) { - return a.pos < b.pos; - } - friend inline bool operator<=(const iterator &a, const iterator &b) { - return a.pos <= b.pos; - } - friend inline bool operator>(const iterator &a, const iterator &b) { - return a.pos > b.pos; - } - friend inline bool operator>=(const iterator &a, const iterator &b) { - return a.pos >= b.pos; - } - -private: - optional_ptr coll; - pointer pos; -}; - -template -struct OperationCompare : public std::function { - inline bool operator()(const T &lhs, const T &val) const { - return OP::template Operation(lhs, val); - } -}; - -template -static idx_t FindTypedRangeBound(WindowInputColumn &over, const idx_t order_begin, const idx_t order_end, - WindowInputExpression &boundary, const idx_t chunk_idx, const FrameBounds &prev) { - D_ASSERT(!boundary.CellIsNull(chunk_idx)); - const auto val = boundary.GetCell(chunk_idx); - - OperationCompare comp; - WindowColumnIterator begin(over, order_begin); - WindowColumnIterator end(over, order_end); - - if (order_begin < prev.first && prev.first < order_end) { - const auto first = over.GetCell(prev.first); - if (!comp(val, first)) { - // prev.first <= val, so we can start further forward - begin += (prev.first - order_begin); - } - } - if (order_begin <= prev.second && prev.second < order_end) { - const auto second = over.GetCell(prev.second); - if (!comp(second, val)) { - // val <= prev.second, so we can end further back - // (prev.second is the largest peer) - end -= (order_end - prev.second - 1); - } - } - - if (FROM) { - return idx_t(std::lower_bound(begin, end, val, comp)); - } else { - return idx_t(std::upper_bound(begin, end, val, comp)); - } -} - -template -static idx_t FindRangeBound(WindowInputColumn &over, const idx_t order_begin, const idx_t order_end, - WindowInputExpression &boundary, const idx_t chunk_idx, const FrameBounds &prev) { - D_ASSERT(boundary.chunk.ColumnCount() == 1); - D_ASSERT(boundary.chunk.data[0].GetType().InternalType() == over.input_expr.ptype); - - switch (over.input_expr.ptype) { - case PhysicalType::INT8: - return FindTypedRangeBound(over, order_begin, order_end, boundary, chunk_idx, prev); - case PhysicalType::INT16: - return FindTypedRangeBound(over, order_begin, order_end, boundary, chunk_idx, prev); - case PhysicalType::INT32: - return FindTypedRangeBound(over, order_begin, order_end, boundary, chunk_idx, prev); - case PhysicalType::INT64: - return FindTypedRangeBound(over, order_begin, order_end, boundary, chunk_idx, prev); - case PhysicalType::UINT8: - return FindTypedRangeBound(over, order_begin, order_end, boundary, chunk_idx, prev); - case PhysicalType::UINT16: - return FindTypedRangeBound(over, order_begin, order_end, boundary, chunk_idx, prev); - case PhysicalType::UINT32: - return FindTypedRangeBound(over, order_begin, order_end, boundary, chunk_idx, prev); - case PhysicalType::UINT64: - return FindTypedRangeBound(over, order_begin, order_end, boundary, chunk_idx, prev); - case PhysicalType::INT128: - return FindTypedRangeBound(over, order_begin, order_end, boundary, chunk_idx, prev); - case PhysicalType::FLOAT: - return FindTypedRangeBound(over, order_begin, order_end, boundary, chunk_idx, prev); - case PhysicalType::DOUBLE: - return FindTypedRangeBound(over, order_begin, order_end, boundary, chunk_idx, prev); - case PhysicalType::INTERVAL: - return FindTypedRangeBound(over, order_begin, order_end, boundary, chunk_idx, prev); - default: - throw InternalException("Unsupported column type for RANGE"); - } -} - -template -static idx_t FindOrderedRangeBound(WindowInputColumn &over, const OrderType range_sense, const idx_t order_begin, - const idx_t order_end, WindowInputExpression &boundary, const idx_t chunk_idx, - const FrameBounds &prev) { - switch (range_sense) { - case OrderType::ASCENDING: - return FindRangeBound(over, order_begin, order_end, boundary, chunk_idx, prev); - case OrderType::DESCENDING: - return FindRangeBound(over, order_begin, order_end, boundary, chunk_idx, prev); - default: - throw InternalException("Unsupported ORDER BY sense for RANGE"); - } -} - -void WindowBoundariesState::Update(const idx_t row_idx, WindowInputColumn &range_collection, const idx_t chunk_idx, - WindowInputExpression &boundary_start, WindowInputExpression &boundary_end, - const ValidityMask &partition_mask, const ValidityMask &order_mask) { - - if (partition_count + order_count > 0) { - - // determine partition and peer group boundaries to ultimately figure out window size - const auto is_same_partition = !partition_mask.RowIsValidUnsafe(row_idx); - const auto is_peer = !order_mask.RowIsValidUnsafe(row_idx); - - // when the partition changes, recompute the boundaries - if (!is_same_partition) { - partition_start = row_idx; - peer_start = row_idx; - - // find end of partition - partition_end = input_size; - if (partition_count) { - idx_t n = 1; - partition_end = FindNextStart(partition_mask, partition_start + 1, input_size, n); - } - - // Find valid ordering values for the new partition - // so we can exclude NULLs from RANGE expression computations - valid_start = partition_start; - valid_end = partition_end; - - if ((valid_start < valid_end) && has_preceding_range) { - // Exclude any leading NULLs - if (range_collection.CellIsNull(valid_start)) { - idx_t n = 1; - valid_start = FindNextStart(order_mask, valid_start + 1, valid_end, n); - } - } - - if ((valid_start < valid_end) && has_following_range) { - // Exclude any trailing NULLs - if (range_collection.CellIsNull(valid_end - 1)) { - idx_t n = 1; - valid_end = FindPrevStart(order_mask, valid_start, valid_end, n); - } - - // Reset range hints - prev.first = valid_start; - prev.second = valid_end; - } - } else if (!is_peer) { - peer_start = row_idx; - } - - if (needs_peer) { - peer_end = partition_end; - if (order_count) { - idx_t n = 1; - peer_end = FindNextStart(order_mask, peer_start + 1, partition_end, n); - } - } - - } else { - // OVER() - partition_end = input_size; - peer_end = partition_end; - } - - // determine window boundaries depending on the type of expression - window_start = -1; - window_end = -1; - - switch (start_boundary) { - case WindowBoundary::UNBOUNDED_PRECEDING: - window_start = partition_start; - break; - case WindowBoundary::CURRENT_ROW_ROWS: - window_start = row_idx; - break; - case WindowBoundary::CURRENT_ROW_RANGE: - window_start = peer_start; - break; - case WindowBoundary::EXPR_PRECEDING_ROWS: { - if (!TrySubtractOperator::Operation(int64_t(row_idx), boundary_start.GetCell(chunk_idx), - window_start)) { - throw OutOfRangeException("Overflow computing ROWS PRECEDING start"); - } - break; - } - case WindowBoundary::EXPR_FOLLOWING_ROWS: { - if (!TryAddOperator::Operation(int64_t(row_idx), boundary_start.GetCell(chunk_idx), window_start)) { - throw OutOfRangeException("Overflow computing ROWS FOLLOWING start"); - } - break; - } - case WindowBoundary::EXPR_PRECEDING_RANGE: { - if (boundary_start.CellIsNull(chunk_idx)) { - window_start = peer_start; - } else { - prev.first = FindOrderedRangeBound(range_collection, range_sense, valid_start, row_idx, - boundary_start, chunk_idx, prev); - window_start = prev.first; - } - break; - } - case WindowBoundary::EXPR_FOLLOWING_RANGE: { - if (boundary_start.CellIsNull(chunk_idx)) { - window_start = peer_start; - } else { - prev.first = FindOrderedRangeBound(range_collection, range_sense, row_idx, valid_end, boundary_start, - chunk_idx, prev); - window_start = prev.first; - } - break; - } - default: - throw InternalException("Unsupported window start boundary"); - } - - switch (end_boundary) { - case WindowBoundary::CURRENT_ROW_ROWS: - window_end = row_idx + 1; - break; - case WindowBoundary::CURRENT_ROW_RANGE: - window_end = peer_end; - break; - case WindowBoundary::UNBOUNDED_FOLLOWING: - window_end = partition_end; - break; - case WindowBoundary::EXPR_PRECEDING_ROWS: - if (!TrySubtractOperator::Operation(int64_t(row_idx + 1), boundary_end.GetCell(chunk_idx), - window_end)) { - throw OutOfRangeException("Overflow computing ROWS PRECEDING end"); - } - break; - case WindowBoundary::EXPR_FOLLOWING_ROWS: - if (!TryAddOperator::Operation(int64_t(row_idx + 1), boundary_end.GetCell(chunk_idx), window_end)) { - throw OutOfRangeException("Overflow computing ROWS FOLLOWING end"); - } - break; - case WindowBoundary::EXPR_PRECEDING_RANGE: { - if (boundary_end.CellIsNull(chunk_idx)) { - window_end = peer_end; - } else { - prev.second = FindOrderedRangeBound(range_collection, range_sense, valid_start, row_idx, - boundary_end, chunk_idx, prev); - window_end = prev.second; - } - break; - } - case WindowBoundary::EXPR_FOLLOWING_RANGE: { - if (boundary_end.CellIsNull(chunk_idx)) { - window_end = peer_end; - } else { - prev.second = FindOrderedRangeBound(range_collection, range_sense, row_idx, valid_end, boundary_end, - chunk_idx, prev); - window_end = prev.second; - } - break; - } - default: - throw InternalException("Unsupported window end boundary"); - } - - // clamp windows to partitions if they should exceed - if (window_start < (int64_t)partition_start) { - window_start = partition_start; - } - if (window_start > (int64_t)partition_end) { - window_start = partition_end; - } - if (window_end < (int64_t)partition_start) { - window_end = partition_start; - } - if (window_end > (int64_t)partition_end) { - window_end = partition_end; - } - - if (window_start < 0 || window_end < 0) { - throw InternalException("Failed to compute window boundaries"); - } -} - -void WindowBoundariesState::Bounds(DataChunk &bounds, idx_t row_idx, WindowInputColumn &range, const idx_t count, - WindowInputExpression &boundary_start, WindowInputExpression &boundary_end, - const ValidityMask &partition_mask, const ValidityMask &order_mask) { - bounds.Reset(); - D_ASSERT(bounds.ColumnCount() == 6); - auto partition_begin_data = FlatVector::GetData(bounds.data[PARTITION_BEGIN]); - auto partition_end_data = FlatVector::GetData(bounds.data[PARTITION_END]); - auto peer_begin_data = FlatVector::GetData(bounds.data[PEER_BEGIN]); - auto peer_end_data = FlatVector::GetData(bounds.data[PEER_END]); - auto window_begin_data = FlatVector::GetData(bounds.data[WINDOW_BEGIN]); - auto window_end_data = FlatVector::GetData(bounds.data[WINDOW_END]); - for (idx_t chunk_idx = 0; chunk_idx < count; ++chunk_idx, ++row_idx) { - Update(row_idx, range, chunk_idx, boundary_start, boundary_end, partition_mask, order_mask); - *partition_begin_data++ = partition_start; - *partition_end_data++ = partition_end; - if (needs_peer) { - *peer_begin_data++ = peer_start; - *peer_end_data++ = peer_end; - } - *window_begin_data++ = window_start; - *window_end_data++ = window_end; - } - bounds.SetCardinality(count); -} - -struct WindowExecutor { - bool IsConstantAggregate(); - bool IsCustomAggregate(); - - WindowExecutor(BoundWindowExpression &wexpr, ClientContext &context, const ValidityMask &partition_mask, - const idx_t count, WindowAggregationMode mode); - - void Sink(DataChunk &input_chunk, const idx_t input_idx, const idx_t total_count); - void Finalize(); - - void Evaluate(idx_t row_idx, DataChunk &input_chunk, Vector &result, const ValidityMask &partition_mask, - const ValidityMask &order_mask); - - // The function - BoundWindowExpression &wexpr; - const WindowAggregationMode mode; - - // Frame management - WindowBoundariesState state; - DataChunk bounds; - uint64_t dense_rank = 1; - uint64_t rank_equal = 0; - uint64_t rank = 1; - - // Expression collections - DataChunk payload_collection; - ExpressionExecutor payload_executor; - DataChunk payload_chunk; - - ExpressionExecutor filter_executor; - SelectionVector filter_sel; - - // LEAD/LAG Evaluation - WindowInputExpression leadlag_offset; - WindowInputExpression leadlag_default; - - // evaluate boundaries if present. Parser has checked boundary types. - WindowInputExpression boundary_start; - WindowInputExpression boundary_end; - - // evaluate RANGE expressions, if needed - WindowInputColumn range; - - // IGNORE NULLS - ValidityMask ignore_nulls; - - // aggregate computation algorithm - unique_ptr aggregate_state = nullptr; - -protected: - void NextRank(idx_t partition_begin, idx_t peer_begin, idx_t row_idx); - void Aggregate(DataChunk &bounds, Vector &result, idx_t count, idx_t row_idx); - void RowNumber(DataChunk &bounds, Vector &result, idx_t count, idx_t row_idx); - void Rank(DataChunk &bounds, Vector &result, idx_t count, idx_t row_idx); - void DenseRank(DataChunk &bounds, Vector &result, idx_t count, idx_t row_idx); - void PercentRank(DataChunk &bounds, Vector &result, idx_t count, idx_t row_idx); - void CumeDist(DataChunk &bounds, Vector &result, idx_t count, idx_t row_idx); - void Ntile(DataChunk &bounds, Vector &result, idx_t count, idx_t row_idx); - void LeadLag(DataChunk &bounds, Vector &result, idx_t count, idx_t row_idx); - void FirstValue(DataChunk &bounds, Vector &result, idx_t count, idx_t row_idx); - void LastValue(DataChunk &bounds, Vector &result, idx_t count, idx_t row_idx); - void NthValue(DataChunk &bounds, Vector &result, idx_t count, idx_t row_idx); -}; - -bool WindowExecutor::IsConstantAggregate() { - if (!wexpr.aggregate) { - return false; - } - - // COUNT(*) is already handled efficiently by segment trees. - if (wexpr.children.empty()) { - return false; - } - - /* - The default framing option is RANGE UNBOUNDED PRECEDING, which - is the same as RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT - ROW; it sets the frame to be all rows from the partition start - up through the current row's last peer (a row that the window's - ORDER BY clause considers equivalent to the current row; all - rows are peers if there is no ORDER BY). In general, UNBOUNDED - PRECEDING means that the frame starts with the first row of the - partition, and similarly UNBOUNDED FOLLOWING means that the - frame ends with the last row of the partition, regardless of - RANGE, ROWS or GROUPS mode. In ROWS mode, CURRENT ROW means that - the frame starts or ends with the current row; but in RANGE or - GROUPS mode it means that the frame starts or ends with the - current row's first or last peer in the ORDER BY ordering. The - offset PRECEDING and offset FOLLOWING options vary in meaning - depending on the frame mode. - */ - switch (wexpr.start) { - case WindowBoundary::UNBOUNDED_PRECEDING: - break; - case WindowBoundary::CURRENT_ROW_RANGE: - if (!wexpr.orders.empty()) { - return false; - } - break; - default: - return false; - } - - switch (wexpr.end) { - case WindowBoundary::UNBOUNDED_FOLLOWING: - break; - case WindowBoundary::CURRENT_ROW_RANGE: - if (!wexpr.orders.empty()) { - return false; - } - break; - default: - return false; - } - - return true; -} - -bool WindowExecutor::IsCustomAggregate() { - if (!wexpr.aggregate) { - return false; - } - - if (!AggregateObject(wexpr).function.window) { - return false; - } - - return (mode < WindowAggregationMode::COMBINE); -} - -WindowExecutor::WindowExecutor(BoundWindowExpression &wexpr, ClientContext &context, const ValidityMask &partition_mask, - const idx_t count, WindowAggregationMode mode) - : wexpr(wexpr), mode(mode), state(wexpr, count), payload_collection(), payload_executor(context), - filter_executor(context), leadlag_offset(wexpr.offset_expr.get(), context), - leadlag_default(wexpr.default_expr.get(), context), boundary_start(wexpr.start_expr.get(), context), - boundary_end(wexpr.end_expr.get(), context), - range((state.has_preceding_range || state.has_following_range) ? wexpr.orders[0].expression.get() : nullptr, - context, count) - -{ - // TODO we could evaluate those expressions in parallel - - // Check for constant aggregate - if (IsConstantAggregate()) { - aggregate_state = - make_uniq(AggregateObject(wexpr), wexpr.return_type, partition_mask, count); - } else if (IsCustomAggregate()) { - aggregate_state = make_uniq(AggregateObject(wexpr), wexpr.return_type, count); - } else if (wexpr.aggregate) { - // build a segment tree for frame-adhering aggregates - // see http://www.vldb.org/pvldb/vol8/p1058-leis.pdf - aggregate_state = make_uniq(AggregateObject(wexpr), wexpr.return_type, count, mode); - } - - // evaluate the FILTER clause and stuff it into a large mask for compactness and reuse - if (wexpr.filter_expr) { - filter_executor.AddExpression(*wexpr.filter_expr); - filter_sel.Initialize(STANDARD_VECTOR_SIZE); - } - - // TODO: child may be a scalar, don't need to materialize the whole collection then - - // evaluate inner expressions of window functions, could be more complex - PrepareInputExpressions(wexpr.children, payload_executor, payload_chunk); - - auto types = payload_chunk.GetTypes(); - if (!types.empty()) { - payload_collection.Initialize(Allocator::Get(context), types); - } - - vector bounds_types(6, LogicalType(LogicalTypeId::UBIGINT)); - bounds.Initialize(Allocator::Get(context), bounds_types); -} - -void WindowExecutor::Sink(DataChunk &input_chunk, const idx_t input_idx, const idx_t total_count) { - // Single pass over the input to produce the global data. - // Vectorisation for the win... - - // Set up a validity mask for IGNORE NULLS - bool check_nulls = false; - if (wexpr.ignore_nulls) { - switch (wexpr.type) { - case ExpressionType::WINDOW_LEAD: - case ExpressionType::WINDOW_LAG: - case ExpressionType::WINDOW_FIRST_VALUE: - case ExpressionType::WINDOW_LAST_VALUE: - case ExpressionType::WINDOW_NTH_VALUE: - check_nulls = true; - break; - default: - break; - } - } - - const auto count = input_chunk.size(); - - idx_t filtered = 0; - SelectionVector *filtering = nullptr; - if (wexpr.filter_expr) { - filtering = &filter_sel; - filtered = filter_executor.SelectExpression(input_chunk, filter_sel); - } - - if (!wexpr.children.empty()) { - payload_chunk.Reset(); - payload_executor.Execute(input_chunk, payload_chunk); - payload_chunk.Verify(); - if (aggregate_state) { - aggregate_state->Sink(payload_chunk, filtering, filtered); - } else { - payload_collection.Append(payload_chunk, true); - } - - // process payload chunks while they are still piping hot - if (check_nulls) { - UnifiedVectorFormat vdata; - payload_chunk.data[0].ToUnifiedFormat(count, vdata); - if (!vdata.validity.AllValid()) { - // Lazily materialise the contents when we find the first NULL - if (ignore_nulls.AllValid()) { - ignore_nulls.Initialize(total_count); - } - // Write to the current position - if (input_idx % ValidityMask::BITS_PER_VALUE == 0) { - // If we are at the edge of an output entry, just copy the entries - auto dst = ignore_nulls.GetData() + ignore_nulls.EntryCount(input_idx); - auto src = vdata.validity.GetData(); - for (auto entry_count = vdata.validity.EntryCount(count); entry_count-- > 0;) { - *dst++ = *src++; - } - } else { - // If not, we have ragged data and need to copy one bit at a time. - for (idx_t i = 0; i < count; ++i) { - ignore_nulls.Set(input_idx + i, vdata.validity.RowIsValid(i)); - } - } - } - } - } else if (aggregate_state) { - // Zero-argument aggregate (e.g., COUNT(*) - payload_chunk.SetCardinality(input_chunk); - aggregate_state->Sink(payload_chunk, filtering, filtered); - } - - range.Append(input_chunk); -} - -void WindowExecutor::Finalize() { - if (aggregate_state) { - aggregate_state->Finalize(); - } -} - -void WindowExecutor::Evaluate(idx_t row_idx, DataChunk &input_chunk, Vector &result, const ValidityMask &partition_mask, - const ValidityMask &order_mask) { - // Evaluate the row-level arguments - boundary_start.Execute(input_chunk); - boundary_end.Execute(input_chunk); - - leadlag_offset.Execute(input_chunk); - leadlag_default.Execute(input_chunk); - - const auto count = input_chunk.size(); - bounds.Reset(); - state.Bounds(bounds, row_idx, range, input_chunk.size(), boundary_start, boundary_end, partition_mask, order_mask); - +static unique_ptr WindowExecutorFactory(BoundWindowExpression &wexpr, ClientContext &context, + const ValidityMask &partition_mask, + const ValidityMask &order_mask, const idx_t payload_count, + WindowAggregationMode mode) { switch (wexpr.type) { case ExpressionType::WINDOW_AGGREGATE: - Aggregate(bounds, result, count, row_idx); - break; + return make_uniq(wexpr, context, payload_count, partition_mask, order_mask, mode); case ExpressionType::WINDOW_ROW_NUMBER: - RowNumber(bounds, result, count, row_idx); - break; + return make_uniq(wexpr, context, payload_count, partition_mask, order_mask); case ExpressionType::WINDOW_RANK_DENSE: - DenseRank(bounds, result, count, row_idx); - break; + return make_uniq(wexpr, context, payload_count, partition_mask, order_mask); case ExpressionType::WINDOW_RANK: - Rank(bounds, result, count, row_idx); - break; + return make_uniq(wexpr, context, payload_count, partition_mask, order_mask); case ExpressionType::WINDOW_PERCENT_RANK: - PercentRank(bounds, result, count, row_idx); - break; + return make_uniq(wexpr, context, payload_count, partition_mask, order_mask); case ExpressionType::WINDOW_CUME_DIST: - CumeDist(bounds, result, count, row_idx); - break; + return make_uniq(wexpr, context, payload_count, partition_mask, order_mask); case ExpressionType::WINDOW_NTILE: - Ntile(bounds, result, count, row_idx); - break; + return make_uniq(wexpr, context, payload_count, partition_mask, order_mask); case ExpressionType::WINDOW_LEAD: case ExpressionType::WINDOW_LAG: - LeadLag(bounds, result, count, row_idx); - break; + return make_uniq(wexpr, context, payload_count, partition_mask, order_mask); case ExpressionType::WINDOW_FIRST_VALUE: - FirstValue(bounds, result, count, row_idx); - break; + return make_uniq(wexpr, context, payload_count, partition_mask, order_mask); case ExpressionType::WINDOW_LAST_VALUE: - LastValue(bounds, result, count, row_idx); - break; + return make_uniq(wexpr, context, payload_count, partition_mask, order_mask); case ExpressionType::WINDOW_NTH_VALUE: - NthValue(bounds, result, count, row_idx); + return make_uniq(wexpr, context, payload_count, partition_mask, order_mask); break; default: throw InternalException("Window aggregate type %s", ExpressionTypeToString(wexpr.type)); } - - result.Verify(count); -} - -void WindowExecutor::NextRank(idx_t partition_begin, idx_t peer_begin, idx_t row_idx) { - if (partition_begin == row_idx) { - dense_rank = 1; - rank = 1; - rank_equal = 0; - } else if (peer_begin == row_idx) { - dense_rank++; - rank += rank_equal; - rank_equal = 0; - } - rank_equal++; -} - -void WindowExecutor::Aggregate(DataChunk &bounds, Vector &result, idx_t count, idx_t row_idx) { - D_ASSERT(aggregate_state); - auto window_begin = FlatVector::GetData(bounds.data[WINDOW_BEGIN]); - auto window_end = FlatVector::GetData(bounds.data[WINDOW_END]); - aggregate_state->Evaluate(window_begin, window_end, result, count); -} - -void WindowExecutor::RowNumber(DataChunk &bounds, Vector &result, idx_t count, idx_t row_idx) { - auto partition_begin = FlatVector::GetData(bounds.data[PARTITION_BEGIN]); - auto rdata = FlatVector::GetData(result); - for (idx_t i = 0; i < count; ++i, ++row_idx) { - rdata[i] = row_idx - partition_begin[i] + 1; - } -} - -void WindowExecutor::Rank(DataChunk &bounds, Vector &result, idx_t count, idx_t row_idx) { - auto partition_begin = FlatVector::GetData(bounds.data[PARTITION_BEGIN]); - auto peer_begin = FlatVector::GetData(bounds.data[PEER_BEGIN]); - auto rdata = FlatVector::GetData(result); - for (idx_t i = 0; i < count; ++i, ++row_idx) { - NextRank(partition_begin[i], peer_begin[i], row_idx); - rdata[i] = rank; - } -} - -void WindowExecutor::DenseRank(DataChunk &bounds, Vector &result, idx_t count, idx_t row_idx) { - auto partition_begin = FlatVector::GetData(bounds.data[PARTITION_BEGIN]); - auto peer_begin = FlatVector::GetData(bounds.data[PEER_BEGIN]); - auto rdata = FlatVector::GetData(result); - for (idx_t i = 0; i < count; ++i, ++row_idx) { - NextRank(partition_begin[i], peer_begin[i], row_idx); - rdata[i] = dense_rank; - } -} - -void WindowExecutor::PercentRank(DataChunk &bounds, Vector &result, idx_t count, idx_t row_idx) { - auto partition_begin = FlatVector::GetData(bounds.data[PARTITION_BEGIN]); - auto partition_end = FlatVector::GetData(bounds.data[PARTITION_END]); - auto peer_begin = FlatVector::GetData(bounds.data[PEER_BEGIN]); - auto rdata = FlatVector::GetData(result); - for (idx_t i = 0; i < count; ++i, ++row_idx) { - NextRank(partition_begin[i], peer_begin[i], row_idx); - int64_t denom = partition_end[i] - partition_begin[i] - 1; - double percent_rank = denom > 0 ? ((double)rank - 1) / denom : 0; - rdata[i] = percent_rank; - } -} - -void WindowExecutor::CumeDist(DataChunk &bounds, Vector &result, idx_t count, idx_t row_idx) { - auto partition_begin = FlatVector::GetData(bounds.data[PARTITION_BEGIN]); - auto partition_end = FlatVector::GetData(bounds.data[PARTITION_END]); - auto peer_begin = FlatVector::GetData(bounds.data[PEER_BEGIN]); - auto peer_end = FlatVector::GetData(bounds.data[PEER_END]); - auto rdata = FlatVector::GetData(result); - for (idx_t i = 0; i < count; ++i, ++row_idx) { - NextRank(partition_begin[i], peer_begin[i], row_idx); - int64_t denom = partition_end[i] - partition_begin[i]; - double cume_dist = denom > 0 ? ((double)(peer_end[i] - partition_begin[i])) / denom : 0; - rdata[i] = cume_dist; - } -} - -void WindowExecutor::Ntile(DataChunk &bounds, Vector &result, idx_t count, idx_t row_idx) { - D_ASSERT(payload_collection.ColumnCount() == 1); - auto partition_begin = FlatVector::GetData(bounds.data[PARTITION_BEGIN]); - auto partition_end = FlatVector::GetData(bounds.data[PARTITION_END]); - auto rdata = FlatVector::GetData(result); - for (idx_t i = 0; i < count; ++i, ++row_idx) { - if (CellIsNull(payload_collection, 0, row_idx)) { - FlatVector::SetNull(result, i, true); - } else { - auto n_param = GetCell(payload_collection, 0, row_idx); - if (n_param < 1) { - throw InvalidInputException("Argument for ntile must be greater than zero"); - } - // With thanks from SQLite's ntileValueFunc() - int64_t n_total = partition_end[i] - partition_begin[i]; - if (n_param > n_total) { - // more groups allowed than we have values - // map every entry to a unique group - n_param = n_total; - } - int64_t n_size = (n_total / n_param); - // find the row idx within the group - D_ASSERT(row_idx >= partition_begin[i]); - int64_t adjusted_row_idx = row_idx - partition_begin[i]; - // now compute the ntile - int64_t n_large = n_total - n_param * n_size; - int64_t i_small = n_large * (n_size + 1); - int64_t result_ntile; - - D_ASSERT((n_large * (n_size + 1) + (n_param - n_large) * n_size) == n_total); - - if (adjusted_row_idx < i_small) { - result_ntile = 1 + adjusted_row_idx / (n_size + 1); - } else { - result_ntile = 1 + n_large + (adjusted_row_idx - i_small) / n_size; - } - // result has to be between [1, NTILE] - D_ASSERT(result_ntile >= 1 && result_ntile <= n_param); - rdata[i] = result_ntile; - } - } -} - -void WindowExecutor::LeadLag(DataChunk &bounds, Vector &result, idx_t count, idx_t row_idx) { - auto partition_begin = FlatVector::GetData(bounds.data[PARTITION_BEGIN]); - auto partition_end = FlatVector::GetData(bounds.data[PARTITION_END]); - for (idx_t i = 0; i < count; ++i, ++row_idx) { - int64_t offset = 1; - if (wexpr.offset_expr) { - offset = leadlag_offset.GetCell(i); - } - int64_t val_idx = (int64_t)row_idx; - if (wexpr.type == ExpressionType::WINDOW_LEAD) { - val_idx += offset; - } else { - val_idx -= offset; - } - - idx_t delta = 0; - if (val_idx < (int64_t)row_idx) { - // Count backwards - delta = idx_t(row_idx - val_idx); - val_idx = FindPrevStart(ignore_nulls, partition_begin[i], row_idx, delta); - } else if (val_idx > (int64_t)row_idx) { - delta = idx_t(val_idx - row_idx); - val_idx = FindNextStart(ignore_nulls, row_idx + 1, partition_end[i], delta); - } - // else offset is zero, so don't move. - - if (!delta) { - CopyCell(payload_collection, 0, val_idx, result, i); - } else if (wexpr.default_expr) { - leadlag_default.CopyCell(result, i); - } else { - FlatVector::SetNull(result, i, true); - } - } -} - -void WindowExecutor::FirstValue(DataChunk &bounds, Vector &result, idx_t count, idx_t row_idx) { - auto window_begin = FlatVector::GetData(bounds.data[WINDOW_BEGIN]); - auto window_end = FlatVector::GetData(bounds.data[WINDOW_END]); - auto &rmask = FlatVector::Validity(result); - for (idx_t i = 0; i < count; ++i, ++row_idx) { - if (window_begin[i] >= window_end[i]) { - rmask.SetInvalid(i); - continue; - } - // Same as NTH_VALUE(..., 1) - idx_t n = 1; - const auto first_idx = FindNextStart(ignore_nulls, window_begin[i], window_end[i], n); - if (!n) { - CopyCell(payload_collection, 0, first_idx, result, i); - } else { - FlatVector::SetNull(result, i, true); - } - } -} - -void WindowExecutor::LastValue(DataChunk &bounds, Vector &result, idx_t count, idx_t row_idx) { - auto window_begin = FlatVector::GetData(bounds.data[WINDOW_BEGIN]); - auto window_end = FlatVector::GetData(bounds.data[WINDOW_END]); - auto &rmask = FlatVector::Validity(result); - for (idx_t i = 0; i < count; ++i, ++row_idx) { - if (window_begin[i] >= window_end[i]) { - rmask.SetInvalid(i); - continue; - } - idx_t n = 1; - const auto last_idx = FindPrevStart(ignore_nulls, window_begin[i], window_end[i], n); - if (!n) { - CopyCell(payload_collection, 0, last_idx, result, i); - } else { - FlatVector::SetNull(result, i, true); - } - } -} - -void WindowExecutor::NthValue(DataChunk &bounds, Vector &result, idx_t count, idx_t row_idx) { - D_ASSERT(payload_collection.ColumnCount() == 2); - - auto window_begin = FlatVector::GetData(bounds.data[WINDOW_BEGIN]); - auto window_end = FlatVector::GetData(bounds.data[WINDOW_END]); - auto &rmask = FlatVector::Validity(result); - for (idx_t i = 0; i < count; ++i, ++row_idx) { - if (window_begin[i] >= window_end[i]) { - rmask.SetInvalid(i); - continue; - } - // Returns value evaluated at the row that is the n'th row of the window frame (counting from 1); - // returns NULL if there is no such row. - if (CellIsNull(payload_collection, 1, row_idx)) { - FlatVector::SetNull(result, i, true); - } else { - auto n_param = GetCell(payload_collection, 1, row_idx); - if (n_param < 1) { - FlatVector::SetNull(result, i, true); - } else { - auto n = idx_t(n_param); - const auto nth_index = FindNextStart(ignore_nulls, window_begin[i], window_end[i], n); - if (!n) { - CopyCell(payload_collection, 0, nth_index, result, i); - } else { - FlatVector::SetNull(result, i, true); - } - } - } - } } //===--------------------------------------------------------------------===// @@ -1346,8 +198,10 @@ class WindowGlobalSourceState : public GlobalSourceState { class WindowLocalSourceState : public LocalSourceState { public: using HashGroupPtr = unique_ptr; - using WindowExecutorPtr = unique_ptr; - using WindowExecutors = vector; + using ExecutorPtr = unique_ptr; + using Executors = vector; + using LocalStatePtr = unique_ptr; + using LocalStates = vector; WindowLocalSourceState(const PhysicalWindow &op_p, ExecutionContext &context, WindowGlobalSourceState &gsource) : context(context.client), op(op_p), gsink(gsource.gsink) { @@ -1386,7 +240,8 @@ class WindowLocalSourceState : public LocalSourceState { vector order_bits; ValidityMask order_mask; //! The current execution functions - WindowExecutors window_execs; + Executors executors; + LocalStates local_states; //! The read partition idx_t hash_bin; @@ -1488,12 +343,13 @@ void WindowLocalSourceState::GeneratePartition(WindowGlobalSinkState &gstate, co } // Create the executors for each function - window_execs.clear(); + local_states.clear(); + executors.clear(); for (idx_t expr_idx = 0; expr_idx < op.select_list.size(); ++expr_idx) { D_ASSERT(op.select_list[expr_idx]->GetExpressionClass() == ExpressionClass::BOUND_WINDOW); auto &wexpr = op.select_list[expr_idx]->Cast(); - auto wexec = make_uniq(wexpr, context, partition_mask, count, gstate.mode); - window_execs.emplace_back(std::move(wexec)); + auto wexec = WindowExecutorFactory(wexpr, context, partition_mask, order_mask, count, gstate.mode); + executors.emplace_back(std::move(wexec)); } // First pass over the input without flushing @@ -1508,15 +364,16 @@ void WindowLocalSourceState::GeneratePartition(WindowGlobalSinkState &gstate, co } // TODO: Parallelization opportunity - for (auto &wexec : window_execs) { + for (auto &wexec : executors) { wexec->Sink(input_chunk, input_idx, scanner->Count()); } input_idx += input_chunk.size(); } // TODO: Parallelization opportunity - for (auto &wexec : window_execs) { + for (auto &wexec : executors) { wexec->Finalize(); + local_states.emplace_back(wexec->GetExecutorState()); } // External scanning assumes all blocks are swizzled. @@ -1537,9 +394,11 @@ void WindowLocalSourceState::Scan(DataChunk &result) { scanner->Scan(input_chunk); output_chunk.Reset(); - for (idx_t expr_idx = 0; expr_idx < window_execs.size(); ++expr_idx) { - auto &executor = *window_execs[expr_idx]; - executor.Evaluate(position, input_chunk, output_chunk.data[expr_idx], partition_mask, order_mask); + for (idx_t expr_idx = 0; expr_idx < executors.size(); ++expr_idx) { + auto &executor = *executors[expr_idx]; + auto &lstate = *local_states[expr_idx]; + auto &result = output_chunk.data[expr_idx]; + executor.Evaluate(position, input_chunk, result, lstate); } output_chunk.SetCardinality(input_chunk); output_chunk.Verify(); diff --git a/src/execution/window_executor.cpp b/src/execution/window_executor.cpp new file mode 100644 index 000000000000..168711999dab --- /dev/null +++ b/src/execution/window_executor.cpp @@ -0,0 +1,1280 @@ +#include "duckdb/execution/window_executor.hpp" + +#include "duckdb/common/operator/add.hpp" +#include "duckdb/common/operator/subtract.hpp" + +namespace duckdb { + +static idx_t FindNextStart(const ValidityMask &mask, idx_t l, const idx_t r, idx_t &n) { + if (mask.AllValid()) { + auto start = MinValue(l + n - 1, r); + n -= MinValue(n, r - l); + return start; + } + + while (l < r) { + // If l is aligned with the start of a block, and the block is blank, then skip forward one block. + idx_t entry_idx; + idx_t shift; + mask.GetEntryIndex(l, entry_idx, shift); + + const auto block = mask.GetValidityEntry(entry_idx); + if (mask.NoneValid(block) && !shift) { + l += ValidityMask::BITS_PER_VALUE; + continue; + } + + // Loop over the block + for (; shift < ValidityMask::BITS_PER_VALUE && l < r; ++shift, ++l) { + if (mask.RowIsValid(block, shift) && --n == 0) { + return MinValue(l, r); + } + } + } + + // Didn't find a start so return the end of the range + return r; +} + +static idx_t FindPrevStart(const ValidityMask &mask, const idx_t l, idx_t r, idx_t &n) { + if (mask.AllValid()) { + auto start = (r <= l + n) ? l : r - n; + n -= r - start; + return start; + } + + while (l < r) { + // If r is aligned with the start of a block, and the previous block is blank, + // then skip backwards one block. + idx_t entry_idx; + idx_t shift; + mask.GetEntryIndex(r - 1, entry_idx, shift); + + const auto block = mask.GetValidityEntry(entry_idx); + if (mask.NoneValid(block) && (shift + 1 == ValidityMask::BITS_PER_VALUE)) { + // r is nonzero (> l) and word aligned, so this will not underflow. + r -= ValidityMask::BITS_PER_VALUE; + continue; + } + + // Loop backwards over the block + // shift is probing r-1 >= l >= 0 + for (++shift; shift-- > 0; --r) { + if (mask.RowIsValid(block, shift) && --n == 0) { + return MaxValue(l, r - 1); + } + } + } + + // Didn't find a start so return the start of the range + return l; +} + +template +static T GetCell(const DataChunk &chunk, idx_t column, idx_t index) { + D_ASSERT(chunk.ColumnCount() > column); + auto &source = chunk.data[column]; + const auto data = FlatVector::GetData(source); + return data[index]; +} + +static bool CellIsNull(const DataChunk &chunk, idx_t column, idx_t index) { + D_ASSERT(chunk.ColumnCount() > column); + auto &source = chunk.data[column]; + return FlatVector::IsNull(source, index); +} + +static void CopyCell(const DataChunk &chunk, idx_t column, idx_t index, Vector &target, idx_t target_offset) { + D_ASSERT(chunk.ColumnCount() > column); + auto &source = chunk.data[column]; + VectorOperations::Copy(source, target, index + 1, index, target_offset); +} + +//===--------------------------------------------------------------------===// +// WindowColumnIterator +//===--------------------------------------------------------------------===// +template +struct WindowColumnIterator { + using iterator = WindowColumnIterator; + using iterator_category = std::random_access_iterator_tag; + using difference_type = std::ptrdiff_t; + using value_type = T; + using reference = T; + using pointer = idx_t; + + explicit WindowColumnIterator(const WindowInputColumn &coll_p, pointer pos_p = 0) : coll(&coll_p), pos(pos_p) { + } + + // Forward iterator + inline reference operator*() const { + return coll->GetCell(pos); + } + inline explicit operator pointer() const { + return pos; + } + + inline iterator &operator++() { + ++pos; + return *this; + } + inline iterator operator++(int) { + auto result = *this; + ++(*this); + return result; + } + + // Bidirectional iterator + inline iterator &operator--() { + --pos; + return *this; + } + inline iterator operator--(int) { + auto result = *this; + --(*this); + return result; + } + + // Random Access + inline iterator &operator+=(difference_type n) { + pos += n; + return *this; + } + inline iterator &operator-=(difference_type n) { + pos -= n; + return *this; + } + + inline reference operator[](difference_type m) const { + return coll->GetCell(pos + m); + } + + friend inline iterator &operator+(const iterator &a, difference_type n) { + return iterator(a.coll, a.pos + n); + } + + friend inline iterator &operator-(const iterator &a, difference_type n) { + return iterator(a.coll, a.pos - n); + } + + friend inline iterator &operator+(difference_type n, const iterator &a) { + return a + n; + } + friend inline difference_type operator-(const iterator &a, const iterator &b) { + return difference_type(a.pos - b.pos); + } + + friend inline bool operator==(const iterator &a, const iterator &b) { + return a.pos == b.pos; + } + friend inline bool operator!=(const iterator &a, const iterator &b) { + return a.pos != b.pos; + } + friend inline bool operator<(const iterator &a, const iterator &b) { + return a.pos < b.pos; + } + friend inline bool operator<=(const iterator &a, const iterator &b) { + return a.pos <= b.pos; + } + friend inline bool operator>(const iterator &a, const iterator &b) { + return a.pos > b.pos; + } + friend inline bool operator>=(const iterator &a, const iterator &b) { + return a.pos >= b.pos; + } + +private: + optional_ptr coll; + pointer pos; +}; + +template +struct OperationCompare : public std::function { + inline bool operator()(const T &lhs, const T &val) const { + return OP::template Operation(lhs, val); + } +}; + +template +static idx_t FindTypedRangeBound(const WindowInputColumn &over, const idx_t order_begin, const idx_t order_end, + WindowInputExpression &boundary, const idx_t chunk_idx, const FrameBounds &prev) { + D_ASSERT(!boundary.CellIsNull(chunk_idx)); + const auto val = boundary.GetCell(chunk_idx); + + OperationCompare comp; + WindowColumnIterator begin(over, order_begin); + WindowColumnIterator end(over, order_end); + + if (order_begin < prev.first && prev.first < order_end) { + const auto first = over.GetCell(prev.first); + if (!comp(val, first)) { + // prev.first <= val, so we can start further forward + begin += (prev.first - order_begin); + } + } + if (order_begin <= prev.second && prev.second < order_end) { + const auto second = over.GetCell(prev.second); + if (!comp(second, val)) { + // val <= prev.second, so we can end further back + // (prev.second is the largest peer) + end -= (order_end - prev.second - 1); + } + } + + if (FROM) { + return idx_t(std::lower_bound(begin, end, val, comp)); + } else { + return idx_t(std::upper_bound(begin, end, val, comp)); + } +} + +template +static idx_t FindRangeBound(const WindowInputColumn &over, const idx_t order_begin, const idx_t order_end, + WindowInputExpression &boundary, const idx_t chunk_idx, const FrameBounds &prev) { + D_ASSERT(boundary.chunk.ColumnCount() == 1); + D_ASSERT(boundary.chunk.data[0].GetType().InternalType() == over.input_expr.ptype); + + switch (over.input_expr.ptype) { + case PhysicalType::INT8: + return FindTypedRangeBound(over, order_begin, order_end, boundary, chunk_idx, prev); + case PhysicalType::INT16: + return FindTypedRangeBound(over, order_begin, order_end, boundary, chunk_idx, prev); + case PhysicalType::INT32: + return FindTypedRangeBound(over, order_begin, order_end, boundary, chunk_idx, prev); + case PhysicalType::INT64: + return FindTypedRangeBound(over, order_begin, order_end, boundary, chunk_idx, prev); + case PhysicalType::UINT8: + return FindTypedRangeBound(over, order_begin, order_end, boundary, chunk_idx, prev); + case PhysicalType::UINT16: + return FindTypedRangeBound(over, order_begin, order_end, boundary, chunk_idx, prev); + case PhysicalType::UINT32: + return FindTypedRangeBound(over, order_begin, order_end, boundary, chunk_idx, prev); + case PhysicalType::UINT64: + return FindTypedRangeBound(over, order_begin, order_end, boundary, chunk_idx, prev); + case PhysicalType::INT128: + return FindTypedRangeBound(over, order_begin, order_end, boundary, chunk_idx, prev); + case PhysicalType::FLOAT: + return FindTypedRangeBound(over, order_begin, order_end, boundary, chunk_idx, prev); + case PhysicalType::DOUBLE: + return FindTypedRangeBound(over, order_begin, order_end, boundary, chunk_idx, prev); + case PhysicalType::INTERVAL: + return FindTypedRangeBound(over, order_begin, order_end, boundary, chunk_idx, prev); + default: + throw InternalException("Unsupported column type for RANGE"); + } +} + +template +static idx_t FindOrderedRangeBound(const WindowInputColumn &over, const OrderType range_sense, const idx_t order_begin, + const idx_t order_end, WindowInputExpression &boundary, const idx_t chunk_idx, + const FrameBounds &prev) { + switch (range_sense) { + case OrderType::ASCENDING: + return FindRangeBound(over, order_begin, order_end, boundary, chunk_idx, prev); + case OrderType::DESCENDING: + return FindRangeBound(over, order_begin, order_end, boundary, chunk_idx, prev); + default: + throw InternalException("Unsupported ORDER BY sense for RANGE"); + } +} + +struct WindowBoundariesState { + using FrameBounds = std::pair; + + static inline bool IsScalar(const unique_ptr &expr) { + return expr ? expr->IsScalar() : true; + } + + static inline bool BoundaryNeedsPeer(const WindowBoundary &boundary) { + switch (boundary) { + case WindowBoundary::CURRENT_ROW_RANGE: + case WindowBoundary::EXPR_PRECEDING_RANGE: + case WindowBoundary::EXPR_FOLLOWING_RANGE: + return true; + default: + return false; + } + } + + WindowBoundariesState(BoundWindowExpression &wexpr, const idx_t input_size); + + void Update(const idx_t row_idx, const WindowInputColumn &range_collection, const idx_t chunk_idx, + WindowInputExpression &boundary_start, WindowInputExpression &boundary_end, + const ValidityMask &partition_mask, const ValidityMask &order_mask); + + void Bounds(DataChunk &bounds, idx_t row_idx, const WindowInputColumn &range, const idx_t count, + WindowInputExpression &boundary_start, WindowInputExpression &boundary_end, + const ValidityMask &partition_mask, const ValidityMask &order_mask); + + // Cached lookups + const ExpressionType type; + const idx_t input_size; + const WindowBoundary start_boundary; + const WindowBoundary end_boundary; + const size_t partition_count; + const size_t order_count; + const OrderType range_sense; + const bool has_preceding_range; + const bool has_following_range; + const bool needs_peer; + + idx_t partition_start = 0; + idx_t partition_end = 0; + idx_t peer_start = 0; + idx_t peer_end = 0; + idx_t valid_start = 0; + idx_t valid_end = 0; + int64_t window_start = -1; + int64_t window_end = -1; + FrameBounds prev; +}; + +//===--------------------------------------------------------------------===// +// WindowBoundariesState +//===--------------------------------------------------------------------===// +void WindowBoundariesState::Update(const idx_t row_idx, const WindowInputColumn &range_collection, + const idx_t chunk_idx, WindowInputExpression &boundary_start, + WindowInputExpression &boundary_end, const ValidityMask &partition_mask, + const ValidityMask &order_mask) { + + if (partition_count + order_count > 0) { + + // determine partition and peer group boundaries to ultimately figure out window size + const auto is_same_partition = !partition_mask.RowIsValidUnsafe(row_idx); + const auto is_peer = !order_mask.RowIsValidUnsafe(row_idx); + + // when the partition changes, recompute the boundaries + if (!is_same_partition) { + partition_start = row_idx; + peer_start = row_idx; + + // find end of partition + partition_end = input_size; + if (partition_count) { + idx_t n = 1; + partition_end = FindNextStart(partition_mask, partition_start + 1, input_size, n); + } + + // Find valid ordering values for the new partition + // so we can exclude NULLs from RANGE expression computations + valid_start = partition_start; + valid_end = partition_end; + + if ((valid_start < valid_end) && has_preceding_range) { + // Exclude any leading NULLs + if (range_collection.CellIsNull(valid_start)) { + idx_t n = 1; + valid_start = FindNextStart(order_mask, valid_start + 1, valid_end, n); + } + } + + if ((valid_start < valid_end) && has_following_range) { + // Exclude any trailing NULLs + if (range_collection.CellIsNull(valid_end - 1)) { + idx_t n = 1; + valid_end = FindPrevStart(order_mask, valid_start, valid_end, n); + } + + // Reset range hints + prev.first = valid_start; + prev.second = valid_end; + } + } else if (!is_peer) { + peer_start = row_idx; + } + + if (needs_peer) { + peer_end = partition_end; + if (order_count) { + idx_t n = 1; + peer_end = FindNextStart(order_mask, peer_start + 1, partition_end, n); + } + } + + } else { + // OVER() + partition_end = input_size; + peer_end = partition_end; + } + + // determine window boundaries depending on the type of expression + window_start = -1; + window_end = -1; + + switch (start_boundary) { + case WindowBoundary::UNBOUNDED_PRECEDING: + window_start = partition_start; + break; + case WindowBoundary::CURRENT_ROW_ROWS: + window_start = row_idx; + break; + case WindowBoundary::CURRENT_ROW_RANGE: + window_start = peer_start; + break; + case WindowBoundary::EXPR_PRECEDING_ROWS: { + if (!TrySubtractOperator::Operation(int64_t(row_idx), boundary_start.GetCell(chunk_idx), + window_start)) { + throw OutOfRangeException("Overflow computing ROWS PRECEDING start"); + } + break; + } + case WindowBoundary::EXPR_FOLLOWING_ROWS: { + if (!TryAddOperator::Operation(int64_t(row_idx), boundary_start.GetCell(chunk_idx), window_start)) { + throw OutOfRangeException("Overflow computing ROWS FOLLOWING start"); + } + break; + } + case WindowBoundary::EXPR_PRECEDING_RANGE: { + if (boundary_start.CellIsNull(chunk_idx)) { + window_start = peer_start; + } else { + prev.first = FindOrderedRangeBound(range_collection, range_sense, valid_start, row_idx, + boundary_start, chunk_idx, prev); + window_start = prev.first; + } + break; + } + case WindowBoundary::EXPR_FOLLOWING_RANGE: { + if (boundary_start.CellIsNull(chunk_idx)) { + window_start = peer_start; + } else { + prev.first = FindOrderedRangeBound(range_collection, range_sense, row_idx, valid_end, boundary_start, + chunk_idx, prev); + window_start = prev.first; + } + break; + } + default: + throw InternalException("Unsupported window start boundary"); + } + + switch (end_boundary) { + case WindowBoundary::CURRENT_ROW_ROWS: + window_end = row_idx + 1; + break; + case WindowBoundary::CURRENT_ROW_RANGE: + window_end = peer_end; + break; + case WindowBoundary::UNBOUNDED_FOLLOWING: + window_end = partition_end; + break; + case WindowBoundary::EXPR_PRECEDING_ROWS: + if (!TrySubtractOperator::Operation(int64_t(row_idx + 1), boundary_end.GetCell(chunk_idx), + window_end)) { + throw OutOfRangeException("Overflow computing ROWS PRECEDING end"); + } + break; + case WindowBoundary::EXPR_FOLLOWING_ROWS: + if (!TryAddOperator::Operation(int64_t(row_idx + 1), boundary_end.GetCell(chunk_idx), window_end)) { + throw OutOfRangeException("Overflow computing ROWS FOLLOWING end"); + } + break; + case WindowBoundary::EXPR_PRECEDING_RANGE: { + if (boundary_end.CellIsNull(chunk_idx)) { + window_end = peer_end; + } else { + prev.second = FindOrderedRangeBound(range_collection, range_sense, valid_start, row_idx, + boundary_end, chunk_idx, prev); + window_end = prev.second; + } + break; + } + case WindowBoundary::EXPR_FOLLOWING_RANGE: { + if (boundary_end.CellIsNull(chunk_idx)) { + window_end = peer_end; + } else { + prev.second = FindOrderedRangeBound(range_collection, range_sense, row_idx, valid_end, boundary_end, + chunk_idx, prev); + window_end = prev.second; + } + break; + } + default: + throw InternalException("Unsupported window end boundary"); + } + + // clamp windows to partitions if they should exceed + if (window_start < (int64_t)partition_start) { + window_start = partition_start; + } + if (window_start > (int64_t)partition_end) { + window_start = partition_end; + } + if (window_end < (int64_t)partition_start) { + window_end = partition_start; + } + if (window_end > (int64_t)partition_end) { + window_end = partition_end; + } + + if (window_start < 0 || window_end < 0) { + throw InternalException("Failed to compute window boundaries"); + } +} + +static bool HasPrecedingRange(BoundWindowExpression &wexpr) { + return (wexpr.start == WindowBoundary::EXPR_PRECEDING_RANGE || wexpr.end == WindowBoundary::EXPR_PRECEDING_RANGE); +} + +static bool HasFollowingRange(BoundWindowExpression &wexpr) { + return (wexpr.start == WindowBoundary::EXPR_FOLLOWING_RANGE || wexpr.end == WindowBoundary::EXPR_FOLLOWING_RANGE); +} + +WindowBoundariesState::WindowBoundariesState(BoundWindowExpression &wexpr, const idx_t input_size) + : type(wexpr.type), input_size(input_size), start_boundary(wexpr.start), end_boundary(wexpr.end), + partition_count(wexpr.partitions.size()), order_count(wexpr.orders.size()), + range_sense(wexpr.orders.empty() ? OrderType::INVALID : wexpr.orders[0].type), + has_preceding_range(HasPrecedingRange(wexpr)), has_following_range(HasFollowingRange(wexpr)), + needs_peer(BoundaryNeedsPeer(wexpr.end) || wexpr.type == ExpressionType::WINDOW_CUME_DIST) { +} + +void WindowBoundariesState::Bounds(DataChunk &bounds, idx_t row_idx, const WindowInputColumn &range, const idx_t count, + WindowInputExpression &boundary_start, WindowInputExpression &boundary_end, + const ValidityMask &partition_mask, const ValidityMask &order_mask) { + bounds.Reset(); + D_ASSERT(bounds.ColumnCount() == 6); + auto partition_begin_data = FlatVector::GetData(bounds.data[PARTITION_BEGIN]); + auto partition_end_data = FlatVector::GetData(bounds.data[PARTITION_END]); + auto peer_begin_data = FlatVector::GetData(bounds.data[PEER_BEGIN]); + auto peer_end_data = FlatVector::GetData(bounds.data[PEER_END]); + auto window_begin_data = FlatVector::GetData(bounds.data[WINDOW_BEGIN]); + auto window_end_data = FlatVector::GetData(bounds.data[WINDOW_END]); + for (idx_t chunk_idx = 0; chunk_idx < count; ++chunk_idx, ++row_idx) { + Update(row_idx, range, chunk_idx, boundary_start, boundary_end, partition_mask, order_mask); + *partition_begin_data++ = partition_start; + *partition_end_data++ = partition_end; + if (needs_peer) { + *peer_begin_data++ = peer_start; + *peer_end_data++ = peer_end; + } + *window_begin_data++ = window_start; + *window_end_data++ = window_end; + } + bounds.SetCardinality(count); +} + +//===--------------------------------------------------------------------===// +// WindowExecutorBoundsState +//===--------------------------------------------------------------------===// +class WindowExecutorBoundsState : public WindowExecutorState { +public: + WindowExecutorBoundsState(BoundWindowExpression &wexpr, ClientContext &context, const idx_t count, + const ValidityMask &partition_mask_p, const ValidityMask &order_mask_p); + ~WindowExecutorBoundsState() override { + } + + virtual void UpdateBounds(idx_t row_idx, DataChunk &input_chunk, const WindowInputColumn &range); + + // Frame management + const ValidityMask &partition_mask; + const ValidityMask &order_mask; + DataChunk bounds; + WindowBoundariesState state; + + // evaluate boundaries if present. Parser has checked boundary types. + WindowInputExpression boundary_start; + WindowInputExpression boundary_end; +}; + +WindowExecutorBoundsState::WindowExecutorBoundsState(BoundWindowExpression &wexpr, ClientContext &context, + const idx_t payload_count, const ValidityMask &partition_mask_p, + const ValidityMask &order_mask_p) + : partition_mask(partition_mask_p), order_mask(order_mask_p), state(wexpr, payload_count), + boundary_start(wexpr.start_expr.get(), context), boundary_end(wexpr.end_expr.get(), context) { + vector bounds_types(6, LogicalType(LogicalTypeId::UBIGINT)); + bounds.Initialize(Allocator::Get(context), bounds_types); +} + +void WindowExecutorBoundsState::UpdateBounds(idx_t row_idx, DataChunk &input_chunk, const WindowInputColumn &range) { + // Evaluate the row-level arguments + boundary_start.Execute(input_chunk); + boundary_end.Execute(input_chunk); + + const auto count = input_chunk.size(); + bounds.Reset(); + state.Bounds(bounds, row_idx, range, count, boundary_start, boundary_end, partition_mask, order_mask); +} + +//===--------------------------------------------------------------------===// +// WindowExecutor +//===--------------------------------------------------------------------===// +static void PrepareInputExpressions(vector> &exprs, ExpressionExecutor &executor, + DataChunk &chunk) { + if (exprs.empty()) { + return; + } + + vector types; + for (idx_t expr_idx = 0; expr_idx < exprs.size(); ++expr_idx) { + types.push_back(exprs[expr_idx]->return_type); + executor.AddExpression(*exprs[expr_idx]); + } + + if (!types.empty()) { + auto &allocator = executor.GetAllocator(); + chunk.Initialize(allocator, types); + } +} + +WindowExecutor::WindowExecutor(BoundWindowExpression &wexpr, ClientContext &context, const idx_t payload_count, + const ValidityMask &partition_mask, const ValidityMask &order_mask) + : wexpr(wexpr), context(context), payload_count(payload_count), partition_mask(partition_mask), + order_mask(order_mask), payload_collection(), payload_executor(context), + range((HasPrecedingRange(wexpr) || HasFollowingRange(wexpr)) ? wexpr.orders[0].expression.get() : nullptr, + context, payload_count) { + // TODO: child may be a scalar, don't need to materialize the whole collection then + + // evaluate inner expressions of window functions, could be more complex + PrepareInputExpressions(wexpr.children, payload_executor, payload_chunk); + + auto types = payload_chunk.GetTypes(); + if (!types.empty()) { + payload_collection.Initialize(Allocator::Get(context), types); + } +} + +unique_ptr WindowExecutor::GetExecutorState() const { + return make_uniq(wexpr, context, payload_count, partition_mask, order_mask); +} + +//===--------------------------------------------------------------------===// +// WindowAggregateExecutor +//===--------------------------------------------------------------------===// +bool WindowAggregateExecutor::IsConstantAggregate() { + if (!wexpr.aggregate) { + return false; + } + + // COUNT(*) is already handled efficiently by segment trees. + if (wexpr.children.empty()) { + return false; + } + + /* + The default framing option is RANGE UNBOUNDED PRECEDING, which + is the same as RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT + ROW; it sets the frame to be all rows from the partition start + up through the current row's last peer (a row that the window's + ORDER BY clause considers equivalent to the current row; all + rows are peers if there is no ORDER BY). In general, UNBOUNDED + PRECEDING means that the frame starts with the first row of the + partition, and similarly UNBOUNDED FOLLOWING means that the + frame ends with the last row of the partition, regardless of + RANGE, ROWS or GROUPS mode. In ROWS mode, CURRENT ROW means that + the frame starts or ends with the current row; but in RANGE or + GROUPS mode it means that the frame starts or ends with the + current row's first or last peer in the ORDER BY ordering. The + offset PRECEDING and offset FOLLOWING options vary in meaning + depending on the frame mode. + */ + switch (wexpr.start) { + case WindowBoundary::UNBOUNDED_PRECEDING: + break; + case WindowBoundary::CURRENT_ROW_RANGE: + if (!wexpr.orders.empty()) { + return false; + } + break; + default: + return false; + } + + switch (wexpr.end) { + case WindowBoundary::UNBOUNDED_FOLLOWING: + break; + case WindowBoundary::CURRENT_ROW_RANGE: + if (!wexpr.orders.empty()) { + return false; + } + break; + default: + return false; + } + + return true; +} + +bool WindowAggregateExecutor::IsCustomAggregate() { + if (!wexpr.aggregate) { + return false; + } + + if (!AggregateObject(wexpr).function.window) { + return false; + } + + return (mode < WindowAggregationMode::COMBINE); +} + +void WindowExecutor::Evaluate(idx_t row_idx, DataChunk &input_chunk, Vector &result, + WindowExecutorState &lstate) const { + auto &lbstate = lstate.Cast(); + lbstate.UpdateBounds(row_idx, input_chunk, range); + + const auto count = input_chunk.size(); + EvaluateInternal(lstate, result, count, row_idx); + + result.Verify(count); +} + +WindowAggregateExecutor::WindowAggregateExecutor(BoundWindowExpression &wexpr, ClientContext &context, + const idx_t count, const ValidityMask &partition_mask, + const ValidityMask &order_mask, WindowAggregationMode mode) + : WindowExecutor(wexpr, context, count, partition_mask, order_mask), mode(mode), filter_executor(context) { + // TODO we could evaluate those expressions in parallel + + // Check for constant aggregate + if (IsConstantAggregate()) { + aggregator = + make_uniq(AggregateObject(wexpr), wexpr.return_type, partition_mask, count); + } else if (IsCustomAggregate()) { + aggregator = make_uniq(AggregateObject(wexpr), wexpr.return_type, count); + } else if (wexpr.aggregate) { + // build a segment tree for frame-adhering aggregates + // see http://www.vldb.org/pvldb/vol8/p1058-leis.pdf + aggregator = make_uniq(AggregateObject(wexpr), wexpr.return_type, count, mode); + } + + // evaluate the FILTER clause and stuff it into a large mask for compactness and reuse + if (wexpr.filter_expr) { + filter_executor.AddExpression(*wexpr.filter_expr); + filter_sel.Initialize(STANDARD_VECTOR_SIZE); + } +} + +void WindowAggregateExecutor::Sink(DataChunk &input_chunk, const idx_t input_idx, const idx_t total_count) { + idx_t filtered = 0; + SelectionVector *filtering = nullptr; + if (wexpr.filter_expr) { + filtering = &filter_sel; + filtered = filter_executor.SelectExpression(input_chunk, filter_sel); + } + + if (!wexpr.children.empty()) { + payload_chunk.Reset(); + payload_executor.Execute(input_chunk, payload_chunk); + payload_chunk.Verify(); + } else if (aggregator) { + // Zero-argument aggregate (e.g., COUNT(*) + payload_chunk.SetCardinality(input_chunk); + } + + D_ASSERT(aggregator); + aggregator->Sink(payload_chunk, filtering, filtered); + + WindowExecutor::Sink(input_chunk, input_idx, total_count); +} + +void WindowAggregateExecutor::Finalize() { + D_ASSERT(aggregator); + aggregator->Finalize(); +} + +class WindowAggregateState : public WindowExecutorBoundsState { +public: + WindowAggregateState(BoundWindowExpression &wexpr, ClientContext &context, const idx_t payload_count, + const ValidityMask &partition_mask, const ValidityMask &order_mask, + const WindowAggregator &aggregator) + : WindowExecutorBoundsState(wexpr, context, payload_count, partition_mask, order_mask), + aggregator_state(aggregator.GetLocalState()) { + } + +public: + unique_ptr aggregator_state; + + void NextRank(idx_t partition_begin, idx_t peer_begin, idx_t row_idx); +}; + +unique_ptr WindowAggregateExecutor::GetExecutorState() const { + return make_uniq(wexpr, context, payload_count, partition_mask, order_mask, *aggregator); +} + +void WindowAggregateExecutor::EvaluateInternal(WindowExecutorState &lstate, Vector &result, idx_t count, + idx_t row_idx) const { + auto &lastate = lstate.Cast(); + D_ASSERT(aggregator); + auto window_begin = FlatVector::GetData(lastate.bounds.data[WINDOW_BEGIN]); + auto window_end = FlatVector::GetData(lastate.bounds.data[WINDOW_END]); + aggregator->Evaluate(*lastate.aggregator_state, window_begin, window_end, result, count); +} + +//===--------------------------------------------------------------------===// +// WindowRowNumberExecutor +//===--------------------------------------------------------------------===// +WindowRowNumberExecutor::WindowRowNumberExecutor(BoundWindowExpression &wexpr, ClientContext &context, + const idx_t payload_count, const ValidityMask &partition_mask, + const ValidityMask &order_mask) + : WindowExecutor(wexpr, context, payload_count, partition_mask, order_mask) { +} + +void WindowRowNumberExecutor::EvaluateInternal(WindowExecutorState &lstate, Vector &result, idx_t count, + idx_t row_idx) const { + auto &lbstate = lstate.Cast(); + auto partition_begin = FlatVector::GetData(lbstate.bounds.data[PARTITION_BEGIN]); + auto rdata = FlatVector::GetData(result); + for (idx_t i = 0; i < count; ++i, ++row_idx) { + rdata[i] = row_idx - partition_begin[i] + 1; + } +} + +//===--------------------------------------------------------------------===// +// WindowPeerState +//===--------------------------------------------------------------------===// +class WindowPeerState : public WindowExecutorBoundsState { +public: + WindowPeerState(BoundWindowExpression &wexpr, ClientContext &context, const idx_t payload_count, + const ValidityMask &partition_mask, const ValidityMask &order_mask) + : WindowExecutorBoundsState(wexpr, context, payload_count, partition_mask, order_mask) { + } + +public: + uint64_t dense_rank = 1; + uint64_t rank_equal = 0; + uint64_t rank = 1; + + void NextRank(idx_t partition_begin, idx_t peer_begin, idx_t row_idx); +}; + +void WindowPeerState::NextRank(idx_t partition_begin, idx_t peer_begin, idx_t row_idx) { + if (partition_begin == row_idx) { + dense_rank = 1; + rank = 1; + rank_equal = 0; + } else if (peer_begin == row_idx) { + dense_rank++; + rank += rank_equal; + rank_equal = 0; + } + rank_equal++; +} + +WindowRankExecutor::WindowRankExecutor(BoundWindowExpression &wexpr, ClientContext &context, const idx_t payload_count, + const ValidityMask &partition_mask, const ValidityMask &order_mask) + : WindowExecutor(wexpr, context, payload_count, partition_mask, order_mask) { +} + +unique_ptr WindowRankExecutor::GetExecutorState() const { + return make_uniq(wexpr, context, payload_count, partition_mask, order_mask); +} + +void WindowRankExecutor::EvaluateInternal(WindowExecutorState &lstate, Vector &result, idx_t count, + idx_t row_idx) const { + auto &lpeer = lstate.Cast(); + auto partition_begin = FlatVector::GetData(lpeer.bounds.data[PARTITION_BEGIN]); + auto peer_begin = FlatVector::GetData(lpeer.bounds.data[PEER_BEGIN]); + auto rdata = FlatVector::GetData(result); + + // Reset to "previous" row + lpeer.rank = (peer_begin[0] - partition_begin[0]) + 1; + lpeer.rank_equal = (row_idx - peer_begin[0]); + + for (idx_t i = 0; i < count; ++i, ++row_idx) { + lpeer.NextRank(partition_begin[i], peer_begin[i], row_idx); + rdata[i] = lpeer.rank; + } +} + +WindowDenseRankExecutor::WindowDenseRankExecutor(BoundWindowExpression &wexpr, ClientContext &context, + const idx_t payload_count, const ValidityMask &partition_mask, + const ValidityMask &order_mask) + : WindowExecutor(wexpr, context, payload_count, partition_mask, order_mask) { +} + +unique_ptr WindowDenseRankExecutor::GetExecutorState() const { + return make_uniq(wexpr, context, payload_count, partition_mask, order_mask); +} + +void WindowDenseRankExecutor::EvaluateInternal(WindowExecutorState &lstate, Vector &result, idx_t count, + idx_t row_idx) const { + auto &lpeer = lstate.Cast(); + auto partition_begin = FlatVector::GetData(lpeer.bounds.data[PARTITION_BEGIN]); + auto peer_begin = FlatVector::GetData(lpeer.bounds.data[PEER_BEGIN]); + auto rdata = FlatVector::GetData(result); + + // Reset to "previous" row + lpeer.rank = (peer_begin[0] - partition_begin[0]) + 1; + lpeer.rank_equal = (row_idx - peer_begin[0]); + + // The previous dense rank is the number of order mask bits in [partition_begin, row_idx) + lpeer.dense_rank = 0; + + auto order_begin = partition_begin[0]; + idx_t begin_idx; + idx_t begin_offset; + order_mask.GetEntryIndex(order_begin, begin_idx, begin_offset); + + auto order_end = row_idx; + idx_t end_idx; + idx_t end_offset; + order_mask.GetEntryIndex(order_end, end_idx, end_offset); + + // If they are in the same entry, just loop + if (begin_idx == end_idx) { + const auto entry = order_mask.GetValidityEntry(begin_idx); + for (; begin_offset < end_offset; ++begin_offset) { + lpeer.dense_rank += order_mask.RowIsValid(entry, begin_offset); + } + } else { + // Count the ragged bits at the start of the partition + if (begin_offset) { + const auto entry = order_mask.GetValidityEntry(begin_idx); + for (; begin_offset < order_mask.BITS_PER_VALUE; ++begin_offset) { + lpeer.dense_rank += order_mask.RowIsValid(entry, begin_offset); + ++order_begin; + } + ++begin_idx; + } + + // Count the the aligned bits. + ValidityMask tail_mask(order_mask.GetData() + begin_idx); + lpeer.dense_rank += tail_mask.CountValid(order_end - order_begin); + } + + for (idx_t i = 0; i < count; ++i, ++row_idx) { + lpeer.NextRank(partition_begin[i], peer_begin[i], row_idx); + rdata[i] = lpeer.dense_rank; + } +} + +WindowPercentRankExecutor::WindowPercentRankExecutor(BoundWindowExpression &wexpr, ClientContext &context, + const idx_t payload_count, const ValidityMask &partition_mask, + const ValidityMask &order_mask) + : WindowExecutor(wexpr, context, payload_count, partition_mask, order_mask) { +} + +unique_ptr WindowPercentRankExecutor::GetExecutorState() const { + return make_uniq(wexpr, context, payload_count, partition_mask, order_mask); +} + +void WindowPercentRankExecutor::EvaluateInternal(WindowExecutorState &lstate, Vector &result, idx_t count, + idx_t row_idx) const { + auto &lpeer = lstate.Cast(); + auto partition_begin = FlatVector::GetData(lpeer.bounds.data[PARTITION_BEGIN]); + auto partition_end = FlatVector::GetData(lpeer.bounds.data[PARTITION_END]); + auto peer_begin = FlatVector::GetData(lpeer.bounds.data[PEER_BEGIN]); + auto rdata = FlatVector::GetData(result); + + // Reset to "previous" row + lpeer.rank = (peer_begin[0] - partition_begin[0]) + 1; + lpeer.rank_equal = (row_idx - peer_begin[0]); + + for (idx_t i = 0; i < count; ++i, ++row_idx) { + lpeer.NextRank(partition_begin[i], peer_begin[i], row_idx); + int64_t denom = partition_end[i] - partition_begin[i] - 1; + double percent_rank = denom > 0 ? ((double)lpeer.rank - 1) / denom : 0; + rdata[i] = percent_rank; + } +} + +//===--------------------------------------------------------------------===// +// WindowCumeDistExecutor +//===--------------------------------------------------------------------===// +WindowCumeDistExecutor::WindowCumeDistExecutor(BoundWindowExpression &wexpr, ClientContext &context, + const idx_t payload_count, const ValidityMask &partition_mask, + const ValidityMask &order_mask) + : WindowExecutor(wexpr, context, payload_count, partition_mask, order_mask) { +} + +void WindowCumeDistExecutor::EvaluateInternal(WindowExecutorState &lstate, Vector &result, idx_t count, + idx_t row_idx) const { + auto &lbstate = lstate.Cast(); + auto partition_begin = FlatVector::GetData(lbstate.bounds.data[PARTITION_BEGIN]); + auto partition_end = FlatVector::GetData(lbstate.bounds.data[PARTITION_END]); + auto peer_end = FlatVector::GetData(lbstate.bounds.data[PEER_END]); + auto rdata = FlatVector::GetData(result); + for (idx_t i = 0; i < count; ++i, ++row_idx) { + int64_t denom = partition_end[i] - partition_begin[i]; + double cume_dist = denom > 0 ? ((double)(peer_end[i] - partition_begin[i])) / denom : 0; + rdata[i] = cume_dist; + } +} + +//===--------------------------------------------------------------------===// +// WindowValueExecutor +//===--------------------------------------------------------------------===// +WindowValueExecutor::WindowValueExecutor(BoundWindowExpression &wexpr, ClientContext &context, + const idx_t payload_count, const ValidityMask &partition_mask, + const ValidityMask &order_mask) + : WindowExecutor(wexpr, context, payload_count, partition_mask, order_mask) { +} + +WindowNtileExecutor::WindowNtileExecutor(BoundWindowExpression &wexpr, ClientContext &context, + const idx_t payload_count, const ValidityMask &partition_mask, + const ValidityMask &order_mask) + : WindowValueExecutor(wexpr, context, payload_count, partition_mask, order_mask) { +} + +void WindowValueExecutor::Sink(DataChunk &input_chunk, const idx_t input_idx, const idx_t total_count) { + // Single pass over the input to produce the global data. + // Vectorisation for the win... + + // Set up a validity mask for IGNORE NULLS + bool check_nulls = false; + if (wexpr.ignore_nulls) { + switch (wexpr.type) { + case ExpressionType::WINDOW_LEAD: + case ExpressionType::WINDOW_LAG: + case ExpressionType::WINDOW_FIRST_VALUE: + case ExpressionType::WINDOW_LAST_VALUE: + case ExpressionType::WINDOW_NTH_VALUE: + check_nulls = true; + break; + default: + break; + } + } + + if (!wexpr.children.empty()) { + payload_chunk.Reset(); + payload_executor.Execute(input_chunk, payload_chunk); + payload_chunk.Verify(); + payload_collection.Append(payload_chunk, true); + + // process payload chunks while they are still piping hot + if (check_nulls) { + const auto count = input_chunk.size(); + + UnifiedVectorFormat vdata; + payload_chunk.data[0].ToUnifiedFormat(count, vdata); + if (!vdata.validity.AllValid()) { + // Lazily materialise the contents when we find the first NULL + if (ignore_nulls.AllValid()) { + ignore_nulls.Initialize(total_count); + } + // Write to the current position + if (input_idx % ValidityMask::BITS_PER_VALUE == 0) { + // If we are at the edge of an output entry, just copy the entries + auto dst = ignore_nulls.GetData() + ignore_nulls.EntryCount(input_idx); + auto src = vdata.validity.GetData(); + for (auto entry_count = vdata.validity.EntryCount(count); entry_count-- > 0;) { + *dst++ = *src++; + } + } else { + // If not, we have ragged data and need to copy one bit at a time. + for (idx_t i = 0; i < count; ++i) { + ignore_nulls.Set(input_idx + i, vdata.validity.RowIsValid(i)); + } + } + } + } + } + + WindowExecutor::Sink(input_chunk, input_idx, total_count); +} + +void WindowNtileExecutor::EvaluateInternal(WindowExecutorState &lstate, Vector &result, idx_t count, + idx_t row_idx) const { + D_ASSERT(payload_collection.ColumnCount() == 1); + auto &lbstate = lstate.Cast(); + auto partition_begin = FlatVector::GetData(lbstate.bounds.data[PARTITION_BEGIN]); + auto partition_end = FlatVector::GetData(lbstate.bounds.data[PARTITION_END]); + auto rdata = FlatVector::GetData(result); + for (idx_t i = 0; i < count; ++i, ++row_idx) { + if (CellIsNull(payload_collection, 0, row_idx)) { + FlatVector::SetNull(result, i, true); + } else { + auto n_param = GetCell(payload_collection, 0, row_idx); + if (n_param < 1) { + throw InvalidInputException("Argument for ntile must be greater than zero"); + } + // With thanks from SQLite's ntileValueFunc() + int64_t n_total = partition_end[i] - partition_begin[i]; + if (n_param > n_total) { + // more groups allowed than we have values + // map every entry to a unique group + n_param = n_total; + } + int64_t n_size = (n_total / n_param); + // find the row idx within the group + D_ASSERT(row_idx >= partition_begin[i]); + int64_t adjusted_row_idx = row_idx - partition_begin[i]; + // now compute the ntile + int64_t n_large = n_total - n_param * n_size; + int64_t i_small = n_large * (n_size + 1); + int64_t result_ntile; + + D_ASSERT((n_large * (n_size + 1) + (n_param - n_large) * n_size) == n_total); + + if (adjusted_row_idx < i_small) { + result_ntile = 1 + adjusted_row_idx / (n_size + 1); + } else { + result_ntile = 1 + n_large + (adjusted_row_idx - i_small) / n_size; + } + // result has to be between [1, NTILE] + D_ASSERT(result_ntile >= 1 && result_ntile <= n_param); + rdata[i] = result_ntile; + } + } +} + +//===--------------------------------------------------------------------===// +// WindowLeadLagState +//===--------------------------------------------------------------------===// +class WindowLeadLagState : public WindowExecutorBoundsState { +public: + WindowLeadLagState(BoundWindowExpression &wexpr, ClientContext &context, const idx_t payload_count, + const ValidityMask &partition_mask, const ValidityMask &order_mask) + : WindowExecutorBoundsState(wexpr, context, payload_count, partition_mask, order_mask), + leadlag_offset(wexpr.offset_expr.get(), context), leadlag_default(wexpr.default_expr.get(), context) { + } + + void UpdateBounds(idx_t row_idx, DataChunk &input_chunk, const WindowInputColumn &range) override; + +public: + // LEAD/LAG Evaluation + WindowInputExpression leadlag_offset; + WindowInputExpression leadlag_default; +}; + +void WindowLeadLagState::UpdateBounds(idx_t row_idx, DataChunk &input_chunk, const WindowInputColumn &range) { + // Evaluate the row-level arguments + leadlag_offset.Execute(input_chunk); + leadlag_default.Execute(input_chunk); + + WindowExecutorBoundsState::UpdateBounds(row_idx, input_chunk, range); +} + +WindowLeadLagExecutor::WindowLeadLagExecutor(BoundWindowExpression &wexpr, ClientContext &context, + const idx_t payload_count, const ValidityMask &partition_mask, + const ValidityMask &order_mask) + : WindowValueExecutor(wexpr, context, payload_count, partition_mask, order_mask) { +} + +unique_ptr WindowLeadLagExecutor::GetExecutorState() const { + return make_uniq(wexpr, context, payload_count, partition_mask, order_mask); +} + +void WindowLeadLagExecutor::EvaluateInternal(WindowExecutorState &lstate, Vector &result, idx_t count, + idx_t row_idx) const { + auto &llstate = lstate.Cast(); + + auto partition_begin = FlatVector::GetData(llstate.bounds.data[PARTITION_BEGIN]); + auto partition_end = FlatVector::GetData(llstate.bounds.data[PARTITION_END]); + for (idx_t i = 0; i < count; ++i, ++row_idx) { + int64_t offset = 1; + if (wexpr.offset_expr) { + offset = llstate.leadlag_offset.GetCell(i); + } + int64_t val_idx = (int64_t)row_idx; + if (wexpr.type == ExpressionType::WINDOW_LEAD) { + val_idx += offset; + } else { + val_idx -= offset; + } + + idx_t delta = 0; + if (val_idx < (int64_t)row_idx) { + // Count backwards + delta = idx_t(row_idx - val_idx); + val_idx = FindPrevStart(ignore_nulls, partition_begin[i], row_idx, delta); + } else if (val_idx > (int64_t)row_idx) { + delta = idx_t(val_idx - row_idx); + val_idx = FindNextStart(ignore_nulls, row_idx + 1, partition_end[i], delta); + } + // else offset is zero, so don't move. + + if (!delta) { + CopyCell(payload_collection, 0, val_idx, result, i); + } else if (wexpr.default_expr) { + llstate.leadlag_default.CopyCell(result, i); + } else { + FlatVector::SetNull(result, i, true); + } + } +} + +WindowFirstValueExecutor::WindowFirstValueExecutor(BoundWindowExpression &wexpr, ClientContext &context, + const idx_t payload_count, const ValidityMask &partition_mask, + const ValidityMask &order_mask) + : WindowValueExecutor(wexpr, context, payload_count, partition_mask, order_mask) { +} + +void WindowFirstValueExecutor::EvaluateInternal(WindowExecutorState &lstate, Vector &result, idx_t count, + idx_t row_idx) const { + auto &lbstate = lstate.Cast(); + auto window_begin = FlatVector::GetData(lbstate.bounds.data[WINDOW_BEGIN]); + auto window_end = FlatVector::GetData(lbstate.bounds.data[WINDOW_END]); + auto &rmask = FlatVector::Validity(result); + for (idx_t i = 0; i < count; ++i, ++row_idx) { + if (window_begin[i] >= window_end[i]) { + rmask.SetInvalid(i); + continue; + } + // Same as NTH_VALUE(..., 1) + idx_t n = 1; + const auto first_idx = FindNextStart(ignore_nulls, window_begin[i], window_end[i], n); + if (!n) { + CopyCell(payload_collection, 0, first_idx, result, i); + } else { + FlatVector::SetNull(result, i, true); + } + } +} + +WindowLastValueExecutor::WindowLastValueExecutor(BoundWindowExpression &wexpr, ClientContext &context, + const idx_t payload_count, const ValidityMask &partition_mask, + const ValidityMask &order_mask) + : WindowValueExecutor(wexpr, context, payload_count, partition_mask, order_mask) { +} + +void WindowLastValueExecutor::EvaluateInternal(WindowExecutorState &lstate, Vector &result, idx_t count, + idx_t row_idx) const { + auto &lbstate = lstate.Cast(); + auto window_begin = FlatVector::GetData(lbstate.bounds.data[WINDOW_BEGIN]); + auto window_end = FlatVector::GetData(lbstate.bounds.data[WINDOW_END]); + auto &rmask = FlatVector::Validity(result); + for (idx_t i = 0; i < count; ++i, ++row_idx) { + if (window_begin[i] >= window_end[i]) { + rmask.SetInvalid(i); + continue; + } + idx_t n = 1; + const auto last_idx = FindPrevStart(ignore_nulls, window_begin[i], window_end[i], n); + if (!n) { + CopyCell(payload_collection, 0, last_idx, result, i); + } else { + FlatVector::SetNull(result, i, true); + } + } +} + +WindowNthValueExecutor::WindowNthValueExecutor(BoundWindowExpression &wexpr, ClientContext &context, + const idx_t payload_count, const ValidityMask &partition_mask, + const ValidityMask &order_mask) + : WindowValueExecutor(wexpr, context, payload_count, partition_mask, order_mask) { +} + +void WindowNthValueExecutor::EvaluateInternal(WindowExecutorState &lstate, Vector &result, idx_t count, + idx_t row_idx) const { + D_ASSERT(payload_collection.ColumnCount() == 2); + + auto &lbstate = lstate.Cast(); + auto window_begin = FlatVector::GetData(lbstate.bounds.data[WINDOW_BEGIN]); + auto window_end = FlatVector::GetData(lbstate.bounds.data[WINDOW_END]); + auto &rmask = FlatVector::Validity(result); + for (idx_t i = 0; i < count; ++i, ++row_idx) { + if (window_begin[i] >= window_end[i]) { + rmask.SetInvalid(i); + continue; + } + // Returns value evaluated at the row that is the n'th row of the window frame (counting from 1); + // returns NULL if there is no such row. + if (CellIsNull(payload_collection, 1, row_idx)) { + FlatVector::SetNull(result, i, true); + } else { + auto n_param = GetCell(payload_collection, 1, row_idx); + if (n_param < 1) { + FlatVector::SetNull(result, i, true); + } else { + auto n = idx_t(n_param); + const auto nth_index = FindNextStart(ignore_nulls, window_begin[i], window_end[i], n); + if (!n) { + CopyCell(payload_collection, 0, nth_index, result, i); + } else { + FlatVector::SetNull(result, i, true); + } + } + } + } +} + +} // namespace duckdb diff --git a/src/execution/window_segment_tree.cpp b/src/execution/window_segment_tree.cpp index c0d5f353f89d..2a66399c9e4a 100644 --- a/src/execution/window_segment_tree.cpp +++ b/src/execution/window_segment_tree.cpp @@ -9,34 +9,20 @@ namespace duckdb { //===--------------------------------------------------------------------===// -// WindowAggregateState +// WindowAggregator //===--------------------------------------------------------------------===// -WindowAggregateState::WindowAggregateState(AggregateObject aggr, const LogicalType &result_type_p, - idx_t partition_count_p) - : aggr(std::move(aggr)), result_type(result_type_p), partition_count(partition_count_p), - state_size(this->aggr.function.state_size()), state(state_size), - statef(Value::POINTER(CastPointerToValue(state.data()))), filter_pos(0), - allocator(Allocator::DefaultAllocator()) { - statef.SetVectorType(VectorType::FLAT_VECTOR); // Prevent conversion of results to constants +WindowAggregatorState::WindowAggregatorState() : allocator(Allocator::DefaultAllocator()) { } -WindowAggregateState::~WindowAggregateState() { +WindowAggregator::WindowAggregator(AggregateObject aggr, const LogicalType &result_type_p, idx_t partition_count_p) + : aggr(std::move(aggr)), result_type(result_type_p), partition_count(partition_count_p), + state_size(aggr.function.state_size()), filter_pos(0) { } -void WindowAggregateState::AggregateInit() { - aggr.function.initialize(state.data()); +WindowAggregator::~WindowAggregator() { } -void WindowAggregateState::AggegateFinal(Vector &result, idx_t rid) { - AggregateInputData aggr_input_data(aggr.GetFunctionData(), allocator); - aggr.function.finalize(statef, aggr_input_data, result, 1, rid); - - if (aggr.function.destructor) { - aggr.function.destructor(statef, aggr_input_data, 1); - } -} - -void WindowAggregateState::Sink(DataChunk &payload_chunk, SelectionVector *filter_sel, idx_t filtered) { +void WindowAggregator::Sink(DataChunk &payload_chunk, SelectionVector *filter_sel, idx_t filtered) { if (!inputs.ColumnCount() && payload_chunk.ColumnCount()) { inputs.Initialize(Allocator::DefaultAllocator(), payload_chunk.GetTypes()); } @@ -57,34 +43,19 @@ void WindowAggregateState::Sink(DataChunk &payload_chunk, SelectionVector *filte } } -void WindowAggregateState::Finalize() { -} - -void WindowAggregateState::Compute(Vector &result, idx_t rid, idx_t start, idx_t end) { -} - -void WindowAggregateState::Evaluate(const idx_t *begins, const idx_t *ends, Vector &result, idx_t count) { - auto &rmask = FlatVector::Validity(result); - for (idx_t i = 0; i < count; ++i) { - const auto begin = begins[i]; - const auto end = ends[i]; - if (begin >= end) { - rmask.SetInvalid(i); - continue; - } - Compute(result, i, begin, end); - } +void WindowAggregator::Finalize() { } //===--------------------------------------------------------------------===// // WindowConstantAggregate //===--------------------------------------------------------------------===// +WindowConstantAggregator::WindowConstantAggregator(AggregateObject aggr, const LogicalType &result_type, + const ValidityMask &partition_mask, const idx_t count) + : WindowAggregator(std::move(aggr), result_type, count), partition(0), row(0), state(state_size), + statep(Value::POINTER(CastPointerToValue(state.data()))), + statef(Value::POINTER(CastPointerToValue(state.data()))) { -WindowConstantAggregate::WindowConstantAggregate(AggregateObject aggr, const LogicalType &result_type, - const ValidityMask &partition_mask, const idx_t count) - : WindowAggregateState(std::move(aggr), result_type, count), partition(0), row(0), - statep(Value::POINTER(CastPointerToValue(state.data()))) { - matches.Initialize(); + statef.SetVectorType(VectorType::FLAT_VECTOR); // Prevent conversion of results to constants // Locate the partition boundaries if (partition_mask.AllValid()) { @@ -116,11 +87,27 @@ WindowConstantAggregate::WindowConstantAggregate(AggregateObject aggr, const Log results = make_uniq(result_type, partition_offsets.size()); partition_offsets.emplace_back(count); + // Create an aggregate state for intermediate aggregates + gstate = make_uniq(); + // Start the first aggregate AggregateInit(); } -void WindowConstantAggregate::Sink(DataChunk &payload_chunk, SelectionVector *filter_sel, idx_t filtered) { +void WindowConstantAggregator::AggregateInit() { + aggr.function.initialize(state.data()); +} + +void WindowConstantAggregator::AggegateFinal(Vector &result, idx_t rid) { + AggregateInputData aggr_input_data(aggr.GetFunctionData(), gstate->allocator); + aggr.function.finalize(statef, aggr_input_data, result, 1, rid); + + if (aggr.function.destructor) { + aggr.function.destructor(statef, aggr_input_data, 1); + } +} + +void WindowConstantAggregator::Sink(DataChunk &payload_chunk, SelectionVector *filter_sel, idx_t filtered) { const auto chunk_begin = row; const auto chunk_end = chunk_begin + payload_chunk.size(); @@ -128,7 +115,7 @@ void WindowConstantAggregate::Sink(DataChunk &payload_chunk, SelectionVector *fi inputs.Initialize(Allocator::DefaultAllocator(), payload_chunk.GetTypes()); } - AggregateInputData aggr_input_data(aggr.GetFunctionData(), allocator); + AggregateInputData aggr_input_data(aggr.GetFunctionData(), gstate->allocator); idx_t begin = 0; idx_t filter_idx = 0; auto partition_end = partition_offsets[partition + 1]; @@ -193,64 +180,127 @@ void WindowConstantAggregate::Sink(DataChunk &payload_chunk, SelectionVector *fi } } -void WindowConstantAggregate::Finalize() { +void WindowConstantAggregator::Finalize() { AggegateFinal(*results, partition++); +} - partition = 0; - row = 0; +class WindowConstantAggregatorState : public WindowAggregatorState { +public: + WindowConstantAggregatorState() : partition(0) { + matches.Initialize(); + } + ~WindowConstantAggregatorState() override { + } + +public: + //! The current result partition being read + idx_t partition; + //! Shared SV for evaluation + SelectionVector matches; +}; + +unique_ptr WindowConstantAggregator::GetLocalState() const { + return make_uniq(); } -void WindowConstantAggregate::Evaluate(const idx_t *begins, const idx_t *ends, Vector &target, idx_t count) { +void WindowConstantAggregator::Evaluate(WindowAggregatorState &lstate, const idx_t *begins, const idx_t *ends, + Vector &target, idx_t count) const { // Chunk up the constants and copy them one at a time + auto &lcstate = lstate.Cast(); idx_t matched = 0; idx_t target_offset = 0; for (idx_t i = 0; i < count; ++i) { const auto begin = begins[i]; // Find the partition containing [begin, end) - while (partition_offsets[partition + 1] <= begin) { + while (partition_offsets[lcstate.partition + 1] <= begin) { // Flush the previous partition's data if (matched) { - VectorOperations::Copy(*results, target, matches, matched, 0, target_offset); + VectorOperations::Copy(*results, target, lcstate.matches, matched, 0, target_offset); target_offset += matched; matched = 0; } - ++partition; + ++lcstate.partition; } - matches.set_index(matched++, partition); + lcstate.matches.set_index(matched++, lcstate.partition); } // Flush the last partition if (matched) { - VectorOperations::Copy(*results, target, matches, matched, 0, target_offset); + VectorOperations::Copy(*results, target, lcstate.matches, matched, 0, target_offset); } } //===--------------------------------------------------------------------===// -// WindowCustomAggregate +// WindowCustomAggregator //===--------------------------------------------------------------------===// -WindowCustomAggregate::WindowCustomAggregate(AggregateObject aggr, const LogicalType &result_type, idx_t count) - : WindowAggregateState(std::move(aggr), result_type, count) { +WindowCustomAggregator::WindowCustomAggregator(AggregateObject aggr, const LogicalType &result_type, idx_t count) + : WindowAggregator(std::move(aggr), result_type, count) { +} + +WindowCustomAggregator::~WindowCustomAggregator() { +} + +class WindowCustomAggregatorState : public WindowAggregatorState { +public: + explicit WindowCustomAggregatorState(const AggregateObject &aggr, DataChunk &inputs); + ~WindowCustomAggregatorState() override; + +public: + //! The aggregate function + const AggregateObject &aggr; + //! The aggregate function + DataChunk &inputs; + //! Data pointer that contains a single state, shared by all the custom evaluators + vector state; + //! Reused result state container for the window functions + Vector statef; + //! The frame boundaries, used for the window functions + FrameBounds frame; +}; + +WindowCustomAggregatorState::WindowCustomAggregatorState(const AggregateObject &aggr, DataChunk &inputs) + : aggr(aggr), inputs(inputs), state(aggr.function.state_size()), + statef(Value::POINTER(CastPointerToValue(state.data()))), frame(0, 0) { // if we have a frame-by-frame method, share the single state - AggregateInit(); + aggr.function.initialize(state.data()); } -WindowCustomAggregate::~WindowCustomAggregate() { +WindowCustomAggregatorState::~WindowCustomAggregatorState() { if (aggr.function.destructor) { AggregateInputData aggr_input_data(aggr.GetFunctionData(), allocator); aggr.function.destructor(statef, aggr_input_data, 1); } } -void WindowCustomAggregate::Compute(Vector &result, idx_t rid, idx_t begin, idx_t end) { - // Frame boundaries - auto prev = frame; - frame = FrameBounds(begin, end); +unique_ptr WindowCustomAggregator::GetLocalState() const { + return make_uniq(aggr, const_cast(inputs)); +} - // Extract the range - AggregateInputData aggr_input_data(aggr.GetFunctionData(), allocator); - aggr.function.window(inputs.data.data(), filter_mask, aggr_input_data, inputs.ColumnCount(), state.data(), frame, - prev, result, rid, 0); +void WindowCustomAggregator::Evaluate(WindowAggregatorState &lstate, const idx_t *begins, const idx_t *ends, + Vector &result, idx_t count) const { + // TODO: window should take a const Vector* + auto &lcstate = lstate.Cast(); + auto &frame = lcstate.frame; + auto params = lcstate.inputs.data.data(); + auto &rmask = FlatVector::Validity(result); + for (idx_t i = 0; i < count; ++i) { + const auto begin = begins[i]; + const auto end = ends[i]; + if (begin >= end) { + rmask.SetInvalid(i); + continue; + } + + // Frame boundaries + auto prev = frame; + frame = FrameBounds(begin, end); + + // Extract the range + AggregateInputData aggr_input_data(aggr.GetFunctionData(), lstate.allocator); + aggr.function.window(params, filter_mask, aggr_input_data, inputs.ColumnCount(), lcstate.state.data(), frame, + prev, result, i, 0); + } } //===--------------------------------------------------------------------===// @@ -258,28 +308,12 @@ void WindowCustomAggregate::Compute(Vector &result, idx_t rid, idx_t begin, idx_ //===--------------------------------------------------------------------===// WindowSegmentTree::WindowSegmentTree(AggregateObject aggr, const LogicalType &result_type, idx_t count, WindowAggregationMode mode_p) - : WindowAggregateState(std::move(aggr), result_type, count), - statep(Value::POINTER(CastPointerToValue(state.data()))), frame(0, 0), statel(LogicalType::POINTER), - flush_count(0), internal_nodes(0), mode(mode_p), allocator(Allocator::DefaultAllocator()) { - state.resize(state_size * STANDARD_VECTOR_SIZE); - statep.Flatten(STANDARD_VECTOR_SIZE); - - // Build the finalise vector that just points to the result states - data_ptr_t state_ptr = state.data(); - D_ASSERT(statef.GetVectorType() == VectorType::FLAT_VECTOR); - statef.SetVectorType(VectorType::CONSTANT_VECTOR); - statef.Flatten(STANDARD_VECTOR_SIZE); - auto fdata = FlatVector::GetData(statef); - for (idx_t i = 0; i < STANDARD_VECTOR_SIZE; ++i) { - fdata[i] = state_ptr; - state_ptr += state_size; - } + : WindowAggregator(std::move(aggr), result_type, count), internal_nodes(0), mode(mode_p) { } void WindowSegmentTree::Finalize() { + gstate = GetLocalState(); if (inputs.ColumnCount() > 0) { - leaves.Initialize(Allocator::DefaultAllocator(), inputs.GetTypes()); - filter_sel.Initialize(); if (aggr.function.combine && UseCombineAPI()) { ConstructTree(); } @@ -291,7 +325,7 @@ WindowSegmentTree::~WindowSegmentTree() { // nothing to destroy return; } - AggregateInputData aggr_input_data(aggr.GetFunctionData(), allocator); + AggregateInputData aggr_input_data(aggr.GetFunctionData(), gstate->allocator); // call the destructor for all the intermediate states data_ptr_t address_data[STANDARD_VECTOR_SIZE]; Vector addresses(LogicalType::POINTER, data_ptr_cast(address_data)); @@ -308,7 +342,72 @@ WindowSegmentTree::~WindowSegmentTree() { } } -void WindowSegmentTree::FlushStates(bool combining) { +class WindowSegmentTreeState : public WindowAggregatorState { +public: + WindowSegmentTreeState(const AggregateObject &aggr, DataChunk &inputs, const ValidityMask &filter_mask); + ~WindowSegmentTreeState() override; + + void FlushStates(bool combining); + void ExtractFrame(idx_t begin, idx_t end, data_ptr_t current_state); + void WindowSegmentValue(const WindowSegmentTree &tree, idx_t l_idx, idx_t begin, idx_t end, + data_ptr_t current_state); + void Finalize(Vector &result, idx_t count); + +public: + //! The aggregate function + const AggregateObject &aggr; + //! The aggregate function + DataChunk &inputs; + //! The filtered rows in inputs + const ValidityMask &filter_mask; + //! The size of a single aggregate state + const idx_t state_size; + //! Data pointer that contains a single state, used for intermediate window segment aggregation + vector state; + //! Input data chunk, used for leaf segment aggregation + DataChunk leaves; + //! The filtered rows in inputs. + SelectionVector filter_sel; + //! A vector of pointers to "state", used for intermediate window segment aggregation + Vector statep; + //! Reused state pointers for combining segment tree levels + Vector statel; + //! Reused result state container for the window functions + Vector statef; + //! Count of buffered values + idx_t flush_count; +}; + +WindowSegmentTreeState::WindowSegmentTreeState(const AggregateObject &aggr, DataChunk &inputs, + const ValidityMask &filter_mask) + : aggr(aggr), inputs(inputs), filter_mask(filter_mask), state_size(aggr.function.state_size()), + state(state_size * STANDARD_VECTOR_SIZE), statep(LogicalType::POINTER), statel(LogicalType::POINTER), + statef(LogicalType::POINTER), flush_count(0) { + if (inputs.ColumnCount() > 0) { + leaves.Initialize(Allocator::DefaultAllocator(), inputs.GetTypes()); + filter_sel.Initialize(); + } + + // Build the finalise vector that just points to the result states + data_ptr_t state_ptr = state.data(); + D_ASSERT(statef.GetVectorType() == VectorType::FLAT_VECTOR); + statef.SetVectorType(VectorType::CONSTANT_VECTOR); + statef.Flatten(STANDARD_VECTOR_SIZE); + auto fdata = FlatVector::GetData(statef); + for (idx_t i = 0; i < STANDARD_VECTOR_SIZE; ++i) { + fdata[i] = state_ptr; + state_ptr += state_size; + } +} + +WindowSegmentTreeState::~WindowSegmentTreeState() { +} + +unique_ptr WindowSegmentTree::GetLocalState() const { + return make_uniq(aggr, const_cast(inputs), filter_mask); +} + +void WindowSegmentTreeState::FlushStates(bool combining) { if (!flush_count) { return; } @@ -326,9 +425,8 @@ void WindowSegmentTree::FlushStates(bool combining) { flush_count = 0; } -void WindowSegmentTree::ExtractFrame(idx_t begin, idx_t end, data_ptr_t state_ptr) { +void WindowSegmentTreeState::ExtractFrame(idx_t begin, idx_t end, data_ptr_t state_ptr) { const auto count = end - begin; - D_ASSERT(count <= TREE_FANOUT); // If we are not filtering, // just update the shared dictionary selection to the range @@ -355,7 +453,8 @@ void WindowSegmentTree::ExtractFrame(idx_t begin, idx_t end, data_ptr_t state_pt } } -void WindowSegmentTree::WindowSegmentValue(idx_t l_idx, idx_t begin, idx_t end, data_ptr_t state_ptr) { +void WindowSegmentTreeState::WindowSegmentValue(const WindowSegmentTree &tree, idx_t l_idx, idx_t begin, idx_t end, + data_ptr_t state_ptr) { D_ASSERT(begin <= end); if (begin == end || inputs.ColumnCount() == 0) { return; @@ -366,7 +465,7 @@ void WindowSegmentTree::WindowSegmentValue(idx_t l_idx, idx_t begin, idx_t end, ExtractFrame(begin, end, state_ptr); } else { // find out where the states begin - data_ptr_t begin_ptr = levels_flat_native.get() + state_size * (begin + levels_flat_start[l_idx - 1]); + auto begin_ptr = tree.levels_flat_native.get() + state_size * (begin + tree.levels_flat_start[l_idx - 1]); // set up a vector of pointers that point towards the set of states auto ldata = FlatVector::GetData(statel); auto pdata = FlatVector::GetData(statep); @@ -380,10 +479,23 @@ void WindowSegmentTree::WindowSegmentValue(idx_t l_idx, idx_t begin, idx_t end, } } } +void WindowSegmentTreeState::Finalize(Vector &result, idx_t count) { + // Finalise the result aggregates + AggregateInputData aggr_input_data(aggr.GetFunctionData(), allocator); + aggr.function.finalize(statef, aggr_input_data, result, count, 0); + + // Destruct the result aggregates + if (aggr.function.destructor) { + aggr.function.destructor(statef, aggr_input_data, count); + } +} void WindowSegmentTree::ConstructTree() { D_ASSERT(inputs.ColumnCount() > 0); + // Use a temporary scan state to build the tree + auto >state = gstate->Cast(); + // compute space required to store internal nodes of segment tree internal_nodes = 0; idx_t level_nodes = inputs.size(); @@ -405,8 +517,8 @@ void WindowSegmentTree::ConstructTree() { // compute the aggregate for this entry in the segment tree data_ptr_t state_ptr = levels_flat_native.get() + (levels_flat_offset * state_size); aggr.function.initialize(state_ptr); - WindowSegmentValue(level_current, pos, MinValue(level_size, pos + TREE_FANOUT), state_ptr); - FlushStates(level_current > 0); + gtstate.WindowSegmentValue(*this, level_current, pos, MinValue(level_size, pos + TREE_FANOUT), state_ptr); + gtstate.FlushStates(level_current > 0); levels_flat_offset++; } @@ -421,17 +533,19 @@ void WindowSegmentTree::ConstructTree() { } } -void WindowSegmentTree::Evaluate(const idx_t *begins, const idx_t *ends, Vector &result, idx_t count) { +void WindowSegmentTree::Evaluate(WindowAggregatorState &lstate, const idx_t *begins, const idx_t *ends, Vector &result, + idx_t count) const { + auto <state = lstate.Cast(); const auto cant_combine = (!aggr.function.combine || !UseCombineAPI()); - auto fdata = FlatVector::GetData(statef); + auto fdata = FlatVector::GetData(ltstate.statef); // First pass: aggregate the segment tree nodes // Share adjacent identical states // We do this first because we want to share only tree aggregations idx_t prev_begin = 1; idx_t prev_end = 0; - auto ldata = FlatVector::GetData(statel); - auto pdata = FlatVector::GetData(statep); + auto ldata = FlatVector::GetData(ltstate.statel); + auto pdata = FlatVector::GetData(ltstate.statep); data_ptr_t prev_state = nullptr; for (idx_t rid = 0; rid < count; ++rid) { auto state_ptr = fdata[rid]; @@ -455,10 +569,10 @@ void WindowSegmentTree::Evaluate(const idx_t *begins, const idx_t *ends, Vector idx_t parent_end = end / TREE_FANOUT; if (prev_state && l_idx == 1 && begin == prev_begin && end == prev_end) { // Just combine the previous top level result - ldata[flush_count] = prev_state; - pdata[flush_count] = state_ptr; - if (++flush_count >= STANDARD_VECTOR_SIZE) { - FlushStates(true); + ldata[ltstate.flush_count] = prev_state; + pdata[ltstate.flush_count] = state_ptr; + if (++ltstate.flush_count >= STANDARD_VECTOR_SIZE) { + ltstate.FlushStates(true); } break; } @@ -471,28 +585,28 @@ void WindowSegmentTree::Evaluate(const idx_t *begins, const idx_t *ends, Vector if (parent_begin == parent_end) { if (l_idx) { - WindowSegmentValue(l_idx, begin, end, state_ptr); + ltstate.WindowSegmentValue(*this, l_idx, begin, end, state_ptr); } break; } idx_t group_begin = parent_begin * TREE_FANOUT; if (begin != group_begin) { if (l_idx) { - WindowSegmentValue(l_idx, begin, group_begin + TREE_FANOUT, state_ptr); + ltstate.WindowSegmentValue(*this, l_idx, begin, group_begin + TREE_FANOUT, state_ptr); } parent_begin++; } idx_t group_end = parent_end * TREE_FANOUT; if (end != group_end) { if (l_idx) { - WindowSegmentValue(l_idx, group_end, end, state_ptr); + ltstate.WindowSegmentValue(*this, l_idx, group_end, end, state_ptr); } } begin = parent_begin; end = parent_end; } } - FlushStates(true); + ltstate.FlushStates(true); // Second pass: aggregate the ragged leaves // (or everything if we can't combine) @@ -509,30 +623,23 @@ void WindowSegmentTree::Evaluate(const idx_t *begins, const idx_t *ends, Vector idx_t parent_begin = begin / TREE_FANOUT; idx_t parent_end = end / TREE_FANOUT; if (parent_begin == parent_end || cant_combine) { - WindowSegmentValue(0, begin, end, state_ptr); + ltstate.WindowSegmentValue(*this, 0, begin, end, state_ptr); continue; } idx_t group_begin = parent_begin * TREE_FANOUT; if (begin != group_begin) { - WindowSegmentValue(0, begin, group_begin + TREE_FANOUT, state_ptr); + ltstate.WindowSegmentValue(*this, 0, begin, group_begin + TREE_FANOUT, state_ptr); parent_begin++; } idx_t group_end = parent_end * TREE_FANOUT; if (end != group_end) { - WindowSegmentValue(0, group_end, end, state_ptr); + ltstate.WindowSegmentValue(*this, 0, group_end, end, state_ptr); } } - FlushStates(false); - - // Finalise the result aggregates - AggregateInputData aggr_input_data(aggr.GetFunctionData(), allocator); - aggr.function.finalize(statef, aggr_input_data, result, count, 0); + ltstate.FlushStates(false); - // Destruct the result aggregates - if (aggr.function.destructor) { - aggr.function.destructor(statef, aggr_input_data, count); - } + ltstate.Finalize(result, count); // Set the validity mask on the invalid rows auto &rmask = FlatVector::Validity(result); diff --git a/src/function/table/read_csv.cpp b/src/function/table/read_csv.cpp index 4690b65dccd8..9d343e73cf75 100644 --- a/src/function/table/read_csv.cpp +++ b/src/function/table/read_csv.cpp @@ -343,7 +343,8 @@ struct ParallelCSVGlobalState : public GlobalTableFunctionState { line_info.lines_read[0][0]++; } } - ParallelCSVGlobalState() : line_info(main_mutex, batch_to_tuple_end, tuple_start, tuple_end) { + explicit ParallelCSVGlobalState(idx_t system_threads_p) + : system_threads(system_threads_p), line_info(main_mutex, batch_to_tuple_end, tuple_start, tuple_end) { running_threads = MaxThreads(); } @@ -405,7 +406,7 @@ struct ParallelCSVGlobalState : public GlobalTableFunctionState { //! How many bytes we should execute per local state idx_t bytes_per_local_state; //! Size of first file - idx_t first_file_size; + idx_t first_file_size = 0; //! Whether or not this is an on-disk file bool on_disk_file = true; //! Basically max number of threads in DuckDB @@ -691,7 +692,7 @@ static unique_ptr ParallelCSVInitGlobal(ClientContext auto &bind_data = input.bind_data->CastNoConst(); if (bind_data.files.empty()) { // This can happen when a filename based filter pushdown has eliminated all possible files for this scan. - return make_uniq(); + return make_uniq(context.db->NumberOfThreads()); } unique_ptr file_handle; diff --git a/src/include/duckdb/core_functions/scalar/debug_functions.hpp b/src/include/duckdb/core_functions/scalar/debug_functions.hpp new file mode 100644 index 000000000000..5c83d51c8ca9 --- /dev/null +++ b/src/include/duckdb/core_functions/scalar/debug_functions.hpp @@ -0,0 +1,27 @@ +//===----------------------------------------------------------------------===// +// DuckDB +// +// duckdb/core_functions/scalar/debug_functions.hpp +// +// +//===----------------------------------------------------------------------===// +// This file is automatically generated by scripts/generate_functions.py +// Do not edit this file manually, your changes will be overwritten +//===----------------------------------------------------------------------===// + +#pragma once + +#include "duckdb/function/function_set.hpp" + +namespace duckdb { + +struct VectorTypeFun { + static constexpr const char *Name = "vector_type"; + static constexpr const char *Parameters = "col"; + static constexpr const char *Description = "Returns the VectorType of a given column"; + static constexpr const char *Example = "vector_type(col)"; + + static ScalarFunction GetFunction(); +}; + +} // namespace duckdb diff --git a/src/include/duckdb/execution/window_executor.hpp b/src/include/duckdb/execution/window_executor.hpp new file mode 100644 index 000000000000..beadd3d68eac --- /dev/null +++ b/src/include/duckdb/execution/window_executor.hpp @@ -0,0 +1,313 @@ +//===----------------------------------------------------------------------===// +// DuckDB +// +// duckdb/execution/window_executor.hpp +// +// +//===----------------------------------------------------------------------===// + +#pragma once + +#include "duckdb/execution/expression_executor.hpp" +#include "duckdb/execution/window_segment_tree.hpp" +#include "duckdb/planner/expression/bound_window_expression.hpp" +#include "duckdb/common/vector_operations/vector_operations.hpp" + +namespace duckdb { + +struct WindowInputExpression { + static void PrepareInputExpression(Expression &expr, ExpressionExecutor &executor, DataChunk &chunk) { + vector types; + types.push_back(expr.return_type); + executor.AddExpression(expr); + + auto &allocator = executor.GetAllocator(); + chunk.Initialize(allocator, types); + } + + WindowInputExpression(optional_ptr expr_p, ClientContext &context) + : expr(expr_p), ptype(PhysicalType::INVALID), scalar(true), executor(context) { + if (expr) { + PrepareInputExpression(*expr, executor, chunk); + ptype = expr->return_type.InternalType(); + scalar = expr->IsScalar(); + } + } + + void Execute(DataChunk &input_chunk) { + if (expr) { + chunk.Reset(); + executor.Execute(input_chunk, chunk); + chunk.Verify(); + } + } + + template + inline T GetCell(idx_t i) const { + D_ASSERT(!chunk.data.empty()); + const auto data = FlatVector::GetData(chunk.data[0]); + return data[scalar ? 0 : i]; + } + + inline bool CellIsNull(idx_t i) const { + D_ASSERT(!chunk.data.empty()); + if (chunk.data[0].GetVectorType() == VectorType::CONSTANT_VECTOR) { + return ConstantVector::IsNull(chunk.data[0]); + } + return FlatVector::IsNull(chunk.data[0], i); + } + + inline void CopyCell(Vector &target, idx_t target_offset) const { + D_ASSERT(!chunk.data.empty()); + auto &source = chunk.data[0]; + auto source_offset = scalar ? 0 : target_offset; + VectorOperations::Copy(source, target, source_offset + 1, source_offset, target_offset); + } + + optional_ptr expr; + PhysicalType ptype; + bool scalar; + ExpressionExecutor executor; + DataChunk chunk; +}; + +struct WindowInputColumn { + WindowInputColumn(Expression *expr_p, ClientContext &context, idx_t capacity_p) + : input_expr(expr_p, context), count(0), capacity(capacity_p) { + if (input_expr.expr) { + target = make_uniq(input_expr.chunk.data[0].GetType(), capacity); + } + } + + void Append(DataChunk &input_chunk) { + if (input_expr.expr) { + const auto source_count = input_chunk.size(); + D_ASSERT(count + source_count <= capacity); + if (!input_expr.scalar || !count) { + input_expr.Execute(input_chunk); + auto &source = input_expr.chunk.data[0]; + VectorOperations::Copy(source, *target, source_count, 0, count); + } + count += source_count; + } + } + + inline bool CellIsNull(idx_t i) const { + D_ASSERT(target); + D_ASSERT(i < count); + return FlatVector::IsNull(*target, input_expr.scalar ? 0 : i); + } + + template + inline T GetCell(idx_t i) const { + D_ASSERT(target); + D_ASSERT(i < count); + const auto data = FlatVector::GetData(*target); + return data[input_expr.scalar ? 0 : i]; + } + + WindowInputExpression input_expr; + +private: + unique_ptr target; + idx_t count; + idx_t capacity; +}; + +// Column indexes of the bounds chunk +enum WindowBounds : uint8_t { PARTITION_BEGIN, PARTITION_END, PEER_BEGIN, PEER_END, WINDOW_BEGIN, WINDOW_END }; + +class WindowExecutorState { +public: + WindowExecutorState() {}; + virtual ~WindowExecutorState() { + } + + template + TARGET &Cast() { + D_ASSERT(dynamic_cast(this)); + return reinterpret_cast(*this); + } + template + const TARGET &Cast() const { + D_ASSERT(dynamic_cast(this)); + return reinterpret_cast(*this); + } +}; + +class WindowExecutor { +public: + WindowExecutor(BoundWindowExpression &wexpr, ClientContext &context, const idx_t payload_count, + const ValidityMask &partition_mask, const ValidityMask &order_mask); + virtual ~WindowExecutor() { + } + + virtual void Sink(DataChunk &input_chunk, const idx_t input_idx, const idx_t total_count) { + range.Append(input_chunk); + } + + virtual void Finalize() { + } + + virtual unique_ptr GetExecutorState() const; + + void Evaluate(idx_t row_idx, DataChunk &input_chunk, Vector &result, WindowExecutorState &lstate) const; + +protected: + // The function + BoundWindowExpression &wexpr; + ClientContext &context; + const idx_t payload_count; + const ValidityMask &partition_mask; + const ValidityMask &order_mask; + + // Expression collections + DataChunk payload_collection; + ExpressionExecutor payload_executor; + DataChunk payload_chunk; + + // evaluate RANGE expressions, if needed + WindowInputColumn range; + + virtual void EvaluateInternal(WindowExecutorState &lstate, Vector &result, idx_t count, idx_t row_idx) const = 0; +}; + +class WindowAggregateExecutor : public WindowExecutor { +public: + bool IsConstantAggregate(); + bool IsCustomAggregate(); + + WindowAggregateExecutor(BoundWindowExpression &wexpr, ClientContext &context, const idx_t payload_count, + const ValidityMask &partition_mask, const ValidityMask &order_mask, + WindowAggregationMode mode); + + void Sink(DataChunk &input_chunk, const idx_t input_idx, const idx_t total_count) override; + void Finalize() override; + + unique_ptr GetExecutorState() const override; + + const WindowAggregationMode mode; + +protected: + ExpressionExecutor filter_executor; + SelectionVector filter_sel; + + // aggregate computation algorithm + unique_ptr aggregator; + + void EvaluateInternal(WindowExecutorState &lstate, Vector &result, idx_t count, idx_t row_idx) const override; +}; + +class WindowRowNumberExecutor : public WindowExecutor { +public: + WindowRowNumberExecutor(BoundWindowExpression &wexpr, ClientContext &context, const idx_t payload_count, + const ValidityMask &partition_mask, const ValidityMask &order_mask); + +protected: + void EvaluateInternal(WindowExecutorState &lstate, Vector &result, idx_t count, idx_t row_idx) const override; +}; + +// Base class for non-aggregate functions that use peer boundaries +class WindowRankExecutor : public WindowExecutor { +public: + WindowRankExecutor(BoundWindowExpression &wexpr, ClientContext &context, const idx_t payload_count, + const ValidityMask &partition_mask, const ValidityMask &order_mask); + + unique_ptr GetExecutorState() const override; + +protected: + void EvaluateInternal(WindowExecutorState &lstate, Vector &result, idx_t count, idx_t row_idx) const override; +}; + +class WindowDenseRankExecutor : public WindowExecutor { +public: + WindowDenseRankExecutor(BoundWindowExpression &wexpr, ClientContext &context, const idx_t payload_count, + const ValidityMask &partition_mask, const ValidityMask &order_mask); + + unique_ptr GetExecutorState() const override; + +protected: + void EvaluateInternal(WindowExecutorState &lstate, Vector &result, idx_t count, idx_t row_idx) const override; +}; + +class WindowPercentRankExecutor : public WindowExecutor { +public: + WindowPercentRankExecutor(BoundWindowExpression &wexpr, ClientContext &context, const idx_t payload_count, + const ValidityMask &partition_mask, const ValidityMask &order_mask); + + unique_ptr GetExecutorState() const override; + +protected: + void EvaluateInternal(WindowExecutorState &lstate, Vector &result, idx_t count, idx_t row_idx) const override; +}; + +class WindowCumeDistExecutor : public WindowExecutor { +public: + WindowCumeDistExecutor(BoundWindowExpression &wexpr, ClientContext &context, const idx_t payload_count, + const ValidityMask &partition_mask, const ValidityMask &order_mask); + +protected: + void EvaluateInternal(WindowExecutorState &lstate, Vector &result, idx_t count, idx_t row_idx) const override; +}; + +// Base class for non-aggregate functions that have a payload +class WindowValueExecutor : public WindowExecutor { +public: + WindowValueExecutor(BoundWindowExpression &wexpr, ClientContext &context, const idx_t payload_count, + const ValidityMask &partition_mask, const ValidityMask &order_mask); + + void Sink(DataChunk &input_chunk, const idx_t input_idx, const idx_t total_count) override; + +protected: + // IGNORE NULLS + ValidityMask ignore_nulls; +}; + +// +class WindowNtileExecutor : public WindowValueExecutor { +public: + WindowNtileExecutor(BoundWindowExpression &wexpr, ClientContext &context, const idx_t payload_count, + const ValidityMask &partition_mask, const ValidityMask &order_mask); + +protected: + void EvaluateInternal(WindowExecutorState &lstate, Vector &result, idx_t count, idx_t row_idx) const override; +}; +class WindowLeadLagExecutor : public WindowValueExecutor { +public: + WindowLeadLagExecutor(BoundWindowExpression &wexpr, ClientContext &context, const idx_t payload_count, + const ValidityMask &partition_mask, const ValidityMask &order_mask); + + unique_ptr GetExecutorState() const override; + +protected: + void EvaluateInternal(WindowExecutorState &lstate, Vector &result, idx_t count, idx_t row_idx) const override; +}; + +class WindowFirstValueExecutor : public WindowValueExecutor { +public: + WindowFirstValueExecutor(BoundWindowExpression &wexpr, ClientContext &context, const idx_t payload_count, + const ValidityMask &partition_mask, const ValidityMask &order_mask); + +protected: + void EvaluateInternal(WindowExecutorState &lstate, Vector &result, idx_t count, idx_t row_idx) const override; +}; + +class WindowLastValueExecutor : public WindowValueExecutor { +public: + WindowLastValueExecutor(BoundWindowExpression &wexpr, ClientContext &context, const idx_t payload_count, + const ValidityMask &partition_mask, const ValidityMask &order_mask); + +protected: + void EvaluateInternal(WindowExecutorState &lstate, Vector &result, idx_t count, idx_t row_idx) const override; +}; + +class WindowNthValueExecutor : public WindowValueExecutor { +public: + WindowNthValueExecutor(BoundWindowExpression &wexpr, ClientContext &context, const idx_t payload_count, + const ValidityMask &partition_mask, const ValidityMask &order_mask); + +protected: + void EvaluateInternal(WindowExecutorState &lstate, Vector &result, idx_t count, idx_t row_idx) const override; +}; + +} // namespace duckdb diff --git a/src/include/duckdb/execution/window_segment_tree.hpp b/src/include/duckdb/execution/window_segment_tree.hpp index bdf84b4987fa..de194c418f28 100644 --- a/src/include/duckdb/execution/window_segment_tree.hpp +++ b/src/include/duckdb/execution/window_segment_tree.hpp @@ -16,20 +16,42 @@ namespace duckdb { -class WindowAggregateState { +class WindowAggregatorState { public: - WindowAggregateState(AggregateObject aggr, const LogicalType &result_type_p, idx_t partition_count); - virtual ~WindowAggregateState(); + WindowAggregatorState(); + virtual ~WindowAggregatorState() { + } + + template + TARGET &Cast() { + D_ASSERT(dynamic_cast(this)); + return reinterpret_cast(*this); + } + template + const TARGET &Cast() const { + D_ASSERT(dynamic_cast(this)); + return reinterpret_cast(*this); + } + //! Allocator for aggregates + ArenaAllocator allocator; +}; + +class WindowAggregator { +public: + WindowAggregator(AggregateObject aggr, const LogicalType &result_type_p, idx_t partition_count); + virtual ~WindowAggregator(); + + // Build virtual void Sink(DataChunk &payload_chunk, SelectionVector *filter_sel, idx_t filtered); virtual void Finalize(); - virtual void Compute(Vector &result, idx_t rid, idx_t start, idx_t end); - virtual void Evaluate(const idx_t *begins, const idx_t *ends, Vector &result, idx_t count); -protected: - void AggregateInit(); - void AggegateFinal(Vector &result, idx_t rid); + // Probe + virtual unique_ptr GetLocalState() const = 0; + virtual void Evaluate(WindowAggregatorState &lstate, const idx_t *begins, const idx_t *ends, Vector &result, + idx_t count) const = 0; +protected: AggregateObject aggr; //! The result type of the window function LogicalType result_type; @@ -38,10 +60,6 @@ class WindowAggregateState { const idx_t partition_count; //! The size of a single aggregate state const idx_t state_size; - //! Data pointer that contains a single state, used for intermediate window segment aggregation - vector state; - //! Reused result state container for the window functions - Vector statef; //! Partition data chunk DataChunk inputs; @@ -49,23 +67,28 @@ class WindowAggregateState { vector filter_bits; ValidityMask filter_mask; idx_t filter_pos; - - //! Allocator for aggregates - ArenaAllocator allocator; + //! The state used by the aggregator to build. + unique_ptr gstate; }; -class WindowConstantAggregate : public WindowAggregateState { +class WindowConstantAggregator : public WindowAggregator { public: - WindowConstantAggregate(AggregateObject aggr, const LogicalType &result_type_p, const ValidityMask &partition_mask, - const idx_t count); - ~WindowConstantAggregate() override { + WindowConstantAggregator(AggregateObject aggr, const LogicalType &result_type_p, const ValidityMask &partition_mask, + const idx_t count); + ~WindowConstantAggregator() override { } void Sink(DataChunk &payload_chunk, SelectionVector *filter_sel, idx_t filtered) override; void Finalize() override; - void Evaluate(const idx_t *begins, const idx_t *ends, Vector &result, idx_t count) override; + + unique_ptr GetLocalState() const override; + void Evaluate(WindowAggregatorState &lstate, const idx_t *begins, const idx_t *ends, Vector &result, + idx_t count) const override; private: + void AggregateInit(); + void AggegateFinal(Vector &result, idx_t rid); + //! Partition starts vector partition_offsets; //! Aggregate results @@ -74,25 +97,25 @@ class WindowConstantAggregate : public WindowAggregateState { idx_t partition; //! The current input row being built/read idx_t row; + //! Data pointer that contains a single state, used for intermediate window segment aggregation + vector state; //! A vector of pointers to "state", used for intermediate window segment aggregation Vector statep; - //! Shared SV for evaluation - SelectionVector matches; + //! Reused result state container for the window functions + Vector statef; }; -class WindowCustomAggregate : public WindowAggregateState { +class WindowCustomAggregator : public WindowAggregator { public: - WindowCustomAggregate(AggregateObject aggr, const LogicalType &result_type_p, idx_t partition_count); - ~WindowCustomAggregate() override; + WindowCustomAggregator(AggregateObject aggr, const LogicalType &result_type_p, idx_t partition_count); + ~WindowCustomAggregator() override; - void Compute(Vector &result, idx_t rid, idx_t start, idx_t end) override; - -private: - //! The frame boundaries, used for the window functions - FrameBounds frame; + unique_ptr GetLocalState() const override; + void Evaluate(WindowAggregatorState &lstate, const idx_t *begins, const idx_t *ends, Vector &result, + idx_t count) const override; }; -class WindowSegmentTree : public WindowAggregateState { +class WindowSegmentTree : public WindowAggregator { public: using FrameBounds = std::pair; @@ -100,32 +123,19 @@ class WindowSegmentTree : public WindowAggregateState { ~WindowSegmentTree() override; void Finalize() override; - void Evaluate(const idx_t *begins, const idx_t *ends, Vector &result, idx_t count) override; -private: + unique_ptr GetLocalState() const override; + void Evaluate(WindowAggregatorState &lstate, const idx_t *begins, const idx_t *ends, Vector &result, + idx_t count) const override; + +public: void ConstructTree(); - void ExtractFrame(idx_t begin, idx_t end, data_ptr_t current_state); - void FlushStates(bool combining); - void WindowSegmentValue(idx_t l_idx, idx_t begin, idx_t end, data_ptr_t current_state); //! Use the combine API, if available inline bool UseCombineAPI() const { return mode < WindowAggregationMode::SEPARATE; } - //! Input data chunk, used for leaf segment aggregation - DataChunk leaves; - //! The filtered rows in inputs. - SelectionVector filter_sel; - //! A vector of pointers to "state", used for intermediate window segment aggregation - Vector statep; - //! The frame boundaries, used for the window functions - FrameBounds frame; - //! Reused state pointers for combining segment tree levels - Vector statel; - //! Count of buffered values - idx_t flush_count; - //! The actual window segment tree: an array of aggregate states that represent all the intermediate nodes unsafe_unique_array levels_flat_native; //! For each level, the starting location in the levels_flat_native array @@ -134,12 +144,9 @@ class WindowSegmentTree : public WindowAggregateState { //! The total number of internal nodes of the tree, stored in levels_flat_native idx_t internal_nodes; - //! Use the window API, if available + //! Use the combine API, if available WindowAggregationMode mode; - //! Aggregate allocator - ArenaAllocator allocator; - // TREE_FANOUT needs to cleanly divide STANDARD_VECTOR_SIZE static constexpr idx_t TREE_FANOUT = 16; }; diff --git a/src/storage/compression/rle.cpp b/src/storage/compression/rle.cpp index f57eee2ca764..9dd081ab47e8 100644 --- a/src/storage/compression/rle.cpp +++ b/src/storage/compression/rle.cpp @@ -166,8 +166,8 @@ struct RLECompressState : public CompressionState { void WriteValue(T value, rle_count_t count, bool is_null) { // write the RLE entry auto handle_ptr = handle.Ptr() + RLEConstants::RLE_HEADER_SIZE; - auto data_pointer = (T *)handle_ptr; - auto index_pointer = (rle_count_t *)(handle_ptr + max_rle_count * sizeof(T)); + auto data_pointer = reinterpret_cast(handle_ptr); + auto index_pointer = reinterpret_cast(handle_ptr + max_rle_count * sizeof(T)); data_pointer[entry_count] = value; index_pointer[entry_count] = count; entry_count++; @@ -257,7 +257,7 @@ struct RLEScanState : public SegmentScanState { void Skip(ColumnSegment &segment, idx_t skip_count) { auto data = handle.Ptr() + segment.GetBlockOffset(); - auto index_pointer = (rle_count_t *)(data + rle_count_offset); + auto index_pointer = reinterpret_cast(data + rle_count_offset); for (idx_t i = 0; i < skip_count; i++) { // assign the current value @@ -292,14 +292,58 @@ void RLESkip(ColumnSegment &segment, ColumnScanState &state, idx_t skip_count) { scan_state.Skip(segment, skip_count); } +static bool CanEmitConstantVector(idx_t position, idx_t run_length, idx_t scan_count) { + if (scan_count != STANDARD_VECTOR_SIZE) { + // Only when we can fill an entire Vector can we emit a ConstantVector, because subsequent scans require the + // input Vector to be flat + return false; + } + D_ASSERT(position < run_length); + auto remaining_in_run = run_length - position; + // The amount of values left in this run are equal or greater than the amount of values we need to scan + return remaining_in_run >= scan_count; +} + +template +inline static void ForwardToNextRun(RLEScanState &scan_state) { + // handled all entries in this RLE value + // move to the next entry + scan_state.entry_pos++; + scan_state.position_in_entry = 0; +} + +template +inline static bool ExhaustedRun(RLEScanState &scan_state, rle_count_t *index_pointer) { + return scan_state.position_in_entry >= index_pointer[scan_state.entry_pos]; +} + +template +static void RLEScanConstant(RLEScanState &scan_state, rle_count_t *index_pointer, T *data_pointer, idx_t scan_count, + Vector &result) { + result.SetVectorType(VectorType::CONSTANT_VECTOR); + auto result_data = ConstantVector::GetData(result); + result_data[0] = data_pointer[scan_state.entry_pos]; + scan_state.position_in_entry += scan_count; + if (ExhaustedRun(scan_state, index_pointer)) { + ForwardToNextRun(scan_state); + } + return; +} + template void RLEScanPartial(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count, Vector &result, idx_t result_offset) { auto &scan_state = state.scan_state->Cast>(); auto data = scan_state.handle.Ptr() + segment.GetBlockOffset(); - auto data_pointer = (T *)(data + RLEConstants::RLE_HEADER_SIZE); - auto index_pointer = (rle_count_t *)(data + scan_state.rle_count_offset); + auto data_pointer = reinterpret_cast(data + RLEConstants::RLE_HEADER_SIZE); + auto index_pointer = reinterpret_cast(data + scan_state.rle_count_offset); + + // If we are scanning an entire Vector and it contains only a single run + if (CanEmitConstantVector(scan_state.position_in_entry, index_pointer[scan_state.entry_pos], scan_count)) { + RLEScanConstant(scan_state, index_pointer, data_pointer, scan_count, result); + return; + } auto result_data = FlatVector::GetData(result); result.SetVectorType(VectorType::FLAT_VECTOR); @@ -307,18 +351,14 @@ void RLEScanPartial(ColumnSegment &segment, ColumnScanState &state, idx_t scan_c // assign the current value result_data[result_offset + i] = data_pointer[scan_state.entry_pos]; scan_state.position_in_entry++; - if (scan_state.position_in_entry >= index_pointer[scan_state.entry_pos]) { - // handled all entries in this RLE value - // move to the next entry - scan_state.entry_pos++; - scan_state.position_in_entry = 0; + if (ExhaustedRun(scan_state, index_pointer)) { + ForwardToNextRun(scan_state); } } } template void RLEScan(ColumnSegment &segment, ColumnScanState &state, idx_t scan_count, Vector &result) { - // FIXME: emit constant vector if repetition of single value is >= scan_count RLEScanPartial(segment, state, scan_count, result, 0); } @@ -331,7 +371,7 @@ void RLEFetchRow(ColumnSegment &segment, ColumnFetchState &state, row_t row_id, scan_state.Skip(segment, row_id); auto data = scan_state.handle.Ptr() + segment.GetBlockOffset(); - auto data_pointer = (T *)(data + RLEConstants::RLE_HEADER_SIZE); + auto data_pointer = reinterpret_cast(data + RLEConstants::RLE_HEADER_SIZE); auto result_data = FlatVector::GetData(result); result_data[result_idx] = data_pointer[scan_state.entry_pos]; } diff --git a/test/sql/copy/parquet/writer/row_group_size_bytes.test b/test/sql/copy/parquet/writer/row_group_size_bytes.test index 4b203a94629d..377e1d71ebb0 100644 --- a/test/sql/copy/parquet/writer/row_group_size_bytes.test +++ b/test/sql/copy/parquet/writer/row_group_size_bytes.test @@ -4,6 +4,8 @@ require parquet +require vector_size 1024 + statement error copy (select 42) to '__TEST_DIR__/tbl.parquet' (ROW_GROUP_SIZE_BYTES) diff --git a/test/sql/storage/compression/rle/rle_constant.test b/test/sql/storage/compression/rle/rle_constant.test new file mode 100644 index 000000000000..f3bd302c969f --- /dev/null +++ b/test/sql/storage/compression/rle/rle_constant.test @@ -0,0 +1,67 @@ +# name: test/sql/storage/compression/rle/rle_constant.test +# description: Test RLE where we can emit ConstantVectors when scanning +# group: [rle] + +# load the DB from disk +load __TEST_DIR__/test_rle.db + +statement ok +PRAGMA force_compression = 'rle' + +# simple RLE with few values +statement ok +CREATE TABLE test (a INTEGER); + +# Produces two full vectors from one run +statement ok +INSERT INTO test select 0 from range(4096); + +# Produces one full vector from one run +statement ok +INSERT INTO test select 1 from range(2048); + +# Dito +statement ok +INSERT INTO test select 2 from range(2048); + +# These do not fully fill the Vector, so they don't produce ConstantVectors +statement ok +INSERT INTO test select 3 from range(1024) + +statement ok +INSERT INTO test select 4 from range(1024) + +statement ok +INSERT INTO test select 5 from range(512) + +statement ok +INSERT INTO test select 6 from range(512) + +statement ok +INSERT INTO test select 7 from range(512) + +statement ok +INSERT INTO test select 8 from range(512) + +statement ok +checkpoint; + + +# Some of them produce constant vectors, but not all +query I +select distinct on (types) vector_type(a) as types from test; +---- +CONSTANT_VECTOR +FLAT_VECTOR + +# The first 4 vectors are constant +query I +select distinct on (types) types from (select vector_type(a) from test limit 8192) tbl(types) +---- +CONSTANT_VECTOR + +# The other vectors are not constant +query I +select distinct on (types) types from (select vector_type(a) from test offset 8192) tbl(types) +---- +FLAT_VECTOR diff --git a/test/sql/window/test_boundary_expr.test b/test/sql/window/test_boundary_expr.test index 4123b84ee85a..8729ddc7bce7 100644 --- a/test/sql/window/test_boundary_expr.test +++ b/test/sql/window/test_boundary_expr.test @@ -67,6 +67,21 @@ SELECT sum(unique1) over (order by unique1 rows between unbounded preceding and 45.000000 45.000000 +# Frame starts past partition end +query R +SELECT sum(unique1) over (order by unique1 rows between 5 following and 10 following) su FROM tenk1 order by unique1 +---- +35 +30 +24 +17 +9 +NULL +NULL +NULL +NULL +NULL + # Issue 1472 statement ok diff --git a/test/sql/window/test_dense_rank.test b/test/sql/window/test_dense_rank.test new file mode 100644 index 000000000000..2454d43f3204 --- /dev/null +++ b/test/sql/window/test_dense_rank.test @@ -0,0 +1,36 @@ +# name: test/sql/window/test_dense_rank.test +# description: Test DENSE_RANK state computations +# group: [window] + +statement ok +PRAGMA enable_verification + +# Multiple chunks, single partition +query IIIII +WITH t AS ( + SELECT i, DENSE_RANK() OVER (ORDER BY i % 50) AS d + FROM range(3000) tbl(i) +), w AS ( + SELECT d, COUNT(*) as c + FROM t + GROUP BY ALL +) +SELECT COUNT(*), MIN(d), MAX(d), MIN(c), MAX(c) +FROM w +---- +50 1 50 60 60 + +# Multiple chunks, multiple partitions +query IIIII +WITH t AS ( + SELECT i, DENSE_RANK() OVER (PARTITION BY i // 3000 ORDER BY i % 50) AS d + FROM range(9000) tbl(i) +), w AS ( + SELECT d, COUNT(*) as c + FROM t + GROUP BY ALL +) +SELECT COUNT(*), MIN(d), MAX(d), MIN(c), MAX(c) +FROM w +---- +50 1 50 180 180 diff --git a/test/sql/window/test_nthvalue.test b/test/sql/window/test_nthvalue.test index db90c3e37329..d2930477d037 100644 --- a/test/sql/window/test_nthvalue.test +++ b/test/sql/window/test_nthvalue.test @@ -143,6 +143,27 @@ sales 1 2 3 sales 3 1 3 sales 4 2 NULL +# Where the second parameter (offset) can be zero (coverage) +query IIII +SELECT depname, empno, empno %3 as offset, + nth_value(empno, empno %3) OVER ( + PARTITION BY depname ORDER BY empno ASC + ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING + ) fv +FROM empsalary +ORDER BY 1, 2 +---- +develop 7 1 7 +develop 8 2 9 +develop 9 0 NULL +develop 10 1 10 +develop 11 2 NULL +personnel 2 2 5 +personnel 5 2 NULL +sales 1 1 1 +sales 3 0 NULL +sales 4 1 4 + # Where the first parameter is a constant query III SELECT depname, empno, diff --git a/test/sql/window/test_rank.test b/test/sql/window/test_rank.test new file mode 100644 index 000000000000..5ec1a7e83ccd --- /dev/null +++ b/test/sql/window/test_rank.test @@ -0,0 +1,36 @@ +# name: test/sql/window/test_rank.test +# description: Test RANK state computations +# group: [window] + +statement ok +PRAGMA enable_verification + +# Multiple chunks, single partition +query IIIII +WITH t AS ( + SELECT i, RANK() OVER (ORDER BY i % 50) AS d + FROM range(3000) tbl(i) +), w AS ( + SELECT d, COUNT(*) as c + FROM t + GROUP BY ALL +) +SELECT COUNT(*), MIN(d), MAX(d), MIN(c), MAX(c) +FROM w +---- +50 1 2941 60 60 + +# Multiple chunks, multiple partitions +query IIIII +WITH t AS ( + SELECT i, RANK() OVER (PARTITION BY i // 3000 ORDER BY i % 50) AS d + FROM range(9000) tbl(i) +), w AS ( + SELECT d, COUNT(*) as c + FROM t + GROUP BY ALL +) +SELECT COUNT(*), MIN(d), MAX(d), MIN(c), MAX(c) +FROM w +---- +50 1 2941 180 180 diff --git a/test/sql/window/test_window_range.test b/test/sql/window/test_window_range.test index b8a04318f890..a014c335a5a5 100644 --- a/test/sql/window/test_window_range.test +++ b/test/sql/window/test_window_range.test @@ -27,7 +27,9 @@ statement ok CREATE VIEW c1 AS SELECT a::${type} AS a, b FROM t1; query II -SELECT a, sum(b) OVER win FROM c1 WINDOW win AS ( ORDER BY a RANGE BETWEEN 5 PRECEDING AND 5 FOLLOWING ) +SELECT a, sum(b) OVER win +FROM c1 +WINDOW win AS ( ORDER BY a RANGE BETWEEN 5::${type} PRECEDING AND 5::${type} FOLLOWING ) ---- 5 30 10 112 @@ -398,6 +400,17 @@ SELECT sum(b) OVER ( 15 15 +query I +SELECT sum(b) OVER ( + ORDER BY a NULLS FIRST RANGE BETWEEN CURRENT ROW AND 10 FOLLOWING + ) FROM t1 ORDER BY 1 NULLS FIRST; +---- +6 +6 +6 +9 +9 + #========================================================================== statement ok diff --git a/tools/rpkg/tests/testthat/helper-arrow.R b/tools/rpkg/tests/testthat/helper-arrow.R new file mode 100644 index 000000000000..33d254f35ec5 --- /dev/null +++ b/tools/rpkg/tests/testthat/helper-arrow.R @@ -0,0 +1,7 @@ +# if not running on cran, we run arrow tests. +# we import arrow first since there are GC issues if we import it after +# running relational R tests. +# something to do with how arrow releases its resources. +if (Sys.getenv("NOT_CRAN") == "true") { + requireNamespace("arrow", quietly = TRUE) +} diff --git a/tools/rpkg/tests/testthat/test_arrow_recordbatchreader.py b/tools/rpkg/tests/testthat/test_arrow_recordbatchreader.py deleted file mode 100644 index 266368284681..000000000000 --- a/tools/rpkg/tests/testthat/test_arrow_recordbatchreader.py +++ /dev/null @@ -1,39 +0,0 @@ -import duckdb -import os -try: - import pyarrow - import pyarrow.parquet - import pyarrow.dataset - import numpy as np - can_run = True -except: - can_run = False - -class TestArrowRecordBatchReader(object): - - def test_parallel_reader(self,duckdb_cursor): - if not can_run: - return - -duckdb_conn = duckdb.connect() -duckdb_conn.execute("PRAGMA threads=4") -duckdb_conn.execute("PRAGMA verify_parallelism") - -parquet_filename = os.path.join('data','userdata1.parquet') - -userdata_parquet_dataset= pyarrow.dataset.dataset([ - parquet_filename, - parquet_filename, - parquet_filename, -] -, format="parquet") - -batches= [r for r in userdata_parquet_dataset.to_batches()] -reader=pyarrow.dataset.Scanner.from_batches(batches,userdata_parquet_dataset.schema).to_reader() - - -rel = duckdb_conn.from_arrow_table(reader) - -rows=rel.fetchall() - -assert rel.filter("first_name=\'Jose\' and salary > 134708.82").aggregate('count(*)').execute().fetchone()[0] == 12 \ No newline at end of file diff --git a/tools/rpkg/tests/testthat/test_struct.R b/tools/rpkg/tests/testthat/test_struct.R index 973f4cc5f529..3c5874c205d9 100644 --- a/tools/rpkg/tests/testthat/test_struct.R +++ b/tools/rpkg/tests/testthat/test_struct.R @@ -49,7 +49,7 @@ test_that("structs can be read", { }) test_that("structs give the same results via Arrow", { - skip_if_not_installed("clearlynotinstalled") + skip_on_cran() skip_if_not_installed("vctrs") skip_if_not_installed("tibble") skip_if_not_installed("arrow")