4 changes: 3 additions & 1 deletion src/global_scheduler/global_scheduler.cc
@@ -135,8 +135,10 @@ GlobalSchedulerState *GlobalSchedulerState_init(event_loop *loop,

RAY_CHECK_OK(state->gcs_client.Connect(
std::string(redis_primary_addr), redis_primary_port, /*sharding=*/true));
RAY_CHECK_OK(state->gcs_client.context()->AttachToEventLoop(loop));
RAY_CHECK_OK(state->gcs_client.primary_context()->AttachToEventLoop(loop));
for (auto context : state->gcs_client.shard_contexts()) {
RAY_CHECK_OK(context->AttachToEventLoop(loop));
}
state->policy_state = GlobalSchedulerPolicyState_init();
return state;
}
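The connect-then-attach sequence above is the pattern this PR threads through every server process (the local scheduler and plasma manager below repeat it): connect once, attach the primary context, then attach each shard context. Extracted as a minimal sketch, assuming an `event_loop *loop` and the client fields from this diff:

// Minimal sketch of the recurring wiring pattern in this PR.
RAY_CHECK_OK(state->gcs_client.Connect(std::string(redis_primary_addr),
                                       redis_primary_port, /*sharding=*/true));
// The primary context serves the client and error tables; each shard context
// must also be attached so the sharded tables can run their callbacks.
RAY_CHECK_OK(state->gcs_client.primary_context()->AttachToEventLoop(loop));
for (auto context : state->gcs_client.shard_contexts()) {
  RAY_CHECK_OK(context->AttachToEventLoop(loop));
}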
75 changes: 74 additions & 1 deletion src/local_scheduler/local_scheduler.cc
@@ -144,7 +144,12 @@ void kill_worker(LocalSchedulerState *state,
/* Update the task table to reflect that the task failed to complete. */
if (state->db != NULL) {
Task_set_state(worker->task_in_progress, TaskStatus::LOST);
#if !RAY_USE_NEW_GCS
task_table_update(state->db, worker->task_in_progress, NULL, NULL, NULL);
#else
RAY_CHECK_OK(TaskTableAdd(&state->gcs_client, worker->task_in_progress));
Task_free(worker->task_in_progress);
#endif
} else {
Task_free(worker->task_in_progress);
}
@@ -354,8 +359,10 @@ LocalSchedulerState *LocalSchedulerState_init(

RAY_CHECK_OK(state->gcs_client.Connect(std::string(redis_primary_addr),
redis_primary_port, true));
RAY_CHECK_OK(state->gcs_client.context()->AttachToEventLoop(loop));
RAY_CHECK_OK(state->gcs_client.primary_context()->AttachToEventLoop(loop));
for (auto context : state->gcs_client.shard_contexts()) {
RAY_CHECK_OK(context->AttachToEventLoop(loop));
}
} else {
state->db = NULL;
}
@@ -553,7 +560,12 @@ void assign_task_to_worker(LocalSchedulerState *state,
worker->task_in_progress = Task_copy(task);
/* Update the global task table. */
if (state->db != NULL) {
#if !RAY_USE_NEW_GCS
task_table_update(state->db, task, NULL, NULL, NULL);
#else
RAY_CHECK_OK(TaskTableAdd(&state->gcs_client, task));
Task_free(task);
#endif
} else {
Task_free(task);
}
@@ -620,6 +632,7 @@ void finish_task(LocalSchedulerState *state, LocalSchedulerClient *worker) {
/* Update control state tables. */
TaskStatus task_state = TaskStatus::DONE;
Task_set_state(worker->task_in_progress, task_state);
#if !RAY_USE_NEW_GCS
auto retryInfo = RetryInfo{
.num_retries = 0, // This value is unused.
.timeout = 0, // This value is unused.
@@ -630,6 +643,10 @@ void finish_task(LocalSchedulerState *state, LocalSchedulerClient *worker) {
// task table entries have already been cleaned up by the monitor.
task_table_update(state->db, worker->task_in_progress, &retryInfo, NULL,
NULL);
#else
RAY_CHECK_OK(TaskTableAdd(&state->gcs_client, worker->task_in_progress));
Task_free(worker->task_in_progress);
#endif
} else {
Task_free(worker->task_in_progress);
}
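Each `#if !RAY_USE_NEW_GCS` branch in this file encodes the same ownership rule: the legacy `task_table_update` takes over the `Task` and frees it through its own callback machinery, while the new `TaskTableAdd` copies the task data up front, so the caller frees immediately. A hypothetical wrapper (not part of this PR; the ownership comments are inferred from where the diff calls `Task_free`) that makes the rule explicit:

// Hypothetical helper distilling the repeated pattern above.
void UpdateTaskTable(LocalSchedulerState *state, Task *task) {
#if !RAY_USE_NEW_GCS
  // Legacy path: the task table takes ownership of `task` and frees it later.
  task_table_update(state->db, task, NULL, NULL, NULL);
#else
  // New GCS path: TaskTableAdd copies the data, so free the task here.
  RAY_CHECK_OK(TaskTableAdd(&state->gcs_client, task));
  Task_free(task);
#endif
}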
@@ -677,10 +694,22 @@ void reconstruct_task_update_callback(Task *task,
/* (2) The current local scheduler for the task is dead. The task is
* lost, but the task table hasn't received the update yet. Retry the
* test-and-set. */
#if !RAY_USE_NEW_GCS
task_table_test_and_update(state->db, Task_task_id(task),
current_local_scheduler_id, Task_state(task),
TaskStatus::RECONSTRUCTING, NULL,
reconstruct_task_update_callback, state);
#else
RAY_CHECK_OK(gcs::TaskTableTestAndUpdate(
&state->gcs_client, Task_task_id(task), current_local_scheduler_id,
static_cast<SchedulingState>(Task_state(task)),
SchedulingState::RECONSTRUCTING,
[task, user_context](gcs::AsyncGcsClient *, const ray::TaskID &,
const TaskTableDataT &t, bool updated) {
reconstruct_task_update_callback(task, user_context, updated);
}));
Task_free(task);
#endif
}
}
/* The test-and-set failed, so it is not safe to resubmit the task for
@@ -724,10 +753,22 @@ void reconstruct_put_task_update_callback(Task *task,
/* (2) The current local scheduler for the task is dead. The task is
* lost, but the task table hasn't received the update yet. Retry the
* test-and-set. */
#if !RAY_USE_NEW_GCS
task_table_test_and_update(state->db, Task_task_id(task),
current_local_scheduler_id, Task_state(task),
TaskStatus::RECONSTRUCTING, NULL,
reconstruct_put_task_update_callback, state);
#else
RAY_CHECK_OK(gcs::TaskTableTestAndUpdate(
&state->gcs_client, Task_task_id(task), current_local_scheduler_id,
static_cast<SchedulingState>(Task_state(task)),
SchedulingState::RECONSTRUCTING,
[task, user_context](gcs::AsyncGcsClient *, const ray::TaskID &,
const TaskTableDataT &, bool updated) {
reconstruct_put_task_update_callback(task, user_context, updated);
}));
Task_free(task);
#endif
} else if (Task_state(task) == TaskStatus::RUNNING) {
/* (1) The task is still executing on a live node. The object created
* by `ray.put` was not able to be reconstructed, and the workload will
@@ -782,10 +823,27 @@ void reconstruct_evicted_result_lookup_callback(ObjectID reconstruct_object_id,
}
/* If there are no other instances of the task running, it's safe for us to
* claim responsibility for reconstruction. */
#if !RAY_USE_NEW_GCS
task_table_test_and_update(state->db, task_id, DBClientID::nil(),
(TaskStatus::DONE | TaskStatus::LOST),
TaskStatus::RECONSTRUCTING, NULL, done_callback,
state);
#else
RAY_CHECK_OK(gcs::TaskTableTestAndUpdate(
&state->gcs_client, task_id, DBClientID::nil(),
static_cast<SchedulingState>(static_cast<uint>(SchedulingState::DONE) |
static_cast<uint>(SchedulingState::LOST)),
SchedulingState::RECONSTRUCTING,
[done_callback, state](gcs::AsyncGcsClient *, const ray::TaskID &,
const TaskTableDataT &t, bool updated) {
Task *task = Task_alloc(t.task_info.data(), t.task_info.size(),
static_cast<TaskStatus>(t.scheduling_state),
DBClientID::from_binary(t.scheduler_id),
std::vector<ObjectID>());
done_callback(task, state, updated);
Task_free(task);
}));
#endif
}

void reconstruct_failed_result_lookup_callback(ObjectID reconstruct_object_id,
@@ -804,9 +862,24 @@ void reconstruct_failed_result_lookup_callback(ObjectID reconstruct_object_id,
LocalSchedulerState *state = (LocalSchedulerState *) user_context;
/* If the task failed to finish, it's safe for us to claim responsibility for
* reconstruction. */
#if !RAY_USE_NEW_GCS
task_table_test_and_update(state->db, task_id, DBClientID::nil(),
TaskStatus::LOST, TaskStatus::RECONSTRUCTING, NULL,
reconstruct_task_update_callback, state);
#else
RAY_CHECK_OK(gcs::TaskTableTestAndUpdate(
&state->gcs_client, task_id, DBClientID::nil(), SchedulingState::LOST,
SchedulingState::RECONSTRUCTING,
[state](gcs::AsyncGcsClient *, const ray::TaskID &,
const TaskTableDataT &t, bool updated) {
Task *task = Task_alloc(t.task_info.data(), t.task_info.size(),
static_cast<TaskStatus>(t.scheduling_state),
DBClientID::from_binary(t.scheduler_id),
std::vector<ObjectID>());
reconstruct_task_update_callback(task, state, updated);
Task_free(task);
}));
#endif
}
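All three `TaskTableTestAndUpdate` call sites above bridge the new API back to the legacy callbacks the same way: a lambda rebuilds a `Task` from the `TaskTableDataT` record, invokes the old C callback, and frees the copy. A distilled sketch of that adapter (the helper name `TestAndSetReconstructing` is hypothetical; every type and call inside it comes from this diff):

// Hypothetical adapter distilling the pattern above; not part of the PR.
void TestAndSetReconstructing(LocalSchedulerState *state, const ray::TaskID &task_id,
                              const DBClientID &filter_id, SchedulingState test_state,
                              void (*done)(Task *, void *, bool)) {
  RAY_CHECK_OK(gcs::TaskTableTestAndUpdate(
      &state->gcs_client, task_id, filter_id, test_state,
      SchedulingState::RECONSTRUCTING,
      [done, state](gcs::AsyncGcsClient *, const ray::TaskID &,
                    const TaskTableDataT &t, bool updated) {
        // Rebuild a legacy Task from the flatbuffer record so the old
        // callback can consume it, then release the copy.
        Task *task = Task_alloc(t.task_info.data(), t.task_info.size(),
                                static_cast<TaskStatus>(t.scheduling_state),
                                DBClientID::from_binary(t.scheduler_id),
                                std::vector<ObjectID>());
        done(task, state, updated);
        Task_free(task);
      }));
}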

void reconstruct_object_lookup_callback(
7 changes: 5 additions & 2 deletions src/plasma/plasma_manager.cc
@@ -489,9 +489,12 @@ PlasmaManagerState *PlasmaManagerState_init(const char *store_socket_name,
RAY_CHECK_OK(state->gcs_client.Connect(std::string(redis_primary_addr),
redis_primary_port,
/*sharding=*/true));
RAY_CHECK_OK(state->gcs_client.context()->AttachToEventLoop(state->loop));
RAY_CHECK_OK(
state->gcs_client.primary_context()->AttachToEventLoop(state->loop));
for (auto context : state->gcs_client.shard_contexts()) {
RAY_CHECK_OK(context->AttachToEventLoop(state->loop));
}

} else {
state->db = NULL;
RAY_LOG(DEBUG) << "No db connection specified";
130 changes: 116 additions & 14 deletions src/ray/gcs/client.cc
@@ -6,18 +6,83 @@ namespace ray {

namespace gcs {

void GetRedisShards(redisContext *context, std::vector<std::string> &addresses,
std::vector<int> &ports) {
// Get the total number of Redis shards in the system.
int num_attempts = 0;
redisReply *reply = nullptr;
while (num_attempts < RayConfig::instance().redis_db_connect_retries()) {
// Try to read the number of Redis shards from the primary shard. If the
// entry is present, exit.
reply = reinterpret_cast<redisReply *>(redisCommand(context, "GET NumRedisShards"));
if (reply->type != REDIS_REPLY_NIL) {
break;
}

// Sleep for a little, and try again if the entry isn't there yet.
freeReplyObject(reply);
usleep(RayConfig::instance().redis_db_connect_wait_milliseconds() * 1000);
num_attempts++;
}
RAY_CHECK(num_attempts < RayConfig::instance().redis_db_connect_retries())
<< "No entry found for NumRedisShards";
RAY_CHECK(reply->type == REDIS_REPLY_STRING) << "Expected string, found Redis type "
<< reply->type << " for NumRedisShards";
int num_redis_shards = atoi(reply->str);
RAY_CHECK(num_redis_shards >= 1) << "Expected at least one Redis shard, "
<< "found " << num_redis_shards;
freeReplyObject(reply);

// Get the addresses of all of the Redis shards.
num_attempts = 0;
size_t num_shard_entries = 0;
while (num_attempts < RayConfig::instance().redis_db_connect_retries()) {
// Try to read the Redis shard locations from the primary shard. If we find
// that all of them are present, exit.
reply =
reinterpret_cast<redisReply *>(redisCommand(context, "LRANGE RedisShards 0 -1"));
num_shard_entries = reply->elements;
if (static_cast<int>(num_shard_entries) == num_redis_shards) {
break;
}

// Sleep for a little, and try again if not all Redis shard addresses have
// been added yet.
freeReplyObject(reply);
usleep(RayConfig::instance().redis_db_connect_wait_milliseconds() * 1000);
num_attempts++;
}
// Report the count captured inside the loop; `reply` has already been freed
// if the retries were exhausted.
RAY_CHECK(num_attempts < RayConfig::instance().redis_db_connect_retries())
<< "Expected " << num_redis_shards << " Redis shard addresses, found "
<< num_shard_entries;

// Parse the Redis shard addresses.
for (size_t i = 0; i < reply->elements; ++i) {
// Parse the shard addresses and ports.
RAY_CHECK(reply->element[i]->type == REDIS_REPLY_STRING);
std::string addr;
std::stringstream ss(reply->element[i]->str);
getline(ss, addr, ':');
addresses.push_back(addr);
int port;
ss >> port;
ports.push_back(port);
}
freeReplyObject(reply);
}
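The reader side implies a writer contract: whatever process launches the shards must publish their count under `NumRedisShards` and append each `address:port` entry to the `RedisShards` list on the primary. A hedged sketch of that contract in raw hiredis (the writer is not part of this diff; this is inferred from the reads above, and the variable names are illustrative):

// Assumed writer-side registration, inferred from GetRedisShards.
redisReply *r = reinterpret_cast<redisReply *>(
    redisCommand(primary, "SET NumRedisShards %d", num_shards));
freeReplyObject(r);
for (const std::string &shard : shard_endpoints) {  // e.g. "127.0.0.1:6380"
  r = reinterpret_cast<redisReply *>(
      redisCommand(primary, "RPUSH RedisShards %s", shard.c_str()));
  freeReplyObject(r);
}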

AsyncGcsClient::AsyncGcsClient(const ClientID &client_id, CommandType command_type) {
context_ = std::make_shared<RedisContext>();
primary_context_ = std::make_shared<RedisContext>();
client_table_.reset(new ClientTable(primary_context_, this, client_id));
object_table_.reset(new ObjectTable(context_, this, command_type));
actor_table_.reset(new ActorTable(context_, this));
task_table_.reset(new TaskTable(context_, this, command_type));
raylet_task_table_.reset(new raylet::TaskTable(context_, this, command_type));
task_reconstruction_log_.reset(new TaskReconstructionLog(context_, this));
heartbeat_table_.reset(new HeartbeatTable(context_, this));
error_table_.reset(new ErrorTable(primary_context_, this));
command_type_ = command_type;

// Create the tables over an empty set of shard contexts first; some callers
// use the tables before Connect() is called.
client_table_.reset(new ClientTable(shard_contexts_, this, client_id));
error_table_.reset(new ErrorTable(shard_contexts_, this));
object_table_.reset(new ObjectTable(shard_contexts_, this, command_type));
actor_table_.reset(new ActorTable(shard_contexts_, this));
task_table_.reset(new TaskTable(shard_contexts_, this, command_type_));
raylet_task_table_.reset(new raylet::TaskTable(shard_contexts_, this, command_type_));
task_reconstruction_log_.reset(new TaskReconstructionLog(shard_contexts_, this));
heartbeat_table_.reset(new HeartbeatTable(shard_contexts_, this));
}

#if RAY_USE_NEW_GCS
@@ -36,8 +101,37 @@ AsyncGcsClient::AsyncGcsClient(CommandType command_type)
AsyncGcsClient::AsyncGcsClient() : AsyncGcsClient(ClientID::from_random()) {}

Status AsyncGcsClient::Connect(const std::string &address, int port, bool sharding) {
RAY_RETURN_NOT_OK(context_->Connect(address, port, sharding));
RAY_RETURN_NOT_OK(primary_context_->Connect(address, port, /*sharding=*/false));

// The primary context is responsible for the client and error tables.
RAY_RETURN_NOT_OK(primary_context_->Connect(address, port));

// Create one shard context that connects to the same address and port as the
// primary. When sharding is disabled, this single context acts as the shard.
shard_contexts_.push_back(std::make_shared<RedisContext>());
RAY_RETURN_NOT_OK(shard_contexts_[0]->Connect(address, port));
client_table_->AddShards(shard_contexts_);
error_table_->AddShards(shard_contexts_);
task_table_->AddShards(shard_contexts_); // Stays on the primary shard until the task table supports sharding.

// If sharding is enabled, discover the remaining shards and connect a
// context to each of them.
if (sharding) {
std::vector<std::string> addresses;
std::vector<int> ports;

GetRedisShards(primary_context_->sync_context(), addresses, ports);
for (unsigned int i = 0; i < addresses.size(); ++i) {
shard_contexts_.push_back(std::make_shared<RedisContext>());
RAY_RETURN_NOT_OK(shard_contexts_.back()->Connect(addresses[i], ports[i]));
}
}

object_table_->AddShards(shard_contexts_);
actor_table_->AddShards(shard_contexts_);
raylet_task_table_->AddShards(shard_contexts_);
task_reconstruction_log_->AddShards(shard_contexts_);
heartbeat_table_->AddShards(shard_contexts_);

// TODO(swang): Call the client table's Connect() method here. To do this,
// we need to make sure that we are attached to an event loop first. This
// currently isn't possible because the aeEventLoop, which we use for
@@ -52,13 +146,21 @@ Status Attach(plasma::EventLoop &event_loop) {
}

Status AsyncGcsClient::Attach(boost::asio::io_service &io_service) {
asio_async_client_.reset(new RedisAsioClient(io_service, context_->async_context()));
asio_subscribe_client_.reset(
new RedisAsioClient(io_service, context_->subscribe_context()));

asio_async_auxiliary_client_.reset(
new RedisAsioClient(io_service, primary_context_->async_context()));
asio_subscribe_auxiliary_client_.reset(
new RedisAsioClient(io_service, primary_context_->subscribe_context()));

// Attach the async and subscribe connections of every shard context.
for (std::shared_ptr<RedisContext> context : shard_contexts_) {
shard_asio_async_clients_.emplace_back(
new RedisAsioClient(io_service, context->async_context()));
shard_asio_subscribe_clients_.emplace_back(
new RedisAsioClient(io_service, context->subscribe_context()));
}
return Status::OK();
}
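Taken together, the constructor, `Connect`, and `Attach` form a three-phase lifecycle: build the tables over an empty shard vector, discover the shards and rebind the tables via `AddShards`, then hook every context into an event loop. A minimal usage sketch (the address and the `io_service` setup are assumptions; the calls come from this diff):

// Hedged lifecycle sketch; error handling is omitted for brevity.
gcs::AsyncGcsClient client(ClientID::from_random(), gcs::CommandType::kRegular);
// Tables exist here but hold no usable shard connections yet.
RAY_CHECK_OK(client.Connect("127.0.0.1", 6379, /*sharding=*/true));
// Connect has now rebound every table to the discovered shard contexts.
boost::asio::io_service io_service;
RAY_CHECK_OK(client.Attach(io_service));
io_service.run();  // Drives async and subscribe connections for all shards.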

8 changes: 4 additions & 4 deletions src/ray/gcs/client.h
@@ -68,8 +68,8 @@ class RAY_EXPORT AsyncGcsClient {
Status GetExport(const std::string &driver_id, int64_t export_index,
const GetExportCallback &done_callback);

std::shared_ptr<RedisContext> context() { return context_; }
std::shared_ptr<RedisContext> primary_context() { return primary_context_; }
std::vector<std::shared_ptr<RedisContext>> shard_contexts() {
return shard_contexts_;
}

private:
std::unique_ptr<FunctionTable> function_table_;
@@ -83,9 +83,9 @@
std::unique_ptr<ErrorTable> error_table_;
std::unique_ptr<ClientTable> client_table_;
// The following contexts write to the data shards
std::shared_ptr<RedisContext> context_;
std::unique_ptr<RedisAsioClient> asio_async_client_;
std::unique_ptr<RedisAsioClient> asio_subscribe_client_;
std::vector<std::shared_ptr<RedisContext>> shard_contexts_;
std::vector<std::unique_ptr<RedisAsioClient>> shard_asio_async_clients_;
std::vector<std::unique_ptr<RedisAsioClient>> shard_asio_subscribe_clients_;
// The following context writes everything to the primary shard
std::shared_ptr<RedisContext> primary_context_;
std::unique_ptr<RedisAsioClient> asio_async_auxiliary_client_;
5 changes: 4 additions & 1 deletion src/ray/gcs/client_test.cc
@@ -60,7 +60,10 @@ class TestGcsWithAe : public TestGcs {
public:
TestGcsWithAe(CommandType command_type) : TestGcs(command_type) {
loop_ = aeCreateEventLoop(1024);
RAY_CHECK_OK(client_->context()->AttachToEventLoop(loop_));
// This exercises the sharding = false path.
// TODO(heyucongtom): figure out a way to test sharding = true.
RAY_CHECK_OK(client_->primary_context()->AttachToEventLoop(loop_));
RAY_CHECK_OK(client_->shard_contexts()[0]->AttachToEventLoop(loop_));
}

TestGcsWithAe() : TestGcsWithAe(CommandType::kRegular) {}