[Dev] Fix an issue causing ExecuteTask to do much more work than intended (duckdb#14034)

This PR makes `ExecutePushInternal` respect the `max_chunks` limit set in the
call to `PipelineExecutor::Execute(idx_t max_chunks)`.
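To illustrate the contract this restores, here is a minimal sketch of how a caller might drive `PipelineExecutor::Execute(idx_t max_chunks)` in partial-processing mode. The `PipelineExecuteResult` values come from the diff below; the wrapper function and the budget of 50 chunks are illustrative assumptions, not DuckDB's actual task code.

```cpp
// Sketch only: assumes a PipelineExecutor instance and the PipelineExecuteResult enum
// shown in the diff below; the function itself and the budget of 50 chunks are hypothetical.
PipelineExecuteResult DriveOnePartialStep(PipelineExecutor &executor) {
	// Process at most 50 source chunks, then hand control back to the scheduler so that
	// an Interrupt() issued by the host can take effect between calls.
	auto result = executor.Execute(50);
	switch (result) {
	case PipelineExecuteResult::FINISHED:
		// the pipeline is fully executed: source exhausted, caching operators flushed
		break;
	case PipelineExecuteResult::NOT_FINISHED:
		// the chunk budget was depleted before the pipeline finished; call Execute again later
		break;
	case PipelineExecuteResult::INTERRUPTED:
		// a source or sink blocked; the executor is rescheduled once the blocking event fires
		break;
	}
	return result;
}
```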

When running DuckDB inside an environment where control over received
signals (like SIGINT) is not instant (such as Python or Postgres), we rely on
`ExecuteTask` to return after doing only a small amount of processing work.

In the DuckDB CLI this problem was not apparent, because SIGINT is handled
and `Interrupt()` is called immediately, which the execution respects.
The problem only surfaced in environments where the `interrupt` flag could
not be set because the application was still waiting for `ExecuteTask` to
return.
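A minimal sketch of the host-side loop described above, assuming the public DuckDB C++ API (`Connection::PendingQuery`, `PendingQueryResult::ExecuteTask`, `Connection::Interrupt`); the signal flag and the exact `PendingExecutionResult` values checked here are assumptions and may differ between versions.

```cpp
#include "duckdb.hpp"

#include <atomic>

using namespace duckdb;

// set from the host's signal handler (e.g. on SIGINT); hypothetical name
static std::atomic<bool> host_received_sigint{false};

int main() {
	DuckDB db(nullptr);
	Connection con(db);

	auto pending = con.PendingQuery("SELECT sum(range) FROM range(100000000)");
	while (true) {
		// Each ExecuteTask call should only do a bounded amount of work (which is what this
		// commit restores), so control returns to the host regularly.
		auto state = pending->ExecuteTask();
		if (state == PendingExecutionResult::RESULT_READY ||
		    state == PendingExecutionResult::EXECUTION_ERROR) {
			break;
		}
		if (host_received_sigint.exchange(false)) {
			// Because ExecuteTask returns promptly, the interrupt takes effect quickly.
			con.Interrupt();
		}
	}
	auto result = pending->Execute();
	if (result->HasError()) {
		// an interrupted or failed query surfaces as an error result here
	}
	return 0;
}
```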
Mytherin authored Oct 7, 2024
2 parents 42df770 + ca88874 commit 92cf6f1
Showing 4 changed files with 102 additions and 30 deletions.
4 changes: 4 additions & 0 deletions src/include/duckdb/execution/physical_operator.hpp
@@ -241,8 +241,12 @@ class CachingPhysicalOperator : public PhysicalOperator {
bool caching_supported;

public:
//! This Execute will prevent small chunks from entering the pipeline, buffering them until a bigger chunk is
//! created.
OperatorResultType Execute(ExecutionContext &context, DataChunk &input, DataChunk &chunk,
GlobalOperatorState &gstate, OperatorState &state) const final;
//! FinalExecute is used here to send out the remainder of the chunk (< STANDARD_VECTOR_SIZE) that we still had
//! cached.
OperatorFinalizeResultType FinalExecute(ExecutionContext &context, DataChunk &chunk, GlobalOperatorState &gstate,
OperatorState &state) const final;

30 changes: 24 additions & 6 deletions src/include/duckdb/parallel/pipeline_executor.hpp
@@ -32,6 +32,28 @@ enum class PipelineExecuteResult {
INTERRUPTED
};

class ExecutionBudget {
public:
explicit ExecutionBudget(idx_t maximum) : processed(0), maximum_to_process(maximum) {
}

public:
bool Next() {
if (IsDepleted()) {
return false;
}
processed++;
return true;
}
bool IsDepleted() const {
return processed >= maximum_to_process;
}

private:
idx_t processed;
idx_t maximum_to_process;
};

//! The Pipeline class represents an execution pipeline
class PipelineExecutor {
public:
@@ -43,10 +65,6 @@ class PipelineExecutor {
//! Returns true if execution is finished, false if Execute should be called again
PipelineExecuteResult Execute(idx_t max_chunks);

//! Push a single input DataChunk into the pipeline.
//! Returns either OperatorResultType::NEED_MORE_INPUT or OperatorResultType::FINISHED
//! If OperatorResultType::FINISHED is returned, more input will not change the result anymore
OperatorResultType ExecutePush(DataChunk &input);
//! Called after depleting the source: finalizes the execution of this pipeline executor
//! This should only be called once per PipelineExecutor.
PipelineExecuteResult PushFinalize();
@@ -128,7 +146,7 @@ class PipelineExecutor {
SourceResultType GetData(DataChunk &chunk, OperatorSourceInput &input);
SinkResultType Sink(DataChunk &chunk, OperatorSinkInput &input);

OperatorResultType ExecutePushInternal(DataChunk &input, idx_t initial_idx = 0);
OperatorResultType ExecutePushInternal(DataChunk &input, ExecutionBudget &chunk_budget, idx_t initial_idx = 0);
//! Pushes a chunk through the pipeline and returns a single result chunk
//! Returns whether or not a new input chunk is needed, or whether or not we are finished
OperatorResultType Execute(DataChunk &input, DataChunk &result, idx_t initial_index = 0);
Expand All @@ -138,7 +156,7 @@ class PipelineExecutor {

//! Tries to flush all state from intermediate operators. Will return true if all state is flushed, false in the
//! case of a blocked sink.
bool TryFlushCachingOperators();
bool TryFlushCachingOperators(ExecutionBudget &chunk_budget);

static bool CanCacheType(const LogicalType &type);
void CacheChunk(DataChunk &input, idx_t operator_idx);
69 changes: 45 additions & 24 deletions src/parallel/pipeline_executor.cpp
@@ -48,15 +48,17 @@ PipelineExecutor::PipelineExecutor(ClientContext &context_p, Pipeline &pipeline_
InitializeChunk(final_chunk);
}

bool PipelineExecutor::TryFlushCachingOperators() {
bool PipelineExecutor::TryFlushCachingOperators(ExecutionBudget &chunk_budget) {
if (!started_flushing) {
// Remainder of this method assumes any in process operators are from flushing
D_ASSERT(in_process_operators.empty());
started_flushing = true;
flushing_idx = IsFinished() ? idx_t(finished_processing_idx) : 0;
}

// Go over each operator and keep flushing them using `FinalExecute` until empty
// For each operator that supports FinalExecute,
// extract every chunk from it and push it through the rest of the pipeline
// before moving onto the next operators' FinalExecute
while (flushing_idx < pipeline.operators.size()) {
if (!pipeline.operators[flushing_idx].get().RequiresFinalExecute()) {
flushing_idx++;
@@ -76,7 +78,6 @@ bool PipelineExecutor::TryFlushCachingOperators() {
auto &current_operator = pipeline.operators[flushing_idx].get();

OperatorFinalizeResultType finalize_result;
OperatorResultType push_result;

if (in_process_operators.empty()) {
curr_chunk.Reset();
@@ -89,20 +90,34 @@ bool PipelineExecutor::TryFlushCachingOperators() {
finalize_result = OperatorFinalizeResultType::HAVE_MORE_OUTPUT;
}

push_result = ExecutePushInternal(curr_chunk, flushing_idx + 1);
auto push_result = ExecutePushInternal(curr_chunk, chunk_budget, flushing_idx + 1);

if (finalize_result == OperatorFinalizeResultType::HAVE_MORE_OUTPUT) {
should_flush_current_idx = true;
} else {
should_flush_current_idx = false;
}

if (push_result == OperatorResultType::BLOCKED) {
switch (push_result) {
case OperatorResultType::BLOCKED: {
remaining_sink_chunk = true;
return false;
} else if (push_result == OperatorResultType::FINISHED) {
}
case OperatorResultType::HAVE_MORE_OUTPUT: {
D_ASSERT(chunk_budget.IsDepleted());
// The chunk budget was used up, pushing the chunk through the pipeline created more chunks
// we need to continue this the next time Execute is called.
return false;
}
case OperatorResultType::NEED_MORE_INPUT:
continue;
case OperatorResultType::FINISHED:
break;
default:
throw InternalException("Unexpected OperatorResultType (%s) in TryFlushCachingOperators",
EnumUtil::ToString(push_result));
}
break;
}
return true;
}
@@ -168,7 +183,8 @@ SinkNextBatchType PipelineExecutor::NextBatch(duckdb::DataChunk &source_chunk) {
PipelineExecuteResult PipelineExecutor::Execute(idx_t max_chunks) {
D_ASSERT(pipeline.sink);
auto &source_chunk = pipeline.operators.empty() ? final_chunk : *intermediate_chunks[0];
for (idx_t i = 0; i < max_chunks; i++) {
ExecutionBudget chunk_budget(max_chunks);
do {
if (context.client.interrupted) {
throw InterruptException();
}
@@ -179,22 +195,27 @@ PipelineExecuteResult PipelineExecutor::Execute(idx_t max_chunks) {
break;
} else if (remaining_sink_chunk) {
// The pipeline was interrupted by the Sink. We should retry sinking the final chunk.
result = ExecutePushInternal(final_chunk);
result = ExecutePushInternal(final_chunk, chunk_budget);
D_ASSERT(result != OperatorResultType::HAVE_MORE_OUTPUT);
remaining_sink_chunk = false;
} else if (!in_process_operators.empty() && !started_flushing) {
// The pipeline was interrupted by the Sink when pushing a source chunk through the pipeline. We need to
// re-push the same source chunk through the pipeline because there are in_process operators, meaning that
// the result for the pipeline
// Operator(s) in the pipeline have returned `HAVE_MORE_OUTPUT` in the last Execute call
// the operators have to be called with the same input chunk to produce the rest of the output
D_ASSERT(source_chunk.size() > 0);
result = ExecutePushInternal(source_chunk);
result = ExecutePushInternal(source_chunk, chunk_budget);
} else if (exhausted_source && !next_batch_blocked && !done_flushing) {
// The source was exhausted, try flushing all operators
auto flush_completed = TryFlushCachingOperators();
auto flush_completed = TryFlushCachingOperators(chunk_budget);
if (flush_completed) {
done_flushing = true;
break;
} else {
return PipelineExecuteResult::INTERRUPTED;
if (remaining_sink_chunk) {
return PipelineExecuteResult::INTERRUPTED;
} else {
D_ASSERT(chunk_budget.IsDepleted());
return PipelineExecuteResult::NOT_FINISHED;
}
}
} else if (!exhausted_source || next_batch_blocked) {
SourceResultType source_result;
@@ -223,7 +244,7 @@ PipelineExecuteResult PipelineExecutor::Execute(idx_t max_chunks) {
continue;
}

result = ExecutePushInternal(source_chunk);
result = ExecutePushInternal(source_chunk, chunk_budget);
} else {
throw InternalException("Unexpected state reached in pipeline executor");
}
@@ -237,7 +258,7 @@ PipelineExecuteResult PipelineExecutor::Execute(idx_t max_chunks) {
if (result == OperatorResultType::FINISHED) {
break;
}
}
} while (chunk_budget.Next());

if ((!exhausted_source || !done_flushing) && !IsFinished()) {
return PipelineExecuteResult::NOT_FINISHED;
@@ -254,10 +275,6 @@ PipelineExecuteResult PipelineExecutor::Execute() {
return Execute(NumericLimits<idx_t>::Maximum());
}

OperatorResultType PipelineExecutor::ExecutePush(DataChunk &input) { // LCOV_EXCL_START
return ExecutePushInternal(input);
} // LCOV_EXCL_STOP

void PipelineExecutor::FinishProcessing(int32_t operator_idx) {
finished_processing_idx = operator_idx < 0 ? NumericLimits<int32_t>::Maximum() : operator_idx;
in_process_operators = stack<idx_t>();
@@ -278,7 +295,8 @@ bool PipelineExecutor::IsFinished() {
return finished_processing_idx >= 0;
}

OperatorResultType PipelineExecutor::ExecutePushInternal(DataChunk &input, idx_t initial_idx) {
OperatorResultType PipelineExecutor::ExecutePushInternal(DataChunk &input, ExecutionBudget &chunk_budget,
idx_t initial_idx) {
D_ASSERT(pipeline.sink);
if (input.size() == 0) { // LCOV_EXCL_START
return OperatorResultType::NEED_MORE_INPUT;
@@ -287,11 +305,13 @@ OperatorResultType PipelineExecutor::ExecutePushInternal(DataChunk &input, idx_t
// this loop will continuously push the input chunk through the pipeline as long as:
// - the OperatorResultType for the Execute is HAVE_MORE_OUTPUT
// - the Sink doesn't block
while (true) {
OperatorResultType result;
// - the ExecutionBudget has not been depleted
OperatorResultType result = OperatorResultType::HAVE_MORE_OUTPUT;
do {
// Note: if input is the final_chunk, we don't do any executing, the chunk just needs to be sinked
if (&input != &final_chunk) {
final_chunk.Reset();
// Execute and put the result into 'final_chunk'
result = Execute(input, final_chunk, initial_idx);
if (result == OperatorResultType::FINISHED) {
return OperatorResultType::FINISHED;
@@ -320,7 +340,8 @@ OperatorResultType PipelineExecutor::ExecutePushInternal(DataChunk &input, idx_t
if (result == OperatorResultType::NEED_MORE_INPUT) {
return OperatorResultType::NEED_MORE_INPUT;
}
}
} while (chunk_budget.Next());
return result;
}

PipelineExecuteResult PipelineExecutor::PushFinalize() {
29 changes: 29 additions & 0 deletions test/api/test_api.cpp
@@ -436,6 +436,35 @@ TEST_CASE("Test fetch API with big results", "[api][.]") {
VerifyStreamResult(std::move(result));
}

TEST_CASE("Test TryFlushCachingOperators interrupted ExecutePushInternal", "[api][.]") {
DuckDB db;
Connection con(db);

con.Query("create table tbl as select 100000 a from range(2) t(a);");
con.Query("pragma threads=1");

// Use PhysicalCrossProduct with a very low amount of produced tuples; this caches the result in the
// CachingOperatorState, which gets flushed with FinalExecute in PipelineExecutor::TryFlushCachingOperators
auto pending_query = con.PendingQuery("select unnest(range(a.a)) from tbl a, tbl b;");

// Through `unnest(range(a.a))`, this FinalExecute produces multiple chunks, more than the ExecutionBudget
// allows with PROCESS_PARTIAL
pending_query->ExecuteTask();

// query the connection as normal after
auto res = pending_query->Execute();
REQUIRE(!res->HasError());
auto &materialized_res = res->Cast<MaterializedQueryResult>();
idx_t initial_tuples = 2 * 2;
REQUIRE(materialized_res.RowCount() == initial_tuples * 100000);
for (idx_t i = 0; i < initial_tuples; i++) {
for (idx_t j = 0; j < 100000; j++) {
auto value = static_cast<idx_t>(materialized_res.GetValue<int64_t>(0, (i * 100000) + j));
REQUIRE(value == j);
}
}
}

TEST_CASE("Test streaming query during stack unwinding", "[api]") {
DuckDB db;
Connection con(db);
