Skip to content

Commit

Permalink
Fix the lock order issue caused by value operator init (facebookincub…
Browse files Browse the repository at this point in the history
…ator#9738)

Summary:
The lock order issue detected by tsan is as follows:
T1: task startup creates driver and constructor operators which holds the task lock
T2: value operator constructor allocates memory from memory pool for parallelized value node mode
      which triggers memory arbitration 
T3: the memory arbitration grabs arbitration lock for arbitration
T4: the memory reclaim tries to grabs the task lock for reclaim such as to pause the task execution

This forms a deadlock and we have the logic to detect the memory pool allocation from the operator
constructor in driver execution framework. However the memory pool of vectors used by value nodes
is not the query memory pool but owns by the test framework. This PR moves the memory pool allocation
from constructor to initialize() method as the other operator does.


Test Plan:
The test passed 100 stress test iterations:
https://www.internalfb.com/intern/testinfra/testrun/10414574171435627
https://www.internalfb.com/intern/testinfra/testrun/13229323938244819

Differential Revision: D57063553

Pulled By: xiaoxmeng
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed May 7, 2024
1 parent 1fd2bc9 commit 8600c22
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 4 deletions.
16 changes: 12 additions & 4 deletions velox/exec/Values.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,19 @@ Values::Values(
operatorId,
values->id(),
"Values"),
roundsLeft_(values->repeatTimes()) {
valueNodes_(std::move(values)),
roundsLeft_(valueNodes_->repeatTimes()) {}

void Values::initialize() {
Operator::initialize();
VELOX_CHECK_NOT_NULL(valueNodes_);
VELOX_CHECK(values_.empty());
// Drop empty vectors. Operator::getOutput is expected to return nullptr or a
// non-empty vector.
values_.reserve(values->values().size());
for (auto& vector : values->values()) {
values_.reserve(valueNodes_->values().size());
for (auto& vector : valueNodes_->values()) {
if (vector->size() > 0) {
if (values->isParallelizable()) {
if (valueNodes_->isParallelizable()) {
// If this is parallelizable, copy the values to prevent Vectors from
// being shared across threads. Note that the contract in ValuesNode is
// that this should only be enabled for testing.
Expand All @@ -47,6 +53,8 @@ Values::Values(
}
}
}
// Drop the reference on the value nodes.
valueNodes_ = nullptr;
}

RowVectorPtr Values::getOutput() {
Expand Down
3 changes: 3 additions & 0 deletions velox/exec/Values.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ class Values : public SourceOperator {
DriverCtx* driverCtx,
std::shared_ptr<const core::ValuesNode> values);

void initialize() override;

RowVectorPtr getOutput() override;

BlockingReason isBlocked(ContinueFuture* /* unused */) override {
Expand Down Expand Up @@ -55,6 +57,7 @@ class Values : public SourceOperator {
}

private:
std::shared_ptr<const core::ValuesNode> valueNodes_;
std::vector<RowVectorPtr> values_;
int32_t current_ = 0;
size_t roundsLeft_ = 1;
Expand Down

0 comments on commit 8600c22

Please sign in to comment.