From e143f85ca05692119d33c0e439a0dee0a4e5e4cf Mon Sep 17 00:00:00 2001 From: micafan <550435771@qq.com> Date: Fri, 17 Jan 2020 14:51:18 +0800 Subject: [PATCH] [GCS] Use new interface class GcsClient in ray (#6805) --- src/ray/core_worker/core_worker.h | 2 +- src/ray/core_worker/profiling.cc | 2 +- src/ray/core_worker/profiling.h | 4 +- src/ray/gcs/gcs_client.h | 7 ++- src/ray/gcs/redis_gcs_client.h | 61 +++++++------------ src/ray/object_manager/object_directory.cc | 4 +- src/ray/object_manager/object_directory.h | 4 +- .../test/object_manager_stress_test.cc | 14 ++--- .../test/object_manager_test.cc | 14 ++--- src/ray/raylet/lineage_cache.cc | 2 +- src/ray/raylet/lineage_cache.h | 5 +- src/ray/raylet/monitor.cc | 18 +++--- src/ray/raylet/monitor.h | 6 +- src/ray/raylet/monitor_main.cc | 5 +- src/ray/raylet/node_manager.cc | 2 +- src/ray/raylet/node_manager.h | 4 +- src/ray/raylet/raylet.cc | 2 +- src/ray/raylet/raylet.h | 4 +- src/ray/raylet/reconstruction_policy.cc | 2 +- src/ray/raylet/reconstruction_policy.h | 4 +- src/ray/raylet/task_dependency_manager.cc | 2 +- src/ray/raylet/task_dependency_manager.h | 4 +- src/ray/raylet/worker_pool.cc | 2 +- src/ray/raylet/worker_pool.h | 5 +- 24 files changed, 80 insertions(+), 99 deletions(-) diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index aefe41139ba7..5daa374d18ce 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -589,7 +589,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { bool connected_ = false; // Client to the GCS shared by core worker interfaces. - std::shared_ptr gcs_client_; + std::shared_ptr gcs_client_; // Client to the raylet shared by core worker interfaces. This needs to be a // shared_ptr for direct calls because we can lease multiple workers through diff --git a/src/ray/core_worker/profiling.cc b/src/ray/core_worker/profiling.cc index 4c8cbede277f..ea4818ccad71 100644 --- a/src/ray/core_worker/profiling.cc +++ b/src/ray/core_worker/profiling.cc @@ -15,7 +15,7 @@ ProfileEvent::ProfileEvent(const std::shared_ptr &profiler, Profiler::Profiler(WorkerContext &worker_context, const std::string &node_ip_address, boost::asio::io_service &io_service, - const std::shared_ptr &gcs_client) + const std::shared_ptr &gcs_client) : io_service_(io_service), timer_(io_service_, boost::asio::chrono::seconds(1)), rpc_profile_data_(new rpc::ProfileTableData()), diff --git a/src/ray/core_worker/profiling.h b/src/ray/core_worker/profiling.h index 01ed41b5bbea..bbb057b75148 100644 --- a/src/ray/core_worker/profiling.h +++ b/src/ray/core_worker/profiling.h @@ -15,7 +15,7 @@ class Profiler { public: Profiler(WorkerContext &worker_context, const std::string &node_ip_address, boost::asio::io_service &io_service, - const std::shared_ptr &gcs_client); + const std::shared_ptr &gcs_client); // Add an event to the queue to be flushed periodically. void AddEvent(const rpc::ProfileTableData::ProfileEvent &event) LOCKS_EXCLUDED(mutex_); @@ -38,7 +38,7 @@ class Profiler { std::shared_ptr rpc_profile_data_ GUARDED_BY(mutex_); // Client to the GCS used to push profile events to it. - std::shared_ptr gcs_client_; + std::shared_ptr gcs_client_; }; class ProfileEvent { diff --git a/src/ray/gcs/gcs_client.h b/src/ray/gcs/gcs_client.h index 68ae30f3d3c0..14fbcfb62c34 100644 --- a/src/ray/gcs/gcs_client.h +++ b/src/ray/gcs/gcs_client.h @@ -60,6 +60,9 @@ class GcsClient : public std::enable_shared_from_this { /// Disconnect with GCS Service. Non-thread safe. virtual void Disconnect() = 0; + /// Return client information for debug. + virtual std::string DebugString() const { return ""; } + /// Get the sub-interface for accessing actor information in GCS. /// This function is thread safe. ActorInfoAccessor &Actors() { @@ -102,14 +105,14 @@ class GcsClient : public std::enable_shared_from_this { return *error_accessor_; } - /// Get the sub-interface for accessing stats in GCS. + /// Get the sub-interface for accessing stats information in GCS. /// This function is thread safe. StatsInfoAccessor &Stats() { RAY_CHECK(stats_accessor_ != nullptr); return *stats_accessor_; } - /// Get the sub-interface for accessing stats in GCS. + /// Get the sub-interface for accessing worker information in GCS. /// This function is thread safe. WorkerInfoAccessor &Workers() { RAY_CHECK(worker_accessor_ != nullptr); diff --git a/src/ray/gcs/redis_gcs_client.h b/src/ray/gcs/redis_gcs_client.h index 42780c2f6427..2d6c6053b6bc 100644 --- a/src/ray/gcs/redis_gcs_client.h +++ b/src/ray/gcs/redis_gcs_client.h @@ -18,26 +18,6 @@ namespace gcs { class RedisContext; class RAY_EXPORT RedisGcsClient : public GcsClient { - // TODO(micafan) Will remove those friend classes after we replace RedisGcsClient - // with interface class GcsClient in raylet. - friend class RedisActorInfoAccessor; - friend class RedisJobInfoAccessor; - friend class RedisTaskInfoAccessor; - friend class RedisNodeInfoAccessor; - friend class RedisObjectInfoAccessor; - friend class RedisErrorInfoAccessor; - friend class RedisStatsInfoAccessor; - friend class RedisWorkerInfoAccessor; - friend class SubscriptionExecutorTest; - friend class LogSubscribeTestHelper; - friend class LogLookupTestHelper; - friend class LogDeleteTestHelper; - friend class TaskTableTestHelper; - friend class ClientTableTestHelper; - friend class SetTestHelper; - friend class HashTableTestHelper; - friend class ActorCheckpointIdTable; - public: /// Constructor of RedisGcsClient. /// Connect() must be called(and return ok) before you call any other methods. @@ -61,10 +41,15 @@ class RAY_EXPORT RedisGcsClient : public GcsClient { /// Must be single-threaded io_service (get more information from RedisAsioClient). /// /// \return Status - Status Connect(boost::asio::io_service &io_service); + Status Connect(boost::asio::io_service &io_service) override; /// Disconnect with GCS Service. Non-thread safe. - void Disconnect(); + void Disconnect() override; + + /// Returns debug string for class. + /// + /// \return string. + std::string DebugString() const override; // We also need something to export generic code to run on workers from the // driver (to set the PYTHONPATH) @@ -77,41 +62,37 @@ class RAY_EXPORT RedisGcsClient : public GcsClient { std::vector> shard_contexts() { return shard_contexts_; } std::shared_ptr primary_context() { return primary_context_; } - /// Returns debug string for class. - /// - /// \return string. - std::string DebugString() const; - - private: - /// Attach this client to an asio event loop. Note that only - /// one event loop should be attached at a time. - void Attach(boost::asio::io_service &io_service); - - /// The following three methods will be deprecated, use method Actors() instead. + /// The following xxx_table methods implement the Accessor interfaces. + /// Implements the Actors() interface. ActorTable &actor_table(); ActorCheckpointTable &actor_checkpoint_table(); ActorCheckpointIdTable &actor_checkpoint_id_table(); - /// This method will be deprecated, use method Jobs() instead. + /// Implements the Jobs() interface. JobTable &job_table(); - /// This method will be deprecated, use method Objects() instead. + /// Implements the Objects() interface. ObjectTable &object_table(); - /// The following four methods will be deprecated, use method Nodes() instead. + /// Implements the Nodes() interface. ClientTable &client_table(); HeartbeatTable &heartbeat_table(); HeartbeatBatchTable &heartbeat_batch_table(); DynamicResourceTable &resource_table(); - /// The following three methods will be deprecated, use method Tasks() instead. + /// Implements the Tasks() interface. raylet::TaskTable &raylet_task_table(); TaskLeaseTable &task_lease_table(); TaskReconstructionLog &task_reconstruction_log(); - /// This method will be deprecated, use method Errors() instead. + /// Implements the Errors() interface. // TODO: Some API for getting the error on the driver ErrorTable &error_table(); - /// This method will be deprecated, use method Stats() instead. + /// Implements the Stats() interface. ProfileTable &profile_table(); - /// This method will be deprecated, use method Workers() instead. + /// Implements the Workers() interface. WorkerFailureTable &worker_failure_table(); + private: + /// Attach this client to an asio event loop. Note that only + /// one event loop should be attached at a time. + void Attach(boost::asio::io_service &io_service); + // GCS command type. If CommandType::kChain, chain-replicated versions of the tables // might be used, if available. CommandType command_type_{CommandType::kUnknown}; diff --git a/src/ray/object_manager/object_directory.cc b/src/ray/object_manager/object_directory.cc index 7d510e6afd5a..b25f086321ca 100644 --- a/src/ray/object_manager/object_directory.cc +++ b/src/ray/object_manager/object_directory.cc @@ -3,7 +3,7 @@ namespace ray { ObjectDirectory::ObjectDirectory(boost::asio::io_service &io_service, - std::shared_ptr &gcs_client) + std::shared_ptr &gcs_client) : io_service_(io_service), gcs_client_(gcs_client) {} namespace { @@ -17,7 +17,7 @@ using ray::rpc::ObjectTableData; /// object table entries up to but not including this notification. void UpdateObjectLocations(bool is_added, const std::vector &location_updates, - std::shared_ptr gcs_client, + std::shared_ptr gcs_client, std::unordered_set *node_ids) { // location_updates contains the updates of locations of the object. // with GcsChangeMode, we can determine whether the update mode is diff --git a/src/ray/object_manager/object_directory.h b/src/ray/object_manager/object_directory.h index 669d9b7fa59c..777c54cd5f79 100644 --- a/src/ray/object_manager/object_directory.h +++ b/src/ray/object_manager/object_directory.h @@ -129,7 +129,7 @@ class ObjectDirectory : public ObjectDirectoryInterface { /// \param gcs_client A Ray GCS client to request object and client /// information from. ObjectDirectory(boost::asio::io_service &io_service, - std::shared_ptr &gcs_client); + std::shared_ptr &gcs_client); virtual ~ObjectDirectory() {} @@ -178,7 +178,7 @@ class ObjectDirectory : public ObjectDirectoryInterface { /// Reference to the event loop. boost::asio::io_service &io_service_; /// Reference to the gcs client. - std::shared_ptr gcs_client_; + std::shared_ptr gcs_client_; /// Info about subscribers to object locations. std::unordered_map listeners_; }; diff --git a/src/ray/object_manager/test/object_manager_stress_test.cc b/src/ray/object_manager/test/object_manager_stress_test.cc index 4efd7f45fe5d..5b76be9fd7f3 100644 --- a/src/ray/object_manager/test/object_manager_stress_test.cc +++ b/src/ray/object_manager/test/object_manager_stress_test.cc @@ -34,7 +34,7 @@ class MockServer { public: MockServer(boost::asio::io_service &main_service, const ObjectManagerConfig &object_manager_config, - std::shared_ptr gcs_client) + std::shared_ptr gcs_client) : node_id_(ClientID::FromRandom()), config_(object_manager_config), gcs_client_(gcs_client), @@ -62,7 +62,7 @@ class MockServer { ClientID node_id_; ObjectManagerConfig config_; - std::shared_ptr gcs_client_; + std::shared_ptr gcs_client_; ObjectManager object_manager_; }; @@ -106,8 +106,7 @@ class TestObjectManagerBase : public ::testing::Test { // start first server gcs::GcsClientOptions client_options("127.0.0.1", 6379, /*password*/ "", /*is_test_client=*/true); - gcs_client_1 = - std::shared_ptr(new gcs::RedisGcsClient(client_options)); + gcs_client_1 = std::make_shared(client_options); RAY_CHECK_OK(gcs_client_1->Connect(main_service)); ObjectManagerConfig om_config_1; om_config_1.store_socket_name = store_id_1; @@ -119,8 +118,7 @@ class TestObjectManagerBase : public ::testing::Test { server1.reset(new MockServer(main_service, om_config_1, gcs_client_1)); // start second server - gcs_client_2 = - std::shared_ptr(new gcs::RedisGcsClient(client_options)); + gcs_client_2 = std::make_shared(client_options); RAY_CHECK_OK(gcs_client_2->Connect(main_service)); ObjectManagerConfig om_config_2; om_config_2.store_socket_name = store_id_2; @@ -170,8 +168,8 @@ class TestObjectManagerBase : public ::testing::Test { protected: std::thread p; boost::asio::io_service main_service; - std::shared_ptr gcs_client_1; - std::shared_ptr gcs_client_2; + std::shared_ptr gcs_client_1; + std::shared_ptr gcs_client_2; std::unique_ptr server1; std::unique_ptr server2; diff --git a/src/ray/object_manager/test/object_manager_test.cc b/src/ray/object_manager/test/object_manager_test.cc index f552a2d1cc29..19a5f07d5448 100644 --- a/src/ray/object_manager/test/object_manager_test.cc +++ b/src/ray/object_manager/test/object_manager_test.cc @@ -28,7 +28,7 @@ class MockServer { public: MockServer(boost::asio::io_service &main_service, const ObjectManagerConfig &object_manager_config, - std::shared_ptr gcs_client) + std::shared_ptr gcs_client) : node_id_(ClientID::FromRandom()), config_(object_manager_config), gcs_client_(gcs_client), @@ -56,7 +56,7 @@ class MockServer { ClientID node_id_; ObjectManagerConfig config_; - std::shared_ptr gcs_client_; + std::shared_ptr gcs_client_; ObjectManager object_manager_; }; @@ -98,8 +98,7 @@ class TestObjectManagerBase : public ::testing::Test { // start first server gcs::GcsClientOptions client_options("127.0.0.1", 6379, /*password*/ "", /*is_test_client=*/true); - gcs_client_1 = - std::shared_ptr(new gcs::RedisGcsClient(client_options)); + gcs_client_1 = std::make_shared(client_options); RAY_CHECK_OK(gcs_client_1->Connect(main_service)); ObjectManagerConfig om_config_1; om_config_1.store_socket_name = store_id_1; @@ -111,8 +110,7 @@ class TestObjectManagerBase : public ::testing::Test { server1.reset(new MockServer(main_service, om_config_1, gcs_client_1)); // start second server - gcs_client_2 = - std::shared_ptr(new gcs::RedisGcsClient(client_options)); + gcs_client_2 = std::make_shared(client_options); RAY_CHECK_OK(gcs_client_2->Connect(main_service)); ObjectManagerConfig om_config_2; om_config_2.store_socket_name = store_id_2; @@ -166,8 +164,8 @@ class TestObjectManagerBase : public ::testing::Test { protected: std::thread p; boost::asio::io_service main_service; - std::shared_ptr gcs_client_1; - std::shared_ptr gcs_client_2; + std::shared_ptr gcs_client_1; + std::shared_ptr gcs_client_2; std::unique_ptr server1; std::unique_ptr server2; diff --git a/src/ray/raylet/lineage_cache.cc b/src/ray/raylet/lineage_cache.cc index 634c526ce7cf..3dc01958c0ec 100644 --- a/src/ray/raylet/lineage_cache.cc +++ b/src/ray/raylet/lineage_cache.cc @@ -153,7 +153,7 @@ const std::unordered_set &Lineage::GetChildren(const TaskID &task_id) co } LineageCache::LineageCache(const ClientID &self_node_id, - std::shared_ptr gcs_client, + std::shared_ptr gcs_client, uint64_t max_lineage_size) : self_node_id_(self_node_id), gcs_client_(gcs_client) {} diff --git a/src/ray/raylet/lineage_cache.h b/src/ray/raylet/lineage_cache.h index abe22c3b4b12..bd3acb48ea8d 100644 --- a/src/ray/raylet/lineage_cache.h +++ b/src/ray/raylet/lineage_cache.h @@ -209,8 +209,7 @@ class LineageCache { public: /// Create a lineage cache for the given task storage system. /// TODO(swang): Pass in the policy (interface?). - LineageCache(const ClientID &self_node_id, - std::shared_ptr gcs_client, + LineageCache(const ClientID &self_node_id, std::shared_ptr gcs_client, uint64_t max_lineage_size); /// Asynchronously commit a task to the GCS. @@ -306,7 +305,7 @@ class LineageCache { /// ID of this node. ClientID self_node_id_; /// A client connection to the GCS. - std::shared_ptr gcs_client_; + std::shared_ptr gcs_client_; /// All tasks and objects that we are responsible for writing back to the /// GCS, and the tasks and objects in their lineage. Lineage lineage_; diff --git a/src/ray/raylet/monitor.cc b/src/ray/raylet/monitor.cc index d6566d3316f3..1b6e58af7114 100644 --- a/src/ray/raylet/monitor.cc +++ b/src/ray/raylet/monitor.cc @@ -16,12 +16,12 @@ namespace raylet { /// within heartbeat_timeout_milliseconds * num_heartbeats_timeout (defined in /// the Ray configuration), then the monitor will mark that Raylet as dead in /// the client table, which broadcasts the event to all other Raylets. -Monitor::Monitor(boost::asio::io_service &io_service, const std::string &redis_address, - int redis_port, const std::string &redis_password) - : gcs_client_(gcs::GcsClientOptions(redis_address, redis_port, redis_password)), +Monitor::Monitor(boost::asio::io_service &io_service, + const gcs::GcsClientOptions &gcs_client_options) + : gcs_client_(new gcs::RedisGcsClient(gcs_client_options)), num_heartbeats_timeout_(RayConfig::instance().num_heartbeats_timeout()), heartbeat_timer_(io_service) { - RAY_CHECK_OK(gcs_client_.Connect(io_service)); + RAY_CHECK_OK(gcs_client_->Connect(io_service)); } void Monitor::HandleHeartbeat(const ClientID &node_id, @@ -35,7 +35,7 @@ void Monitor::Start() { const HeartbeatTableData &heartbeat_data) { HandleHeartbeat(id, heartbeat_data); }; - RAY_CHECK_OK(gcs_client_.Nodes().AsyncSubscribeHeartbeat(heartbeat_callback, nullptr)); + RAY_CHECK_OK(gcs_client_->Nodes().AsyncSubscribeHeartbeat(heartbeat_callback, nullptr)); Tick(); } @@ -59,7 +59,7 @@ void Monitor::Tick() { } if (!marked) { RAY_CHECK_OK( - gcs_client_.Nodes().AsyncUnregister(node_id, /* callback */ nullptr)); + gcs_client_->Nodes().AsyncUnregister(node_id, /* callback */ nullptr)); // Broadcast a warning to all of the drivers indicating that the node // has been marked as dead. // TODO(rkn): Define this constant somewhere else. @@ -71,10 +71,10 @@ void Monitor::Tick() { auto error_data_ptr = gcs::CreateErrorTableData(type, error_message.str(), current_time_ms()); RAY_CHECK_OK( - gcs_client_.Errors().AsyncReportJobError(error_data_ptr, nullptr)); + gcs_client_->Errors().AsyncReportJobError(error_data_ptr, nullptr)); } }; - RAY_CHECK_OK(gcs_client_.Nodes().AsyncGetAll(lookup_callback)); + RAY_CHECK_OK(gcs_client_->Nodes().AsyncGetAll(lookup_callback)); dead_nodes_.insert(node_id); } it = heartbeats_.erase(it); @@ -89,7 +89,7 @@ void Monitor::Tick() { for (const auto &heartbeat : heartbeat_buffer_) { batch->add_batch()->CopyFrom(heartbeat.second); } - RAY_CHECK_OK(gcs_client_.Nodes().AsyncReportBatchHeartbeat(batch, nullptr)); + RAY_CHECK_OK(gcs_client_->Nodes().AsyncReportBatchHeartbeat(batch, nullptr)); heartbeat_buffer_.clear(); } diff --git a/src/ray/raylet/monitor.h b/src/ray/raylet/monitor.h index 4d8fba55794e..1c4f889296b1 100644 --- a/src/ray/raylet/monitor.h +++ b/src/ray/raylet/monitor.h @@ -22,8 +22,8 @@ class Monitor { /// \param io_service The event loop to run the monitor on. /// \param redis_address The GCS Redis address to connect to. /// \param redis_port The GCS Redis port to connect to. - Monitor(boost::asio::io_service &io_service, const std::string &redis_address, - int redis_port, const std::string &redis_password); + Monitor(boost::asio::io_service &io_service, + const gcs::GcsClientOptions &gcs_client_options); /// Start the monitor. Listen for heartbeats from Raylets and mark Raylets /// that do not send a heartbeat within a given period as dead. @@ -43,7 +43,7 @@ class Monitor { private: /// A client to the GCS, through which heartbeats are received. - gcs::RedisGcsClient gcs_client_; + std::unique_ptr gcs_client_; /// The number of heartbeats that can be missed before a client is removed. int64_t num_heartbeats_timeout_; /// A timer that ticks every heartbeat_timeout_ms_ milliseconds. diff --git a/src/ray/raylet/monitor_main.cc b/src/ray/raylet/monitor_main.cc index 6f3c57136ca7..7bc29e682e3e 100644 --- a/src/ray/raylet/monitor_main.cc +++ b/src/ray/raylet/monitor_main.cc @@ -24,6 +24,9 @@ int main(int argc, char *argv[]) { const std::string redis_password = FLAGS_redis_password; gflags::ShutDownCommandLineFlags(); + ray::gcs::GcsClientOptions gcs_client_options(redis_address, redis_port, + redis_password); + std::unordered_map raylet_config; // Parse the configuration list. @@ -52,7 +55,7 @@ int main(int argc, char *argv[]) { // signals.async_wait(handler); // Initialize the monitor. - ray::raylet::Monitor monitor(io_service, redis_address, redis_port, redis_password); + ray::raylet::Monitor monitor(io_service, gcs_client_options); monitor.Start(); io_service.run(); } diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 399154641266..d5e8605a1ea8 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -73,7 +73,7 @@ namespace raylet { NodeManager::NodeManager(boost::asio::io_service &io_service, const ClientID &self_node_id, const NodeManagerConfig &config, ObjectManager &object_manager, - std::shared_ptr gcs_client, + std::shared_ptr gcs_client, std::shared_ptr object_directory) : self_node_id_(self_node_id), io_service_(io_service), diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 141217f10531..e360b7460250 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -77,7 +77,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// \param object_manager A reference to the local object manager. NodeManager(boost::asio::io_service &io_service, const ClientID &self_node_id, const NodeManagerConfig &config, ObjectManager &object_manager, - std::shared_ptr gcs_client, + std::shared_ptr gcs_client, std::shared_ptr object_directory_); /// Process a new client connection. @@ -587,7 +587,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// actor died) and to pin objects that are in scope in the cluster. plasma::PlasmaClient store_client_; /// A client connection to the GCS. - std::shared_ptr gcs_client_; + std::shared_ptr gcs_client_; /// The object table. This is shared with the object manager. std::shared_ptr object_directory_; /// The timer used to send heartbeats. diff --git a/src/ray/raylet/raylet.cc b/src/ray/raylet/raylet.cc index 936b69fd02b7..cd67c8ce4c3c 100644 --- a/src/ray/raylet/raylet.cc +++ b/src/ray/raylet/raylet.cc @@ -44,7 +44,7 @@ Raylet::Raylet(boost::asio::io_service &main_service, const std::string &socket_ int redis_port, const std::string &redis_password, const NodeManagerConfig &node_manager_config, const ObjectManagerConfig &object_manager_config, - std::shared_ptr gcs_client) + std::shared_ptr gcs_client) : self_node_id_(ClientID::FromRandom()), gcs_client_(gcs_client), object_directory_(std::make_shared(main_service, gcs_client_)), diff --git a/src/ray/raylet/raylet.h b/src/ray/raylet/raylet.h index 304c4f04939a..c66241f3f776 100644 --- a/src/ray/raylet/raylet.h +++ b/src/ray/raylet/raylet.h @@ -41,7 +41,7 @@ class Raylet { int redis_port, const std::string &redis_password, const NodeManagerConfig &node_manager_config, const ObjectManagerConfig &object_manager_config, - std::shared_ptr gcs_client); + std::shared_ptr gcs_client); /// Start this raylet. void Start(); @@ -69,7 +69,7 @@ class Raylet { GcsNodeInfo self_node_info_; /// A client connection to the GCS. - std::shared_ptr gcs_client_; + std::shared_ptr gcs_client_; /// The object table. This is shared between the object manager and node /// manager. std::shared_ptr object_directory_; diff --git a/src/ray/raylet/reconstruction_policy.cc b/src/ray/raylet/reconstruction_policy.cc index d53331cfac33..29f9b475896e 100644 --- a/src/ray/raylet/reconstruction_policy.cc +++ b/src/ray/raylet/reconstruction_policy.cc @@ -10,7 +10,7 @@ ReconstructionPolicy::ReconstructionPolicy( boost::asio::io_service &io_service, std::function reconstruction_handler, int64_t initial_reconstruction_timeout_ms, const ClientID &client_id, - std::shared_ptr gcs_client, + std::shared_ptr gcs_client, std::shared_ptr object_directory) : io_service_(io_service), reconstruction_handler_(reconstruction_handler), diff --git a/src/ray/raylet/reconstruction_policy.h b/src/ray/raylet/reconstruction_policy.h index 5b6438365c4e..e047eef7cf92 100644 --- a/src/ray/raylet/reconstruction_policy.h +++ b/src/ray/raylet/reconstruction_policy.h @@ -43,7 +43,7 @@ class ReconstructionPolicy : public ReconstructionPolicyInterface { boost::asio::io_service &io_service, std::function reconstruction_handler, int64_t initial_reconstruction_timeout_ms, const ClientID &client_id, - std::shared_ptr gcs_client, + std::shared_ptr gcs_client, std::shared_ptr object_directory); /// Listen for task lease notifications about an object that may require @@ -142,7 +142,7 @@ class ReconstructionPolicy : public ReconstructionPolicyInterface { /// The client ID to use when requesting notifications from the GCS. const ClientID client_id_; /// A client connection to the GCS. - std::shared_ptr gcs_client_; + std::shared_ptr gcs_client_; /// The object directory used to lookup object locations. std::shared_ptr object_directory_; /// The tasks that we are currently subscribed to in the GCS. diff --git a/src/ray/raylet/task_dependency_manager.cc b/src/ray/raylet/task_dependency_manager.cc index b6ea1575ef65..fe074b542953 100644 --- a/src/ray/raylet/task_dependency_manager.cc +++ b/src/ray/raylet/task_dependency_manager.cc @@ -12,7 +12,7 @@ TaskDependencyManager::TaskDependencyManager( ObjectManagerInterface &object_manager, ReconstructionPolicyInterface &reconstruction_policy, boost::asio::io_service &io_service, const ClientID &client_id, - int64_t initial_lease_period_ms, std::shared_ptr gcs_client) + int64_t initial_lease_period_ms, std::shared_ptr gcs_client) : object_manager_(object_manager), reconstruction_policy_(reconstruction_policy), io_service_(io_service), diff --git a/src/ray/raylet/task_dependency_manager.h b/src/ray/raylet/task_dependency_manager.h index 9403ba64e7d4..36a53a666355 100644 --- a/src/ray/raylet/task_dependency_manager.h +++ b/src/ray/raylet/task_dependency_manager.h @@ -34,7 +34,7 @@ class TaskDependencyManager { ReconstructionPolicyInterface &reconstruction_policy, boost::asio::io_service &io_service, const ClientID &client_id, int64_t initial_lease_period_ms, - std::shared_ptr gcs_client); + std::shared_ptr gcs_client); /// Check whether an object is locally available. /// @@ -228,7 +228,7 @@ class TaskDependencyManager { /// lease is renewed. const int64_t initial_lease_period_ms_; /// A client connection to the GCS. - std::shared_ptr gcs_client_; + std::shared_ptr gcs_client_; /// A mapping from task ID of each subscribed task to its list of object /// dependencies, either task arguments or objects passed into `ray.get`. std::unordered_map task_dependencies_; diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 9fb922959537..be4c33b5a221 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -48,7 +48,7 @@ namespace raylet { /// each language. WorkerPool::WorkerPool(boost::asio::io_service &io_service, int num_workers, int maximum_startup_concurrency, - std::shared_ptr gcs_client, + std::shared_ptr gcs_client, const WorkerCommandMap &worker_commands) : io_service_(&io_service), maximum_startup_concurrency_(maximum_startup_concurrency), diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index 267c88647a33..aa749d035d16 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -44,8 +44,7 @@ class WorkerPool { /// \param worker_commands The commands used to start the worker process, grouped by /// language. WorkerPool(boost::asio::io_service &io_service, int num_workers, - int maximum_startup_concurrency, - std::shared_ptr gcs_client, + int maximum_startup_concurrency, std::shared_ptr gcs_client, const WorkerCommandMap &worker_commands); /// Destructor responsible for freeing a set of workers owned by this class. @@ -228,7 +227,7 @@ class WorkerPool { /// The maximum number of worker processes that can be started concurrently. int maximum_startup_concurrency_; /// A client connection to the GCS. - std::shared_ptr gcs_client_; + std::shared_ptr gcs_client_; FRIEND_TEST(WorkerPoolTest, InitialWorkerProcessCount); };