3 changes: 3 additions & 0 deletions src/common/format/common.fbs
@@ -129,6 +129,9 @@ table LocalSchedulerInfoMessage {
// The resource vector of resources currently available to this local
// scheduler.
dynamic_resources: [double];
// Whether the local scheduler is dead. If true, then all other fields
// besides `db_client_id` will not be set.
is_dead: bool;
}

root_type LocalSchedulerInfoMessage;
4 changes: 4 additions & 0 deletions src/common/state/local_scheduler_table.cc
@@ -27,3 +27,7 @@ void local_scheduler_table_send_info(DBHandle *db_handle,
init_table_callback(db_handle, NIL_ID, __func__, data, retry, NULL,
redis_local_scheduler_table_send_info, NULL);
}

void local_scheduler_table_disconnect(DBHandle *db_handle) {
redis_local_scheduler_table_disconnect(db_handle);
}
16 changes: 15 additions & 1 deletion src/common/state/local_scheduler_table.h
@@ -21,6 +21,9 @@ typedef struct {
/** The resource vector of resources currently available to this local
* scheduler. */
double dynamic_resources[ResourceIndex_MAX];
/** Whether the local scheduler is dead. If true, then all other fields
* should be ignored. */
bool is_dead;
} LocalSchedulerInfo;

/*
@@ -58,13 +61,14 @@ typedef struct {
} LocalSchedulerTableSubscribeData;

/**
* Send a heartbeat to all subscriers to the local scheduler table. This
* Send a heartbeat to all subscribers to the local scheduler table. This
* heartbeat contains some information about the load on the local scheduler.
*
* @param db_handle Database handle.
* @param info Information about the local scheduler, including the load on the
* local scheduler.
* @param retry Information about retrying the request to the database.
* @return Void.
*/
void local_scheduler_table_send_info(DBHandle *db_handle,
LocalSchedulerInfo *info,
@@ -77,4 +81,14 @@ typedef struct {
LocalSchedulerInfo info;
} LocalSchedulerTableSendInfoData;

/**
* Send a null heartbeat to all subscribers to the local scheduler table to
* notify them that we are about to exit. This operation is performed
* synchronously.
*
* @param db_handle Database handle.
* @return Void.
*/
void local_scheduler_table_disconnect(DBHandle *db_handle);

#endif /* LOCAL_SCHEDULER_TABLE_H */
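For context, a minimal usage sketch of the two calls declared in this header (not part of this change): periodic heartbeats go through local_scheduler_table_send_info, and a clean shutdown ends with local_scheduler_table_disconnect. The caller names are hypothetical, and passing NULL for the retry argument assumes the default retry behavior is acceptable.

/* Illustrative sketch only; publish_heartbeat and clean_shutdown are
 * hypothetical callers, not functions from this PR. */
void publish_heartbeat(DBHandle *db) {
  LocalSchedulerInfo info;
  memset(&info, 0, sizeof(info));
  info.is_dead = false;
  /* ... fill total_num_workers, task_queue_length, available_workers,
   * static_resources, and dynamic_resources from the scheduler's state ... */
  local_scheduler_table_send_info(db, &info, NULL);
}

void clean_shutdown(DBHandle *db) {
  /* Synchronously tell subscribers that this local scheduler is exiting. */
  local_scheduler_table_disconnect(db);
}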
42 changes: 33 additions & 9 deletions src/common/state/redis.cc
@@ -1289,14 +1289,22 @@ void redis_local_scheduler_table_subscribe_callback(redisAsyncContext *c,
DBClientID client_id = from_flatbuf(message->db_client_id());
/* Extract the fields of the local scheduler info struct. */
LocalSchedulerInfo info;
info.total_num_workers = message->total_num_workers();
info.task_queue_length = message->task_queue_length();
info.available_workers = message->available_workers();
for (int i = 0; i < ResourceIndex_MAX; ++i) {
info.static_resources[i] = message->static_resources()->Get(i);
}
for (int i = 0; i < ResourceIndex_MAX; ++i) {
info.dynamic_resources[i] = message->dynamic_resources()->Get(i);
memset(&info, 0, sizeof(info));
if (message->is_dead()) {
/* If the local scheduler is dead, then ignore all other fields in the
* message. */
info.is_dead = true;
} else {
/* If the local scheduler is alive, collect load information. */
info.total_num_workers = message->total_num_workers();
info.task_queue_length = message->task_queue_length();
info.available_workers = message->available_workers();
for (int i = 0; i < ResourceIndex_MAX; ++i) {
info.static_resources[i] = message->static_resources()->Get(i);
}
for (int i = 0; i < ResourceIndex_MAX; ++i) {
info.dynamic_resources[i] = message->dynamic_resources()->Get(i);
}
}

/* Call the subscribe callback. */
@@ -1355,7 +1363,7 @@ void redis_local_scheduler_table_send_info(TableCallbackData *callback_data) {
fbb, to_flatbuf(fbb, db->client), info.total_num_workers,
info.task_queue_length, info.available_workers,
fbb.CreateVector(info.static_resources, ResourceIndex_MAX),
fbb.CreateVector(info.dynamic_resources, ResourceIndex_MAX));
fbb.CreateVector(info.dynamic_resources, ResourceIndex_MAX), false);
fbb.Finish(message);

int status = redisAsyncCommand(
Expand All @@ -1368,6 +1376,22 @@ void redis_local_scheduler_table_send_info(TableCallbackData *callback_data) {
}
}

void redis_local_scheduler_table_disconnect(DBHandle *db) {
flatbuffers::FlatBufferBuilder fbb;
LocalSchedulerInfoMessageBuilder builder(fbb);
Collaborator: Everywhere else I think we are constructing these messages with just a single big call to the CreateLocalSchedulerInfoMessage method (or equivalent). Any particular reason for doing it this way instead? Just curious.

Contributor Author: Hmm I did this to make it explicit that the other fields are unset, but we could also set them to an empty LocalSchedulerInfo. Do you have a preference?

Collaborator: Oh I see, I don't have a preference, the current way seems fine to me.

Collaborator: Ok, it turns out this was a bug. It was causing the local scheduler valgrind error that we've been seeing. Hopefully fixed by #1037. It was causing an assertion failure

local_scheduler_tests: /home/travis/build/ray-project/ray/python/ray/core/flatbuffers_ep-prefix/src/flatbuffers_ep-install/include/flatbuffers/flatbuffers.h:890: void flatbuffers::FlatBufferBuilder::NotNested(): Assertion `!nested' failed.

in the test

bash ../../../src/local_scheduler/test/run_valgrind.sh

(A sketch of the single-call construction discussed here appears after this file's diff.)

builder.add_db_client_id(to_flatbuf(fbb, db->client));
builder.add_is_dead(true);
auto message = builder.Finish();
fbb.Finish(message);
redisReply *reply = (redisReply *) redisCommand(
db->sync_context, "PUBLISH local_schedulers %b", fbb.GetBufferPointer(),
fbb.GetSize());
CHECK(reply->type != REDIS_REPLY_ERROR);
CHECK(reply->type == REDIS_REPLY_INTEGER);
LOG_DEBUG("%" PRId64 " subscribers received this publish.\n", reply->integer);
freeReplyObject(reply);
}

void redis_driver_table_subscribe_callback(redisAsyncContext *c,
void *r,
void *privdata) {
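The review thread above notes that this function builds its message with a manual LocalSchedulerInfoMessageBuilder while other call sites use a single CreateLocalSchedulerInfoMessage call, and that the manual form turned out to trigger the FlatBuffers `!nested' assertion, likely because an offset is created while the table builder is already open. The follow-up fix referenced as #1037 is not part of this diff; what follows is only a hedged sketch of the single-call alternative, assuming the generated helper takes its arguments in the same order used in redis_local_scheduler_table_send_info above.

/* Sketch only, not the code from #1037. All offsets (the client id and the
 * two resource vectors) are created before the table itself is started, so
 * the builder is never nested. Only is_dead carries meaning; the load fields
 * are left zeroed. */
void redis_local_scheduler_table_disconnect(DBHandle *db) {
  flatbuffers::FlatBufferBuilder fbb;
  double empty_resources[ResourceIndex_MAX] = {0};
  auto message = CreateLocalSchedulerInfoMessage(
      fbb, to_flatbuf(fbb, db->client),
      /* total_num_workers */ 0, /* task_queue_length */ 0,
      /* available_workers */ 0,
      fbb.CreateVector(empty_resources, ResourceIndex_MAX),
      fbb.CreateVector(empty_resources, ResourceIndex_MAX),
      /* is_dead */ true);
  fbb.Finish(message);
  /* Publish synchronously, exactly as in the version above. */
  redisReply *reply = (redisReply *) redisCommand(
      db->sync_context, "PUBLISH local_schedulers %b", fbb.GetBufferPointer(),
      fbb.GetSize());
  CHECK(reply->type == REDIS_REPLY_INTEGER);
  freeReplyObject(reply);
}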
9 changes: 9 additions & 0 deletions src/common/state/redis.h
@@ -291,6 +291,15 @@ void redis_local_scheduler_table_subscribe(TableCallbackData *callback_data);
*/
void redis_local_scheduler_table_send_info(TableCallbackData *callback_data);

/**
* Synchronously publish a null update to the local scheduler table signifying
* that we are about to exit.
*
* @param db The database handle of the dying local scheduler.
* @return Void.
*/
void redis_local_scheduler_table_disconnect(DBHandle *db);

/**
* Subscribe to updates from the driver table.
*
18 changes: 13 additions & 5 deletions src/global_scheduler/global_scheduler.cc
@@ -333,14 +333,22 @@ void local_scheduler_table_handler(DBClientID client_id,
LOG_DEBUG(
"total workers = %d, task queue length = %d, available workers = %d",
info.total_num_workers, info.task_queue_length, info.available_workers);

/* Update the local scheduler info struct. */
auto it = state->local_schedulers.find(client_id);
if (it != state->local_schedulers.end()) {
/* Reset the number of tasks sent since the last heartbeat. */
LocalScheduler &local_scheduler = it->second;
local_scheduler.num_heartbeats_missed = 0;
local_scheduler.num_recent_tasks_sent = 0;
local_scheduler.info = info;
if (info.is_dead) {
/* The local scheduler is exiting. Increase the number of heartbeats
* missed to the timeout threshold. This will trigger removal of the
* local scheduler the next time the timeout handler fires. */
it->second.num_heartbeats_missed = NUM_HEARTBEATS_TIMEOUT;
} else {
/* Reset the number of tasks sent since the last heartbeat. */
LocalScheduler &local_scheduler = it->second;
local_scheduler.num_heartbeats_missed = 0;
local_scheduler.num_recent_tasks_sent = 0;
local_scheduler.info = info;
}
} else {
LOG_WARN("client_id didn't match any cached local scheduler entries");
}
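The branch above only bumps num_heartbeats_missed to the threshold; actual removal happens in a periodic timeout handler that is not shown in this diff. Below is a hedged sketch of how such a handler could act on the counter; the handler name, the state type, the remove_local_scheduler helper, and the timer period are illustrative assumptions rather than code from this PR.

int heartbeat_timeout_handler(event_loop *loop, timer_id id, void *context) {
  GlobalSchedulerState *state = (GlobalSchedulerState *) context;
  /* Collect the local schedulers that have missed too many heartbeats, then
   * remove them. An entry whose publisher sent is_dead was already pushed to
   * the threshold by local_scheduler_table_handler, so it is removed on this
   * pass. */
  std::vector<DBClientID> dead;
  for (auto &entry : state->local_schedulers) {
    entry.second.num_heartbeats_missed += 1;
    if (entry.second.num_heartbeats_missed >= NUM_HEARTBEATS_TIMEOUT) {
      dead.push_back(entry.first);
    }
  }
  for (const DBClientID &client_id : dead) {
    remove_local_scheduler(state, client_id); /* hypothetical helper */
  }
  /* Rearm the timer; the period constant is illustrative. */
  return HEARTBEAT_TIMEOUT_MILLISECONDS;
}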
7 changes: 3 additions & 4 deletions src/local_scheduler/local_scheduler.cc
@@ -175,10 +175,9 @@ void LocalSchedulerState_free(LocalSchedulerState *state) {
* responsible for deleting our entry from the db_client table, so do not
* delete it here. */
if (state->db != NULL) {
/* TODO(swang): Add a null heartbeat that tells the global scheduler that
* we are dead. This avoids having to wait for the timeout before marking
* us as dead in the db_client table, in cases where we can do a clean
* exit. */
/* Send a null heartbeat that tells the global scheduler that we are dead
* to avoid waiting for the heartbeat timeout. */
local_scheduler_table_disconnect(state->db);
DBHandle_free(state->db);
}

2 changes: 0 additions & 2 deletions src/local_scheduler/local_scheduler.h
@@ -185,8 +185,6 @@ LocalSchedulerState *LocalSchedulerState_init(
const char *worker_path,
int num_workers);

void LocalSchedulerState_free(LocalSchedulerState *state);

SchedulingAlgorithmState *get_algorithm_state(LocalSchedulerState *state);

void process_message(event_loop *loop,
20 changes: 17 additions & 3 deletions src/local_scheduler/local_scheduler_algorithm.cc
@@ -480,7 +480,14 @@ void fetch_missing_dependency(LocalSchedulerState *state,
/* We weren't actively fetching this object. Try the fetch once
* immediately. */
if (state->plasma_conn->get_manager_fd() != -1) {
ARROW_CHECK_OK(state->plasma_conn->Fetch(1, &obj_id));
auto arrow_status = state->plasma_conn->Fetch(1, &obj_id);
if (!arrow_status.ok()) {
LocalSchedulerState_free(state);
LOG_FATAL(
"Lost connection to the plasma manager, local scheduler is "
"exiting. Error: %s",
arrow_status.ToString().c_str());
}
}
/* Create an entry and add it to the list of active fetch requests to
* ensure that the fetch actually happens. The entry will be moved to the
@@ -578,9 +585,16 @@ int fetch_object_timeout_handler(event_loop *loop, timer_id id, void *context) {
for (int64_t j = 0; j < num_object_ids; j += fetch_request_size) {
int num_objects_in_request =
std::min(num_object_ids, j + fetch_request_size) - j;
ARROW_CHECK_OK(state->plasma_conn->Fetch(
auto arrow_status = state->plasma_conn->Fetch(
num_objects_in_request,
reinterpret_cast<plasma::ObjectID *>(&object_ids[j])));
reinterpret_cast<plasma::ObjectID *>(&object_ids[j]));
if (!arrow_status.ok()) {
LocalSchedulerState_free(state);
LOG_FATAL(
"Lost connection to the plasma manager, local scheduler is exiting. "
"Error: %s",
arrow_status.ToString().c_str());
}
}

/* Print a warning if this method took too long. */
9 changes: 9 additions & 0 deletions src/local_scheduler/local_scheduler_shared.h
@@ -121,4 +121,13 @@ struct LocalSchedulerClient {
LocalSchedulerState *local_scheduler_state;
};

/**
* Free the local scheduler state. This disconnects all clients and notifies
* the global scheduler of the local scheduler's exit.
*
* @param state The state to free.
* @return Void.
*/
void LocalSchedulerState_free(LocalSchedulerState *state);

#endif /* LOCAL_SCHEDULER_SHARED_H */
5 changes: 3 additions & 2 deletions test/component_failures_test.py
@@ -151,11 +151,12 @@ def f(x, j):
components = ray.services.all_processes[component_type]
for process in components[1:]:
process.terminate()
time.sleep(0.1)
time.sleep(1)

for process in components[1:]:
process.kill()
process.wait()
self.assertNotEqual(process.poll(), None)
time.sleep(1)

# Make sure that we can still get the objects after the executing tasks
# died.