-
Notifications
You must be signed in to change notification settings - Fork 6.8k
Make xray object table credis-managed and hence flushable. #2338
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
e71d1e0
628c2ea
37f4e01
b32c532
9418339
2403fe6
86d8801
9186222
f143e45
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -654,37 +654,15 @@ int ChainTableAdd_RedisCommand(RedisModuleCtx *ctx, | |
} | ||
#endif | ||
|
||
/// Append an entry to the log stored at a key. Publishes a notification about | ||
/// the update to all subscribers, if a pubsub channel is provided. | ||
/// | ||
/// This is called from a client with the command: | ||
// | ||
/// RAY.TABLE_APPEND <table_prefix> <pubsub_channel> <id> <data> | ||
/// <index (optional)> | ||
/// | ||
/// \param table_prefix The prefix string for keys in this table. | ||
/// \param pubsub_channel The pubsub channel name that notifications for | ||
/// this key should be published to. When publishing to a specific | ||
/// client, the channel name should be <pubsub_channel>:<client_id>. | ||
/// \param id The ID of the key to append to. | ||
/// \param data The data to append to the key. | ||
/// \param index If this is set, then the data must be appended at this index. | ||
/// If the current log is shorter or longer than the requested index, | ||
/// then the append will fail and an error message will be returned as a | ||
/// string. | ||
/// \return OK if the append succeeds, or an error message string if the append | ||
/// fails. | ||
int TableAppend_RedisCommand(RedisModuleCtx *ctx, | ||
RedisModuleString **argv, | ||
int argc) { | ||
RedisModule_AutoMemory(ctx); | ||
|
||
int TableAppend_DoWrite(RedisModuleCtx *ctx, | ||
RedisModuleString **argv, | ||
int argc, | ||
RedisModuleString **mutated_key_str) { | ||
if (argc < 5 || argc > 6) { | ||
return RedisModule_WrongArity(ctx); | ||
} | ||
|
||
RedisModuleString *prefix_str = argv[1]; | ||
RedisModuleString *pubsub_channel_str = argv[2]; | ||
RedisModuleString *id = argv[3]; | ||
RedisModuleString *data = argv[4]; | ||
RedisModuleString *index_str = nullptr; | ||
|
@@ -693,8 +671,9 @@ int TableAppend_RedisCommand(RedisModuleCtx *ctx, | |
} | ||
|
||
// Set the keys in the table. | ||
RedisModuleKey *key = OpenPrefixedKey(ctx, prefix_str, id, | ||
REDISMODULE_READ | REDISMODULE_WRITE); | ||
RedisModuleKey *key = | ||
OpenPrefixedKey(ctx, prefix_str, id, REDISMODULE_READ | REDISMODULE_WRITE, | ||
mutated_key_str); | ||
// Determine the index at which the data should be appended. If no index is | ||
// requested, then is the current length of the log. | ||
size_t index = RedisModule_ValueLength(key); | ||
|
@@ -717,23 +696,76 @@ int TableAppend_RedisCommand(RedisModuleCtx *ctx, | |
// necessary since we implement the log with a sorted set, so all entries | ||
// must be unique, or else we will have gaps in the log. | ||
RAY_CHECK(flags == REDISMODULE_ZADD_ADDED) << "Appended a duplicate entry"; | ||
// Publish a message on the requested pubsub channel if necessary. | ||
TablePubsub pubsub_channel = ParseTablePubsub(pubsub_channel_str); | ||
if (pubsub_channel != TablePubsub::NO_PUBLISH) { | ||
// All other pubsub channels write the data back directly onto the | ||
// channel. | ||
return PublishTableAdd(ctx, pubsub_channel_str, id, data); | ||
} else { | ||
return RedisModule_ReplyWithSimpleString(ctx, "OK"); | ||
} | ||
return REDISMODULE_OK; | ||
} else { | ||
// The requested index did not match the current length of the log. Return | ||
// an error message as a string. | ||
const char *reply = "ERR entry exists"; | ||
return RedisModule_ReplyWithStringBuffer(ctx, reply, strlen(reply)); | ||
static const char *reply = "ERR entry exists"; | ||
RedisModule_ReplyWithStringBuffer(ctx, reply, strlen(reply)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we use RedisModule_ReplyWithError here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It also returns OK (0),
So I'd like to not do this & keep this PR as close to the original behavior as possible. |
||
return REDISMODULE_ERR; | ||
} | ||
} | ||
|
||
int TableAppend_DoPublish(RedisModuleCtx *ctx, | ||
RedisModuleString **argv, | ||
int /*argc*/) { | ||
RedisModuleString *pubsub_channel_str = argv[2]; | ||
RedisModuleString *id = argv[3]; | ||
RedisModuleString *data = argv[4]; | ||
// Publish a message on the requested pubsub channel if necessary. | ||
TablePubsub pubsub_channel = ParseTablePubsub(pubsub_channel_str); | ||
if (pubsub_channel != TablePubsub::NO_PUBLISH) { | ||
// All other pubsub channels write the data back directly onto the | ||
// channel. | ||
return PublishTableAdd(ctx, pubsub_channel_str, id, data); | ||
} else { | ||
return RedisModule_ReplyWithSimpleString(ctx, "OK"); | ||
} | ||
} | ||
|
||
/// Append an entry to the log stored at a key. Publishes a notification about | ||
/// the update to all subscribers, if a pubsub channel is provided. | ||
/// | ||
/// This is called from a client with the command: | ||
// | ||
/// RAY.TABLE_APPEND <table_prefix> <pubsub_channel> <id> <data> | ||
/// <index (optional)> | ||
/// | ||
/// \param table_prefix The prefix string for keys in this table. | ||
/// \param pubsub_channel The pubsub channel name that notifications for | ||
/// this key should be published to. When publishing to a specific | ||
/// client, the channel name should be <pubsub_channel>:<client_id>. | ||
/// \param id The ID of the key to append to. | ||
/// \param data The data to append to the key. | ||
/// \param index If this is set, then the data must be appended at this index. | ||
/// If the current log is shorter or longer than the requested index, | ||
/// then the append will fail and an error message will be returned as a | ||
/// string. | ||
/// \return OK if the append succeeds, or an error message string if the append | ||
/// fails. | ||
int TableAppend_RedisCommand(RedisModuleCtx *ctx, | ||
RedisModuleString **argv, | ||
int argc) { | ||
RedisModule_AutoMemory(ctx); | ||
const int status = TableAppend_DoWrite(ctx, argv, argc, | ||
/*mutated_key_str=*/nullptr); | ||
if (status) { | ||
return status; | ||
} | ||
return TableAppend_DoPublish(ctx, argv, argc); | ||
} | ||
|
||
#if RAY_USE_NEW_GCS | ||
int ChainTableAppend_RedisCommand(RedisModuleCtx *ctx, | ||
RedisModuleString **argv, | ||
int argc) { | ||
RedisModule_AutoMemory(ctx); | ||
return module.ChainReplicate(ctx, argv, argc, | ||
/*node_func=*/TableAppend_DoWrite, | ||
/*tail_func=*/TableAppend_DoPublish); | ||
} | ||
#endif | ||
|
||
/// A helper function to create and finish a GcsTableEntry, based on the | ||
/// current value or values at the given key. | ||
void TableEntryToFlatbuf(RedisModuleKey *table_key, | ||
|
@@ -1833,6 +1865,11 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, | |
0, 0) == REDISMODULE_ERR) { | ||
return REDISMODULE_ERR; | ||
} | ||
if (RedisModule_CreateCommand(ctx, "ray.chain.table_append", | ||
ChainTableAppend_RedisCommand, "write pubsub", | ||
0, 0, 0) == REDISMODULE_ERR) { | ||
return REDISMODULE_ERR; | ||
} | ||
#endif | ||
|
||
return REDISMODULE_OK; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We may need to be more careful about which functions (node vs. tail) should return a Redis reply and when, to make sure that every Redis command returns exactly one reply. I think it should be:
I think this means that for the master commit on credis, we are incorrectly handling the case where the node func does not error and the command is executed on a non-tail node. In this case, the node func does not return a reply and the tail func isn't run, so we won't be returning a Redis reply.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@stephanie-wang yes, bullets 1-3 are exactly what I have in mind.
For singleton chain case (default deployment), both
ActAsTail()
andActAsHead()
will be true. So if node func does not error, the tail func will be run, and hence we will exactly get 1 reply.For distributed, >1-chain case, the
ChainReplicate()
interface specifically designed for ray's redis module lacks this distributed support (i.e., propagating the update). (credis' nativeMemberPut_RedisCommand()/Put()
do handle propagation, of course.) . Since it's not handled right now we should make sure 1-node case is correct at the moment.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good, just something to keep in mind once we move to distributed.