From be6493d9702371fce15af565245e2a3a5c85ff44 Mon Sep 17 00:00:00 2001 From: Xiaoxuan Meng Date: Tue, 8 Oct 2024 15:59:50 -0700 Subject: [PATCH] Improve spill related logging Summary: Improve spill related logging by always printing warning log if a spillable operator is under non-reclaimable section. Add runtime stats to record if a spillable operator doesn't support spill in certain config like window operator doesn't support to spill without partitioning keys to ease debug query OOM. Differential Revision: D64075306 --- velox/docs/monitoring/stats.rst | 5 +++ velox/exec/HashBuild.cpp | 33 +++++++++---------- velox/exec/HashProbe.cpp | 57 +++++++++++++++++---------------- velox/exec/Operator.h | 5 +++ velox/exec/Window.cpp | 5 +++ velox/exec/tests/WindowTest.cpp | 44 +++++++++++++++++++++++++ 6 files changed, 104 insertions(+), 45 deletions(-) diff --git a/velox/docs/monitoring/stats.rst b/velox/docs/monitoring/stats.rst index a878547925eb..fbabecbf5269 100644 --- a/velox/docs/monitoring/stats.rst +++ b/velox/docs/monitoring/stats.rst @@ -116,6 +116,11 @@ These stats are reported by operators that support spilling. * - Stats - Unit - Description + * - spillNotSupported + - nanos + - The number of a spillable operators that don't support spill because of + spill limitation. For instance, a window operator do not support spill + if there is no partitioning. * - spillFillWallNanos - nanos - The time spent on filling rows for spilling. diff --git a/velox/exec/HashBuild.cpp b/velox/exec/HashBuild.cpp index d3d095c5447c..09058c2bdcba 100644 --- a/velox/exec/HashBuild.cpp +++ b/velox/exec/HashBuild.cpp @@ -220,9 +220,9 @@ void HashBuild::setupSpiller(SpillPartition* spillPartition) { // out of memory if the restored partition still can't fit in memory. if (config->exceedSpillLevelLimit(startPartitionBit)) { RECORD_METRIC_VALUE(kMetricMaxSpillLevelExceededCount); - FB_LOG_EVERY_MS(WARNING, 1'000) - << "Exceeded spill level limit: " << config->maxSpillLevel - << ", and disable spilling for memory pool: " << pool()->name(); + LOG(WARNING) << "Exceeded spill level limit: " << config->maxSpillLevel + << ", and disable spilling for memory pool: " + << pool()->name(); ++spillStats_.wlock()->spillMaxLevelExceededCount; exceededMaxSpillLevelLimit_ = true; return; @@ -1076,15 +1076,14 @@ void HashBuild::reclaim( // TODO: reduce the log frequency if it is too verbose. RECORD_METRIC_VALUE(kMetricMemoryNonReclaimableCount); ++stats.numNonReclaimableAttempts; - FB_LOG_EVERY_MS(WARNING, 1'000) - << "Can't reclaim from hash build operator, state_[" - << stateName(state_) << "], nonReclaimableSection_[" - << nonReclaimableSection_ << "], spiller_[" - << (stateCleared_ - ? "cleared" - : (spiller_->finalized() ? "finalized" : "non-finalized")) - << "] " << pool()->name() - << ", usage: " << succinctBytes(pool()->usedBytes()); + LOG(WARNING) << "Can't reclaim from hash build operator, state_[" + << stateName(state_) << "], nonReclaimableSection_[" + << nonReclaimableSection_ << "], spiller_[" + << (stateCleared_ ? "cleared" + : (spiller_->finalized() ? "finalized" + : "non-finalized")) + << "] " << pool()->name() + << ", usage: " << succinctBytes(pool()->usedBytes()); return; } @@ -1100,11 +1099,11 @@ void HashBuild::reclaim( // TODO: reduce the log frequency if it is too verbose. RECORD_METRIC_VALUE(kMetricMemoryNonReclaimableCount); ++stats.numNonReclaimableAttempts; - FB_LOG_EVERY_MS(WARNING, 1'000) - << "Can't reclaim from hash build operator, state_[" - << stateName(buildOp->state_) << "], nonReclaimableSection_[" - << buildOp->nonReclaimableSection_ << "], " << buildOp->pool()->name() - << ", usage: " << succinctBytes(buildOp->pool()->usedBytes()); + LOG(WARNING) << "Can't reclaim from hash build operator, state_[" + << stateName(buildOp->state_) << "], nonReclaimableSection_[" + << buildOp->nonReclaimableSection_ << "], " + << buildOp->pool()->name() << ", usage: " + << succinctBytes(buildOp->pool()->usedBytes()); return; } } diff --git a/velox/exec/HashProbe.cpp b/velox/exec/HashProbe.cpp index 3ae563d6f6fa..3b9dc93f1c78 100644 --- a/velox/exec/HashProbe.cpp +++ b/velox/exec/HashProbe.cpp @@ -1696,19 +1696,19 @@ void HashProbe::reclaim( if (nonReclaimableState()) { RECORD_METRIC_VALUE(kMetricMemoryNonReclaimableCount); ++stats.numNonReclaimableAttempts; - FB_LOG_EVERY_MS(WARNING, 1'000) - << "Can't reclaim from hash probe operator, state_[" - << ProbeOperatorState(state_) << "], nonReclaimableSection_[" - << nonReclaimableSection_ << "], inputSpiller_[" - << (inputSpiller_ == nullptr ? "nullptr" : "initialized") - << "], table_[" << (table_ == nullptr ? "nullptr" : "initialized") - << "], table_ numDistinct[" - << (table_ == nullptr ? "nullptr" - : std::to_string(table_->numDistinct())) - << "], " << pool()->name() - << ", usage: " << succinctBytes(pool()->usedBytes()) - << ", node pool reservation: " - << succinctBytes(pool()->parent()->reservedBytes()); + LOG(WARNING) << "Can't reclaim from hash probe operator, state_[" + << ProbeOperatorState(state_) << "], nonReclaimableSection_[" + << nonReclaimableSection_ << "], inputSpiller_[" + << (inputSpiller_ == nullptr ? "nullptr" : "initialized") + << "], table_[" + << (table_ == nullptr ? "nullptr" : "initialized") + << "], table_ numDistinct[" + << (table_ == nullptr ? "nullptr" + : std::to_string(table_->numDistinct())) + << "], " << pool()->name() + << ", usage: " << succinctBytes(pool()->usedBytes()) + << ", node pool reservation: " + << succinctBytes(pool()->parent()->reservedBytes()); return; } @@ -1723,21 +1723,22 @@ void HashProbe::reclaim( RECORD_METRIC_VALUE(kMetricMemoryNonReclaimableCount); ++stats.numNonReclaimableAttempts; const auto* peerPool = probeOp->pool(); - FB_LOG_EVERY_MS(WARNING, 1'000) - << "Can't reclaim from hash probe operator, state_[" - << ProbeOperatorState(probeOp->state_) << "], nonReclaimableSection_[" - << probeOp->nonReclaimableSection_ << "], inputSpiller_[" - << (probeOp->inputSpiller_ == nullptr ? "nullptr" : "initialized") - << "], table_[" - << (probeOp->table_ == nullptr ? "nullptr" : "initialized") - << "], table_ numDistinct[" - << (probeOp->table_ == nullptr - ? "nullptr" - : std::to_string(probeOp->table_->numDistinct())) - << "], " << peerPool->name() - << ", usage: " << succinctBytes(peerPool->usedBytes()) - << ", node pool reservation: " - << succinctBytes(peerPool->parent()->reservedBytes()); + LOG(WARNING) << "Can't reclaim from hash probe operator, state_[" + << ProbeOperatorState(probeOp->state_) + << "], nonReclaimableSection_[" + << probeOp->nonReclaimableSection_ << "], inputSpiller_[" + << (probeOp->inputSpiller_ == nullptr ? "nullptr" + : "initialized") + << "], table_[" + << (probeOp->table_ == nullptr ? "nullptr" : "initialized") + << "], table_ numDistinct[" + << (probeOp->table_ == nullptr + ? "nullptr" + : std::to_string(probeOp->table_->numDistinct())) + << "], " << peerPool->name() + << ", usage: " << succinctBytes(peerPool->usedBytes()) + << ", node pool reservation: " + << succinctBytes(peerPool->parent()->reservedBytes()); return; } hasMoreProbeInput |= !probeOp->noMoreSpillInput_; diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h index bd6888423b32..20561b3e788d 100644 --- a/velox/exec/Operator.h +++ b/velox/exec/Operator.h @@ -335,6 +335,11 @@ class Operator : public BaseRuntimeStatWriter { /// The name of the runtime spill stats collected and reported by operators /// that support spilling. + + /// This indicates the spill not supported for a spillable operator when the + /// spill config is enabled. This is due to the spill limitation in certain + /// plan node config such as unpartition window operator. + static inline const std::string kSpillNotSupported{"spillNotSupported"}; /// The spill write stats. static inline const std::string kSpillFillTime{"spillFillWallNanos"}; static inline const std::string kSpillSortTime{"spillSortWallNanos"}; diff --git a/velox/exec/Window.cpp b/velox/exec/Window.cpp index 6f60f9fcf5e3..ec1fc1fb1aef 100644 --- a/velox/exec/Window.cpp +++ b/velox/exec/Window.cpp @@ -41,6 +41,11 @@ Window::Window( stringAllocator_(pool()) { auto* spillConfig = spillConfig_.has_value() ? &spillConfig_.value() : nullptr; + if (spillConfig == nullptr && + operatorCtx_->driverCtx()->queryConfig().windowSpillEnabled()) { + auto lockedStats = stats_.wlock(); + lockedStats->runtimeStats.emplace(kSpillNotSupported, RuntimeMetric(1)); + } if (windowNode->inputsSorted()) { if (supportRowsStreaming()) { windowBuild_ = std::make_unique( diff --git a/velox/exec/tests/WindowTest.cpp b/velox/exec/tests/WindowTest.cpp index 16b7294913a0..d9b315ee5bf0 100644 --- a/velox/exec/tests/WindowTest.cpp +++ b/velox/exec/tests/WindowTest.cpp @@ -81,6 +81,50 @@ TEST_F(WindowTest, spill) { ASSERT_GT(stats.spilledPartitions, 0); } +TEST_F(WindowTest, spillUnsupported) { + const vector_size_t size = 1'000; + auto data = makeRowVector( + {"d", "p", "s"}, + { + // Payload. + makeFlatVector(size, [](auto row) { return row; }), + // Partition key. + makeFlatVector(size, [](auto row) { return row % 11; }), + // Sorting key. + makeFlatVector(size, [](auto row) { return row; }), + }); + + createDuckDbTable({data}); + + core::PlanNodeId windowId; + auto plan = PlanBuilder() + .values(split(data, 10)) + .window({"row_number() over (order by s)"}) + .capturePlanNodeId(windowId) + .planNode(); + + auto spillDirectory = TempDirectoryPath::create(); + TestScopedSpillInjection scopedSpillInjection(100); + auto task = + AssertQueryBuilder(plan, duckDbQueryRunner_) + .config(core::QueryConfig::kPreferredOutputBatchBytes, "1024") + .config(core::QueryConfig::kSpillEnabled, "true") + .config(core::QueryConfig::kWindowSpillEnabled, "true") + .spillDirectory(spillDirectory->getPath()) + .assertResults("SELECT *, row_number() over (order by s) FROM tmp"); + + auto taskStats = exec::toPlanStats(task->taskStats()); + const auto& stats = taskStats.at(windowId); + + ASSERT_EQ(stats.spilledBytes, 0); + ASSERT_EQ(stats.spilledRows, 0); + ASSERT_EQ(stats.spilledFiles, 0); + ASSERT_EQ(stats.spilledPartitions, 0); + auto opStats = toOperatorStats(task->taskStats()); + ASSERT_GT( + opStats.at("Window").runtimeStats[Operator::kSpillNotSupported].sum, 1); +} + TEST_F(WindowTest, rowBasedStreamingWindowOOM) { const vector_size_t size = 1'000'000; auto data = makeRowVector(