Skip to content

Commit 74ac806

Browse files
stephanie-wang authored and robertnishihara committed
Local scheduler sends a null heartbeat to global scheduler (#962)
* Local scheduler sends a null heartbeat to global scheduler to notify death
* Add whitespace.
* Speed up component failures test
* Free local scheduler state upon plasma manager disconnection
1 parent dd4e99b commit 74ac806

File tree

11 files changed

+109
-26
lines changed

11 files changed

+109
-26
lines changed

src/common/format/common.fbs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,9 @@ table LocalSchedulerInfoMessage {
129129
// The resource vector of resources currently available to this local
130130
// scheduler.
131131
dynamic_resources: [double];
132+
// Whether the local scheduler is dead. If true, then all other fields
133+
// besides `db_client_id` will not be set.
134+
is_dead: bool;
132135
}
133136

134137
root_type LocalSchedulerInfoMessage;

src/common/state/local_scheduler_table.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,7 @@ void local_scheduler_table_send_info(DBHandle *db_handle,
2727
init_table_callback(db_handle, NIL_ID, __func__, data, retry, NULL,
2828
redis_local_scheduler_table_send_info, NULL);
2929
}
30+
31+
void local_scheduler_table_disconnect(DBHandle *db_handle) {
32+
redis_local_scheduler_table_disconnect(db_handle);
33+
}

src/common/state/local_scheduler_table.h

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ typedef struct {
2121
/** The resource vector of resources currently available to this local
2222
* scheduler. */
2323
double dynamic_resources[ResourceIndex_MAX];
24+
/** Whether the local scheduler is dead. If true, then all other fields
25+
* should be ignored. */
26+
bool is_dead;
2427
} LocalSchedulerInfo;
2528

2629
/*
@@ -58,13 +61,14 @@ typedef struct {
5861
} LocalSchedulerTableSubscribeData;
5962

6063
/**
61-
* Send a heartbeat to all subscriers to the local scheduler table. This
64+
* Send a heartbeat to all subscribers to the local scheduler table. This
6265
* heartbeat contains some information about the load on the local scheduler.
6366
*
6467
* @param db_handle Database handle.
6568
* @param info Information about the local scheduler, including the load on the
6669
* local scheduler.
6770
* @param retry Information about retrying the request to the database.
71+
* @return Void.
6872
*/
6973
void local_scheduler_table_send_info(DBHandle *db_handle,
7074
LocalSchedulerInfo *info,
@@ -77,4 +81,14 @@ typedef struct {
7781
LocalSchedulerInfo info;
7882
} LocalSchedulerTableSendInfoData;
7983

84+
/**
85+
* Send a null heartbeat to all subscribers to the local scheduler table to
86+
* notify them that we are about to exit. This operation is performed
87+
* synchronously.
88+
*
89+
* @param db_handle Database handle.
90+
* @return Void.
91+
*/
92+
void local_scheduler_table_disconnect(DBHandle *db_handle);
93+
8094
#endif /* LOCAL_SCHEDULER_TABLE_H */

src/common/state/redis.cc

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1289,14 +1289,22 @@ void redis_local_scheduler_table_subscribe_callback(redisAsyncContext *c,
12891289
DBClientID client_id = from_flatbuf(message->db_client_id());
12901290
/* Extract the fields of the local scheduler info struct. */
12911291
LocalSchedulerInfo info;
1292-
info.total_num_workers = message->total_num_workers();
1293-
info.task_queue_length = message->task_queue_length();
1294-
info.available_workers = message->available_workers();
1295-
for (int i = 0; i < ResourceIndex_MAX; ++i) {
1296-
info.static_resources[i] = message->static_resources()->Get(i);
1297-
}
1298-
for (int i = 0; i < ResourceIndex_MAX; ++i) {
1299-
info.dynamic_resources[i] = message->dynamic_resources()->Get(i);
1292+
memset(&info, 0, sizeof(info));
1293+
if (message->is_dead()) {
1294+
/* If the local scheduler is dead, then ignore all other fields in the
1295+
* message. */
1296+
info.is_dead = true;
1297+
} else {
1298+
/* If the local scheduler is alive, collect load information. */
1299+
info.total_num_workers = message->total_num_workers();
1300+
info.task_queue_length = message->task_queue_length();
1301+
info.available_workers = message->available_workers();
1302+
for (int i = 0; i < ResourceIndex_MAX; ++i) {
1303+
info.static_resources[i] = message->static_resources()->Get(i);
1304+
}
1305+
for (int i = 0; i < ResourceIndex_MAX; ++i) {
1306+
info.dynamic_resources[i] = message->dynamic_resources()->Get(i);
1307+
}
13001308
}
13011309

13021310
/* Call the subscribe callback. */
@@ -1355,7 +1363,7 @@ void redis_local_scheduler_table_send_info(TableCallbackData *callback_data) {
13551363
fbb, to_flatbuf(fbb, db->client), info.total_num_workers,
13561364
info.task_queue_length, info.available_workers,
13571365
fbb.CreateVector(info.static_resources, ResourceIndex_MAX),
1358-
fbb.CreateVector(info.dynamic_resources, ResourceIndex_MAX));
1366+
fbb.CreateVector(info.dynamic_resources, ResourceIndex_MAX), false);
13591367
fbb.Finish(message);
13601368

13611369
int status = redisAsyncCommand(
@@ -1368,6 +1376,22 @@ void redis_local_scheduler_table_send_info(TableCallbackData *callback_data) {
13681376
}
13691377
}
13701378

1379+
void redis_local_scheduler_table_disconnect(DBHandle *db) {
1380+
flatbuffers::FlatBufferBuilder fbb;
1381+
LocalSchedulerInfoMessageBuilder builder(fbb);
1382+
builder.add_db_client_id(to_flatbuf(fbb, db->client));
1383+
builder.add_is_dead(true);
1384+
auto message = builder.Finish();
1385+
fbb.Finish(message);
1386+
redisReply *reply = (redisReply *) redisCommand(
1387+
db->sync_context, "PUBLISH local_schedulers %b", fbb.GetBufferPointer(),
1388+
fbb.GetSize());
1389+
CHECK(reply->type != REDIS_REPLY_ERROR);
1390+
CHECK(reply->type == REDIS_REPLY_INTEGER);
1391+
LOG_DEBUG("%" PRId64 " subscribers received this publish.\n", reply->integer);
1392+
freeReplyObject(reply);
1393+
}
1394+
13711395
void redis_driver_table_subscribe_callback(redisAsyncContext *c,
13721396
void *r,
13731397
void *privdata) {

src/common/state/redis.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,15 @@ void redis_local_scheduler_table_subscribe(TableCallbackData *callback_data);
291291
*/
292292
void redis_local_scheduler_table_send_info(TableCallbackData *callback_data);
293293

294+
/**
295+
* Synchronously publish a null update to the local scheduler table signifying
296+
* that we are about to exit.
297+
*
298+
* @param db The database handle of the dying local scheduler.
299+
* @return Void.
300+
*/
301+
void redis_local_scheduler_table_disconnect(DBHandle *db);
302+
294303
/**
295304
* Subscribe to updates from the driver table.
296305
*

src/global_scheduler/global_scheduler.cc

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -333,14 +333,22 @@ void local_scheduler_table_handler(DBClientID client_id,
333333
LOG_DEBUG(
334334
"total workers = %d, task queue length = %d, available workers = %d",
335335
info.total_num_workers, info.task_queue_length, info.available_workers);
336+
336337
/* Update the local scheduler info struct. */
337338
auto it = state->local_schedulers.find(client_id);
338339
if (it != state->local_schedulers.end()) {
339-
/* Reset the number of tasks sent since the last heartbeat. */
340-
LocalScheduler &local_scheduler = it->second;
341-
local_scheduler.num_heartbeats_missed = 0;
342-
local_scheduler.num_recent_tasks_sent = 0;
343-
local_scheduler.info = info;
340+
if (info.is_dead) {
341+
/* The local scheduler is exiting. Increase the number of heartbeats
342+
* missed to the timeout threshold. This will trigger removal of the
343+
* local scheduler the next time the timeout handler fires. */
344+
it->second.num_heartbeats_missed = NUM_HEARTBEATS_TIMEOUT;
345+
} else {
346+
/* Reset the number of tasks sent since the last heartbeat. */
347+
LocalScheduler &local_scheduler = it->second;
348+
local_scheduler.num_heartbeats_missed = 0;
349+
local_scheduler.num_recent_tasks_sent = 0;
350+
local_scheduler.info = info;
351+
}
344352
} else {
345353
LOG_WARN("client_id didn't match any cached local scheduler entries");
346354
}

src/local_scheduler/local_scheduler.cc

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -175,10 +175,9 @@ void LocalSchedulerState_free(LocalSchedulerState *state) {
175175
* responsible for deleting our entry from the db_client table, so do not
176176
* delete it here. */
177177
if (state->db != NULL) {
178-
/* TODO(swang): Add a null heartbeat that tells the global scheduler that
179-
* we are dead. This avoids having to wait for the timeout before marking
180-
* us as dead in the db_client table, in cases where we can do a clean
181-
* exit. */
178+
/* Send a null heartbeat that tells the global scheduler that we are dead
179+
* to avoid waiting for the heartbeat timeout. */
180+
local_scheduler_table_disconnect(state->db);
182181
DBHandle_free(state->db);
183182
}
184183

src/local_scheduler/local_scheduler.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,8 +185,6 @@ LocalSchedulerState *LocalSchedulerState_init(
185185
const char *worker_path,
186186
int num_workers);
187187

188-
void LocalSchedulerState_free(LocalSchedulerState *state);
189-
190188
SchedulingAlgorithmState *get_algorithm_state(LocalSchedulerState *state);
191189

192190
void process_message(event_loop *loop,

src/local_scheduler/local_scheduler_algorithm.cc

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -480,7 +480,14 @@ void fetch_missing_dependency(LocalSchedulerState *state,
480480
/* We weren't actively fetching this object. Try the fetch once
481481
* immediately. */
482482
if (state->plasma_conn->get_manager_fd() != -1) {
483-
ARROW_CHECK_OK(state->plasma_conn->Fetch(1, &obj_id));
483+
auto arrow_status = state->plasma_conn->Fetch(1, &obj_id);
484+
if (!arrow_status.ok()) {
485+
LocalSchedulerState_free(state);
486+
LOG_FATAL(
487+
"Lost connection to the plasma manager, local scheduler is "
488+
"exiting. Error: %s",
489+
arrow_status.ToString().c_str());
490+
}
484491
}
485492
/* Create an entry and add it to the list of active fetch requests to
486493
* ensure that the fetch actually happens. The entry will be moved to the
@@ -578,9 +585,16 @@ int fetch_object_timeout_handler(event_loop *loop, timer_id id, void *context) {
578585
for (int64_t j = 0; j < num_object_ids; j += fetch_request_size) {
579586
int num_objects_in_request =
580587
std::min(num_object_ids, j + fetch_request_size) - j;
581-
ARROW_CHECK_OK(state->plasma_conn->Fetch(
588+
auto arrow_status = state->plasma_conn->Fetch(
582589
num_objects_in_request,
583-
reinterpret_cast<plasma::ObjectID *>(&object_ids[j])));
590+
reinterpret_cast<plasma::ObjectID *>(&object_ids[j]));
591+
if (!arrow_status.ok()) {
592+
LocalSchedulerState_free(state);
593+
LOG_FATAL(
594+
"Lost connection to the plasma manager, local scheduler is exiting. "
595+
"Error: %s",
596+
arrow_status.ToString().c_str());
597+
}
584598
}
585599

586600
/* Print a warning if this method took too long. */

src/local_scheduler/local_scheduler_shared.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,4 +121,13 @@ struct LocalSchedulerClient {
121121
LocalSchedulerState *local_scheduler_state;
122122
};
123123

124+
/**
125+
* Free the local scheduler state. This disconnects all clients and notifies
126+
* the global scheduler of the local scheduler's exit.
127+
*
128+
* @param state The state to free.
129+
* @return Void
130+
*/
131+
void LocalSchedulerState_free(LocalSchedulerState *state);
132+
124133
#endif /* LOCAL_SCHEDULER_SHARED_H */

0 commit comments

Comments
 (0)