Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 48 additions & 9 deletions ydb/library/actors/interconnect/rdma/mem_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,19 @@ struct ibv_mr {
#include <vector>
#include <list>

#include <unistd.h>
#include <sys/syscall.h>
#include <mutex>
#include <thread>

#if defined(_linux_)
#include <sys/mman.h>
#include <cstdlib>
#include <cstring>
#include <cerrno>

#if defined(_win_)
#include <malloc.h> // _aligned_malloc, _aligned_free
#else
#include <sys/mman.h> // madvise
#include <unistd.h>
#include <sys/syscall.h>
#endif

static constexpr size_t HPageSz = (1 << 21);
Expand All @@ -45,6 +51,10 @@ using ::NMonitoring::TDynamicCounters;

namespace NInterconnect::NRdma {

// Cross-platform memory management
static void* allocateMemory(size_t size, size_t alignment, bool hp);
static void freeMemory(void* ptr) noexcept;

class TChunk: public NNonCopyable::TMoveOnly, public TAtomicRefCount<TChunk> {
public:

Expand All @@ -67,7 +77,7 @@ namespace NInterconnect::NRdma {
#else
free(MRs.front());
#endif
std::free(addr);
freeMemory(addr);
MRs.clear();
}

Expand Down Expand Up @@ -204,11 +214,29 @@ namespace NInterconnect::NRdma {
);
}

void* allocateMemory(size_t size, size_t alignment, bool hp) {
static void* allocateMemory(size_t size, size_t alignment, bool hp) {
if (size % alignment != 0) {
return nullptr;
}
void* buf = std::aligned_alloc(alignment, size);

void* buf = nullptr;

#if defined(_win_)
// Windows: use _aligned_malloc
buf = _aligned_malloc(size, alignment);
if (!buf) {
fprintf(stderr, "Failed to allocate aligned memory on Windows\n");
return nullptr;
}
#else
// POSIX/C++: std::aligned_alloc (C++17)
buf = std::aligned_alloc(alignment, size);
if (!buf) {
fprintf(stderr, "Failed to allocate aligned memory on Unix\n");
return nullptr;
}
#endif

if (hp) {
#if defined(_linux_)
if (madvise(buf, size, MADV_HUGEPAGE) < 0) {
Expand All @@ -218,12 +246,23 @@ namespace NInterconnect::NRdma {
#endif
for (size_t i = 0; i < size; i += HPageSz) {
// We use THP right now. We need to touch each page to promote it to HUGE.
((char*)buf)[i] = 0;
static_cast<char*>(buf)[i] = 0;
}
}
return buf;
}

// Platform-aware deallocation helper, the counterpart of allocateMemory().
// On Windows builds the buffer is handed to _aligned_free; everywhere else it
// goes to std::free. A null pointer is accepted and silently ignored.
static void freeMemory(void* ptr) noexcept {
    if (ptr != nullptr) {
#if defined(_win_)
        _aligned_free(ptr);
#else
        std::free(ptr);
#endif
    }
}

std::vector<ibv_mr*> registerMemory(void* addr, size_t size, const NInterconnect::NRdma::NLinkMgr::TCtxsMap& ctxs) {
std::vector<ibv_mr*> res;
#ifndef MEM_POOL_DISABLE_RDMA_SUPPORT
Expand Down Expand Up @@ -307,7 +346,7 @@ namespace NInterconnect::NRdma {

auto mrs = registerMemory(ptr, size, Ctxs);
if (mrs.empty()) {
std::free(ptr);
freeMemory(ptr);
return nullptr;
}

Expand Down
Loading