Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement object table notification subscriptions and switch to using Redis modules for object table. #134

Merged
merged 12 commits into from
Dec 19, 2016
Prev Previous commit
Next Next commit
Add prefix to the object notification channel name.
  • Loading branch information
robertnishihara committed Dec 19, 2016
commit 13b1f214615ca9f57f2690c096deff2a4eb39525
11 changes: 8 additions & 3 deletions src/common/redis_module/ray_redis_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
#define OBJECT_NOTIFICATION_PREFIX "ON:"
#define TASK_PREFIX "TT:"

#define OBJECT_CHANNEL_PREFIX "OC:"

#define CHECK_ERROR(STATUS, MESSAGE) \
if ((STATUS) == REDISMODULE_ERR) { \
return RedisModule_ReplyWithError(ctx, (MESSAGE)); \
Expand Down Expand Up @@ -254,13 +256,16 @@ bool PublishObjectNotification(RedisModuleCtx *ctx,

/* Publish the notification to the clients notification channel.
* TODO(rkn): These notifications could be batched together. */
RedisModuleString *channel_name =
CreatePrefixedString(ctx, OBJECT_CHANNEL_PREFIX, client_id);
RedisModuleCallReply *reply;
reply = RedisModule_Call(ctx, "PUBLISH", "ss", client_id, manager_list);
reply = RedisModule_Call(ctx, "PUBLISH", "ss", channel_name,
manager_list);
RedisModule_FreeString(ctx, channel_name);
RedisModule_FreeString(ctx, manager_list);
if (reply == NULL) {
return false;
}
/* Clean up. */
RedisModule_FreeString(ctx, manager_list);
return true;
}

Expand Down
3 changes: 2 additions & 1 deletion src/common/redis_module/runtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
OBJECT_LOCATION_PREFIX = "OL:"
OBJECT_SUBSCRIBE_PREFIX = "OS:"
TASK_PREFIX = "TT:"
OBJECT_CHANNEL_PREFIX = "OC:"

class TestGlobalStateStore(unittest.TestCase):

Expand Down Expand Up @@ -87,7 +88,7 @@ def testObjectTableAddAndLookup(self):
def testObjectTableSubscribeToNotifications(self):
p = self.redis.pubsub()
# Subscribe to an object ID.
p.psubscribe("manager_id1")
p.psubscribe("{}manager_id1".format(OBJECT_CHANNEL_PREFIX))
self.redis.execute_command("RAY.OBJECT_TABLE_ADD", "object_id1", 1, "hash1", "manager_id2")
# Receive the acknowledgement message.
self.assertEqual(p.get_message()["data"], 1)
Expand Down
7 changes: 5 additions & 2 deletions src/common/state/redis.c
Original file line number Diff line number Diff line change
Expand Up @@ -647,13 +647,16 @@ void object_table_redis_subscribe_to_notifications_callback(
void redis_object_table_subscribe_to_notifications(
table_callback_data *callback_data) {
db_handle *db = callback_data->db_handle;
/* The object channel prefix must match the value defined in
* src/common/redismodule/ray_redis_module.c. */
const char *object_channel_prefix = "OC:";
/* Subscribe to notifications from the object table. This uses the client ID
* as the channel name so this channel is specific to this client. TODO(rkn):
* The channel name should probably be the client ID with some prefix. */
int status = redisAsyncCommand(
db->sub_context, object_table_redis_subscribe_to_notifications_callback,
(void *) callback_data->timer_id, "SUBSCRIBE %b", db->client.id,
sizeof(db->client.id));
(void *) callback_data->timer_id, "SUBSCRIBE %s%b",
object_channel_prefix, db->client.id, sizeof(db->client.id));
if ((status == REDIS_ERR) || db->sub_context->err) {
LOG_REDIS_DEBUG(db->sub_context,
"error in redis_object_table_subscribe_to_notifications");
Expand Down