Skip to content

Commit

Permalink
Implemented wait_requests as vector (ray-project#943)
Browse files Browse the repository at this point in the history
  • Loading branch information
pschafhalter authored and robertnishihara committed Sep 8, 2017
1 parent 9538783 commit 8906a92
Showing 1 changed file with 16 additions and 28 deletions.
44 changes: 16 additions & 28 deletions src/plasma/plasma_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@

/* C++ includes. */
#include <unordered_map>
#include <vector>

#include "uthash.h"
#include "utlist.h"
#include "utarray.h"
#include "utstring.h"
#include "common_protocol.h"
#include "io.h"
Expand Down Expand Up @@ -216,15 +216,11 @@ struct WaitRequest {
int64_t num_satisfied;
};

/** This is used to define the utarray of wait requests in the
* ObjectWaitRequests struct. */
UT_icd wait_request_icd = {sizeof(WaitRequest *), NULL, NULL, NULL};

typedef struct {
/** The ID of the object. This is used as a key in a hash table. */
ObjectID object_id;
/** An array of the wait requests involving this object ID. */
UT_array *wait_requests;
std::vector<WaitRequest *> wait_requests;
/** Handle for the uthash table in the manager state that keeps track of the
* wait requests involving this object ID. */
UT_hash_handle hh;
Expand Down Expand Up @@ -383,16 +379,14 @@ void add_wait_request_for_object(PlasmaManagerState *manager_state,
* new ObjectWaitRequests struct for this object ID and add it to the hash
* table. */
if (object_wait_reqs == NULL) {
object_wait_reqs =
(ObjectWaitRequests *) malloc(sizeof(ObjectWaitRequests));
object_wait_reqs = new ObjectWaitRequests();
object_wait_reqs->object_id = object_id;
utarray_new(object_wait_reqs->wait_requests, &wait_request_icd);
HASH_ADD(hh, *object_wait_requests_table_ptr, object_id,
sizeof(object_wait_reqs->object_id), object_wait_reqs);
}
/* Add this wait request to the vector of wait requests involving this object
* ID. */
utarray_push_back(object_wait_reqs->wait_requests, &wait_req);
object_wait_reqs->wait_requests.push_back(wait_req);
}

void remove_wait_request_for_object(PlasmaManagerState *manager_state,
Expand All @@ -408,12 +402,11 @@ void remove_wait_request_for_object(PlasmaManagerState *manager_state,
* vector contains the wait request, then remove the wait request from the
* vector. */
if (object_wait_reqs != NULL) {
for (int i = 0; i < utarray_len(object_wait_reqs->wait_requests); ++i) {
WaitRequest **wait_req_ptr =
(WaitRequest **) utarray_eltptr(object_wait_reqs->wait_requests, i);
if (*wait_req_ptr == wait_req) {
for (int i = 0; i < object_wait_reqs->wait_requests.size(); ++i) {
if (object_wait_reqs->wait_requests[i] == wait_req) {
/* Remove the wait request from the array. */
utarray_erase(object_wait_reqs->wait_requests, i, 1);
object_wait_reqs->wait_requests.erase(
object_wait_reqs->wait_requests.begin() + i);
break;
}
}
Expand Down Expand Up @@ -461,18 +454,16 @@ void update_object_wait_requests(PlasmaManagerState *manager_state,
HASH_FIND(hh, *object_wait_requests_table_ptr, &obj_id, sizeof(obj_id),
object_wait_reqs);
if (object_wait_reqs != NULL) {
/* We compute the number of requests first because the length of the utarray
/* We compute the number of requests first because the length of the vector
* will change as we iterate over it (because each call to return_from_wait
* will remove one element). */
int num_requests = utarray_len(object_wait_reqs->wait_requests);
/* The argument index is the index of the current element of the utarray
int num_requests = object_wait_reqs->wait_requests.size();
/* The argument index is the index of the current element of the vector
* that we are processing. It may differ from the counter i when elements
* are removed from the array. */
int index = 0;
for (int i = 0; i < num_requests; ++i) {
WaitRequest **wait_req_ptr = (WaitRequest **) utarray_eltptr(
object_wait_reqs->wait_requests, index);
WaitRequest *wait_req = *wait_req_ptr;
WaitRequest *wait_req = object_wait_reqs->wait_requests[index];
wait_req->num_satisfied += 1;
/* Mark the object as present in the wait request. */
auto object_request =
Expand All @@ -493,12 +484,11 @@ void update_object_wait_requests(PlasmaManagerState *manager_state,
index += 1;
}
}
DCHECK(index == utarray_len(object_wait_reqs->wait_requests));
DCHECK(index == object_wait_reqs->wait_requests.size());
/* Remove the array of wait requests for this object, since no one should be
* waiting for this object anymore. */
HASH_DELETE(hh, *object_wait_requests_table_ptr, object_wait_reqs);
utarray_free(object_wait_reqs->wait_requests);
free(object_wait_reqs);
delete object_wait_reqs;
}
}

Expand Down Expand Up @@ -612,13 +602,11 @@ void PlasmaManagerState_free(PlasmaManagerState *state) {
ObjectWaitRequests *wait_reqs, *tmp_wait_reqs;
HASH_ITER(hh, state->object_wait_requests_local, wait_reqs, tmp_wait_reqs) {
HASH_DELETE(hh, state->object_wait_requests_local, wait_reqs);
utarray_free(wait_reqs->wait_requests);
free(wait_reqs);
delete wait_reqs;
}
HASH_ITER(hh, state->object_wait_requests_remote, wait_reqs, tmp_wait_reqs) {
HASH_DELETE(hh, state->object_wait_requests_remote, wait_reqs);
utarray_free(wait_reqs->wait_requests);
free(wait_reqs);
delete wait_reqs;
}

ARROW_CHECK_OK(state->plasma_conn->Disconnect());
Expand Down

0 comments on commit 8906a92

Please sign in to comment.