From 3504ecff7d13a979207327748eb05f4f94caf1f1 Mon Sep 17 00:00:00 2001 From: xiaoxmeng Date: Tue, 7 May 2024 14:06:15 -0700 Subject: [PATCH] Avoid unnecessary reclaim on non-reclaimable query (#9737) Summary: Avoid unnecessary reclaim on non-reclaimable query which might spent time on waiting for query task to pause. Also allows arbitration to reclaim reserved capacity from the request pool itself. Pull Request resolved: https://github.com/facebookincubator/velox/pull/9737 Reviewed By: tanjialiang Differential Revision: D57055391 Pulled By: xiaoxmeng fbshipit-source-id: 34fd61a343e445c2e5a4e228f3ce54d630c65f24 --- velox/common/memory/SharedArbitrator.cpp | 28 ++++++++++++------- velox/common/memory/SharedArbitrator.h | 21 ++++++++++---- .../memory/tests/MockSharedArbitratorTest.cpp | 15 +++++----- 3 files changed, 41 insertions(+), 23 deletions(-) diff --git a/velox/common/memory/SharedArbitrator.cpp b/velox/common/memory/SharedArbitrator.cpp index 56d53caea799..743c8c68eb4a 100644 --- a/velox/common/memory/SharedArbitrator.cpp +++ b/velox/common/memory/SharedArbitrator.cpp @@ -181,9 +181,10 @@ void SharedArbitrator::getCandidateStats( op->candidates.clear(); op->candidates.reserve(op->candidatePools.size()); for (const auto& pool : op->candidatePools) { + const bool selfCandidate = op->requestRoot == pool.get(); op->candidates.push_back( - {freeCapacityOnly ? 0 : reclaimableUsedCapacity(*pool), - reclaimableFreeCapacity(*pool), + {freeCapacityOnly ? 0 : reclaimableUsedCapacity(*pool, selfCandidate), + reclaimableFreeCapacity(*pool, selfCandidate), pool->currentBytes(), pool.get()}); } @@ -199,26 +200,31 @@ void SharedArbitrator::updateArbitrationFailureStats() { ++numFailures_; } -int64_t SharedArbitrator::maxReclaimableCapacity(const MemoryPool& pool) const { +int64_t SharedArbitrator::maxReclaimableCapacity( + const MemoryPool& pool, + bool isSelfReclaim) const { // Checks if a query memory pool has finished processing or not. If it has // finished, then we don't have to respect the memory pool reserved capacity // limit check. // NOTE: for query system like Prestissimo, it holds a finished query // state in minutes for query stats fetch request from the Presto coordinator. - if (pool.currentBytes() == 0 && pool.peakBytes() != 0) { + if (isSelfReclaim || (pool.currentBytes() == 0 && pool.peakBytes() != 0)) { return pool.capacity(); } return std::max(0, pool.capacity() - memoryPoolReservedCapacity_); } int64_t SharedArbitrator::reclaimableFreeCapacity( - const MemoryPool& pool) const { - return std::min(pool.freeBytes(), maxReclaimableCapacity(pool)); + const MemoryPool& pool, + bool isSelfReclaim) const { + return std::min( + pool.freeBytes(), maxReclaimableCapacity(pool, isSelfReclaim)); } int64_t SharedArbitrator::reclaimableUsedCapacity( - const MemoryPool& pool) const { - const auto maxReclaimableBytes = maxReclaimableCapacity(pool); + const MemoryPool& pool, + bool isSelfReclaim) const { + const auto maxReclaimableBytes = maxReclaimableCapacity(pool, isSelfReclaim); const auto reclaimableBytes = pool.reclaimableBytes(); return std::min(maxReclaimableBytes, reclaimableBytes.value_or(0)); } @@ -656,7 +662,8 @@ uint64_t SharedArbitrator::reclaimFreeMemoryFromCandidates( } const int64_t bytesToReclaim = std::min( reclaimTargetBytes - reclaimedBytes, - reclaimableFreeCapacity(*candidate.pool)); + reclaimableFreeCapacity( + *candidate.pool, candidate.pool == op->requestRoot)); if (bytesToReclaim <= 0) { continue; } @@ -725,7 +732,8 @@ uint64_t SharedArbitrator::reclaim( uint64_t targetBytes, bool isLocalArbitration) noexcept { int64_t bytesToReclaim = std::min( - std::max(targetBytes, memoryPoolTransferCapacity_), pool->capacity()); + std::max(targetBytes, memoryPoolTransferCapacity_), + maxReclaimableCapacity(*pool, true)); if (bytesToReclaim == 0) { return 0; } diff --git a/velox/common/memory/SharedArbitrator.h b/velox/common/memory/SharedArbitrator.h index 08af4c971128..7988f9e0c1c5 100644 --- a/velox/common/memory/SharedArbitrator.h +++ b/velox/common/memory/SharedArbitrator.h @@ -318,16 +318,25 @@ class SharedArbitrator : public memory::MemoryArbitrator { Stats statsLocked() const; // Returns the max reclaimable capacity from 'pool' which includes both used - // and free capacities. - int64_t maxReclaimableCapacity(const MemoryPool& pool) const; + // and free capacities. If 'isSelfReclaim' true, we reclaim memory from the + // request pool itself so that we can bypass the reserved free capacity + // reclaim restriction. + int64_t maxReclaimableCapacity(const MemoryPool& pool, bool isSelfReclaim) + const; // Returns the free memory capacity that can be reclaimed from 'pool' by - // shrink. - int64_t reclaimableFreeCapacity(const MemoryPool& pool) const; + // shrink. If 'isSelfReclaim' true, we reclaim memory from the request pool + // itself so that we can bypass the reserved free capacity reclaim + // restriction. + int64_t reclaimableFreeCapacity(const MemoryPool& pool, bool isSelfReclaim) + const; // Returns the used memory capacity that can be reclaimed from 'pool' by - // disk spill. - int64_t reclaimableUsedCapacity(const MemoryPool& pool) const; + // disk spill. If 'isSelfReclaim' true, we reclaim memory from the request + // pool itself so that we can bypass the reserved free capacity reclaim + // restriction. + int64_t reclaimableUsedCapacity(const MemoryPool& pool, bool isSelfReclaim) + const; // Returns the minimal amount of memory capacity to grow for 'pool' to have // the reserved capacity as specified by 'memoryPoolReservedCapacity_'. diff --git a/velox/common/memory/tests/MockSharedArbitratorTest.cpp b/velox/common/memory/tests/MockSharedArbitratorTest.cpp index 7fb6ed30c69c..95074e0c71c4 100644 --- a/velox/common/memory/tests/MockSharedArbitratorTest.cpp +++ b/velox/common/memory/tests/MockSharedArbitratorTest.cpp @@ -834,9 +834,9 @@ TEST_F(MockSharedArbitrationTest, shrinkPools) { {memoryPoolInitCapacity, true, 0, memoryPoolInitCapacity, false}, {memoryPoolInitCapacity, false, 0, memoryPoolInitCapacity, false}, {memoryPoolInitCapacity, true, 0, memoryPoolReserveCapacity, false}}, - 14 << 20, - 14 << 20, - 20 << 20, + 12 << 20, + 12 << 20, + 18 << 20, reservedMemoryCapacity, true, false}, @@ -872,9 +872,9 @@ TEST_F(MockSharedArbitrationTest, shrinkPools) { memoryPoolReserveCapacity, memoryPoolReserveCapacity, false}}, - 14 << 20, - 14 << 20, - 20 << 20, + 12 << 20, + 12 << 20, + 18 << 20, reservedMemoryCapacity, true, false}, @@ -2204,7 +2204,8 @@ TEST_F(MockSharedArbitrationTest, arbitrateWithMemoryReclaim) { kReservedMemoryCapacity - reservedPoolCapacity, 16, 0, - 67108864); + 58720256, + 8388608); verifyReclaimerStats( arbitrateOp->reclaimer()->stats(), 0, 1, kMemoryPoolTransferCapacity);