-
Notifications
You must be signed in to change notification settings - Fork 3.9k
ARROW-4983: [Plasma] Unmap memory upon destruction of the PlasmaClient #4001
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
53e4b1d
af51c7a
0a6203b
7988a85
784f9af
893bcdc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -105,7 +105,7 @@ static std::mutex gpu_mutex; | |
// PlasmaBuffer | ||
|
||
/// A Buffer class that automatically releases the backing plasma object | ||
/// when it goes out of scope. | ||
/// when it goes out of scope. This is returned by Get. | ||
class ARROW_NO_EXPORT PlasmaBuffer : public Buffer { | ||
public: | ||
~PlasmaBuffer(); | ||
|
@@ -123,6 +123,19 @@ class ARROW_NO_EXPORT PlasmaBuffer : public Buffer { | |
ObjectID object_id_; | ||
}; | ||
|
||
/// A mutable Buffer class that keeps the backing data alive by keeping a | ||
/// PlasmaClient shared pointer. This is returned by Create. Release will | ||
/// be called in the associated Seal call. | ||
class ARROW_NO_EXPORT PlasmaMutableBuffer : public MutableBuffer { | ||
public: | ||
PlasmaMutableBuffer(std::shared_ptr<PlasmaClient::Impl> client, uint8_t* mutable_data, | ||
int64_t data_size) | ||
: MutableBuffer(mutable_data, data_size), client_(client) {} | ||
|
||
private: | ||
std::shared_ptr<PlasmaClient::Impl> client_; | ||
}; | ||
|
||
// ---------------------------------------------------------------------- | ||
// PlasmaClient::Impl | ||
|
||
|
@@ -140,13 +153,47 @@ struct ObjectInUseEntry { | |
bool is_sealed; | ||
}; | ||
|
||
struct ClientMmapTableEntry { | ||
class ClientMmapTableEntry { | ||
public: | ||
ClientMmapTableEntry(int fd, int64_t map_size) | ||
: fd_(fd), pointer_(nullptr), length_(0) { | ||
// We subtract kMmapRegionsGap from the length that was added | ||
// in fake_mmap in malloc.h, to make map_size page-aligned again. | ||
length_ = map_size - kMmapRegionsGap; | ||
pointer_ = reinterpret_cast<uint8_t*>( | ||
mmap(NULL, length_, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0)); | ||
// TODO(pcm): Don't fail here, instead return a Status. | ||
if (pointer_ == MAP_FAILED) { | ||
ARROW_LOG(FATAL) << "mmap failed"; | ||
} | ||
close(fd); // Closing this fd has an effect on performance. | ||
} | ||
|
||
~ClientMmapTableEntry() { | ||
// At this point it is safe to unmap the memory, as the PlasmaBuffer | ||
// keeps the PlasmaClient (and therefore the ClientMmapTableEntry) | ||
// alive until it is destroyed. | ||
// We don't need to close the associated file, since it has | ||
// already been closed in the constructor. | ||
int r = munmap(pointer_, length_); | ||
if (r != 0) { | ||
ARROW_LOG(ERROR) << "munmap returned " << r << ", errno = " << errno; | ||
} | ||
} | ||
|
||
uint8_t* pointer() { return pointer_; } | ||
|
||
int fd() { return fd_; } | ||
|
||
private: | ||
/// The associated file descriptor on the client. | ||
int fd; | ||
int fd_; | ||
/// The result of mmap for this file descriptor. | ||
uint8_t* pointer; | ||
uint8_t* pointer_; | ||
/// The length of the memory-mapped file. | ||
size_t length; | ||
size_t length_; | ||
|
||
ARROW_DISALLOW_COPY_AND_ASSIGN(ClientMmapTableEntry); | ||
}; | ||
|
||
class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Impl> { | ||
|
@@ -244,7 +291,7 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp | |
/// Table of dlmalloc buffer files that have been memory mapped so far. This | ||
/// is a hash table mapping a file descriptor to a struct containing the | ||
/// address of the corresponding memory-mapped file. | ||
std::unordered_map<int, ClientMmapTableEntry> mmap_table_; | ||
std::unordered_map<int, std::unique_ptr<ClientMmapTableEntry>> mmap_table_; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh do you need that in order to compile when using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, without the unique_ptr it would not be correct (upon rehashing). The constructor will run with unique_ptr as well. |
||
/// A hash table of the object IDs that are currently being used by this | ||
/// client. | ||
std::unordered_map<ObjectID, std::unique_ptr<ObjectInUseEntry>> objects_in_use_; | ||
|
@@ -277,23 +324,11 @@ PlasmaClient::Impl::~Impl() {} | |
uint8_t* PlasmaClient::Impl::LookupOrMmap(int fd, int store_fd_val, int64_t map_size) { | ||
auto entry = mmap_table_.find(store_fd_val); | ||
if (entry != mmap_table_.end()) { | ||
return entry->second.pointer; | ||
return entry->second->pointer(); | ||
} else { | ||
// We subtract kMmapRegionsGap from the length that was added | ||
// in fake_mmap in malloc.h, to make map_size page-aligned again. | ||
uint8_t* result = reinterpret_cast<uint8_t*>(mmap( | ||
NULL, map_size - kMmapRegionsGap, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0)); | ||
// TODO(pcm): Don't fail here, instead return a Status. | ||
if (result == MAP_FAILED) { | ||
ARROW_LOG(FATAL) << "mmap failed"; | ||
} | ||
close(fd); // Closing this fd has an effect on performance. | ||
|
||
ClientMmapTableEntry& entry = mmap_table_[store_fd_val]; | ||
entry.fd = fd; | ||
entry.pointer = result; | ||
entry.length = map_size; | ||
return result; | ||
mmap_table_[store_fd_val] = | ||
std::unique_ptr<ClientMmapTableEntry>(new ClientMmapTableEntry(fd, map_size)); | ||
return mmap_table_[store_fd_val]->pointer(); | ||
} | ||
} | ||
|
||
|
@@ -302,7 +337,7 @@ uint8_t* PlasmaClient::Impl::LookupOrMmap(int fd, int store_fd_val, int64_t map_ | |
uint8_t* PlasmaClient::Impl::LookupMmappedFile(int store_fd_val) { | ||
auto entry = mmap_table_.find(store_fd_val); | ||
ARROW_CHECK(entry != mmap_table_.end()); | ||
return entry->second.pointer; | ||
return entry->second->pointer(); | ||
} | ||
|
||
bool PlasmaClient::Impl::IsInUse(const ObjectID& object_id) { | ||
|
@@ -317,7 +352,7 @@ int PlasmaClient::Impl::GetStoreFd(int store_fd) { | |
ARROW_CHECK(fd >= 0) << "recv not successful"; | ||
return fd; | ||
} else { | ||
return entry->second.fd; | ||
return entry->second->fd(); | ||
} | ||
} | ||
|
||
|
@@ -369,8 +404,9 @@ Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size, | |
ARROW_CHECK(object.metadata_size == metadata_size); | ||
// The metadata should come right after the data. | ||
ARROW_CHECK(object.metadata_offset == object.data_offset + data_size); | ||
*data = std::make_shared<MutableBuffer>( | ||
LookupOrMmap(fd, store_fd, mmap_size) + object.data_offset, data_size); | ||
*data = std::make_shared<PlasmaMutableBuffer>( | ||
shared_from_this(), LookupOrMmap(fd, store_fd, mmap_size) + object.data_offset, | ||
data_size); | ||
// If plasma_create is being called from a transfer, then we will not copy the | ||
// metadata here. The metadata will be written along with the data streamed | ||
// from the transfer. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there's no point to those initializations, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh I see @fsaintjacques suggested this. That's fine I guess, but what's the reason for this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because if we remove the copy constructor, then move can swap in un-initialized values.