[xray] Put GCS data into the redis data shard #2298
src/ray/gcs/redis_context.cc
Outdated
freeReplyObject(reply);
usleep(RayConfig::instance().redis_db_connect_wait_milliseconds() * 1000);
num_attempts++;
continue;
Remove the continue; it is redundant at the end of the loop body.
fixed
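For reference, a bounded retry loop of the kind discussed above might look like the following. This is a minimal sketch, not the code from the PR: `RetryWithWait` and its parameters are hypothetical names, and `std::this_thread::sleep_for` stands in for the `usleep` call in the diff.

```cpp
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical helper, not part of Ray: calls `attempt` until it succeeds or
// `max_retries` attempts have been made, sleeping `wait` after each failure.
// Returns true if some attempt succeeded.
inline bool RetryWithWait(const std::function<bool()> &attempt, int max_retries,
                          std::chrono::milliseconds wait) {
  int num_attempts = 0;
  while (num_attempts < max_retries) {
    if (attempt()) {
      return true;
    }
    std::this_thread::sleep_for(wait);
    num_attempts++;
    // No `continue` is needed here: control returns to the loop condition.
  }
  return false;
}
```

The increment is the last statement of the loop body, which is why a trailing `continue` adds nothing.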
Test FAILed. |
src/ray/gcs/redis_context.cc
Outdated
while (num_attempts < RayConfig::instance().redis_db_connect_retries()) {
// Try to read the number of Redis shards from the primary shard. If the
// entry is present, exit.
reply = reinterpret_cast<redisReply *>(redisCommand(context, "GET NumRedisShards"));
Add a function
std::unique_ptr<redisReply, std::function<void(redisReply *)>> ExecutionCommand(redisContext *context, const char *command) {
return std::unique_ptr<redisReply, std::function<void(redisReply *)>>(reinterpret_cast<redisReply *>(redisCommand(context, command)), [](redisReply *reply) { freeReplyObject(reply); });
}
to avoid calling freeReplyObject(reply) manually.
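The ownership idea in this suggestion can be sketched without hiredis as follows. The `redisReply` struct and `freeReplyObject` function below are stand-ins so the example compiles on its own (in real code they come from the hiredis header), and `ReplyDeleter`, `ReplyPtr`, and `WrapReply` are hypothetical names. The sketch uses a stateless functor as the deleter instead of the `std::function` proposed above, which is an assumption on my part, not something the thread settles.

```cpp
#include <memory>

// Stand-ins so this sketch compiles without hiredis; in real code these come
// from the hiredis header.
struct redisReply {
  long long integer;
};
static int g_num_freed = 0;  // Counts frees, for demonstration only.
inline void freeReplyObject(void *reply) {
  delete static_cast<redisReply *>(reply);
  g_num_freed++;
}

// Stateless deleter: the unique_ptr calls this when it goes out of scope.
struct ReplyDeleter {
  void operator()(redisReply *reply) const { freeReplyObject(reply); }
};
using ReplyPtr = std::unique_ptr<redisReply, ReplyDeleter>;

// Hypothetical wrapper: takes ownership of a raw reply pointer such as the
// void * returned by redisCommand.
inline ReplyPtr WrapReply(void *raw_reply) {
  return ReplyPtr(static_cast<redisReply *>(raw_reply));
}
```

With this in place, every early return or retry path frees the reply automatically, and a null reply is never passed to the deleter.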
src/ray/gcs/redis_context.cc
Outdated
@@ -133,6 +135,71 @@ RedisContext::~RedisContext() {
 }
 }

+void GetRedisShards(redisContext *context, std::vector<std::string> *addresses,
Make this static void GetRedisShards to avoid polluting the global namespace.
fixed
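As a small illustration of the point above, both `static` and an unnamed namespace give a helper internal linkage, so it is not visible outside its translation unit. The function names here are hypothetical and unrelated to the PR's actual helpers.

```cpp
#include <string>
#include <vector>

namespace {

// Hypothetical file-local helper: the unnamed namespace gives it internal
// linkage, so other translation units cannot see or collide with it.
void AppendShard(std::vector<std::string> *addresses, std::vector<int> *ports,
                 const std::string &address, int port) {
  addresses->push_back(address);
  ports->push_back(port);
}

}  // namespace

// The same effect using `static`, as suggested in the review.
static int NumShards(const std::vector<int> &ports) {
  return static_cast<int>(ports.size());
}
```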
Test FAILed. |
Test PASSed. |
src/ray/gcs/client.h
Outdated
@@ -37,7 +37,7 @@ class RAY_EXPORT AsyncGcsClient {
 /// \param address The GCS IP address.
 /// \param port The GCS port.
 /// \return Status.
-Status Connect(const std::string &address, int port);
+Status Connect(const std::string &address, int port, bool sharding);
Document the new sharding argument.
@@ -133,7 +135,71 @@ RedisContext::~RedisContext() {
 }
 }

-Status RedisContext::Connect(const std::string &address, int port) {
+static void GetRedisShards(redisContext *context, std::vector<std::string> *addresses,
There's some overlap with #2281. @heyucongtom can you take a look?
We are both translating the old Ray get_redis_shards logic to the new code here. I think the major differences are:
[+] #2298 adds a use_sharding bool to indicate whether sharding is in use.
[-] #2281 uses all shards by default.
[-] #2281 modifies table.cc so that when tables do their work, they randomly select a Redis shard by the ID.
[-] #2281 attaches all contexts to the event loop while connecting only the main context (I suspect this may be wrong, since the tests are timing out).
What the two PRs have in common: we are both writing exactly the same logic in the GetRedisShards function.
Sounds right. This PR isn't really doing sharding. It's just creating a single extra shard (as opposed to an arbitrary number of additional shards).
So I am guessing there is some overlap (I have sent a message on Slack about this), with #2281 focusing more on distributing the workload of the tables' operations.
There should be few or no conflicts, though.
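The thread above mentions selecting a Redis shard by ID. One common way to do that is to hash the ID and take the result modulo the number of shards; whether #2281 does exactly this is not stated here, so treat the following as an illustrative sketch. `ShardIndexForId` is a hypothetical name, and `std::hash` is used only for brevity: its values are not guaranteed to match across standard library implementations, so a real system would likely want a fixed hash function.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>

// Hypothetical helper: maps a binary ID to a shard index in [0, num_shards).
// The same ID always maps to the same shard within one process.
inline std::size_t ShardIndexForId(const std::string &id, std::size_t num_shards) {
  assert(num_shards > 0);
  return std::hash<std::string>()(id) % num_shards;
}
```

Tables that must live on a single shard, such as those kept on the primary shard, would bypass a helper like this and use the primary context directly.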
src/ray/gcs/redis_context.cc
Outdated
@@ -133,7 +135,71 @@ RedisContext::~RedisContext() {
 }
 }

-Status RedisContext::Connect(const std::string &address, int port) {
+static void GetRedisShards(redisContext *context, std::vector<std::string> *addresses,
+std::vector<int> *ports) {
I think it would be cleaner to pass this in as std::vector<int> &ports.
src/ray/gcs/redis_context.cc
Outdated
std::vector<int> *ports) {
// Get the total number of Redis shards in the system.
int num_attempts = 0;
redisReply *reply = NULL;
Use nullptr instead of NULL.
Test FAILed. |
Test FAILed. |
Test FAILed. |
Test PASSed. |
Test PASSed. |
Test FAILed. |
Test PASSed. |
Test PASSed. |
Test PASSed. |
Test PASSed. |
Test FAILed. |
In the latest Travis build for this PR, Build #751, I do not see the quoted error. I just restarted that build; let's see what's going on.
@@ -142,8 +142,9 @@ GlobalSchedulerState *GlobalSchedulerState_init(event_loop *loop,
 db_attach(state->db, loop, false);

 RAY_CHECK_OK(state->gcs_client.Connect(std::string(redis_primary_addr),
-redis_primary_port));
+redis_primary_port, true));
In all C++ files in this PR, please use /*sharding=*/true (or false, correspondingly).
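The convention requested above can be illustrated with a stand-in type. `FakeClient` and `ConnectPrimary` are hypothetical and exist only so the example compiles on its own; the address and port are placeholder values.

```cpp
#include <string>

// Hypothetical stand-in for a client whose Connect takes a boolean flag.
struct FakeClient {
  bool sharding_enabled = false;
  bool Connect(const std::string &address, int port, bool sharding) {
    sharding_enabled = sharding;
    return !address.empty() && port > 0;
  }
};

inline bool ConnectPrimary(FakeClient *client) {
  // The inline comment names the parameter, so the bare `true` at the call
  // site is self-explanatory.
  return client->Connect("127.0.0.1", 6379, /*sharding=*/true);
}
```

Keeping the comment text identical to the parameter name also lets tools that check argument comments verify it.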
src/ray/gcs/client.h
Outdated
std::shared_ptr<RedisContext> context_;
std::unique_ptr<RedisAsioClient> asio_async_client_;
std::unique_ptr<RedisAsioClient> asio_subscribe_client_;

// The following context writes everything to the primary shard
std::shared_ptr<RedisContext> auxiliary_context_;
Is there a reason to call this auxiliary instead of primary?
Test PASSed. |
66d6b85 to d4697de
d4697de to 442f65b
Test FAILed. |
Test PASSed. |
Test PASSed. |
Test PASSed. |
Test FAILed. |
Test PASSed. |
Test PASSed. |
Basically a re-implementation of #2281, with modifications of #2298 (a fix of #2334, for rebasing issues).
[+] Implement sharding for GCS tables.
[+] Keep ClientTable and ErrorTable managed by the primary_shard. TaskTable is managed by the primary_shard for now, until a good hashing scheme for tasks is implemented.
[+] Move AsyncGcsClient's initialization into the Connect function.
[-] Move GetRedisShard and bool sharding from RedisContext's connect into AsyncGcsClient. This may make the interface cleaner.
This moves the data for the new GCS into a separate data shard. It also changes the RAY_NEW_GCS codepath to use XRay (the codepath was only there for development until XRay is ready).