Skip to content

Commit

Permalink
parallelize scanning of ungrouped distinct HT's
Browse files Browse the repository at this point in the history
  • Loading branch information
lnkuiper committed Jul 17, 2023
1 parent 04c6cee commit e7a6a21
Show file tree
Hide file tree
Showing 2 changed files with 173 additions and 103 deletions.
263 changes: 161 additions & 102 deletions src/execution/operator/aggregate/physical_ungrouped_aggregate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ PhysicalUngroupedAggregate::PhysicalUngroupedAggregate(vector<LogicalType> types
//===--------------------------------------------------------------------===//
struct AggregateState {
explicit AggregateState(const vector<unique_ptr<Expression>> &aggregate_expressions) {
for (auto &aggregate : aggregate_expressions) {
counts = make_uniq_array<atomic<idx_t>>(aggregate_expressions.size());
for (idx_t i = 0; i < aggregate_expressions.size(); i++) {
auto &aggregate = aggregate_expressions[i];
D_ASSERT(aggregate->GetExpressionClass() == ExpressionClass::BOUND_AGGREGATE);
auto &aggr = aggregate->Cast<BoundAggregateExpression>();
auto state = make_unsafe_uniq_array<data_t>(aggr.function.state_size());
Expand All @@ -46,7 +48,7 @@ struct AggregateState {
bind_data.push_back(aggr.bind_info.get());
destructors.push_back(aggr.function.destructor);
#ifdef DEBUG
counts.push_back(0);
counts[i] = 0;
#endif
}
}
Expand Down Expand Up @@ -77,7 +79,7 @@ struct AggregateState {
//! The destructors
vector<aggregate_destructor_t> destructors;
//! Counts (used for verification)
vector<idx_t> counts;
unique_array<atomic<idx_t>> counts;
};

class UngroupedAggregateGlobalState : public GlobalSinkState {
Expand Down Expand Up @@ -355,126 +357,183 @@ void PhysicalUngroupedAggregate::Combine(ExecutionContext &context, GlobalSinkSt
client_profiler.Flush(context.thread.profiler);
}

//! Event that schedules one finalize task per thread to scan the distinct HTs in parallel
class UngroupedDistinctAggregateFinalizeEvent : public BasePipelineEvent {
public:
	UngroupedDistinctAggregateFinalizeEvent(ClientContext &context, const PhysicalUngroupedAggregate &op_p,
	                                        UngroupedAggregateGlobalState &gstate_p, Pipeline &pipeline_p)
	    : BasePipelineEvent(pipeline_p), context(context), op(op_p), gstate(gstate_p), tasks_scheduled(0),
	      tasks_done(0) {
	}

public:
	//! Schedules one UngroupedDistinctAggregateFinalizeTask per available thread
	void Schedule() override;

private:
	ClientContext &context;

	//! The operator being finalized
	const PhysicalUngroupedAggregate &op;
	//! The global sink state holding the distinct HTs and the global aggregate state
	UngroupedAggregateGlobalState &gstate;

public:
	//! Lock protecting the combine of thread-local states into the global state (taken by the tasks)
	mutex lock;
	//! Number of tasks handed to SetTasks; compared against tasks_done to detect the last finishing task
	idx_t tasks_scheduled;
	//! Number of tasks that have completed (incremented under 'lock')
	idx_t tasks_done;

	//! Indices of the aggregates that are distinct (subset of op.aggregates)
	vector<idx_t> distinct_agg_idxs;
	//! One shared global source state per distinct HT, so tasks can scan them cooperatively
	vector<unique_ptr<GlobalSourceState>> global_source_states;
};

class UngroupedDistinctAggregateFinalizeTask : public ExecutorTask {
public:
UngroupedDistinctAggregateFinalizeTask(Executor &executor, shared_ptr<Event> event_p,
UngroupedAggregateGlobalState &state_p, ClientContext &context,
const PhysicalUngroupedAggregate &op)
: ExecutorTask(executor), event(std::move(event_p)), gstate(state_p), context(context), op(op),
allocator(BufferAllocator::Get(context)) {
const PhysicalUngroupedAggregate &op, UngroupedAggregateGlobalState &state_p)
: ExecutorTask(executor), event(std::move(event_p)), op(op), gstate(state_p),
allocator(BufferAllocator::Get(executor.context)) {
}

void AggregateDistinct() {
D_ASSERT(gstate.distinct_state);
auto &aggregates = op.aggregates;
auto &distinct_state = *gstate.distinct_state;
auto &distinct_data = *op.distinct_data;
TaskExecutionResult ExecuteTask(TaskExecutionMode mode) override;

ThreadContext temp_thread_context(context);
ExecutionContext temp_exec_context(context, temp_thread_context, nullptr);
private:
void AggregateDistinct();

idx_t payload_idx = 0;
idx_t next_payload_idx = 0;
private:
shared_ptr<Event> event;

for (idx_t agg_idx = 0; agg_idx < aggregates.size(); agg_idx++) {
auto &aggregate = aggregates[agg_idx]->Cast<BoundAggregateExpression>();
const PhysicalUngroupedAggregate &op;
UngroupedAggregateGlobalState &gstate;

// Forward the payload idx
payload_idx = next_payload_idx;
next_payload_idx = payload_idx + aggregate.children.size();
ArenaAllocator allocator;
};

// If aggregate is not distinct, skip it
if (!distinct_data.IsDistinct(agg_idx)) {
continue;
//! Collect the distinct aggregate indices, create one shared global source state per distinct HT,
//! and schedule one finalize task per available thread so the HTs can be scanned in parallel.
void UngroupedDistinctAggregateFinalizeEvent::Schedule() {
	D_ASSERT(gstate.distinct_state);
	auto &aggregates = op.aggregates;
	auto &distinct_data = *op.distinct_data;

	// Removed dead locals 'payload_idx'/'next_payload_idx' (and the BoundAggregateExpression cast
	// feeding them): they were computed per iteration but never read in this function
	for (idx_t agg_idx = 0; agg_idx < aggregates.size(); agg_idx++) {
		// If aggregate is not distinct, skip it
		if (!distinct_data.IsDistinct(agg_idx)) {
			continue;
		}
		D_ASSERT(distinct_data.info.table_map.count(agg_idx));

		distinct_agg_idxs.push_back(agg_idx);

		// Create global state for scanning, shared by all tasks so they can cooperate on one HT
		auto table_idx = distinct_data.info.table_map.at(agg_idx);
		auto &radix_table_p = *distinct_data.radix_tables[table_idx];
		global_source_states.push_back(radix_table_p.GetGlobalSourceState(context));
	}

	D_ASSERT(!distinct_agg_idxs.empty());
	D_ASSERT(distinct_agg_idxs.size() == global_source_states.size());

	// One task per thread; each task scans a share of every distinct HT
	const idx_t n_threads = TaskScheduler::GetScheduler(context).NumberOfThreads();
	vector<shared_ptr<Task>> tasks;
	for (idx_t i = 0; i < n_threads; i++) {
		tasks.push_back(
		    make_uniq<UngroupedDistinctAggregateFinalizeTask>(pipeline->executor, shared_from_this(), op, gstate));
		tasks_scheduled++;
	}
	SetTasks(std::move(tasks));
}

//! Entry point for the task: run the distinct-HT scan/combine, then notify the event.
TaskExecutionResult UngroupedDistinctAggregateFinalizeTask::ExecuteTask(TaskExecutionMode mode) {
	// Scan the distinct HTs and fold the thread-local results into the global aggregate state
	AggregateDistinct();
	// Signal completion so the event can finish once all scheduled tasks are done
	event->FinishTask();
	return TaskExecutionResult::TASK_FINISHED;
}

void UngroupedDistinctAggregateFinalizeTask::AggregateDistinct() {
D_ASSERT(gstate.distinct_state);
auto &distinct_state = *gstate.distinct_state;
auto &distinct_data = *op.distinct_data;

// Create thread-local copy of aggregate state
auto &aggregates = op.aggregates;
AggregateState state(aggregates);

// Thread-local contexts
ThreadContext thread_context(executor.context);
ExecutionContext execution_context(executor.context, thread_context, nullptr);

// Now loop through the distinct aggregates, scanning the distinct HTs
auto &finalize_event = event->Cast<UngroupedDistinctAggregateFinalizeEvent>();
for (idx_t distinct_idx = 0; distinct_idx < finalize_event.distinct_agg_idxs.size(); distinct_idx++) {
const auto &agg_idx = finalize_event.distinct_agg_idxs[distinct_idx];
auto &aggregate = aggregates[agg_idx]->Cast<BoundAggregateExpression>();
AggregateInputData aggr_input_data(aggregate.bind_info.get(), allocator);

const auto table_idx = distinct_data.info.table_map.at(agg_idx);
auto &radix_table = *distinct_data.radix_tables[table_idx];
auto lstate = radix_table.GetLocalSourceState(execution_context);

auto &sink = *distinct_state.radix_states[table_idx];
InterruptState interrupt_state;
OperatorSourceInput source_input {*finalize_event.global_source_states[distinct_idx], *lstate, interrupt_state};

DataChunk output_chunk;
output_chunk.Initialize(executor.context, distinct_state.distinct_output_chunks[table_idx]->GetTypes());

DataChunk payload_chunk;
payload_chunk.InitializeEmpty(distinct_data.grouped_aggregate_data[table_idx]->group_types);
payload_chunk.SetCardinality(0);

while (true) {
output_chunk.Reset();
auto res = radix_table.GetData(execution_context, output_chunk, sink, source_input);

if (res == SourceResultType::FINISHED) {
D_ASSERT(output_chunk.size() == 0);
break;
} else if (res == SourceResultType::BLOCKED) {
throw InternalException(
"Unexpected interrupt from radix table GetData in UngroupedDistinctAggregateFinalizeTask");
}

// We dont need to resolve the filter, we already did this in Sink
idx_t payload_cnt = aggregate.children.size();
for (idx_t i = 0; i < payload_cnt; i++) {
payload_chunk.data[i].Reference(output_chunk.data[i]);
}
payload_chunk.SetCardinality(output_chunk);

DataChunk payload_chunk;

D_ASSERT(distinct_data.info.table_map.count(agg_idx));
auto table_idx = distinct_data.info.table_map.at(agg_idx);
auto &radix_table_p = distinct_data.radix_tables[table_idx];
auto &output_chunk = *distinct_state.distinct_output_chunks[table_idx];
auto &grouped_aggregate_data = *distinct_data.grouped_aggregate_data[table_idx];

payload_chunk.InitializeEmpty(grouped_aggregate_data.group_types);
payload_chunk.SetCardinality(0);

//! Create global and local state for the hashtable
auto global_source_state = radix_table_p->GetGlobalSourceState(context);
auto local_source_state = radix_table_p->GetLocalSourceState(temp_exec_context);

//! Retrieve the stored data from the hashtable
while (true) {
output_chunk.Reset();

InterruptState interrupt_state;
OperatorSourceInput source_input {*global_source_state, *local_source_state, interrupt_state};
auto res = radix_table_p->GetData(temp_exec_context, output_chunk,
*distinct_state.radix_states[table_idx], source_input);
if (res == SourceResultType::FINISHED) {
D_ASSERT(output_chunk.size() == 0);
break;
} else if (res == SourceResultType::BLOCKED) {
throw InternalException(
"Unexpected interrupt from radix table GetData in UngroupedDistinctAggregateFinalizeTask");
}

// We dont need to resolve the filter, we already did this in Sink
idx_t payload_cnt = aggregate.children.size();
for (idx_t i = 0; i < payload_cnt; i++) {
payload_chunk.data[i].Reference(output_chunk.data[i]);
}
payload_chunk.SetCardinality(output_chunk);
#ifdef DEBUG
gstate.state.counts[agg_idx] += payload_chunk.size();
gstate.state.counts[agg_idx] += payload_chunk.size();
#endif

auto start_of_input = payload_cnt ? &payload_chunk.data[0] : nullptr;
//! Update the aggregate state
AggregateInputData aggr_input_data(aggregate.bind_info.get(), allocator);
aggregate.function.simple_update(start_of_input, aggr_input_data, payload_cnt,
gstate.state.aggregates[agg_idx].get(), payload_chunk.size());
}
// Update the aggregate state
auto start_of_input = payload_cnt ? &payload_chunk.data[0] : nullptr;
aggregate.function.simple_update(start_of_input, aggr_input_data, payload_cnt,
state.aggregates[agg_idx].get(), payload_chunk.size());
}
D_ASSERT(!gstate.finished);
gstate.finished = true;
}

TaskExecutionResult ExecuteTask(TaskExecutionMode mode) override {
AggregateDistinct();
event->FinishTask();
return TaskExecutionResult::TASK_FINISHED;
}

private:
shared_ptr<Event> event;
UngroupedAggregateGlobalState &gstate;
ClientContext &context;
const PhysicalUngroupedAggregate &op;
ArenaAllocator allocator;
};
// After scanning the distinct HTs, we can combine the thread-local agg states with the thread-global
lock_guard<mutex> guard(finalize_event.lock);
for (idx_t distinct_idx = 0; distinct_idx < finalize_event.distinct_agg_idxs.size(); distinct_idx++) {
const auto &agg_idx = finalize_event.distinct_agg_idxs[distinct_idx];
auto &aggregate = aggregates[agg_idx]->Cast<BoundAggregateExpression>();
AggregateInputData aggr_input_data(aggregate.bind_info.get(), allocator);

// TODO: Create tasks and run these in parallel instead of doing this all in Schedule, single threaded
class UngroupedDistinctAggregateFinalizeEvent : public BasePipelineEvent {
public:
UngroupedDistinctAggregateFinalizeEvent(const PhysicalUngroupedAggregate &op_p,
UngroupedAggregateGlobalState &gstate_p, Pipeline &pipeline_p,
ClientContext &context)
: BasePipelineEvent(pipeline_p), op(op_p), gstate(gstate_p), context(context) {
Vector state_vec(Value::POINTER(CastPointerToValue(state.aggregates[agg_idx].get())));
Vector combined_vec(Value::POINTER(CastPointerToValue(gstate.state.aggregates[agg_idx].get())));
aggregate.function.combine(state_vec, combined_vec, aggr_input_data, 1);
}
const PhysicalUngroupedAggregate &op;
UngroupedAggregateGlobalState &gstate;
ClientContext &context;

public:
void Schedule() override {
vector<shared_ptr<Task>> tasks;
tasks.push_back(make_uniq<UngroupedDistinctAggregateFinalizeTask>(pipeline->executor, shared_from_this(),
gstate, context, op));
D_ASSERT(!tasks.empty());
SetTasks(std::move(tasks));
D_ASSERT(!gstate.finished);
if (++finalize_event.tasks_done == finalize_event.tasks_scheduled) {
gstate.finished = true;
}
};
}

SinkFinalizeType PhysicalUngroupedAggregate::FinalizeDistinct(Pipeline &pipeline, Event &event, ClientContext &context,
GlobalSinkState &gstate_p) const {
Expand All @@ -487,7 +546,7 @@ SinkFinalizeType PhysicalUngroupedAggregate::FinalizeDistinct(Pipeline &pipeline
auto &radix_state = *distinct_state.radix_states[table_idx];
radix_table_p->Finalize(context, radix_state);
}
auto new_event = make_shared<UngroupedDistinctAggregateFinalizeEvent>(*this, gstate, pipeline, context);
auto new_event = make_shared<UngroupedDistinctAggregateFinalizeEvent>(context, *this, gstate, pipeline);
event.InsertEvent(std::move(new_event));
return SinkFinalizeType::READY;
}
Expand Down
13 changes: 12 additions & 1 deletion src/include/duckdb/parallel/event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class Task;

class Event : public std::enable_shared_from_this<Event> {
public:
Event(Executor &executor);
explicit Event(Executor &executor);
virtual ~Event() = default;

public:
Expand Down Expand Up @@ -52,6 +52,17 @@ class Event : public std::enable_shared_from_this<Event> {
virtual void PrintPipeline() {
}

//! Downcast this Event to a concrete event type; checked with dynamic_cast in debug builds only
template <class TARGET>
TARGET &Cast() {
	D_ASSERT(dynamic_cast<TARGET *>(this));
	return reinterpret_cast<TARGET &>(*this);
}
//! Const overload of Cast
template <class TARGET>
const TARGET &Cast() const {
	D_ASSERT(dynamic_cast<const TARGET *>(this));
	return reinterpret_cast<const TARGET &>(*this);
}

protected:
Executor &executor;
//! The current threads working on the event
Expand Down

0 comments on commit e7a6a21

Please sign in to comment.