Skip to content

Commit 117d03a

Browse files
committed
use only one mmapped file
1 parent 9a64805 commit 117d03a

File tree

2 files changed

+26
-26
lines changed

2 files changed

+26
-26
lines changed

cpp/src/plasma/store.cc

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -905,21 +905,20 @@ class PlasmaStoreRunner {
905905
PlasmaStoreRunner() {}
906906

907907
void Start(char* socket_name, int64_t system_memory, std::string directory,
908-
bool hugepages_enabled, bool use_one_memory_mapped_file) {
908+
bool hugepages_enabled) {
909909
// Create the event loop.
910910
loop_.reset(new EventLoop);
911911
store_.reset(
912912
new PlasmaStore(loop_.get(), system_memory, directory, hugepages_enabled));
913913
plasma_config = store_->GetPlasmaStoreInfo();
914914

915-
// If the store is configured to use a single memory-mapped file, then we
916-
// achieve that by mallocing and freeing a single large amount of space.
917-
// that maximum allowed size up front.
918-
if (use_one_memory_mapped_file) {
919-
void* pointer = plasma::dlmemalign(kBlockSize, system_memory);
920-
ARROW_CHECK(pointer != nullptr);
921-
plasma::dlfree(pointer);
922-
}
915+
// We are using a single memory-mapped file by mallocing and freeing a single
916+
// large amount of space up front. According to the documentation,
917+
// dlmalloc might need up to 128*sizeof(size_t) bytes for internal
918+
// bookkeeping.
919+
void* pointer = plasma::dlmemalign(kBlockSize, system_memory - 128 * sizeof(size_t));
920+
ARROW_CHECK(pointer != nullptr);
921+
plasma::dlfree(pointer);
923922

924923
int socket = BindIpcSock(socket_name, true);
925924
// TODO(pcm): Check return value.
@@ -955,15 +954,14 @@ void HandleSignal(int signal) {
955954
}
956955

957956
void StartServer(char* socket_name, int64_t system_memory, std::string plasma_directory,
958-
bool hugepages_enabled, bool use_one_memory_mapped_file) {
957+
bool hugepages_enabled) {
959958
// Ignore SIGPIPE signals. If we don't do this, then when we attempt to write
960959
// to a client that has already died, the store could die.
961960
signal(SIGPIPE, SIG_IGN);
962961

963962
g_runner.reset(new PlasmaStoreRunner());
964963
signal(SIGTERM, HandleSignal);
965-
g_runner->Start(socket_name, system_memory, plasma_directory, hugepages_enabled,
966-
use_one_memory_mapped_file);
964+
g_runner->Start(socket_name, system_memory, plasma_directory, hugepages_enabled);
967965
}
968966

969967
} // namespace plasma
@@ -975,8 +973,6 @@ int main(int argc, char* argv[]) {
975973
// Directory where plasma memory mapped files are stored.
976974
std::string plasma_directory;
977975
bool hugepages_enabled = false;
978-
// True if a single large memory-mapped file should be created at startup.
979-
bool use_one_memory_mapped_file = false;
980976
int64_t system_memory = -1;
981977
int c;
982978
while ((c = getopt(argc, argv, "s:m:d:hf")) != -1) {
@@ -994,13 +990,16 @@ int main(int argc, char* argv[]) {
994990
char extra;
995991
int scanned = sscanf(optarg, "%" SCNd64 "%c", &system_memory, &extra);
996992
ARROW_CHECK(scanned == 1);
993+
// Set system memory, potentially rounding it to a page size
994+
// Also make it so dlmalloc fails if we try to request more memory than
995+
// is available.
996+
system_memory = plasma::dlmalloc_set_footprint_limit((size_t)system_memory);
997997
ARROW_LOG(INFO) << "Allowing the Plasma store to use up to "
998998
<< static_cast<double>(system_memory) / 1000000000
999999
<< "GB of memory.";
10001000
break;
10011001
}
10021002
case 'f':
1003-
use_one_memory_mapped_file = true;
10041003
break;
10051004
default:
10061005
exit(-1);
@@ -1051,12 +1050,8 @@ int main(int argc, char* argv[]) {
10511050
SetMallocGranularity(1024 * 1024 * 1024); // 1 GB
10521051
}
10531052
#endif
1054-
// Make it so dlmalloc fails if we try to request more memory than is
1055-
// available.
1056-
plasma::dlmalloc_set_footprint_limit((size_t)system_memory);
10571053
ARROW_LOG(DEBUG) << "starting server listening on " << socket_name;
1058-
plasma::StartServer(socket_name, system_memory, plasma_directory, hugepages_enabled,
1059-
use_one_memory_mapped_file);
1054+
plasma::StartServer(socket_name, system_memory, plasma_directory, hugepages_enabled);
10601055
plasma::g_runner->Shutdown();
10611056
plasma::g_runner = nullptr;
10621057

python/pyarrow/tests/test_plasma.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737

3838
DEFAULT_PLASMA_STORE_MEMORY = 10 ** 8
3939
USE_VALGRIND = os.getenv("PLASMA_VALGRIND") == "1"
40+
SMALL_OBJECT_SIZE = 9000
4041

4142

4243
def random_name():
@@ -471,22 +472,26 @@ def assert_create_raises_plasma_full(unit_test, size):
471472
memory_buffers.append(memory_buffer)
472473
# Remaining space is 50%. Make sure that we can't create an
473474
# object of size 50% + 1, but we can create one of size 20%.
474-
assert_create_raises_plasma_full(self, 50 * PERCENT + 1)
475+
assert_create_raises_plasma_full(
476+
self, 50 * PERCENT + SMALL_OBJECT_SIZE)
475477
_, memory_buffer, _ = create_object(self.plasma_client, 20 * PERCENT)
476478
del memory_buffer
477479
_, memory_buffer, _ = create_object(self.plasma_client, 20 * PERCENT)
478480
del memory_buffer
479-
assert_create_raises_plasma_full(self, 50 * PERCENT + 1)
481+
assert_create_raises_plasma_full(
482+
self, 50 * PERCENT + SMALL_OBJECT_SIZE)
480483

481484
_, memory_buffer, _ = create_object(self.plasma_client, 20 * PERCENT)
482485
memory_buffers.append(memory_buffer)
483486
# Remaining space is 30%.
484-
assert_create_raises_plasma_full(self, 30 * PERCENT + 1)
487+
assert_create_raises_plasma_full(
488+
self, 30 * PERCENT + SMALL_OBJECT_SIZE)
485489

486490
_, memory_buffer, _ = create_object(self.plasma_client, 10 * PERCENT)
487491
memory_buffers.append(memory_buffer)
488492
# Remaining space is 20%.
489-
assert_create_raises_plasma_full(self, 20 * PERCENT + 1)
493+
assert_create_raises_plasma_full(
494+
self, 20 * PERCENT + SMALL_OBJECT_SIZE)
490495

491496
def test_contains(self):
492497
fake_object_ids = [random_object_id() for _ in range(100)]
@@ -851,8 +856,8 @@ def test_use_one_memory_mapped_file(self):
851856
create_object(self.plasma_client2, DEFAULT_PLASMA_STORE_MEMORY, 0)
852857
# Verify that an object that is too large does not fit.
853858
with pytest.raises(pa.lib.PlasmaStoreFull):
854-
create_object(self.plasma_client2, DEFAULT_PLASMA_STORE_MEMORY + 1,
855-
0)
859+
create_object(self.plasma_client2,
860+
DEFAULT_PLASMA_STORE_MEMORY + SMALL_OBJECT_SIZE, 0)
856861

857862
def test_client_death_during_get(self):
858863
import pyarrow.plasma as plasma

0 commit comments

Comments
 (0)