Skip to content

fix(native-pos): Move MaterializedOutputBuffer creation under task mutex to prevent pool name collision#27900

Open
shrinidhijoshi wants to merge 1 commit into
prestodb:masterfrom
shrinidhijoshi:export-D106850939
Open

fix(native-pos): Move MaterializedOutputBuffer creation under task mutex to prevent pool name collision#27900
shrinidhijoshi wants to merge 1 commit into
prestodb:masterfrom
shrinidhijoshi:export-D106850939

Conversation

@shrinidhijoshi
Copy link
Copy Markdown
Collaborator

@shrinidhijoshi shrinidhijoshi commented May 29, 2026

Summary:

Problem

In extremely rare cases (0.024%, ~4 out of 17,000 queries), when the coordinator retries or duplicates a createTask HTTP request, two concurrent requests race through toVeloxQueryPlan() — each creating a MaterializedOutputBuffer with the same pool name under the shared query root pool. The second call crashes with "pool already exists".

                        TaskResource::createOrUpdateBatchTask()
                                      |
                    +----------------------------------+
                    |                                  |
             Request A                           Request B
                    |                                  |
          toVeloxQueryPlan()                 toVeloxQueryPlan()
                    |                                  |
     new MaterializedOutputBuffer()    new MaterializedOutputBuffer()
                    |                                  |
     pool->addLeafChild("mob.taskId")  pool->addLeafChild("mob.taskId")
                    |                                  |
                   OK                         CRASH: "already exists"
                    |                                  |
                    +----------------------------------+
                                      |
                       createOrUpdateTaskImpl()
                                      |
                          prestoTask->mutex  <-- too late
                                      |
                     if (prestoTask->task == nullptr)
                                      |
                         exec::Task::create()

The prestoTask->mutex deduplicates exec::Task::create(), but the pool collision happens before that — during plan conversion.

Fix

Move MaterializedOutputBuffer creation from toVeloxQueryPlan() into createOrUpdateTaskImpl() under the prestoTask->mutex, after exec::Task::create(). The plan node (MaterializedOutputNode) now carries shuffle params via a ShuffleWriterMetadata struct instead of the buffer itself. Operators hold a shared_ptr<MaterializedOutputBuffer> (not a raw pointer) looked up by taskId from a process-wide registry at construction time.

                        TaskResource::createOrUpdateBatchTask()
                                      |
                    +----------------------------------+
                    |                                  |
             Request A                           Request B
                    |                                  |
          toVeloxQueryPlan()                 toVeloxQueryPlan()
                    |                                  |
     MaterializedOutputNode(params)    MaterializedOutputNode(params)
              (no pool created)              (no pool created)
                    |                                  |
                    +----------------------------------+
                                      |
                       createOrUpdateTaskImpl()
                                      |
                          prestoTask->mutex
                                      |
                     if (prestoTask->task == nullptr)
                          |                     |
                  exec::Task::create()    (Request B skipped)
                          |
              new MaterializedOutputBuffer()
                          |
              registerBuffer(taskId, buffer)
                          |
                         OK

Changes

  • ShuffleWriterMetadata struct added to MaterializedOutput.h — groups shuffleWriterInfo and shuffleWriterFactoryName for transport on the plan node.
  • MaterializedOutputNode — replaced shared_ptr<MaterializedOutputBuffer> member with ShuffleWriterMetadata. Buffer is no longer created during plan conversion.
  • MaterializedOutputBuffer — added static registerBuffer/getBuffer/removeBuffer backed by folly::Synchronized<F14FastMap>. Destructor calls removeBuffer.
  • MaterializedOutput operator — holds shared_ptr<MaterializedOutputBuffer> (not raw pointer) looked up via getBuffer(taskId) at construction time.
  • TaskManager::createOrUpdateTaskImpl() — after exec::Task::create(), under prestoTask->mutex, creates the buffer and registers it if the plan root is a MaterializedOutputNode.
  • PrestoToVeloxQueryPlan.cpp — passes ShuffleWriterMetadata to the plan node instead of creating the buffer.

Design

This follows the same pattern as Velox OutputBufferManager, which is the 1:1 streaming equivalent of MaterializedOutputBuffer:

  • OutputBufferManager uses a process-wide folly::Synchronized<unordered_map> to store output buffers keyed by taskId
  • Buffer creation happens inside Task::start() -> initializePartitionOutput() -> OutputBufferManager::initializeTask(), after task dedup
  • Operators look up the buffer by taskId at runtime

MaterializedOutputBuffer now follows the same lifecycle:

Step OutputBufferManager MaterializedOutputBuffer
Registry folly::Synchronized<map> folly::Synchronized<F14FastMap>
Creation Task::start() under task lifecycle createOrUpdateTaskImpl() under prestoTask->mutex
Lookup OutputBufferManager::getBuffer(taskId) MaterializedOutputBuffer::getBuffer(taskId)
Cleanup OutputBufferManager::removeTask(taskId) MaterializedOutputBuffer::removeBuffer(taskId) (destructor)

Reviewed By: xiaoxmeng

Differential Revision: D106850939

== NO RELEASE NOTE ==

@shrinidhijoshi shrinidhijoshi requested review from a team as code owners May 29, 2026 20:09
@prestodb-ci prestodb-ci added the from:Meta PR from Meta label May 29, 2026
@sourcery-ai
Copy link
Copy Markdown
Contributor

sourcery-ai Bot commented May 29, 2026

Reviewer's Guide

Moves MaterializedOutputBuffer creation from plan conversion into task creation under the task mutex, and introduces a process-wide registry keyed by taskId so operators can look up buffers at runtime, preventing concurrent pool name collisions when createTask is retried or duplicated.

Sequence diagram for MaterializedOutputBuffer creation and lookup under task mutex

sequenceDiagram
  actor Coordinator
  participant TaskResource
  participant TaskManager
  participant PrestoTask
  participant ExecTask as exec::Task
  participant MOBClass as MaterializedOutputBuffer
  participant MaterializedOutputOp as MaterializedOutput

  Coordinator->>TaskResource: createOrUpdateBatchTask()
  TaskResource->>TaskManager: createOrUpdateTaskImpl(planFragment)
  TaskManager->>TaskManager: toVeloxQueryPlan()
  TaskManager-->>TaskManager: MaterializedOutputNode(shuffleWriterInfo,
  TaskManager-->>TaskManager:  shuffleWriterFactoryName)

  TaskManager->>PrestoTask: lock mutex
  alt prestoTask->task == nullptr
    TaskManager->>ExecTask: exec::Task::create()
    TaskManager->>MOBClass: MaterializedOutputBuffer::MaterializedOutputBuffer(
    TaskManager-->>MOBClass:  numPartitions,
    TaskManager-->>MOBClass:  shuffleWriterInfo,
    TaskManager-->>MOBClass:  shuffleWriterFactory,
    TaskManager-->>MOBClass:  taskId,
    TaskManager-->>MOBClass:  pool)
    TaskManager->>MOBClass: registerBuffer(taskId, buffer)
    TaskManager-->>PrestoTask: prestoTask->task = newExecTask
  else prestoTask->task != nullptr
    TaskManager-->>PrestoTask: skip exec::Task::create()
  end
  PrestoTask-->>TaskManager: unlock mutex

  loop Operator construction
    MaterializedOutputOp->>MOBClass: getBuffer(ctx->task->taskId())
    MOBClass-->>MaterializedOutputOp: MaterializedOutputBuffer*
  end

  MaterializedOutputOp->>MOBClass: ~MaterializedOutputBuffer()
  MOBClass->>MOBClass: removeBuffer(taskId)
Loading

File-Level Changes

Change Details Files
Introduce a process-wide registry for MaterializedOutputBuffer instances keyed by taskId and tie their lifecycle to task creation/cleanup.
  • Add a static folly::Synchronized unordered_map of taskId to shared_ptr.
  • Implement registerBuffer, getBuffer, and removeBuffer helpers that manipulate the registry under lock.
  • Track taskId_ in MaterializedOutputBuffer and remove the buffer from the registry in the destructor.
presto-native-execution/presto_cpp/main/operators/MaterializedOutputBuffer.cpp
presto-native-execution/presto_cpp/main/operators/MaterializedOutputBuffer.h
Change MaterializedOutputNode to carry shuffle writer metadata instead of a MaterializedOutputBuffer instance, and update the operator to look up the buffer at runtime.
  • Update MaterializedOutputNode constructor and members to accept shuffleWriterInfo and shuffleWriterFactoryName strings instead of a buffer pointer.
  • Remove the buffer() accessor and add accessors for shuffleWriterInfo and shuffleWriterFactoryName.
  • Adjust deserialization and tests to construct MaterializedOutputNode without a buffer and to register buffers explicitly by taskId.
  • Update MaterializedOutput operator to obtain the buffer via MaterializedOutputBuffer::getBuffer(ctx->task->taskId()).
presto-native-execution/presto_cpp/main/operators/MaterializedOutput.h
presto-native-execution/presto_cpp/main/operators/MaterializedOutput.cpp
presto-native-execution/presto_cpp/main/operators/tests/MaterializedExchangeTest.cpp
Move MaterializedOutputBuffer creation from query plan conversion into TaskManager::createOrUpdateTaskImpl() under prestoTask->mutex.
  • Remove buffer construction and shuffle factory lookup from VeloxBatchQueryPlanConverter::toVeloxQueryPlan; MaterializedOutputNode now only carries serialized shuffle info and the factory name.
  • Add logic in TaskManager::createOrUpdateTaskImpl to detect a top-level MaterializedOutputNode, create the corresponding MaterializedOutputBuffer with the task’s memory pool, and register it in the global registry while under prestoTask->mutex.
  • Add sanity check that shuffleName_ is non-empty before constructing MaterializedOutputNode when exchange materialization is enabled.
presto-native-execution/presto_cpp/main/TaskManager.cpp
presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

@meta-codesync meta-codesync Bot changed the title fix(native-pos): Move MaterializedOutputBuffer creation under task mutex to prevent pool name collision fix(native-pos): Move MaterializedOutputBuffer creation under task mutex to prevent pool name collision (#27900) May 29, 2026
shrinidhijoshi added a commit to shrinidhijoshi/presto that referenced this pull request May 29, 2026
…tex to prevent pool name collision (prestodb#27900)

Summary:

## Problem

In extremely rare cases (0.024%, ~4 out of 17,000 queries), when the coordinator retries or duplicates a `createTask` HTTP request, two concurrent requests race through `toVeloxQueryPlan()` — each creating a `MaterializedOutputBuffer` with the same pool name under the shared query root pool. The second call crashes with `"pool already exists"`.

```
                        TaskResource::createOrUpdateBatchTask()
                                      |
                    +----------------------------------+
                    |                                  |
             Request A                           Request B
                    |                                  |
          toVeloxQueryPlan()                 toVeloxQueryPlan()
                    |                                  |
     new MaterializedOutputBuffer()    new MaterializedOutputBuffer()
                    |                                  |
     pool->addLeafChild("mob.taskId")  pool->addLeafChild("mob.taskId")
                    |                                  |
                   OK                         CRASH: "already exists"
                    |                                  |
                    +----------------------------------+
                                      |
                       createOrUpdateTaskImpl()
                                      |
                          prestoTask->mutex  <-- too late
                                      |
                     if (prestoTask->task == nullptr)
                                      |
                         exec::Task::create()
```

The `prestoTask->mutex` deduplicates `exec::Task::create()`, but the pool collision happens before that — during plan conversion.

## Fix

Move `MaterializedOutputBuffer` creation from `toVeloxQueryPlan()` into `createOrUpdateTaskImpl()` under the `prestoTask->mutex`, after `exec::Task::create()`. The plan node (`MaterializedOutputNode`) now carries the shuffle factory params instead of the buffer itself. Operators look up the buffer by `taskId` from a process-wide registry at construction time.

```
                        TaskResource::createOrUpdateBatchTask()
                                      |
                    +----------------------------------+
                    |                                  |
             Request A                           Request B
                    |                                  |
          toVeloxQueryPlan()                 toVeloxQueryPlan()
                    |                                  |
     MaterializedOutputNode(params)    MaterializedOutputNode(params)
              (no pool created)              (no pool created)
                    |                                  |
                    +----------------------------------+
                                      |
                       createOrUpdateTaskImpl()
                                      |
                          prestoTask->mutex
                                      |
                     if (prestoTask->task == nullptr)
                          |                     |
                  exec::Task::create()    (Request B skipped)
                          |
              new MaterializedOutputBuffer()
                          |
              registerBuffer(taskId, buffer)
                          |
                         OK
```

## Design

This follows the same pattern as Velox `OutputBufferManager`, which is the 1:1 streaming equivalent of `MaterializedOutputBuffer`:

- `OutputBufferManager` uses a process-wide `folly::Synchronized<unordered_map>` to store output buffers keyed by `taskId`
- Buffer creation happens inside `Task::start()` -> `initializePartitionOutput()` -> `OutputBufferManager::initializeTask()`, after task dedup
- Operators look up the buffer by `taskId` at runtime

`MaterializedOutputBuffer` now follows the same lifecycle:

| Step | OutputBufferManager | MaterializedOutputBuffer |
|------|-------------------|------------------------|
| Registry | `folly::Synchronized<map>` | `folly::Synchronized<map>` |
| Creation | `Task::start()` under task lifecycle | `createOrUpdateTaskImpl()` under `prestoTask->mutex` |
| Lookup | `OutputBufferManager::getBuffer(taskId)` | `MaterializedOutputBuffer::getBuffer(taskId)` |
| Cleanup | `OutputBufferManager::removeTask(taskId)` | `MaterializedOutputBuffer::removeBuffer(taskId)` (destructor) |

Differential Revision: D106850939
Copy link
Copy Markdown
Contributor

@sourcery-ai sourcery-ai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - I've found 3 issues, and left some high level feedback:

  • In TaskManager::createOrUpdateTaskImpl(), consider restoring a VELOX_CHECK_NOT_NULL (or equivalent) on the result of ShuffleInterfaceFactory::factory() before passing it into MaterializedOutputBuffer to avoid a null dereference if the factory is not registered at runtime.
  • MaterializedOutputBuffer::registerBuffer() currently uses emplace without checking for an existing entry; adding a DCHECK/VELOX_CHECK that the taskId is not already present would make violations of the "created under prestoTask->mutex" invariant fail fast and easier to debug.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In TaskManager::createOrUpdateTaskImpl(), consider restoring a VELOX_CHECK_NOT_NULL (or equivalent) on the result of ShuffleInterfaceFactory::factory() before passing it into MaterializedOutputBuffer to avoid a null dereference if the factory is not registered at runtime.
- MaterializedOutputBuffer::registerBuffer() currently uses emplace without checking for an existing entry; adding a DCHECK/VELOX_CHECK that the taskId is not already present would make violations of the "created under prestoTask->mutex" invariant fail fast and easier to debug.

## Individual Comments

### Comment 1
<location path="presto-native-execution/presto_cpp/main/TaskManager.cpp" line_range="597-599" />
<code_context>
+      if (auto* materializedOutputNode = dynamic_cast<
+              const operators::MaterializedOutputNode*>(
+              planFragment.planNode.get())) {
+        auto* shuffleFactory =
+            operators::ShuffleInterfaceFactory::factory(
+                materializedOutputNode->shuffleWriterFactoryName());
+        auto buffer = std::make_shared<operators::MaterializedOutputBuffer>(
+            materializedOutputNode->numPartitions(),
</code_context>
<issue_to_address>
**issue (bug_risk):** Handle null ShuffleInterfaceFactory when creating MaterializedOutputBuffer to avoid crashes on misconfiguration or missing registration.

In this path, `shuffleFactory` may be null if `shuffleWriterFactoryName()` is empty or the factory is not registered, but it’s passed directly into `MaterializedOutputBuffer` without validation. Add a `VELOX_CHECK_NOT_NULL` (or equivalent) with a clear error message, and consider explicitly checking that `shuffleWriterFactoryName()` is non-empty before constructing the buffer.
</issue_to_address>

### Comment 2
<location path="presto-native-execution/presto_cpp/main/operators/MaterializedOutput.cpp" line_range="114-115" />
<code_context>
                                       /*localExchange=*/false)),
       replicateNullsAndAny_(planNode->isReplicateNullsAndAny()),
-      buffer_(planNode->buffer()),
+      buffer_(
+          MaterializedOutputBuffer::getBuffer(ctx->task->taskId()).get()),
       targetSizeInBytes_(
           std::clamp(
</code_context>
<issue_to_address>
**issue (bug_risk):** Guard against a missing MaterializedOutputBuffer in the global registry to prevent null dereferences in the operator.

If `MaterializedOutputBuffer::getBuffer(taskId)` returns `nullptr` (e.g., race, failed registration, config mismatch), `buffer_` becomes a null raw pointer and any use will crash. Since this is a hard dependency, consider asserting the invariant with `VELOX_CHECK_NOT_NULL` (or similar) on the returned shared_ptr before calling `.get()` so we fail fast with a clear error when the registry and task construction are out of sync.
</issue_to_address>

### Comment 3
<location path="presto-native-execution/presto_cpp/main/operators/MaterializedOutputBuffer.cpp" line_range="44-48" />
<code_context>
+    std::unordered_map<std::string, std::shared_ptr<MaterializedOutputBuffer>>>
+    MaterializedOutputBuffer::buffers_;
+
+void MaterializedOutputBuffer::registerBuffer(
+    const std::string& taskId,
+    std::shared_ptr<MaterializedOutputBuffer> buffer) {
+  buffers_.withLock(
+      [&](auto& buffers) { buffers.emplace(taskId, std::move(buffer)); });
+}
+
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Clarify handling of duplicate taskId registrations in the global buffer map.

The comment above says this runs under `prestoTask->mutex` to prevent duplicate creation, but here we use `emplace` and silently ignore duplicate keys. If that mutex invariant is broken or a task is retried with the same id, a stale buffer could remain registered with no signal. Consider checking the `emplace` result and failing fast (e.g., `VELOX_CHECK`) or at least logging a warning so duplicate registrations and lifecycle/concurrency issues are easier to detect.

Suggested implementation:

```cpp
void MaterializedOutputBuffer::registerBuffer(
    const std::string& taskId,
    std::shared_ptr<MaterializedOutputBuffer> buffer) {
  buffers_.withLock([&](auto& buffers) {
    auto [it, inserted] = buffers.emplace(taskId, std::move(buffer));
    VELOX_CHECK(
        inserted,
        "Duplicate MaterializedOutputBuffer registration for taskId {}",
        taskId);
  });

```

1. Ensure the file includes whatever header defines `VELOX_CHECK` (commonly `velox/common/base/VeloxException.h`) if it is not already included at the top of `MaterializedOutputBuffer.cpp`.
2. If the project prefers logging-and-continue over hard checks in this code path, replace `VELOX_CHECK` with an appropriate logging macro (e.g., `LOG(WARNING) << ...`) while still keeping the `inserted` check.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment on lines +597 to +599
auto* shuffleFactory =
operators::ShuffleInterfaceFactory::factory(
materializedOutputNode->shuffleWriterFactoryName());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Handle null ShuffleInterfaceFactory when creating MaterializedOutputBuffer to avoid crashes on misconfiguration or missing registration.

In this path, shuffleFactory may be null if shuffleWriterFactoryName() is empty or the factory is not registered, but it’s passed directly into MaterializedOutputBuffer without validation. Add a VELOX_CHECK_NOT_NULL (or equivalent) with a clear error message, and consider explicitly checking that shuffleWriterFactoryName() is non-empty before constructing the buffer.

Comment on lines +114 to +115
buffer_(
MaterializedOutputBuffer::getBuffer(ctx->task->taskId()).get()),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Guard against a missing MaterializedOutputBuffer in the global registry to prevent null dereferences in the operator.

If MaterializedOutputBuffer::getBuffer(taskId) returns nullptr (e.g., race, failed registration, config mismatch), buffer_ becomes a null raw pointer and any use will crash. Since this is a hard dependency, consider asserting the invariant with VELOX_CHECK_NOT_NULL (or similar) on the returned shared_ptr before calling .get() so we fail fast with a clear error when the registry and task construction are out of sync.

Comment on lines +44 to +48
void MaterializedOutputBuffer::registerBuffer(
const std::string& taskId,
std::shared_ptr<MaterializedOutputBuffer> buffer) {
buffers_.withLock(
[&](auto& buffers) { buffers.emplace(taskId, std::move(buffer)); });
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): Clarify handling of duplicate taskId registrations in the global buffer map.

The comment above says this runs under prestoTask->mutex to prevent duplicate creation, but here we use emplace and silently ignore duplicate keys. If that mutex invariant is broken or a task is retried with the same id, a stale buffer could remain registered with no signal. Consider checking the emplace result and failing fast (e.g., VELOX_CHECK) or at least logging a warning so duplicate registrations and lifecycle/concurrency issues are easier to detect.

Suggested implementation:

void MaterializedOutputBuffer::registerBuffer(
    const std::string& taskId,
    std::shared_ptr<MaterializedOutputBuffer> buffer) {
  buffers_.withLock([&](auto& buffers) {
    auto [it, inserted] = buffers.emplace(taskId, std::move(buffer));
    VELOX_CHECK(
        inserted,
        "Duplicate MaterializedOutputBuffer registration for taskId {}",
        taskId);
  });
  1. Ensure the file includes whatever header defines VELOX_CHECK (commonly velox/common/base/VeloxException.h) if it is not already included at the top of MaterializedOutputBuffer.cpp.
  2. If the project prefers logging-and-continue over hard checks in this code path, replace VELOX_CHECK with an appropriate logging macro (e.g., LOG(WARNING) << ...) while still keeping the inserted check.

shrinidhijoshi added a commit to shrinidhijoshi/presto that referenced this pull request May 29, 2026
…tex to prevent pool name collision (prestodb#27900)

Summary:

## Problem

In extremely rare cases (0.024%, ~4 out of 17,000 queries), when the coordinator retries or duplicates a `createTask` HTTP request, two concurrent requests race through `toVeloxQueryPlan()` — each creating a `MaterializedOutputBuffer` with the same pool name under the shared query root pool. The second call crashes with `"pool already exists"`.

```
                        TaskResource::createOrUpdateBatchTask()
                                      |
                    +----------------------------------+
                    |                                  |
             Request A                           Request B
                    |                                  |
          toVeloxQueryPlan()                 toVeloxQueryPlan()
                    |                                  |
     new MaterializedOutputBuffer()    new MaterializedOutputBuffer()
                    |                                  |
     pool->addLeafChild("mob.taskId")  pool->addLeafChild("mob.taskId")
                    |                                  |
                   OK                         CRASH: "already exists"
                    |                                  |
                    +----------------------------------+
                                      |
                       createOrUpdateTaskImpl()
                                      |
                          prestoTask->mutex  <-- too late
                                      |
                     if (prestoTask->task == nullptr)
                                      |
                         exec::Task::create()
```

The `prestoTask->mutex` deduplicates `exec::Task::create()`, but the pool collision happens before that — during plan conversion.

## Fix

Move `MaterializedOutputBuffer` creation from `toVeloxQueryPlan()` into `createOrUpdateTaskImpl()` under the `prestoTask->mutex`, after `exec::Task::create()`. The plan node (`MaterializedOutputNode`) now carries the shuffle factory params instead of the buffer itself. Operators look up the buffer by `taskId` from a process-wide registry at construction time.

```
                        TaskResource::createOrUpdateBatchTask()
                                      |
                    +----------------------------------+
                    |                                  |
             Request A                           Request B
                    |                                  |
          toVeloxQueryPlan()                 toVeloxQueryPlan()
                    |                                  |
     MaterializedOutputNode(params)    MaterializedOutputNode(params)
              (no pool created)              (no pool created)
                    |                                  |
                    +----------------------------------+
                                      |
                       createOrUpdateTaskImpl()
                                      |
                          prestoTask->mutex
                                      |
                     if (prestoTask->task == nullptr)
                          |                     |
                  exec::Task::create()    (Request B skipped)
                          |
              new MaterializedOutputBuffer()
                          |
              registerBuffer(taskId, buffer)
                          |
                         OK
```

## Design

This follows the same pattern as Velox `OutputBufferManager`, which is the 1:1 streaming equivalent of `MaterializedOutputBuffer`:

- `OutputBufferManager` uses a process-wide `folly::Synchronized<unordered_map>` to store output buffers keyed by `taskId`
- Buffer creation happens inside `Task::start()` -> `initializePartitionOutput()` -> `OutputBufferManager::initializeTask()`, after task dedup
- Operators look up the buffer by `taskId` at runtime

`MaterializedOutputBuffer` now follows the same lifecycle:

| Step | OutputBufferManager | MaterializedOutputBuffer |
|------|-------------------|------------------------|
| Registry | `folly::Synchronized<map>` | `folly::Synchronized<map>` |
| Creation | `Task::start()` under task lifecycle | `createOrUpdateTaskImpl()` under `prestoTask->mutex` |
| Lookup | `OutputBufferManager::getBuffer(taskId)` | `MaterializedOutputBuffer::getBuffer(taskId)` |
| Cleanup | `OutputBufferManager::removeTask(taskId)` | `MaterializedOutputBuffer::removeBuffer(taskId)` (destructor) |

Differential Revision: D106850939
shrinidhijoshi added a commit to shrinidhijoshi/presto that referenced this pull request May 29, 2026
…tex to prevent pool name collision (prestodb#27900)

Summary:

## Problem

In extremely rare cases (0.024%, ~4 out of 17,000 queries), when the coordinator retries or duplicates a `createTask` HTTP request, two concurrent requests race through `toVeloxQueryPlan()` — each creating a `MaterializedOutputBuffer` with the same pool name under the shared query root pool. The second call crashes with `"pool already exists"`.

```
                        TaskResource::createOrUpdateBatchTask()
                                      |
                    +----------------------------------+
                    |                                  |
             Request A                           Request B
                    |                                  |
          toVeloxQueryPlan()                 toVeloxQueryPlan()
                    |                                  |
     new MaterializedOutputBuffer()    new MaterializedOutputBuffer()
                    |                                  |
     pool->addLeafChild("mob.taskId")  pool->addLeafChild("mob.taskId")
                    |                                  |
                   OK                         CRASH: "already exists"
                    |                                  |
                    +----------------------------------+
                                      |
                       createOrUpdateTaskImpl()
                                      |
                          prestoTask->mutex  <-- too late
                                      |
                     if (prestoTask->task == nullptr)
                                      |
                         exec::Task::create()
```

The `prestoTask->mutex` deduplicates `exec::Task::create()`, but the pool collision happens before that — during plan conversion.

## Fix

Move `MaterializedOutputBuffer` creation from `toVeloxQueryPlan()` into `createOrUpdateTaskImpl()` under the `prestoTask->mutex`, after `exec::Task::create()`. The plan node (`MaterializedOutputNode`) now carries the shuffle factory params instead of the buffer itself. Operators look up the buffer by `taskId` from a process-wide registry at construction time.

```
                        TaskResource::createOrUpdateBatchTask()
                                      |
                    +----------------------------------+
                    |                                  |
             Request A                           Request B
                    |                                  |
          toVeloxQueryPlan()                 toVeloxQueryPlan()
                    |                                  |
     MaterializedOutputNode(params)    MaterializedOutputNode(params)
              (no pool created)              (no pool created)
                    |                                  |
                    +----------------------------------+
                                      |
                       createOrUpdateTaskImpl()
                                      |
                          prestoTask->mutex
                                      |
                     if (prestoTask->task == nullptr)
                          |                     |
                  exec::Task::create()    (Request B skipped)
                          |
              new MaterializedOutputBuffer()
                          |
              registerBuffer(taskId, buffer)
                          |
                         OK
```

## Design

This follows the same pattern as Velox `OutputBufferManager`, which is the 1:1 streaming equivalent of `MaterializedOutputBuffer`:

- `OutputBufferManager` uses a process-wide `folly::Synchronized<unordered_map>` to store output buffers keyed by `taskId`
- Buffer creation happens inside `Task::start()` -> `initializePartitionOutput()` -> `OutputBufferManager::initializeTask()`, after task dedup
- Operators look up the buffer by `taskId` at runtime

`MaterializedOutputBuffer` now follows the same lifecycle:

| Step | OutputBufferManager | MaterializedOutputBuffer |
|------|-------------------|------------------------|
| Registry | `folly::Synchronized<map>` | `folly::Synchronized<map>` |
| Creation | `Task::start()` under task lifecycle | `createOrUpdateTaskImpl()` under `prestoTask->mutex` |
| Lookup | `OutputBufferManager::getBuffer(taskId)` | `MaterializedOutputBuffer::getBuffer(taskId)` |
| Cleanup | `OutputBufferManager::removeTask(taskId)` | `MaterializedOutputBuffer::removeBuffer(taskId)` (destructor) |

Differential Revision: D106850939
@meta-codesync meta-codesync Bot changed the title fix(native-pos): Move MaterializedOutputBuffer creation under task mutex to prevent pool name collision (#27900) fix(native-pos): Move MaterializedOutputBuffer creation under task mutex to prevent pool double creation (#27900) May 29, 2026
shrinidhijoshi added a commit to shrinidhijoshi/presto that referenced this pull request May 29, 2026
…tex to prevent pool double creation (prestodb#27900)

Summary:

## Problem

In extremely rare cases (0.024%, ~4 out of 17,000 queries), when the coordinator retries or duplicates a `createTask` HTTP request, two concurrent requests race through `toVeloxQueryPlan()` — each creating a `MaterializedOutputBuffer` with the same pool name under the shared query root pool. The second call crashes with `"pool already exists"`.

```
                        TaskResource::createOrUpdateBatchTask()
                                      |
                    +----------------------------------+
                    |                                  |
             Request A                           Request B
                    |                                  |
          toVeloxQueryPlan()                 toVeloxQueryPlan()
                    |                                  |
     new MaterializedOutputBuffer()    new MaterializedOutputBuffer()
                    |                                  |
     pool->addLeafChild("mob.taskId")  pool->addLeafChild("mob.taskId")
                    |                                  |
                   OK                         CRASH: "already exists"
                    |                                  |
                    +----------------------------------+
                                      |
                       createOrUpdateTaskImpl()
                                      |
                          prestoTask->mutex  <-- too late
                                      |
                     if (prestoTask->task == nullptr)
                                      |
                         exec::Task::create()
```

The `prestoTask->mutex` deduplicates `exec::Task::create()`, but the pool collision happens before that — during plan conversion.

## Fix

Move `MaterializedOutputBuffer` creation from `toVeloxQueryPlan()` into `createOrUpdateTaskImpl()` under the `prestoTask->mutex`, after `exec::Task::create()`. The plan node (`MaterializedOutputNode`) now carries the shuffle factory params instead of the buffer itself. Operators look up the buffer by `taskId` from a process-wide registry at construction time.

```
                        TaskResource::createOrUpdateBatchTask()
                                      |
                    +----------------------------------+
                    |                                  |
             Request A                           Request B
                    |                                  |
          toVeloxQueryPlan()                 toVeloxQueryPlan()
                    |                                  |
     MaterializedOutputNode(params)    MaterializedOutputNode(params)
              (no pool created)              (no pool created)
                    |                                  |
                    +----------------------------------+
                                      |
                       createOrUpdateTaskImpl()
                                      |
                          prestoTask->mutex
                                      |
                     if (prestoTask->task == nullptr)
                          |                     |
                  exec::Task::create()    (Request B skipped)
                          |
              new MaterializedOutputBuffer()
                          |
              registerBuffer(taskId, buffer)
                          |
                         OK
```

## Design

This follows the same pattern as Velox `OutputBufferManager`, which is the 1:1 streaming equivalent of `MaterializedOutputBuffer`:

- `OutputBufferManager` uses a process-wide `folly::Synchronized<unordered_map>` to store output buffers keyed by `taskId`
- Buffer creation happens inside `Task::start()` -> `initializePartitionOutput()` -> `OutputBufferManager::initializeTask()`, after task dedup
- Operators look up the buffer by `taskId` at runtime

`MaterializedOutputBuffer` now follows the same lifecycle:

| Step | OutputBufferManager | MaterializedOutputBuffer |
|------|-------------------|------------------------|
| Registry | `folly::Synchronized<map>` | `folly::Synchronized<map>` |
| Creation | `Task::start()` under task lifecycle | `createOrUpdateTaskImpl()` under `prestoTask->mutex` |
| Lookup | `OutputBufferManager::getBuffer(taskId)` | `MaterializedOutputBuffer::getBuffer(taskId)` |
| Cleanup | `OutputBufferManager::removeTask(taskId)` | `MaterializedOutputBuffer::removeBuffer(taskId)` (destructor) |

Differential Revision: D106850939
shrinidhijoshi added a commit to shrinidhijoshi/presto that referenced this pull request May 29, 2026
…tex to prevent pool double creation (prestodb#27900)

Summary:

## Problem

In extremely rare cases (0.024%, ~4 out of 17,000 queries), when the coordinator retries or duplicates a `createTask` HTTP request, two concurrent requests race through `toVeloxQueryPlan()` — each creating a `MaterializedOutputBuffer` with the same pool name under the shared query root pool. The second call crashes with `"pool already exists"`.

```
                        TaskResource::createOrUpdateBatchTask()
                                      |
                    +----------------------------------+
                    |                                  |
             Request A                           Request B
                    |                                  |
          toVeloxQueryPlan()                 toVeloxQueryPlan()
                    |                                  |
     new MaterializedOutputBuffer()    new MaterializedOutputBuffer()
                    |                                  |
     pool->addLeafChild("mob.taskId")  pool->addLeafChild("mob.taskId")
                    |                                  |
                   OK                         CRASH: "already exists"
                    |                                  |
                    +----------------------------------+
                                      |
                       createOrUpdateTaskImpl()
                                      |
                          prestoTask->mutex  <-- too late
                                      |
                     if (prestoTask->task == nullptr)
                                      |
                         exec::Task::create()
```

The `prestoTask->mutex` deduplicates `exec::Task::create()`, but the pool collision happens before that — during plan conversion.

## Fix

Move `MaterializedOutputBuffer` creation from `toVeloxQueryPlan()` into `createOrUpdateTaskImpl()` under the `prestoTask->mutex`, after `exec::Task::create()`. The plan node (`MaterializedOutputNode`) now carries the shuffle factory params instead of the buffer itself. Operators look up the buffer by `taskId` from a process-wide registry at construction time.

```
                        TaskResource::createOrUpdateBatchTask()
                                      |
                    +----------------------------------+
                    |                                  |
             Request A                           Request B
                    |                                  |
          toVeloxQueryPlan()                 toVeloxQueryPlan()
                    |                                  |
     MaterializedOutputNode(params)    MaterializedOutputNode(params)
              (no pool created)              (no pool created)
                    |                                  |
                    +----------------------------------+
                                      |
                       createOrUpdateTaskImpl()
                                      |
                          prestoTask->mutex
                                      |
                     if (prestoTask->task == nullptr)
                          |                     |
                  exec::Task::create()    (Request B skipped)
                          |
              new MaterializedOutputBuffer()
                          |
              registerBuffer(taskId, buffer)
                          |
                         OK
```

## Design

This follows the same pattern as Velox `OutputBufferManager`, which is the 1:1 streaming equivalent of `MaterializedOutputBuffer`:

- `OutputBufferManager` uses a process-wide `folly::Synchronized<unordered_map>` to store output buffers keyed by `taskId`
- Buffer creation happens inside `Task::start()` -> `initializePartitionOutput()` -> `OutputBufferManager::initializeTask()`, after task dedup
- Operators look up the buffer by `taskId` at runtime

`MaterializedOutputBuffer` now follows the same lifecycle:

| Step | OutputBufferManager | MaterializedOutputBuffer |
|------|-------------------|------------------------|
| Registry | `folly::Synchronized<map>` | `folly::Synchronized<map>` |
| Creation | `Task::start()` under task lifecycle | `createOrUpdateTaskImpl()` under `prestoTask->mutex` |
| Lookup | `OutputBufferManager::getBuffer(taskId)` | `MaterializedOutputBuffer::getBuffer(taskId)` |
| Cleanup | `OutputBufferManager::removeTask(taskId)` | `MaterializedOutputBuffer::removeBuffer(taskId)` (destructor) |

Differential Revision: D106850939
shrinidhijoshi added a commit to shrinidhijoshi/presto that referenced this pull request May 29, 2026
…tex to prevent pool double creation (prestodb#27900)

Summary:

## Problem

In extremely rare cases (0.024%, ~4 out of 17,000 queries), when the coordinator retries or duplicates a `createTask` HTTP request, two concurrent requests race through `toVeloxQueryPlan()` — each creating a `MaterializedOutputBuffer` with the same pool name under the shared query root pool. The second call crashes with `"pool already exists"`.

```
                        TaskResource::createOrUpdateBatchTask()
                                      |
                    +----------------------------------+
                    |                                  |
             Request A                           Request B
                    |                                  |
          toVeloxQueryPlan()                 toVeloxQueryPlan()
                    |                                  |
     new MaterializedOutputBuffer()    new MaterializedOutputBuffer()
                    |                                  |
     pool->addLeafChild("mob.taskId")  pool->addLeafChild("mob.taskId")
                    |                                  |
                   OK                         CRASH: "already exists"
                    |                                  |
                    +----------------------------------+
                                      |
                       createOrUpdateTaskImpl()
                                      |
                          prestoTask->mutex  <-- too late
                                      |
                     if (prestoTask->task == nullptr)
                                      |
                         exec::Task::create()
```

The `prestoTask->mutex` deduplicates `exec::Task::create()`, but the pool collision happens before that — during plan conversion.

## Fix

Move `MaterializedOutputBuffer` creation from `toVeloxQueryPlan()` into `createOrUpdateTaskImpl()` under the `prestoTask->mutex`, after `exec::Task::create()`. The plan node (`MaterializedOutputNode`) now carries the shuffle factory params instead of the buffer itself. Operators look up the buffer by `taskId` from a process-wide registry at construction time.

```
                        TaskResource::createOrUpdateBatchTask()
                                      |
                    +----------------------------------+
                    |                                  |
             Request A                           Request B
                    |                                  |
          toVeloxQueryPlan()                 toVeloxQueryPlan()
                    |                                  |
     MaterializedOutputNode(params)    MaterializedOutputNode(params)
              (no pool created)              (no pool created)
                    |                                  |
                    +----------------------------------+
                                      |
                       createOrUpdateTaskImpl()
                                      |
                          prestoTask->mutex
                                      |
                     if (prestoTask->task == nullptr)
                          |                     |
                  exec::Task::create()    (Request B skipped)
                          |
              new MaterializedOutputBuffer()
                          |
              registerBuffer(taskId, buffer)
                          |
                         OK
```

## Design

This follows the same pattern as Velox `OutputBufferManager`, which is the 1:1 streaming equivalent of `MaterializedOutputBuffer`:

- `OutputBufferManager` uses a process-wide `folly::Synchronized<unordered_map>` to store output buffers keyed by `taskId`
- Buffer creation happens inside `Task::start()` -> `initializePartitionOutput()` -> `OutputBufferManager::initializeTask()`, after task dedup
- Operators look up the buffer by `taskId` at runtime

`MaterializedOutputBuffer` now follows the same lifecycle:

| Step | OutputBufferManager | MaterializedOutputBuffer |
|------|-------------------|------------------------|
| Registry | `folly::Synchronized<map>` | `folly::Synchronized<map>` |
| Creation | `Task::start()` under task lifecycle | `createOrUpdateTaskImpl()` under `prestoTask->mutex` |
| Lookup | `OutputBufferManager::getBuffer(taskId)` | `MaterializedOutputBuffer::getBuffer(taskId)` |
| Cleanup | `OutputBufferManager::removeTask(taskId)` | `MaterializedOutputBuffer::removeBuffer(taskId)` (destructor) |

Differential Revision: D106850939
…tex to prevent pool name collision

Summary:
## Problem

In extremely rare cases (0.024%, ~4 out of 17,000 queries), when the coordinator retries or duplicates a `createTask` HTTP request, two concurrent requests race through `toVeloxQueryPlan()` — each creating a `MaterializedOutputBuffer` with the same pool name under the shared query root pool. The second call crashes with `"pool already exists"`.

```
                        TaskResource::createOrUpdateBatchTask()
                                      |
                    +----------------------------------+
                    |                                  |
             Request A                           Request B
                    |                                  |
          toVeloxQueryPlan()                 toVeloxQueryPlan()
                    |                                  |
     new MaterializedOutputBuffer()    new MaterializedOutputBuffer()
                    |                                  |
     pool->addLeafChild("mob.taskId")  pool->addLeafChild("mob.taskId")
                    |                                  |
                   OK                         CRASH: "already exists"
                    |                                  |
                    +----------------------------------+
                                      |
                       createOrUpdateTaskImpl()
                                      |
                          prestoTask->mutex  <-- too late
                                      |
                     if (prestoTask->task == nullptr)
                                      |
                         exec::Task::create()
```

The `prestoTask->mutex` deduplicates `exec::Task::create()`, but the pool collision happens before that — during plan conversion.

## Fix

Move `MaterializedOutputBuffer` creation from `toVeloxQueryPlan()` into `createOrUpdateTaskImpl()` under the `prestoTask->mutex`, after `exec::Task::create()`. The plan node (`MaterializedOutputNode`) now carries shuffle params via a `ShuffleWriterMetadata` struct instead of the buffer itself. Operators hold a `shared_ptr<MaterializedOutputBuffer>` (not a raw pointer) looked up by `taskId` from a process-wide registry at construction time.

```
                        TaskResource::createOrUpdateBatchTask()
                                      |
                    +----------------------------------+
                    |                                  |
             Request A                           Request B
                    |                                  |
          toVeloxQueryPlan()                 toVeloxQueryPlan()
                    |                                  |
     MaterializedOutputNode(params)    MaterializedOutputNode(params)
              (no pool created)              (no pool created)
                    |                                  |
                    +----------------------------------+
                                      |
                       createOrUpdateTaskImpl()
                                      |
                          prestoTask->mutex
                                      |
                     if (prestoTask->task == nullptr)
                          |                     |
                  exec::Task::create()    (Request B skipped)
                          |
              new MaterializedOutputBuffer()
                          |
              registerBuffer(taskId, buffer)
                          |
                         OK
```

## Changes

- `ShuffleWriterMetadata` struct added to `MaterializedOutput.h` — groups `shuffleWriterInfo` and `shuffleWriterFactoryName` for transport on the plan node.
- `MaterializedOutputNode` — replaced `shared_ptr<MaterializedOutputBuffer>` member with `ShuffleWriterMetadata`. Buffer is no longer created during plan conversion.
- `MaterializedOutputBuffer` — added static `registerBuffer`/`getBuffer`/`removeBuffer` backed by `folly::Synchronized<F14FastMap>`. Destructor calls `removeBuffer`.
- `MaterializedOutput` operator — holds `shared_ptr<MaterializedOutputBuffer>` (not raw pointer) looked up via `getBuffer(taskId)` at construction time.
- `TaskManager::createOrUpdateTaskImpl()` — after `exec::Task::create()`, under `prestoTask->mutex`, creates the buffer and registers it if the plan root is a `MaterializedOutputNode`.
- `PrestoToVeloxQueryPlan.cpp` — passes `ShuffleWriterMetadata` to the plan node instead of creating the buffer.

## Design

This follows the same pattern as Velox `OutputBufferManager`, which is the 1:1 streaming equivalent of `MaterializedOutputBuffer`:

- `OutputBufferManager` uses a process-wide `folly::Synchronized<unordered_map>` to store output buffers keyed by `taskId`
- Buffer creation happens inside `Task::start()` -> `initializePartitionOutput()` -> `OutputBufferManager::initializeTask()`, after task dedup
- Operators look up the buffer by `taskId` at runtime

`MaterializedOutputBuffer` now follows the same lifecycle:

| Step | OutputBufferManager | MaterializedOutputBuffer |
|------|-------------------|------------------------|
| Registry | `folly::Synchronized<map>` | `folly::Synchronized<F14FastMap>` |
| Creation | `Task::start()` under task lifecycle | `createOrUpdateTaskImpl()` under `prestoTask->mutex` |
| Lookup | `OutputBufferManager::getBuffer(taskId)` | `MaterializedOutputBuffer::getBuffer(taskId)` |
| Cleanup | `OutputBufferManager::removeTask(taskId)` | `MaterializedOutputBuffer::removeBuffer(taskId)` (destructor) |

Reviewed By: xiaoxmeng

Differential Revision: D106850939
@meta-codesync meta-codesync Bot changed the title fix(native-pos): Move MaterializedOutputBuffer creation under task mutex to prevent pool double creation (#27900) fix(native-pos): Move MaterializedOutputBuffer creation under task mutex to prevent pool name collision May 30, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants