Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert plasma client array and object notification queue to STL #482

Merged
merged 5 commits into from
Apr 24, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/plasma/plasma.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
/** Allocation granularity used in plasma for object allocation. */
#define BLOCK_SIZE 64

struct Client;

/**
* Object request data structure. Used in the plasma_wait_for_objects()
* argument.
Expand Down Expand Up @@ -111,7 +113,7 @@ typedef struct {
/** Pointer to the object data. Needed to free the object. */
uint8_t *pointer;
/** An array of the clients that are currently using this object. */
UT_array *clients;
std::vector<Client *> clients;
/** The state of the object, e.g., whether it is open or sealed. */
object_state state;
/** The digest of the object. Used to see if two objects are the same. */
Expand Down
63 changes: 27 additions & 36 deletions src/plasma/plasma_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <limits.h>
#include <poll.h>

#include <deque>
#include <unordered_map>
#include <unordered_set>
#include <vector>
Expand Down Expand Up @@ -59,10 +60,6 @@ struct Client {
PlasmaStoreState *plasma_state;
};

/* This is used to define the array of clients used to define the
* object_table_entry type. */
UT_icd client_icd = {sizeof(Client *), NULL, NULL, NULL};

/* This is used to define the queue of object notifications for plasma
* subscribers. */
UT_icd object_info_icd = {sizeof(uint8_t *), NULL, NULL, NULL};
Expand All @@ -72,7 +69,7 @@ typedef struct {
int subscriber_fd;
/** The object notifications for clients. We notify the client about the
* objects in the order that the objects were sealed or deleted. */
UT_array *object_notifications;
std::deque<uint8_t *> *object_notifications;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about we make this not a pointer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After the next PR where I make the pending_notifications an STL hashmap ;)

/** Handle for the uthash table. */
UT_hash_handle hh;
} NotificationQueue;
Expand Down Expand Up @@ -159,18 +156,16 @@ void PlasmaStoreState_free(PlasmaStoreState *state) {
* up to make the valgrind warnings go away. Objects that
* are still reachable are not cleaned up. */
for (const auto &it : state->plasma_store_info->objects) {
utarray_free(it.second->clients);
delete it.second;
}
NotificationQueue *queue, *temp_queue;
HASH_ITER(hh, state->pending_notifications, queue, temp_queue) {
for (int i = 0; i < utarray_len(queue->object_notifications); ++i) {
uint8_t **notification =
(uint8_t **) utarray_eltptr(queue->object_notifications, i);
uint8_t *data = *notification;
for (int i = 0; i < queue->object_notifications->size(); ++i) {
uint8_t *notification = (uint8_t *) queue->object_notifications->at(i);
uint8_t *data = notification;
free(data);
}
utarray_free(queue->object_notifications);
delete queue->object_notifications;
}
}

Expand All @@ -182,15 +177,14 @@ void push_notification(PlasmaStoreState *state,
void add_client_to_object_clients(object_table_entry *entry,
Client *client_info) {
/* Check if this client is already using the object. */
for (int i = 0; i < utarray_len(entry->clients); ++i) {
Client **c = (Client **) utarray_eltptr(entry->clients, i);
if (*c == client_info) {
for (int i = 0; i < entry->clients.size(); ++i) {
if (entry->clients[i] == client_info) {
return;
}
}
/* If there are no other clients using this object, notify the eviction policy
* that the object is being used. */
if (utarray_len(entry->clients) == 0) {
if (entry->clients.size() == 0) {
/* Tell the eviction policy that this object is being used. */
int64_t num_objects_to_evict;
ObjectID *objects_to_evict;
Expand All @@ -202,7 +196,7 @@ void add_client_to_object_clients(object_table_entry *entry,
objects_to_evict);
}
/* Add the client pointer to the list of clients using this object. */
utarray_push_back(entry->clients, &client_info);
entry->clients.push_back(client_info);
}

/* Create a new object buffer in the hash table. */
Expand Down Expand Up @@ -262,7 +256,6 @@ int create_object(Client *client_context,
entry->map_size = map_size;
entry->offset = offset;
entry->state = PLASMA_CREATED;
utarray_new(entry->clients, &client_icd);
plasma_state->plasma_store_info->objects[obj_id] = entry;
result->handle.store_fd = fd;
result->handle.mmap_size = map_size;
Expand Down Expand Up @@ -453,14 +446,13 @@ void process_get_request(Client *client_context,
int remove_client_from_object_clients(object_table_entry *entry,
Client *client_info) {
/* Find the location of the client in the array. */
for (int i = 0; i < utarray_len(entry->clients); ++i) {
Client **c = (Client **) utarray_eltptr(entry->clients, i);
if (*c == client_info) {
for (int i = 0; i < entry->clients.size(); ++i) {
if (entry->clients[i] == client_info) {
/* Remove the client from the array. */
utarray_erase(entry->clients, i, 1);
entry->clients.erase(entry->clients.begin() + i);
/* If no more clients are using this object, notify the eviction policy
* that the object is no longer being used. */
if (utarray_len(entry->clients) == 0) {
if (entry->clients.size() == 0) {
/* Tell the eviction policy that this object is no longer being used. */
int64_t num_objects_to_evict;
ObjectID *objects_to_evict;
Expand Down Expand Up @@ -530,11 +522,10 @@ void delete_object(PlasmaStoreState *plasma_state, ObjectID object_id) {
CHECKM(entry != NULL, "To delete an object it must be in the object table.");
CHECKM(entry->state == PLASMA_SEALED,
"To delete an object it must have been sealed.");
CHECKM(utarray_len(entry->clients) == 0,
CHECKM(entry->clients.size() == 0,
"To delete an object, there must be no clients currently using it.");
uint8_t *pointer = entry->pointer;
dlfree(pointer);
utarray_free(entry->clients);
plasma_state->plasma_store_info->objects.erase(object_id);
delete entry;
/* Inform all subscribers that the object has been deleted. */
Expand Down Expand Up @@ -563,7 +554,7 @@ void push_notification(PlasmaStoreState *plasma_state,
NotificationQueue *queue, *temp_queue;
HASH_ITER(hh, plasma_state->pending_notifications, queue, temp_queue) {
uint8_t *notification = create_object_info_buffer(object_info);
utarray_push_back(queue->object_notifications, &notification);
queue->object_notifications->push_back(notification);
send_notifications(plasma_state->loop, queue->subscriber_fd, plasma_state,
0);
/* The notification gets freed in send_notifications when the notification
Expand All @@ -585,15 +576,13 @@ void send_notifications(event_loop *loop,
bool closed = false;
/* Loop over the array of pending notifications and send as many of them as
* possible. */
for (int i = 0; i < utarray_len(queue->object_notifications); ++i) {
uint8_t **notification =
(uint8_t **) utarray_eltptr(queue->object_notifications, i);
uint8_t *data = *notification;
for (int i = 0; i < queue->object_notifications->size(); ++i) {
uint8_t *notification = (uint8_t *) queue->object_notifications->at(i);
/* Decode the length, which is the first bytes of the message. */
int64_t size = *((int64_t *) data);
int64_t size = *((int64_t *) notification);

/* Attempt to send a notification about this object ID. */
int nbytes = send(client_sock, data, sizeof(int64_t) + size, 0);
int nbytes = send(client_sock, notification, sizeof(int64_t) + size, 0);
if (nbytes >= 0) {
CHECK(nbytes == sizeof(int64_t) + size);
} else if (nbytes == -1 &&
Expand All @@ -618,21 +607,23 @@ void send_notifications(event_loop *loop,
num_processed += 1;
/* The corresponding malloc happened in create_object_info_buffer
* within push_notification. */
free(data);
free(notification);
}
/* Remove the sent notifications from the array. */
utarray_erase(queue->object_notifications, 0, num_processed);
queue->object_notifications->erase(
queue->object_notifications->begin(),
queue->object_notifications->begin() + num_processed);

/* Stop sending notifications if the pipe was broken. */
if (closed) {
close(client_sock);
utarray_free(queue->object_notifications);
delete queue->object_notifications;
HASH_DEL(plasma_state->pending_notifications, queue);
free(queue);
}

/* If we have sent all notifications, remove the fd from the event loop. */
if (utarray_len(queue->object_notifications) == 0) {
if (queue->object_notifications->empty()) {
event_loop_remove_file(loop, client_sock);
}
}
Expand All @@ -656,7 +647,7 @@ void subscribe_to_updates(Client *client_context, int conn) {
NotificationQueue *queue =
(NotificationQueue *) malloc(sizeof(NotificationQueue));
queue->subscriber_fd = fd;
utarray_new(queue->object_notifications, &object_info_icd);
queue->object_notifications = new std::deque<uint8_t *>();
HASH_ADD_INT(plasma_state->pending_notifications, subscriber_fd, queue);

/* Push notifications to the new subscriber about existing objects. */
Expand Down