Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 32 additions & 8 deletions cpp/src/plasma/store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ GetRequest::GetRequest(Client* client, const std::vector<ObjectID>& object_ids)
num_objects_to_wait_for = unique_ids.size();
}

Client::Client(int fd) : fd(fd) {}
Client::Client(int fd) : fd(fd), notification_fd(-1) {}

PlasmaStore::PlasmaStore(EventLoop* loop, int64_t system_memory, std::string directory,
bool hugepages_enabled)
Expand Down Expand Up @@ -559,10 +559,18 @@ void PlasmaStore::disconnect_client(int client_fd) {
remove_from_client_object_ids(entry, client);
}

// Note, the store may still attempt to send a message to the disconnected
// client (for example, when an object ID that the client was waiting for
// is ready). In these cases, the attempt to send the message will fail, but
// the store should just ignore the failure.
if (client->notification_fd > 0) {
// This client has subscribed for notifications.
auto notify_fd = client->notification_fd;
loop_->RemoveFileEvent(notify_fd);
// Close socket.
close(notify_fd);
// Remove notification queue for this fd from global map.
pending_notifications_.erase(notify_fd);
// Reset fd.
client->notification_fd = -1;
}

connected_clients_.erase(it);
}

Expand Down Expand Up @@ -642,9 +650,23 @@ void PlasmaStore::push_notification(ObjectInfoT* object_info) {
}
}

void PlasmaStore::push_notification(ObjectInfoT* object_info, int client_fd) {
auto it = pending_notifications_.find(client_fd);
if (it != pending_notifications_.end()) {
auto notification = create_object_info_buffer(object_info);
it->second.object_notifications.emplace_back(std::move(notification));
send_notifications(it);
}
}

// Subscribe to notifications about sealed objects.
void PlasmaStore::subscribe_to_updates(Client* client) {
ARROW_LOG(DEBUG) << "subscribing to updates on fd " << client->fd;
if (client->notification_fd > 0) {
// This client has already subscribed. Return.
return;
}

// TODO(rkn): The store could block here if the client doesn't send a file
// descriptor.
int fd = recv_fd(client->fd);
Expand All @@ -657,12 +679,14 @@ void PlasmaStore::subscribe_to_updates(Client* client) {

// Add this fd to global map, which is needed for this client to receive notifications.
pending_notifications_[fd];
client->notification_fd = fd;

// Push notifications to the new subscriber about existing objects.
// Push notifications to the new subscriber about existing sealed objects.
for (const auto& entry : store_info_.objects) {
push_notification(&entry.second->info);
if (entry.second->state == PLASMA_SEALED) {
push_notification(&entry.second->info, fd);
}
}
send_notifications(pending_notifications_.find(fd));
}

Status PlasmaStore::process_message(Client* client) {
Expand Down
6 changes: 6 additions & 0 deletions cpp/src/plasma/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ struct Client {

/// Object ids that are used by this client.
std::unordered_set<ObjectID> object_ids;

/// The file descriptor used to push notifications to client. This is only valid
/// if client subscribes to plasma store. -1 indicates invalid.
int notification_fd;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated to the content of this patch: could we regularize the variable name convention in this header and elsewhere in Plasma? Would help with readability

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that makes a lot of sense:)

};

class PlasmaStore {
Expand Down Expand Up @@ -170,6 +174,8 @@ class PlasmaStore {
private:
void push_notification(ObjectInfoT* object_notification);

void push_notification(ObjectInfoT* object_notification, int client_fd);

void add_to_client_object_ids(ObjectTableEntry* entry, Client* client);

void return_from_get(GetRequest* get_req);
Expand Down