From e7a6a21ddd6d1cec6a917335b589f450b6d9c8ce Mon Sep 17 00:00:00 2001 From: Laurens Kuiper Date: Mon, 17 Jul 2023 14:25:37 +0200 Subject: [PATCH] parallelize scanning of ungrouped distinct HT's --- .../physical_ungrouped_aggregate.cpp | 263 +++++++++++------- src/include/duckdb/parallel/event.hpp | 13 +- 2 files changed, 173 insertions(+), 103 deletions(-) diff --git a/src/execution/operator/aggregate/physical_ungrouped_aggregate.cpp b/src/execution/operator/aggregate/physical_ungrouped_aggregate.cpp index 91271a1a39f2..d0a625559841 100644 --- a/src/execution/operator/aggregate/physical_ungrouped_aggregate.cpp +++ b/src/execution/operator/aggregate/physical_ungrouped_aggregate.cpp @@ -37,7 +37,9 @@ PhysicalUngroupedAggregate::PhysicalUngroupedAggregate(vector types //===--------------------------------------------------------------------===// struct AggregateState { explicit AggregateState(const vector> &aggregate_expressions) { - for (auto &aggregate : aggregate_expressions) { + counts = make_uniq_array>(aggregate_expressions.size()); + for (idx_t i = 0; i < aggregate_expressions.size(); i++) { + auto &aggregate = aggregate_expressions[i]; D_ASSERT(aggregate->GetExpressionClass() == ExpressionClass::BOUND_AGGREGATE); auto &aggr = aggregate->Cast(); auto state = make_unsafe_uniq_array(aggr.function.state_size()); @@ -46,7 +48,7 @@ struct AggregateState { bind_data.push_back(aggr.bind_info.get()); destructors.push_back(aggr.function.destructor); #ifdef DEBUG - counts.push_back(0); + counts[i] = 0; #endif } } @@ -77,7 +79,7 @@ struct AggregateState { //! The destructors vector destructors; //! Counts (used for verification) - vector counts; + unique_array> counts; }; class UngroupedAggregateGlobalState : public GlobalSinkState { @@ -355,126 +357,183 @@ void PhysicalUngroupedAggregate::Combine(ExecutionContext &context, GlobalSinkSt client_profiler.Flush(context.thread.profiler); } +class UngroupedDistinctAggregateFinalizeEvent : public BasePipelineEvent { +public: + UngroupedDistinctAggregateFinalizeEvent(ClientContext &context, const PhysicalUngroupedAggregate &op_p, + UngroupedAggregateGlobalState &gstate_p, Pipeline &pipeline_p) + : BasePipelineEvent(pipeline_p), context(context), op(op_p), gstate(gstate_p), tasks_scheduled(0), + tasks_done(0) { + } + +public: + void Schedule() override; + +private: + ClientContext &context; + + const PhysicalUngroupedAggregate &op; + UngroupedAggregateGlobalState &gstate; + +public: + mutex lock; + idx_t tasks_scheduled; + idx_t tasks_done; + + vector distinct_agg_idxs; + vector> global_source_states; +}; + class UngroupedDistinctAggregateFinalizeTask : public ExecutorTask { public: UngroupedDistinctAggregateFinalizeTask(Executor &executor, shared_ptr event_p, - UngroupedAggregateGlobalState &state_p, ClientContext &context, - const PhysicalUngroupedAggregate &op) - : ExecutorTask(executor), event(std::move(event_p)), gstate(state_p), context(context), op(op), - allocator(BufferAllocator::Get(context)) { + const PhysicalUngroupedAggregate &op, UngroupedAggregateGlobalState &state_p) + : ExecutorTask(executor), event(std::move(event_p)), op(op), gstate(state_p), + allocator(BufferAllocator::Get(executor.context)) { } - void AggregateDistinct() { - D_ASSERT(gstate.distinct_state); - auto &aggregates = op.aggregates; - auto &distinct_state = *gstate.distinct_state; - auto &distinct_data = *op.distinct_data; + TaskExecutionResult ExecuteTask(TaskExecutionMode mode) override; - ThreadContext temp_thread_context(context); - ExecutionContext temp_exec_context(context, temp_thread_context, nullptr); +private: + void AggregateDistinct(); - idx_t payload_idx = 0; - idx_t next_payload_idx = 0; +private: + shared_ptr event; - for (idx_t agg_idx = 0; agg_idx < aggregates.size(); agg_idx++) { - auto &aggregate = aggregates[agg_idx]->Cast(); + const PhysicalUngroupedAggregate &op; + UngroupedAggregateGlobalState &gstate; - // Forward the payload idx - payload_idx = next_payload_idx; - next_payload_idx = payload_idx + aggregate.children.size(); + ArenaAllocator allocator; +}; - // If aggregate is not distinct, skip it - if (!distinct_data.IsDistinct(agg_idx)) { - continue; +void UngroupedDistinctAggregateFinalizeEvent::Schedule() { + D_ASSERT(gstate.distinct_state); + auto &aggregates = op.aggregates; + auto &distinct_data = *op.distinct_data; + + idx_t payload_idx = 0; + idx_t next_payload_idx = 0; + for (idx_t agg_idx = 0; agg_idx < aggregates.size(); agg_idx++) { + auto &aggregate = aggregates[agg_idx]->Cast(); + + // Forward the payload idx + payload_idx = next_payload_idx; + next_payload_idx = payload_idx + aggregate.children.size(); + + // If aggregate is not distinct, skip it + if (!distinct_data.IsDistinct(agg_idx)) { + continue; + } + D_ASSERT(distinct_data.info.table_map.count(agg_idx)); + + distinct_agg_idxs.push_back(agg_idx); + + // Create global state for scanning + auto table_idx = distinct_data.info.table_map.at(agg_idx); + auto &radix_table_p = *distinct_data.radix_tables[table_idx]; + global_source_states.push_back(radix_table_p.GetGlobalSourceState(context)); + } + + D_ASSERT(!distinct_agg_idxs.empty()); + D_ASSERT(distinct_agg_idxs.size() == global_source_states.size()); + + const idx_t n_threads = TaskScheduler::GetScheduler(context).NumberOfThreads(); + vector> tasks; + for (idx_t i = 0; i < n_threads; i++) { + tasks.push_back( + make_uniq(pipeline->executor, shared_from_this(), op, gstate)); + tasks_scheduled++; + } + SetTasks(std::move(tasks)); +} + +TaskExecutionResult UngroupedDistinctAggregateFinalizeTask::ExecuteTask(TaskExecutionMode mode) { + AggregateDistinct(); + event->FinishTask(); + return TaskExecutionResult::TASK_FINISHED; +} + +void UngroupedDistinctAggregateFinalizeTask::AggregateDistinct() { + D_ASSERT(gstate.distinct_state); + auto &distinct_state = *gstate.distinct_state; + auto &distinct_data = *op.distinct_data; + + // Create thread-local copy of aggregate state + auto &aggregates = op.aggregates; + AggregateState state(aggregates); + + // Thread-local contexts + ThreadContext thread_context(executor.context); + ExecutionContext execution_context(executor.context, thread_context, nullptr); + + // Now loop through the distinct aggregates, scanning the distinct HTs + auto &finalize_event = event->Cast(); + for (idx_t distinct_idx = 0; distinct_idx < finalize_event.distinct_agg_idxs.size(); distinct_idx++) { + const auto &agg_idx = finalize_event.distinct_agg_idxs[distinct_idx]; + auto &aggregate = aggregates[agg_idx]->Cast(); + AggregateInputData aggr_input_data(aggregate.bind_info.get(), allocator); + + const auto table_idx = distinct_data.info.table_map.at(agg_idx); + auto &radix_table = *distinct_data.radix_tables[table_idx]; + auto lstate = radix_table.GetLocalSourceState(execution_context); + + auto &sink = *distinct_state.radix_states[table_idx]; + InterruptState interrupt_state; + OperatorSourceInput source_input {*finalize_event.global_source_states[distinct_idx], *lstate, interrupt_state}; + + DataChunk output_chunk; + output_chunk.Initialize(executor.context, distinct_state.distinct_output_chunks[table_idx]->GetTypes()); + + DataChunk payload_chunk; + payload_chunk.InitializeEmpty(distinct_data.grouped_aggregate_data[table_idx]->group_types); + payload_chunk.SetCardinality(0); + + while (true) { + output_chunk.Reset(); + auto res = radix_table.GetData(execution_context, output_chunk, sink, source_input); + + if (res == SourceResultType::FINISHED) { + D_ASSERT(output_chunk.size() == 0); + break; + } else if (res == SourceResultType::BLOCKED) { + throw InternalException( + "Unexpected interrupt from radix table GetData in UngroupedDistinctAggregateFinalizeTask"); + } + + // We dont need to resolve the filter, we already did this in Sink + idx_t payload_cnt = aggregate.children.size(); + for (idx_t i = 0; i < payload_cnt; i++) { + payload_chunk.data[i].Reference(output_chunk.data[i]); } + payload_chunk.SetCardinality(output_chunk); - DataChunk payload_chunk; - - D_ASSERT(distinct_data.info.table_map.count(agg_idx)); - auto table_idx = distinct_data.info.table_map.at(agg_idx); - auto &radix_table_p = distinct_data.radix_tables[table_idx]; - auto &output_chunk = *distinct_state.distinct_output_chunks[table_idx]; - auto &grouped_aggregate_data = *distinct_data.grouped_aggregate_data[table_idx]; - - payload_chunk.InitializeEmpty(grouped_aggregate_data.group_types); - payload_chunk.SetCardinality(0); - - //! Create global and local state for the hashtable - auto global_source_state = radix_table_p->GetGlobalSourceState(context); - auto local_source_state = radix_table_p->GetLocalSourceState(temp_exec_context); - - //! Retrieve the stored data from the hashtable - while (true) { - output_chunk.Reset(); - - InterruptState interrupt_state; - OperatorSourceInput source_input {*global_source_state, *local_source_state, interrupt_state}; - auto res = radix_table_p->GetData(temp_exec_context, output_chunk, - *distinct_state.radix_states[table_idx], source_input); - if (res == SourceResultType::FINISHED) { - D_ASSERT(output_chunk.size() == 0); - break; - } else if (res == SourceResultType::BLOCKED) { - throw InternalException( - "Unexpected interrupt from radix table GetData in UngroupedDistinctAggregateFinalizeTask"); - } - - // We dont need to resolve the filter, we already did this in Sink - idx_t payload_cnt = aggregate.children.size(); - for (idx_t i = 0; i < payload_cnt; i++) { - payload_chunk.data[i].Reference(output_chunk.data[i]); - } - payload_chunk.SetCardinality(output_chunk); #ifdef DEBUG - gstate.state.counts[agg_idx] += payload_chunk.size(); + gstate.state.counts[agg_idx] += payload_chunk.size(); #endif - auto start_of_input = payload_cnt ? &payload_chunk.data[0] : nullptr; - //! Update the aggregate state - AggregateInputData aggr_input_data(aggregate.bind_info.get(), allocator); - aggregate.function.simple_update(start_of_input, aggr_input_data, payload_cnt, - gstate.state.aggregates[agg_idx].get(), payload_chunk.size()); - } + // Update the aggregate state + auto start_of_input = payload_cnt ? &payload_chunk.data[0] : nullptr; + aggregate.function.simple_update(start_of_input, aggr_input_data, payload_cnt, + state.aggregates[agg_idx].get(), payload_chunk.size()); } - D_ASSERT(!gstate.finished); - gstate.finished = true; - } - - TaskExecutionResult ExecuteTask(TaskExecutionMode mode) override { - AggregateDistinct(); - event->FinishTask(); - return TaskExecutionResult::TASK_FINISHED; } -private: - shared_ptr event; - UngroupedAggregateGlobalState &gstate; - ClientContext &context; - const PhysicalUngroupedAggregate &op; - ArenaAllocator allocator; -}; + // After scanning the distinct HTs, we can combine the thread-local agg states with the thread-global + lock_guard guard(finalize_event.lock); + for (idx_t distinct_idx = 0; distinct_idx < finalize_event.distinct_agg_idxs.size(); distinct_idx++) { + const auto &agg_idx = finalize_event.distinct_agg_idxs[distinct_idx]; + auto &aggregate = aggregates[agg_idx]->Cast(); + AggregateInputData aggr_input_data(aggregate.bind_info.get(), allocator); -// TODO: Create tasks and run these in parallel instead of doing this all in Schedule, single threaded -class UngroupedDistinctAggregateFinalizeEvent : public BasePipelineEvent { -public: - UngroupedDistinctAggregateFinalizeEvent(const PhysicalUngroupedAggregate &op_p, - UngroupedAggregateGlobalState &gstate_p, Pipeline &pipeline_p, - ClientContext &context) - : BasePipelineEvent(pipeline_p), op(op_p), gstate(gstate_p), context(context) { + Vector state_vec(Value::POINTER(CastPointerToValue(state.aggregates[agg_idx].get()))); + Vector combined_vec(Value::POINTER(CastPointerToValue(gstate.state.aggregates[agg_idx].get()))); + aggregate.function.combine(state_vec, combined_vec, aggr_input_data, 1); } - const PhysicalUngroupedAggregate &op; - UngroupedAggregateGlobalState &gstate; - ClientContext &context; -public: - void Schedule() override { - vector> tasks; - tasks.push_back(make_uniq(pipeline->executor, shared_from_this(), - gstate, context, op)); - D_ASSERT(!tasks.empty()); - SetTasks(std::move(tasks)); + D_ASSERT(!gstate.finished); + if (++finalize_event.tasks_done == finalize_event.tasks_scheduled) { + gstate.finished = true; } -}; +} SinkFinalizeType PhysicalUngroupedAggregate::FinalizeDistinct(Pipeline &pipeline, Event &event, ClientContext &context, GlobalSinkState &gstate_p) const { @@ -487,7 +546,7 @@ SinkFinalizeType PhysicalUngroupedAggregate::FinalizeDistinct(Pipeline &pipeline auto &radix_state = *distinct_state.radix_states[table_idx]; radix_table_p->Finalize(context, radix_state); } - auto new_event = make_shared(*this, gstate, pipeline, context); + auto new_event = make_shared(context, *this, gstate, pipeline); event.InsertEvent(std::move(new_event)); return SinkFinalizeType::READY; } diff --git a/src/include/duckdb/parallel/event.hpp b/src/include/duckdb/parallel/event.hpp index 99ea705f6c80..1cfee6917c8a 100644 --- a/src/include/duckdb/parallel/event.hpp +++ b/src/include/duckdb/parallel/event.hpp @@ -18,7 +18,7 @@ class Task; class Event : public std::enable_shared_from_this { public: - Event(Executor &executor); + explicit Event(Executor &executor); virtual ~Event() = default; public: @@ -52,6 +52,17 @@ class Event : public std::enable_shared_from_this { virtual void PrintPipeline() { } + template + TARGET &Cast() { + D_ASSERT(dynamic_cast(this)); + return reinterpret_cast(*this); + } + template + const TARGET &Cast() const { + D_ASSERT(dynamic_cast(this)); + return reinterpret_cast(*this); + } + protected: Executor &executor; //! The current threads working on the event