Skip to content

Commit

Permalink
[native] Fix the shuffle hanging problem when fetch data from presto …
Browse files Browse the repository at this point in the history
…java (coordinator)

Presto java worker doesn't set next token in response for a get data size request
and it simply returns the buffered data from the source. The Prestissimo worker
updates 'sequence_' with ack sequence encoded by next token and uses it as data
stream offset to fetch data from the source. If the next token str is not set in
the data response handler header, then ack sequence is set to zero which resets the
'sequence_' to zero, and the rollback of 'sequence_' can cause data hanging problem
when Prestissimo worker fetch data from a Presto java worker (which can only happen
in the case that Presto java worker run by the coordinator as some metadata operation
can only execute on the coordinator) from the reset 'sequence_'.

This PR fixes the issue by avoiding updating 'sequence_' if next token is not set
in data response handler.
  • Loading branch information
xiaoxmeng committed Dec 27, 2024
1 parent 03d5e88 commit be07fd9
Show file tree
Hide file tree
Showing 4 changed files with 300 additions and 60 deletions.
28 changes: 17 additions & 11 deletions presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ void PrestoExchangeSource::processDataResponse(
VELOX_CHECK(
!headers->getIsChunked(),
"Chunked http transferring encoding is not supported.");
uint64_t contentLength =
const uint64_t contentLength =
atol(headers->getHeaders()
.getSingleOrEmpty(proxygen::HTTP_HEADER_CONTENT_LENGTH)
.c_str());
Expand All @@ -291,10 +291,19 @@ void PrestoExchangeSource::processDataResponse(
}
}

int64_t ackSequence =
atol(headers->getHeaders()
.getSingleOrEmpty(protocol::PRESTO_PAGE_NEXT_TOKEN_HEADER)
.c_str());
std::optional<int64_t> ackSequenceOpt;
const auto nextTokenStr = headers->getHeaders().getSingleOrEmpty(
protocol::PRESTO_PAGE_NEXT_TOKEN_HEADER);
if (!nextTokenStr.empty()) {
// NOTE: when get data size from Presto coordinator, it might not set next
// token so we shouldn't update 'sequence_' if it is empty. Otherwise,
// 'sequence_' gets reset and we can't fetch any data from the source with
// the rolled back 'sequence_'.
ackSequenceOpt = atol(nextTokenStr.c_str());
} else {
VELOX_CHECK_EQ(
contentLength, 0, "next token is not set in non-empty data response");
}

std::unique_ptr<exec::SerializedPage> page;
const bool empty = response->empty();
Expand Down Expand Up @@ -357,7 +366,9 @@ void PrestoExchangeSource::processDataResponse(
queue_->enqueueLocked(nullptr, queuePromises);
}

sequence_ = ackSequence;
if (ackSequenceOpt.has_value()) {
sequence_ = ackSequenceOpt.value();
}
requestPending_ = false;
requestPromise = std::move(promise_);
}
Expand Down Expand Up @@ -582,9 +593,4 @@ void PrestoExchangeSource::getMemoryUsage(
void PrestoExchangeSource::resetPeakMemoryUsage() {
peakQueuedMemoryBytes() = currQueuedMemoryBytes().load();
}

void PrestoExchangeSource::testingClearMemoryUsage() {
currQueuedMemoryBytes() = 0;
peakQueuedMemoryBytes() = 0;
}
} // namespace facebook::presto
21 changes: 10 additions & 11 deletions presto-native-execution/presto_cpp/main/PrestoExchangeSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@

namespace facebook::presto {

namespace test {
class PrestoExchangeSourceTestHelper;
};

class PrestoExchangeSource : public velox::exec::ExchangeSource {
public:
class RetryState {
Expand Down Expand Up @@ -59,13 +63,13 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource {
}

private:
int64_t maxWaitMs_;
int64_t startMs_;
size_t numTries_{0};

static constexpr int64_t kMinBackoffMs = 100;
static constexpr int64_t kMaxBackoffMs = 10000;
static constexpr double kJitterParam = 0.1;

int64_t maxWaitMs_;
int64_t startMs_;
size_t numTries_{0};
};

PrestoExchangeSource(
Expand Down Expand Up @@ -153,10 +157,6 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource {
return obj;
}

int testingFailedAttempts() const {
return failedAttempts_;
}

/// Invoked to track the node-wise memory usage queued in
/// PrestoExchangeSource. If 'updateBytes' > 0, then increment the usage,
/// otherwise decrement the usage.
Expand All @@ -172,9 +172,6 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource {
/// intervals.
static void resetPeakMemoryUsage();

/// Used by test to clear the node-wise memory usage tracking.
static void testingClearMemoryUsage();

private:
void doRequest(
int64_t delayMs,
Expand Down Expand Up @@ -285,5 +282,7 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource {
std::atomic_bool abortResultsIssued_{false};
velox::VeloxPromise<Response> promise_{
velox::VeloxPromise<Response>::makeEmpty()};

friend class test::PrestoExchangeSourceTestHelper;
};
} // namespace facebook::presto
2 changes: 1 addition & 1 deletion presto-native-execution/presto_cpp/main/TaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ void getData(
bytes += next->length();
iobuf->prev()->appendChain(std::move(next));
}
nextSequence++;
++nextSequence;
} else {
complete = true;
}
Expand Down
Loading

0 comments on commit be07fd9

Please sign in to comment.