From cbf3181fd247e9c84e860c251dbb6b2a536165bf Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Thu, 5 Apr 2018 20:45:38 -0700 Subject: [PATCH] [xray] Monitor for Raylet processes (#1831) * Add raylet monitor script to timeout Raylet heartbeats * Unit test for removing a different client from the client table * Set node manager heartbeat according to global config * Doc and fixes * Add regression test for client table disconnect, refactor client table * Fix linting. --- python/ray/services.py | 36 +++++++++++++++++-- python/setup.py | 1 + src/ray/CMakeLists.txt | 1 + src/ray/gcs/client_test.cc | 63 ++++++++++++++++++++++++++++++-- src/ray/gcs/tables.cc | 50 ++++++++++++++------------ src/ray/gcs/tables.h | 7 ++++ src/ray/raylet/CMakeLists.txt | 3 ++ src/ray/raylet/main.cc | 4 +-- src/ray/raylet/monitor.cc | 66 ++++++++++++++++++++++++++++++++++ src/ray/raylet/monitor.h | 56 +++++++++++++++++++++++++++++ src/ray/raylet/monitor_main.cc | 16 +++++++++ 11 files changed, 274 insertions(+), 29 deletions(-) create mode 100644 src/ray/raylet/monitor.cc create mode 100644 src/ray/raylet/monitor.h create mode 100644 src/ray/raylet/monitor_main.cc diff --git a/python/ray/services.py b/python/ray/services.py index ab1ce70e92461..f9773a24e551f 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -77,7 +77,10 @@ os.path.abspath(os.path.dirname(__file__)), "core/src/credis/build/src/libmember.so") -# Location of the raylet executable. +# Location of the raylet executables. +RAYLET_MONITOR_EXECUTABLE = os.path.join( + os.path.abspath(os.path.dirname(__file__)), + "core/src/ray/raylet/raylet_monitor") RAYLET_EXECUTABLE = os.path.join( os.path.abspath(os.path.dirname(__file__)), "core/src/ray/raylet/raylet") @@ -1112,11 +1115,35 @@ def start_monitor(redis_address, node_ip_address, stdout_file=None, command.append("--autoscaling-config=" + str(autoscaling_config)) p = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file) if cleanup: - all_processes[PROCESS_TYPE_WORKER].append(p) + all_processes[PROCESS_TYPE_MONITOR].append(p) record_log_files_in_redis(redis_address, node_ip_address, [stdout_file, stderr_file]) +def start_raylet_monitor(redis_address, stdout_file=None, + stderr_file=None, cleanup=True): + """Run a process to monitor the other processes. + + Args: + redis_address (str): The address that the Redis server is listening on. + stdout_file: A file handle opened for writing to redirect stdout to. If + no redirection should happen, then this should be None. + stderr_file: A file handle opened for writing to redirect stderr to. If + no redirection should happen, then this should be None. + cleanup (bool): True if using Ray in local mode. If cleanup is true, + then this process will be killed by services.cleanup() when the + Python process that imported services exits. This is True by + default. + """ + gcs_ip_address, gcs_port = redis_address.split(":") + command = [RAYLET_MONITOR_EXECUTABLE, + gcs_ip_address, + gcs_port] + p = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file) + if cleanup: + all_processes[PROCESS_TYPE_MONITOR].append(p) + + def start_ray_processes(address_info=None, node_ip_address="127.0.0.1", redis_port=None, @@ -1253,6 +1280,11 @@ def start_ray_processes(address_info=None, stderr_file=monitor_stderr_file, cleanup=cleanup, autoscaling_config=autoscaling_config) + if use_raylet: + start_raylet_monitor(redis_address, + stdout_file=monitor_stdout_file, + stderr_file=monitor_stderr_file, + cleanup=cleanup) if redis_shards == []: # Get redis shards from primary redis instance. diff --git a/python/setup.py b/python/setup.py index 2dbbffc22fd48..ff736dacd4336 100644 --- a/python/setup.py +++ b/python/setup.py @@ -23,6 +23,7 @@ "ray/core/src/local_scheduler/local_scheduler", "ray/core/src/local_scheduler/liblocal_scheduler_library.so", "ray/core/src/global_scheduler/global_scheduler", + "ray/core/src/ray/raylet/raylet_monitor", "ray/core/src/ray/raylet/raylet", "ray/WebUI.ipynb" ] diff --git a/src/ray/CMakeLists.txt b/src/ray/CMakeLists.txt index 513832dc5963b..a0a34f61e3208 100644 --- a/src/ray/CMakeLists.txt +++ b/src/ray/CMakeLists.txt @@ -44,6 +44,7 @@ set(RAY_SRCS object_manager/object_directory.cc object_manager/transfer_queue.cc object_manager/object_manager.cc + raylet/monitor.cc raylet/mock_gcs_client.cc raylet/task.cc raylet/task_execution_spec.cc diff --git a/src/ray/gcs/client_test.cc b/src/ray/gcs/client_test.cc index fd913fa00b0a2..679177e45c0a4 100644 --- a/src/ray/gcs/client_test.cc +++ b/src/ray/gcs/client_test.cc @@ -781,20 +781,22 @@ void TestClientTableDisconnect(const JobID &job_id, client->client_table().RegisterClientAddedCallback( [](gcs::AsyncGcsClient *client, const UniqueID &id, const ClientTableDataT &data) { ClientTableNotification(client, id, data, true); + // Disconnect from the client table. We should receive a notification + // for the removal of our own entry. + RAY_CHECK_OK(client->client_table().Disconnect()); }); client->client_table().RegisterClientRemovedCallback( [](gcs::AsyncGcsClient *client, const UniqueID &id, const ClientTableDataT &data) { ClientTableNotification(client, id, data, false); test->Stop(); }); - // Connect and disconnect to client table. We should receive notifications - // for the addition and removal of our own entry. + // Connect to the client table. We should receive notification for the + // addition of our own entry. ClientTableDataT local_client_info = client->client_table().GetLocalClient(); local_client_info.node_manager_address = "127.0.0.1"; local_client_info.node_manager_port = 0; local_client_info.object_manager_port = 0; RAY_CHECK_OK(client->client_table().Connect(local_client_info)); - RAY_CHECK_OK(client->client_table().Disconnect()); test->Start(); } @@ -803,4 +805,59 @@ TEST_F(TestGcsWithAsio, TestClientTableDisconnect) { TestClientTableDisconnect(job_id_, client_); } +void TestClientTableImmediateDisconnect(const JobID &job_id, + std::shared_ptr client) { + // Register callbacks for when a client gets added and removed. The latter + // event will stop the event loop. + client->client_table().RegisterClientAddedCallback( + [](gcs::AsyncGcsClient *client, const UniqueID &id, const ClientTableDataT &data) { + ClientTableNotification(client, id, data, true); + }); + client->client_table().RegisterClientRemovedCallback( + [](gcs::AsyncGcsClient *client, const UniqueID &id, const ClientTableDataT &data) { + ClientTableNotification(client, id, data, false); + test->Stop(); + }); + // Connect to then immediately disconnect from the client table. We should + // receive notifications for the addition and removal of our own entry. + ClientTableDataT local_client_info = client->client_table().GetLocalClient(); + local_client_info.node_manager_address = "127.0.0.1"; + local_client_info.node_manager_port = 0; + local_client_info.object_manager_port = 0; + RAY_CHECK_OK(client->client_table().Connect(local_client_info)); + RAY_CHECK_OK(client->client_table().Disconnect()); + test->Start(); +} + +TEST_F(TestGcsWithAsio, TestClientTableImmediateDisconnect) { + test = this; + TestClientTableImmediateDisconnect(job_id_, client_); +} + +void TestClientTableMarkDisconnected(const JobID &job_id, + std::shared_ptr client) { + ClientTableDataT local_client_info = client->client_table().GetLocalClient(); + local_client_info.node_manager_address = "127.0.0.1"; + local_client_info.node_manager_port = 0; + local_client_info.object_manager_port = 0; + // Connect to the client table to start receiving notifications. + RAY_CHECK_OK(client->client_table().Connect(local_client_info)); + // Mark a different client as dead. + ClientID dead_client_id = ClientID::from_random(); + RAY_CHECK_OK(client->client_table().MarkDisconnected(dead_client_id)); + // Make sure we only get a notification for the removal of the client we + // marked as dead. + client->client_table().RegisterClientRemovedCallback([dead_client_id]( + gcs::AsyncGcsClient *client, const UniqueID &id, const ClientTableDataT &data) { + ASSERT_EQ(ClientID::from_binary(data.client_id), dead_client_id); + test->Stop(); + }); + test->Start(); +} + +TEST_F(TestGcsWithAsio, TestClientTableMarkDisconnected) { + test = this; + TestClientTableMarkDisconnected(job_id_, client_); +} + } // namespace diff --git a/src/ray/gcs/tables.cc b/src/ray/gcs/tables.cc index 53dbc2b2cab6d..293608ac6f5f6 100644 --- a/src/ray/gcs/tables.cc +++ b/src/ray/gcs/tables.cc @@ -276,41 +276,40 @@ Status ClientTable::Connect(const ClientTableDataT &local_client) { RAY_CHECK(local_client.client_id == local_client_.client_id); local_client_ = local_client; + // Construct the data to add to the client table. auto data = std::make_shared(local_client_); data->is_insertion = true; - // Callback for a notification from the client table. - auto notification_callback = [this]( - AsyncGcsClient *client, const UniqueID &log_key, - const std::vector ¬ifications) { - RAY_CHECK(log_key == client_log_key_); - for (auto ¬ification : notifications) { - HandleNotification(client, notification); - } - }; // Callback to handle our own successful connection once we've added // ourselves. auto add_callback = [this](AsyncGcsClient *client, const UniqueID &log_key, std::shared_ptr data) { RAY_CHECK(log_key == client_log_key_); HandleConnected(client, data); + + // Callback for a notification from the client table. + auto notification_callback = [this]( + AsyncGcsClient *client, const UniqueID &log_key, + const std::vector ¬ifications) { + RAY_CHECK(log_key == client_log_key_); + for (auto ¬ification : notifications) { + HandleNotification(client, notification); + } + }; + // Callback to request notifications from the client table once we've + // successfully subscribed. + auto subscription_callback = [this](AsyncGcsClient *c) { + RAY_CHECK_OK(RequestNotifications(JobID::nil(), client_log_key_, client_id_)); + }; + // Subscribe to the client table. + RAY_CHECK_OK(Subscribe(JobID::nil(), client_id_, notification_callback, + subscription_callback)); }; - // Callback to add ourselves once we've successfully subscribed. - auto subscription_callback = [this, data, add_callback](AsyncGcsClient *c) { - // Mark ourselves as deleted if we called Disconnect() since the last - // Connect() call. - if (disconnected_) { - data->is_insertion = false; - } - RAY_CHECK_OK(RequestNotifications(JobID::nil(), client_log_key_, client_id_)); - RAY_CHECK_OK(Append(JobID::nil(), client_log_key_, data, add_callback)); - }; - return Subscribe(JobID::nil(), client_id_, notification_callback, - subscription_callback); + return Append(JobID::nil(), client_log_key_, data, add_callback); } Status ClientTable::Disconnect() { auto data = std::make_shared(local_client_); - data->is_insertion = true; + data->is_insertion = false; auto add_callback = [this](AsyncGcsClient *client, const ClientID &id, std::shared_ptr data) { HandleConnected(client, data); @@ -322,6 +321,13 @@ Status ClientTable::Disconnect() { return Status::OK(); } +ray::Status ClientTable::MarkDisconnected(const ClientID &dead_client_id) { + auto data = std::make_shared(); + data->client_id = dead_client_id.binary(); + data->is_insertion = false; + return Append(JobID::nil(), client_log_key_, data, nullptr); +} + const ClientTableDataT &ClientTable::GetClient(const ClientID &client_id) { RAY_CHECK(!client_id.is_nil()); auto entry = client_cache_.find(client_id); diff --git a/src/ray/gcs/tables.h b/src/ray/gcs/tables.h index 533c258f31bda..e2053b2c9e674 100644 --- a/src/ray/gcs/tables.h +++ b/src/ray/gcs/tables.h @@ -435,6 +435,13 @@ class ClientTable : private Log { /// \return Status ray::Status Disconnect(); + /// Mark a different client as disconnected. The client ID should never be + /// reused for a new client. + /// + /// \param dead_client_id The ID of the client to mark as dead. + /// \return Status + ray::Status MarkDisconnected(const ClientID &dead_client_id); + /// Register a callback to call when a new client is added. /// /// \param callback The callback to register. diff --git a/src/ray/raylet/CMakeLists.txt b/src/ray/raylet/CMakeLists.txt index c0580552e6792..37f61b7c062ff 100644 --- a/src/ray/raylet/CMakeLists.txt +++ b/src/ray/raylet/CMakeLists.txt @@ -32,6 +32,9 @@ target_link_libraries(rayletlib ray_static ${Boost_SYSTEM_LIBRARY}) add_executable(raylet main.cc) target_link_libraries(raylet rayletlib ${Boost_SYSTEM_LIBRARY} pthread) +add_executable(raylet_monitor monitor_main.cc) +target_link_libraries(raylet_monitor rayletlib ${Boost_SYSTEM_LIBRARY} pthread) + install(FILES raylet DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/ray/raylet") diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index dd219f26a3861..6f01af56121e8 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -29,8 +29,8 @@ int main(int argc, char *argv[]) { node_manager_config.worker_command.push_back(token); } - // TODO(swang): Set this from a global config. - node_manager_config.heartbeat_period_ms = 100; + node_manager_config.heartbeat_period_ms = + RayConfig::instance().heartbeat_timeout_milliseconds(); // Configuration for the object manager. ray::ObjectManagerConfig object_manager_config; diff --git a/src/ray/raylet/monitor.cc b/src/ray/raylet/monitor.cc new file mode 100644 index 0000000000000..fe570564ac16c --- /dev/null +++ b/src/ray/raylet/monitor.cc @@ -0,0 +1,66 @@ +#include "ray/raylet/monitor.h" + +#include "ray/status.h" + +namespace ray { + +namespace raylet { + +/// \class Monitor +/// +/// The monitor is responsible for listening for heartbeats from Raylets and +/// deciding when a Raylet has died. If the monitor does not hear from a Raylet +/// within heartbeat_timeout_milliseconds * num_heartbeats_timeout (defined in +/// the Ray configuration), then the monitor will mark that Raylet as dead in +/// the client table, which broadcasts the event to all other Raylets. +Monitor::Monitor(boost::asio::io_service &io_service, const std::string &redis_address, + int redis_port) + : gcs_client_(), + heartbeat_timeout_ms_(RayConfig::instance().num_heartbeats_timeout()), + heartbeat_timer_(io_service) { + RAY_CHECK_OK(gcs_client_.Connect(redis_address, redis_port)); + RAY_CHECK_OK(gcs_client_.Attach(io_service)); +} + +void Monitor::HandleHeartbeat(const ClientID &client_id) { + heartbeats_[client_id] = heartbeat_timeout_ms_; +} + +void Monitor::Start() { + const auto heartbeat_callback = [this](gcs::AsyncGcsClient *client, const ClientID &id, + const HeartbeatTableDataT &heartbeat_data) { + HandleHeartbeat(id); + }; + RAY_CHECK_OK(gcs_client_.heartbeat_table().Subscribe(UniqueID::nil(), UniqueID::nil(), + heartbeat_callback, nullptr)); + Tick(); +} + +/// A periodic timer that checks for timed out clients. +void Monitor::Tick() { + for (auto it = heartbeats_.begin(); it != heartbeats_.end();) { + it->second--; + if (it->second == 0) { + if (dead_clients_.count(it->first) == 0) { + RAY_LOG(WARNING) << "Client timed out: " << it->first; + RAY_CHECK_OK(gcs_client_.client_table().MarkDisconnected(it->first)); + dead_clients_.insert(it->first); + } + it = heartbeats_.erase(it); + } else { + it++; + } + } + + auto heartbeat_period = boost::posix_time::milliseconds( + RayConfig::instance().heartbeat_timeout_milliseconds()); + heartbeat_timer_.expires_from_now(heartbeat_period); + heartbeat_timer_.async_wait([this](const boost::system::error_code &error) { + RAY_CHECK(!error); + Tick(); + }); +} + +} // namespace raylet + +} // namespace ray diff --git a/src/ray/raylet/monitor.h b/src/ray/raylet/monitor.h new file mode 100644 index 0000000000000..3b383160e101b --- /dev/null +++ b/src/ray/raylet/monitor.h @@ -0,0 +1,56 @@ +#ifndef RAY_RAYLET_MONITOR_H +#define RAY_RAYLET_MONITOR_H + +#include +#include + +#include "ray/gcs/client.h" +#include "ray/id.h" + +namespace ray { + +namespace raylet { + +class Monitor { + public: + /// Create a Raylet monitor attached to the given GCS address and port. + /// + /// \param io_service The event loop to run the monitor on. + /// \param redis_address The GCS Redis address to connect to. + /// \param redis_port The GCS Redis port to connect to. + Monitor(boost::asio::io_service &io_service, const std::string &redis_address, + int redis_port); + + /// Start the monitor. Listen for heartbeats from Raylets and mark Raylets + /// that do not send a heartbeat within a given period as dead. + void Start(); + + /// A periodic timer that fires on every heartbeat period. Raylets that have + /// not sent a heartbeat within the last num_heartbeats_timeout ticks will be + /// marked as dead in the client table. + void Tick(); + + /// Handle a heartbeat from a Raylet. + /// + /// \param client_id The client ID of the Raylet that sent the heartbeat. + void HandleHeartbeat(const ClientID &client_id); + + private: + /// A client to the GCS, through which heartbeats are received. + gcs::AsyncGcsClient gcs_client_; + /// The expected period between heartbeats, for an individual Raylet. + int64_t heartbeat_timeout_ms_; + /// A timer that ticks every heartbeat_timeout_ms_ milliseconds. + boost::asio::deadline_timer heartbeat_timer_; + /// For each Raylet that we receive a heartbeat from, the number of ticks + /// that may pass before the Raylet will be declared dead. + std::unordered_map heartbeats_; + /// The Raylets that have been marked as dead in the client table. + std::unordered_set dead_clients_; +}; + +} // namespace raylet + +} // namespace ray + +#endif // RAY_RAYLET_MONITOR_H diff --git a/src/ray/raylet/monitor_main.cc b/src/ray/raylet/monitor_main.cc new file mode 100644 index 0000000000000..c48588fbf52b7 --- /dev/null +++ b/src/ray/raylet/monitor_main.cc @@ -0,0 +1,16 @@ +#include + +#include "ray/raylet/monitor.h" + +int main(int argc, char *argv[]) { + RAY_CHECK(argc == 3); + + const std::string redis_address = std::string(argv[1]); + int redis_port = std::stoi(argv[2]); + + // Initialize the monitor. + boost::asio::io_service io_service; + ray::raylet::Monitor monitor(io_service, redis_address, redis_port); + monitor.Start(); + io_service.run(); +}