Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 62 additions & 26 deletions cpp/src/plasma/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ static std::mutex gpu_mutex;
// PlasmaBuffer

/// A Buffer class that automatically releases the backing plasma object
/// when it goes out of scope.
/// when it goes out of scope. This is returned by Get.
class ARROW_NO_EXPORT PlasmaBuffer : public Buffer {
public:
~PlasmaBuffer();
Expand All @@ -123,6 +123,19 @@ class ARROW_NO_EXPORT PlasmaBuffer : public Buffer {
ObjectID object_id_;
};

/// A mutable Buffer class that keeps the backing data alive by keeping a
/// PlasmaClient shared pointer. This is returned by Create. Release will
/// be called in the associated Seal call.
class ARROW_NO_EXPORT PlasmaMutableBuffer : public MutableBuffer {
public:
PlasmaMutableBuffer(std::shared_ptr<PlasmaClient::Impl> client, uint8_t* mutable_data,
int64_t data_size)
: MutableBuffer(mutable_data, data_size), client_(client) {}

private:
std::shared_ptr<PlasmaClient::Impl> client_;
};

// ----------------------------------------------------------------------
// PlasmaClient::Impl

Expand All @@ -140,13 +153,47 @@ struct ObjectInUseEntry {
bool is_sealed;
};

struct ClientMmapTableEntry {
class ClientMmapTableEntry {
public:
ClientMmapTableEntry(int fd, int64_t map_size)
: fd_(fd), pointer_(nullptr), length_(0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
: fd_(fd), pointer_(nullptr), length_(0) {
: fd_(fd) {

there's no point to those initializations, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see @fsaintjacques suggested this. That's fine I guess, but what's the reason for this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because if we remove the copy constructor, then move can swap in un-initialized values.

// We subtract kMmapRegionsGap from the length that was added
// in fake_mmap in malloc.h, to make map_size page-aligned again.
length_ = map_size - kMmapRegionsGap;
pointer_ = reinterpret_cast<uint8_t*>(
mmap(NULL, length_, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0));
// TODO(pcm): Don't fail here, instead return a Status.
if (pointer_ == MAP_FAILED) {
ARROW_LOG(FATAL) << "mmap failed";
}
close(fd); // Closing this fd has an effect on performance.
}

~ClientMmapTableEntry() {
// At this point it is safe to unmap the memory, as the PlasmaBuffer
// keeps the PlasmaClient (and therefore the ClientMmapTableEntry)
// alive until it is destroyed.
// We don't need to close the associated file, since it has
// already been closed in the constructor.
int r = munmap(pointer_, length_);
if (r != 0) {
ARROW_LOG(ERROR) << "munmap returned " << r << ", errno = " << errno;
}
}

uint8_t* pointer() { return pointer_; }

int fd() { return fd_; }

private:
/// The associated file descriptor on the client.
int fd;
int fd_;
/// The result of mmap for this file descriptor.
uint8_t* pointer;
uint8_t* pointer_;
/// The length of the memory-mapped file.
size_t length;
size_t length_;

ARROW_DISALLOW_COPY_AND_ASSIGN(ClientMmapTableEntry);
};

class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Impl> {
Expand Down Expand Up @@ -244,7 +291,7 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
/// Table of dlmalloc buffer files that have been memory mapped so far. This
/// is a hash table mapping a file descriptor to a struct containing the
/// address of the corresponding memory-mapped file.
std::unordered_map<int, ClientMmapTableEntry> mmap_table_;
std::unordered_map<int, std::unique_ptr<ClientMmapTableEntry>> mmap_table_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why unique_ptr? If you do std::unordered_map<int, ClientMmapTableEntry>, then when you delete an entry from mmap_table_, its destructor will run.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh do you need that in order to compile when using ARROW_DISALLOW_COPY_AND_ASSIGN?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, without the unique_ptr it would not be correct (upon rehashing). The constructor will run with unique_ptr as well.

/// A hash table of the object IDs that are currently being used by this
/// client.
std::unordered_map<ObjectID, std::unique_ptr<ObjectInUseEntry>> objects_in_use_;
Expand Down Expand Up @@ -277,23 +324,11 @@ PlasmaClient::Impl::~Impl() {}
uint8_t* PlasmaClient::Impl::LookupOrMmap(int fd, int store_fd_val, int64_t map_size) {
auto entry = mmap_table_.find(store_fd_val);
if (entry != mmap_table_.end()) {
return entry->second.pointer;
return entry->second->pointer();
} else {
// We subtract kMmapRegionsGap from the length that was added
// in fake_mmap in malloc.h, to make map_size page-aligned again.
uint8_t* result = reinterpret_cast<uint8_t*>(mmap(
NULL, map_size - kMmapRegionsGap, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0));
// TODO(pcm): Don't fail here, instead return a Status.
if (result == MAP_FAILED) {
ARROW_LOG(FATAL) << "mmap failed";
}
close(fd); // Closing this fd has an effect on performance.

ClientMmapTableEntry& entry = mmap_table_[store_fd_val];
entry.fd = fd;
entry.pointer = result;
entry.length = map_size;
return result;
mmap_table_[store_fd_val] =
std::unique_ptr<ClientMmapTableEntry>(new ClientMmapTableEntry(fd, map_size));
return mmap_table_[store_fd_val]->pointer();
}
}

Expand All @@ -302,7 +337,7 @@ uint8_t* PlasmaClient::Impl::LookupOrMmap(int fd, int store_fd_val, int64_t map_
uint8_t* PlasmaClient::Impl::LookupMmappedFile(int store_fd_val) {
auto entry = mmap_table_.find(store_fd_val);
ARROW_CHECK(entry != mmap_table_.end());
return entry->second.pointer;
return entry->second->pointer();
}

bool PlasmaClient::Impl::IsInUse(const ObjectID& object_id) {
Expand All @@ -317,7 +352,7 @@ int PlasmaClient::Impl::GetStoreFd(int store_fd) {
ARROW_CHECK(fd >= 0) << "recv not successful";
return fd;
} else {
return entry->second.fd;
return entry->second->fd();
}
}

Expand Down Expand Up @@ -369,8 +404,9 @@ Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size,
ARROW_CHECK(object.metadata_size == metadata_size);
// The metadata should come right after the data.
ARROW_CHECK(object.metadata_offset == object.data_offset + data_size);
*data = std::make_shared<MutableBuffer>(
LookupOrMmap(fd, store_fd, mmap_size) + object.data_offset, data_size);
*data = std::make_shared<PlasmaMutableBuffer>(
shared_from_this(), LookupOrMmap(fd, store_fd, mmap_size) + object.data_offset,
data_size);
// If plasma_create is being called from a transfer, then we will not copy the
// metadata here. The metadata will be written along with the data streamed
// from the transfer.
Expand Down
1 change: 1 addition & 0 deletions cpp/src/plasma/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ class ARROW_EXPORT PlasmaClient {

private:
friend class PlasmaBuffer;
friend class PlasmaMutableBuffer;
FRIEND_TEST(TestPlasmaStore, GetTest);
FRIEND_TEST(TestPlasmaStore, LegacyGetTest);
FRIEND_TEST(TestPlasmaStore, AbortTest);
Expand Down