Make xray object table credis-managed and hence flushable. #2338
Test FAILed.
Test FAILed.
8c17209 to 0d82c5d
This reverts commit 628c2ea.
0d82c5d to 2403fe6
Test FAILed.
if self.issue_gcs_flushes:
# For now, we take the primary redis server to issue flushes,
# because task table entries are stored there under this flag.
# Data is stored under the first data shard, so we issue flushes to
Let's add a note or TODO here that this needs to change when we implement sharding.
Done, added a preventive check.
python/ray/monitor.py
Outdated
# Data is stored under the first data shard, so we issue flushes to
# that redis server.
addr_port = self.redis.lrange("RedisShards", 0, -1)[0]
addr_port = addr_port.split(b":")
addr, port = addr_port.split(b":") would be slightly cleaner, but up to you.
Ack
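For reference, the unpacking suggested above can be sketched as follows. This is a minimal sketch, assuming each shard entry is a byte string of the form host:port, as the split(b":") call in the diff implies. The helper name parse_shard_address is hypothetical and is not part of this PR.

```python
def parse_shard_address(entry):
    """Split a b"host:port" entry into (host, port).

    Hypothetical helper for illustration; not code from the PR.
    """
    # Unpacking into exactly two names raises ValueError right away if the
    # entry does not contain exactly one ":" separator.
    addr, port = entry.split(b":")
    return addr.decode("ascii"), int(port)


host, port = parse_shard_address(b"127.0.0.1:6380")
print(host, port)
```

Unpacking into two names also documents the expected shape of the entry, which indexing with addr_port[0] and addr_port[1] does not.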
src/ray/gcs/tables.cc
Outdated
return context_->RunAsync("RAY.TABLE_APPEND", id, fbb.GetBufferPointer(), fbb.GetSize(),
prefix_, pubsub_channel_, std::move(callback));

std::string command;
We have this code in enough places now that I think it warrants a little helper function.
Done
src/ray/gcs/tables.cc
Outdated
return context_->RunAsync("RAY.TABLE_APPEND", id, fbb.GetBufferPointer(), fbb.GetSize(),
prefix_, pubsub_channel_, std::move(callback), log_length);

std::string command;
similar to the above
Done
const char *reply = "ERR entry exists";
return RedisModule_ReplyWithStringBuffer(ctx, reply, strlen(reply));
static const char *reply = "ERR entry exists";
RedisModule_ReplyWithStringBuffer(ctx, reply, strlen(reply));
Can we use RedisModule_ReplyWithError here?
It also returns OK (0),
* The function always returns REDISMODULE_OK.
*/
int RM_ReplyWithError(RedisModuleCtx *ctx, const char *err) {
return replyWithStatus(ctx,err,"-");
}
So I'd rather not do this, and keep this PR as close to the original behavior as possible.
Looks good modulo small fixes (see comments)
Left a few comments. Overall, looks great!
python/ray/monitor.py
Outdated
host=addr_port[0], port=addr_port[1])
try:
self.redis.execute_command("HEAD.FLUSH 0")
self.redis_shard.execute_command("HEAD.FLUSH 0")
Why was this change necessary? Is self.redis pointing to the wrong server?
Also, is this code run every time a flush is required? If so, we should try to avoid running the self.redis.lrange command every time.
- It's necessary because, before, under RAY_USE_NEW_GCS, the task table was placed in the primary and the object table in the data shard. I implemented flushing for the task table first, hence this code was issuing flushes to the primary. @pcmoritz's PR on Saturday fixed that issue.
- No, this code is only run once, at Monitor's start. It's used to probe whether the loaded modules support flushing.
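The probe-once behavior described above can be sketched as follows. This is a minimal sketch under stated assumptions: FakeShard is a hypothetical stand-in for a Redis client, MonitorSketch is not the real Monitor class, and RuntimeError stands in for whatever error type the real client raises on an unknown command. Only the HEAD.FLUSH 0 command string and the issue_gcs_flushes flag name come from the diff.

```python
class FakeShard:
    """Hypothetical stand-in for a Redis client connected to a data shard."""

    def __init__(self, supports_flush):
        self.supports_flush = supports_flush
        self.calls = 0

    def execute_command(self, command):
        self.calls += 1
        if not self.supports_flush:
            # Assumption: a real client raises its own error type here.
            raise RuntimeError("unknown command: " + command)
        return 0


class MonitorSketch:
    """Illustrative only; mirrors the probe described in the comment above."""

    def __init__(self, shard):
        self.shard = shard
        # Probe exactly once, at construction time, and cache the result so
        # that later flush decisions do not need to re-query the server.
        try:
            self.shard.execute_command("HEAD.FLUSH 0")
            self.issue_gcs_flushes = True
        except RuntimeError:
            self.issue_gcs_flushes = False


with_flush = MonitorSketch(FakeShard(supports_flush=True))
without_flush = MonitorSketch(FakeShard(supports_flush=False))
print(with_flush.issue_gcs_flushes, without_flush.issue_gcs_flushes)
```

Because the result is cached on the object, the shard lookup and the probe command are not repeated on each flush.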
} else {
return RedisModule_ReplyWithSimpleString(ctx, "OK");
}
I think you need to reply with a Redis error using RedisModule_ReplyWithError here.
Where? If you mean RAY_CHECK(flags == REDISMODULE_ZADD_ADDED) << "Appended a duplicate entry"; then I think not, since (1) it will crash and (2) the existing behavior is the same.
RedisModule_ReplyWithStringBuffer(ctx, reply, strlen(reply));
return REDISMODULE_ERR;
}
return REDISMODULE_OK;
Move this into the if block above? Since it's only reachable by that branch.
Done, good call
#include "ray/gcs/client.h"

namespace {
static const std::string kTableAppendCommand = "RAY.TABLE_APPEND";
Should we also move the TABLE_ADD commands here?
Not really, since that code is not duplicated. Also, I don't see a good way to abstract that piece across append and add without doing costly or unwieldy string concatenation. Here the strings are compile-time constants.
Hmm, I just meant to also define static const std::strings for the TABLE_ADD commands.
I'm fine with not trying to abstract it out right now, but in the future we could still define the strings statically while doing the concatenation in the definition. As we add more commands, it would be nice to have a static table of commands that we could look up as part of the helper function that you added in this PR.
Done
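The static command table proposed in this thread could look roughly like the sketch below. It is written in Python purely for illustration (the PR code is C++), and it is not the helper added in this PR. Only RAY.TABLE_APPEND appears in the diff; the other command names, the CommandType enum, and get_command are assumptions.

```python
from enum import Enum


class CommandType(Enum):
    """Hypothetical: whether the command goes through the plain or chain path."""
    REGULAR = 0
    CHAIN = 1


# Static lookup table, built once at import time.
# Only "RAY.TABLE_APPEND" is taken from the diff; the rest are assumed names.
COMMANDS = {
    ("append", CommandType.REGULAR): "RAY.TABLE_APPEND",
    ("append", CommandType.CHAIN): "RAY.CHAIN.TABLE_APPEND",  # assumed name
    ("add", CommandType.REGULAR): "RAY.TABLE_ADD",            # assumed name
    ("add", CommandType.CHAIN): "RAY.CHAIN.TABLE_ADD",        # assumed name
}


def get_command(operation, command_type):
    """Look up the command string; raises KeyError for an unknown pair."""
    return COMMANDS[(operation, command_type)]


print(get_command("append", CommandType.REGULAR))
```

A table like this keeps every command string a constant while avoiding per-call string concatenation, which was the concern raised above.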
Test FAILed.
Addressed comments, PTAL, thanks!
Test FAILed.
Test PASSed.
} else {
return RedisModule_ReplyWithSimpleString(ctx, "OK");
}
return REDISMODULE_OK;
We may need to be more careful about which functions (node vs. tail) should return a Redis reply and when, to make sure that every Redis command returns exactly one reply. I think it should be:
- If node func errors, then it should return an error reply.
- If node func does not error, then it should not return anything.
- Tail func should always return a Redis reply.
I think this means that for the master commit on credis, we are incorrectly handling the case where the node func does not error and the command is executed on a non-tail node. In this case, the node func does not return a reply and the tail func isn't run, so we won't be returning a Redis reply.
@stephanie-wang yes, bullets 1-3 are exactly what I have in mind.
For the singleton chain case (the default deployment), both ActAsTail() and ActAsHead() will be true. So if the node func does not error, the tail func will be run, and hence we will get exactly 1 reply.
For the distributed, >1-node chain case, the ChainReplicate() interface specifically designed for ray's redis module lacks this distributed support (i.e., propagating the update). (credis' native MemberPut_RedisCommand()/Put() do handle propagation, of course.) Since it's not handled right now, we should make sure the 1-node case is correct at the moment.
Sounds good, just something to keep in mind once we move to distributed.
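The one-reply-per-command rule discussed in this thread can be modeled with a small simulation. This is a sketch under stated assumptions, not credis code: run_command, node_func, tail_func, and the reply strings are hypothetical, and replies are modeled as a plain list so they can be counted.

```python
def run_command(node_func, tail_func, acts_as_tail):
    """Return the list of replies produced for one command (model only)."""
    replies = []
    # Rule 1: if the node func errors, it sends the error reply itself.
    error = node_func()
    if error is not None:
        replies.append("ERR " + error)
        return replies
    # Rule 2: on success the node func sends nothing.
    # Rule 3: the tail func always replies, but it only runs on the tail.
    if acts_as_tail:
        replies.append(tail_func())
    return replies


def ok_node():
    return None


def failing_node():
    return "entry exists"


def tail():
    return "OK"


# Singleton chain: the single node is both head and tail.
print(run_command(ok_node, tail, acts_as_tail=True))
print(run_command(failing_node, tail, acts_as_tail=True))
# Non-tail node without an error: no reply is produced at all.
print(run_command(ok_node, tail, acts_as_tail=False))
```

The last call illustrates the gap raised above: on a non-tail node with no error, the model produces zero replies, which is why only the 1-node case is guaranteed correct for now.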
Travis
Test PASSed.
@concretevitamin This also happens on the current master so is unrelated to this PR. I'm investigating it.
This is a follow-up to #2266, which laid down the flushing policy framework and made the task table flushable. This PR additionally makes xray's object table flushable.
Testing program: submit N no-ops in a loop and monitor the memory usage of the redis data shard. Built and ran with both RAY_USE_NEW_GCS and RAY_USE_XRAY.