Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify resource bookkeeping in local scheduler. #494

Merged
merged 2 commits into from
Apr 28, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 99 additions & 35 deletions src/local_scheduler/local_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,11 @@ void kill_worker(LocalSchedulerState *state,
sizeof(task_id), task_id.id);
}

/* Release any resources held by the worker. */
release_resources(state, worker, worker->cpus_in_use, worker->gpus_in_use);

/* Clean up the task in progress. */
if (worker->task_in_progress) {
if (!worker->is_blocked) {
/* Return the resources that the worker was using, if any. Blocked
* workers do not use resources. */
TaskSpec *spec = Task_task_spec(worker->task_in_progress);
update_dynamic_resources(state, spec, true);
}
/* Update the task table to reflect that the task failed to complete. */
if (state->db != NULL) {
Task_set_state(worker->task_in_progress, TASK_STATUS_LOST);
Expand Down Expand Up @@ -396,32 +393,63 @@ LocalSchedulerState *LocalSchedulerState_init(
return state;
}

void update_dynamic_resources(LocalSchedulerState *state,
TaskSpec *spec,
bool return_resources) {
for (int i = 0; i < ResourceIndex_MAX; ++i) {
double resource = TaskSpec_get_required_resource(spec, i);
if (!return_resources) {
/* If we are not returning resources, we are leasing them, so we want to
* subtract the resource quantities from our accounting. */
resource *= -1;
}
bool check_dynamic_resources(LocalSchedulerState *state,
double num_cpus,
double num_gpus) {
if (num_cpus > 0 && state->dynamic_resources[ResourceIndex_CPU] < num_cpus) {
/* We only use this check when num_cpus is positive so that we can still
* create actors even when the CPUs are oversubscribed. */
return false;
}
if (state->dynamic_resources[ResourceIndex_GPU] < num_gpus) {
return false;
}
return true;
}

bool oversubscribed =
(!return_resources && state->dynamic_resources[i] < 0);
/* Add or subtract the task's resources from our count. */
state->dynamic_resources[i] += resource;

if ((!return_resources && state->dynamic_resources[i] < 0) &&
!oversubscribed) {
/* Log a warning if we are using more resources than we have been
* allocated, and we weren't already oversubscribed. */
LOG_WARN("local_scheduler dynamic resources dropped to %8.4f\t%8.4f\n",
state->dynamic_resources[0], state->dynamic_resources[1]);
}
CHECK(state->dynamic_resources[i] <= state->static_resources[i]);
void acquire_resources(LocalSchedulerState *state,
LocalSchedulerClient *worker,
double num_cpus,
double num_gpus) {
/* Acquire the CPU resources. */
bool oversubscribed = (state->dynamic_resources[ResourceIndex_CPU] < 0);
state->dynamic_resources[ResourceIndex_CPU] -= num_cpus;
CHECK(worker->cpus_in_use == 0);
worker->cpus_in_use += num_cpus;
/* Log a warning if we are using more resources than we have been allocated,
* and we weren't already oversubscribed. */
if (!oversubscribed && state->dynamic_resources[ResourceIndex_CPU] < 0) {
LOG_WARN("local_scheduler dynamic resources dropped to %8.4f\t%8.4f\n",
state->dynamic_resources[ResourceIndex_CPU],
state->dynamic_resources[ResourceIndex_GPU]);
}

/* Acquire the GPU resources. */
if (num_gpus != 0) {
/* Make sure that the worker isn't using any GPUs already. */
CHECK(worker->gpus_in_use == 0);
worker->gpus_in_use += num_gpus;
/* Update the total quantity of GPU resources available. */
CHECK(state->dynamic_resources[ResourceIndex_GPU] >= num_gpus);
state->dynamic_resources[ResourceIndex_GPU] -= num_gpus;
}
}

void release_resources(LocalSchedulerState *state,
LocalSchedulerClient *worker,
double num_cpus,
double num_gpus) {
/* Release the CPU resources. */
CHECK(num_cpus == worker->cpus_in_use);
state->dynamic_resources[ResourceIndex_CPU] += num_cpus;
worker->cpus_in_use = 0;

/* Release the GPU resources. */
if (num_gpus != 0) {
CHECK(num_gpus == worker->gpus_in_use);
state->dynamic_resources[ResourceIndex_GPU] += num_gpus;
worker->gpus_in_use = 0;
}
print_resource_info(state, spec);
}

bool is_driver_alive(LocalSchedulerState *state, WorkerID driver_id) {
Expand All @@ -432,6 +460,7 @@ void assign_task_to_worker(LocalSchedulerState *state,
TaskSpec *spec,
int64_t task_spec_size,
LocalSchedulerClient *worker) {
CHECK(ActorID_equal(worker->actor_id, TaskSpec_actor_id(spec)));
/* Make sure the driver for this task is still alive. */
WorkerID driver_id = TaskSpec_driver_id(spec);
CHECK(is_driver_alive(state, driver_id));
Expand All @@ -456,9 +485,14 @@ void assign_task_to_worker(LocalSchedulerState *state,
}
}

/* Resource accounting:
* Update dynamic resource vector in the local scheduler state. */
update_dynamic_resources(state, spec, false);
/* Acquire the necessary resources for running this task. TODO(rkn): We are
* currently ignoring resource bookkeeping for actor methods. */
if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) {
acquire_resources(state, worker,
TaskSpec_get_required_resource(spec, ResourceIndex_CPU),
TaskSpec_get_required_resource(spec, ResourceIndex_GPU));
}

Task *task = Task_alloc(spec, task_spec_size, TASK_STATUS_RUNNING,
state->db ? get_db_client_id(state->db) : NIL_ID);
/* Record which task this worker is executing. This will be freed in
Expand Down Expand Up @@ -820,8 +854,16 @@ void process_message(event_loop *loop,
/* If this worker reports a completed task: account for resources. */
if (worker->task_in_progress != NULL) {
TaskSpec *spec = Task_task_spec(worker->task_in_progress);
/* Return dynamic resources back for the task in progress. */
update_dynamic_resources(state, spec, true);
/* Return dynamic resources back for the task in progress. TODO(rkn): We
* are currently ignoring resource bookkeeping for actor methods. */
if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) {
CHECK(worker->cpus_in_use ==
TaskSpec_get_required_resource(spec, ResourceIndex_CPU));
CHECK(worker->gpus_in_use ==
TaskSpec_get_required_resource(spec, ResourceIndex_GPU));
release_resources(state, worker, worker->cpus_in_use,
worker->gpus_in_use);
}
/* If we're connected to Redis, update tables. */
if (state->db != NULL) {
/* Update control state tables. */
Expand Down Expand Up @@ -852,6 +894,12 @@ void process_message(event_loop *loop,
/* If the worker was executing a task (i.e. non-driver) and it wasn't
* already blocked on an object that's not locally available, update its
* state to blocked. */
worker->is_blocked = true;
/* Return the CPU resources that the blocked worker was using, but not
* GPU resources. */
release_resources(state, worker, worker->cpus_in_use, 0);
/* Let the scheduling algorithm process the fact that the worker is
* blocked. */
handle_worker_blocked(state, state->algorithm_state, worker);
print_worker_info("Reconstructing", state->algorithm_state);
}
Expand All @@ -863,12 +911,26 @@ void process_message(event_loop *loop,
handle_client_disconnect(state, worker);
} break;
case MessageType_NotifyUnblocked: {
/* TODO(rkn): A driver may call this as well, right? */
if (worker->task_in_progress != NULL) {
/* TODO(swang): For now, we don't handle blocked actors. */
if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) {
/* If the worker was executing a task (i.e. non-driver), update its
* state to not blocked. */
CHECK(worker->is_blocked);
worker->is_blocked = false;
/* Lease back the CPU resources that the blocked worker needs (note that
* it never released its GPU resources). TODO(swang): Leasing back the
* resources to blocked workers can cause us to transiently exceed the
* maximum number of resources. This could be fixed by having blocked
* workers explicitly yield and wait to be given back resources before
* continuing execution. */
TaskSpec *spec = Task_task_spec(worker->task_in_progress);
acquire_resources(
state, worker,
TaskSpec_get_required_resource(spec, ResourceIndex_CPU), 0);
/* Let the scheduling algorithm process the fact that the worker is
* unblocked. */
handle_worker_unblocked(state, state->algorithm_state, worker);
}
}
Expand Down Expand Up @@ -902,6 +964,8 @@ void new_client_connection(event_loop *loop,
worker->is_worker = true;
worker->client_id = NIL_WORKER_ID;
worker->task_in_progress = NULL;
worker->cpus_in_use = 0;
worker->gpus_in_use = 0;
worker->is_blocked = false;
worker->pid = 0;
worker->is_child = false;
Expand Down
46 changes: 36 additions & 10 deletions src/local_scheduler/local_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,20 +109,46 @@ void kill_worker(LocalSchedulerState *state,
void start_worker(LocalSchedulerState *state, ActorID actor_id);

/**
* Update our accounting for the current resources being used, according to
* some task that is starting or finishing execution.
* Check if a certain quantity of dynamic resources are available. If num_cpus
* is 0, we ignore the dynamic number of available CPUs (which may be negative).
*
* @param state The state of the local scheduler.
* @param num_cpus Check if this many CPUs are available.
* @param num_gpus Check if this many GPUs are available.
* @return True if there are enough CPUs and GPUs and false otherwise.
*/
bool check_dynamic_resources(LocalSchedulerState *state,
double num_cpus,
double num_gpus);

/**
* Acquire additional resources (CPUs and GPUs) for a worker.
*
* @param state The local scheduler state.
* @param worker The worker who is acquiring resources.
* @param num_cpus The number of CPU resources to acquire.
* @param num_gpus The number of GPU resources to acquire.
* @return Void.
*/
void acquire_resources(LocalSchedulerState *state,
LocalSchedulerClient *worker,
double num_cpus,
double num_gpus);

/**
* Return resources (CPUs and GPUs) being used by a worker to the local
* scheduler.
*
* @param state The local scheduler state.
* @param spec The specification for the task that is or was using resources.
* @param return_resources A boolean representing whether the task is starting
* or finishing execution. If true, then the task is finishing execution
* (possibly temporarily), so it will add to the dynamic resources
* available. Else, it will take from the dynamic resources available.
* @param worker The worker who is returning resources.
* @param num_cpus The number of CPU resources to return.
* @param num_gpus The number of GPU resources to return.
* @return Void.
*/
void update_dynamic_resources(LocalSchedulerState *state,
TaskSpec *spec,
bool return_resources);
void release_resources(LocalSchedulerState *state,
LocalSchedulerClient *worker,
double num_cpus,
double num_gpus);

/** The following methods are for testing purposes only. */
#ifdef LOCAL_SCHEDULER_TEST
Expand Down
14 changes: 0 additions & 14 deletions src/local_scheduler/local_scheduler_algorithm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1022,12 +1022,7 @@ void handle_worker_blocked(LocalSchedulerState *state,
DCHECK(!worker_in_vector(algorithm_state->blocked_workers, worker));

/* Add the worker to the list of blocked workers. */
worker->is_blocked = true;
algorithm_state->blocked_workers.push_back(worker);
/* Return the resources that the blocked worker was using. */
CHECK(worker->task_in_progress != NULL);
TaskSpec *spec = Task_task_spec(worker->task_in_progress);
update_dynamic_resources(state, spec, true);

/* Try to dispatch tasks, since we may have freed up some resources. */
dispatch_tasks(state, algorithm_state);
Expand All @@ -1042,16 +1037,7 @@ void handle_worker_unblocked(LocalSchedulerState *state,
/* Check that the worker isn't in the list of executing workers. */
DCHECK(!worker_in_vector(algorithm_state->executing_workers, worker));

/* Lease back the resources that the blocked worker will need. */
/* TODO(swang): Leasing back the resources to blocked workers can cause
* us to transiently exceed the maximum number of resources. This can be
* fixed by having blocked workers explicitly yield and wait to be given
* back resources before continuing execution. */
CHECK(worker->task_in_progress != NULL);
TaskSpec *spec = Task_task_spec(worker->task_in_progress);
update_dynamic_resources(state, spec, false);
/* Add the worker to the list of executing workers. */
worker->is_blocked = false;
algorithm_state->executing_workers.push_back(worker);
}

Expand Down
11 changes: 11 additions & 0 deletions src/local_scheduler/local_scheduler_shared.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,17 @@ struct LocalSchedulerClient {
* no task is running on the worker, this will be NULL. This is used to
* update the task table. */
Task *task_in_progress;
/** The number of CPUs that the worker is currently using. This will only be
* nonzero when the worker is actively executing a task. If the worker is
* blocked, then this value will be zero. */
double cpus_in_use;
/** The number of GPUs that the worker is currently using. If the worker is an
* actor, this will be constant throughout the lifetime of the actor (and
* will be equal to the number of GPUs requested by the actor). If the worker
* is not an actor, this will be constant for the duration of a task and will
* have length equal to the number of GPUs requested by the task (in
* particular it will not change if the task blocks). */
double gpus_in_use;
/** A flag to indicate whether this worker is currently blocking on an
* object(s) that isn't available locally yet. */
bool is_blocked;
Expand Down