Skip to content

Commit

Permalink
[GCS] Use new interface class GcsClient in ray (ray-project#6805)
Browse files Browse the repository at this point in the history
  • Loading branch information
micafan authored and zhijunfu committed Jan 17, 2020
1 parent 8e8b66a commit e143f85
Show file tree
Hide file tree
Showing 24 changed files with 80 additions and 99 deletions.
2 changes: 1 addition & 1 deletion src/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
bool connected_ = false;

// Client to the GCS shared by core worker interfaces.
std::shared_ptr<gcs::RedisGcsClient> gcs_client_;
std::shared_ptr<gcs::GcsClient> gcs_client_;

// Client to the raylet shared by core worker interfaces. This needs to be a
// shared_ptr for direct calls because we can lease multiple workers through
Expand Down
2 changes: 1 addition & 1 deletion src/ray/core_worker/profiling.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ ProfileEvent::ProfileEvent(const std::shared_ptr<Profiler> &profiler,

Profiler::Profiler(WorkerContext &worker_context, const std::string &node_ip_address,
boost::asio::io_service &io_service,
const std::shared_ptr<gcs::RedisGcsClient> &gcs_client)
const std::shared_ptr<gcs::GcsClient> &gcs_client)
: io_service_(io_service),
timer_(io_service_, boost::asio::chrono::seconds(1)),
rpc_profile_data_(new rpc::ProfileTableData()),
Expand Down
4 changes: 2 additions & 2 deletions src/ray/core_worker/profiling.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class Profiler {
public:
Profiler(WorkerContext &worker_context, const std::string &node_ip_address,
boost::asio::io_service &io_service,
const std::shared_ptr<gcs::RedisGcsClient> &gcs_client);
const std::shared_ptr<gcs::GcsClient> &gcs_client);

// Add an event to the queue to be flushed periodically.
void AddEvent(const rpc::ProfileTableData::ProfileEvent &event) LOCKS_EXCLUDED(mutex_);
Expand All @@ -38,7 +38,7 @@ class Profiler {
std::shared_ptr<rpc::ProfileTableData> rpc_profile_data_ GUARDED_BY(mutex_);

// Client to the GCS used to push profile events to it.
std::shared_ptr<gcs::RedisGcsClient> gcs_client_;
std::shared_ptr<gcs::GcsClient> gcs_client_;
};

class ProfileEvent {
Expand Down
7 changes: 5 additions & 2 deletions src/ray/gcs/gcs_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ class GcsClient : public std::enable_shared_from_this<GcsClient> {
/// Disconnect from the GCS service. Not thread-safe.
virtual void Disconnect() = 0;

/// Return a human-readable description of this client for debugging.
/// The default implementation returns an empty string; the method is virtual
/// so that subclasses can override it with their own details.
virtual std::string DebugString() const { return ""; }

/// Get the sub-interface for accessing actor information in GCS.
/// This function is thread safe.
ActorInfoAccessor &Actors() {
Expand Down Expand Up @@ -102,14 +105,14 @@ class GcsClient : public std::enable_shared_from_this<GcsClient> {
return *error_accessor_;
}

/// Get the sub-interface for accessing stats in GCS.
/// Get the sub-interface for accessing stats information in GCS.
/// This function is thread safe.
///
/// \return Reference to the stats accessor held by this client.
StatsInfoAccessor &Stats() {
// The accessor must already be set; RAY_CHECK enforces that it is non-null.
RAY_CHECK(stats_accessor_ != nullptr);
return *stats_accessor_;
}

/// Get the sub-interface for accessing stats in GCS.
/// Get the sub-interface for accessing worker information in GCS.
/// This function is thread safe.
WorkerInfoAccessor &Workers() {
RAY_CHECK(worker_accessor_ != nullptr);
Expand Down
61 changes: 21 additions & 40 deletions src/ray/gcs/redis_gcs_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,6 @@ namespace gcs {
class RedisContext;

class RAY_EXPORT RedisGcsClient : public GcsClient {
// TODO(micafan) Will remove those friend classes after we replace RedisGcsClient
// with interface class GcsClient in raylet.
friend class RedisActorInfoAccessor;
friend class RedisJobInfoAccessor;
friend class RedisTaskInfoAccessor;
friend class RedisNodeInfoAccessor;
friend class RedisObjectInfoAccessor;
friend class RedisErrorInfoAccessor;
friend class RedisStatsInfoAccessor;
friend class RedisWorkerInfoAccessor;
friend class SubscriptionExecutorTest;
friend class LogSubscribeTestHelper;
friend class LogLookupTestHelper;
friend class LogDeleteTestHelper;
friend class TaskTableTestHelper;
friend class ClientTableTestHelper;
friend class SetTestHelper;
friend class HashTableTestHelper;
friend class ActorCheckpointIdTable;

public:
/// Constructor of RedisGcsClient.
/// Connect() must be called (and must return ok) before you call any other methods.
Expand All @@ -61,10 +41,15 @@ class RAY_EXPORT RedisGcsClient : public GcsClient {
/// Must be single-threaded io_service (get more information from RedisAsioClient).
///
/// \return Status
Status Connect(boost::asio::io_service &io_service);
Status Connect(boost::asio::io_service &io_service) override;

/// Disconnect from the GCS service. Not thread-safe.
void Disconnect();
void Disconnect() override;

/// Returns debug string for class.
///
/// \return string.
std::string DebugString() const override;

// We also need something to export generic code to run on workers from the
// driver (to set the PYTHONPATH)
Expand All @@ -77,41 +62,37 @@ class RAY_EXPORT RedisGcsClient : public GcsClient {
std::vector<std::shared_ptr<RedisContext>> shard_contexts() { return shard_contexts_; }
std::shared_ptr<RedisContext> primary_context() { return primary_context_; }

/// Returns debug string for class.
///
/// \return string.
std::string DebugString() const;

private:
/// Attach this client to an asio event loop. Note that only
/// one event loop should be attached at a time.
void Attach(boost::asio::io_service &io_service);

/// The following three methods will be deprecated, use method Actors() instead.
/// The following xxx_table methods implement the Accessor interfaces.
/// Implements the Actors() interface.
ActorTable &actor_table();
ActorCheckpointTable &actor_checkpoint_table();
ActorCheckpointIdTable &actor_checkpoint_id_table();
/// This method will be deprecated, use method Jobs() instead.
/// Implements the Jobs() interface.
JobTable &job_table();
/// This method will be deprecated, use method Objects() instead.
/// Implements the Objects() interface.
ObjectTable &object_table();
/// The following four methods will be deprecated, use method Nodes() instead.
/// Implements the Nodes() interface.
ClientTable &client_table();
HeartbeatTable &heartbeat_table();
HeartbeatBatchTable &heartbeat_batch_table();
DynamicResourceTable &resource_table();
/// The following three methods will be deprecated, use method Tasks() instead.
/// Implements the Tasks() interface.
raylet::TaskTable &raylet_task_table();
TaskLeaseTable &task_lease_table();
TaskReconstructionLog &task_reconstruction_log();
/// This method will be deprecated, use method Errors() instead.
/// Implements the Errors() interface.
// TODO: Some API for getting the error on the driver
ErrorTable &error_table();
/// This method will be deprecated, use method Stats() instead.
/// Implements the Stats() interface.
ProfileTable &profile_table();
/// This method will be deprecated, use method Workers() instead.
/// Implements the Workers() interface.
WorkerFailureTable &worker_failure_table();

private:
/// Attach this client to an asio event loop. Note that only
/// one event loop should be attached at a time.
void Attach(boost::asio::io_service &io_service);

// GCS command type. If CommandType::kChain, chain-replicated versions of the tables
// might be used, if available.
CommandType command_type_{CommandType::kUnknown};
Expand Down
4 changes: 2 additions & 2 deletions src/ray/object_manager/object_directory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
namespace ray {

ObjectDirectory::ObjectDirectory(boost::asio::io_service &io_service,
std::shared_ptr<gcs::RedisGcsClient> &gcs_client)
std::shared_ptr<gcs::GcsClient> &gcs_client)
: io_service_(io_service), gcs_client_(gcs_client) {}

namespace {
Expand All @@ -17,7 +17,7 @@ using ray::rpc::ObjectTableData;
/// object table entries up to but not including this notification.
void UpdateObjectLocations(bool is_added,
const std::vector<ObjectTableData> &location_updates,
std::shared_ptr<gcs::RedisGcsClient> gcs_client,
std::shared_ptr<gcs::GcsClient> gcs_client,
std::unordered_set<ClientID> *node_ids) {
// location_updates contains the updates of locations of the object.
// with GcsChangeMode, we can determine whether the update mode is
Expand Down
4 changes: 2 additions & 2 deletions src/ray/object_manager/object_directory.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ class ObjectDirectory : public ObjectDirectoryInterface {
/// \param gcs_client A Ray GCS client to request object and client
/// information from.
ObjectDirectory(boost::asio::io_service &io_service,
std::shared_ptr<gcs::RedisGcsClient> &gcs_client);
std::shared_ptr<gcs::GcsClient> &gcs_client);

virtual ~ObjectDirectory() {}

Expand Down Expand Up @@ -178,7 +178,7 @@ class ObjectDirectory : public ObjectDirectoryInterface {
/// Reference to the event loop.
boost::asio::io_service &io_service_;
/// Shared pointer to the GCS client.
std::shared_ptr<gcs::RedisGcsClient> gcs_client_;
std::shared_ptr<gcs::GcsClient> gcs_client_;
/// Info about subscribers to object locations.
std::unordered_map<ObjectID, LocationListenerState> listeners_;
};
Expand Down
14 changes: 6 additions & 8 deletions src/ray/object_manager/test/object_manager_stress_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class MockServer {
public:
MockServer(boost::asio::io_service &main_service,
const ObjectManagerConfig &object_manager_config,
std::shared_ptr<gcs::RedisGcsClient> gcs_client)
std::shared_ptr<gcs::GcsClient> gcs_client)
: node_id_(ClientID::FromRandom()),
config_(object_manager_config),
gcs_client_(gcs_client),
Expand Down Expand Up @@ -62,7 +62,7 @@ class MockServer {

ClientID node_id_;
ObjectManagerConfig config_;
std::shared_ptr<gcs::RedisGcsClient> gcs_client_;
std::shared_ptr<gcs::GcsClient> gcs_client_;
ObjectManager object_manager_;
};

Expand Down Expand Up @@ -106,8 +106,7 @@ class TestObjectManagerBase : public ::testing::Test {
// start first server
gcs::GcsClientOptions client_options("127.0.0.1", 6379, /*password*/ "",
/*is_test_client=*/true);
gcs_client_1 =
std::shared_ptr<gcs::RedisGcsClient>(new gcs::RedisGcsClient(client_options));
gcs_client_1 = std::make_shared<gcs::RedisGcsClient>(client_options);
RAY_CHECK_OK(gcs_client_1->Connect(main_service));
ObjectManagerConfig om_config_1;
om_config_1.store_socket_name = store_id_1;
Expand All @@ -119,8 +118,7 @@ class TestObjectManagerBase : public ::testing::Test {
server1.reset(new MockServer(main_service, om_config_1, gcs_client_1));

// start second server
gcs_client_2 =
std::shared_ptr<gcs::RedisGcsClient>(new gcs::RedisGcsClient(client_options));
gcs_client_2 = std::make_shared<gcs::RedisGcsClient>(client_options);
RAY_CHECK_OK(gcs_client_2->Connect(main_service));
ObjectManagerConfig om_config_2;
om_config_2.store_socket_name = store_id_2;
Expand Down Expand Up @@ -170,8 +168,8 @@ class TestObjectManagerBase : public ::testing::Test {
protected:
std::thread p;
boost::asio::io_service main_service;
std::shared_ptr<gcs::RedisGcsClient> gcs_client_1;
std::shared_ptr<gcs::RedisGcsClient> gcs_client_2;
std::shared_ptr<gcs::GcsClient> gcs_client_1;
std::shared_ptr<gcs::GcsClient> gcs_client_2;
std::unique_ptr<MockServer> server1;
std::unique_ptr<MockServer> server2;

Expand Down
14 changes: 6 additions & 8 deletions src/ray/object_manager/test/object_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class MockServer {
public:
MockServer(boost::asio::io_service &main_service,
const ObjectManagerConfig &object_manager_config,
std::shared_ptr<gcs::RedisGcsClient> gcs_client)
std::shared_ptr<gcs::GcsClient> gcs_client)
: node_id_(ClientID::FromRandom()),
config_(object_manager_config),
gcs_client_(gcs_client),
Expand Down Expand Up @@ -56,7 +56,7 @@ class MockServer {

ClientID node_id_;
ObjectManagerConfig config_;
std::shared_ptr<gcs::RedisGcsClient> gcs_client_;
std::shared_ptr<gcs::GcsClient> gcs_client_;
ObjectManager object_manager_;
};

Expand Down Expand Up @@ -98,8 +98,7 @@ class TestObjectManagerBase : public ::testing::Test {
// start first server
gcs::GcsClientOptions client_options("127.0.0.1", 6379, /*password*/ "",
/*is_test_client=*/true);
gcs_client_1 =
std::shared_ptr<gcs::RedisGcsClient>(new gcs::RedisGcsClient(client_options));
gcs_client_1 = std::make_shared<gcs::RedisGcsClient>(client_options);
RAY_CHECK_OK(gcs_client_1->Connect(main_service));
ObjectManagerConfig om_config_1;
om_config_1.store_socket_name = store_id_1;
Expand All @@ -111,8 +110,7 @@ class TestObjectManagerBase : public ::testing::Test {
server1.reset(new MockServer(main_service, om_config_1, gcs_client_1));

// start second server
gcs_client_2 =
std::shared_ptr<gcs::RedisGcsClient>(new gcs::RedisGcsClient(client_options));
gcs_client_2 = std::make_shared<gcs::RedisGcsClient>(client_options);
RAY_CHECK_OK(gcs_client_2->Connect(main_service));
ObjectManagerConfig om_config_2;
om_config_2.store_socket_name = store_id_2;
Expand Down Expand Up @@ -166,8 +164,8 @@ class TestObjectManagerBase : public ::testing::Test {
protected:
std::thread p;
boost::asio::io_service main_service;
std::shared_ptr<gcs::RedisGcsClient> gcs_client_1;
std::shared_ptr<gcs::RedisGcsClient> gcs_client_2;
std::shared_ptr<gcs::GcsClient> gcs_client_1;
std::shared_ptr<gcs::GcsClient> gcs_client_2;
std::unique_ptr<MockServer> server1;
std::unique_ptr<MockServer> server2;

Expand Down
2 changes: 1 addition & 1 deletion src/ray/raylet/lineage_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ const std::unordered_set<TaskID> &Lineage::GetChildren(const TaskID &task_id) co
}

LineageCache::LineageCache(const ClientID &self_node_id,
std::shared_ptr<gcs::RedisGcsClient> gcs_client,
std::shared_ptr<gcs::GcsClient> gcs_client,
uint64_t max_lineage_size)
: self_node_id_(self_node_id), gcs_client_(gcs_client) {}

Expand Down
5 changes: 2 additions & 3 deletions src/ray/raylet/lineage_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,7 @@ class LineageCache {
public:
/// Create a lineage cache for the given task storage system.
/// TODO(swang): Pass in the policy (interface?).
LineageCache(const ClientID &self_node_id,
std::shared_ptr<gcs::RedisGcsClient> gcs_client,
LineageCache(const ClientID &self_node_id, std::shared_ptr<gcs::GcsClient> gcs_client,
uint64_t max_lineage_size);

/// Asynchronously commit a task to the GCS.
Expand Down Expand Up @@ -306,7 +305,7 @@ class LineageCache {
/// ID of this node.
ClientID self_node_id_;
/// A client connection to the GCS.
std::shared_ptr<gcs::RedisGcsClient> gcs_client_;
std::shared_ptr<gcs::GcsClient> gcs_client_;
/// All tasks and objects that we are responsible for writing back to the
/// GCS, and the tasks and objects in their lineage.
Lineage lineage_;
Expand Down
18 changes: 9 additions & 9 deletions src/ray/raylet/monitor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ namespace raylet {
/// within heartbeat_timeout_milliseconds * num_heartbeats_timeout (defined in
/// the Ray configuration), then the monitor will mark that Raylet as dead in
/// the client table, which broadcasts the event to all other Raylets.
Monitor::Monitor(boost::asio::io_service &io_service, const std::string &redis_address,
int redis_port, const std::string &redis_password)
: gcs_client_(gcs::GcsClientOptions(redis_address, redis_port, redis_password)),
Monitor::Monitor(boost::asio::io_service &io_service,
const gcs::GcsClientOptions &gcs_client_options)
: gcs_client_(new gcs::RedisGcsClient(gcs_client_options)),
num_heartbeats_timeout_(RayConfig::instance().num_heartbeats_timeout()),
heartbeat_timer_(io_service) {
RAY_CHECK_OK(gcs_client_.Connect(io_service));
RAY_CHECK_OK(gcs_client_->Connect(io_service));
}

void Monitor::HandleHeartbeat(const ClientID &node_id,
Expand All @@ -35,7 +35,7 @@ void Monitor::Start() {
const HeartbeatTableData &heartbeat_data) {
HandleHeartbeat(id, heartbeat_data);
};
RAY_CHECK_OK(gcs_client_.Nodes().AsyncSubscribeHeartbeat(heartbeat_callback, nullptr));
RAY_CHECK_OK(gcs_client_->Nodes().AsyncSubscribeHeartbeat(heartbeat_callback, nullptr));
Tick();
}

Expand All @@ -59,7 +59,7 @@ void Monitor::Tick() {
}
if (!marked) {
RAY_CHECK_OK(
gcs_client_.Nodes().AsyncUnregister(node_id, /* callback */ nullptr));
gcs_client_->Nodes().AsyncUnregister(node_id, /* callback */ nullptr));
// Broadcast a warning to all of the drivers indicating that the node
// has been marked as dead.
// TODO(rkn): Define this constant somewhere else.
Expand All @@ -71,10 +71,10 @@ void Monitor::Tick() {
auto error_data_ptr =
gcs::CreateErrorTableData(type, error_message.str(), current_time_ms());
RAY_CHECK_OK(
gcs_client_.Errors().AsyncReportJobError(error_data_ptr, nullptr));
gcs_client_->Errors().AsyncReportJobError(error_data_ptr, nullptr));
}
};
RAY_CHECK_OK(gcs_client_.Nodes().AsyncGetAll(lookup_callback));
RAY_CHECK_OK(gcs_client_->Nodes().AsyncGetAll(lookup_callback));
dead_nodes_.insert(node_id);
}
it = heartbeats_.erase(it);
Expand All @@ -89,7 +89,7 @@ void Monitor::Tick() {
for (const auto &heartbeat : heartbeat_buffer_) {
batch->add_batch()->CopyFrom(heartbeat.second);
}
RAY_CHECK_OK(gcs_client_.Nodes().AsyncReportBatchHeartbeat(batch, nullptr));
RAY_CHECK_OK(gcs_client_->Nodes().AsyncReportBatchHeartbeat(batch, nullptr));
heartbeat_buffer_.clear();
}

Expand Down
Loading

0 comments on commit e143f85

Please sign in to comment.