Skip to content

Commit

Permalink
[xray] Put GCS data into the redis data shard (ray-project#2298)
Browse files Browse the repository at this point in the history
  • Loading branch information
pcmoritz authored and robertnishihara committed Jul 1, 2018
1 parent d75b39f commit 762bdf6
Show file tree
Hide file tree
Showing 19 changed files with 154 additions and 62 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ matrix:
env:
- PYTHON=3.5
- RAY_USE_NEW_GCS=on
- RAY_USE_XRAY=1

- os: linux
dist: trusty
Expand Down
16 changes: 8 additions & 8 deletions python/ray/experimental/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,9 @@ def _object_table(self, object_id):

else:
# Use the raylet code path.
message = self.redis_client.execute_command(
"RAY.TABLE_LOOKUP", ray.gcs_utils.TablePrefix.OBJECT, "",
object_id.id())
message = self._execute_command(object_id, "RAY.TABLE_LOOKUP",
ray.gcs_utils.TablePrefix.OBJECT,
"", object_id.id())
result = []
gcs_entry = ray.gcs_utils.GcsTableEntry.GetRootAsGcsTableEntry(
message, 0)
Expand Down Expand Up @@ -263,7 +263,7 @@ def object_table(self, object_id=None):
for key in object_location_keys
])
else:
object_keys = self.redis_client.keys(
object_keys = self._keys(
ray.gcs_utils.TablePrefix_OBJECT_string + "*")
object_ids_binary = {
key[len(ray.gcs_utils.TablePrefix_OBJECT_string):]
Expand Down Expand Up @@ -346,9 +346,9 @@ def _task_table(self, task_id):

else:
# Use the raylet code path.
message = self.redis_client.execute_command(
"RAY.TABLE_LOOKUP", ray.gcs_utils.TablePrefix.RAYLET_TASK, "",
task_id.id())
message = self._execute_command(
task_id, "RAY.TABLE_LOOKUP",
ray.gcs_utils.TablePrefix.RAYLET_TASK, "", task_id.id())
gcs_entries = ray.gcs_utils.GcsTableEntry.GetRootAsGcsTableEntry(
message, 0)

Expand Down Expand Up @@ -416,7 +416,7 @@ def task_table(self, task_id=None):
for key in task_table_keys
]
else:
task_table_keys = self.redis_client.keys(
task_table_keys = self._keys(
ray.gcs_utils.TablePrefix_RAYLET_TASK_string + "*")
task_ids_binary = [
key[len(ray.gcs_utils.TablePrefix_RAYLET_TASK_string):]
Expand Down
20 changes: 6 additions & 14 deletions python/ray/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,10 +470,7 @@ def start_redis(node_ip_address,
# It is important to load the credis module BEFORE the ray module,
# as the latter contains an extern declaration that the former
# supplies.
# NOTE: once data entries are all put under the redis shard(s)
# instead of the primary server when RAY_USE_NEW_GCS is set, we
# should load CREDIS_MASTER_MODULE here.
modules=[CREDIS_MEMBER_MODULE, REDIS_MODULE])
modules=[CREDIS_MASTER_MODULE, REDIS_MODULE])
if port is not None:
assert assigned_port == port
port = assigned_port
Expand Down Expand Up @@ -526,10 +523,7 @@ def start_redis(node_ip_address,
# It is important to load the credis module BEFORE the ray
# module, as the latter contains an extern declaration that the
# former supplies.
# NOTE: once data entries are all put under the redis shard(s)
# instead of the primary server when RAY_USE_NEW_GCS is set, we
# should load CREDIS_MEMBER_MODULE here.
modules=[CREDIS_MASTER_MODULE, REDIS_MODULE])
modules=[CREDIS_MEMBER_MODULE, REDIS_MODULE])

if redis_shard_ports[i] is not None:
assert redis_shard_port == redis_shard_ports[i]
Expand All @@ -542,12 +536,10 @@ def start_redis(node_ip_address,
shard_client = redis.StrictRedis(
host=node_ip_address, port=redis_shard_port)
# Configure the chain state.
# NOTE: once data entries are all put under the redis shard(s) instead
# of the primary server when RAY_USE_NEW_GCS is set, we should swap the
# callers here.
shard_client.execute_command("MASTER.ADD", node_ip_address, port)
primary_redis_client.execute_command("MEMBER.CONNECT_TO_MASTER",
node_ip_address, redis_shard_port)
primary_redis_client.execute_command("MASTER.ADD", node_ip_address,
redis_shard_port)
shard_client.execute_command("MEMBER.CONNECT_TO_MASTER",
node_ip_address, port)

return redis_address, redis_shards

Expand Down
7 changes: 3 additions & 4 deletions python/ray/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2288,10 +2288,9 @@ def connect(info,
driver_task.execution_dependencies_string(), 0,
ray.local_scheduler.task_to_string(driver_task))
else:
# TODO(rkn): When we shard the GCS in xray, we will need to change
# this to use _execute_command.
global_state.redis_client.execute_command(
"RAY.TABLE_ADD", ray.gcs_utils.TablePrefix.RAYLET_TASK,
global_state._execute_command(
driver_task.task_id(), "RAY.TABLE_ADD",
ray.gcs_utils.TablePrefix.RAYLET_TASK,
ray.gcs_utils.TablePubsub.RAYLET_TASK,
driver_task.task_id().id(),
driver_task._serialized_raylet_task())
Expand Down
5 changes: 3 additions & 2 deletions src/global_scheduler/global_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,10 @@ GlobalSchedulerState *GlobalSchedulerState_init(event_loop *loop,
std::vector<std::string>());
db_attach(state->db, loop, false);

RAY_CHECK_OK(state->gcs_client.Connect(std::string(redis_primary_addr),
redis_primary_port));
RAY_CHECK_OK(state->gcs_client.Connect(
std::string(redis_primary_addr), redis_primary_port, /*sharding=*/true));
RAY_CHECK_OK(state->gcs_client.context()->AttachToEventLoop(loop));
RAY_CHECK_OK(state->gcs_client.primary_context()->AttachToEventLoop(loop));
state->policy_state = GlobalSchedulerPolicyState_init();
return state;
}
Expand Down
3 changes: 2 additions & 1 deletion src/local_scheduler/local_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,9 @@ LocalSchedulerState *LocalSchedulerState_init(
db_attach(state->db, loop, false);

RAY_CHECK_OK(state->gcs_client.Connect(std::string(redis_primary_addr),
redis_primary_port));
redis_primary_port, true));
RAY_CHECK_OK(state->gcs_client.context()->AttachToEventLoop(loop));
RAY_CHECK_OK(state->gcs_client.primary_context()->AttachToEventLoop(loop));
} else {
state->db = NULL;
}
Expand Down
5 changes: 4 additions & 1 deletion src/plasma/plasma_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -487,8 +487,11 @@ PlasmaManagerState *PlasmaManagerState_init(const char *store_socket_name,
db_attach(state->db, state->loop, false);

RAY_CHECK_OK(state->gcs_client.Connect(std::string(redis_primary_addr),
redis_primary_port));
redis_primary_port,
/*sharding=*/true));
RAY_CHECK_OK(state->gcs_client.context()->AttachToEventLoop(state->loop));
RAY_CHECK_OK(
state->gcs_client.primary_context()->AttachToEventLoop(state->loop));
} else {
state->db = NULL;
RAY_LOG(DEBUG) << "No db connection specified";
Expand Down
14 changes: 10 additions & 4 deletions src/ray/gcs/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ namespace gcs {

AsyncGcsClient::AsyncGcsClient(const ClientID &client_id, CommandType command_type) {
context_ = std::make_shared<RedisContext>();
client_table_.reset(new ClientTable(context_, this, client_id));
primary_context_ = std::make_shared<RedisContext>();
client_table_.reset(new ClientTable(primary_context_, this, client_id));
object_table_.reset(new ObjectTable(context_, this));
actor_table_.reset(new ActorTable(context_, this));
task_table_.reset(new TaskTable(context_, this, command_type));
raylet_task_table_.reset(new raylet::TaskTable(context_, this, command_type));
task_reconstruction_log_.reset(new TaskReconstructionLog(context_, this));
heartbeat_table_.reset(new HeartbeatTable(context_, this));
error_table_.reset(new ErrorTable(context_, this));
error_table_.reset(new ErrorTable(primary_context_, this));
command_type_ = command_type;
}

Expand All @@ -34,8 +35,9 @@ AsyncGcsClient::AsyncGcsClient(CommandType command_type)

AsyncGcsClient::AsyncGcsClient() : AsyncGcsClient(ClientID::from_random()) {}

Status AsyncGcsClient::Connect(const std::string &address, int port) {
RAY_RETURN_NOT_OK(context_->Connect(address, port));
Status AsyncGcsClient::Connect(const std::string &address, int port, bool sharding) {
RAY_RETURN_NOT_OK(context_->Connect(address, port, sharding));
RAY_RETURN_NOT_OK(primary_context_->Connect(address, port, /*sharding=*/false));
// TODO(swang): Call the client table's Connect() method here. To do this,
// we need to make sure that we are attached to an event loop first. This
// currently isn't possible because the aeEventLoop, which we use for
Expand All @@ -53,6 +55,10 @@ Status AsyncGcsClient::Attach(boost::asio::io_service &io_service) {
asio_async_client_.reset(new RedisAsioClient(io_service, context_->async_context()));
asio_subscribe_client_.reset(
new RedisAsioClient(io_service, context_->subscribe_context()));
asio_async_auxiliary_client_.reset(
new RedisAsioClient(io_service, primary_context_->async_context()));
asio_subscribe_auxiliary_client_.reset(
new RedisAsioClient(io_service, primary_context_->subscribe_context()));
return Status::OK();
}

Expand Down
10 changes: 8 additions & 2 deletions src/ray/gcs/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ class RAY_EXPORT AsyncGcsClient {
///
/// \param address The GCS IP address.
/// \param port The GCS port.
/// \param sharding If true, use sharded redis for the GCS.
/// \return Status.
Status Connect(const std::string &address, int port);
Status Connect(const std::string &address, int port, bool sharding);
/// Attach this client to a plasma event loop. Note that only
/// one event loop should be attached at a time.
Status Attach(plasma::EventLoop &event_loop);
Expand Down Expand Up @@ -68,6 +69,7 @@ class RAY_EXPORT AsyncGcsClient {
const GetExportCallback &done_callback);

std::shared_ptr<RedisContext> context() { return context_; }
std::shared_ptr<RedisContext> primary_context() { return primary_context_; }

private:
std::unique_ptr<FunctionTable> function_table_;
Expand All @@ -80,10 +82,14 @@ class RAY_EXPORT AsyncGcsClient {
std::unique_ptr<HeartbeatTable> heartbeat_table_;
std::unique_ptr<ErrorTable> error_table_;
std::unique_ptr<ClientTable> client_table_;
// The following contexts write to the data shard
std::shared_ptr<RedisContext> context_;
std::unique_ptr<RedisAsioClient> asio_async_client_;
std::unique_ptr<RedisAsioClient> asio_subscribe_client_;

// The following context writes everything to the primary shard
std::shared_ptr<RedisContext> primary_context_;
std::unique_ptr<RedisAsioClient> asio_async_auxiliary_client_;
std::unique_ptr<RedisAsioClient> asio_subscribe_auxiliary_client_;
CommandType command_type_;
};

Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class TestGcs : public ::testing::Test {
public:
TestGcs(CommandType command_type) : num_callbacks_(0), command_type_(command_type) {
client_ = std::make_shared<gcs::AsyncGcsClient>(command_type_);
RAY_CHECK_OK(client_->Connect("127.0.0.1", 6379));
RAY_CHECK_OK(client_->Connect("127.0.0.1", 6379, /*sharding=*/false));

job_id_ = JobID::from_random();
}
Expand Down
97 changes: 88 additions & 9 deletions src/ray/gcs/redis_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#include <unistd.h>

#include <sstream>

extern "C" {
#include "hiredis/adapters/ae.h"
#include "hiredis/async.h"
Expand Down Expand Up @@ -36,7 +38,7 @@ namespace gcs {
// asynchronous redis call. It dispatches the appropriate callback
// that was registered with the RedisCallbackManager.
void GlobalRedisCallback(void *c, void *r, void *privdata) {
if (r == NULL) {
if (r == nullptr) {
return;
}
int64_t callback_index = reinterpret_cast<int64_t>(privdata);
Expand Down Expand Up @@ -67,7 +69,7 @@ void GlobalRedisCallback(void *c, void *r, void *privdata) {
}

void SubscribeRedisCallback(void *c, void *r, void *privdata) {
if (r == NULL) {
if (r == nullptr) {
return;
}
int64_t callback_index = reinterpret_cast<int64_t>(privdata);
Expand Down Expand Up @@ -133,7 +135,70 @@ RedisContext::~RedisContext() {
}
}

Status RedisContext::Connect(const std::string &address, int port) {
static void GetRedisShards(redisContext *context, std::vector<std::string> *addresses,
std::vector<int> *ports) {
// Get the total number of Redis shards in the system.
int num_attempts = 0;
redisReply *reply = nullptr;
while (num_attempts < RayConfig::instance().redis_db_connect_retries()) {
// Try to read the number of Redis shards from the primary shard. If the
// entry is present, exit.
reply = reinterpret_cast<redisReply *>(redisCommand(context, "GET NumRedisShards"));
if (reply->type != REDIS_REPLY_NIL) {
break;
}

// Sleep for a little, and try again if the entry isn't there yet. */
freeReplyObject(reply);
usleep(RayConfig::instance().redis_db_connect_wait_milliseconds() * 1000);
num_attempts++;
}
RAY_CHECK(num_attempts < RayConfig::instance().redis_db_connect_retries())
<< "No entry found for NumRedisShards";
RAY_CHECK(reply->type == REDIS_REPLY_STRING) << "Expected string, found Redis type "
<< reply->type << " for NumRedisShards";
int num_redis_shards = atoi(reply->str);
RAY_CHECK(num_redis_shards >= 1) << "Expected at least one Redis shard, "
<< "found " << num_redis_shards;
freeReplyObject(reply);

// Get the addresses of all of the Redis shards.
num_attempts = 0;
while (num_attempts < RayConfig::instance().redis_db_connect_retries()) {
// Try to read the Redis shard locations from the primary shard. If we find
// that all of them are present, exit.
reply =
reinterpret_cast<redisReply *>(redisCommand(context, "LRANGE RedisShards 0 -1"));
if (static_cast<int>(reply->elements) == num_redis_shards) {
break;
}

// Sleep for a little, and try again if not all Redis shard addresses have
// been added yet.
freeReplyObject(reply);
usleep(RayConfig::instance().redis_db_connect_wait_milliseconds() * 1000);
num_attempts++;
}
RAY_CHECK(num_attempts < RayConfig::instance().redis_db_connect_retries())
<< "Expected " << num_redis_shards << " Redis shard addresses, found "
<< reply->elements;

// Parse the Redis shard addresses.
for (size_t i = 0; i < reply->elements; ++i) {
// Parse the shard addresses and ports.
RAY_CHECK(reply->element[i]->type == REDIS_REPLY_STRING);
std::string addr;
std::stringstream ss(reply->element[i]->str);
getline(ss, addr, ':');
addresses->push_back(addr);
int port;
ss >> port;
ports->push_back(port);
}
freeReplyObject(reply);
}

Status RedisContext::Connect(const std::string &address, int port, bool sharding) {
int connection_attempts = 0;
context_ = redisConnect(address.c_str(), port);
while (context_ == nullptr || context_->err) {
Expand All @@ -158,17 +223,31 @@ Status RedisContext::Connect(const std::string &address, int port) {
REDIS_CHECK_ERROR(context_, reply);
freeReplyObject(reply);

std::string redis_address;
int redis_port;
if (sharding) {
// Get the redis data shard
std::vector<std::string> addresses;
std::vector<int> ports;
GetRedisShards(context_, &addresses, &ports);
redis_address = addresses[0];
redis_port = ports[0];
} else {
redis_address = address;
redis_port = port;
}

// Connect to async context
async_context_ = redisAsyncConnect(address.c_str(), port);
async_context_ = redisAsyncConnect(redis_address.c_str(), redis_port);
if (async_context_ == nullptr || async_context_->err) {
RAY_LOG(FATAL) << "Could not establish connection to redis " << address << ":"
<< port;
RAY_LOG(FATAL) << "Could not establish connection to redis " << redis_address << ":"
<< redis_port;
}
// Connect to subscribe context
subscribe_context_ = redisAsyncConnect(address.c_str(), port);
subscribe_context_ = redisAsyncConnect(redis_address.c_str(), redis_port);
if (subscribe_context_ == nullptr || subscribe_context_->err) {
RAY_LOG(FATAL) << "Could not establish subscribe connection to redis " << address
<< ":" << port;
RAY_LOG(FATAL) << "Could not establish subscribe connection to redis "
<< redis_address << ":" << redis_port;
}
return Status::OK();
}
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/redis_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class RedisContext {
RedisContext()
: context_(nullptr), async_context_(nullptr), subscribe_context_(nullptr) {}
~RedisContext();
Status Connect(const std::string &address, int port);
Status Connect(const std::string &address, int port, bool sharding);
Status AttachToEventLoop(aeEventLoop *loop);

/// Run an operation on some table key.
Expand Down
2 changes: 1 addition & 1 deletion src/ray/object_manager/test/object_manager_stress_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class MockServer {

private:
ray::Status RegisterGcs(boost::asio::io_service &io_service) {
RAY_RETURN_NOT_OK(gcs_client_->Connect("127.0.0.1", 6379));
RAY_RETURN_NOT_OK(gcs_client_->Connect("127.0.0.1", 6379, /*sharding=*/false));
RAY_RETURN_NOT_OK(gcs_client_->Attach(io_service));

boost::asio::ip::tcp::endpoint endpoint = object_manager_acceptor_.local_endpoint();
Expand Down
2 changes: 1 addition & 1 deletion src/ray/object_manager/test/object_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class MockServer {

private:
ray::Status RegisterGcs(boost::asio::io_service &io_service) {
RAY_RETURN_NOT_OK(gcs_client_->Connect("127.0.0.1", 6379));
RAY_RETURN_NOT_OK(gcs_client_->Connect("127.0.0.1", 6379, /*sharding=*/false));
RAY_RETURN_NOT_OK(gcs_client_->Attach(io_service));

boost::asio::ip::tcp::endpoint endpoint = object_manager_acceptor_.local_endpoint();
Expand Down
Loading

0 comments on commit 762bdf6

Please sign in to comment.