Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid unnecessary reclaim on non-reclaimable query #9737

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 18 additions & 10 deletions velox/common/memory/SharedArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,10 @@ void SharedArbitrator::getCandidateStats(
op->candidates.clear();
op->candidates.reserve(op->candidatePools.size());
for (const auto& pool : op->candidatePools) {
const bool selfCandidate = op->requestRoot == pool.get();
Copy link
Contributor

@tanjialiang tanjialiang May 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/selfCandidate/isSelfReclaim/ to have the consistent name across methods.

op->candidates.push_back(
{freeCapacityOnly ? 0 : reclaimableUsedCapacity(*pool),
reclaimableFreeCapacity(*pool),
{freeCapacityOnly ? 0 : reclaimableUsedCapacity(*pool, selfCandidate),
reclaimableFreeCapacity(*pool, selfCandidate),
pool->currentBytes(),
pool.get()});
}
Expand All @@ -199,26 +200,31 @@ void SharedArbitrator::updateArbitrationFailureStats() {
++numFailures_;
}

int64_t SharedArbitrator::maxReclaimableCapacity(const MemoryPool& pool) const {
int64_t SharedArbitrator::maxReclaimableCapacity(
const MemoryPool& pool,
bool isSelfReclaim) const {
// Checks if a query memory pool has finished processing or not. If it has
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we update the comments here as well?

// finished, then we don't have to respect the memory pool reserved capacity
// limit check.
// NOTE: for query system like Prestissimo, it holds a finished query
// state in minutes for query stats fetch request from the Presto coordinator.
if (pool.currentBytes() == 0 && pool.peakBytes() != 0) {
if (isSelfReclaim || (pool.currentBytes() == 0 && pool.peakBytes() != 0)) {
return pool.capacity();
}
return std::max<int64_t>(0, pool.capacity() - memoryPoolReservedCapacity_);
}

int64_t SharedArbitrator::reclaimableFreeCapacity(
const MemoryPool& pool) const {
return std::min<int64_t>(pool.freeBytes(), maxReclaimableCapacity(pool));
const MemoryPool& pool,
bool isSelfReclaim) const {
return std::min<int64_t>(
pool.freeBytes(), maxReclaimableCapacity(pool, isSelfReclaim));
}

int64_t SharedArbitrator::reclaimableUsedCapacity(
const MemoryPool& pool) const {
const auto maxReclaimableBytes = maxReclaimableCapacity(pool);
const MemoryPool& pool,
bool isSelfReclaim) const {
const auto maxReclaimableBytes = maxReclaimableCapacity(pool, isSelfReclaim);
const auto reclaimableBytes = pool.reclaimableBytes();
return std::min<int64_t>(maxReclaimableBytes, reclaimableBytes.value_or(0));
}
Expand Down Expand Up @@ -656,7 +662,8 @@ uint64_t SharedArbitrator::reclaimFreeMemoryFromCandidates(
}
const int64_t bytesToReclaim = std::min<int64_t>(
reclaimTargetBytes - reclaimedBytes,
reclaimableFreeCapacity(*candidate.pool));
reclaimableFreeCapacity(
*candidate.pool, candidate.pool == op->requestRoot));
if (bytesToReclaim <= 0) {
continue;
}
Expand Down Expand Up @@ -725,7 +732,8 @@ uint64_t SharedArbitrator::reclaim(
uint64_t targetBytes,
bool isLocalArbitration) noexcept {
int64_t bytesToReclaim = std::min<uint64_t>(
std::max(targetBytes, memoryPoolTransferCapacity_), pool->capacity());
std::max(targetBytes, memoryPoolTransferCapacity_),
maxReclaimableCapacity(*pool, true));
if (bytesToReclaim == 0) {
return 0;
}
Expand Down
21 changes: 15 additions & 6 deletions velox/common/memory/SharedArbitrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -318,16 +318,25 @@ class SharedArbitrator : public memory::MemoryArbitrator {
Stats statsLocked() const;

// Returns the max reclaimable capacity from 'pool' which includes both used
// and free capacities.
int64_t maxReclaimableCapacity(const MemoryPool& pool) const;
// and free capacities. If 'isSelfReclaim' true, we reclaim memory from the
// request pool itself so that we can bypass the reserved free capacity
// reclaim restriction.
int64_t maxReclaimableCapacity(const MemoryPool& pool, bool isSelfReclaim)
const;

// Returns the free memory capacity that can be reclaimed from 'pool' by
// shrink.
int64_t reclaimableFreeCapacity(const MemoryPool& pool) const;
// shrink. If 'isSelfReclaim' true, we reclaim memory from the request pool
// itself so that we can bypass the reserved free capacity reclaim
// restriction.
int64_t reclaimableFreeCapacity(const MemoryPool& pool, bool isSelfReclaim)
const;

// Returns the used memory capacity that can be reclaimed from 'pool' by
// disk spill.
int64_t reclaimableUsedCapacity(const MemoryPool& pool) const;
// disk spill. If 'isSelfReclaim' true, we reclaim memory from the request
// pool itself so that we can bypass the reserved free capacity reclaim
// restriction.
int64_t reclaimableUsedCapacity(const MemoryPool& pool, bool isSelfReclaim)
const;

// Returns the minimal amount of memory capacity to grow for 'pool' to have
// the reserved capacity as specified by 'memoryPoolReservedCapacity_'.
Expand Down
15 changes: 8 additions & 7 deletions velox/common/memory/tests/MockSharedArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -834,9 +834,9 @@ TEST_F(MockSharedArbitrationTest, shrinkPools) {
{memoryPoolInitCapacity, true, 0, memoryPoolInitCapacity, false},
{memoryPoolInitCapacity, false, 0, memoryPoolInitCapacity, false},
{memoryPoolInitCapacity, true, 0, memoryPoolReserveCapacity, false}},
14 << 20,
14 << 20,
20 << 20,
12 << 20,
12 << 20,
18 << 20,
reservedMemoryCapacity,
true,
false},
Expand Down Expand Up @@ -872,9 +872,9 @@ TEST_F(MockSharedArbitrationTest, shrinkPools) {
memoryPoolReserveCapacity,
memoryPoolReserveCapacity,
false}},
14 << 20,
14 << 20,
20 << 20,
12 << 20,
12 << 20,
18 << 20,
reservedMemoryCapacity,
true,
false},
Expand Down Expand Up @@ -2204,7 +2204,8 @@ TEST_F(MockSharedArbitrationTest, arbitrateWithMemoryReclaim) {
kReservedMemoryCapacity - reservedPoolCapacity,
16,
0,
67108864);
58720256,
8388608);

verifyReclaimerStats(
arbitrateOp->reclaimer()->stats(), 0, 1, kMemoryPoolTransferCapacity);
Expand Down
Loading