Skip to content

Commit

Permalink
automate malloc_trim
Browse files Browse the repository at this point in the history
  • Loading branch information
lnkuiper committed Aug 27, 2024
1 parent 10a018a commit 31dad45
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 12 deletions.
35 changes: 32 additions & 3 deletions src/common/allocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "duckdb/common/exception.hpp"
#include "duckdb/common/helper.hpp"
#include "duckdb/common/numeric_utils.hpp"
#include "duckdb/common/types/timestamp.hpp"

#include <cstdint>

Expand All @@ -27,6 +28,10 @@
#include "jemalloc_extension.hpp"
#endif

#ifdef __GLIBC__
#include <malloc.h>
#endif

namespace duckdb {

AllocatedData::AllocatedData() : allocator(nullptr), pointer(nullptr), allocated_size(0) {
Expand Down Expand Up @@ -224,17 +229,40 @@ int64_t Allocator::DecayDelay() {
}

bool Allocator::SupportsFlush() {
#ifdef USE_JEMALLOC
#if defined(USE_JEMALLOC) || defined(__GLIBC__)
return true;
#else
return false;
#endif
}

void Allocator::ThreadFlush(idx_t threshold) {
static void MallocTrim(idx_t pad) {
#ifdef __GLIBC__
static constexpr int64_t TRIM_INTERVAL_MS = 100;
static atomic<int64_t> LAST_TRIM_TIMESTAMP_MS {0};

int64_t last_trim_timestamp_ms = LAST_TRIM_TIMESTAMP_MS.load();
const int64_t current_timestamp_ms = Timestamp::GetEpochMs(Timestamp::GetCurrentTimestamp());

if (current_timestamp_ms - last_trim_timestamp_ms < TRIM_INTERVAL_MS) {
return; // We trimmed less than TRIM_INTERVAL_MS ago
}
if (!std::atomic_compare_exchange_weak(&LAST_TRIM_TIMESTAMP_MS, &last_trim_timestamp_ms, current_timestamp_ms)) {
return; // Another thread has updated LAST_TRIM_TIMESTAMP_MS since we loaded it
}

// We succesfully updated LAST_TRIM_TIMESTAMP_MS, we can trim
malloc_trim(pad);
#endif
}

void Allocator::ThreadFlush(bool allocator_background_threads, idx_t threshold, idx_t thread_count) {
#ifdef USE_JEMALLOC
JemallocExtension::ThreadFlush(threshold);
if (!allocator_background_threads) {
JemallocExtension::ThreadFlush(threshold);
}
#endif
MallocTrim(thread_count * threshold);
}

void Allocator::ThreadIdle() {
Expand All @@ -247,6 +275,7 @@ void Allocator::FlushAll() {
#ifdef USE_JEMALLOC
JemallocExtension::FlushAll();
#endif
MallocTrim(0);
}

void Allocator::SetBackgroundThreads(bool enable) {
Expand Down
2 changes: 1 addition & 1 deletion src/include/duckdb/common/allocator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class Allocator {

static bool SupportsFlush();
static int64_t DecayDelay();
static void ThreadFlush(idx_t threshold);
static void ThreadFlush(bool allocator_background_threads, idx_t threshold, idx_t thread_count);
static void ThreadIdle();
static void FlushAll();
static void SetBackgroundThreads(bool enable);
Expand Down
10 changes: 5 additions & 5 deletions src/parallel/task_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,12 @@ void TaskScheduler::ExecuteForever(atomic<bool> *marker) {
shared_ptr<Task> task;
// loop until the marker is set to false
while (*marker) {
if (!Allocator::SupportsFlush() || allocator_background_threads) {
// allocator can't flush, or background threads clean up allocations, just start an untimed wait
if (!Allocator::SupportsFlush()) {
// allocator can't flush, just start an untimed wait
queue->semaphore.wait();
} else if (!queue->semaphore.wait(INITIAL_FLUSH_WAIT)) {
// no background threads, flush this threads outstanding allocations after it was idle for 0.5s
Allocator::ThreadFlush(allocator_flush_threshold);
// allocator can flush, we flush this threads outstanding allocations after it was idle for 0.5s
Allocator::ThreadFlush(allocator_background_threads, allocator_flush_threshold, requested_thread_count);
if (!queue->semaphore.wait(Allocator::DecayDelay() * 1000000 - INITIAL_FLUSH_WAIT)) {
// in total, the thread was idle for the entire decay delay (note: seconds converted to mus)
// mark it as idle and start an untimed wait
Expand All @@ -196,7 +196,7 @@ void TaskScheduler::ExecuteForever(atomic<bool> *marker) {
}
// this thread will exit, flush all of its outstanding allocations
if (Allocator::SupportsFlush()) {
Allocator::ThreadFlush(0);
Allocator::ThreadFlush(allocator_background_threads, 0, requested_thread_count);
Allocator::ThreadIdle();
}
#else
Expand Down
5 changes: 5 additions & 0 deletions src/storage/buffer/buffer_pool.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "duckdb/storage/buffer/buffer_pool.hpp"

#include "duckdb/common/allocator.hpp"
#include "duckdb/common/chrono.hpp"
#include "duckdb/common/exception.hpp"
#include "duckdb/common/typedefs.hpp"
Expand All @@ -8,6 +9,7 @@
#include "duckdb/storage/temporary_memory_manager.hpp"

namespace duckdb {

BufferEvictionNode::BufferEvictionNode(weak_ptr<BlockHandle> handle_p, idx_t eviction_seq_num)
: handle(std::move(handle_p)), handle_sequence_number(eviction_seq_num) {
D_ASSERT(!handle.expired());
Expand Down Expand Up @@ -394,6 +396,9 @@ void BufferPool::SetLimit(idx_t limit, const char *exception_postscript) {
"Failed to change memory limit to %lld: could not free up enough memory for the new limit%s", limit,
exception_postscript);
}
if (Allocator::SupportsFlush()) {
Allocator::FlushAll();
}
}

BufferPool::MemoryUsage::MemoryUsage() {
Expand Down
3 changes: 0 additions & 3 deletions src/storage/standard_buffer_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -399,9 +399,6 @@ void StandardBufferManager::Unpin(shared_ptr<BlockHandle> &handle) {

void StandardBufferManager::SetMemoryLimit(idx_t limit) {
buffer_pool.SetLimit(limit, InMemoryWarning());
if (Allocator::SupportsFlush()) {
Allocator::FlushAll();
}
}

void StandardBufferManager::SetSwapLimit(optional_idx limit) {
Expand Down

0 comments on commit 31dad45

Please sign in to comment.