Skip to content

Commit

Permalink
Dynamically grow worker pool to partially solve hanging workloads (ra…
Browse files Browse the repository at this point in the history
…y-project#286)

* First pass at a policy to solve deadlock

* Address Robert's comments

* stress test

* unit test

* Fix test cases

* Fix test for python3

* add more logging

* White space.
  • Loading branch information
stephanie-wang authored and robertnishihara committed Feb 18, 2017
1 parent 0bbf08a commit a0dd3a4
Show file tree
Hide file tree
Showing 11 changed files with 393 additions and 38 deletions.
6 changes: 6 additions & 0 deletions python/ray/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,7 @@ def get_object(self, object_ids):
# their original index in the object_ids argument.
unready_ids = dict((object_id, i) for (i, (object_id, val)) in
enumerate(final_results) if val is None)
was_blocked = (len(unready_ids) > 0)
# Try reconstructing any objects we haven't gotten yet. Try to get them
# until GET_TIMEOUT_MILLISECONDS milliseconds passes, then repeat.
while len(unready_ids) > 0:
Expand All @@ -487,6 +488,11 @@ def get_object(self, object_ids):
final_results[index] = (object_id, val)
unready_ids.pop(object_id)

# If there were objects that we weren't able to get locally, let the local
# scheduler know that we're now unblocked.
if was_blocked:
self.photon_client.notify_unblocked()

# Unwrap the object from the list (it was wrapped put_object).
assert len(final_results) == len(object_ids)
for i in range(len(final_results)):
Expand Down
8 changes: 7 additions & 1 deletion src/photon/photon.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ enum photon_message_type {
EVENT_LOG_MESSAGE,
/** Send an initial connection message to the local scheduler.
* This contains the worker's process ID and actor ID. */
REGISTER_WORKER_INFO
REGISTER_WORKER_INFO,
/** For a worker that was blocked on some object(s), tell the local scheduler
* that the worker is now unblocked. */
NOTIFY_UNBLOCKED,
};

/* These are needed to define the UT_arrays. */
Expand Down Expand Up @@ -112,6 +115,9 @@ typedef struct {
* no task is running on the worker, this will be NULL. This is used to
* update the task table. */
task *task_in_progress;
/** A flag to indicate whether this worker is currently blocking on an
* object(s) that isn't available locally yet. */
bool is_blocked;
/** The process ID of the client. If this is set to zero, the client has not
* yet registered a process ID. */
pid_t pid;
Expand Down
160 changes: 151 additions & 9 deletions src/photon/photon_algorithm.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,19 @@ struct scheduling_algorithm_state {
* about a new local scheduler arrives, we will resubmit all of these tasks
* locally. */
UT_array *cached_submitted_actor_tasks;
/** An array of worker indices corresponding to clients that are
* waiting for tasks. */
/** An array of pointers to workers in the worker pool. These are workers
* that have registered a PID with us and that are now waiting to be
* assigned a task to execute. */
UT_array *available_workers;
/** An array of pointers to workers that are currently executing a task,
* unblocked. These are the workers that are leasing some number of
* resources. */
UT_array *executing_workers;
/** An array of pointers to workers that are currently executing a task,
* blocked on some object(s) that isn't available locally yet. These are the
* workers that are executing a task, but that have temporarily returned the
* task's required resources. */
UT_array *blocked_workers;
/** A hash map of the objects that are available in the local Plasma store.
* The key is the object ID. This information could be a little stale. */
object_entry *local_objects;
Expand All @@ -107,9 +117,13 @@ scheduling_algorithm_state *make_scheduling_algorithm_state(void) {
/* Initialize the local data structures used for queuing tasks and workers. */
algorithm_state->waiting_task_queue = NULL;
algorithm_state->dispatch_task_queue = NULL;
utarray_new(algorithm_state->available_workers, &worker_icd);

utarray_new(algorithm_state->cached_submitted_actor_tasks, &task_spec_icd);
algorithm_state->local_actor_infos = NULL;

utarray_new(algorithm_state->available_workers, &worker_icd);
utarray_new(algorithm_state->executing_workers, &worker_icd);
utarray_new(algorithm_state->blocked_workers, &worker_icd);
return algorithm_state;
}

Expand Down Expand Up @@ -146,6 +160,8 @@ void free_scheduling_algorithm_state(
utarray_free(algorithm_state->cached_submitted_actor_tasks);
/* Free the list of available workers. */
utarray_free(algorithm_state->available_workers);
utarray_free(algorithm_state->executing_workers);
utarray_free(algorithm_state->blocked_workers);
/* Free the cached information about which objects are present locally. */
object_entry *obj_entry, *tmp_obj_entry;
HASH_ITER(hh, algorithm_state->local_objects, obj_entry, tmp_obj_entry) {
Expand Down Expand Up @@ -556,10 +572,6 @@ void dispatch_tasks(local_scheduler_state *state,

/* Assign as many tasks as we can, while there are workers available. */
DL_FOREACH_SAFE(algorithm_state->dispatch_task_queue, elt, tmp) {
if (utarray_len(algorithm_state->available_workers) <= 0) {
/* There are no more available workers, so we're done. */
break;
}
/* TODO(atumanov): as an optimization, we can also check if all dynamic
* capacity is zero and bail early. */
bool task_satisfied = true;
Expand All @@ -574,17 +586,32 @@ void dispatch_tasks(local_scheduler_state *state,
if (!task_satisfied) {
continue; /* Proceed to the next task. */
}

/* If there is a task to assign, but there are no more available workers in
* the worker pool, then exit. Ensure that there will be an available
* worker during a future invocation of dispatch_tasks. */
if (utarray_len(algorithm_state->available_workers) == 0) {
if (utarray_len(state->child_pids) == 0) {
/* If there are no workers, including those pending PID registration,
* then we must start a new one to replenish the worker pool. */
start_worker(state, NIL_ACTOR_ID);
}
break;
}

/* Dispatch this task to an available worker and dequeue the task. */
LOG_DEBUG("Dispatching task");
/* Get the last available worker in the available worker queue. */
local_scheduler_client **worker = (local_scheduler_client **) utarray_back(
algorithm_state->available_workers);
/* Tell the available worker to execute the task. */
assign_task_to_worker(state, elt->spec, *worker);
/* Remove the available worker from the queue and free the struct. */
/* Remove the worker from the available queue, and add it to the executing
* workers. */
utarray_pop_back(algorithm_state->available_workers);
utarray_push_back(algorithm_state->executing_workers, worker);
/* Dequeue the task and free the struct. */
print_resource_info(state, elt->spec);
/* Deque the task. */
DL_DELETE(algorithm_state->dispatch_task_queue, elt);
free_task_spec(elt->spec);
free(elt);
Expand Down Expand Up @@ -923,12 +950,37 @@ void handle_worker_available(local_scheduler_state *state,
scheduling_algorithm_state *algorithm_state,
local_scheduler_client *worker) {
CHECK(worker->task_in_progress == NULL);
/* Check that the worker isn't in the pool of available workers. */
for (local_scheduler_client **p = (local_scheduler_client **) utarray_front(
algorithm_state->available_workers);
p != NULL; p = (local_scheduler_client **) utarray_next(
algorithm_state->available_workers, p)) {
DCHECK(*p != worker);
}
/* Check that the worker isn't in the list of blocked workers. */
for (local_scheduler_client **p = (local_scheduler_client **) utarray_front(
algorithm_state->blocked_workers);
p != NULL; p = (local_scheduler_client **) utarray_next(
algorithm_state->blocked_workers, p)) {
DCHECK(*p != worker);
}
/* If the worker was executing a task, it must have finished, so remove it
* from the list of executing workers. */
for (int i = 0; i < utarray_len(algorithm_state->executing_workers); ++i) {
local_scheduler_client **p = (local_scheduler_client **) utarray_eltptr(
algorithm_state->executing_workers, i);
if (*p == worker) {
utarray_erase(algorithm_state->executing_workers, i, 1);
break;
}
}
/* Check that we actually erased the worker. */
for (int i = 0; i < utarray_len(algorithm_state->executing_workers); ++i) {
local_scheduler_client **p = (local_scheduler_client **) utarray_eltptr(
algorithm_state->executing_workers, i);
DCHECK(*p != worker);
}

/* Add worker to the list of available workers. */
utarray_push_back(algorithm_state->available_workers, &worker);

Expand All @@ -954,6 +1006,88 @@ void handle_actor_worker_available(local_scheduler_state *state,
dispatch_actor_task(state, algorithm_state, actor_id);
}

void handle_worker_blocked(local_scheduler_state *state,
scheduling_algorithm_state *algorithm_state,
local_scheduler_client *worker) {
/* Find the worker in the list of executing workers. */
for (int i = 0; i < utarray_len(algorithm_state->executing_workers); ++i) {
local_scheduler_client **p = (local_scheduler_client **) utarray_eltptr(
algorithm_state->executing_workers, i);
if (*p == worker) {
/* Remove the worker from the list of executing workers. */
utarray_erase(algorithm_state->executing_workers, i, 1);

/* Check that the worker isn't in the list of blocked workers. */
for (local_scheduler_client **q =
(local_scheduler_client **) utarray_front(
algorithm_state->blocked_workers);
q != NULL; q = (local_scheduler_client **) utarray_next(
algorithm_state->blocked_workers, q)) {
DCHECK(*q != worker);
}

/* Return the resources that the blocked worker was using. */
CHECK(worker->task_in_progress != NULL);
task_spec *spec = task_task_spec(worker->task_in_progress);
update_dynamic_resources(state, spec, true);
/* Add the worker to the list of blocked workers. */
worker->is_blocked = true;
utarray_push_back(algorithm_state->blocked_workers, &worker);

return;
}
}

/* The worker should have been in the list of executing workers, so this line
* should be unreachable. */
LOG_FATAL(
"Worker registered as blocked, but it was not in the list of executing "
"workers.");
}

void handle_worker_unblocked(local_scheduler_state *state,
scheduling_algorithm_state *algorithm_state,
local_scheduler_client *worker) {
/* Find the worker in the list of blocked workers. */
for (int i = 0; i < utarray_len(algorithm_state->blocked_workers); ++i) {
local_scheduler_client **p = (local_scheduler_client **) utarray_eltptr(
algorithm_state->blocked_workers, i);
if (*p == worker) {
/* Remove the worker from the list of blocked workers. */
utarray_erase(algorithm_state->blocked_workers, i, 1);

/* Check that the worker isn't in the list of executing workers. */
for (local_scheduler_client **q =
(local_scheduler_client **) utarray_front(
algorithm_state->executing_workers);
q != NULL; q = (local_scheduler_client **) utarray_next(
algorithm_state->executing_workers, q)) {
DCHECK(*q != worker);
}

/* Lease back the resources that the blocked worker will need. */
/* TODO(swang): Leasing back the resources to blocked workers can cause
* us to transiently exceed the maximum number of resources. This can be
* fixed by having blocked workers explicitly yield and wait to be given
* back resources before continuing execution. */
CHECK(worker->task_in_progress != NULL);
task_spec *spec = task_task_spec(worker->task_in_progress);
update_dynamic_resources(state, spec, false);
/* Add the worker to the list of executing workers. */
worker->is_blocked = false;
utarray_push_back(algorithm_state->executing_workers, &worker);

return;
}
}

/* The worker should have been in the list of blocked workers, so this line
* should be unreachable. */
LOG_FATAL(
"Worker registered as unblocked, but it was not in the list of blocked "
"workers.");
}

void handle_object_available(local_scheduler_state *state,
scheduling_algorithm_state *algorithm_state,
object_id object_id) {
Expand Down Expand Up @@ -1064,3 +1198,11 @@ int num_dispatch_tasks(scheduling_algorithm_state *algorithm_state) {
DL_COUNT(algorithm_state->dispatch_task_queue, elt, count);
return count;
}

void print_worker_info(const char *message,
scheduling_algorithm_state *algorithm_state) {
LOG_DEBUG("%s: %d available, %d executing, %d blocked", message,
utarray_len(algorithm_state->available_workers),
utarray_len(algorithm_state->executing_workers),
utarray_len(algorithm_state->blocked_workers));
}
39 changes: 38 additions & 1 deletion src/photon/photon_algorithm.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ void handle_object_available(local_scheduler_state *state,
void handle_object_removed(local_scheduler_state *state, object_id object_id);

/**
* This function is called when a new worker becomes available
* This function is called when a new worker becomes available.
*
* @param state The state of the local scheduler.
* @param algorithm_state State maintained by the scheduling algorithm.
Expand Down Expand Up @@ -189,6 +189,32 @@ void handle_actor_worker_disconnect(local_scheduler_state *state,
scheduling_algorithm_state *algorithm_state,
actor_id actor_id);

/**
* This function is called when a worker that was executing a task becomes
* blocked on an object that isn't available locally yet.
*
* @param state The state of the local scheduler.
* @param algorithm_state State maintained by the scheduling algorithm.
* @param worker The worker that is blocked.
* @return Void.
*/
void handle_worker_blocked(local_scheduler_state *state,
scheduling_algorithm_state *algorithm_state,
local_scheduler_client *worker);

/**
* This function is called when a worker that was blocked on an object that
* wasn't available locally yet becomes unblocked.
*
* @param state The state of the local scheduler.
* @param algorithm_state State maintained by the scheduling algorithm.
* @param worker The worker that is now unblocked.
* @return Void.
*/
void handle_worker_unblocked(local_scheduler_state *state,
scheduling_algorithm_state *algorithm_state,
local_scheduler_client *worker);

/**
* This function fetches queued task's missing object dependencies. It is
* called every LOCAL_SCHEDULER_FETCH_TIMEOUT_MILLISECONDS.
Expand All @@ -201,6 +227,17 @@ void handle_actor_worker_disconnect(local_scheduler_state *state,
*/
int fetch_object_timeout_handler(event_loop *loop, timer_id id, void *context);

/**
* A helper function to print debug information about the current state and
* number of workers.
*
* @param message A message to identify the log message.
* @param algorithm_state State maintained by the scheduling algorithm.
* @return Void.
*/
void print_worker_info(const char *message,
scheduling_algorithm_state *algorithm_state);

/** The following methods are for testing purposes only. */
#ifdef PHOTON_TEST
/**
Expand Down
4 changes: 4 additions & 0 deletions src/photon/photon_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,7 @@ void photon_reconstruct_object(photon_conn *conn, object_id object_id) {
void photon_log_message(photon_conn *conn) {
write_message(conn->conn, LOG_MESSAGE, 0, NULL);
}

void photon_notify_unblocked(photon_conn *conn) {
write_message(conn->conn, NOTIFY_UNBLOCKED, 0, NULL);
}
8 changes: 8 additions & 0 deletions src/photon/photon_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,12 @@ void photon_reconstruct_object(photon_conn *conn, object_id object_id);
*/
void photon_log_message(photon_conn *conn);

/**
* Notify the local scheduler that this client (worker) is no longer blocked.
*
* @param conn The connection information.
* @return Void.
*/
void photon_notify_unblocked(photon_conn *conn);

#endif
7 changes: 7 additions & 0 deletions src/photon/photon_extension.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ static PyObject *PyPhotonClient_log_event(PyObject *self, PyObject *args) {
Py_RETURN_NONE;
}

static PyObject *PyPhotonClient_notify_unblocked(PyObject *self) {
photon_notify_unblocked(((PyPhotonClient *) self)->photon_connection);
Py_RETURN_NONE;
}

static PyMethodDef PyPhotonClient_methods[] = {
{"submit", (PyCFunction) PyPhotonClient_submit, METH_VARARGS,
"Submit a task to the local scheduler."},
Expand All @@ -89,6 +94,8 @@ static PyMethodDef PyPhotonClient_methods[] = {
METH_VARARGS, "Ask the local scheduler to reconstruct an object."},
{"log_event", (PyCFunction) PyPhotonClient_log_event, METH_VARARGS,
"Log an event to the event log through the local scheduler."},
{"notify_unblocked", (PyCFunction) PyPhotonClient_notify_unblocked,
METH_NOARGS, "Notify the local scheduler that we are unblocked."},
{NULL} /* Sentinel */
};

Expand Down
Loading

0 comments on commit a0dd3a4

Please sign in to comment.