From be07fd95d00f7a3a8d5efbf9cfda797da6461383 Mon Sep 17 00:00:00 2001 From: Xiaoxuan Meng Date: Wed, 25 Dec 2024 22:10:57 -0800 Subject: [PATCH] [native] Fix the shuffle hanging problem when fetch data from presto java (coordinator) A Presto Java worker doesn't set the next token in its response to a get data size request; it simply returns the buffered data from the source. The Prestissimo worker updates 'sequence_' with the ack sequence encoded in the next token and uses it as the data stream offset when fetching data from the source. If the next token string is not set in the data response header, the ack sequence is set to zero, which resets 'sequence_' to zero. This rollback of 'sequence_' can cause a data hanging problem when the Prestissimo worker then fetches data from a Presto Java worker starting from the reset 'sequence_' (this can only happen when the Presto Java worker is run by the coordinator, since some metadata operations can only execute on the coordinator). This PR fixes the issue by not updating 'sequence_' when the next token is not set in the data response. 
--- .../presto_cpp/main/PrestoExchangeSource.cpp | 28 +- .../presto_cpp/main/PrestoExchangeSource.h | 21 +- .../presto_cpp/main/TaskManager.cpp | 2 +- .../main/tests/PrestoExchangeSourceTest.cpp | 309 +++++++++++++++--- 4 files changed, 300 insertions(+), 60 deletions(-) diff --git a/presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp b/presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp index aa7c1a798586c..e0263d8d8f243 100644 --- a/presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoExchangeSource.cpp @@ -265,7 +265,7 @@ void PrestoExchangeSource::processDataResponse( VELOX_CHECK( !headers->getIsChunked(), "Chunked http transferring encoding is not supported."); - uint64_t contentLength = + const uint64_t contentLength = atol(headers->getHeaders() .getSingleOrEmpty(proxygen::HTTP_HEADER_CONTENT_LENGTH) .c_str()); @@ -291,10 +291,19 @@ void PrestoExchangeSource::processDataResponse( } } - int64_t ackSequence = - atol(headers->getHeaders() - .getSingleOrEmpty(protocol::PRESTO_PAGE_NEXT_TOKEN_HEADER) - .c_str()); + std::optional ackSequenceOpt; + const auto nextTokenStr = headers->getHeaders().getSingleOrEmpty( + protocol::PRESTO_PAGE_NEXT_TOKEN_HEADER); + if (!nextTokenStr.empty()) { + // NOTE: when get data size from Presto coordinator, it might not set next + // token so we shouldn't update 'sequence_' if it is empty. Otherwise, + // 'sequence_' gets reset and we can't fetch any data from the source with + // the rolled back 'sequence_'. 
+ ackSequenceOpt = atol(nextTokenStr.c_str()); + } else { + VELOX_CHECK_EQ( + contentLength, 0, "next token is not set in non-empty data response"); + } std::unique_ptr page; const bool empty = response->empty(); @@ -357,7 +366,9 @@ void PrestoExchangeSource::processDataResponse( queue_->enqueueLocked(nullptr, queuePromises); } - sequence_ = ackSequence; + if (ackSequenceOpt.has_value()) { + sequence_ = ackSequenceOpt.value(); + } requestPending_ = false; requestPromise = std::move(promise_); } @@ -582,9 +593,4 @@ void PrestoExchangeSource::getMemoryUsage( void PrestoExchangeSource::resetPeakMemoryUsage() { peakQueuedMemoryBytes() = currQueuedMemoryBytes().load(); } - -void PrestoExchangeSource::testingClearMemoryUsage() { - currQueuedMemoryBytes() = 0; - peakQueuedMemoryBytes() = 0; -} } // namespace facebook::presto diff --git a/presto-native-execution/presto_cpp/main/PrestoExchangeSource.h b/presto-native-execution/presto_cpp/main/PrestoExchangeSource.h index c6bb6bbaa2289..2d239a7a5a0e1 100644 --- a/presto-native-execution/presto_cpp/main/PrestoExchangeSource.h +++ b/presto-native-execution/presto_cpp/main/PrestoExchangeSource.h @@ -24,6 +24,10 @@ namespace facebook::presto { +namespace test { +class PrestoExchangeSourceTestHelper; +}; + class PrestoExchangeSource : public velox::exec::ExchangeSource { public: class RetryState { @@ -59,13 +63,13 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource { } private: - int64_t maxWaitMs_; - int64_t startMs_; - size_t numTries_{0}; - static constexpr int64_t kMinBackoffMs = 100; static constexpr int64_t kMaxBackoffMs = 10000; static constexpr double kJitterParam = 0.1; + + int64_t maxWaitMs_; + int64_t startMs_; + size_t numTries_{0}; }; PrestoExchangeSource( @@ -153,10 +157,6 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource { return obj; } - int testingFailedAttempts() const { - return failedAttempts_; - } - /// Invoked to track the node-wise memory usage queued in /// 
PrestoExchangeSource. If 'updateBytes' > 0, then increment the usage, /// otherwise decrement the usage. @@ -172,9 +172,6 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource { /// intervals. static void resetPeakMemoryUsage(); - /// Used by test to clear the node-wise memory usage tracking. - static void testingClearMemoryUsage(); - private: void doRequest( int64_t delayMs, @@ -285,5 +282,7 @@ class PrestoExchangeSource : public velox::exec::ExchangeSource { std::atomic_bool abortResultsIssued_{false}; velox::VeloxPromise promise_{ velox::VeloxPromise::makeEmpty()}; + + friend class test::PrestoExchangeSourceTestHelper; }; } // namespace facebook::presto diff --git a/presto-native-execution/presto_cpp/main/TaskManager.cpp b/presto-native-execution/presto_cpp/main/TaskManager.cpp index 3c954c1d1ef75..926808f07a9af 100644 --- a/presto-native-execution/presto_cpp/main/TaskManager.cpp +++ b/presto-native-execution/presto_cpp/main/TaskManager.cpp @@ -176,7 +176,7 @@ void getData( bytes += next->length(); iobuf->prev()->appendChain(std::move(next)); } - nextSequence++; + ++nextSequence; } else { complete = true; } diff --git a/presto-native-execution/presto_cpp/main/tests/PrestoExchangeSourceTest.cpp b/presto-native-execution/presto_cpp/main/tests/PrestoExchangeSourceTest.cpp index 2ff251bba5eee..f3e54bead51a3 100644 --- a/presto-native-execution/presto_cpp/main/tests/PrestoExchangeSourceTest.cpp +++ b/presto-native-execution/presto_cpp/main/tests/PrestoExchangeSourceTest.cpp @@ -38,12 +38,34 @@ using namespace facebook::velox::memory; using namespace facebook::velox::common::testutil; using namespace testing; -int main(int argc, char** argv) { - testing::InitGoogleTest(&argc, argv); - folly::Init init{&argc, &argv}; - FLAGS_velox_memory_leak_check_enabled = true; - return RUN_ALL_TESTS(); -} +namespace facebook::presto::test { +class PrestoExchangeSourceTestHelper { + public: + PrestoExchangeSourceTestHelper(PrestoExchangeSource* exchangeSource) + : 
exchangeSource_(exchangeSource) {} + + uint64_t sequence() const { + return exchangeSource_->sequence_; + } + + int failedAttempts() const { + return exchangeSource_->failedAttempts_; + } + + // Clears the node-wise memory usage tracking. + static void clearMemoryUsage() { + PrestoExchangeSource::currQueuedMemoryBytes() = 0; + PrestoExchangeSource::peakQueuedMemoryBytes() = 0; + } + + bool atEnd() const { + return exchangeSource_->atEnd_; + } + + private: + PrestoExchangeSource* const exchangeSource_; +}; +} // namespace facebook::presto::test namespace { std::string getCertsPath(const std::string& fileName) { @@ -67,8 +89,10 @@ std::string getCertsPath(const std::string& fileName) { class Producer { public: explicit Producer( - std::function shouldFail = [](bool) { return false; }) - : shouldFail_(std::move(shouldFail)) {} + std::function shouldFail = [](bool) { return false; }, + bool dataResponseHasNoSequence = false) + : shouldFail_(std::move(shouldFail)), + dataResponseHasNoSequence_(dataResponseHasNoSequence) {} void registerEndpoints(http::HttpServer* server) { server->registerGet( @@ -76,8 +100,17 @@ class Producer { [this]( proxygen::HTTPMessage* message, const std::vector& pathMatch) { - return getResults(message, pathMatch); + return getResults(message, pathMatch, false); + }); + + server->registerHead( + R"(/v1/task/(.+)/results/([0-9]+)/([0-9]+))", + [this]( + proxygen::HTTPMessage* message, + const std::vector& pathMatch) { + return getResults(message, pathMatch, true); }); + server->registerGet( R"(/v1/task/(.+)/results/([0-9]+)/([0-9]+)/acknowledge)", [this]( @@ -85,6 +118,7 @@ class Producer { const std::vector& pathMatch) { return acknowledgeResults(message, pathMatch); }); + server->registerDelete( R"(/v1/task/(.+)/results/([0-9]+))", [this]( @@ -96,12 +130,13 @@ class Producer { proxygen::RequestHandler* getResults( proxygen::HTTPMessage* /*message*/, - const std::vector& pathMatch) { + const std::vector& pathMatch, + bool getDataSizeOnly) { 
protocol::TaskId taskId = pathMatch[1]; long sequence = std::stol(pathMatch[3]); return new http::CallbackRequestHandler( - [this, taskId, sequence]( + [this, taskId, sequence, getDataSizeOnly]( proxygen::HTTPMessage* message, const std::vector>& /*body*/, proxygen::ResponseHandler* downstream) { @@ -109,22 +144,58 @@ class Producer { return sendErrorResponse( downstream, "ERR\nConnection reset by peer", 500); } + + if (getDataSizeOnly) { + auto [remainingBytes, noMoreData] = getDataSize(sequence); + return sendResponse( + downstream, + taskId, + std::nullopt, + "", + remainingBytes, + noMoreData); + } + + std::optional sequenceOpt; + if (!dataResponseHasNoSequence_) { + sequenceOpt = sequence; + } if (sequence < this->startSequence_) { - return sendResponse(downstream, taskId, sequence, "", false); + auto [remainingBytes, noMoreData] = getDataSize(sequence); + return sendResponse( + downstream, + taskId, + sequenceOpt, + "", + remainingBytes, + noMoreData); } - auto [data, noMoreData] = getData(sequence); + + auto [data, remainingBytes, noMoreData] = getData(sequence); if (!data.empty() || noMoreData) { - sendResponse(downstream, taskId, sequence, data, noMoreData); + sendResponse( + downstream, + taskId, + sequenceOpt, + data, + remainingBytes, + noMoreData); } else { auto [promise, future] = folly::makePromiseContract(); std::move(future) .via(folly::EventBaseManager::get()->getEventBase()) - .thenValue([this, downstream, taskId, sequence]( + .thenValue([this, downstream, taskId, sequence, sequenceOpt]( bool /*value*/) { - auto [data, noMoreData] = getData(sequence); + auto [data, remainingBytes, noMoreData] = getData(sequence); VELOX_CHECK(!data.empty() || noMoreData); - sendResponse(downstream, taskId, sequence, data, noMoreData); + sendResponse( + downstream, + taskId, + sequenceOpt, + data, + remainingBytes, + noMoreData); }); promise_ = std::move(promise); @@ -239,20 +310,42 @@ class Producer { } private: - std::tuple getData(int64_t sequence) { + 
std::tuple getData(int64_t sequence) { std::string data; - bool noMoreData = false; + uint64_t remainingBytes{0}; + bool noMoreData{false}; { std::lock_guard l(mutex_); - auto index = sequence - startSequence_; - VELOX_CHECK_GE(index, 0); - if (queue_.size() > index) { - data = queue_[index]; + const auto getIndex = sequence - startSequence_; + VELOX_CHECK_GE(getIndex, 0); + if (queue_.size() > getIndex) { + data = queue_[getIndex]; + for (auto remainingIndex = getIndex + 1; remainingIndex < queue_.size(); + ++remainingIndex) { + remainingBytes += queue_[remainingIndex].size(); + } } else { noMoreData = noMoreData_; } } - return std::make_tuple(std::move(data), noMoreData); + return std::make_tuple(std::move(data), remainingBytes, noMoreData); + } + + std::tuple getDataSize(int64_t sequence) const { + uint64_t remainingBytes{0}; + bool noMoreData{false}; + { + std::lock_guard l(mutex_); + const auto startIndex = sequence - startSequence_; + VELOX_CHECK_GE(startIndex, 0); + for (auto index = startIndex; index < queue_.size(); ++index) { + remainingBytes += queue_[index].size(); + } + if (remainingBytes == 0) { + noMoreData = noMoreData_; + } + } + return std::make_tuple(remainingBytes, noMoreData); } void sendErrorResponse( @@ -268,19 +361,26 @@ class Producer { void sendResponse( proxygen::ResponseHandler* downstream, const protocol::TaskId& taskId, - int64_t sequence, + std::optional sequence, const std::string& data, + uint64_t remainingBytes, bool complete) { proxygen::ResponseBuilder builder(downstream); builder.status(http::kHttpOk, "OK") .header(protocol::PRESTO_TASK_INSTANCE_ID_HEADER, taskId) - .header(protocol::PRESTO_PAGE_TOKEN_HEADER, std::to_string(sequence)) - .header( - protocol::PRESTO_PAGE_NEXT_TOKEN_HEADER, - std::to_string(sequence + 1)) .header( protocol::PRESTO_BUFFER_COMPLETE_HEADER, complete ? 
"true" : "false"); + if (sequence.has_value()) { + builder.header( + protocol::PRESTO_PAGE_TOKEN_HEADER, std::to_string(sequence.value())); + builder.header( + protocol::PRESTO_PAGE_NEXT_TOKEN_HEADER, + std::to_string(sequence.value() + 1)); + } + builder.header( + protocol::PRESTO_BUFFER_REMAINING_BYTES_HEADER, + std::to_string(remainingBytes)); if (!data.empty()) { auto buffer = folly::IOBuf::create(4 + data.size()); int32_t dataSize = data.size(); @@ -297,15 +397,17 @@ class Producer { builder.sendWithEOM(); } + const std::function shouldFail_; + const bool dataResponseHasNoSequence_; + std::deque queue_; - std::mutex mutex_; + mutable std::mutex mutex_; bool noMoreData_ = false; int startSequence_ = 0; folly::Promise promise_ = folly::Promise::makeEmpty(); folly::Promise deleteResultsPromise_ = folly::Promise::makeEmpty(); bool receivedDeleteResults_ = false; - std::function shouldFail_; }; std::string toString(exec::SerializedPage* page) { @@ -448,12 +550,33 @@ class PrestoExchangeSourceTest : public ::testing::TestWithParam { void requestNextPage( const std::shared_ptr& queue, - const std::shared_ptr& exchangeSource) { + const std::shared_ptr& exchangeSource, + size_t maxWaitMs = 2'000) { { std::lock_guard l(queue->mutex()); - ASSERT_TRUE(exchangeSource->shouldRequestLocked()); + VELOX_CHECK(exchangeSource->shouldRequestLocked()); } - exchangeSource->request(1 << 20, std::chrono::seconds(2)); + exchangeSource->request(1 << 20, std::chrono::milliseconds(maxWaitMs)); + } + + void requestDataSize( + const std::shared_ptr& queue, + const std::shared_ptr& exchangeSource, + uint64_t& remainingBytes, + bool& atEnd) { + { + std::lock_guard l(queue->mutex()); + VELOX_CHECK(exchangeSource->shouldRequestLocked()); + } + const auto response = + exchangeSource->requestDataSizes(std::chrono::seconds(2)).get(); + remainingBytes = 0; + + atEnd = false; + for (auto bytes : response.remainingBytes) { + remainingBytes += bytes; + } + atEnd = response.atEnd; } std::shared_ptr 
pool_; @@ -516,6 +639,109 @@ TEST_P(PrestoExchangeSourceTest, basic) { ASSERT_EQ(stats.at("prestoExchangeSource.totalBytes").sum, totalBytes(pages)); } +TEST_P(PrestoExchangeSourceTest, getDataSize) { + const std::vector pages = {"page1 - xx", "page2 - xxxxx"}; + const auto useHttps = GetParam().useHttps; + auto producer = std::make_unique(); + + for (const auto& page : pages) { + producer->enqueue(page); + } + producer->noMoreData(); + + auto producerServer = createHttpServer(useHttps); + producer->registerEndpoints(producerServer.get()); + + test::HttpServerWrapper serverWrapper(std::move(producerServer)); + auto producerAddress = serverWrapper.start().get(); + + auto queue = makeSingleSourceQueue(); + + auto exchangeSource = makeExchangeSource(producerAddress, useHttps, 3, queue); + test::PrestoExchangeSourceTestHelper sourceHelper(exchangeSource.get()); + + // Get data size before fetch any data. + ASSERT_EQ(sourceHelper.sequence(), 0); + uint64_t remainingBytes; + bool atEnd; + requestDataSize(queue, exchangeSource, remainingBytes, atEnd); + ASSERT_EQ(remainingBytes, pages[0].size() + pages[1].size()); + ASSERT_FALSE(atEnd); + ASSERT_EQ(sourceHelper.sequence(), 0); + + // Get first page size. + requestNextPage(queue, exchangeSource); + waitForNextPage(queue); + + ASSERT_EQ(sourceHelper.sequence(), 1); + requestDataSize(queue, exchangeSource, remainingBytes, atEnd); + ASSERT_EQ(remainingBytes, pages[1].size()); + ASSERT_FALSE(atEnd); + ASSERT_EQ(sourceHelper.sequence(), 1); + + // Get second page size. 
+ requestNextPage(queue, exchangeSource); + waitForNextPage(queue); + ASSERT_EQ(sourceHelper.sequence(), 2); + ASSERT_FALSE(sourceHelper.atEnd()); + + requestDataSize(queue, exchangeSource, remainingBytes, atEnd); + ASSERT_EQ(remainingBytes, 0); + + ASSERT_TRUE(atEnd); + ASSERT_EQ(sourceHelper.sequence(), 2); + + waitForEndMarker(queue); + + producer->waitForDeleteResults(); + exchangeCpuExecutor_->stop(); + serverWrapper.stop(); +} + +TEST_P(PrestoExchangeSourceTest, invalidDataResponseWithoutTokenSet) { + SystemConfig::instance()->setValue( + std::string(SystemConfig::kExchangeMaxErrorDuration), "2s"); + const std::vector pages = {"page1 - xx", "page2 - xxxxx"}; + const auto useHttps = GetParam().useHttps; + auto producer = std::make_unique([](bool) { return false; }, true); + + for (const auto& page : pages) { + producer->enqueue(page); + } + producer->noMoreData(); + + auto producerServer = createHttpServer(useHttps); + producer->registerEndpoints(producerServer.get()); + + test::HttpServerWrapper serverWrapper(std::move(producerServer)); + auto producerAddress = serverWrapper.start().get(); + + auto queue = makeSingleSourceQueue(); + + auto exchangeSource = makeExchangeSource(producerAddress, useHttps, 3, queue); + test::PrestoExchangeSourceTestHelper sourceHelper(exchangeSource.get()); + + // Get data size before fetch any data. 
+ ASSERT_EQ(sourceHelper.sequence(), 0); + uint64_t remainingBytes; + bool atEnd; + requestDataSize(queue, exchangeSource, remainingBytes, atEnd); + ASSERT_EQ(remainingBytes, pages[0].size() + pages[1].size()); + ASSERT_FALSE(atEnd); + ASSERT_EQ(sourceHelper.sequence(), 0); + + requestNextPage(queue, exchangeSource, 10); + VELOX_ASSERT_THROW( + waitForNextPage(queue), + "next token is not set in non-empty data response"); + VELOX_ASSERT_THROW( + waitForEndMarker(queue), + "next token is not set in non-empty data response"); + + exchangeCpuExecutor_->stop(); + serverWrapper.stop(); +} + TEST_P(PrestoExchangeSourceTest, retryState) { PrestoExchangeSource::RetryState state(1000); ASSERT_FALSE(state.isExhausted()); @@ -567,12 +793,13 @@ TEST_P(PrestoExchangeSourceTest, retries) { auto queue = makeSingleSourceQueue(); auto exchangeSource = makeExchangeSource(producerAddress, useHttps, 3, queue); + test::PrestoExchangeSourceTestHelper sourceHelper(exchangeSource.get()); requestNextPage(queue, exchangeSource); { auto page = waitForNextPage(queue); ASSERT_EQ(toString(page.get()), pages[0]) << "at " << 0; - ASSERT_EQ(exchangeSource->testingFailedAttempts(), 3); + ASSERT_EQ(sourceHelper.failedAttempts(), 3); requestNextPage(queue, exchangeSource); } @@ -823,6 +1050,7 @@ DEBUG_ONLY_TEST_P( auto queue = makeSingleSourceQueue(); auto exchangeSource = makeExchangeSource(producerAddress, useHttps, 3, queue, leafPool.get()); + test::PrestoExchangeSourceTestHelper sourceHelper(exchangeSource.get()); requestNextPage(queue, exchangeSource); const std::string payload(1 << 20, 'L'); @@ -838,7 +1066,7 @@ DEBUG_ONLY_TEST_P( if (immediateBufferTransfer) { // Verify that we have retried on memory allocation failure of the http // response data other than just failing the query. 
- ASSERT_GE(exchangeSource->testingFailedAttempts(), 1); + ASSERT_GE(sourceHelper.failedAttempts(), 1); } ASSERT_EQ(leafPool->usedBytes(), 0); } @@ -852,7 +1080,7 @@ TEST_P(PrestoExchangeSourceTest, memoryAllocationAndUsageCheck) { for (const auto resetPeak : resetPeaks) { SCOPED_TRACE(fmt::format("resetPeak {}", resetPeak)); - PrestoExchangeSource::testingClearMemoryUsage(); + test::PrestoExchangeSourceTestHelper::clearMemoryUsage(); auto rootPool = memory::MemoryManager::getInstance()->addRootPool(); auto leafPool = rootPool->addLeafChild("memoryAllocationAndUsageCheck"); @@ -1054,3 +1282,10 @@ INSTANTIATE_TEST_CASE_P( Params{true, false, false, 2, 10}, Params{false, false, true, 2, 10}, Params{false, false, false, 2, 10})); + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + folly::Init init{&argc, &argv}; + FLAGS_velox_memory_leak_check_enabled = true; + return RUN_ALL_TESTS(); +}