Skip to content

Local scheduler filters out dead clients during reconstruction #1182

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions python/ray/experimental/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,9 +353,9 @@ def client_table(self):
"Deleted": bool(int(decode(client_info[b"deleted"]))),
"DBClientID": binary_to_hex(client_info[b"ray_client_id"])
}
if b"aux_address" in client_info:
if b"manager_address" in client_info:
client_info_parsed["AuxAddress"] = decode(
client_info[b"aux_address"])
client_info[b"manager_address"])
if b"num_cpus" in client_info:
client_info_parsed["NumCPUs"] = float(
decode(client_info[b"num_cpus"]))
Expand Down
2 changes: 1 addition & 1 deletion python/ray/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1095,7 +1095,7 @@ def get_address_info_from_redis_helper(redis_address, node_ip_address):
# Build the address information.
object_store_addresses = []
for manager in plasma_managers:
address = manager[b"address"].decode("ascii")
address = manager[b"manager_address"].decode("ascii")
port = services.get_port(address)
object_store_addresses.append(
services.ObjectStoreAddress(
Expand Down
4 changes: 4 additions & 0 deletions src/common/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ bool DBClientID_equal(DBClientID first_id, DBClientID second_id) {
return UNIQUE_ID_EQ(first_id, second_id);
}

bool DBClientID_is_nil(DBClientID id) {
return IS_NIL_ID(id);
}

bool WorkerID_equal(WorkerID first_id, WorkerID second_id) {
return UNIQUE_ID_EQ(first_id, second_id);
}
Expand Down
8 changes: 8 additions & 0 deletions src/common/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,14 @@ typedef UniqueID DBClientID;
*/
bool DBClientID_equal(DBClientID first_id, DBClientID second_id);

/**
* Compare a db client ID to the nil ID.
*
* @param id The db client ID to compare to nil.
* @return True if the db client ID is equal to nil.
*/
bool DBClientID_is_nil(ObjectID id);

#define MAX(x, y) ((x) >= (y) ? (x) : (y))
#define MIN(x, y) ((x) <= (y) ? (x) : (y))

Expand Down
2 changes: 1 addition & 1 deletion src/common/format/common.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ table SubscribeToDBClientTableReply {
client_type: string;
// If the client is a local scheduler, this is the address of the plasma
// manager that the local scheduler is connected to. Otherwise, it is empty.
aux_address: string;
manager_address: string;
// True if the message is about the addition of a client and false if it is
// about the deletion of a client.
is_insertion: bool;
Expand Down
163 changes: 69 additions & 94 deletions src/common/redis_module/ray_redis_module.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,41 +83,42 @@ flatbuffers::Offset<flatbuffers::String> RedisStringToFlatbuf(
*
* TODO(swang): Use flatbuffers for the notification message.
* The format for the published notification is:
* <ray_client_id>:<client type> <aux_address> <is_insertion>
* If no auxiliary address is provided, aux_address will be set to ":". If
* <ray_client_id>:<client type> <manager_address> <is_insertion>
* If no manager address is provided, manager_address will be set to ":". If
* is_insertion is true, then the last field will be "1", else "0".
*
* @param ctx The Redis context.
* @param ray_client_id The ID of the database client that was inserted or
* deleted.
* @param client_type The type of client that was inserted or deleted.
* @param aux_address An optional secondary address associated with the
* database client.
* @param manager_address An optional secondary address for the object manager
* associated with this database client.
* @param is_insertion A boolean that's true if the update was an insertion and
* false if deletion.
* @return True if the publish was successful and false otherwise.
*/
bool PublishDBClientNotification(RedisModuleCtx *ctx,
RedisModuleString *ray_client_id,
RedisModuleString *client_type,
RedisModuleString *aux_address,
RedisModuleString *manager_address,
bool is_insertion) {
/* Construct strings to publish on the db client channel. */
RedisModuleString *channel_name =
RedisModule_CreateString(ctx, "db_clients", strlen("db_clients"));
/* Construct the flatbuffers object to publish over the channel. */
flatbuffers::FlatBufferBuilder fbb;
/* Use an empty aux address if one is not passed in. */
flatbuffers::Offset<flatbuffers::String> aux_address_str;
if (aux_address != NULL) {
aux_address_str = RedisStringToFlatbuf(fbb, aux_address);
flatbuffers::Offset<flatbuffers::String> manager_address_str;
if (manager_address != NULL) {
manager_address_str = RedisStringToFlatbuf(fbb, manager_address);
} else {
aux_address_str = fbb.CreateString("", strlen(""));
manager_address_str = fbb.CreateString("", strlen(""));
}
/* Create the flatbuffers message. */
auto message = CreateSubscribeToDBClientTableReply(
fbb, RedisStringToFlatbuf(fbb, ray_client_id),
RedisStringToFlatbuf(fbb, client_type), aux_address_str, is_insertion);
RedisStringToFlatbuf(fbb, client_type), manager_address_str,
is_insertion);
fbb.Finish(message);
/* Create a Redis string to publish by serializing the flatbuffers object. */
RedisModuleString *client_info = RedisModule_CreateString(
Expand All @@ -141,14 +142,10 @@ bool PublishDBClientNotification(RedisModuleCtx *ctx,
* and these will be stored in a hashmap associated with this client. Several
* fields are singled out for special treatment:
*
* address: This is provided by plasma managers and it should be an address
* like "127.0.0.1:1234". It is returned by RAY.GET_CLIENT_ADDRESS so
* that other plasma managers know how to fetch objects.
* aux_address: This is provided by local schedulers and should be the
* address of the plasma manager that the local scheduler is connected
* to. This is published to the "db_clients" channel by the RAY.CONNECT
* command and is used by the global scheduler to determine which plasma
* managers and local schedulers are connected.
* manager_address: This is provided by local schedulers and plasma
* managers and should be the address of the plasma manager that the
* client is associated with. This is published to the "db_clients"
* channel by the RAY.CONNECT command.
*
* @param ray_client_id The db client ID of the client.
* @param node_ip_address The IP address of the node the client is on.
Expand Down Expand Up @@ -178,9 +175,9 @@ int Connect_RedisCommand(RedisModuleCtx *ctx,
}

/* This will be used to construct a publish message. */
RedisModuleString *aux_address = NULL;
RedisModuleString *aux_address_key =
RedisModule_CreateString(ctx, "aux_address", strlen("aux_address"));
RedisModuleString *manager_address = NULL;
RedisModuleString *manager_address_key = RedisModule_CreateString(
ctx, "manager_address", strlen("manager_address"));
RedisModuleString *deleted = RedisModule_CreateString(ctx, "0", strlen("0"));

RedisModule_HashSet(db_client_table_key, REDISMODULE_HASH_CFIELDS,
Expand All @@ -193,16 +190,16 @@ int Connect_RedisCommand(RedisModuleCtx *ctx,
RedisModuleString *value = argv[i + 1];
RedisModule_HashSet(db_client_table_key, REDISMODULE_HASH_NONE, key, value,
NULL);
if (RedisModule_StringCompare(key, aux_address_key) == 0) {
aux_address = value;
if (RedisModule_StringCompare(key, manager_address_key) == 0) {
manager_address = value;
}
}
/* Clean up. */
RedisModule_FreeString(ctx, deleted);
RedisModule_FreeString(ctx, aux_address_key);
RedisModule_FreeString(ctx, manager_address_key);
RedisModule_CloseKey(db_client_table_key);
if (!PublishDBClientNotification(ctx, ray_client_id, client_type, aux_address,
true)) {
if (!PublishDBClientNotification(ctx, ray_client_id, client_type,
manager_address, true)) {
return RedisModule_ReplyWithError(ctx, "PUBLISH unsuccessful");
}

Expand Down Expand Up @@ -256,16 +253,16 @@ int Disconnect_RedisCommand(RedisModuleCtx *ctx,
RedisModule_FreeString(ctx, deleted);

RedisModuleString *client_type;
RedisModuleString *aux_address;
RedisModuleString *manager_address;
RedisModule_HashGet(db_client_table_key, REDISMODULE_HASH_CFIELDS,
"client_type", &client_type, "aux_address",
&aux_address, NULL);
"client_type", &client_type, "manager_address",
&manager_address, NULL);

/* Publish the deletion notification on the db client channel. */
published = PublishDBClientNotification(ctx, ray_client_id, client_type,
aux_address, false);
if (aux_address != NULL) {
RedisModule_FreeString(ctx, aux_address);
manager_address, false);
if (manager_address != NULL) {
RedisModule_FreeString(ctx, manager_address);
}
RedisModule_FreeString(ctx, client_type);
}
Expand All @@ -282,50 +279,6 @@ int Disconnect_RedisCommand(RedisModuleCtx *ctx,
return REDISMODULE_OK;
}

/**
* Get the address of a client from its db client ID. This is called from a
* client with the command:
*
* RAY.GET_CLIENT_ADDRESS <ray client id>
*
* @param ray_client_id The db client ID of the client.
* @return The address of the client if the operation was successful.
*/
int GetClientAddress_RedisCommand(RedisModuleCtx *ctx,
RedisModuleString **argv,
int argc) {
if (argc != 2) {
return RedisModule_WrongArity(ctx);
}

RedisModuleString *ray_client_id = argv[1];
/* Get the request client address from the db client table. */
RedisModuleKey *db_client_table_key =
OpenPrefixedKey(ctx, DB_CLIENT_PREFIX, ray_client_id, REDISMODULE_READ);
if (db_client_table_key == NULL) {
/* There is no client with this ID. */
RedisModule_CloseKey(db_client_table_key);
return RedisModule_ReplyWithError(ctx, "invalid client ID");
}
RedisModuleString *address;
RedisModule_HashGet(db_client_table_key, REDISMODULE_HASH_CFIELDS, "address",
&address, NULL);
if (address == NULL) {
/* The key did not exist. This should not happen. */
RedisModule_CloseKey(db_client_table_key);
return RedisModule_ReplyWithError(
ctx, "Client does not have an address field. This shouldn't happen.");
}

RedisModule_ReplyWithString(ctx, address);

/* Cleanup. */
RedisModule_CloseKey(db_client_table_key);
RedisModule_FreeString(ctx, address);

return REDISMODULE_OK;
}

/**
* Lookup an entry in the object table.
*
Expand Down Expand Up @@ -1054,7 +1007,7 @@ int TaskTableUpdate_RedisCommand(RedisModuleCtx *ctx,
* This is called from a client with the command:
*
* RAY.TASK_TABLE_TEST_AND_UPDATE <task ID> <test state bitmask> <state>
* <local scheduler ID>
* <local scheduler ID> <test local scheduler ID (optional)>
*
* @param task_id A string that is the ID of the task.
* @param test_state_bitmask A string that is the test bitmask for the
Expand All @@ -1064,19 +1017,28 @@ int TaskTableUpdate_RedisCommand(RedisModuleCtx *ctx,
* instance) to update the task entry with.
* @param ray_client_id A string that is the ray client ID of the associated
* local scheduler, if any, to update the task entry with.
* @param test_local_scheduler_id A string to test the local scheduler ID. If
* provided, and if the current local scheduler ID does not match it,
* then the update does not happen.
* @return Returns the task entry as a TaskReply. The reply will reflect the
* update, if it happened.
*/
int TaskTableTestAndUpdate_RedisCommand(RedisModuleCtx *ctx,
RedisModuleString **argv,
int argc) {
if (argc != 5) {
if (argc < 5 || argc > 6) {
return RedisModule_WrongArity(ctx);
}
/* If a sixth argument was provided, then we should also test the current
* local scheduler ID. */
bool test_local_scheduler = (argc == 6);

RedisModuleString *state = argv[3];
RedisModuleString *task_id = argv[1];
RedisModuleString *test_state = argv[2];
RedisModuleString *update_state = argv[3];
RedisModuleString *local_scheduler_id = argv[4];

RedisModuleKey *key = OpenPrefixedKey(ctx, TASK_PREFIX, argv[1],
RedisModuleKey *key = OpenPrefixedKey(ctx, TASK_PREFIX, task_id,
REDISMODULE_READ | REDISMODULE_WRITE);
if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_EMPTY) {
RedisModule_CloseKey(key);
Expand All @@ -1085,8 +1047,10 @@ int TaskTableTestAndUpdate_RedisCommand(RedisModuleCtx *ctx,

/* If the key exists, look up the fields and return them in an array. */
RedisModuleString *current_state = NULL;
RedisModuleString *current_local_scheduler_id = NULL;
RedisModule_HashGet(key, REDISMODULE_HASH_CFIELDS, "state", &current_state,
NULL);
"local_scheduler_id", &current_local_scheduler_id, NULL);

long long current_state_integer;
if (RedisModule_StringToLongLong(current_state, &current_state_integer) !=
REDISMODULE_OK) {
Expand All @@ -1098,25 +1062,42 @@ int TaskTableTestAndUpdate_RedisCommand(RedisModuleCtx *ctx,
return RedisModule_ReplyWithError(ctx, "Found invalid scheduling state.");
}
long long test_state_bitmask;
int status = RedisModule_StringToLongLong(argv[2], &test_state_bitmask);
int status = RedisModule_StringToLongLong(test_state, &test_state_bitmask);
if (status != REDISMODULE_OK) {
RedisModule_CloseKey(key);
return RedisModule_ReplyWithError(
ctx, "Invalid test value for scheduling state");
}

bool updated = false;
bool update = false;
if (current_state_integer & test_state_bitmask) {
/* The test passed, so perform the update. */
RedisModule_HashSet(key, REDISMODULE_HASH_CFIELDS, "state", state,
"local_scheduler_id", argv[4], NULL);
updated = true;
if (test_local_scheduler) {
/* A test local scheduler ID was provided. Test whether it is equal to
* the current local scheduler ID before performing the update. */
RedisModuleString *test_local_scheduler_id = argv[5];
if (RedisModule_StringCompare(current_local_scheduler_id,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do this comparison before the if (current_state_integer & test_state_bitmask) { line? It seems a little confusing to start with update = false, then set update = true, and then set it back to update = false.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, let me clean up this section...

test_local_scheduler_id) == 0) {
/* If the current local scheduler ID does matches the test ID, then
* perform the update. */
update = true;
}
} else {
/* No test local scheduler ID was provided. Perform the update. */
update = true;
}
}

/* If the scheduling state and local scheduler ID tests passed, then perform
* the update. */
if (update) {
RedisModule_HashSet(key, REDISMODULE_HASH_CFIELDS, "state", update_state,
"local_scheduler_id", local_scheduler_id, NULL);
}

/* Clean up. */
RedisModule_CloseKey(key);
/* Construct a reply by getting the task from the task ID. */
return ReplyWithTask(ctx, argv[1], updated);
return ReplyWithTask(ctx, task_id, update);
}

/**
Expand Down Expand Up @@ -1168,12 +1149,6 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx,
return REDISMODULE_ERR;
}

if (RedisModule_CreateCommand(ctx, "ray.get_client_address",
GetClientAddress_RedisCommand, "write", 0, 0,
0) == REDISMODULE_ERR) {
return REDISMODULE_ERR;
}

if (RedisModule_CreateCommand(ctx, "ray.object_table_lookup",
ObjectTableLookup_RedisCommand, "readonly", 0,
0, 0) == REDISMODULE_ERR) {
Expand Down
Loading