Skip to content

Commit

Permalink
Merge pull request duckdb#11702 from hawkfish/window-progress
Browse files Browse the repository at this point in the history
Internal duckdb#1848: Window Progress
  • Loading branch information
Mytherin committed Apr 19, 2024
2 parents aec0375 + eadb2cf commit f29b2db
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 1 deletion.
15 changes: 14 additions & 1 deletion src/execution/operator/aggregate/physical_window.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ class WindowGlobalSourceState : public GlobalSourceState {
mutable mutex built_lock;
//! The number of unfinished tasks
atomic<idx_t> tasks_remaining;
//! The number of rows returned
atomic<idx_t> returned;

public:
idx_t MaxThreads() override {
Expand All @@ -217,7 +219,7 @@ class WindowGlobalSourceState : public GlobalSourceState {
};

WindowGlobalSourceState::WindowGlobalSourceState(ClientContext &context_p, WindowGlobalSinkState &gsink_p)
: context(context_p), gsink(gsink_p), next_build(0), tasks_remaining(0) {
: context(context_p), gsink(gsink_p), next_build(0), tasks_remaining(0), returned(0) {
auto &hash_groups = gsink.global_partition->hash_groups;

auto &gpart = gsink.global_partition;
Expand Down Expand Up @@ -681,6 +683,15 @@ OrderPreservationType PhysicalWindow::SourceOrder() const {
return SupportsBatchIndex() ? OrderPreservationType::FIXED_ORDER : OrderPreservationType::NO_ORDER;
}

double PhysicalWindow::GetProgress(ClientContext &context, GlobalSourceState &gsource_p) const {
auto &gsource = gsource_p.Cast<WindowGlobalSourceState>();
const auto returned = gsource.returned.load();

auto &gsink = gsource.gsink;
const auto count = gsink.global_partition->count.load();
return count ? (returned / double(count)) : -1;
}

idx_t PhysicalWindow::GetBatchIndex(ExecutionContext &context, DataChunk &chunk, GlobalSourceState &gstate_p,
LocalSourceState &lstate_p) const {
auto &lstate = lstate_p.Cast<WindowLocalSourceState>();
Expand All @@ -689,6 +700,7 @@ idx_t PhysicalWindow::GetBatchIndex(ExecutionContext &context, DataChunk &chunk,

SourceResultType PhysicalWindow::GetData(ExecutionContext &context, DataChunk &chunk,
OperatorSourceInput &input) const {
auto &gsource = input.global_state.Cast<WindowGlobalSourceState>();
auto &lsource = input.local_state.Cast<WindowLocalSourceState>();
while (chunk.size() == 0) {
// Move to the next bin if we are done.
Expand All @@ -699,6 +711,7 @@ SourceResultType PhysicalWindow::GetData(ExecutionContext &context, DataChunk &c
}

lsource.Scan(chunk);
gsource.returned += chunk.size();
}

return chunk.size() == 0 ? SourceResultType::FINISHED : SourceResultType::HAVE_MORE_OUTPUT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ class PhysicalWindow : public PhysicalOperator {
bool SupportsBatchIndex() const override;
OrderPreservationType SourceOrder() const override;

double GetProgress(ClientContext &context, GlobalSourceState &gstate_p) const override;

public:
// Sink interface
SinkResultType Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const override;
Expand Down

0 comments on commit f29b2db

Please sign in to comment.