Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 10 additions & 16 deletions cpp/src/plasma/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ find_package(Threads)
set(PLASMA_SO_VERSION "${ARROW_SO_VERSION}")
set(PLASMA_FULL_SO_VERSION "${ARROW_FULL_SO_VERSION}")

include_directories("${CMAKE_CURRENT_LIST_DIR}/" "${CMAKE_CURRENT_LIST_DIR}/thirdparty/"
"${CMAKE_CURRENT_LIST_DIR}/../")

set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wno-conversion")

# Compile flatbuffers
Expand Down Expand Up @@ -64,17 +61,14 @@ add_custom_command(

set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fPIC")

set(PLASMA_SRCS
client.cc
common.cc
eviction_policy.cc
set(PLASMA_SRCS client.cc common.cc fling.cc io.cc malloc.cc plasma.cc protocol.cc)

set(PLASMA_STORE_SRCS
dlmalloc.cc
events.cc
fling.cc
io.cc
malloc.cc
plasma.cc
eviction_policy.cc
plasma_allocator.cc
protocol.cc
store.cc
thirdparty/ae/ae.c)

set(PLASMA_LINK_LIBS arrow_shared)
Expand Down Expand Up @@ -106,10 +100,10 @@ endforeach()

# The optimization flag -O3 is suggested by dlmalloc.c, which is #included in
# dlmalloc.cc; we set it here regardless of whether we do a debug or release build.
set_source_files_properties(malloc.cc PROPERTIES COMPILE_FLAGS "-O3")
set_source_files_properties(dlmalloc.cc PROPERTIES COMPILE_FLAGS "-O3")

if("${COMPILER_FAMILY}" STREQUAL "clang")
set_property(SOURCE malloc.cc
set_property(SOURCE dlmalloc.cc
APPEND_STRING
PROPERTY COMPILE_FLAGS " -Wno-parentheses-equality \
-Wno-null-pointer-arithmetic \
Expand All @@ -118,14 +112,14 @@ if("${COMPILER_FAMILY}" STREQUAL "clang")
endif()

if("${COMPILER_FAMILY}" STREQUAL "gcc")
set_property(SOURCE malloc.cc APPEND_STRING PROPERTY COMPILE_FLAGS " -Wno-conversion")
set_property(SOURCE dlmalloc.cc APPEND_STRING PROPERTY COMPILE_FLAGS " -Wno-conversion")
endif()

list(APPEND PLASMA_EXTERNAL_STORE_SOURCES "external_store.cc" "hash_table_store.cc")

# We use static libraries for the plasma_store_server executable so that it can
# be copied around and used in different locations.
add_executable(plasma_store_server ${PLASMA_EXTERNAL_STORE_SOURCES} store.cc)
add_executable(plasma_store_server ${PLASMA_EXTERNAL_STORE_SOURCES} ${PLASMA_STORE_SRCS})
if(ARROW_BUILD_STATIC)
target_link_libraries(plasma_store_server plasma_static ${PLASMA_STATIC_LINK_LIBS})
else()
Expand Down
166 changes: 166 additions & 0 deletions cpp/src/plasma/dlmalloc.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "plasma/malloc.h"

#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <unistd.h>

#include <cerrno>
#include <string>
#include <vector>

#include "plasma/common.h"
#include "plasma/plasma.h"

namespace plasma {

// Replacement mapping hooks for dlmalloc. They are only declared here so the
// macros below can refer to them; the definitions follow later in this file.
void* fake_mmap(size_t);
int fake_munmap(void*, int64_t);

// Compile-time configuration for the bundled dlmalloc, which is pulled into
// this translation unit by the #include below.
//
// Every (direct or regular) map/unmap request made by dlmalloc is redirected
// to fake_mmap / fake_munmap.
#define MMAP(s) fake_mmap(s)
#define MUNMAP(a, s) fake_munmap(a, s)
#define DIRECT_MMAP(s) fake_mmap(s)
#define DIRECT_MUNMAP(a, s) fake_munmap(a, s)
// Per dlmalloc's documented options: export the allocator under dl-prefixed
// names and do not obtain memory through MORECORE.
#define USE_DL_PREFIX
#define HAVE_MORECORE 0
// Default mmap threshold set to the largest size_t value, and the default
// granularity set to 128 KiB.
#define DEFAULT_MMAP_THRESHOLD MAX_SIZE_T
#define DEFAULT_GRANULARITY ((size_t)128U * 1024U)

#include "plasma/thirdparty/dlmalloc.c"  // NOLINT

// Drop every configuration macro defined above so that none of them leaks
// into the remainder of this file.
#undef MMAP
#undef MUNMAP
#undef DIRECT_MMAP
#undef DIRECT_MUNMAP
#undef USE_DL_PREFIX
#undef HAVE_MORECORE
#undef DEFAULT_MMAP_THRESHOLD
#undef DEFAULT_GRANULARITY

// dlmalloc.c defines DEBUG, which would conflict with ARROW_LOG(DEBUG).
#ifdef DEBUG
#undef DEBUG
#endif

// Factor by which fake_mmap multiplies dlmalloc's allocation granularity
// (mparams.granularity) after every successful mapping.
constexpr int GRANULARITY_MULTIPLIER = 2;

// Returns the address located `n` bytes after `p`.
static void* pointer_advance(void* p, ptrdiff_t n) {
  unsigned char* bytes = static_cast<unsigned char*>(p);
  return bytes + n;
}

// Returns the address located `n` bytes before `p`.
static void* pointer_retreat(void* p, ptrdiff_t n) {
  unsigned char* bytes = static_cast<unsigned char*>(p);
  return bytes - n;
}

// Creates the file that backs a shared-memory buffer of `size` bytes.
//
// On non-Windows platforms a uniquely named temporary file is created inside
// plasma_config->directory and unlinked immediately, so that no trace of it
// is left in the file system; unless huge pages are enabled, the file is then
// grown to `size` bytes.
//
// Returns the open file descriptor, or -1 on failure.
int create_buffer(int64_t size) {
  // Start from the failure value so that no path can return an
  // indeterminate descriptor.
  int fd = -1;
  std::string file_template = plasma_config->directory;
#ifdef _WIN32
  // NOTE(review): when CreateFileMapping succeeds the returned handle is not
  // stored and fd keeps its initial value -- confirm the intended Windows
  // behaviour.
  if (!CreateFileMapping(INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE,
                         (DWORD)((uint64_t)size >> (CHAR_BIT * sizeof(DWORD))),
                         (DWORD)(uint64_t)size, NULL)) {
    fd = -1;
  }
#else
  file_template += "/plasmaXXXXXX";
  // mkstemp rewrites the template in place, so hand it a mutable,
  // NUL-terminated copy.
  std::vector<char> file_name(file_template.begin(), file_template.end());
  file_name.push_back('\0');
  fd = mkstemp(&file_name[0]);
  if (fd < 0) {
    ARROW_LOG(FATAL) << "create_buffer failed to open file " << &file_name[0];
    return -1;
  }
  // Immediately unlink the file so we do not leave traces in the system.
  if (unlink(&file_name[0]) != 0) {
    ARROW_LOG(FATAL) << "failed to unlink file " << &file_name[0];
    // Do not hand back -1 while still holding an open descriptor.
    close(fd);
    return -1;
  }
  if (!plasma_config->hugepages_enabled) {
    // Increase the size of the file to the desired size. This seems not to be
    // needed for files that are backed by the huge page fs, see also
    // http://www.mail-archive.com/kvm-devel@lists.sourceforge.net/msg14737.html
    if (ftruncate(fd, (off_t)size) != 0) {
      ARROW_LOG(FATAL) << "failed to ftruncate file " << &file_name[0];
      // Do not hand back -1 while still holding an open descriptor.
      close(fd);
      return -1;
    }
  }
#endif
  return fd;
}

void* fake_mmap(size_t size) {
// Add kMmapRegionsGap so that the returned pointer is deliberately not
// page-aligned. This ensures that the segments of memory returned by
// fake_mmap are never contiguous.
size += kMmapRegionsGap;

int fd = create_buffer(size);
ARROW_CHECK(fd >= 0) << "Failed to create buffer during mmap";
// MAP_POPULATE can be used to pre-populate the page tables for this memory region
// which avoids work when accessing the pages later. However it causes long pauses
// when mmapping the files. Only supported on Linux.
void* pointer = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
if (pointer == MAP_FAILED) {
ARROW_LOG(ERROR) << "mmap failed with error: " << std::strerror(errno);
if (errno == ENOMEM && plasma_config->hugepages_enabled) {
ARROW_LOG(ERROR)
<< " (this probably means you have to increase /proc/sys/vm/nr_hugepages)";
}
return pointer;
}

// Increase dlmalloc's allocation granularity directly.
mparams.granularity *= GRANULARITY_MULTIPLIER;

MmapRecord& record = mmap_records[pointer];
record.fd = fd;
record.size = size;

// We lie to dlmalloc about where mapped memory actually lives.
pointer = pointer_advance(pointer, kMmapRegionsGap);
ARROW_LOG(DEBUG) << pointer << " = fake_mmap(" << size << ")";
return pointer;
}

// Replacement for munmap that is handed to dlmalloc.
//
// `addr` and `size` are the values dlmalloc knows about; both are adjusted by
// kMmapRegionsGap to recover the real mapping created by fake_mmap. Returns
// -1 without unmapping when the request does not match a recorded mapping
// exactly; otherwise returns the result of munmap.
int fake_munmap(void* addr, int64_t size) {
  ARROW_LOG(DEBUG) << "fake_munmap(" << addr << ", " << size << ")";
  addr = pointer_retreat(addr, kMmapRegionsGap);
  size += kMmapRegionsGap;

  auto entry = mmap_records.find(addr);

  if (entry == mmap_records.end() || entry->second.size != size) {
    // Reject requests to munmap that don't directly match previous
    // calls to mmap, to prevent dlmalloc from trimming.
    return -1;
  }

  int r = munmap(addr, size);
  if (r != 0) {
    // The mapping is still in place: keep its record (and the descriptor it
    // owns) instead of dropping the record and leaking the descriptor.
    return r;
  }

  // The mapping is gone; release the descriptor and forget the record.
  close(entry->second.fd);
  mmap_records.erase(entry);
  return r;
}

// Sets dlmalloc's M_GRANULARITY parameter to `value`; the status returned by
// change_mparam is not inspected.
void SetMallocGranularity(int value) {
  const int granularity = value;
  change_mparam(M_GRANULARITY, granularity);
}

} // namespace plasma
2 changes: 1 addition & 1 deletion cpp/src/plasma/events.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
#include <errno.h>

extern "C" {
#include "ae/ae.h"
#include "plasma/thirdparty/ae/ae.h"
}

namespace plasma {
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/plasma/io.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
using arrow::Status;

/// Number of times we try connecting to a socket.
constexpr int64_t kNumConnectAttempts = 50;
constexpr int64_t kNumConnectAttempts = 20;
/// Time to wait between connection attempts to a socket.
constexpr int64_t kConnectTimeoutMs = 100;
constexpr int64_t kConnectTimeoutMs = 400;

namespace plasma {

Expand Down
Loading