Global memory arbitration optimization (#11262)
Summary:

This PR adds a global memory arbitration optimization that decouples the frontend memory arbitration
request from the slow backend memory arbitration processing. This makes it possible to: (1) remove the
global locking inside the arbitrator during slow memory arbitration processing; (2) respect the e2e
memory arbitration time limit, avoiding the potential node-level deadlock that has happened in
production in the past; (3) add backend optimizations such as query-level memory reclamation
parallelism; (4) support more advanced memory arbitration policies, such as preferring to abort younger
queries over older ones, so that an old, slow query has a chance to run through and the abort choice is
more consistent across nodes in a distributed execution environment like Prestissimo, instead of
relying only on a query's current capacity.
A followup will update the existing memory design doc to reflect the internal change.
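
The abort policy in (4) can be illustrated with a minimal sketch. It assumes each query carries a creation sequence number and picks the youngest query as the abort victim so that older queries get a chance to finish. The `Candidate` struct, its fields, and `pickAbortVictim` are hypothetical names for illustration only; the actual arbitrator may also weigh capacity.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <string>
#include <vector>

// Hypothetical view of a running query; not a Velox type.
struct Candidate {
  std::string queryId;
  uint64_t createSequence; // Assumed: a smaller value means an older query.
  uint64_t capacityBytes;
};

// Picks the youngest candidate (largest createSequence) as the abort victim.
// Ties are broken by larger capacity so the choice is deterministic.
std::optional<Candidate> pickAbortVictim(
    const std::vector<Candidate>& candidates) {
  std::optional<Candidate> victim;
  for (const auto& candidate : candidates) {
    if (!victim.has_value() ||
        candidate.createSequence > victim->createSequence ||
        (candidate.createSequence == victim->createSequence &&
         candidate.capacityBytes > victim->capacityBytes)) {
      victim = candidate;
    }
  }
  return victim;
}
```

Because the choice depends on query age rather than on the capacity a query happens to hold on one node, every node that sees the same set of queries would pick the same victim under this assumption.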

For reliability, this was shadowed in Prestissimo batch shadow and in the LBM stress test (in the LBM
stress test, the memory checker gets the pushback signal from jemalloc earlier).

For performance, we took a spill-heavy query from Prestissimo batch production and ran 25 instances
of it in parallel. The landing time (the time from the first query to the end of the last query)
dropped from 22 mins to 14 mins, and the average execution time was halved.
The speedup comes mostly from query-level spill parallelism and the reduction in global locking. The
memory arbitration time dropped from 40k mins to 17k mins, and the driver queue time dropped
from 19k mins to 7k mins. This stress test is flaky, but the reduction in memory arbitration
wall time is consistent across runs. The total amount of data spilled is also consistent across
runs, at around 22TB.


A followup needs to fix the issues in the spilling execution path that the stress test exposed.

Orri's review patch: P1654857111

Reviewed By: tanjialiang, oerling

Differential Revision: D63902323
xiaoxmeng authored and facebook-github-bot committed Oct 19, 2024
1 parent f8da123 commit b4f7902
Showing 29 changed files with 3,530 additions and 2,662 deletions.
18 changes: 18 additions & 0 deletions velox/common/base/Counters.cpp
@@ -371,6 +371,24 @@ void registerVeloxMetrics() {
kMetricArbitratorGlobalArbitrationCount,
facebook::velox::StatType::COUNT);

// The number of victims distribution of a global arbitration run [0, 32] with
// 32 buckets. It is configured to report the number of victims at P50, P90,
// P99, and P100 percentiles.
DEFINE_HISTOGRAM_METRIC(
kMetricArbitratorGlobalArbitrationNumReclaimVictims,
1,
0,
32,
50,
90,
99,
100);

// The number of victim query memory pool having nothing to spill.
DEFINE_METRIC(
kMetricArbitratorGlobalArbitrationFailedVictimCount,
facebook::velox::StatType::COUNT);

// The time distribution of a global arbitration run [0, 600s] with 20
// buckets. It is configured to report the latency at P50, P90, P99, and P100
// percentiles.
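
To make the bucket arithmetic concrete: a range of [0, 32] with a bucket width of 1 gives 32 buckets, assuming the arguments after the metric key are bucket width, min, and max. Below is a minimal sketch of a fixed-width histogram with a percentile estimate. `VictimHistogram` is a hypothetical stand-in, not the stats backend behind `DEFINE_HISTOGRAM_METRIC`, and it reports the lower bound of the bucket that contains the requested percentile.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical fixed-width histogram; not the Velox stats reporter.
class VictimHistogram {
 public:
  VictimHistogram(uint64_t bucketWidth, uint64_t min, uint64_t max)
      : bucketWidth_(bucketWidth),
        min_(min),
        counts_((max - min) / bucketWidth, 0) {
    assert(bucketWidth > 0 && max > min);
  }

  void add(uint64_t value) {
    // Out-of-range values are clamped into the first or last bucket.
    size_t index = value <= min_ ? 0 : (value - min_) / bucketWidth_;
    if (index >= counts_.size()) {
      index = counts_.size() - 1;
    }
    ++counts_[index];
    ++total_;
  }

  // Returns the lower bound of the bucket holding the percentile (1-100).
  uint64_t percentile(uint32_t pct) const {
    if (total_ == 0) {
      return min_;
    }
    // Rank of the target sample, rounded up.
    const uint64_t rank = (total_ * pct + 99) / 100;
    uint64_t seen = 0;
    for (size_t i = 0; i < counts_.size(); ++i) {
      seen += counts_[i];
      if (seen >= rank) {
        return min_ + i * bucketWidth_;
      }
    }
    return min_ + (counts_.size() - 1) * bucketWidth_;
  }

  size_t numBuckets() const {
    return counts_.size();
  }

 private:
  const uint64_t bucketWidth_;
  const uint64_t min_;
  std::vector<uint64_t> counts_;
  uint64_t total_{0};
};
```

With a width of 1, each bucket corresponds to an exact victim count, so the reported percentiles are exact for runs with fewer than 32 victims.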
10 changes: 9 additions & 1 deletion velox/common/base/Counters.h
@@ -56,7 +56,7 @@ constexpr folly::StringPiece kMetricTaskMemoryReclaimWaitTimeoutCount{
"velox.task_memory_reclaim_wait_timeout_count"};

constexpr folly::StringPiece kMetricOpMemoryReclaimTimeMs{
"velox.op_memory_reclaim_ms"};
"velox.op_memory_reclaim_time_ms"};

constexpr folly::StringPiece kMetricOpMemoryReclaimedBytes{
"velox.op_memory_reclaim_bytes"};
@@ -88,6 +88,14 @@ constexpr folly::StringPiece kMetricArbitratorLocalArbitrationCount{
constexpr folly::StringPiece kMetricArbitratorGlobalArbitrationCount{
"velox.arbitrator_global_arbitration_count"};

constexpr folly::StringPiece
kMetricArbitratorGlobalArbitrationNumReclaimVictims{
"velox.arbitrator_global_arbitration_num_reclaim_victims"};

constexpr folly::StringPiece
kMetricArbitratorGlobalArbitrationFailedVictimCount{
"velox.arbitrator_global_arbitration_failed_victim_count"};

constexpr folly::StringPiece kMetricArbitratorGlobalArbitrationBytes{
"velox.arbitrator_global_arbitration_bytes"};

8 changes: 5 additions & 3 deletions velox/common/base/SuccinctPrinter.cpp
@@ -40,15 +40,15 @@ namespace {
/// Possible units are days(d), hours(h), minutes(m), seconds(s).
std::string succinctSeconds(uint64_t seconds) {
std::stringstream out;
int days = seconds / kSecondsInDay;
const uint64_t days = seconds / kSecondsInDay;
bool isFirstUnit = true;
if (days) {
out << days << "d";
isFirstUnit = false;
}
seconds -= days * kSecondsInDay;

int hours = seconds / kSecondsInHour;
const uint64_t hours = seconds / kSecondsInHour;
if (days || hours) {
if (!isFirstUnit) {
out << " ";
@@ -58,11 +58,12 @@ std::string succinctSeconds(uint64_t seconds) {
}
seconds -= hours * kSecondsInHour;

int minutes = seconds / kSecondsInMinute;
const uint64_t minutes = seconds / kSecondsInMinute;
if (days || hours || minutes) {
if (!isFirstUnit) {
out << " ";
}

out << minutes << "m";
isFirstUnit = false;
}
@@ -109,6 +110,7 @@ std::string succinctDuration(uint64_t duration, int unitOffset, int precision) {
std::round((duration * 1.0) / kTimeUnitsInSecond[unitOffset]);
return succinctSeconds(seconds);
}

return succinctPrint(
duration,
&kTimeUnits[0],
67 changes: 35 additions & 32 deletions velox/common/base/VeloxException.h
@@ -40,80 +40,83 @@ namespace velox {
namespace error_source {
using namespace folly::string_literals;

// Errors where the root cause of the problem is either because of bad input
// or an unsupported pattern of use are classified with source USER. Examples
// of errors in this category include syntax errors, unavailable names or
// objects.
/// Errors where the root cause of the problem is either because of bad input
/// or an unsupported pattern of use are classified with source USER. Examples
/// of errors in this category include syntax errors, unavailable names or
/// objects.
inline constexpr auto kErrorSourceUser = "USER"_fs;

// Errors where the root cause of the problem is an unexpected internal state in
// the system.
/// Errors where the root cause of the problem is an unexpected internal state
/// in the system.
inline constexpr auto kErrorSourceRuntime = "RUNTIME"_fs;

// Errors where the root cause of the problem is some unreliable aspect of the
// system are classified with source SYSTEM.
/// Errors where the root cause of the problem is some unreliable aspect of the
/// system are classified with source SYSTEM.
inline constexpr auto kErrorSourceSystem = "SYSTEM"_fs;
} // namespace error_source

namespace error_code {
using namespace folly::string_literals;

//====================== User Error Codes ======================:
///====================== User Error Codes ======================:

// A generic user error code
/// A generic user error code
inline constexpr auto kGenericUserError = "GENERIC_USER_ERROR"_fs;

// An error raised when an argument verification fails
/// An error raised when an argument verification fails
inline constexpr auto kInvalidArgument = "INVALID_ARGUMENT"_fs;

// An error raised when a requested operation is not supported.
/// An error raised when a requested operation is not supported.
inline constexpr auto kUnsupported = "UNSUPPORTED"_fs;

// Arithmetic errors - underflow, overflow, divide by zero etc.
/// Arithmetic errors - underflow, overflow, divide by zero etc.
inline constexpr auto kArithmeticError = "ARITHMETIC_ERROR"_fs;

// Arithmetic errors - underflow, overflow, divide by zero etc.
/// Arithmetic errors - underflow, overflow, divide by zero etc.
inline constexpr auto kSchemaMismatch = "SCHEMA_MISMATCH"_fs;

//====================== Runtime Error Codes ======================:
///====================== Runtime Error Codes ======================:

// An error raised when the current state of a component is invalid.
/// An error raised when the current state of a component is invalid.
inline constexpr auto kInvalidState = "INVALID_STATE"_fs;

// An error raised when unreachable code point was executed.
/// An error raised when unreachable code point was executed.
inline constexpr auto kUnreachableCode = "UNREACHABLE_CODE"_fs;

// An error raised when a requested operation is not yet supported.
/// An error raised when a requested operation is not yet supported.
inline constexpr auto kNotImplemented = "NOT_IMPLEMENTED"_fs;

// An error raised when memory pool exceeds limits.
/// An error raised when memory pool exceeds limits.
inline constexpr auto kMemCapExceeded = "MEM_CAP_EXCEEDED"_fs;

// An error raised when memory pool is aborted.
/// An error raised when memory pool is aborted.
inline constexpr auto kMemAborted = "MEM_ABORTED"_fs;

// Error caused by memory allocation failure (inclusive of allocator memory cap
// exceeded).
/// An error raised when memory arbitration is timed out.
inline constexpr auto kMemArbitrationTimeout = "MEM_ARBITRATION_TIMEOUT"_fs;

/// Error caused by memory allocation failure (inclusive of allocator memory cap
/// exceeded).
inline constexpr auto kMemAllocError = "MEM_ALLOC_ERROR"_fs;

// Error caused by failing to allocate cache buffer space for IO.
/// Error caused by failing to allocate cache buffer space for IO.
inline constexpr auto kNoCacheSpace = "NO_CACHE_SPACE"_fs;

// An error raised when spill bytes exceeds limits.
/// An error raised when spill bytes exceeds limits.
inline constexpr auto kSpillLimitExceeded = "SPILL_LIMIT_EXCEEDED"_fs;

// Errors indicating file read corruptions.
/// Errors indicating file read corruptions.
inline constexpr auto kFileCorruption = "FILE_CORRUPTION"_fs;

// Errors indicating file not found.
/// Errors indicating file not found.
inline constexpr auto kFileNotFound = "FILE_NOT_FOUND"_fs;

// We do not know how to classify it yet.
/// We do not know how to classify it yet.
inline constexpr auto kUnknown = "UNKNOWN"_fs;

// VeloxRuntimeErrors due to unsupported input values such as unicode input to
// cast-varchar-to-integer and timestamps beyond the year 2037 to datetime
// functions. This kind of errors is allowed in expression fuzzer.
/// VeloxRuntimeErrors due to unsupported input values such as unicode input to
/// cast-varchar-to-integer and timestamps beyond the year 2037 to datetime
/// functions. This kind of errors is allowed in expression fuzzer.
inline constexpr auto kUnsupportedInputUncatchable =
"UNSUPPORTED_INPUT_UNCATCHABLE"_fs;
} // namespace error_code
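
One way a bounded arbitration wait could surface the new `MEM_ARBITRATION_TIMEOUT` code is sketched below. The `ArbitrationWaiter` class and its methods are hypothetical; the sketch only shows a requester waiting on a condition variable with a deadline while some other party completes the request, and it returns the error code as a string instead of throwing a Velox exception.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <string>

// Hypothetical helper; not a Velox class.
class ArbitrationWaiter {
 public:
  // Called by the backend once the request has been served.
  void complete() {
    {
      std::lock_guard<std::mutex> l(mutex_);
      done_ = true;
    }
    cv_.notify_all();
  }

  // Called by the requester. Returns an empty string on success, or the
  // error code name if the wait exceeds 'maxWait'.
  std::string wait(std::chrono::milliseconds maxWait) {
    std::unique_lock<std::mutex> l(mutex_);
    // The predicate guards against spurious wakeups.
    const bool served = cv_.wait_for(l, maxWait, [&] { return done_; });
    return served ? "" : "MEM_ARBITRATION_TIMEOUT";
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  bool done_{false};
};
```

A dedicated code lets callers tell a timed-out arbitration apart from an aborted pool (`MEM_ABORTED`) or an exceeded cap (`MEM_CAP_EXCEEDED`).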
@@ -160,12 +163,12 @@ class VeloxException : public std::exception {
exceptionType,
exceptionName) {}

// Inherited
/// Inherited
const char* what() const noexcept override {
return state_->what();
}

// Introduced nonvirtuals
/// Introduced nonvirtuals
const process::StackTrace* stackTrace() const {
return state_->stackTrace.get();
}
11 changes: 11 additions & 0 deletions velox/common/base/tests/SuccinctPrinterTest.cpp
@@ -37,6 +37,9 @@ TEST(SuccinctPrinterTest, testSuccinctNanos) {
EXPECT_EQ(succinctNanos(86'399'499'000'000), "23h 59m 59s");
EXPECT_EQ(succinctNanos(86'400'123'000'000), "1d 0h 0m 0s");
EXPECT_EQ(succinctNanos(867'661'789'000'000), "10d 1h 1m 2s");
EXPECT_EQ(
succinctNanos(std::numeric_limits<uint64_t>::max()),
"213503d 23h 34m 34s");
}

TEST(SuccinctPrinterTest, testSuccinctMicros) {
@@ -51,6 +54,9 @@ TEST(SuccinctPrinterTest, testSuccinctMicros) {
EXPECT_EQ(succinctMicros(86'399'498), "1m 26s");
EXPECT_EQ(succinctMicros(86'400'123), "1m 26s");
EXPECT_EQ(succinctMicros(867'661'789), "14m 28s");
EXPECT_EQ(
succinctMicros(std::numeric_limits<uint64_t>::max()),
"213503982d 8h 1m 50s");
}

TEST(SuccinctPrinterTest, testSuccinctMillis) {
@@ -65,6 +71,9 @@ TEST(SuccinctPrinterTest, testSuccinctMillis) {
EXPECT_EQ(succinctMillis(86'399'498), "23h 59m 59s");
EXPECT_EQ(succinctMillis(86'400'123), "1d 0h 0m 0s");
EXPECT_EQ(succinctMillis(867'661'789), "10d 1h 1m 2s");
EXPECT_EQ(
succinctMillis(std::numeric_limits<uint64_t>::max()),
"213503982334d 14h 25m 52s");
}

TEST(SuccinctPrinterTest, testSuccinctBytes) {
@@ -77,6 +86,8 @@ TEST(SuccinctPrinterTest, testSuccinctBytes) {
EXPECT_EQ(succinctBytes(1'234'567'890), "1.15GB");
EXPECT_EQ(succinctBytes(1'099'511'627'776), "1.00TB");
EXPECT_EQ(succinctBytes(1234'099'511'627'776), "1122.41TB");
EXPECT_EQ(
succinctBytes(std::numeric_limits<uint64_t>::max()), "16777216.00TB");
}

} // namespace facebook::velox
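
The `uint64_t` max cases exercise the switch from `int` to `uint64_t` in `succinctSeconds`: in the millisecond case the day count is 213503982334, which exceeds the largest 32-bit `int` (2147483647). A minimal standalone sketch of the days/hours/minutes/seconds breakdown follows. `formatSecondsSketch` is a hypothetical re-implementation for illustration, not the Velox function.

```cpp
#include <cassert>
#include <cstdint>
#include <sstream>
#include <string>

// Hypothetical re-implementation for illustration; not the Velox function.
std::string formatSecondsSketch(uint64_t seconds) {
  constexpr uint64_t kSecondsInMinute = 60;
  constexpr uint64_t kSecondsInHour = 60 * kSecondsInMinute;
  constexpr uint64_t kSecondsInDay = 24 * kSecondsInHour;

  // Keep every intermediate in 64 bits so large inputs do not truncate.
  const uint64_t days = seconds / kSecondsInDay;
  seconds %= kSecondsInDay;
  const uint64_t hours = seconds / kSecondsInHour;
  seconds %= kSecondsInHour;
  const uint64_t minutes = seconds / kSecondsInMinute;
  seconds %= kSecondsInMinute;

  std::ostringstream out;
  bool printed = false;
  // Appends one unit, separated from the previous unit by a space.
  auto emit = [&](uint64_t value, const char* unit) {
    if (printed) {
      out << " ";
    }
    out << value << unit;
    printed = true;
  };
  // Once a larger unit is printed, all smaller units are printed too.
  if (days != 0) {
    emit(days, "d");
  }
  if (printed || hours != 0) {
    emit(hours, "h");
  }
  if (printed || minutes != 0) {
    emit(minutes, "m");
  }
  emit(seconds, "s");
  return out.str();
}
```

For example, `uint64_t` max milliseconds rounds to 18446744073709552 seconds, which this sketch formats as "213503982334d 14h 25m 52s", matching the expected value in the test above.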
2 changes: 1 addition & 1 deletion velox/common/memory/ArbitrationOperation.h
@@ -105,7 +105,7 @@ class ArbitrationOperation {

/// Invoked to mark the start of global arbitration. This is used to measure
/// how much time spent in waiting for global arbitration.
void startGlobalArbitration() {
void recordGlobalArbitrationStartTime() {
VELOX_CHECK_EQ(globalArbitrationStartTimeMs_, 0);
VELOX_CHECK_EQ(state_, State::kRunning);
globalArbitrationStartTimeMs_ = getCurrentTimeMs();
45 changes: 34 additions & 11 deletions velox/common/memory/ArbitrationParticipant.cpp
@@ -31,13 +31,15 @@ using namespace facebook::velox::memory;

std::string ArbitrationParticipant::Config::toString() const {
return fmt::format(
"initCapacity {}, minCapacity {}, fastExponentialGrowthCapacityLimit {}, slowCapacityGrowRatio {}, minFreeCapacity {}, minFreeCapacityRatio {}",
"initCapacity {}, minCapacity {}, fastExponentialGrowthCapacityLimit {}, slowCapacityGrowRatio {}, minFreeCapacity {}, minFreeCapacityRatio {}, minReclaimBytes {}, abortCapacityLimit {}",
succinctBytes(initCapacity),
succinctBytes(minCapacity),
succinctBytes(fastExponentialGrowthCapacityLimit),
slowCapacityGrowRatio,
succinctBytes(minFreeCapacity),
minFreeCapacityRatio);
minFreeCapacityRatio,
succinctBytes(minReclaimBytes),
succinctBytes(abortCapacityLimit));
}

ArbitrationParticipant::Config::Config(
@@ -46,13 +48,17 @@ ArbitrationParticipant::Config::Config(
uint64_t _fastExponentialGrowthCapacityLimit,
double _slowCapacityGrowRatio,
uint64_t _minFreeCapacity,
double _minFreeCapacityRatio)
double _minFreeCapacityRatio,
uint64_t _minReclaimBytes,
uint64_t _abortCapacityLimit)
: initCapacity(_initCapacity),
minCapacity(_minCapacity),
fastExponentialGrowthCapacityLimit(_fastExponentialGrowthCapacityLimit),
slowCapacityGrowRatio(_slowCapacityGrowRatio),
minFreeCapacity(_minFreeCapacity),
minFreeCapacityRatio(_minFreeCapacityRatio) {
minFreeCapacityRatio(_minFreeCapacityRatio),
minReclaimBytes(_minReclaimBytes),
abortCapacityLimit(_abortCapacityLimit) {
VELOX_CHECK_GE(slowCapacityGrowRatio, 0);
VELOX_CHECK_EQ(
fastExponentialGrowthCapacityLimit == 0,
@@ -73,6 +79,10 @@ ArbitrationParticipant::Config::Config(
"adjustment.",
minFreeCapacity,
minFreeCapacityRatio);
VELOX_CHECK(
bits::isPowerOfTwo(abortCapacityLimit),
"abortCapacityLimit {} not a power of two",
abortCapacityLimit);
}
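
The new check requires `abortCapacityLimit` to be a power of two. A minimal sketch of the usual bit trick for such a check is shown below. `isPowerOfTwoSketch` is a hypothetical standalone helper, not the `bits::isPowerOfTwo` implementation; in particular, how Velox treats zero is not shown in this diff, and this sketch rejects it.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical standalone check; this sketch rejects zero.
// A power of two has exactly one bit set, so clearing the lowest set bit
// with (value & (value - 1)) must leave zero.
constexpr bool isPowerOfTwoSketch(uint64_t value) {
  return value != 0 && (value & (value - 1)) == 0;
}

static_assert(isPowerOfTwoSketch(1ULL << 30), "1GB is a power of two");
static_assert(!isPowerOfTwoSketch(3ULL << 30), "3GB is not a power of two");
```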

std::shared_ptr<ArbitrationParticipant> ArbitrationParticipant::create(
@@ -251,24 +261,27 @@ void ArbitrationParticipant::finishArbitration(ArbitrationOperation* op) {

uint64_t ArbitrationParticipant::reclaim(
uint64_t targetBytes,
uint64_t maxWaitTimeMs) noexcept {
uint64_t maxWaitTimeMs,
MemoryReclaimer::Stats& stats) noexcept {
targetBytes = std::max(targetBytes, config_->minReclaimBytes);
if (targetBytes == 0) {
return 0;
}
std::lock_guard<std::timed_mutex> l(reclaimLock_);
TestValue::adjust(
"facebook::velox::memory::ArbitrationParticipant::reclaim", this);
uint64_t reclaimedBytes{0};
MemoryReclaimer::Stats reclaimStats;
try {
++numReclaims_;
pool_->reclaim(targetBytes, maxWaitTimeMs, reclaimStats);
VELOX_MEM_LOG(INFO) << "Reclaiming from memory pool " << pool_->name()
<< " with target " << succinctBytes(targetBytes);
pool_->reclaim(targetBytes, maxWaitTimeMs, stats);
reclaimedBytes = shrink(/*reclaimAll=*/false);
} catch (const std::exception& e) {
VELOX_MEM_LOG(ERROR) << "Failed to reclaim from memory pool "
<< pool_->name() << ", aborting it: " << e.what();
abortLocked(std::current_exception());
reclaimedBytes = abortLocked(std::current_exception());
}
reclaimedBytes = shrink(/*reclaimAll=*/true);
return reclaimedBytes;
}
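
A simplified reading of the new `reclaim` control flow: the target is raised to at least `minReclaimBytes`, a zero target returns early, and a failed reclaim falls back to aborting the pool and returning what the abort freed. The sketch below uses a hypothetical `FakePool` and `reclaimSketch`; it omits locking, stats, and logging, and is not the Velox implementation.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <exception>
#include <stdexcept>

// Hypothetical stand-in for a reclaimable memory pool.
struct FakePool {
  uint64_t reclaimableBytes;
  bool throwOnReclaim;
  bool aborted{false};

  // Frees up to 'targetBytes' and returns the bytes actually freed.
  uint64_t reclaim(uint64_t targetBytes) {
    if (throwOnReclaim) {
      throw std::runtime_error("reclaim failed");
    }
    const uint64_t freed = std::min(targetBytes, reclaimableBytes);
    reclaimableBytes -= freed;
    return freed;
  }

  // Marks the pool aborted and releases everything it holds.
  uint64_t abortAndRelease() {
    aborted = true;
    const uint64_t freed = reclaimableBytes;
    reclaimableBytes = 0;
    return freed;
  }
};

uint64_t reclaimSketch(
    FakePool& pool,
    uint64_t targetBytes,
    uint64_t minReclaimBytes) noexcept {
  // Never reclaim less than the configured minimum.
  targetBytes = std::max(targetBytes, minReclaimBytes);
  if (targetBytes == 0) {
    return 0;
  }
  try {
    return pool.reclaim(targetBytes);
  } catch (const std::exception&) {
    // A failed reclaim aborts the pool; the abort frees its memory.
    return pool.abortAndRelease();
  }
}
```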

@@ -286,6 +299,10 @@ bool ArbitrationParticipant::grow(

uint64_t ArbitrationParticipant::shrink(bool reclaimAll) {
std::lock_guard<std::mutex> l(stateLock_);
return shrinkLocked(reclaimAll);
}

uint64_t ArbitrationParticipant::shrinkLocked(bool reclaimAll) {
++numShrinks_;

uint64_t reclaimedBytes{0};
@@ -316,18 +333,24 @@ uint64_t ArbitrationParticipant::abortLocked(
if (aborted_) {
return 0;
}
aborted_ = true;
}
try {
VELOX_MEM_LOG(WARNING) << "Memory pool " << pool_->name()
<< " is being aborted";
pool_->abort(error);
} catch (const std::exception& e) {
VELOX_MEM_LOG(WARNING) << "Failed to abort memory pool "
<< pool_->toString() << ", error: " << e.what();
}
VELOX_MEM_LOG(WARNING) << "Memory pool " << pool_->name() << " aborted";
// NOTE: no matter query memory pool abort throws or not, it should have been
// marked as aborted to prevent any new memory arbitration operations.
VELOX_CHECK(pool_->aborted());
return shrink(/*reclaimAll=*/true);

std::lock_guard<std::mutex> l(stateLock_);
VELOX_CHECK(!aborted_);
aborted_ = true;
return shrinkLocked(/*reclaimAll=*/true);
}
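
The reordering in `abortLocked` follows a common pattern: check the flag under the state lock, run the slow abort without holding that lock, then retake the lock to set the flag and shrink. A minimal sketch of that pattern is below. `ParticipantSketch` is hypothetical, and it assumes that callers of `abort()` are already serialized by an outer lock, which is what makes the second `!aborted_` check safe.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>

// Hypothetical participant; assumes callers of abort() are serialized by an
// outer lock that is not modeled here.
class ParticipantSketch {
 public:
  explicit ParticipantSketch(uint64_t capacityBytes)
      : capacityBytes_(capacityBytes) {}

  // Returns the bytes freed by the abort, or 0 if already aborted.
  uint64_t abort() {
    {
      std::lock_guard<std::mutex> l(stateLock_);
      if (aborted_) {
        return 0;
      }
    }
    // The potentially slow pool abort would run here without stateLock_.
    ++numPoolAborts_;

    std::lock_guard<std::mutex> l(stateLock_);
    assert(!aborted_);
    aborted_ = true;
    return shrinkLocked();
  }

  uint32_t numPoolAborts() const {
    return numPoolAborts_;
  }

 private:
  // Releases all capacity; the caller must hold stateLock_.
  uint64_t shrinkLocked() {
    const uint64_t freed = capacityBytes_;
    capacityBytes_ = 0;
    return freed;
  }

  std::mutex stateLock_;
  uint64_t capacityBytes_;
  bool aborted_{false};
  uint32_t numPoolAborts_{0};
};
```

Splitting `shrink` into a locking wrapper and a `shrinkLocked` body is what allows the final step to set the flag and release capacity under a single lock acquisition.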

bool ArbitrationParticipant::waitForReclaimOrAbort(