-
Notifications
You must be signed in to change notification settings - Fork 6.8k
[xray] Put GCS data into the redis data shard #2298
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
0d12ff3
15794b1
408a349
fd19650
c6506fe
2f5f4fe
603821d
c54ac4c
788ad35
2b881be
10e3053
6d1ab02
3007103
573defc
3611113
aed0a02
2b379c1
a60a3b3
ad35b8d
3046fa9
780224f
f65a839
7415513
2a6ebaa
326b9e1
004d4b6
5e1a619
2932c4b
442f65b
098d5d9
8b094a7
2c2b226
7457a70
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -107,6 +107,7 @@ matrix: | |
env: | ||
- PYTHON=3.5 | ||
- RAY_USE_NEW_GCS=on | ||
- RAY_USE_XRAY=1 | ||
|
||
- os: linux | ||
dist: trusty | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,8 @@ | |
|
||
#include <unistd.h> | ||
|
||
#include <sstream> | ||
|
||
extern "C" { | ||
#include "hiredis/adapters/ae.h" | ||
#include "hiredis/async.h" | ||
|
@@ -36,7 +38,7 @@ namespace gcs { | |
// asynchronous redis call. It dispatches the appropriate callback | ||
// that was registered with the RedisCallbackManager. | ||
void GlobalRedisCallback(void *c, void *r, void *privdata) { | ||
if (r == NULL) { | ||
if (r == nullptr) { | ||
return; | ||
} | ||
int64_t callback_index = reinterpret_cast<int64_t>(privdata); | ||
|
@@ -67,7 +69,7 @@ void GlobalRedisCallback(void *c, void *r, void *privdata) { | |
} | ||
|
||
void SubscribeRedisCallback(void *c, void *r, void *privdata) { | ||
if (r == NULL) { | ||
if (r == nullptr) { | ||
return; | ||
} | ||
int64_t callback_index = reinterpret_cast<int64_t>(privdata); | ||
|
@@ -133,7 +135,70 @@ RedisContext::~RedisContext() { | |
} | ||
} | ||
|
||
Status RedisContext::Connect(const std::string &address, int port) { | ||
static void GetRedisShards(redisContext *context, std::vector<std::string> *addresses, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There's some overlap with #2281. @heyucongtom can you take a look? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We are both doing a translation (from old ray get_redis_shards to new) here. I think the major difference is that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The same thing is: we are writing the exactly same thing in getRedisShards function There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds right. This PR is actually really doing sharding. It's just creating a single extra shard (as opposed to an arbitrary number of additional shards). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So I am guessing that there is some overlap(having sent out a msg on Slack on this), in which #2281 focuses more on distributing workload for table’s operations. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There should be little or no conflicts, though. |
||
std::vector<int> *ports) { | ||
// Get the total number of Redis shards in the system. | ||
int num_attempts = 0; | ||
redisReply *reply = nullptr; | ||
while (num_attempts < RayConfig::instance().redis_db_connect_retries()) { | ||
// Try to read the number of Redis shards from the primary shard. If the | ||
// entry is present, exit. | ||
reply = reinterpret_cast<redisReply *>(redisCommand(context, "GET NumRedisShards")); | ||
if (reply->type != REDIS_REPLY_NIL) { | ||
break; | ||
} | ||
|
||
// Sleep for a little, and try again if the entry isn't there yet. */ | ||
freeReplyObject(reply); | ||
usleep(RayConfig::instance().redis_db_connect_wait_milliseconds() * 1000); | ||
num_attempts++; | ||
} | ||
RAY_CHECK(num_attempts < RayConfig::instance().redis_db_connect_retries()) | ||
<< "No entry found for NumRedisShards"; | ||
RAY_CHECK(reply->type == REDIS_REPLY_STRING) << "Expected string, found Redis type " | ||
<< reply->type << " for NumRedisShards"; | ||
int num_redis_shards = atoi(reply->str); | ||
RAY_CHECK(num_redis_shards >= 1) << "Expected at least one Redis shard, " | ||
<< "found " << num_redis_shards; | ||
freeReplyObject(reply); | ||
|
||
// Get the addresses of all of the Redis shards. | ||
num_attempts = 0; | ||
while (num_attempts < RayConfig::instance().redis_db_connect_retries()) { | ||
// Try to read the Redis shard locations from the primary shard. If we find | ||
// that all of them are present, exit. | ||
reply = | ||
reinterpret_cast<redisReply *>(redisCommand(context, "LRANGE RedisShards 0 -1")); | ||
if (static_cast<int>(reply->elements) == num_redis_shards) { | ||
break; | ||
} | ||
|
||
// Sleep for a little, and try again if not all Redis shard addresses have | ||
// been added yet. | ||
freeReplyObject(reply); | ||
usleep(RayConfig::instance().redis_db_connect_wait_milliseconds() * 1000); | ||
num_attempts++; | ||
} | ||
RAY_CHECK(num_attempts < RayConfig::instance().redis_db_connect_retries()) | ||
<< "Expected " << num_redis_shards << " Redis shard addresses, found " | ||
<< reply->elements; | ||
|
||
// Parse the Redis shard addresses. | ||
for (size_t i = 0; i < reply->elements; ++i) { | ||
// Parse the shard addresses and ports. | ||
RAY_CHECK(reply->element[i]->type == REDIS_REPLY_STRING); | ||
std::string addr; | ||
std::stringstream ss(reply->element[i]->str); | ||
getline(ss, addr, ':'); | ||
addresses->push_back(addr); | ||
int port; | ||
ss >> port; | ||
ports->push_back(port); | ||
} | ||
freeReplyObject(reply); | ||
} | ||
|
||
Status RedisContext::Connect(const std::string &address, int port, bool sharding) { | ||
int connection_attempts = 0; | ||
context_ = redisConnect(address.c_str(), port); | ||
while (context_ == nullptr || context_->err) { | ||
|
@@ -158,17 +223,31 @@ Status RedisContext::Connect(const std::string &address, int port) { | |
REDIS_CHECK_ERROR(context_, reply); | ||
freeReplyObject(reply); | ||
|
||
std::string redis_address; | ||
int redis_port; | ||
if (sharding) { | ||
// Get the redis data shard | ||
std::vector<std::string> addresses; | ||
std::vector<int> ports; | ||
GetRedisShards(context_, &addresses, &ports); | ||
redis_address = addresses[0]; | ||
redis_port = ports[0]; | ||
} else { | ||
redis_address = address; | ||
redis_port = port; | ||
} | ||
|
||
// Connect to async context | ||
async_context_ = redisAsyncConnect(address.c_str(), port); | ||
async_context_ = redisAsyncConnect(redis_address.c_str(), redis_port); | ||
if (async_context_ == nullptr || async_context_->err) { | ||
RAY_LOG(FATAL) << "Could not establish connection to redis " << address << ":" | ||
<< port; | ||
RAY_LOG(FATAL) << "Could not establish connection to redis " << redis_address << ":" | ||
<< redis_port; | ||
} | ||
// Connect to subscribe context | ||
subscribe_context_ = redisAsyncConnect(address.c_str(), port); | ||
subscribe_context_ = redisAsyncConnect(redis_address.c_str(), redis_port); | ||
if (subscribe_context_ == nullptr || subscribe_context_->err) { | ||
RAY_LOG(FATAL) << "Could not establish subscribe connection to redis " << address | ||
<< ":" << port; | ||
RAY_LOG(FATAL) << "Could not establish subscribe connection to redis " | ||
<< redis_address << ":" << redis_port; | ||
} | ||
return Status::OK(); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we actually need this flag? It's only false in the tests, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes it is only used in the tests, however to keep the tests lean and not having to start multiple shards, it's better to deactivate it there.