From 8f0adb1fa6a2553b58f0360224418eb7c5306608 Mon Sep 17 00:00:00 2001 From: duanmeng Date: Sun, 24 Mar 2024 00:15:38 -0700 Subject: [PATCH] Consolidate SpillStats (#9211) Summary: Decouple spill stats from the spiller as row number and hash probe spilling might use more than one and different spillers. Consolidate to use one spill stats to collect the spill stats to streamline implementation. This PR introduces a synchronized spill stats within the operator to gather these stats and later on we could separate them for different types of spiller if offline analysis needs. Pull Request resolved: https://github.com/facebookincubator/velox/pull/9211 Reviewed By: tanjialiang Differential Revision: D55287100 Pulled By: xiaoxmeng fbshipit-source-id: ffde57d4f3425e3f3f679252504f7690e8dfce68 --- velox/connectors/hive/HiveDataSink.cpp | 11 ++-- velox/connectors/hive/HiveDataSink.h | 4 +- velox/dwio/common/SortingWriter.cpp | 11 +--- velox/dwio/common/SortingWriter.h | 4 +- velox/exec/GroupingSet.cpp | 18 +++++-- velox/exec/GroupingSet.h | 5 +- velox/exec/HashAggregation.cpp | 14 +---- velox/exec/HashAggregation.h | 4 -- velox/exec/HashBuild.cpp | 24 ++------- velox/exec/HashBuild.h | 3 -- velox/exec/HashProbe.cpp | 14 ++--- velox/exec/HashProbe.h | 2 - velox/exec/Operator.cpp | 52 ++++++++++--------- velox/exec/Operator.h | 4 +- velox/exec/OrderBy.cpp | 12 +---- velox/exec/OrderBy.h | 4 -- velox/exec/RowNumber.cpp | 8 +-- velox/exec/SortBuffer.cpp | 12 +++-- velox/exec/SortBuffer.h | 12 ++--- velox/exec/SortWindowBuild.cpp | 9 ++-- velox/exec/SortWindowBuild.h | 4 +- velox/exec/Spiller.cpp | 44 ++++++++++------ velox/exec/Spiller.h | 21 +++++--- velox/exec/TableWriter.cpp | 3 +- velox/exec/TopNRowNumber.cpp | 4 +- velox/exec/Window.cpp | 6 +-- .../tests/AggregateSpillBenchmarkBase.cpp | 11 ++-- .../exec/tests/AggregateSpillBenchmarkBase.h | 2 +- velox/exec/tests/HashJoinTest.cpp | 2 +- .../tests/JoinSpillInputBenchmarkBase.cpp | 3 +- velox/exec/tests/OrderByTest.cpp | 5 +- velox/exec/tests/SortBufferTest.cpp | 45 ++++++++-------- velox/exec/tests/SpillerBenchmarkBase.h | 1 + velox/exec/tests/SpillerTest.cpp | 21 +++++--- 34 files changed, 196 insertions(+), 203 deletions(-) diff --git a/velox/connectors/hive/HiveDataSink.cpp b/velox/connectors/hive/HiveDataSink.cpp index 89436ce81640..549c4edbcd42 100644 --- a/velox/connectors/hive/HiveDataSink.cpp +++ b/velox/connectors/hive/HiveDataSink.cpp @@ -510,8 +510,9 @@ DataSink::Stats HiveDataSink::stats() const { for (int i = 0; i < writerInfo_.size(); ++i) { const auto& info = writerInfo_.at(i); VELOX_CHECK_NOT_NULL(info); - if (!info->spillStats->empty()) { - stats.spillStats += *info->spillStats; + const auto spillStats = info->spillStats->rlock(); + if (!spillStats->empty()) { + stats.spillStats += *spillStats; } } return stats; @@ -719,15 +720,15 @@ HiveDataSink::maybeCreateBucketSortWriter( sortCompareFlags_, sortPool, writerInfo_.back()->nonReclaimableSectionHolder.get(), - spillConfig_); + spillConfig_, + writerInfo_.back()->spillStats.get()); return std::make_unique( std::move(writer), std::move(sortBuffer), hiveConfig_->sortWriterMaxOutputRows( connectorQueryCtx_->sessionProperties()), hiveConfig_->sortWriterMaxOutputBytes( - connectorQueryCtx_->sessionProperties()), - writerInfo_.back()->spillStats.get()); + connectorQueryCtx_->sessionProperties())); } void HiveDataSink::splitInputRowsAndEnsureWriters() { diff --git a/velox/connectors/hive/HiveDataSink.h b/velox/connectors/hive/HiveDataSink.h index 7990bb8acb3a..ad22ad2b109a 100644 --- a/velox/connectors/hive/HiveDataSink.h +++ b/velox/connectors/hive/HiveDataSink.h @@ -355,7 +355,7 @@ struct HiveWriterInfo { std::shared_ptr _sortPool) : writerParameters(std::move(parameters)), nonReclaimableSectionHolder(new tsan_atomic(false)), - spillStats(new common::SpillStats()), + spillStats(std::make_unique>()), writerPool(std::move(_writerPool)), sinkPool(std::move(_sinkPool)), sortPool(std::move(_sortPool)) {} @@ -364,7 +364,7 @@ struct HiveWriterInfo { const std::unique_ptr> nonReclaimableSectionHolder; /// Collects the spill stats from sort writer if the spilling has been /// triggered. - const std::unique_ptr spillStats; + const std::unique_ptr> spillStats; const std::shared_ptr writerPool; const std::shared_ptr sinkPool; const std::shared_ptr sortPool; diff --git a/velox/dwio/common/SortingWriter.cpp b/velox/dwio/common/SortingWriter.cpp index 1409ad810c8f..83f81d67a72c 100644 --- a/velox/dwio/common/SortingWriter.cpp +++ b/velox/dwio/common/SortingWriter.cpp @@ -22,18 +22,15 @@ SortingWriter::SortingWriter( std::unique_ptr writer, std::unique_ptr sortBuffer, uint32_t maxOutputRowsConfig, - uint64_t maxOutputBytesConfig, - velox::common::SpillStats* spillStats) + uint64_t maxOutputBytesConfig) : outputWriter_(std::move(writer)), maxOutputRowsConfig_(maxOutputRowsConfig), maxOutputBytesConfig_(maxOutputBytesConfig), sortPool_(sortBuffer->pool()), canReclaim_(sortBuffer->canSpill()), - spillStats_(spillStats), sortBuffer_(std::move(sortBuffer)) { VELOX_CHECK_GT(maxOutputRowsConfig_, 0); VELOX_CHECK_GT(maxOutputBytesConfig_, 0); - VELOX_CHECK_NOT_NULL(spillStats_); if (sortPool_->parent()->reclaimer() != nullptr) { sortPool_->setReclaimer(MemoryReclaimer::create(this)); } @@ -64,11 +61,7 @@ void SortingWriter::close() { outputWriter_->write(output); output = sortBuffer_->getOutput(maxOutputBatchRows); } - auto spillStatsOr = sortBuffer_->spilledStats(); - if (spillStatsOr.has_value()) { - VELOX_CHECK(canReclaim_); - *spillStats_ = spillStatsOr.value(); - } + sortBuffer_.reset(); sortPool_->release(); outputWriter_->close(); diff --git a/velox/dwio/common/SortingWriter.h b/velox/dwio/common/SortingWriter.h index 8dcab763b092..c73574b334f9 100644 --- a/velox/dwio/common/SortingWriter.h +++ b/velox/dwio/common/SortingWriter.h @@ -29,8 +29,7 @@ class SortingWriter : public Writer { std::unique_ptr writer, std::unique_ptr sortBuffer, uint32_t maxOutputRowsConfig, - uint64_t maxOutputBytesConfig, - velox::common::SpillStats* spillStats); + uint64_t maxOutputBytesConfig); ~SortingWriter() override; @@ -81,7 +80,6 @@ class SortingWriter : public Writer { const uint64_t maxOutputBytesConfig_; memory::MemoryPool* const sortPool_; const bool canReclaim_; - velox::common::SpillStats* const spillStats_; std::unique_ptr sortBuffer_; }; diff --git a/velox/exec/GroupingSet.cpp b/velox/exec/GroupingSet.cpp index b1e54eed605f..4c0ef7843cea 100644 --- a/velox/exec/GroupingSet.cpp +++ b/velox/exec/GroupingSet.cpp @@ -52,7 +52,8 @@ GroupingSet::GroupingSet( const std::optional& groupIdChannel, const common::SpillConfig* spillConfig, tsan_atomic* nonReclaimableSection, - OperatorCtx* operatorCtx) + OperatorCtx* operatorCtx, + folly::Synchronized* spillStats) : preGroupedKeyChannels_(std::move(preGroupedKeys)), hashers_(std::move(hashers)), isGlobal_(hashers_.empty()), @@ -69,7 +70,8 @@ GroupingSet::GroupingSet( stringAllocator_(operatorCtx->pool()), rows_(operatorCtx->pool()), isAdaptive_(queryConfig_.hashAdaptivityEnabled()), - pool_(*operatorCtx->pool()) { + pool_(*operatorCtx->pool()), + spillStats_(spillStats) { VELOX_CHECK_NOT_NULL(nonReclaimableSection_); VELOX_CHECK(pool_.trackUsage()); for (auto& hasher : hashers_) { @@ -131,7 +133,8 @@ std::unique_ptr GroupingSet::createForMarkDistinct( /*groupIdColumn*/ std::nullopt, /*spillConfig*/ nullptr, nonReclaimableSection, - operatorCtx); + operatorCtx, + /*spillStats_*/ nullptr); }; namespace { @@ -939,7 +942,8 @@ void GroupingSet::spill() { makeSpillType(), rows->keyTypes().size(), std::vector(), - spillConfig_); + spillConfig_, + spillStats_); } spiller_->spill(); if (sortedAggregations_) { @@ -958,7 +962,11 @@ void GroupingSet::spill(const RowContainerIterator& rowIterator) { auto* rows = table_->rows(); VELOX_CHECK(pool_.trackUsage()); spiller_ = std::make_unique( - Spiller::Type::kAggregateOutput, rows, makeSpillType(), spillConfig_); + Spiller::Type::kAggregateOutput, + rows, + makeSpillType(), + spillConfig_, + spillStats_); spiller_->spill(rowIterator); table_->clear(); diff --git a/velox/exec/GroupingSet.h b/velox/exec/GroupingSet.h index bf489c4621af..8cbbe8ff99bc 100644 --- a/velox/exec/GroupingSet.h +++ b/velox/exec/GroupingSet.h @@ -40,7 +40,8 @@ class GroupingSet { const std::optional& groupIdChannel, const common::SpillConfig* spillConfig, tsan_atomic* nonReclaimableSection, - OperatorCtx* operatorCtx); + OperatorCtx* operatorCtx, + folly::Synchronized* spillStats); ~GroupingSet(); @@ -359,6 +360,8 @@ class GroupingSet { // Temporary for case where an aggregate in toIntermediate() outputs post-init // state of aggregate for all rows. std::vector firstGroup_; + + folly::Synchronized* const spillStats_; }; } // namespace facebook::velox::exec diff --git a/velox/exec/HashAggregation.cpp b/velox/exec/HashAggregation.cpp index 0e9c12690c9b..cdc6f5238e21 100644 --- a/velox/exec/HashAggregation.cpp +++ b/velox/exec/HashAggregation.cpp @@ -106,7 +106,8 @@ void HashAggregation::initialize() { groupIdChannel, spillConfig_.has_value() ? &spillConfig_.value() : nullptr, &nonReclaimableSection_, - operatorCtx_.get()); + operatorCtx_.get(), + &spillStats_); aggregationNode_.reset(); } @@ -188,13 +189,6 @@ void HashAggregation::updateRuntimeStats() { RuntimeMetric(hashTableStats.numTombstones); } -void HashAggregation::recordSpillStats() { - auto spillStatsOr = groupingSet_->spilledStats(); - if (spillStatsOr.has_value()) { - Operator::recordSpillStats(spillStatsOr.value()); - } -} - void HashAggregation::prepareOutput(vector_size_t size) { if (output_) { VectorPtr output = std::move(output_); @@ -388,7 +382,6 @@ void HashAggregation::noMoreInput() { updateEstimatedOutputRowSize(); groupingSet_->noMoreInput(); Operator::noMoreInput(); - recordSpillStats(); // Release the extra reserved memory right after processing all the inputs. pool()->release(); } @@ -429,9 +422,6 @@ void HashAggregation::reclaim( // Spill all the rows starting from the next output row pointed by // 'resultIterator_'. groupingSet_->spill(resultIterator_); - // NOTE: we will only spill once during the output processing stage so - // record stats here. - recordSpillStats(); } else { // TODO: support fine-grain disk spilling based on 'targetBytes' after // having row container memory compaction support later. diff --git a/velox/exec/HashAggregation.h b/velox/exec/HashAggregation.h index 771dd2ca809d..1bf28c43428a 100644 --- a/velox/exec/HashAggregation.h +++ b/velox/exec/HashAggregation.h @@ -72,10 +72,6 @@ class HashAggregation : public Operator { RowVectorPtr getDistinctOutput(); - // Invoked to record the spilling stats in operator stats after processing all - // the inputs. - void recordSpillStats(); - void updateEstimatedOutputRowSize(); std::shared_ptr aggregationNode_; diff --git a/velox/exec/HashBuild.cpp b/velox/exec/HashBuild.cpp index a4c8dfc5c8b4..fff744ed7528 100644 --- a/velox/exec/HashBuild.cpp +++ b/velox/exec/HashBuild.cpp @@ -231,9 +231,11 @@ void HashBuild::setupSpiller(SpillPartition* spillPartition) { << spillConfig.maxSpillLevel << ", and disable spilling for memory pool: " << pool()->name(); + ++spillStats_.wlock()->spillMaxLevelExceededCount; exceededMaxSpillLevelLimit_ = true; return; } + exceededMaxSpillLevelLimit_ = false; hashBits = HashBitRange(startBit, startBit + spillConfig.numPartitionBits); } @@ -243,7 +245,8 @@ void HashBuild::setupSpiller(SpillPartition* spillPartition) { table_->rows(), spillType_, std::move(hashBits), - &spillConfig); + &spillConfig, + &spillStats_); const int32_t numPartitions = spiller_->hashBits().numPartitions(); spillInputIndicesBuffers_.resize(numPartitions); @@ -732,7 +735,6 @@ bool HashBuild::finishHashBuild() { } if (spiller != nullptr) { spiller->finishSpill(spillPartitions); - build->recordSpillStats(spiller.get()); } } @@ -740,7 +742,6 @@ bool HashBuild::finishHashBuild() { spiller_->finishSpill(spillPartitions); removeEmptyPartitions(spillPartitions); } - recordSpillStats(); // TODO: re-enable parallel join build with spilling triggered after // https://github.com/facebookincubator/velox/issues/3567 is fixed. @@ -765,23 +766,6 @@ bool HashBuild::finishHashBuild() { return true; } -void HashBuild::recordSpillStats() { - recordSpillStats(spiller_.get()); -} - -void HashBuild::recordSpillStats(Spiller* spiller) { - if (spiller != nullptr) { - const auto spillStats = spiller->stats(); - VELOX_CHECK_EQ(spillStats.spillSortTimeUs, 0); - Operator::recordSpillStats(spillStats); - } else if (exceededMaxSpillLevelLimit_) { - exceededMaxSpillLevelLimit_ = false; - common::SpillStats spillStats; - spillStats.spillMaxLevelExceededCount = 1; - Operator::recordSpillStats(spillStats); - } -} - void HashBuild::ensureTableFits(uint64_t numRows) { // NOTE: we don't need memory reservation if all the partitions have been // spilled as nothing need to be built. diff --git a/velox/exec/HashBuild.h b/velox/exec/HashBuild.h index 42bf6633bb15..11691597278e 100644 --- a/velox/exec/HashBuild.h +++ b/velox/exec/HashBuild.h @@ -117,9 +117,6 @@ class HashBuild final : public Operator { return canReclaim(); } - void recordSpillStats(); - void recordSpillStats(Spiller* spiller); - // Indicates if the input is read from spill data or not. bool isInputFromSpill() const; diff --git a/velox/exec/HashProbe.cpp b/velox/exec/HashProbe.cpp index 023f0dd4db56..1f2f8d91ed9d 100644 --- a/velox/exec/HashProbe.cpp +++ b/velox/exec/HashProbe.cpp @@ -253,7 +253,8 @@ void HashProbe::maybeSetupSpillInput( spillInputPartitionIds_.begin()->partitionBitOffset(), spillInputPartitionIds_.begin()->partitionBitOffset() + spillConfig.numPartitionBits), - &spillConfig); + &spillConfig, + &spillStats_); // Set the spill partitions to the corresponding ones at the build side. The // hash probe operator itself won't trigger any spilling. spiller_->setPartitionsSpilled(toPartitionNumSet(spillInputPartitionIds_)); @@ -1382,7 +1383,8 @@ void HashProbe::noMoreInputInternal() { VELOX_CHECK_EQ( spillInputPartitionIds_.size(), spiller_->spilledPartitionSet().size()); spiller_->finishSpill(spillPartitionSet_); - recordSpillStats(); + VELOX_CHECK_EQ(spillStats_.rlock()->spillSortTimeUs, 0); + VELOX_CHECK_EQ(spillStats_.rlock()->spillFillTimeUs, 0); } const bool hasSpillData = hasMoreSpillData(); @@ -1412,14 +1414,6 @@ void HashProbe::noMoreInputInternal() { lastProber_ = true; } -void HashProbe::recordSpillStats() { - VELOX_CHECK_NOT_NULL(spiller_); - const auto spillStats = spiller_->stats(); - VELOX_CHECK_EQ(spillStats.spillSortTimeUs, 0); - VELOX_CHECK_EQ(spillStats.spillFillTimeUs, 0); - Operator::recordSpillStats(spillStats); -} - bool HashProbe::isFinished() { return state_ == ProbeOperatorState::kFinish; } diff --git a/velox/exec/HashProbe.h b/velox/exec/HashProbe.h index 3f54f84e77a6..3c35a8365e36 100644 --- a/velox/exec/HashProbe.h +++ b/velox/exec/HashProbe.h @@ -220,8 +220,6 @@ class HashProbe : public Operator { // next hash table from the spilled data. void noMoreInputInternal(); - void recordSpillStats(); - // Returns the index of the 'match' column in the output for semi project // joins. VectorPtr& matchColumn() const { diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp index e37404421f4c..e74049d43230 100644 --- a/velox/exec/Operator.cpp +++ b/velox/exec/Operator.cpp @@ -295,78 +295,80 @@ void Operator::recordBlockingTime(uint64_t start, BlockingReason reason) { fmt::format("blocked{}Times", blockReason), RuntimeCounter(1)); } -void Operator::recordSpillStats(const common::SpillStats& spillStats) { +void Operator::recordSpillStats() { + const auto lockedSpillStats = spillStats_.wlock(); auto lockedStats = stats_.wlock(); - lockedStats->spilledInputBytes += spillStats.spilledInputBytes; - lockedStats->spilledBytes += spillStats.spilledBytes; - lockedStats->spilledRows += spillStats.spilledRows; - lockedStats->spilledPartitions += spillStats.spilledPartitions; - lockedStats->spilledFiles += spillStats.spilledFiles; - if (spillStats.spillFillTimeUs != 0) { + lockedStats->spilledInputBytes += lockedSpillStats->spilledInputBytes; + lockedStats->spilledBytes += lockedSpillStats->spilledBytes; + lockedStats->spilledRows += lockedSpillStats->spilledRows; + lockedStats->spilledPartitions += lockedSpillStats->spilledPartitions; + lockedStats->spilledFiles += lockedSpillStats->spilledFiles; + if (lockedSpillStats->spillFillTimeUs != 0) { lockedStats->addRuntimeStat( "spillFillTime", RuntimeCounter{ static_cast( - spillStats.spillFillTimeUs * + lockedSpillStats->spillFillTimeUs * Timestamp::kNanosecondsInMicrosecond), RuntimeCounter::Unit::kNanos}); } - if (spillStats.spillSortTimeUs != 0) { + if (lockedSpillStats->spillSortTimeUs != 0) { lockedStats->addRuntimeStat( "spillSortTime", RuntimeCounter{ static_cast( - spillStats.spillSortTimeUs * + lockedSpillStats->spillSortTimeUs * Timestamp::kNanosecondsInMicrosecond), RuntimeCounter::Unit::kNanos}); } - if (spillStats.spillSerializationTimeUs != 0) { + if (lockedSpillStats->spillSerializationTimeUs != 0) { lockedStats->addRuntimeStat( "spillSerializationTime", RuntimeCounter{ static_cast( - spillStats.spillSerializationTimeUs * + lockedSpillStats->spillSerializationTimeUs * Timestamp::kNanosecondsInMicrosecond), RuntimeCounter::Unit::kNanos}); } - if (spillStats.spillFlushTimeUs != 0) { + if (lockedSpillStats->spillFlushTimeUs != 0) { lockedStats->addRuntimeStat( "spillFlushTime", RuntimeCounter{ static_cast( - spillStats.spillFlushTimeUs * + lockedSpillStats->spillFlushTimeUs * Timestamp::kNanosecondsInMicrosecond), RuntimeCounter::Unit::kNanos}); } - if (spillStats.spillWrites != 0) { + if (lockedSpillStats->spillWrites != 0) { lockedStats->addRuntimeStat( "spillWrites", - RuntimeCounter{static_cast(spillStats.spillWrites)}); + RuntimeCounter{static_cast(lockedSpillStats->spillWrites)}); } - if (spillStats.spillWriteTimeUs != 0) { + if (lockedSpillStats->spillWriteTimeUs != 0) { lockedStats->addRuntimeStat( "spillWriteTime", RuntimeCounter{ static_cast( - spillStats.spillWriteTimeUs * + lockedSpillStats->spillWriteTimeUs * Timestamp::kNanosecondsInMicrosecond), RuntimeCounter::Unit::kNanos}); } - if (spillStats.spillRuns != 0) { + if (lockedSpillStats->spillRuns != 0) { lockedStats->addRuntimeStat( "spillRuns", - RuntimeCounter{static_cast(spillStats.spillRuns)}); - common::updateGlobalSpillRunStats(spillStats.spillRuns); + RuntimeCounter{static_cast(lockedSpillStats->spillRuns)}); + common::updateGlobalSpillRunStats(lockedSpillStats->spillRuns); } - if (spillStats.spillMaxLevelExceededCount != 0) { + if (lockedSpillStats->spillMaxLevelExceededCount != 0) { lockedStats->addRuntimeStat( "exceededMaxSpillLevel", - RuntimeCounter{ - static_cast(spillStats.spillMaxLevelExceededCount)}); + RuntimeCounter{static_cast( + lockedSpillStats->spillMaxLevelExceededCount)}); common::updateGlobalMaxSpillLevelExceededCount( - spillStats.spillMaxLevelExceededCount); + lockedSpillStats->spillMaxLevelExceededCount); } + lockedSpillStats->reset(); } std::string Operator::toString() const { diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h index 2b40df4d00b4..c98e1a58deef 100644 --- a/velox/exec/Operator.h +++ b/velox/exec/Operator.h @@ -420,6 +420,7 @@ class Operator : public BaseRuntimeStatWriter { virtual void close() { input_ = nullptr; results_.clear(); + recordSpillStats(); // Release the unused memory reservation on close. operatorCtx_->pool()->release(); } @@ -694,7 +695,7 @@ class Operator : public BaseRuntimeStatWriter { std::optional averageRowSize = std::nullopt) const; /// Invoked to record spill stats in operator stats. - void recordSpillStats(const common::SpillStats& spillStats); + virtual void recordSpillStats(); const std::unique_ptr operatorCtx_; const RowTypePtr outputType_; @@ -705,6 +706,7 @@ class Operator : public BaseRuntimeStatWriter { bool initialized_{false}; folly::Synchronized stats_; + folly::Synchronized spillStats_; /// Indicates if an operator is under a non-reclaimable execution section. /// This prevents the memory arbitrator from reclaiming memory from this diff --git a/velox/exec/OrderBy.cpp b/velox/exec/OrderBy.cpp index 4d9a96955359..59b1bc43a0bc 100644 --- a/velox/exec/OrderBy.cpp +++ b/velox/exec/OrderBy.cpp @@ -65,7 +65,8 @@ OrderBy::OrderBy( sortCompareFlags, pool(), &nonReclaimableSection_, - spillConfig_.has_value() ? &(spillConfig_.value()) : nullptr); + spillConfig_.has_value() ? &(spillConfig_.value()) : nullptr, + &spillStats_); } void OrderBy::addInput(RowVectorPtr input) { @@ -90,7 +91,6 @@ void OrderBy::noMoreInput() { Operator::noMoreInput(); sortBuffer_->noMoreInput(); maxOutputRows_ = outputBatchRows(sortBuffer_->estimateOutputRowSize()); - recordSpillStats(); } RowVectorPtr OrderBy::getOutput() { @@ -107,12 +107,4 @@ void OrderBy::close() { Operator::close(); sortBuffer_.reset(); } - -void OrderBy::recordSpillStats() { - VELOX_CHECK_NOT_NULL(sortBuffer_); - auto spillStats = sortBuffer_->spilledStats(); - if (spillStats.has_value()) { - Operator::recordSpillStats(spillStats.value()); - } -} } // namespace facebook::velox::exec diff --git a/velox/exec/OrderBy.h b/velox/exec/OrderBy.h index 850eda574207..d75315094d9a 100644 --- a/velox/exec/OrderBy.h +++ b/velox/exec/OrderBy.h @@ -63,10 +63,6 @@ class OrderBy : public Operator { void close() override; private: - // Invoked to record the spilling stats in operator stats after processing all - // the inputs. - void recordSpillStats(); - std::unique_ptr sortBuffer_; bool finished_ = false; uint32_t maxOutputRows_; diff --git a/velox/exec/RowNumber.cpp b/velox/exec/RowNumber.cpp index 7e12b26c220c..e3bb28d00f31 100644 --- a/velox/exec/RowNumber.cpp +++ b/velox/exec/RowNumber.cpp @@ -116,7 +116,6 @@ void RowNumber::noMoreInput() { if (inputSpiller_ != nullptr) { inputSpiller_->finishSpill(spillInputPartitionSet_); - recordSpillStats(inputSpiller_->stats()); removeEmptyPartitions(spillInputPartitionSet_); restoreNextSpillPartition(); } @@ -390,11 +389,11 @@ SpillPartitionNumSet RowNumber::spillHashTable() { table_->rows(), tableType, spillPartitionBits_, - &spillConfig); + &spillConfig, + &spillStats_); hashTableSpiller->spill(); hashTableSpiller->finishSpill(spillHashTablePartitionSet_); - recordSpillStats(hashTableSpiller->stats()); table_->clear(); pool()->release(); @@ -412,7 +411,8 @@ void RowNumber::setupInputSpiller( Spiller::Type::kHashJoinProbe, inputType_, spillPartitionBits_, - &spillConfig); + &spillConfig, + &spillStats_); inputSpiller_->setPartitionsSpilled(spillPartitionSet); const auto& hashers = table_->hashers(); diff --git a/velox/exec/SortBuffer.cpp b/velox/exec/SortBuffer.cpp index f9358f99f3b7..5379a600d9f7 100644 --- a/velox/exec/SortBuffer.cpp +++ b/velox/exec/SortBuffer.cpp @@ -25,12 +25,14 @@ SortBuffer::SortBuffer( const std::vector& sortCompareFlags, velox::memory::MemoryPool* pool, tsan_atomic* nonReclaimableSection, - const common::SpillConfig* spillConfig) + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats) : input_(input), sortCompareFlags_(sortCompareFlags), pool_(pool), nonReclaimableSection_(nonReclaimableSection), - spillConfig_(spillConfig) { + spillConfig_(spillConfig), + spillStats_(spillStats) { VELOX_CHECK_GE(input_->size(), sortCompareFlags_.size()); VELOX_CHECK_GT(sortCompareFlags_.size(), 0); VELOX_CHECK_EQ(sortColumnIndices.size(), sortCompareFlags_.size()); @@ -258,7 +260,8 @@ void SortBuffer::spillInput() { spillerStoreType_, data_->keyTypes().size(), sortCompareFlags_, - spillConfig_); + spillConfig_, + spillStats_); } spiller_->spill(); data_->clear(); @@ -278,7 +281,8 @@ void SortBuffer::spillOutput() { Spiller::Type::kOrderByOutput, data_.get(), spillerStoreType_, - spillConfig_); + spillConfig_, + spillStats_); auto spillRows = std::vector( sortedRows_.begin() + numOutputRows_, sortedRows_.end()); spiller_->spill(spillRows); diff --git a/velox/exec/SortBuffer.h b/velox/exec/SortBuffer.h index ca9930e3e95e..473fe0e7e9b0 100644 --- a/velox/exec/SortBuffer.h +++ b/velox/exec/SortBuffer.h @@ -36,7 +36,8 @@ class SortBuffer { const std::vector& sortCompareFlags, velox::memory::MemoryPool* pool, tsan_atomic* nonReclaimableSection, - const common::SpillConfig* spillConfig = nullptr); + const common::SpillConfig* spillConfig = nullptr, + folly::Synchronized* spillStats = nullptr); void addInput(const VectorPtr& input); @@ -61,14 +62,6 @@ class SortBuffer { return pool_; } - /// Returns the spiller stats including total bytes and rows spilled so far. - std::optional spilledStats() const { - if (spiller_ == nullptr) { - return std::nullopt; - } - return spiller_->stats(); - } - std::optional estimateOutputRowSize() const; private: @@ -95,6 +88,7 @@ class SortBuffer { // execution section or not. tsan_atomic* const nonReclaimableSection_; const common::SpillConfig* const spillConfig_; + folly::Synchronized* const spillStats_; // The column projection map between 'input_' and 'spillerStoreType_' as sort // buffer stores the sort columns first in 'data_'. diff --git a/velox/exec/SortWindowBuild.cpp b/velox/exec/SortWindowBuild.cpp index 2c86ae062575..86f3ee458edb 100644 --- a/velox/exec/SortWindowBuild.cpp +++ b/velox/exec/SortWindowBuild.cpp @@ -43,12 +43,14 @@ SortWindowBuild::SortWindowBuild( const std::shared_ptr& node, velox::memory::MemoryPool* pool, const common::SpillConfig* spillConfig, - tsan_atomic* nonReclaimableSection) + tsan_atomic* nonReclaimableSection, + folly::Synchronized* spillStats) : WindowBuild(node, pool, spillConfig, nonReclaimableSection), numPartitionKeys_{node->partitionKeys().size()}, spillCompareFlags_{ makeSpillCompareFlags(numPartitionKeys_, node->sortingOrders())}, - pool_(pool) { + pool_(pool), + spillStats_(spillStats) { VELOX_CHECK_NOT_NULL(pool_); allKeyInfo_.reserve(partitionKeyInfo_.size() + sortKeyInfo_.size()); allKeyInfo_.insert( @@ -145,7 +147,8 @@ void SortWindowBuild::setupSpiller() { inputType_, spillCompareFlags_.size(), spillCompareFlags_, - spillConfig_); + spillConfig_, + spillStats_); } void SortWindowBuild::spill() { diff --git a/velox/exec/SortWindowBuild.h b/velox/exec/SortWindowBuild.h index bc73de0117b8..645949ddb7e0 100644 --- a/velox/exec/SortWindowBuild.h +++ b/velox/exec/SortWindowBuild.h @@ -30,7 +30,8 @@ class SortWindowBuild : public WindowBuild { const std::shared_ptr& node, velox::memory::MemoryPool* pool, const common::SpillConfig* spillConfig, - tsan_atomic* nonReclaimableSection); + tsan_atomic* nonReclaimableSection, + folly::Synchronized* spillStats); bool needsInput() override { // No partitions are available yet, so can consume input rows. @@ -85,6 +86,7 @@ class SortWindowBuild : public WindowBuild { const std::vector spillCompareFlags_; memory::MemoryPool* const pool_; + folly::Synchronized* const spillStats_; // allKeyInfo_ is a combination of (partitionKeyInfo_ and sortKeyInfo_). // It is used to perform a full sorting of the input rows to be able to diff --git a/velox/exec/Spiller.cpp b/velox/exec/Spiller.cpp index d2dc3fdc9e47..cd63c5545a2a 100644 --- a/velox/exec/Spiller.cpp +++ b/velox/exec/Spiller.cpp @@ -39,7 +39,8 @@ Spiller::Spiller( RowTypePtr rowType, int32_t numSortingKeys, const std::vector& sortCompareFlags, - const common::SpillConfig* spillConfig) + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats) : Spiller( type, container, @@ -56,7 +57,8 @@ Spiller::Spiller( spillConfig->compressionKind, spillConfig->executor, spillConfig->maxSpillRunRows, - spillConfig->fileCreateConfig) { + spillConfig->fileCreateConfig, + spillStats) { VELOX_CHECK( type_ == Type::kOrderByInput || type_ == Type::kAggregateInput, "Unexpected spiller type: {}", @@ -69,7 +71,8 @@ Spiller::Spiller( Type type, RowContainer* container, RowTypePtr rowType, - const common::SpillConfig* spillConfig) + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats) : Spiller( type, container, @@ -86,7 +89,8 @@ Spiller::Spiller( spillConfig->compressionKind, spillConfig->executor, spillConfig->maxSpillRunRows, - spillConfig->fileCreateConfig) { + spillConfig->fileCreateConfig, + spillStats) { VELOX_CHECK( type_ == Type::kAggregateOutput || type_ == Type::kOrderByOutput, "Unexpected spiller type: {}", @@ -99,7 +103,8 @@ Spiller::Spiller( Type type, RowTypePtr rowType, HashBitRange bits, - const common::SpillConfig* spillConfig) + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats) : Spiller( type, nullptr, @@ -116,7 +121,8 @@ Spiller::Spiller( spillConfig->compressionKind, spillConfig->executor, 0, - spillConfig->fileCreateConfig) { + spillConfig->fileCreateConfig, + spillStats) { VELOX_CHECK_EQ( type_, Type::kHashJoinProbe, @@ -130,7 +136,8 @@ Spiller::Spiller( RowContainer* container, RowTypePtr rowType, HashBitRange bits, - const common::SpillConfig* spillConfig) + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats) : Spiller( type, container, @@ -147,7 +154,8 @@ Spiller::Spiller( spillConfig->compressionKind, spillConfig->executor, spillConfig->maxSpillRunRows, - spillConfig->fileCreateConfig) { + spillConfig->fileCreateConfig, + spillStats) { VELOX_CHECK_EQ(type_, Type::kHashJoinBuild); VELOX_CHECK(isHashJoinTableSpillType(rowType_, joinType)); } @@ -157,7 +165,8 @@ Spiller::Spiller( RowContainer* container, RowTypePtr rowType, HashBitRange bits, - const common::SpillConfig* spillConfig) + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats) : Spiller( type, container, @@ -174,7 +183,8 @@ Spiller::Spiller( spillConfig->compressionKind, spillConfig->executor, spillConfig->maxSpillRunRows, - spillConfig->fileCreateConfig) { + spillConfig->fileCreateConfig, + spillStats) { VELOX_CHECK_EQ(type_, Type::kRowNumber); } @@ -194,7 +204,8 @@ Spiller::Spiller( common::CompressionKind compressionKind, folly::Executor* executor, uint64_t maxSpillRunRows, - const std::string& fileCreateConfig) + const std::string& fileCreateConfig, + folly::Synchronized* spillStats) : type_(type), container_(container), executor_(executor), @@ -202,6 +213,7 @@ Spiller::Spiller( rowType_(std::move(rowType)), spillProbedFlag_(recordProbedFlag), maxSpillRunRows_(maxSpillRunRows), + spillStats_(spillStats), state_( getSpillDirPathCb, updateAndCheckSpillLimitCb, @@ -213,7 +225,7 @@ Spiller::Spiller( writeBufferSize, compressionKind, memory::spillMemoryPool(), - &stats_, + spillStats, fileCreateConfig) { TestValue::adjust( "facebook::velox::exec::Spiller", const_cast(&bits_)); @@ -419,7 +431,7 @@ std::unique_ptr Spiller::writeSpill(int32_t partition) { } void Spiller::runSpill(bool lastRun) { - ++stats_.wlock()->spillRuns; + ++spillStats_->wlock()->spillRuns; VELOX_CHECK(type_ != Spiller::Type::kOrderByOutput || lastRun); std::vector>> writes; @@ -481,12 +493,12 @@ void Spiller::runSpill(bool lastRun) { } void Spiller::updateSpillFillTime(uint64_t timeUs) { - stats_.wlock()->spillFillTimeUs += timeUs; + spillStats_->wlock()->spillFillTimeUs += timeUs; common::updateGlobalSpillFillTime(timeUs); } void Spiller::updateSpillSortTime(uint64_t timeUs) { - stats_.wlock()->spillSortTimeUs += timeUs; + spillStats_->wlock()->spillSortTimeUs += timeUs; common::updateGlobalSpillSortTime(timeUs); } @@ -705,6 +717,6 @@ std::string Spiller::typeName(Type type) { } common::SpillStats Spiller::stats() const { - return stats_.copy(); + return spillStats_->copy(); } } // namespace facebook::velox::exec diff --git a/velox/exec/Spiller.h b/velox/exec/Spiller.h index f78c080ffc1c..aecce215dc70 100644 --- a/velox/exec/Spiller.h +++ b/velox/exec/Spiller.h @@ -59,21 +59,24 @@ class Spiller { RowTypePtr rowType, int32_t numSortingKeys, const std::vector& sortCompareFlags, - const common::SpillConfig* spillConfig); + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats); /// type == Type::kAggregateOutput || type == Type::kOrderByOutput Spiller( Type type, RowContainer* container, RowTypePtr rowType, - const common::SpillConfig* spillConfig); + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats); /// type == Type::kHashJoinProbe Spiller( Type type, RowTypePtr rowType, HashBitRange bits, - const common::SpillConfig* spillConfig); + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats); /// type == Type::kHashJoinBuild Spiller( @@ -82,7 +85,8 @@ class Spiller { RowContainer* container, RowTypePtr rowType, HashBitRange bits, - const common::SpillConfig* spillConfig); + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats); /// type == Type::kRowNumber Spiller( @@ -90,7 +94,8 @@ class Spiller { RowContainer* container, RowTypePtr rowType, HashBitRange bits, - const common::SpillConfig* spillConfig); + const common::SpillConfig* spillConfig, + folly::Synchronized* spillStats); Type type() const { return type_; @@ -204,7 +209,8 @@ class Spiller { common::CompressionKind compressionKind, folly::Executor* executor, uint64_t maxSpillRunRows, - const std::string& fileCreateConfig); + const std::string& fileCreateConfig, + folly::Synchronized* spillStats); // Invoked to spill. If 'startRowIter' is not null, then we only spill rows // from row container starting at the offset pointed by 'startRowIter'. @@ -316,13 +322,14 @@ class Spiller { const bool spillProbedFlag_; const uint64_t maxSpillRunRows_; + folly::Synchronized* const spillStats_; + // True if all rows of spilling partitions are in 'spillRuns_', so // that one can start reading these back. This means that the rows // that are not written out and deleted will be captured by // spillMergeStreamOverRows(). bool finalized_{false}; - folly::Synchronized stats_; SpillState state_; // Collects the rows to spill for each partition. diff --git a/velox/exec/TableWriter.cpp b/velox/exec/TableWriter.cpp index a40611c37112..c8f923af1276 100644 --- a/velox/exec/TableWriter.cpp +++ b/velox/exec/TableWriter.cpp @@ -255,11 +255,12 @@ void TableWriter::updateStats(const connector::DataSink::Stats& stats) { "numWrittenFiles", RuntimeCounter(stats.numWrittenFiles)); } if (!stats.spillStats.empty()) { - recordSpillStats(stats.spillStats); + *spillStats_.wlock() += stats.spillStats; } } void TableWriter::close() { + Operator::close(); if (!closed_) { // Abort the data sink if the query has already failed and no need for // regular close. diff --git a/velox/exec/TopNRowNumber.cpp b/velox/exec/TopNRowNumber.cpp index 2b7b1cbfd884..5b35a11476ce 100644 --- a/velox/exec/TopNRowNumber.cpp +++ b/velox/exec/TopNRowNumber.cpp @@ -288,7 +288,6 @@ void TopNRowNumber::noMoreInput() { VELOX_CHECK_NULL(merge_); auto spillPartition = spiller_->finishSpill(); merge_ = spillPartition.createOrderedReader(pool()); - recordSpillStats(spiller_->stats()); } else { outputRows_.resize(outputBatchSize_); } @@ -748,6 +747,7 @@ void TopNRowNumber::setupSpiller() { inputType_, spillCompareFlags_.size(), spillCompareFlags_, - &spillConfig_.value()); + &spillConfig_.value(), + &spillStats_); } } // namespace facebook::velox::exec diff --git a/velox/exec/Window.cpp b/velox/exec/Window.cpp index dd8707e7c1cd..6e09973edf60 100644 --- a/velox/exec/Window.cpp +++ b/velox/exec/Window.cpp @@ -45,7 +45,7 @@ Window::Window( windowNode, pool(), spillConfig, &nonReclaimableSection_); } else { windowBuild_ = std::make_unique( - windowNode, pool(), spillConfig, &nonReclaimableSection_); + windowNode, pool(), spillConfig, &nonReclaimableSection_, &spillStats_); } } @@ -241,10 +241,6 @@ void Window::createPeerAndFrameBuffers() { void Window::noMoreInput() { Operator::noMoreInput(); windowBuild_->noMoreInput(); - - if (auto spillStats = windowBuild_->spilledStats()) { - recordSpillStats(spillStats.value()); - } } void Window::callResetPartition() { diff --git a/velox/exec/tests/AggregateSpillBenchmarkBase.cpp b/velox/exec/tests/AggregateSpillBenchmarkBase.cpp index 4c8fbcc82793..3218cda4431c 100644 --- a/velox/exec/tests/AggregateSpillBenchmarkBase.cpp +++ b/velox/exec/tests/AggregateSpillBenchmarkBase.cpp @@ -124,7 +124,7 @@ void AggregateSpillBenchmarkBase::writeSpillData() { } } -std::unique_ptr AggregateSpillBenchmarkBase::makeSpiller() const { +std::unique_ptr AggregateSpillBenchmarkBase::makeSpiller() { common::SpillConfig spillConfig; spillConfig.getSpillDirPathCb = [&]() -> const std::string& { return spillDir_; @@ -145,11 +145,16 @@ std::unique_ptr AggregateSpillBenchmarkBase::makeSpiller() const { rowType_, rowContainer_->keyTypes().size(), std::vector{}, - &spillConfig); + &spillConfig, + &spillStats_); } else { // TODO: Add config flag to control the max spill rows. return std::make_unique( - spillerType_, rowContainer_.get(), rowType_, &spillConfig); + spillerType_, + rowContainer_.get(), + rowType_, + &spillConfig, + &spillStats_); } } } // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/AggregateSpillBenchmarkBase.h b/velox/exec/tests/AggregateSpillBenchmarkBase.h index fe1769dc5579..15b3bb853d66 100644 --- a/velox/exec/tests/AggregateSpillBenchmarkBase.h +++ b/velox/exec/tests/AggregateSpillBenchmarkBase.h @@ -32,7 +32,7 @@ class AggregateSpillBenchmarkBase : public SpillerBenchmarkBase { private: void writeSpillData(); - std::unique_ptr makeSpiller() const; + std::unique_ptr makeSpiller(); const Spiller::Type spillerType_; std::unique_ptr rowContainer_; diff --git a/velox/exec/tests/HashJoinTest.cpp b/velox/exec/tests/HashJoinTest.cpp index abca47ad7f22..93e60c178440 100644 --- a/velox/exec/tests/HashJoinTest.cpp +++ b/velox/exec/tests/HashJoinTest.cpp @@ -6299,7 +6299,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, exceededMaxSpillLevel) { .operatorStats.back() .runtimeStats; ASSERT_EQ(joinStats["exceededMaxSpillLevel"].sum, 8); - ASSERT_EQ(joinStats["exceededMaxSpillLevel"].count, 8); + ASSERT_EQ(joinStats["exceededMaxSpillLevel"].count, 1); }) .run(); ASSERT_EQ( diff --git a/velox/exec/tests/JoinSpillInputBenchmarkBase.cpp b/velox/exec/tests/JoinSpillInputBenchmarkBase.cpp index 66627e145771..3488a95b6f30 100644 --- a/velox/exec/tests/JoinSpillInputBenchmarkBase.cpp +++ b/velox/exec/tests/JoinSpillInputBenchmarkBase.cpp @@ -49,7 +49,8 @@ void JoinSpillInputBenchmarkBase::setUp() { exec::Spiller::Type::kHashJoinProbe, rowType_, HashBitRange{29, 29}, - &spillConfig); + &spillConfig, + &spillStats_); spiller_->setPartitionsSpilled({0}); } diff --git a/velox/exec/tests/OrderByTest.cpp b/velox/exec/tests/OrderByTest.cpp index 992fa8abb0d3..6cc77ec538cd 100644 --- a/velox/exec/tests/OrderByTest.cpp +++ b/velox/exec/tests/OrderByTest.cpp @@ -1034,8 +1034,9 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringOutputProcessing) { taskThread.join(); auto stats = task->taskStats().pipelineStats; - ASSERT_EQ(stats[0].operatorStats[1].spilledBytes, 0); - ASSERT_EQ(stats[0].operatorStats[1].spilledPartitions, 0); + ASSERT_TRUE(!enableSpilling || stats[0].operatorStats[1].spilledBytes > 0); + ASSERT_TRUE( + !enableSpilling || stats[0].operatorStats[1].spilledPartitions > 0); OperatorTestBase::deleteTaskAndCheckSpillDirectory(task); } ASSERT_EQ(reclaimerStats_.numNonReclaimableAttempts, 0); diff --git a/velox/exec/tests/SortBufferTest.cpp b/velox/exec/tests/SortBufferTest.cpp index f965652651ac..9b6a285755a9 100644 --- a/velox/exec/tests/SortBufferTest.cpp +++ b/velox/exec/tests/SortBufferTest.cpp @@ -301,13 +301,15 @@ TEST_F(SortBufferTest, batchOutput) { 0, 0, "none"); + folly::Synchronized spillStats; auto sortBuffer = std::make_unique( inputType_, sortColumnIndices_, sortCompareFlags_, pool_.get(), &nonReclaimableSection_, - testData.triggerSpill ? &spillConfig : nullptr); + testData.triggerSpill ? &spillConfig : nullptr, + &spillStats); ASSERT_EQ(sortBuffer->canSpill(), testData.triggerSpill); const std::shared_ptr fuzzerPool = @@ -324,8 +326,6 @@ TEST_F(SortBufferTest, batchOutput) { totalNumInput += inputRows; } sortBuffer->noMoreInput(); - auto spillStats = sortBuffer->spilledStats(); - int expectedOutputBufferIndex = 0; RowVectorPtr output = sortBuffer->getOutput(testData.maxOutputRows); while (output != nullptr) { @@ -336,14 +336,14 @@ TEST_F(SortBufferTest, batchOutput) { } if (!testData.triggerSpill) { - ASSERT_FALSE(spillStats.has_value()); + ASSERT_TRUE(spillStats.rlock()->empty()); } else { - ASSERT_TRUE(spillStats.has_value()); - ASSERT_GT(spillStats->spilledRows, 0); - ASSERT_LE(spillStats->spilledRows, totalNumInput); - ASSERT_GT(spillStats->spilledBytes, 0); - ASSERT_EQ(spillStats->spilledPartitions, 1); - ASSERT_GT(spillStats->spilledFiles, 0); + ASSERT_FALSE(spillStats.rlock()->empty()); + ASSERT_GT(spillStats.rlock()->spilledRows, 0); + ASSERT_LE(spillStats.rlock()->spilledRows, totalNumInput); + ASSERT_GT(spillStats.rlock()->spilledBytes, 0); + ASSERT_EQ(spillStats.rlock()->spilledPartitions, 1); + ASSERT_GT(spillStats.rlock()->spilledFiles, 0); } } } @@ -396,13 +396,15 @@ TEST_F(SortBufferTest, spill) { 0, 0, "none"); + folly::Synchronized spillStats; auto sortBuffer = std::make_unique( inputType_, sortColumnIndices_, sortCompareFlags_, pool_.get(), &nonReclaimableSection_, - testData.spillEnabled ? &spillConfig : nullptr); + testData.spillEnabled ? &spillConfig : nullptr, + &spillStats); const std::shared_ptr fuzzerPool = memory::memoryManager()->addLeafPool("spillSource"); @@ -420,23 +422,22 @@ TEST_F(SortBufferTest, spill) { totalNumInput += 1024; } sortBuffer->noMoreInput(); - const auto spillStats = sortBuffer->spilledStats(); if (!testData.spillTriggered) { - ASSERT_FALSE(spillStats.has_value()); + ASSERT_TRUE(spillStats.rlock()->empty()); if (!testData.spillEnabled) { VELOX_ASSERT_THROW(sortBuffer->spill(), "spill config is null"); } } else { - ASSERT_TRUE(spillStats.has_value()); - ASSERT_GT(spillStats->spilledRows, 0); - ASSERT_LE(spillStats->spilledRows, totalNumInput); - ASSERT_GT(spillStats->spilledBytes, 0); - ASSERT_EQ(spillStats->spilledPartitions, 1); + ASSERT_FALSE(spillStats.rlock()->empty()); + ASSERT_GT(spillStats.rlock()->spilledRows, 0); + ASSERT_LE(spillStats.rlock()->spilledRows, totalNumInput); + ASSERT_GT(spillStats.rlock()->spilledBytes, 0); + ASSERT_EQ(spillStats.rlock()->spilledPartitions, 1); // SortBuffer shall not respect maxFileSize. Total files should be num // addInput() calls minus one which is the first one that has nothing to // spill. - ASSERT_EQ(spillStats->spilledFiles, 3); + ASSERT_EQ(spillStats.rlock()->spilledFiles, 3); sortBuffer.reset(); ASSERT_EQ(memory::spillMemoryPool()->stats().currentBytes, 0); if (memory::spillMemoryPool()->trackUsage()) { @@ -456,13 +457,15 @@ TEST_F(SortBufferTest, emptySpill) { SCOPED_TRACE(fmt::format("hasPostSpillData {}", hasPostSpillData)); auto spillDirectory = exec::test::TempDirectoryPath::create(); auto spillConfig = getSpillConfig(spillDirectory->path); + folly::Synchronized spillStats; auto sortBuffer = std::make_unique( inputType_, sortColumnIndices_, sortCompareFlags_, pool_.get(), &nonReclaimableSection_, - &spillConfig); + &spillConfig, + &spillStats); sortBuffer->spill(); if (hasPostSpillData) { @@ -470,7 +473,7 @@ TEST_F(SortBufferTest, emptySpill) { sortBuffer->addInput(fuzzer.fuzzRow(inputType_)); } sortBuffer->noMoreInput(); - ASSERT_FALSE(sortBuffer->spilledStats()); + ASSERT_TRUE(spillStats.rlock()->empty()); } } } // namespace facebook::velox::functions::test diff --git a/velox/exec/tests/SpillerBenchmarkBase.h b/velox/exec/tests/SpillerBenchmarkBase.h index 28e08cfe792e..2fd181cc015a 100644 --- a/velox/exec/tests/SpillerBenchmarkBase.h +++ b/velox/exec/tests/SpillerBenchmarkBase.h @@ -71,5 +71,6 @@ class SpillerBenchmarkBase { std::unique_ptr spiller_; // Stats. uint64_t executionTimeUs_{0}; + folly::Synchronized spillStats_; }; } // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/SpillerTest.cpp b/velox/exec/tests/SpillerTest.cpp index d3f0568b5d23..c6ed8dc48c4d 100644 --- a/velox/exec/tests/SpillerTest.cpp +++ b/velox/exec/tests/SpillerTest.cpp @@ -513,6 +513,7 @@ class SpillerTest : public exec::test::RowContainerTestBase { common::GetSpillDirectoryPathCB tempSpillDirCb = [&]() -> const std::string& { return tempDirPath_->path; }; stats_.clear(); + spillStats_ = folly::Synchronized(); common::SpillConfig spillConfig; spillConfig.getSpillDirPathCb = makeError ? badSpillDirCb : tempSpillDirCb; @@ -527,8 +528,8 @@ class SpillerTest : public exec::test::RowContainerTestBase { if (type_ == Spiller::Type::kHashJoinProbe) { // kHashJoinProbe doesn't have associated row container. - spiller_ = - std::make_unique(type_, rowType_, hashBits_, &spillConfig); + spiller_ = std::make_unique( + type_, rowType_, hashBits_, &spillConfig, &spillStats_); } else if ( type_ == Spiller::Type::kOrderByInput || type_ == Spiller::Type::kAggregateInput) { @@ -540,15 +541,21 @@ class SpillerTest : public exec::test::RowContainerTestBase { rowType_, rowContainer_->keyTypes().size(), compareFlags_, - &spillConfig); + &spillConfig, + &spillStats_); } else if ( type_ == Spiller::Type::kAggregateOutput || type_ == Spiller::Type::kOrderByOutput) { spiller_ = std::make_unique( - type_, rowContainer_.get(), rowType_, &spillConfig); + type_, rowContainer_.get(), rowType_, &spillConfig, &spillStats_); } else if (type_ == Spiller::Type::kRowNumber) { spiller_ = std::make_unique( - type_, rowContainer_.get(), rowType_, hashBits_, &spillConfig); + type_, + rowContainer_.get(), + rowType_, + hashBits_, + &spillConfig, + &spillStats_); } else { VELOX_CHECK_EQ(type_, Spiller::Type::kHashJoinBuild); spiller_ = std::make_unique( @@ -557,7 +564,8 @@ class SpillerTest : public exec::test::RowContainerTestBase { rowContainer_.get(), rowType_, hashBits_, - &spillConfig); + &spillConfig, + &spillStats_); } ASSERT_EQ(spiller_->state().maxPartitions(), numPartitions_); ASSERT_FALSE(spiller_->isAllSpilled()); @@ -1057,6 +1065,7 @@ class SpillerTest : public exec::test::RowContainerTestBase { std::vector> partitions_; std::vector compareFlags_; std::unique_ptr spiller_; + folly::Synchronized spillStats_; }; struct AllTypesTestParam {