fix(native-pos): Move MaterializedOutputBuffer creation under task mutex to prevent pool name collision#27900
Conversation
Reviewer's GuideMoves MaterializedOutputBuffer creation from plan conversion into task creation under the task mutex, and introduces a process-wide registry keyed by taskId so operators can look up buffers at runtime, preventing concurrent pool name collisions when createTask is retried or duplicated. Sequence diagram for MaterializedOutputBuffer creation and lookup under task mutexsequenceDiagram
actor Coordinator
participant TaskResource
participant TaskManager
participant PrestoTask
participant ExecTask as exec::Task
participant MOBClass as MaterializedOutputBuffer
participant MaterializedOutputOp as MaterializedOutput
Coordinator->>TaskResource: createOrUpdateBatchTask()
TaskResource->>TaskManager: createOrUpdateTaskImpl(planFragment)
TaskManager->>TaskManager: toVeloxQueryPlan()
TaskManager-->>TaskManager: MaterializedOutputNode(shuffleWriterInfo,
TaskManager-->>TaskManager: shuffleWriterFactoryName)
TaskManager->>PrestoTask: lock mutex
alt prestoTask->task == nullptr
TaskManager->>ExecTask: exec::Task::create()
TaskManager->>MOBClass: MaterializedOutputBuffer::MaterializedOutputBuffer(
TaskManager-->>MOBClass: numPartitions,
TaskManager-->>MOBClass: shuffleWriterInfo,
TaskManager-->>MOBClass: shuffleWriterFactory,
TaskManager-->>MOBClass: taskId,
TaskManager-->>MOBClass: pool)
TaskManager->>MOBClass: registerBuffer(taskId, buffer)
TaskManager-->>PrestoTask: prestoTask->task = newExecTask
else prestoTask->task != nullptr
TaskManager-->>PrestoTask: skip exec::Task::create()
end
PrestoTask-->>TaskManager: unlock mutex
loop Operator construction
MaterializedOutputOp->>MOBClass: getBuffer(ctx->task->taskId())
MOBClass-->>MaterializedOutputOp: MaterializedOutputBuffer*
end
MaterializedOutputOp->>MOBClass: ~MaterializedOutputBuffer()
MOBClass->>MOBClass: removeBuffer(taskId)
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
…tex to prevent pool name collision (prestodb#27900) Summary: ## Problem In extremely rare cases (0.024%, ~4 out of 17,000 queries), when the coordinator retries or duplicates a `createTask` HTTP request, two concurrent requests race through `toVeloxQueryPlan()` — each creating a `MaterializedOutputBuffer` with the same pool name under the shared query root pool. The second call crashes with `"pool already exists"`. ``` TaskResource::createOrUpdateBatchTask() | +----------------------------------+ | | Request A Request B | | toVeloxQueryPlan() toVeloxQueryPlan() | | new MaterializedOutputBuffer() new MaterializedOutputBuffer() | | pool->addLeafChild("mob.taskId") pool->addLeafChild("mob.taskId") | | OK CRASH: "already exists" | | +----------------------------------+ | createOrUpdateTaskImpl() | prestoTask->mutex <-- too late | if (prestoTask->task == nullptr) | exec::Task::create() ``` The `prestoTask->mutex` deduplicates `exec::Task::create()`, but the pool collision happens before that — during plan conversion. ## Fix Move `MaterializedOutputBuffer` creation from `toVeloxQueryPlan()` into `createOrUpdateTaskImpl()` under the `prestoTask->mutex`, after `exec::Task::create()`. The plan node (`MaterializedOutputNode`) now carries the shuffle factory params instead of the buffer itself. Operators look up the buffer by `taskId` from a process-wide registry at construction time. ``` TaskResource::createOrUpdateBatchTask() | +----------------------------------+ | | Request A Request B | | toVeloxQueryPlan() toVeloxQueryPlan() | | MaterializedOutputNode(params) MaterializedOutputNode(params) (no pool created) (no pool created) | | +----------------------------------+ | createOrUpdateTaskImpl() | prestoTask->mutex | if (prestoTask->task == nullptr) | | exec::Task::create() (Request B skipped) | new MaterializedOutputBuffer() | registerBuffer(taskId, buffer) | OK ``` ## Design This follows the same pattern as Velox `OutputBufferManager`, which is the 1:1 streaming equivalent of `MaterializedOutputBuffer`: - `OutputBufferManager` uses a process-wide `folly::Synchronized<unordered_map>` to store output buffers keyed by `taskId` - Buffer creation happens inside `Task::start()` -> `initializePartitionOutput()` -> `OutputBufferManager::initializeTask()`, after task dedup - Operators look up the buffer by `taskId` at runtime `MaterializedOutputBuffer` now follows the same lifecycle: | Step | OutputBufferManager | MaterializedOutputBuffer | |------|-------------------|------------------------| | Registry | `folly::Synchronized<map>` | `folly::Synchronized<map>` | | Creation | `Task::start()` under task lifecycle | `createOrUpdateTaskImpl()` under `prestoTask->mutex` | | Lookup | `OutputBufferManager::getBuffer(taskId)` | `MaterializedOutputBuffer::getBuffer(taskId)` | | Cleanup | `OutputBufferManager::removeTask(taskId)` | `MaterializedOutputBuffer::removeBuffer(taskId)` (destructor) | Differential Revision: D106850939
9753ce7 to
7544325
Compare
There was a problem hiding this comment.
Hey - I've found 3 issues, and left some high level feedback:
- In TaskManager::createOrUpdateTaskImpl(), consider restoring a VELOX_CHECK_NOT_NULL (or equivalent) on the result of ShuffleInterfaceFactory::factory() before passing it into MaterializedOutputBuffer to avoid a null dereference if the factory is not registered at runtime.
- MaterializedOutputBuffer::registerBuffer() currently uses emplace without checking for an existing entry; adding a DCHECK/VELOX_CHECK that the taskId is not already present would make violations of the "created under prestoTask->mutex" invariant fail fast and easier to debug.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In TaskManager::createOrUpdateTaskImpl(), consider restoring a VELOX_CHECK_NOT_NULL (or equivalent) on the result of ShuffleInterfaceFactory::factory() before passing it into MaterializedOutputBuffer to avoid a null dereference if the factory is not registered at runtime.
- MaterializedOutputBuffer::registerBuffer() currently uses emplace without checking for an existing entry; adding a DCHECK/VELOX_CHECK that the taskId is not already present would make violations of the "created under prestoTask->mutex" invariant fail fast and easier to debug.
## Individual Comments
### Comment 1
<location path="presto-native-execution/presto_cpp/main/TaskManager.cpp" line_range="597-599" />
<code_context>
+ if (auto* materializedOutputNode = dynamic_cast<
+ const operators::MaterializedOutputNode*>(
+ planFragment.planNode.get())) {
+ auto* shuffleFactory =
+ operators::ShuffleInterfaceFactory::factory(
+ materializedOutputNode->shuffleWriterFactoryName());
+ auto buffer = std::make_shared<operators::MaterializedOutputBuffer>(
+ materializedOutputNode->numPartitions(),
</code_context>
<issue_to_address>
**issue (bug_risk):** Handle null ShuffleInterfaceFactory when creating MaterializedOutputBuffer to avoid crashes on misconfiguration or missing registration.
In this path, `shuffleFactory` may be null if `shuffleWriterFactoryName()` is empty or the factory is not registered, but it’s passed directly into `MaterializedOutputBuffer` without validation. Add a `VELOX_CHECK_NOT_NULL` (or equivalent) with a clear error message, and consider explicitly checking that `shuffleWriterFactoryName()` is non-empty before constructing the buffer.
</issue_to_address>
### Comment 2
<location path="presto-native-execution/presto_cpp/main/operators/MaterializedOutput.cpp" line_range="114-115" />
<code_context>
/*localExchange=*/false)),
replicateNullsAndAny_(planNode->isReplicateNullsAndAny()),
- buffer_(planNode->buffer()),
+ buffer_(
+ MaterializedOutputBuffer::getBuffer(ctx->task->taskId()).get()),
targetSizeInBytes_(
std::clamp(
</code_context>
<issue_to_address>
**issue (bug_risk):** Guard against a missing MaterializedOutputBuffer in the global registry to prevent null dereferences in the operator.
If `MaterializedOutputBuffer::getBuffer(taskId)` returns `nullptr` (e.g., race, failed registration, config mismatch), `buffer_` becomes a null raw pointer and any use will crash. Since this is a hard dependency, consider asserting the invariant with `VELOX_CHECK_NOT_NULL` (or similar) on the returned shared_ptr before calling `.get()` so we fail fast with a clear error when the registry and task construction are out of sync.
</issue_to_address>
### Comment 3
<location path="presto-native-execution/presto_cpp/main/operators/MaterializedOutputBuffer.cpp" line_range="44-48" />
<code_context>
+ std::unordered_map<std::string, std::shared_ptr<MaterializedOutputBuffer>>>
+ MaterializedOutputBuffer::buffers_;
+
+void MaterializedOutputBuffer::registerBuffer(
+ const std::string& taskId,
+ std::shared_ptr<MaterializedOutputBuffer> buffer) {
+ buffers_.withLock(
+ [&](auto& buffers) { buffers.emplace(taskId, std::move(buffer)); });
+}
+
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Clarify handling of duplicate taskId registrations in the global buffer map.
The comment above says this runs under `prestoTask->mutex` to prevent duplicate creation, but here we use `emplace` and silently ignore duplicate keys. If that mutex invariant is broken or a task is retried with the same id, a stale buffer could remain registered with no signal. Consider checking the `emplace` result and failing fast (e.g., `VELOX_CHECK`) or at least logging a warning so duplicate registrations and lifecycle/concurrency issues are easier to detect.
Suggested implementation:
```cpp
void MaterializedOutputBuffer::registerBuffer(
const std::string& taskId,
std::shared_ptr<MaterializedOutputBuffer> buffer) {
buffers_.withLock([&](auto& buffers) {
auto [it, inserted] = buffers.emplace(taskId, std::move(buffer));
VELOX_CHECK(
inserted,
"Duplicate MaterializedOutputBuffer registration for taskId {}",
taskId);
});
```
1. Ensure the file includes whatever header defines `VELOX_CHECK` (commonly `velox/common/base/VeloxException.h`) if it is not already included at the top of `MaterializedOutputBuffer.cpp`.
2. If the project prefers logging-and-continue over hard checks in this code path, replace `VELOX_CHECK` with an appropriate logging macro (e.g., `LOG(WARNING) << ...`) while still keeping the `inserted` check.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| auto* shuffleFactory = | ||
| operators::ShuffleInterfaceFactory::factory( | ||
| materializedOutputNode->shuffleWriterFactoryName()); |
There was a problem hiding this comment.
issue (bug_risk): Handle null ShuffleInterfaceFactory when creating MaterializedOutputBuffer to avoid crashes on misconfiguration or missing registration.
In this path, shuffleFactory may be null if shuffleWriterFactoryName() is empty or the factory is not registered, but it’s passed directly into MaterializedOutputBuffer without validation. Add a VELOX_CHECK_NOT_NULL (or equivalent) with a clear error message, and consider explicitly checking that shuffleWriterFactoryName() is non-empty before constructing the buffer.
| buffer_( | ||
| MaterializedOutputBuffer::getBuffer(ctx->task->taskId()).get()), |
There was a problem hiding this comment.
issue (bug_risk): Guard against a missing MaterializedOutputBuffer in the global registry to prevent null dereferences in the operator.
If MaterializedOutputBuffer::getBuffer(taskId) returns nullptr (e.g., race, failed registration, config mismatch), buffer_ becomes a null raw pointer and any use will crash. Since this is a hard dependency, consider asserting the invariant with VELOX_CHECK_NOT_NULL (or similar) on the returned shared_ptr before calling .get() so we fail fast with a clear error when the registry and task construction are out of sync.
| void MaterializedOutputBuffer::registerBuffer( | ||
| const std::string& taskId, | ||
| std::shared_ptr<MaterializedOutputBuffer> buffer) { | ||
| buffers_.withLock( | ||
| [&](auto& buffers) { buffers.emplace(taskId, std::move(buffer)); }); |
There was a problem hiding this comment.
suggestion (bug_risk): Clarify handling of duplicate taskId registrations in the global buffer map.
The comment above says this runs under prestoTask->mutex to prevent duplicate creation, but here we use emplace and silently ignore duplicate keys. If that mutex invariant is broken or a task is retried with the same id, a stale buffer could remain registered with no signal. Consider checking the emplace result and failing fast (e.g., VELOX_CHECK) or at least logging a warning so duplicate registrations and lifecycle/concurrency issues are easier to detect.
Suggested implementation:
void MaterializedOutputBuffer::registerBuffer(
const std::string& taskId,
std::shared_ptr<MaterializedOutputBuffer> buffer) {
buffers_.withLock([&](auto& buffers) {
auto [it, inserted] = buffers.emplace(taskId, std::move(buffer));
VELOX_CHECK(
inserted,
"Duplicate MaterializedOutputBuffer registration for taskId {}",
taskId);
});
- Ensure the file includes whatever header defines
VELOX_CHECK(commonlyvelox/common/base/VeloxException.h) if it is not already included at the top ofMaterializedOutputBuffer.cpp. - If the project prefers logging-and-continue over hard checks in this code path, replace
VELOX_CHECKwith an appropriate logging macro (e.g.,LOG(WARNING) << ...) while still keeping theinsertedcheck.
…tex to prevent pool name collision (prestodb#27900) Summary: ## Problem In extremely rare cases (0.024%, ~4 out of 17,000 queries), when the coordinator retries or duplicates a `createTask` HTTP request, two concurrent requests race through `toVeloxQueryPlan()` — each creating a `MaterializedOutputBuffer` with the same pool name under the shared query root pool. The second call crashes with `"pool already exists"`. ``` TaskResource::createOrUpdateBatchTask() | +----------------------------------+ | | Request A Request B | | toVeloxQueryPlan() toVeloxQueryPlan() | | new MaterializedOutputBuffer() new MaterializedOutputBuffer() | | pool->addLeafChild("mob.taskId") pool->addLeafChild("mob.taskId") | | OK CRASH: "already exists" | | +----------------------------------+ | createOrUpdateTaskImpl() | prestoTask->mutex <-- too late | if (prestoTask->task == nullptr) | exec::Task::create() ``` The `prestoTask->mutex` deduplicates `exec::Task::create()`, but the pool collision happens before that — during plan conversion. ## Fix Move `MaterializedOutputBuffer` creation from `toVeloxQueryPlan()` into `createOrUpdateTaskImpl()` under the `prestoTask->mutex`, after `exec::Task::create()`. The plan node (`MaterializedOutputNode`) now carries the shuffle factory params instead of the buffer itself. Operators look up the buffer by `taskId` from a process-wide registry at construction time. ``` TaskResource::createOrUpdateBatchTask() | +----------------------------------+ | | Request A Request B | | toVeloxQueryPlan() toVeloxQueryPlan() | | MaterializedOutputNode(params) MaterializedOutputNode(params) (no pool created) (no pool created) | | +----------------------------------+ | createOrUpdateTaskImpl() | prestoTask->mutex | if (prestoTask->task == nullptr) | | exec::Task::create() (Request B skipped) | new MaterializedOutputBuffer() | registerBuffer(taskId, buffer) | OK ``` ## Design This follows the same pattern as Velox `OutputBufferManager`, which is the 1:1 streaming equivalent of `MaterializedOutputBuffer`: - `OutputBufferManager` uses a process-wide `folly::Synchronized<unordered_map>` to store output buffers keyed by `taskId` - Buffer creation happens inside `Task::start()` -> `initializePartitionOutput()` -> `OutputBufferManager::initializeTask()`, after task dedup - Operators look up the buffer by `taskId` at runtime `MaterializedOutputBuffer` now follows the same lifecycle: | Step | OutputBufferManager | MaterializedOutputBuffer | |------|-------------------|------------------------| | Registry | `folly::Synchronized<map>` | `folly::Synchronized<map>` | | Creation | `Task::start()` under task lifecycle | `createOrUpdateTaskImpl()` under `prestoTask->mutex` | | Lookup | `OutputBufferManager::getBuffer(taskId)` | `MaterializedOutputBuffer::getBuffer(taskId)` | | Cleanup | `OutputBufferManager::removeTask(taskId)` | `MaterializedOutputBuffer::removeBuffer(taskId)` (destructor) | Differential Revision: D106850939
7544325 to
f8807d1
Compare
…tex to prevent pool name collision (prestodb#27900) Summary: ## Problem In extremely rare cases (0.024%, ~4 out of 17,000 queries), when the coordinator retries or duplicates a `createTask` HTTP request, two concurrent requests race through `toVeloxQueryPlan()` — each creating a `MaterializedOutputBuffer` with the same pool name under the shared query root pool. The second call crashes with `"pool already exists"`. ``` TaskResource::createOrUpdateBatchTask() | +----------------------------------+ | | Request A Request B | | toVeloxQueryPlan() toVeloxQueryPlan() | | new MaterializedOutputBuffer() new MaterializedOutputBuffer() | | pool->addLeafChild("mob.taskId") pool->addLeafChild("mob.taskId") | | OK CRASH: "already exists" | | +----------------------------------+ | createOrUpdateTaskImpl() | prestoTask->mutex <-- too late | if (prestoTask->task == nullptr) | exec::Task::create() ``` The `prestoTask->mutex` deduplicates `exec::Task::create()`, but the pool collision happens before that — during plan conversion. ## Fix Move `MaterializedOutputBuffer` creation from `toVeloxQueryPlan()` into `createOrUpdateTaskImpl()` under the `prestoTask->mutex`, after `exec::Task::create()`. The plan node (`MaterializedOutputNode`) now carries the shuffle factory params instead of the buffer itself. Operators look up the buffer by `taskId` from a process-wide registry at construction time. ``` TaskResource::createOrUpdateBatchTask() | +----------------------------------+ | | Request A Request B | | toVeloxQueryPlan() toVeloxQueryPlan() | | MaterializedOutputNode(params) MaterializedOutputNode(params) (no pool created) (no pool created) | | +----------------------------------+ | createOrUpdateTaskImpl() | prestoTask->mutex | if (prestoTask->task == nullptr) | | exec::Task::create() (Request B skipped) | new MaterializedOutputBuffer() | registerBuffer(taskId, buffer) | OK ``` ## Design This follows the same pattern as Velox `OutputBufferManager`, which is the 1:1 streaming equivalent of `MaterializedOutputBuffer`: - `OutputBufferManager` uses a process-wide `folly::Synchronized<unordered_map>` to store output buffers keyed by `taskId` - Buffer creation happens inside `Task::start()` -> `initializePartitionOutput()` -> `OutputBufferManager::initializeTask()`, after task dedup - Operators look up the buffer by `taskId` at runtime `MaterializedOutputBuffer` now follows the same lifecycle: | Step | OutputBufferManager | MaterializedOutputBuffer | |------|-------------------|------------------------| | Registry | `folly::Synchronized<map>` | `folly::Synchronized<map>` | | Creation | `Task::start()` under task lifecycle | `createOrUpdateTaskImpl()` under `prestoTask->mutex` | | Lookup | `OutputBufferManager::getBuffer(taskId)` | `MaterializedOutputBuffer::getBuffer(taskId)` | | Cleanup | `OutputBufferManager::removeTask(taskId)` | `MaterializedOutputBuffer::removeBuffer(taskId)` (destructor) | Differential Revision: D106850939
f8807d1 to
1637974
Compare
…tex to prevent pool double creation (prestodb#27900) Summary: ## Problem In extremely rare cases (0.024%, ~4 out of 17,000 queries), when the coordinator retries or duplicates a `createTask` HTTP request, two concurrent requests race through `toVeloxQueryPlan()` — each creating a `MaterializedOutputBuffer` with the same pool name under the shared query root pool. The second call crashes with `"pool already exists"`. ``` TaskResource::createOrUpdateBatchTask() | +----------------------------------+ | | Request A Request B | | toVeloxQueryPlan() toVeloxQueryPlan() | | new MaterializedOutputBuffer() new MaterializedOutputBuffer() | | pool->addLeafChild("mob.taskId") pool->addLeafChild("mob.taskId") | | OK CRASH: "already exists" | | +----------------------------------+ | createOrUpdateTaskImpl() | prestoTask->mutex <-- too late | if (prestoTask->task == nullptr) | exec::Task::create() ``` The `prestoTask->mutex` deduplicates `exec::Task::create()`, but the pool collision happens before that — during plan conversion. ## Fix Move `MaterializedOutputBuffer` creation from `toVeloxQueryPlan()` into `createOrUpdateTaskImpl()` under the `prestoTask->mutex`, after `exec::Task::create()`. The plan node (`MaterializedOutputNode`) now carries the shuffle factory params instead of the buffer itself. Operators look up the buffer by `taskId` from a process-wide registry at construction time. ``` TaskResource::createOrUpdateBatchTask() | +----------------------------------+ | | Request A Request B | | toVeloxQueryPlan() toVeloxQueryPlan() | | MaterializedOutputNode(params) MaterializedOutputNode(params) (no pool created) (no pool created) | | +----------------------------------+ | createOrUpdateTaskImpl() | prestoTask->mutex | if (prestoTask->task == nullptr) | | exec::Task::create() (Request B skipped) | new MaterializedOutputBuffer() | registerBuffer(taskId, buffer) | OK ``` ## Design This follows the same pattern as Velox `OutputBufferManager`, which is the 1:1 streaming equivalent of `MaterializedOutputBuffer`: - `OutputBufferManager` uses a process-wide `folly::Synchronized<unordered_map>` to store output buffers keyed by `taskId` - Buffer creation happens inside `Task::start()` -> `initializePartitionOutput()` -> `OutputBufferManager::initializeTask()`, after task dedup - Operators look up the buffer by `taskId` at runtime `MaterializedOutputBuffer` now follows the same lifecycle: | Step | OutputBufferManager | MaterializedOutputBuffer | |------|-------------------|------------------------| | Registry | `folly::Synchronized<map>` | `folly::Synchronized<map>` | | Creation | `Task::start()` under task lifecycle | `createOrUpdateTaskImpl()` under `prestoTask->mutex` | | Lookup | `OutputBufferManager::getBuffer(taskId)` | `MaterializedOutputBuffer::getBuffer(taskId)` | | Cleanup | `OutputBufferManager::removeTask(taskId)` | `MaterializedOutputBuffer::removeBuffer(taskId)` (destructor) | Differential Revision: D106850939
1637974 to
e774b70
Compare
…tex to prevent pool double creation (prestodb#27900) Summary: ## Problem In extremely rare cases (0.024%, ~4 out of 17,000 queries), when the coordinator retries or duplicates a `createTask` HTTP request, two concurrent requests race through `toVeloxQueryPlan()` — each creating a `MaterializedOutputBuffer` with the same pool name under the shared query root pool. The second call crashes with `"pool already exists"`. ``` TaskResource::createOrUpdateBatchTask() | +----------------------------------+ | | Request A Request B | | toVeloxQueryPlan() toVeloxQueryPlan() | | new MaterializedOutputBuffer() new MaterializedOutputBuffer() | | pool->addLeafChild("mob.taskId") pool->addLeafChild("mob.taskId") | | OK CRASH: "already exists" | | +----------------------------------+ | createOrUpdateTaskImpl() | prestoTask->mutex <-- too late | if (prestoTask->task == nullptr) | exec::Task::create() ``` The `prestoTask->mutex` deduplicates `exec::Task::create()`, but the pool collision happens before that — during plan conversion. ## Fix Move `MaterializedOutputBuffer` creation from `toVeloxQueryPlan()` into `createOrUpdateTaskImpl()` under the `prestoTask->mutex`, after `exec::Task::create()`. The plan node (`MaterializedOutputNode`) now carries the shuffle factory params instead of the buffer itself. Operators look up the buffer by `taskId` from a process-wide registry at construction time. ``` TaskResource::createOrUpdateBatchTask() | +----------------------------------+ | | Request A Request B | | toVeloxQueryPlan() toVeloxQueryPlan() | | MaterializedOutputNode(params) MaterializedOutputNode(params) (no pool created) (no pool created) | | +----------------------------------+ | createOrUpdateTaskImpl() | prestoTask->mutex | if (prestoTask->task == nullptr) | | exec::Task::create() (Request B skipped) | new MaterializedOutputBuffer() | registerBuffer(taskId, buffer) | OK ``` ## Design This follows the same pattern as Velox `OutputBufferManager`, which is the 1:1 streaming equivalent of `MaterializedOutputBuffer`: - `OutputBufferManager` uses a process-wide `folly::Synchronized<unordered_map>` to store output buffers keyed by `taskId` - Buffer creation happens inside `Task::start()` -> `initializePartitionOutput()` -> `OutputBufferManager::initializeTask()`, after task dedup - Operators look up the buffer by `taskId` at runtime `MaterializedOutputBuffer` now follows the same lifecycle: | Step | OutputBufferManager | MaterializedOutputBuffer | |------|-------------------|------------------------| | Registry | `folly::Synchronized<map>` | `folly::Synchronized<map>` | | Creation | `Task::start()` under task lifecycle | `createOrUpdateTaskImpl()` under `prestoTask->mutex` | | Lookup | `OutputBufferManager::getBuffer(taskId)` | `MaterializedOutputBuffer::getBuffer(taskId)` | | Cleanup | `OutputBufferManager::removeTask(taskId)` | `MaterializedOutputBuffer::removeBuffer(taskId)` (destructor) | Differential Revision: D106850939
e774b70 to
ae4598a
Compare
…tex to prevent pool double creation (prestodb#27900) Summary: ## Problem In extremely rare cases (0.024%, ~4 out of 17,000 queries), when the coordinator retries or duplicates a `createTask` HTTP request, two concurrent requests race through `toVeloxQueryPlan()` — each creating a `MaterializedOutputBuffer` with the same pool name under the shared query root pool. The second call crashes with `"pool already exists"`. ``` TaskResource::createOrUpdateBatchTask() | +----------------------------------+ | | Request A Request B | | toVeloxQueryPlan() toVeloxQueryPlan() | | new MaterializedOutputBuffer() new MaterializedOutputBuffer() | | pool->addLeafChild("mob.taskId") pool->addLeafChild("mob.taskId") | | OK CRASH: "already exists" | | +----------------------------------+ | createOrUpdateTaskImpl() | prestoTask->mutex <-- too late | if (prestoTask->task == nullptr) | exec::Task::create() ``` The `prestoTask->mutex` deduplicates `exec::Task::create()`, but the pool collision happens before that — during plan conversion. ## Fix Move `MaterializedOutputBuffer` creation from `toVeloxQueryPlan()` into `createOrUpdateTaskImpl()` under the `prestoTask->mutex`, after `exec::Task::create()`. The plan node (`MaterializedOutputNode`) now carries the shuffle factory params instead of the buffer itself. Operators look up the buffer by `taskId` from a process-wide registry at construction time. ``` TaskResource::createOrUpdateBatchTask() | +----------------------------------+ | | Request A Request B | | toVeloxQueryPlan() toVeloxQueryPlan() | | MaterializedOutputNode(params) MaterializedOutputNode(params) (no pool created) (no pool created) | | +----------------------------------+ | createOrUpdateTaskImpl() | prestoTask->mutex | if (prestoTask->task == nullptr) | | exec::Task::create() (Request B skipped) | new MaterializedOutputBuffer() | registerBuffer(taskId, buffer) | OK ``` ## Design This follows the same pattern as Velox `OutputBufferManager`, which is the 1:1 streaming equivalent of `MaterializedOutputBuffer`: - `OutputBufferManager` uses a process-wide `folly::Synchronized<unordered_map>` to store output buffers keyed by `taskId` - Buffer creation happens inside `Task::start()` -> `initializePartitionOutput()` -> `OutputBufferManager::initializeTask()`, after task dedup - Operators look up the buffer by `taskId` at runtime `MaterializedOutputBuffer` now follows the same lifecycle: | Step | OutputBufferManager | MaterializedOutputBuffer | |------|-------------------|------------------------| | Registry | `folly::Synchronized<map>` | `folly::Synchronized<map>` | | Creation | `Task::start()` under task lifecycle | `createOrUpdateTaskImpl()` under `prestoTask->mutex` | | Lookup | `OutputBufferManager::getBuffer(taskId)` | `MaterializedOutputBuffer::getBuffer(taskId)` | | Cleanup | `OutputBufferManager::removeTask(taskId)` | `MaterializedOutputBuffer::removeBuffer(taskId)` (destructor) | Differential Revision: D106850939
ae4598a to
d15a042
Compare
…tex to prevent pool name collision
Summary:
## Problem
In extremely rare cases (0.024%, ~4 out of 17,000 queries), when the coordinator retries or duplicates a `createTask` HTTP request, two concurrent requests race through `toVeloxQueryPlan()` — each creating a `MaterializedOutputBuffer` with the same pool name under the shared query root pool. The second call crashes with `"pool already exists"`.
```
TaskResource::createOrUpdateBatchTask()
|
+----------------------------------+
| |
Request A Request B
| |
toVeloxQueryPlan() toVeloxQueryPlan()
| |
new MaterializedOutputBuffer() new MaterializedOutputBuffer()
| |
pool->addLeafChild("mob.taskId") pool->addLeafChild("mob.taskId")
| |
OK CRASH: "already exists"
| |
+----------------------------------+
|
createOrUpdateTaskImpl()
|
prestoTask->mutex <-- too late
|
if (prestoTask->task == nullptr)
|
exec::Task::create()
```
The `prestoTask->mutex` deduplicates `exec::Task::create()`, but the pool collision happens before that — during plan conversion.
## Fix
Move `MaterializedOutputBuffer` creation from `toVeloxQueryPlan()` into `createOrUpdateTaskImpl()` under the `prestoTask->mutex`, after `exec::Task::create()`. The plan node (`MaterializedOutputNode`) now carries shuffle params via a `ShuffleWriterMetadata` struct instead of the buffer itself. Operators hold a `shared_ptr<MaterializedOutputBuffer>` (not a raw pointer) looked up by `taskId` from a process-wide registry at construction time.
```
TaskResource::createOrUpdateBatchTask()
|
+----------------------------------+
| |
Request A Request B
| |
toVeloxQueryPlan() toVeloxQueryPlan()
| |
MaterializedOutputNode(params) MaterializedOutputNode(params)
(no pool created) (no pool created)
| |
+----------------------------------+
|
createOrUpdateTaskImpl()
|
prestoTask->mutex
|
if (prestoTask->task == nullptr)
| |
exec::Task::create() (Request B skipped)
|
new MaterializedOutputBuffer()
|
registerBuffer(taskId, buffer)
|
OK
```
## Changes
- `ShuffleWriterMetadata` struct added to `MaterializedOutput.h` — groups `shuffleWriterInfo` and `shuffleWriterFactoryName` for transport on the plan node.
- `MaterializedOutputNode` — replaced `shared_ptr<MaterializedOutputBuffer>` member with `ShuffleWriterMetadata`. Buffer is no longer created during plan conversion.
- `MaterializedOutputBuffer` — added static `registerBuffer`/`getBuffer`/`removeBuffer` backed by `folly::Synchronized<F14FastMap>`. Destructor calls `removeBuffer`.
- `MaterializedOutput` operator — holds `shared_ptr<MaterializedOutputBuffer>` (not raw pointer) looked up via `getBuffer(taskId)` at construction time.
- `TaskManager::createOrUpdateTaskImpl()` — after `exec::Task::create()`, under `prestoTask->mutex`, creates the buffer and registers it if the plan root is a `MaterializedOutputNode`.
- `PrestoToVeloxQueryPlan.cpp` — passes `ShuffleWriterMetadata` to the plan node instead of creating the buffer.
## Design
This follows the same pattern as Velox `OutputBufferManager`, which is the 1:1 streaming equivalent of `MaterializedOutputBuffer`:
- `OutputBufferManager` uses a process-wide `folly::Synchronized<unordered_map>` to store output buffers keyed by `taskId`
- Buffer creation happens inside `Task::start()` -> `initializePartitionOutput()` -> `OutputBufferManager::initializeTask()`, after task dedup
- Operators look up the buffer by `taskId` at runtime
`MaterializedOutputBuffer` now follows the same lifecycle:
| Step | OutputBufferManager | MaterializedOutputBuffer |
|------|-------------------|------------------------|
| Registry | `folly::Synchronized<map>` | `folly::Synchronized<F14FastMap>` |
| Creation | `Task::start()` under task lifecycle | `createOrUpdateTaskImpl()` under `prestoTask->mutex` |
| Lookup | `OutputBufferManager::getBuffer(taskId)` | `MaterializedOutputBuffer::getBuffer(taskId)` |
| Cleanup | `OutputBufferManager::removeTask(taskId)` | `MaterializedOutputBuffer::removeBuffer(taskId)` (destructor) |
Reviewed By: xiaoxmeng
Differential Revision: D106850939
d15a042 to
486d088
Compare
Summary:
Problem
In extremely rare cases (0.024%, ~4 out of 17,000 queries), when the coordinator retries or duplicates a
createTaskHTTP request, two concurrent requests race throughtoVeloxQueryPlan()— each creating aMaterializedOutputBufferwith the same pool name under the shared query root pool. The second call crashes with"pool already exists".The
prestoTask->mutexdeduplicatesexec::Task::create(), but the pool collision happens before that — during plan conversion.Fix
Move
MaterializedOutputBuffercreation fromtoVeloxQueryPlan()intocreateOrUpdateTaskImpl()under theprestoTask->mutex, afterexec::Task::create(). The plan node (MaterializedOutputNode) now carries shuffle params via aShuffleWriterMetadatastruct instead of the buffer itself. Operators hold ashared_ptr<MaterializedOutputBuffer>(not a raw pointer) looked up bytaskIdfrom a process-wide registry at construction time.Changes
ShuffleWriterMetadatastruct added toMaterializedOutput.h— groupsshuffleWriterInfoandshuffleWriterFactoryNamefor transport on the plan node.MaterializedOutputNode— replacedshared_ptr<MaterializedOutputBuffer>member withShuffleWriterMetadata. Buffer is no longer created during plan conversion.MaterializedOutputBuffer— added staticregisterBuffer/getBuffer/removeBufferbacked byfolly::Synchronized<F14FastMap>. Destructor callsremoveBuffer.MaterializedOutputoperator — holdsshared_ptr<MaterializedOutputBuffer>(not raw pointer) looked up viagetBuffer(taskId)at construction time.TaskManager::createOrUpdateTaskImpl()— afterexec::Task::create(), underprestoTask->mutex, creates the buffer and registers it if the plan root is aMaterializedOutputNode.PrestoToVeloxQueryPlan.cpp— passesShuffleWriterMetadatato the plan node instead of creating the buffer.Design
This follows the same pattern as Velox
OutputBufferManager, which is the 1:1 streaming equivalent ofMaterializedOutputBuffer:OutputBufferManageruses a process-widefolly::Synchronized<unordered_map>to store output buffers keyed bytaskIdTask::start()->initializePartitionOutput()->OutputBufferManager::initializeTask(), after task deduptaskIdat runtimeMaterializedOutputBuffernow follows the same lifecycle:folly::Synchronized<map>folly::Synchronized<F14FastMap>Task::start()under task lifecyclecreateOrUpdateTaskImpl()underprestoTask->mutexOutputBufferManager::getBuffer(taskId)MaterializedOutputBuffer::getBuffer(taskId)OutputBufferManager::removeTask(taskId)MaterializedOutputBuffer::removeBuffer(taskId)(destructor)Reviewed By: xiaoxmeng
Differential Revision: D106850939