Skip to content

Commit 8156e25

Browse files
zhijunfuwesm
authored andcommitted
ARROW-2551: [Plasma] Improve notification logic
This change targets to improve a few places in current plasma notification code: 1. When a client subscribes to Plasma, the store pushes notifications about existing objects to ALL subscribers, while it should only push to the new subscriber. 2. And in the above scenario, it should only push "sealed" objects to the new subscriber, while currently it pushes all objects regardless of the state. 3. When a client disconnects, it will no longer be able to receive notifications, thus the NotificationQueue for the client should be removed from global map. Author: Zhijun Fu <pingfu.fzj@antfin.com> Author: Zhijun Fu <zhijun.fu@outlook.com> Closes #2031 from zhijunfu/refactor-notification and squashes the following commits: 84f4935 <Zhijun Fu> Trigger fce35f1 <Zhijun Fu> Trigger b93ba0e <Zhijun Fu> Trigger f2377f8 <Zhijun Fu> fix code with rebase d50651e <Zhijun Fu> Address review comments 6a7f492 <Zhijun Fu> delete object notification queue for a client when it disconnects with plasma 57bbab3 <Zhijun Fu> plasma shouldn't push notifications to all subscribers when a new client subscribes
1 parent d5c5f39 commit 8156e25

File tree

2 files changed

+38
-8
lines changed

2 files changed

+38
-8
lines changed

cpp/src/plasma/store.cc

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ GetRequest::GetRequest(Client* client, const std::vector<ObjectID>& object_ids)
103103
num_objects_to_wait_for = unique_ids.size();
104104
}
105105

106-
Client::Client(int fd) : fd(fd) {}
106+
Client::Client(int fd) : fd(fd), notification_fd(-1) {}
107107

108108
PlasmaStore::PlasmaStore(EventLoop* loop, int64_t system_memory, std::string directory,
109109
bool hugepages_enabled)
@@ -559,10 +559,18 @@ void PlasmaStore::disconnect_client(int client_fd) {
559559
remove_from_client_object_ids(entry, client);
560560
}
561561

562-
// Note, the store may still attempt to send a message to the disconnected
563-
// client (for example, when an object ID that the client was waiting for
564-
// is ready). In these cases, the attempt to send the message will fail, but
565-
// the store should just ignore the failure.
562+
if (client->notification_fd > 0) {
563+
// This client has subscribed for notifications.
564+
auto notify_fd = client->notification_fd;
565+
loop_->RemoveFileEvent(notify_fd);
566+
// Close socket.
567+
close(notify_fd);
568+
// Remove notification queue for this fd from global map.
569+
pending_notifications_.erase(notify_fd);
570+
// Reset fd.
571+
client->notification_fd = -1;
572+
}
573+
566574
connected_clients_.erase(it);
567575
}
568576

@@ -642,9 +650,23 @@ void PlasmaStore::push_notification(ObjectInfoT* object_info) {
642650
}
643651
}
644652

653+
void PlasmaStore::push_notification(ObjectInfoT* object_info, int client_fd) {
654+
auto it = pending_notifications_.find(client_fd);
655+
if (it != pending_notifications_.end()) {
656+
auto notification = create_object_info_buffer(object_info);
657+
it->second.object_notifications.emplace_back(std::move(notification));
658+
send_notifications(it);
659+
}
660+
}
661+
645662
// Subscribe to notifications about sealed objects.
646663
void PlasmaStore::subscribe_to_updates(Client* client) {
647664
ARROW_LOG(DEBUG) << "subscribing to updates on fd " << client->fd;
665+
if (client->notification_fd > 0) {
666+
// This client has already subscribed. Return.
667+
return;
668+
}
669+
648670
// TODO(rkn): The store could block here if the client doesn't send a file
649671
// descriptor.
650672
int fd = recv_fd(client->fd);
@@ -657,12 +679,14 @@ void PlasmaStore::subscribe_to_updates(Client* client) {
657679

658680
// Add this fd to global map, which is needed for this client to receive notifications.
659681
pending_notifications_[fd];
682+
client->notification_fd = fd;
660683

661-
// Push notifications to the new subscriber about existing objects.
684+
// Push notifications to the new subscriber about existing sealed objects.
662685
for (const auto& entry : store_info_.objects) {
663-
push_notification(&entry.second->info);
686+
if (entry.second->state == PLASMA_SEALED) {
687+
push_notification(&entry.second->info, fd);
688+
}
664689
}
665-
send_notifications(pending_notifications_.find(fd));
666690
}
667691

668692
Status PlasmaStore::process_message(Client* client) {

cpp/src/plasma/store.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ struct Client {
5050

5151
/// Object ids that are used by this client.
5252
std::unordered_set<ObjectID> object_ids;
53+
54+
/// The file descriptor used to push notifications to client. This is only valid
55+
/// if client subscribes to plasma store. -1 indicates invalid.
56+
int notification_fd;
5357
};
5458

5559
class PlasmaStore {
@@ -170,6 +174,8 @@ class PlasmaStore {
170174
private:
171175
void push_notification(ObjectInfoT* object_notification);
172176

177+
void push_notification(ObjectInfoT* object_notification, int client_fd);
178+
173179
void add_to_client_object_ids(ObjectTableEntry* entry, Client* client);
174180

175181
void return_from_get(GetRequest* get_req);

0 commit comments

Comments
 (0)