ARROW-4296: [Plasma] Use one mmap file by default, prevent crash with -f #3490

Closed. Wants to merge 4 commits.
42 changes: 19 additions & 23 deletions cpp/src/plasma/store.cc
@@ -905,21 +905,22 @@ class PlasmaStoreRunner {
PlasmaStoreRunner() {}

void Start(char* socket_name, int64_t system_memory, std::string directory,
bool hugepages_enabled, bool use_one_memory_mapped_file) {
bool hugepages_enabled) {
Contributor: Remove the command line flag also.

Contributor Author: fixed

// Create the event loop.
loop_.reset(new EventLoop);
store_.reset(
new PlasmaStore(loop_.get(), system_memory, directory, hugepages_enabled));
plasma_config = store_->GetPlasmaStoreInfo();

// If the store is configured to use a single memory-mapped file, then we
// achieve that by mallocing and freeing a single large amount of space
// (the maximum allowed size) up front.
if (use_one_memory_mapped_file) {
void* pointer = plasma::dlmemalign(kBlockSize, system_memory);
ARROW_CHECK(pointer != nullptr);
plasma::dlfree(pointer);
}
// We are using a single memory-mapped file by mallocing and freeing a single
// large amount of space up front. According to the documentation,
// dlmalloc might need up to 128*sizeof(size_t) bytes for internal
// bookkeeping.
Contributor: Add a note saying that this relies on implementation details of dlmalloc (that after the initial memory-mapped file is unmapped, subsequent memory-mapped files will use the same granularity as the first page) and that if we switch to using jemalloc, this may need to change.

Alternatively, you could do

void* pointer_big = plasma::dlmemalign(kBlockSize, system_memory - 128 * sizeof(size_t));
// We do not deallocate this small object so that the memory mapped file never gets unmapped.
void* pointer_small = plasma::dlmalloc(1);
plasma::dlfree(pointer_big);

Contributor Author: done

void* pointer = plasma::dlmemalign(kBlockSize, system_memory - 256 * sizeof(size_t));
ARROW_CHECK(pointer != nullptr);
// This will unmap the file, but the next one created will be as large
// as this one (this is an implementation detail of dlmalloc).
plasma::dlfree(pointer);
Contributor: We know that this won't cause the page to get unmapped?

Contributor Author: I tried it out and it will, but upon remapping, dlmalloc will use the large granularity size, so this has the desired effect of using one large mmap file.
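As an end-to-end illustration of this behavior, here is a rough sketch from the Python side, mirroring what the renamed test_use_full_memory test below checks. It assumes the standard pyarrow.plasma client API (ObjectID, create, seal) and that sealed objects with no outstanding references can be evicted to make room; it is not part of this diff.

```python
import os
import pyarrow.plasma as plasma

STORE_MEMORY = 10**8  # same as DEFAULT_PLASMA_STORE_MEMORY in the tests

with plasma.start_plasma_store(plasma_store_memory=STORE_MEMORY) as (name, _):
    client = plasma.connect(name)
    # Fill the store with small sealed objects and drop all references,
    # so the store is free to evict them later.
    for _ in range(100):
        oid = plasma.ObjectID(os.urandom(20))
        buf = client.create(oid, 10000)
        client.seal(oid)
        del buf
    # Because the freed space lives in one large memory-mapped file, an
    # object that needs the whole store can still be allocated.
    big_id = plasma.ObjectID(os.urandom(20))
    big_buf = client.create(big_id, STORE_MEMORY)
    client.seal(big_id)
```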


int socket = BindIpcSock(socket_name, true);
// TODO(pcm): Check return value.
@@ -955,15 +956,14 @@ void HandleSignal(int signal) {
}

void StartServer(char* socket_name, int64_t system_memory, std::string plasma_directory,
bool hugepages_enabled, bool use_one_memory_mapped_file) {
bool hugepages_enabled) {
// Ignore SIGPIPE signals. If we don't do this, then when we attempt to write
// to a client that has already died, the store could die.
signal(SIGPIPE, SIG_IGN);

g_runner.reset(new PlasmaStoreRunner());
signal(SIGTERM, HandleSignal);
g_runner->Start(socket_name, system_memory, plasma_directory, hugepages_enabled,
use_one_memory_mapped_file);
g_runner->Start(socket_name, system_memory, plasma_directory, hugepages_enabled);
}

} // namespace plasma
@@ -975,11 +975,9 @@ int main(int argc, char* argv[]) {
// Directory where plasma memory mapped files are stored.
std::string plasma_directory;
bool hugepages_enabled = false;
// True if a single large memory-mapped file should be created at startup.
bool use_one_memory_mapped_file = false;
int64_t system_memory = -1;
int c;
while ((c = getopt(argc, argv, "s:m:d:hf")) != -1) {
while ((c = getopt(argc, argv, "s:m:d:h")) != -1) {
switch (c) {
case 'd':
plasma_directory = std::string(optarg);
@@ -994,14 +992,16 @@ int main(int argc, char* argv[]) {
char extra;
int scanned = sscanf(optarg, "%" SCNd64 "%c", &system_memory, &extra);
ARROW_CHECK(scanned == 1);
// Set system memory, potentially rounding it to a page size
// Also make it so dlmalloc fails if we try to request more memory than
// is available.
system_memory =
Contributor: While you're looking at this code, any idea about ray-project/ray#3670? Somehow the way the object store computes the total system memory is different from the way we do it in Python (e.g., using psutil).

Contributor Author: Not sure what is going on, but the only way to guarantee that the two sizes are consistent is to use the same method in both (either psutil or the system call we use here).

Contributor: Well, we can't really use psutil here because it is in C++, right?
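For reference, a small sketch of how the two measurements could be compared on a given machine. psutil and the sysconf-based calculation are only examples of two different methods; whether the latter matches what the object store actually does is an assumption.

```python
import os
import psutil  # assumed available; this is what the Python side uses

# Total physical memory as psutil reports it.
psutil_total = psutil.virtual_memory().total

# A POSIX sysconf-based calculation, one plausible way a C/C++ program
# might compute "total system memory" (Linux).
sysconf_total = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")

print(psutil_total, sysconf_total, psutil_total - sysconf_total)
```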

plasma::dlmalloc_set_footprint_limit(static_cast<size_t>(system_memory));
ARROW_LOG(INFO) << "Allowing the Plasma store to use up to "
<< static_cast<double>(system_memory) / 1000000000
<< "GB of memory.";
break;
}
case 'f':
Contributor: Remove f from getopt above.

Contributor Author: done

use_one_memory_mapped_file = true;
break;
default:
exit(-1);
}
@@ -1051,12 +1051,8 @@ int main(int argc, char* argv[]) {
SetMallocGranularity(1024 * 1024 * 1024); // 1 GB
}
#endif
// Make it so dlmalloc fails if we try to request more memory than is
// available.
plasma::dlmalloc_set_footprint_limit((size_t)system_memory);
ARROW_LOG(DEBUG) << "starting server listening on " << socket_name;
plasma::StartServer(socket_name, system_memory, plasma_directory, hugepages_enabled,
use_one_memory_mapped_file);
plasma::StartServer(socket_name, system_memory, plasma_directory, hugepages_enabled);
plasma::g_runner->Shutdown();
plasma::g_runner = nullptr;

2 changes: 1 addition & 1 deletion cpp/src/plasma/test/client_tests.cc
@@ -60,7 +60,7 @@ class TestPlasmaStore : public ::testing::Test {
std::string plasma_directory =
test_executable.substr(0, test_executable.find_last_of("/"));
std::string plasma_command = plasma_directory +
"/plasma_store_server -m 1000000000 -s " +
"/plasma_store_server -m 10000000 -s " +
store_socket_name_ + " 1> /dev/null 2> /dev/null &";
system(plasma_command.c_str());
ARROW_CHECK_OK(client_.Connect(store_socket_name_, ""));
5 changes: 0 additions & 5 deletions python/pyarrow/plasma.py
@@ -78,7 +78,6 @@ def build_plasma_tensorflow_op():
@contextlib.contextmanager
def start_plasma_store(plasma_store_memory,
use_valgrind=False, use_profiler=False,
use_one_memory_mapped_file=False,
plasma_directory=None, use_hugepages=False):
"""Start a plasma store process.
Args:
@@ -87,8 +86,6 @@ def start_plasma_store(plasma_store_memory,
of valgrind. If this is True, use_profiler must be False.
use_profiler (bool): True if the plasma store should be started inside
a profiler. If this is True, use_valgrind must be False.
use_one_memory_mapped_file: If True, then the store will use only a
single memory-mapped file.
plasma_directory (str): Directory where plasma memory mapped files
will be stored.
use_hugepages (bool): True if the plasma store should use huge pages.
@@ -107,8 +104,6 @@ def start_plasma_store(plasma_store_memory,
command = [plasma_store_executable,
"-s", plasma_store_name,
"-m", str(plasma_store_memory)]
if use_one_memory_mapped_file:
command += ["-f"]
if plasma_directory:
command += ["-d", plasma_directory]
if use_hugepages:
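A minimal usage sketch of the updated start_plasma_store signature (the use_one_memory_mapped_file option is gone). plasma.connect appears in the tests below; put and get are standard pyarrow.plasma client calls assumed here for brevity.

```python
import pyarrow.plasma as plasma

# Start a 100 MB store; the context manager yields the socket name and
# the store process, and shuts the store down on exit.
with plasma.start_plasma_store(plasma_store_memory=10**8) as (name, proc):
    client = plasma.connect(name)
    object_id = client.put("hello plasma")
    assert client.get(object_id) == "hello plasma"
```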
25 changes: 13 additions & 12 deletions python/pyarrow/tests/test_plasma.py
@@ -37,6 +37,7 @@

DEFAULT_PLASMA_STORE_MEMORY = 10 ** 8
USE_VALGRIND = os.getenv("PLASMA_VALGRIND") == "1"
SMALL_OBJECT_SIZE = 9000


def random_name():
@@ -110,15 +111,11 @@ def assert_get_object_equal(unit_test, client1, client2, object_id,
class TestPlasmaClient(object):

def setup_method(self, test_method):
use_one_memory_mapped_file = (test_method ==
self.test_use_one_memory_mapped_file)

import pyarrow.plasma as plasma
# Start Plasma store.
self.plasma_store_ctx = plasma.start_plasma_store(
plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
use_valgrind=USE_VALGRIND,
use_one_memory_mapped_file=use_one_memory_mapped_file)
use_valgrind=USE_VALGRIND)
self.plasma_store_name, self.p = self.plasma_store_ctx.__enter__()
# Connect to Plasma.
self.plasma_client = plasma.connect(self.plasma_store_name)
@@ -471,22 +468,26 @@ def assert_create_raises_plasma_full(unit_test, size):
memory_buffers.append(memory_buffer)
# Remaining space is 50%. Make sure that we can't create an
# object of size 50% + 1, but we can create one of size 20%.
assert_create_raises_plasma_full(self, 50 * PERCENT + 1)
assert_create_raises_plasma_full(
self, 50 * PERCENT + SMALL_OBJECT_SIZE)
Contributor: Why replace 1 with SMALL_OBJECT_SIZE?

Contributor Author: Because right now the memory capacity can (and will!) be off by some amount, due to the way that dlmalloc computes the footprint limit.
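To put the new margin in perspective, a tiny arithmetic check; it only uses constants that appear in this diff (the exact value of PERCENT is not shown here).

```python
DEFAULT_PLASMA_STORE_MEMORY = 10**8  # from the test module
SMALL_OBJECT_SIZE = 9000             # the new slack constant

# The failure threshold moves from "nominal remainder + 1" to
# "nominal remainder + 9000", i.e. the tests tolerate roughly 9 kB of
# allocator bookkeeping and rounding on a 100 MB store.
print(f"tolerated slack: {SMALL_OBJECT_SIZE / DEFAULT_PLASMA_STORE_MEMORY:.4%}")  # 0.0090%
```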

_, memory_buffer, _ = create_object(self.plasma_client, 20 * PERCENT)
del memory_buffer
_, memory_buffer, _ = create_object(self.plasma_client, 20 * PERCENT)
del memory_buffer
assert_create_raises_plasma_full(self, 50 * PERCENT + 1)
assert_create_raises_plasma_full(
self, 50 * PERCENT + SMALL_OBJECT_SIZE)

_, memory_buffer, _ = create_object(self.plasma_client, 20 * PERCENT)
memory_buffers.append(memory_buffer)
# Remaining space is 30%.
assert_create_raises_plasma_full(self, 30 * PERCENT + 1)
assert_create_raises_plasma_full(
self, 30 * PERCENT + SMALL_OBJECT_SIZE)

_, memory_buffer, _ = create_object(self.plasma_client, 10 * PERCENT)
memory_buffers.append(memory_buffer)
# Remaining space is 20%.
assert_create_raises_plasma_full(self, 20 * PERCENT + 1)
assert_create_raises_plasma_full(
self, 20 * PERCENT + SMALL_OBJECT_SIZE)

def test_contains(self):
fake_object_ids = [random_object_id() for _ in range(100)]
@@ -838,7 +839,7 @@ def test_subscribe_deletions(self):
assert -1 == recv_dsize
assert -1 == recv_msize

def test_use_one_memory_mapped_file(self):
def test_use_full_memory(self):
# Fill the object store up with a large number of small objects and let
# them go out of scope.
for _ in range(100):
@@ -851,8 +852,8 @@ def test_use_one_memory_mapped_file(self):
create_object(self.plasma_client2, DEFAULT_PLASMA_STORE_MEMORY, 0)
# Verify that an object that is too large does not fit.
with pytest.raises(pa.lib.PlasmaStoreFull):
create_object(self.plasma_client2, DEFAULT_PLASMA_STORE_MEMORY + 1,
0)
create_object(self.plasma_client2,
DEFAULT_PLASMA_STORE_MEMORY + SMALL_OBJECT_SIZE, 0)

def test_client_death_during_get(self):
import pyarrow.plasma as plasma