diff --git a/src/execution/operator/aggregate/physical_window.cpp b/src/execution/operator/aggregate/physical_window.cpp index 598d77f4640..7790eb67de3 100644 --- a/src/execution/operator/aggregate/physical_window.cpp +++ b/src/execution/operator/aggregate/physical_window.cpp @@ -205,6 +205,8 @@ class WindowGlobalSourceState : public GlobalSourceState { mutable mutex built_lock; //! The number of unfinished tasks atomic tasks_remaining; + //! The number of rows returned + atomic returned; public: idx_t MaxThreads() override { @@ -217,7 +219,7 @@ class WindowGlobalSourceState : public GlobalSourceState { }; WindowGlobalSourceState::WindowGlobalSourceState(ClientContext &context_p, WindowGlobalSinkState &gsink_p) - : context(context_p), gsink(gsink_p), next_build(0), tasks_remaining(0) { + : context(context_p), gsink(gsink_p), next_build(0), tasks_remaining(0), returned(0) { auto &hash_groups = gsink.global_partition->hash_groups; auto &gpart = gsink.global_partition; @@ -681,6 +683,15 @@ OrderPreservationType PhysicalWindow::SourceOrder() const { return SupportsBatchIndex() ? OrderPreservationType::FIXED_ORDER : OrderPreservationType::NO_ORDER; } +double PhysicalWindow::GetProgress(ClientContext &context, GlobalSourceState &gsource_p) const { + auto &gsource = gsource_p.Cast(); + const auto returned = gsource.returned.load(); + + auto &gsink = gsource.gsink; + const auto count = gsink.global_partition->count.load(); + return count ? (returned / double(count)) : -1; +} + idx_t PhysicalWindow::GetBatchIndex(ExecutionContext &context, DataChunk &chunk, GlobalSourceState &gstate_p, LocalSourceState &lstate_p) const { auto &lstate = lstate_p.Cast(); @@ -689,6 +700,7 @@ idx_t PhysicalWindow::GetBatchIndex(ExecutionContext &context, DataChunk &chunk, SourceResultType PhysicalWindow::GetData(ExecutionContext &context, DataChunk &chunk, OperatorSourceInput &input) const { + auto &gsource = input.global_state.Cast(); auto &lsource = input.local_state.Cast(); while (chunk.size() == 0) { // Move to the next bin if we are done. @@ -699,6 +711,7 @@ SourceResultType PhysicalWindow::GetData(ExecutionContext &context, DataChunk &c } lsource.Scan(chunk); + gsource.returned += chunk.size(); } return chunk.size() == 0 ? SourceResultType::FINISHED : SourceResultType::HAVE_MORE_OUTPUT; diff --git a/src/include/duckdb/execution/operator/aggregate/physical_window.hpp b/src/include/duckdb/execution/operator/aggregate/physical_window.hpp index aad04b56277..a554a46bc74 100644 --- a/src/include/duckdb/execution/operator/aggregate/physical_window.hpp +++ b/src/include/duckdb/execution/operator/aggregate/physical_window.hpp @@ -50,6 +50,8 @@ class PhysicalWindow : public PhysicalOperator { bool SupportsBatchIndex() const override; OrderPreservationType SourceOrder() const override; + double GetProgress(ClientContext &context, GlobalSourceState &gstate_p) const override; + public: // Sink interface SinkResultType Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const override;