1 change: 1 addition & 0 deletions .travis.yml
@@ -107,6 +107,7 @@ matrix:
env:
- PYTHON=3.5
- RAY_USE_NEW_GCS=on
- RAY_USE_XRAY=1

- os: linux
dist: trusty
16 changes: 8 additions & 8 deletions python/ray/experimental/state.py
@@ -214,9 +214,9 @@ def _object_table(self, object_id):

else:
# Use the raylet code path.
message = self.redis_client.execute_command(
"RAY.TABLE_LOOKUP", ray.gcs_utils.TablePrefix.OBJECT, "",
object_id.id())
message = self._execute_command(object_id, "RAY.TABLE_LOOKUP",
ray.gcs_utils.TablePrefix.OBJECT,
"", object_id.id())
result = []
gcs_entry = ray.gcs_utils.GcsTableEntry.GetRootAsGcsTableEntry(
message, 0)
@@ -263,7 +263,7 @@ def object_table(self, object_id=None):
for key in object_location_keys
])
else:
object_keys = self.redis_client.keys(
object_keys = self._keys(
ray.gcs_utils.TablePrefix_OBJECT_string + "*")
object_ids_binary = {
key[len(ray.gcs_utils.TablePrefix_OBJECT_string):]
@@ -346,9 +346,9 @@ def _task_table(self, task_id):

else:
# Use the raylet code path.
message = self.redis_client.execute_command(
"RAY.TABLE_LOOKUP", ray.gcs_utils.TablePrefix.RAYLET_TASK, "",
task_id.id())
message = self._execute_command(
task_id, "RAY.TABLE_LOOKUP",
ray.gcs_utils.TablePrefix.RAYLET_TASK, "", task_id.id())
gcs_entries = ray.gcs_utils.GcsTableEntry.GetRootAsGcsTableEntry(
message, 0)

@@ -416,7 +416,7 @@ def task_table(self, task_id=None):
for key in task_table_keys
]
else:
task_table_keys = self.redis_client.keys(
task_table_keys = self._keys(
ray.gcs_utils.TablePrefix_RAYLET_TASK_string + "*")
task_ids_binary = [
key[len(ray.gcs_utils.TablePrefix_RAYLET_TASK_string):]
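The `_execute_command` and `_keys` helpers that replace the direct `redis_client` calls above are not part of this diff; conceptually, they route a command to the Redis shard that owns the given ID instead of always querying the primary server, and fan a KEYS pattern out over every shard. A minimal Python sketch of that routing, assuming one `redis.StrictRedis` client per data shard and a simple hash of the binary ID (the class name and hashing scheme here are illustrative, not the actual implementation):

import redis

class ShardedGlobalState(object):
    """Illustrative only: route GCS commands to the shard that owns a key."""

    def __init__(self, shard_addresses):
        # One client per data shard, e.g. [("127.0.0.1", 6380), ...].
        self.redis_clients = [
            redis.StrictRedis(host=host, port=port)
            for host, port in shard_addresses
        ]

    def _shard_for(self, ray_id):
        # Pick a shard deterministically from the binary ID.
        index = hash(ray_id.id()) % len(self.redis_clients)
        return self.redis_clients[index]

    def _execute_command(self, ray_id, *args):
        # e.g. _execute_command(task_id, "RAY.TABLE_LOOKUP", prefix, "", task_id.id())
        return self._shard_for(ray_id).execute_command(*args)

    def _keys(self, pattern):
        # A KEYS pattern has no single owner, so query every shard and merge.
        result = []
        for client in self.redis_clients:
            result.extend(client.keys(pattern))
        return result

This also explains why the ID is passed as the first argument in the new calls: it selects the shard, while the remaining arguments form the actual Redis command.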
20 changes: 6 additions & 14 deletions python/ray/services.py
@@ -470,10 +470,7 @@ def start_redis(node_ip_address,
# It is important to load the credis module BEFORE the ray module,
# as the latter contains an extern declaration that the former
# supplies.
# NOTE: once data entries are all put under the redis shard(s)
# instead of the primary server when RAY_USE_NEW_GCS is set, we
# should load CREDIS_MASTER_MODULE here.
modules=[CREDIS_MEMBER_MODULE, REDIS_MODULE])
modules=[CREDIS_MASTER_MODULE, REDIS_MODULE])
if port is not None:
assert assigned_port == port
port = assigned_port
@@ -526,10 +523,7 @@ def start_redis(node_ip_address,
# It is important to load the credis module BEFORE the ray
# module, as the latter contains an extern declaration that the
# former supplies.
# NOTE: once data entries are all put under the redis shard(s)
# instead of the primary server when RAY_USE_NEW_GCS is set, we
# should load CREDIS_MEMBER_MODULE here.
modules=[CREDIS_MASTER_MODULE, REDIS_MODULE])
modules=[CREDIS_MEMBER_MODULE, REDIS_MODULE])

if redis_shard_ports[i] is not None:
assert redis_shard_port == redis_shard_ports[i]
@@ -542,12 +536,10 @@ def start_redis(node_ip_address,
shard_client = redis.StrictRedis(
host=node_ip_address, port=redis_shard_port)
# Configure the chain state.
# NOTE: once data entries are all put under the redis shard(s) instead
# of the primary server when RAY_USE_NEW_GCS is set, we should swap the
# callers here.
shard_client.execute_command("MASTER.ADD", node_ip_address, port)
primary_redis_client.execute_command("MEMBER.CONNECT_TO_MASTER",
node_ip_address, redis_shard_port)
primary_redis_client.execute_command("MASTER.ADD", node_ip_address,
redis_shard_port)
shard_client.execute_command("MEMBER.CONNECT_TO_MASTER",
node_ip_address, port)

return redis_address, redis_shards

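The swapped module assignments and the reordered chain-setup calls above go together: under RAY_USE_NEW_GCS, the primary server now loads CREDIS_MASTER_MODULE and registers each data shard as a chain member via MASTER.ADD, while each shard loads CREDIS_MEMBER_MODULE and attaches itself back to the primary with MEMBER.CONNECT_TO_MASTER. A rough Python sketch of the new wiring, assuming both servers are already running with those modules loaded (the function name and ports are placeholders):

import redis

def configure_chain(node_ip_address, primary_port, shard_port):
    # Sketch of the chain setup performed at the end of start_redis().
    primary_client = redis.StrictRedis(host=node_ip_address, port=primary_port)
    shard_client = redis.StrictRedis(host=node_ip_address, port=shard_port)

    # The primary (CREDIS_MASTER_MODULE) records the shard as a chain member.
    primary_client.execute_command("MASTER.ADD", node_ip_address, shard_port)
    # The shard (CREDIS_MEMBER_MODULE) connects back to the primary as its master.
    shard_client.execute_command("MEMBER.CONNECT_TO_MASTER",
                                 node_ip_address, primary_port)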
7 changes: 3 additions & 4 deletions python/ray/worker.py
@@ -2305,10 +2305,9 @@ def connect(info,
driver_task.execution_dependencies_string(), 0,
ray.local_scheduler.task_to_string(driver_task))
else:
# TODO(rkn): When we shard the GCS in xray, we will need to change
# this to use _execute_command.
global_state.redis_client.execute_command(
"RAY.TABLE_ADD", ray.gcs_utils.TablePrefix.RAYLET_TASK,
global_state._execute_command(
driver_task.task_id(), "RAY.TABLE_ADD",
ray.gcs_utils.TablePrefix.RAYLET_TASK,
ray.gcs_utils.TablePubsub.RAYLET_TASK,
driver_task.task_id().id(),
driver_task._serialized_raylet_task())
5 changes: 3 additions & 2 deletions src/global_scheduler/global_scheduler.cc
@@ -141,9 +141,10 @@ GlobalSchedulerState *GlobalSchedulerState_init(event_loop *loop,
std::vector<std::string>());
db_attach(state->db, loop, false);

RAY_CHECK_OK(state->gcs_client.Connect(std::string(redis_primary_addr),
redis_primary_port));
RAY_CHECK_OK(state->gcs_client.Connect(
std::string(redis_primary_addr), redis_primary_port, /*sharding=*/true));
RAY_CHECK_OK(state->gcs_client.context()->AttachToEventLoop(loop));
RAY_CHECK_OK(state->gcs_client.primary_context()->AttachToEventLoop(loop));
state->policy_state = GlobalSchedulerPolicyState_init();
return state;
}
3 changes: 2 additions & 1 deletion src/local_scheduler/local_scheduler.cc
@@ -358,8 +358,9 @@ LocalSchedulerState *LocalSchedulerState_init(
db_attach(state->db, loop, false);

RAY_CHECK_OK(state->gcs_client.Connect(std::string(redis_primary_addr),
redis_primary_port));
redis_primary_port, true));
RAY_CHECK_OK(state->gcs_client.context()->AttachToEventLoop(loop));
RAY_CHECK_OK(state->gcs_client.primary_context()->AttachToEventLoop(loop));
} else {
state->db = NULL;
}
5 changes: 4 additions & 1 deletion src/plasma/plasma_manager.cc
@@ -487,8 +487,11 @@ PlasmaManagerState *PlasmaManagerState_init(const char *store_socket_name,
db_attach(state->db, state->loop, false);

RAY_CHECK_OK(state->gcs_client.Connect(std::string(redis_primary_addr),
redis_primary_port));
redis_primary_port,
/*sharding=*/true));
RAY_CHECK_OK(state->gcs_client.context()->AttachToEventLoop(state->loop));
RAY_CHECK_OK(
state->gcs_client.primary_context()->AttachToEventLoop(state->loop));
} else {
state->db = NULL;
RAY_LOG(DEBUG) << "No db connection specified";
14 changes: 10 additions & 4 deletions src/ray/gcs/client.cc
@@ -8,14 +8,15 @@ namespace gcs {

AsyncGcsClient::AsyncGcsClient(const ClientID &client_id, CommandType command_type) {
context_ = std::make_shared<RedisContext>();
client_table_.reset(new ClientTable(context_, this, client_id));
primary_context_ = std::make_shared<RedisContext>();
client_table_.reset(new ClientTable(primary_context_, this, client_id));
object_table_.reset(new ObjectTable(context_, this));
actor_table_.reset(new ActorTable(context_, this));
task_table_.reset(new TaskTable(context_, this, command_type));
raylet_task_table_.reset(new raylet::TaskTable(context_, this, command_type));
task_reconstruction_log_.reset(new TaskReconstructionLog(context_, this));
heartbeat_table_.reset(new HeartbeatTable(context_, this));
error_table_.reset(new ErrorTable(context_, this));
error_table_.reset(new ErrorTable(primary_context_, this));
command_type_ = command_type;
}

@@ -34,8 +35,9 @@ AsyncGcsClient::AsyncGcsClient(CommandType command_type)

AsyncGcsClient::AsyncGcsClient() : AsyncGcsClient(ClientID::from_random()) {}

Status AsyncGcsClient::Connect(const std::string &address, int port) {
RAY_RETURN_NOT_OK(context_->Connect(address, port));
Status AsyncGcsClient::Connect(const std::string &address, int port, bool sharding) {
RAY_RETURN_NOT_OK(context_->Connect(address, port, sharding));
RAY_RETURN_NOT_OK(primary_context_->Connect(address, port, /*sharding=*/false));
// TODO(swang): Call the client table's Connect() method here. To do this,
// we need to make sure that we are attached to an event loop first. This
// currently isn't possible because the aeEventLoop, which we use for
@@ -53,6 +55,10 @@ Status AsyncGcsClient::Attach(boost::asio::io_service &io_service) {
asio_async_client_.reset(new RedisAsioClient(io_service, context_->async_context()));
asio_subscribe_client_.reset(
new RedisAsioClient(io_service, context_->subscribe_context()));
asio_async_auxiliary_client_.reset(
new RedisAsioClient(io_service, primary_context_->async_context()));
asio_subscribe_auxiliary_client_.reset(
new RedisAsioClient(io_service, primary_context_->subscribe_context()));
return Status::OK();
}

10 changes: 8 additions & 2 deletions src/ray/gcs/client.h
@@ -36,8 +36,9 @@ class RAY_EXPORT AsyncGcsClient {
///
/// \param address The GCS IP address.
/// \param port The GCS port.
/// \param sharding If true, use sharded redis for the GCS.

Collaborator: Do we actually need this flag? It's only false in the tests, right?

Contributor (author): Yes, it is only used in the tests. However, to keep the tests lean and avoid having to start multiple shards, it's better to deactivate it there.

/// \return Status.
Status Connect(const std::string &address, int port);
Status Connect(const std::string &address, int port, bool sharding);
/// Attach this client to a plasma event loop. Note that only
/// one event loop should be attached at a time.
Status Attach(plasma::EventLoop &event_loop);
Expand Down Expand Up @@ -68,6 +69,7 @@ class RAY_EXPORT AsyncGcsClient {
const GetExportCallback &done_callback);

std::shared_ptr<RedisContext> context() { return context_; }
std::shared_ptr<RedisContext> primary_context() { return primary_context_; }

private:
std::unique_ptr<FunctionTable> function_table_;
Expand All @@ -80,10 +82,14 @@ class RAY_EXPORT AsyncGcsClient {
std::unique_ptr<HeartbeatTable> heartbeat_table_;
std::unique_ptr<ErrorTable> error_table_;
std::unique_ptr<ClientTable> client_table_;
// The following contexts write to the data shard
std::shared_ptr<RedisContext> context_;
std::unique_ptr<RedisAsioClient> asio_async_client_;
std::unique_ptr<RedisAsioClient> asio_subscribe_client_;

// The following context writes everything to the primary shard
std::shared_ptr<RedisContext> primary_context_;
std::unique_ptr<RedisAsioClient> asio_async_auxiliary_client_;
std::unique_ptr<RedisAsioClient> asio_subscribe_auxiliary_client_;
CommandType command_type_;
};

2 changes: 1 addition & 1 deletion src/ray/gcs/client_test.cc
@@ -29,7 +29,7 @@ class TestGcs : public ::testing::Test {
public:
TestGcs(CommandType command_type) : num_callbacks_(0), command_type_(command_type) {
client_ = std::make_shared<gcs::AsyncGcsClient>(command_type_);
RAY_CHECK_OK(client_->Connect("127.0.0.1", 6379));
RAY_CHECK_OK(client_->Connect("127.0.0.1", 6379, /*sharding=*/false));

job_id_ = JobID::from_random();
}
97 changes: 88 additions & 9 deletions src/ray/gcs/redis_context.cc
@@ -2,6 +2,8 @@

#include <unistd.h>

#include <sstream>

extern "C" {
#include "hiredis/adapters/ae.h"
#include "hiredis/async.h"
@@ -36,7 +38,7 @@ namespace gcs {
// asynchronous redis call. It dispatches the appropriate callback
// that was registered with the RedisCallbackManager.
void GlobalRedisCallback(void *c, void *r, void *privdata) {
if (r == NULL) {
if (r == nullptr) {
return;
}
int64_t callback_index = reinterpret_cast<int64_t>(privdata);
@@ -67,7 +69,7 @@ void GlobalRedisCallback(void *c, void *r, void *privdata) {
}

void SubscribeRedisCallback(void *c, void *r, void *privdata) {
if (r == NULL) {
if (r == nullptr) {
return;
}
int64_t callback_index = reinterpret_cast<int64_t>(privdata);
@@ -133,7 +135,70 @@ RedisContext::~RedisContext() {
}
}

Status RedisContext::Connect(const std::string &address, int port) {
static void GetRedisShards(redisContext *context, std::vector<std::string> *addresses,

Collaborator: There's some overlap with #2281. @heyucongtom can you take a look?

Contributor (@heyucongtom, Jun 25, 2018): We are both doing a translation here (from the old Ray get_redis_shards to the new code). The major differences:
[+] #2298 adds a use_sharding bool to indicate whether we are using sharding.
[-] #2281 uses all shards by default.
[-] #2281 modifies table.cc so that when tables do the work, they randomly select a Redis shard by the ID.
[-] #2281 attaches all contexts to the event loop while only connecting the main context (which may be wrong, since its tests are giving timeouts).

Contributor: What is the same: we are writing exactly the same GetRedisShards function.

Collaborator: Sounds right. This PR is actually really doing sharding. It's just creating a single extra shard (as opposed to an arbitrary number of additional shards).

Contributor: So I am guessing that there is some overlap (I have sent out a message on Slack about this), in which #2281 focuses more on distributing the workload for table operations.

Contributor: There should be little or no conflict, though.

std::vector<int> *ports) {
// Get the total number of Redis shards in the system.
int num_attempts = 0;
redisReply *reply = nullptr;
while (num_attempts < RayConfig::instance().redis_db_connect_retries()) {
// Try to read the number of Redis shards from the primary shard. If the
// entry is present, exit.
reply = reinterpret_cast<redisReply *>(redisCommand(context, "GET NumRedisShards"));
if (reply->type != REDIS_REPLY_NIL) {
break;
}

// Sleep for a little, and try again if the entry isn't there yet. */
freeReplyObject(reply);
usleep(RayConfig::instance().redis_db_connect_wait_milliseconds() * 1000);
num_attempts++;
}
RAY_CHECK(num_attempts < RayConfig::instance().redis_db_connect_retries())
<< "No entry found for NumRedisShards";
RAY_CHECK(reply->type == REDIS_REPLY_STRING) << "Expected string, found Redis type "
<< reply->type << " for NumRedisShards";
int num_redis_shards = atoi(reply->str);
RAY_CHECK(num_redis_shards >= 1) << "Expected at least one Redis shard, "
<< "found " << num_redis_shards;
freeReplyObject(reply);

// Get the addresses of all of the Redis shards.
num_attempts = 0;
while (num_attempts < RayConfig::instance().redis_db_connect_retries()) {
// Try to read the Redis shard locations from the primary shard. If we find
// that all of them are present, exit.
reply =
reinterpret_cast<redisReply *>(redisCommand(context, "LRANGE RedisShards 0 -1"));
if (static_cast<int>(reply->elements) == num_redis_shards) {
break;
}

// Sleep for a little, and try again if not all Redis shard addresses have
// been added yet.
freeReplyObject(reply);
usleep(RayConfig::instance().redis_db_connect_wait_milliseconds() * 1000);
num_attempts++;
}
RAY_CHECK(num_attempts < RayConfig::instance().redis_db_connect_retries())
<< "Expected " << num_redis_shards << " Redis shard addresses, found "
<< reply->elements;

// Parse the Redis shard addresses.
for (size_t i = 0; i < reply->elements; ++i) {
// Parse the shard addresses and ports.
RAY_CHECK(reply->element[i]->type == REDIS_REPLY_STRING);
std::string addr;
std::stringstream ss(reply->element[i]->str);
getline(ss, addr, ':');
addresses->push_back(addr);
int port;
ss >> port;
ports->push_back(port);
}
freeReplyObject(reply);
}
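
The discovery handshake implemented by GetRedisShards above can also be expressed with redis-py; a condensed Python sketch of the same protocol, with retry counts and sleep intervals simplified and the function name chosen for illustration:

import time
import redis

def get_redis_shards(primary_client, retries=50, wait_s=0.1):
    # Wait until the primary has published how many data shards exist.
    for _ in range(retries):
        num_shards = primary_client.get("NumRedisShards")
        if num_shards is not None:
            num_shards = int(num_shards)
            break
        time.sleep(wait_s)
    else:
        raise RuntimeError("No entry found for NumRedisShards")

    # Wait until every shard has appended its "ip:port" entry to the list.
    for _ in range(retries):
        entries = primary_client.lrange("RedisShards", 0, -1)
        if len(entries) == num_shards:
            break
        time.sleep(wait_s)
    else:
        raise RuntimeError("Expected %d Redis shard addresses, found %d"
                           % (num_shards, len(entries)))

    # Parse the "ip:port" strings into (host, port) tuples.
    addresses = []
    for entry in entries:
        host, port = entry.decode().split(":")
        addresses.append((host, int(port)))
    return addresses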

Status RedisContext::Connect(const std::string &address, int port, bool sharding) {
int connection_attempts = 0;
context_ = redisConnect(address.c_str(), port);
while (context_ == nullptr || context_->err) {
@@ -158,17 +223,31 @@ Status RedisContext::Connect(const std::string &address, int port) {
REDIS_CHECK_ERROR(context_, reply);
freeReplyObject(reply);

std::string redis_address;
int redis_port;
if (sharding) {
// Get the redis data shard
std::vector<std::string> addresses;
std::vector<int> ports;
GetRedisShards(context_, &addresses, &ports);
redis_address = addresses[0];
redis_port = ports[0];
} else {
redis_address = address;
redis_port = port;
}

// Connect to async context
async_context_ = redisAsyncConnect(address.c_str(), port);
async_context_ = redisAsyncConnect(redis_address.c_str(), redis_port);
if (async_context_ == nullptr || async_context_->err) {
RAY_LOG(FATAL) << "Could not establish connection to redis " << address << ":"
<< port;
RAY_LOG(FATAL) << "Could not establish connection to redis " << redis_address << ":"
<< redis_port;
}
// Connect to subscribe context
subscribe_context_ = redisAsyncConnect(address.c_str(), port);
subscribe_context_ = redisAsyncConnect(redis_address.c_str(), redis_port);
if (subscribe_context_ == nullptr || subscribe_context_->err) {
RAY_LOG(FATAL) << "Could not establish subscribe connection to redis " << address
<< ":" << port;
RAY_LOG(FATAL) << "Could not establish subscribe connection to redis "
<< redis_address << ":" << redis_port;
}
return Status::OK();
}
2 changes: 1 addition & 1 deletion src/ray/gcs/redis_context.h
@@ -51,7 +51,7 @@ class RedisContext {
RedisContext()
: context_(nullptr), async_context_(nullptr), subscribe_context_(nullptr) {}
~RedisContext();
Status Connect(const std::string &address, int port);
Status Connect(const std::string &address, int port, bool sharding);
Status AttachToEventLoop(aeEventLoop *loop);

/// Run an operation on some table key.
@@ -43,7 +43,7 @@ class MockServer {

private:
ray::Status RegisterGcs(boost::asio::io_service &io_service) {
RAY_RETURN_NOT_OK(gcs_client_->Connect("127.0.0.1", 6379));
RAY_RETURN_NOT_OK(gcs_client_->Connect("127.0.0.1", 6379, /*sharding=*/false));
RAY_RETURN_NOT_OK(gcs_client_->Attach(io_service));

boost::asio::ip::tcp::endpoint endpoint = object_manager_acceptor_.local_endpoint();
2 changes: 1 addition & 1 deletion src/ray/object_manager/test/object_manager_test.cc
@@ -34,7 +34,7 @@ class MockServer {

private:
ray::Status RegisterGcs(boost::asio::io_service &io_service) {
RAY_RETURN_NOT_OK(gcs_client_->Connect("127.0.0.1", 6379));
RAY_RETURN_NOT_OK(gcs_client_->Connect("127.0.0.1", 6379, /*sharding=*/false));
RAY_RETURN_NOT_OK(gcs_client_->Attach(io_service));

boost::asio::ip::tcp::endpoint endpoint = object_manager_acceptor_.local_endpoint();