Skip to content

Commit

Permalink
[Core] Remove 'PlasmaBuffer' in the buffer header (ray-project#13188)
Browse files Browse the repository at this point in the history
  • Loading branch information
suquark authored Jan 20, 2021
1 parent b796de4 commit a09997d
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 83 deletions.
32 changes: 1 addition & 31 deletions src/ray/common/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ class SharedMemoryBuffer : public Buffer {

size_t Size() const override { return size_; }

bool OwnsData() const override { return false; }
bool OwnsData() const override { return true; }

bool IsPlasmaBuffer() const override { return true; }

Expand All @@ -172,34 +172,4 @@ class SharedMemoryBuffer : public Buffer {
std::shared_ptr<Buffer> parent_;
};

/// Represents a byte buffer for plasma object. This can be used to hold the
/// reference to a plasma object (via the underlying plasma::PlasmaBuffer).
class PlasmaBuffer : public Buffer {
public:
PlasmaBuffer(std::shared_ptr<Buffer> buffer,
std::function<void(PlasmaBuffer *)> on_delete = nullptr)
: buffer_(buffer), on_delete_(on_delete) {}

uint8_t *Data() const override { return buffer_->Data(); }

size_t Size() const override { return buffer_->Size(); }

bool OwnsData() const override { return true; }

bool IsPlasmaBuffer() const override { return true; }

~PlasmaBuffer() {
if (on_delete_ != nullptr) {
on_delete_(this);
}
};

private:
/// shared_ptr to a buffer which can potentially hold a reference
/// for the object (when it's a plasma::PlasmaBuffer).
std::shared_ptr<Buffer> buffer_;
/// Callback to run on destruction.
std::function<void(PlasmaBuffer *)> on_delete_;
};

} // namespace ray
72 changes: 39 additions & 33 deletions src/ray/core_worker/store_provider/plasma_store_provider.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,36 @@

namespace ray {

void BufferTracker::Record(const ObjectID &object_id, TrackedBuffer *buffer,
const std::string &call_site) {
absl::MutexLock lock(&active_buffers_mutex_);
active_buffers_[std::make_pair(object_id, buffer)] = call_site;
}

/// Stop tracking `buffer` for `object_id`.
///
/// The (object_id, buffer) pair must currently be tracked (i.e. it was passed to
/// Record() and not yet released); otherwise the RAY_CHECK below fails.
///
/// \param object_id The object the buffer belongs to.
/// \param buffer Address of the buffer being released.
void BufferTracker::Release(const ObjectID &object_id, TrackedBuffer *buffer) {
  absl::MutexLock lock(&active_buffers_mutex_);
  // Look the entry up once and erase through the iterator, instead of hashing the
  // key twice with contains() followed by erase(key).
  auto it = active_buffers_.find(std::make_pair(object_id, buffer));
  RAY_CHECK(it != active_buffers_.end())
      << "Attempted to release an untracked buffer for object " << object_id;
  active_buffers_.erase(it);
}

/// Build a snapshot of the currently tracked objects.
///
/// \return A map from object ID to (buffer size, call site). When several buffers are
/// tracked for the same object, an entry that already has a non-empty call site is
/// kept; otherwise it is replaced by the buffer visited later.
absl::flat_hash_map<ObjectID, std::pair<int64_t, std::string>>
BufferTracker::UsedObjects() const {
  absl::flat_hash_map<ObjectID, std::pair<int64_t, std::string>> result;
  absl::MutexLock lock(&active_buffers_mutex_);
  for (const auto &tracked : active_buffers_) {
    const ObjectID &object_id = tracked.first.first;
    const TrackedBuffer *buffer = tracked.first.second;
    const std::string &call_site = tracked.second;

    // Prefer to keep entries that have non-empty callsites.
    const auto existing = result.find(object_id);
    const bool keep_existing =
        existing != result.end() && !existing->second.second.empty();
    if (keep_existing) {
      continue;
    }
    result[object_id] = std::make_pair(buffer->Size(), call_site);
  }
  return result;
}

CoreWorkerPlasmaStoreProvider::CoreWorkerPlasmaStoreProvider(
const std::string &store_socket,
const std::shared_ptr<raylet::RayletClient> raylet_client,
Expand Down Expand Up @@ -85,13 +115,12 @@ Status CoreWorkerPlasmaStoreProvider::Create(const std::shared_ptr<Buffer> &meta
const rpc::Address &owner_address,
std::shared_ptr<Buffer> *data) {
Status status;
std::shared_ptr<Buffer> plasma_buffer;
uint64_t retry_with_request_id = 0;
{
std::lock_guard<std::mutex> guard(store_client_mutex_);
status = store_client_.Create(
object_id, owner_address, data_size, metadata ? metadata->Data() : nullptr,
metadata ? metadata->Size() : 0, &retry_with_request_id, &plasma_buffer,
metadata ? metadata->Size() : 0, &retry_with_request_id, data,
/*device_num=*/0);
}

Expand All @@ -104,7 +133,7 @@ Status CoreWorkerPlasmaStoreProvider::Create(const std::shared_ptr<Buffer> &meta
<< retry_with_request_id;
status = store_client_.RetryCreate(object_id, retry_with_request_id,
metadata ? metadata->Data() : nullptr,
&retry_with_request_id, &plasma_buffer);
&retry_with_request_id, data);
}
}

Expand All @@ -129,7 +158,6 @@ Status CoreWorkerPlasmaStoreProvider::Create(const std::shared_ptr<Buffer> &meta
status = Status::OK();
} else {
RAY_RETURN_NOT_OK(status);
*data = std::make_shared<PlasmaBuffer>(PlasmaBuffer(plasma_buffer));
}
return status;
}
Expand Down Expand Up @@ -171,27 +199,17 @@ Status CoreWorkerPlasmaStoreProvider::FetchAndGetFromPlasmaStore(
for (size_t i = 0; i < plasma_results.size(); i++) {
if (plasma_results[i].data != nullptr || plasma_results[i].metadata != nullptr) {
const auto &object_id = batch_ids[i];
std::shared_ptr<PlasmaBuffer> data = nullptr;
std::shared_ptr<PlasmaBuffer> metadata = nullptr;
std::shared_ptr<TrackedBuffer> data = nullptr;
std::shared_ptr<Buffer> metadata = nullptr;
if (plasma_results[i].data && plasma_results[i].data->Size()) {
// We track the set of active data buffers in active_buffers_. On destruction,
// the buffer entry will be removed from the set via callback.
std::shared_ptr<BufferTracker> tracker = buffer_tracker_;
data = std::make_shared<PlasmaBuffer>(
plasma_results[i].data, [tracker, object_id](PlasmaBuffer *this_buffer) {
absl::MutexLock lock(&tracker->active_buffers_mutex_);
auto key = std::make_pair(object_id, this_buffer);
RAY_CHECK(tracker->active_buffers_.contains(key));
tracker->active_buffers_.erase(key);
});
auto call_site = get_current_call_site_();
{
absl::MutexLock lock(&tracker->active_buffers_mutex_);
tracker->active_buffers_[std::make_pair(object_id, data.get())] = call_site;
}
data = std::make_shared<TrackedBuffer>(plasma_results[i].data, buffer_tracker_,
object_id);
buffer_tracker_->Record(object_id, data.get(), get_current_call_site_());
}
if (plasma_results[i].metadata && plasma_results[i].metadata->Size()) {
metadata = std::make_shared<PlasmaBuffer>(plasma_results[i].metadata);
metadata = plasma_results[i].metadata;
}
const auto result_object =
std::make_shared<RayObject>(data, metadata, std::vector<ObjectID>());
Expand Down Expand Up @@ -373,19 +391,7 @@ std::string CoreWorkerPlasmaStoreProvider::MemoryUsageString() {

absl::flat_hash_map<ObjectID, std::pair<int64_t, std::string>>
CoreWorkerPlasmaStoreProvider::UsedObjectsList() const {
absl::flat_hash_map<ObjectID, std::pair<int64_t, std::string>> used;
absl::MutexLock lock(&buffer_tracker_->active_buffers_mutex_);
for (const auto &entry : buffer_tracker_->active_buffers_) {
auto it = used.find(entry.first.first);
if (it != used.end()) {
// Prefer to keep entries that have non-empty callsites.
if (!it->second.second.empty()) {
continue;
}
}
used[entry.first.first] = std::make_pair(entry.first.second->Size(), entry.second);
}
return used;
return buffer_tracker_->UsedObjects();
}

void CoreWorkerPlasmaStoreProvider::WarnIfAttemptedTooManyTimes(
Expand Down
66 changes: 51 additions & 15 deletions src/ray/core_worker/store_provider/plasma_store_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,57 @@

namespace ray {

class TrackedBuffer;

// Active buffers tracker. This must be allocated as a separate structure since its
// lifetime can exceed that of the store provider due to TrackedBuffer.
//
// Every TrackedBuffer holds a shared_ptr to its tracker and calls Release() from its
// destructor, so the tracker stays alive for as long as any tracked buffer does.
// All methods take active_buffers_mutex_ while touching the map.
class BufferTracker {
public:
// Track an object: remember `buffer` as a live buffer of `object_id` together with
// the call site that created it. Recording the same (object_id, buffer) pair again
// overwrites the stored call site.
void Record(const ObjectID &object_id, TrackedBuffer *buffer,
const std::string &call_site);
// Release an object from tracking. The (object_id, buffer) pair must currently be
// tracked; releasing an unknown pair fails a RAY_CHECK.
void Release(const ObjectID &object_id, TrackedBuffer *buffer);
// List tracked objects as a map from object ID to (buffer size, call site). If an
// object has several tracked buffers, an entry with a non-empty call site is
// preferred.
absl::flat_hash_map<ObjectID, std::pair<int64_t, std::string>> UsedObjects() const;

private:
// Guards the active buffers map. This mutex may be acquired during TrackedBuffer
// destruction.
mutable absl::Mutex active_buffers_mutex_;
// Mapping of live object buffers to their creation call site. Destroyed buffers are
// automatically removed from this list via destructor. The map key uniquely
// identifies a buffer. It should not be a shared ptr since that would keep the Buffer
// alive forever (i.e., this is a weak ref map).
absl::flat_hash_map<std::pair<ObjectID, TrackedBuffer *>, std::string> active_buffers_
GUARDED_BY(active_buffers_mutex_);
};

/// A Buffer that wraps another buffer and is tracked by a BufferTracker for as long
/// as it is alive.
///
/// The constructor does NOT register the buffer: the creator must call
/// BufferTracker::Record(object_id, this, call_site) on the same tracker after
/// construction. The destructor unconditionally calls BufferTracker::Release(), which
/// fails a RAY_CHECK if this exact object address was never recorded.
class TrackedBuffer : public Buffer {
 public:
  /// \param buffer The wrapped buffer; Data() and Size() are forwarded to it.
  /// \param tracker The tracker to release this buffer from on destruction. A
  /// reference is held so the tracker stays alive as long as this buffer does.
  /// \param object_id The object this buffer belongs to.
  TrackedBuffer(std::shared_ptr<Buffer> buffer,
                const std::shared_ptr<BufferTracker> &tracker, const ObjectID &object_id)
      : buffer_(buffer), tracker_(tracker), object_id_(object_id) {}

  // The tracker keys its entries by the address of the TrackedBuffer. A copy would
  // live at an address that was never recorded, and its destructor would then fail
  // the RAY_CHECK in BufferTracker::Release(). Forbid copying at compile time.
  TrackedBuffer(const TrackedBuffer &) = delete;
  TrackedBuffer &operator=(const TrackedBuffer &) = delete;

  /// Pointer to the wrapped buffer's data.
  uint8_t *Data() const override { return buffer_->Data(); }

  /// Size of the wrapped buffer.
  size_t Size() const override { return buffer_->Size(); }

  /// Always reports true.
  bool OwnsData() const override { return true; }

  /// Always reports true.
  bool IsPlasmaBuffer() const override { return true; }

  /// Removes this buffer from the tracker's set of active buffers.
  ~TrackedBuffer() { tracker_->Release(object_id_, this); }

 private:
  /// shared_ptr to a buffer which can potentially hold a reference
  /// for the object (when it's a SharedMemoryBuffer).
  std::shared_ptr<Buffer> buffer_;
  /// Tracker this buffer is released from on destruction.
  std::shared_ptr<BufferTracker> tracker_;
  /// ID of the object this buffer belongs to; part of the tracker key.
  ObjectID object_id_;
};

/// The class provides implementations for accessing plasma store, which includes both
/// local and remote stores. Local access is done via a
/// CoreWorkerLocalPlasmaStoreProvider and remote access goes through the raylet.
Expand Down Expand Up @@ -154,21 +205,6 @@ class CoreWorkerPlasmaStoreProvider {
std::function<Status()> check_signals_;
std::function<std::string()> get_current_call_site_;
uint32_t object_store_full_delay_ms_;

// Active buffers tracker. This must be allocated as a separate structure since its
// lifetime can exceed that of the store provider due to callback references.
struct BufferTracker {
// Guards the active buffers map. This mutex may be acquired during PlasmaBuffer
// destruction.
mutable absl::Mutex active_buffers_mutex_;
// Mapping of live object buffers to their creation call site. Destroyed buffers are
// automatically removed from this list via destructor callback. The map key uniquely
// identifies a buffer. It should not be a shared ptr since that would keep the Buffer
// alive forever (i.e., this is a weak ref map).
absl::flat_hash_map<std::pair<ObjectID, PlasmaBuffer *>, std::string> active_buffers_
GUARDED_BY(active_buffers_mutex_);
};

// Pointer to the shared buffer tracker.
std::shared_ptr<BufferTracker> buffer_tracker_;
};
Expand Down
2 changes: 1 addition & 1 deletion src/ray/core_worker/test/core_worker_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -820,7 +820,7 @@ TEST_F(SingleNodeTest, TestObjectInterface) {
ASSERT_EQ(wait_results, std::vector<bool>({true, true, false}));

// Test Delete().
// clear the reference held by PlasmaBuffer.
// clear the reference held by TrackedBuffer.
results.clear();
RAY_CHECK_OK(core_worker.Delete(ids, true));

Expand Down
5 changes: 2 additions & 3 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2368,9 +2368,8 @@ void NodeManager::HandlePinObjectIDs(const rpc::PinObjectIDsRequest &request,
if (plasma_results[i].data == nullptr) {
objects.push_back(nullptr);
} else {
objects.emplace_back(std::unique_ptr<RayObject>(new RayObject(
std::make_shared<PlasmaBuffer>(plasma_results[i].data),
std::make_shared<PlasmaBuffer>(plasma_results[i].metadata), {})));
objects.emplace_back(std::unique_ptr<RayObject>(
new RayObject(plasma_results[i].data, plasma_results[i].metadata, {})));
}
}
local_object_manager_.PinObjects(object_ids, std::move(objects));
Expand Down

0 comments on commit a09997d

Please sign in to comment.