ARROW-4296: [Plasma] Use one mmap file by default, prevent crash with -f #3490
First changed file (the C++ Plasma store code):

```diff
@@ -905,21 +905,22 @@ class PlasmaStoreRunner {
   PlasmaStoreRunner() {}

   void Start(char* socket_name, int64_t system_memory, std::string directory,
-             bool hugepages_enabled, bool use_one_memory_mapped_file) {
+             bool hugepages_enabled) {
     // Create the event loop.
     loop_.reset(new EventLoop);
     store_.reset(
         new PlasmaStore(loop_.get(), system_memory, directory, hugepages_enabled));
     plasma_config = store_->GetPlasmaStoreInfo();

-    // If the store is configured to use a single memory-mapped file, then we
-    // achieve that by mallocing and freeing a single large amount of space.
-    // that maximum allowed size up front.
-    if (use_one_memory_mapped_file) {
-      void* pointer = plasma::dlmemalign(kBlockSize, system_memory);
-      ARROW_CHECK(pointer != nullptr);
-      plasma::dlfree(pointer);
-    }
+    // We are using a single memory-mapped file by mallocing and freeing a single
+    // large amount of space up front. According to the documentation,
+    // dlmalloc might need up to 128*sizeof(size_t) bytes for internal
+    // bookkeeping.
+    void* pointer = plasma::dlmemalign(kBlockSize, system_memory - 256 * sizeof(size_t));
+    ARROW_CHECK(pointer != nullptr);
+    // This will unmap the file, but the next one created will be as large
+    // as this one (this is an implementation detail of dlmalloc).
+    plasma::dlfree(pointer);

     int socket = BindIpcSock(socket_name, true);
     // TODO(pcm): Check return value.
```

Review comment (on the new dlmalloc bookkeeping comment): Add a note saying that this relies on implementation details of dlmalloc (that after the initial memory-mapped file is unmapped, subsequent memory-mapped files will use the same granularity as the first one) and that if we switch to using jemalloc, this may need to change. Alternatively, you could do:

```cpp
void* pointer_big = plasma::dlmemalign(kBlockSize, system_memory - 128 * sizeof(size_t));
// We do not deallocate this small object so that the memory mapped file never gets unmapped.
void* pointer_small = plasma::dlmalloc(1);
plasma::dlfree(pointer_big);
```

Reply: done

Review comment (on `plasma::dlfree(pointer);`): We know that this won't cause the page to get unmapped?

Reply: I tried it out and it will, but upon remapping, dlmalloc will use the large granularity size, so this has the desired effect of using one large mmap file.
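As a side note on the "one large mmap file" behavior discussed above, one way to check it empirically from Python is to start a store and inspect the store process's memory mappings on Linux. This is only a diagnostic sketch, not part of the PR: it assumes `plasma.start_plasma_store` yields a `(socket_name, process)` pair whose second element is a `subprocess.Popen`-like handle with a `pid` (as the test's use of `self.p` suggests), and it relies on Linux's `/proc/<pid>/maps`.

```python
import pyarrow.plasma as plasma

PLASMA_STORE_MEMORY = 10 ** 8  # same budget the tests use


def dump_large_mappings(pid, min_size=2 ** 20):
    """Print mappings of the store process that are larger than min_size bytes."""
    with open("/proc/{}/maps".format(pid)) as f:
        for line in f:
            addr_range = line.split()[0]
            start, end = (int(x, 16) for x in addr_range.split("-"))
            if end - start >= min_size:
                print("{:>12d} bytes  {}".format(end - start, line.strip()))


with plasma.start_plasma_store(plasma_store_memory=PLASMA_STORE_MEMORY) as (
        store_name, proc):
    client = plasma.connect(store_name)
    # Force the store to back at least one object with mapped memory.
    client.put(b"x" * 10000)
    # If the warm-up allocation works as described in the review thread, a single
    # mapping close to PLASMA_STORE_MEMORY should show up here rather than many
    # small ones.
    dump_large_mappings(proc.pid)
```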
```diff
@@ -955,15 +956,14 @@ void HandleSignal(int signal) {
 }

 void StartServer(char* socket_name, int64_t system_memory, std::string plasma_directory,
-                 bool hugepages_enabled, bool use_one_memory_mapped_file) {
+                 bool hugepages_enabled) {
   // Ignore SIGPIPE signals. If we don't do this, then when we attempt to write
   // to a client that has already died, the store could die.
   signal(SIGPIPE, SIG_IGN);

   g_runner.reset(new PlasmaStoreRunner());
   signal(SIGTERM, HandleSignal);
-  g_runner->Start(socket_name, system_memory, plasma_directory, hugepages_enabled,
-                  use_one_memory_mapped_file);
+  g_runner->Start(socket_name, system_memory, plasma_directory, hugepages_enabled);
 }

 }  // namespace plasma
```
```diff
@@ -975,11 +975,9 @@ int main(int argc, char* argv[]) {
   // Directory where plasma memory mapped files are stored.
   std::string plasma_directory;
   bool hugepages_enabled = false;
-  // True if a single large memory-mapped file should be created at startup.
-  bool use_one_memory_mapped_file = false;
   int64_t system_memory = -1;
   int c;
-  while ((c = getopt(argc, argv, "s:m:d:hf")) != -1) {
+  while ((c = getopt(argc, argv, "s:m:d:h")) != -1) {
     switch (c) {
       case 'd':
         plasma_directory = std::string(optarg);
```
```diff
@@ -994,14 +992,16 @@ int main(int argc, char* argv[]) {
         char extra;
         int scanned = sscanf(optarg, "%" SCNd64 "%c", &system_memory, &extra);
         ARROW_CHECK(scanned == 1);
+        // Set system memory, potentially rounding it to a page size
+        // Also make it so dlmalloc fails if we try to request more memory than
+        // is available.
+        system_memory =
```

Review comment (on the new `system_memory` handling): While you're looking at this code, any idea about ray-project/ray#3670? Somehow the way the object store computes the total system memory is different from the way we do it in Python (e.g., using psutil).

Reply: Not sure what is going on, but the only way to guarantee that the two sizes are consistent is to use the same method in both (either psutil or the system call we use here).

Reply: Well, we can't really use psutil here because it is in C++, right?

```diff
+        plasma::dlmalloc_set_footprint_limit(static_cast<size_t>(system_memory));
         ARROW_LOG(INFO) << "Allowing the Plasma store to use up to "
                         << static_cast<double>(system_memory) / 1000000000
                         << "GB of memory.";
         break;
       }
-      case 'f':
-        use_one_memory_mapped_file = true;
-        break;
       default:
         exit(-1);
     }
```

Review comment (on the removed `case 'f':` block): Remove

Reply: done
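Regarding the side discussion about ray-project/ray#3670: the mismatch is between two different ways of measuring total system memory. A quick way to see whether they disagree on a given machine is to compare what `psutil` reports in Python (the method the review thread mentions for the Python side) with the usual POSIX `sysconf` calculation. This is only a diagnostic sketch and does not assume which system call the Plasma store itself uses.

```python
import os

import psutil

# Total physical memory as psutil reports it.
psutil_total = psutil.virtual_memory().total

# Total physical memory from POSIX sysconf, one common way to compute it in C/C++:
# sysconf(_SC_PHYS_PAGES) * sysconf(_SC_PAGE_SIZE).
sysconf_total = os.sysconf("SC_PHYS_PAGES") * os.sysconf("SC_PAGE_SIZE")

print("psutil.virtual_memory().total:", psutil_total)
print("sysconf-based total:          ", sysconf_total)
print("difference (bytes):           ", psutil_total - sysconf_total)
```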
```diff
@@ -1051,12 +1051,8 @@ int main(int argc, char* argv[]) {
     SetMallocGranularity(1024 * 1024 * 1024);  // 1 GB
   }
 #endif
-  // Make it so dlmalloc fails if we try to request more memory than is
-  // available.
-  plasma::dlmalloc_set_footprint_limit((size_t)system_memory);
   ARROW_LOG(DEBUG) << "starting server listening on " << socket_name;
-  plasma::StartServer(socket_name, system_memory, plasma_directory, hugepages_enabled,
-                      use_one_memory_mapped_file);
+  plasma::StartServer(socket_name, system_memory, plasma_directory, hugepages_enabled);
   plasma::g_runner->Shutdown();
   plasma::g_runner = nullptr;
```
Second changed file (the Plasma Python tests):

```diff
@@ -37,6 +37,7 @@

 DEFAULT_PLASMA_STORE_MEMORY = 10 ** 8
 USE_VALGRIND = os.getenv("PLASMA_VALGRIND") == "1"
+SMALL_OBJECT_SIZE = 9000


 def random_name():
```
```diff
@@ -110,15 +111,11 @@ def assert_get_object_equal(unit_test, client1, client2, object_id,
 class TestPlasmaClient(object):

     def setup_method(self, test_method):
-        use_one_memory_mapped_file = (test_method ==
-                                      self.test_use_one_memory_mapped_file)
-
         import pyarrow.plasma as plasma
         # Start Plasma store.
         self.plasma_store_ctx = plasma.start_plasma_store(
             plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
-            use_valgrind=USE_VALGRIND,
-            use_one_memory_mapped_file=use_one_memory_mapped_file)
+            use_valgrind=USE_VALGRIND)
         self.plasma_store_name, self.p = self.plasma_store_ctx.__enter__()
         # Connect to Plasma.
         self.plasma_client = plasma.connect(self.plasma_store_name)
```
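For reference, after this change the store is started from Python without the `use_one_memory_mapped_file` keyword. Here is a minimal sketch of the pattern the test fixture uses, written with a plain `with` block rather than the fixture's explicit `__enter__`; the constants mirror the ones defined at the top of the test file.

```python
import os

import pyarrow.plasma as plasma

DEFAULT_PLASMA_STORE_MEMORY = 10 ** 8
USE_VALGRIND = os.getenv("PLASMA_VALGRIND") == "1"

# Start a store and connect a client; the single-mmap-file behavior is now the
# default, so no extra keyword argument is needed.
with plasma.start_plasma_store(
        plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
        use_valgrind=USE_VALGRIND) as (plasma_store_name, p):
    plasma_client = plasma.connect(plasma_store_name)
    object_id = plasma_client.put(b"hello")
    [value] = plasma_client.get([object_id])
    assert value == b"hello"
```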
```diff
@@ -471,22 +468,26 @@ def assert_create_raises_plasma_full(unit_test, size):
         memory_buffers.append(memory_buffer)
         # Remaining space is 50%. Make sure that we can't create an
         # object of size 50% + 1, but we can create one of size 20%.
-        assert_create_raises_plasma_full(self, 50 * PERCENT + 1)
+        assert_create_raises_plasma_full(
+            self, 50 * PERCENT + SMALL_OBJECT_SIZE)
```

Review comment (on this change): Why replace `+ 1` with `+ SMALL_OBJECT_SIZE`?

Reply: Because right now, the memory capacity can (and will!) be off by some amount (due to the way that dlmalloc is computing the footprint limit).

```diff
         _, memory_buffer, _ = create_object(self.plasma_client, 20 * PERCENT)
         del memory_buffer
         _, memory_buffer, _ = create_object(self.plasma_client, 20 * PERCENT)
         del memory_buffer
-        assert_create_raises_plasma_full(self, 50 * PERCENT + 1)
+        assert_create_raises_plasma_full(
+            self, 50 * PERCENT + SMALL_OBJECT_SIZE)

         _, memory_buffer, _ = create_object(self.plasma_client, 20 * PERCENT)
         memory_buffers.append(memory_buffer)
         # Remaining space is 30%.
-        assert_create_raises_plasma_full(self, 30 * PERCENT + 1)
+        assert_create_raises_plasma_full(
+            self, 30 * PERCENT + SMALL_OBJECT_SIZE)

         _, memory_buffer, _ = create_object(self.plasma_client, 10 * PERCENT)
         memory_buffers.append(memory_buffer)
         # Remaining space is 20%.
-        assert_create_raises_plasma_full(self, 20 * PERCENT + 1)
+        assert_create_raises_plasma_full(
+            self, 20 * PERCENT + SMALL_OBJECT_SIZE)

     def test_contains(self):
         fake_object_ids = [random_object_id() for _ in range(100)]
```
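The reason for the `SMALL_OBJECT_SIZE` slack, as explained in the reply above, is that the usable capacity is no longer exact to the byte, so the tests only assert that an allocation comfortably over the remaining space fails. A simplified stand-in for the test helpers might look like the following; note that the real `create_object` and `assert_create_raises_plasma_full` helpers live elsewhere in this test file, so these names and bodies are illustrative only.

```python
import numpy as np
import pyarrow as pa
import pyarrow.plasma as plasma
import pytest

SMALL_OBJECT_SIZE = 9000  # slack for allocator bookkeeping, as defined above


def create_buffer(client, size):
    """Create and seal a plasma object of the given size, returning its ID."""
    object_id = plasma.ObjectID(np.random.bytes(20))
    buf = client.create(object_id, size)
    client.seal(object_id)
    return object_id, buf


def assert_too_large_fails(client, size):
    """Creating an object of `size` bytes must raise PlasmaStoreFull."""
    with pytest.raises(pa.lib.PlasmaStoreFull):
        create_buffer(client, size)


# Usage: with roughly 50% of the store free, we can no longer assert that exactly
# "remaining + 1" bytes fails, but "remaining + SMALL_OBJECT_SIZE" must fail:
# assert_too_large_fails(client, remaining_space + SMALL_OBJECT_SIZE)
```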
```diff
@@ -838,7 +839,7 @@ def test_subscribe_deletions(self):
         assert -1 == recv_dsize
         assert -1 == recv_msize

-    def test_use_one_memory_mapped_file(self):
+    def test_use_full_memory(self):
         # Fill the object store up with a large number of small objects and let
         # them go out of scope.
         for _ in range(100):
```
```diff
@@ -851,8 +852,8 @@ def test_use_one_memory_mapped_file(self):
         create_object(self.plasma_client2, DEFAULT_PLASMA_STORE_MEMORY, 0)
         # Verify that an object that is too large does not fit.
         with pytest.raises(pa.lib.PlasmaStoreFull):
-            create_object(self.plasma_client2, DEFAULT_PLASMA_STORE_MEMORY + 1,
-                          0)
+            create_object(self.plasma_client2,
+                          DEFAULT_PLASMA_STORE_MEMORY + SMALL_OBJECT_SIZE, 0)

     def test_client_death_during_get(self):
         import pyarrow.plasma as plasma
```
Review comment: Remove the command line flag also.

Reply: fixed