Skip to content

Commit

Permalink
[xray] Monitor for Raylet processes (ray-project#1831)
Browse files Browse the repository at this point in the history
* Add raylet monitor script to timeout Raylet heartbeats

* Unit test for removing a different client from the client table

* Set node manager heartbeat according to global config

* Doc and fixes

* Add regression test for client table disconnect, refactor client table

* Fix linting.
  • Loading branch information
stephanie-wang authored and robertnishihara committed Apr 6, 2018
1 parent 0d9a7a3 commit cbf3181
Show file tree
Hide file tree
Showing 11 changed files with 274 additions and 29 deletions.
36 changes: 34 additions & 2 deletions python/ray/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,10 @@
os.path.abspath(os.path.dirname(__file__)),
"core/src/credis/build/src/libmember.so")

# Location of the raylet executable.
# Location of the raylet executables.
RAYLET_MONITOR_EXECUTABLE = os.path.join(
os.path.abspath(os.path.dirname(__file__)),
"core/src/ray/raylet/raylet_monitor")
RAYLET_EXECUTABLE = os.path.join(
os.path.abspath(os.path.dirname(__file__)),
"core/src/ray/raylet/raylet")
Expand Down Expand Up @@ -1112,11 +1115,35 @@ def start_monitor(redis_address, node_ip_address, stdout_file=None,
command.append("--autoscaling-config=" + str(autoscaling_config))
p = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file)
if cleanup:
all_processes[PROCESS_TYPE_WORKER].append(p)
all_processes[PROCESS_TYPE_MONITOR].append(p)
record_log_files_in_redis(redis_address, node_ip_address,
[stdout_file, stderr_file])


def start_raylet_monitor(redis_address, stdout_file=None,
stderr_file=None, cleanup=True):
"""Run a process to monitor the other processes.
Args:
redis_address (str): The address that the Redis server is listening on.
stdout_file: A file handle opened for writing to redirect stdout to. If
no redirection should happen, then this should be None.
stderr_file: A file handle opened for writing to redirect stderr to. If
no redirection should happen, then this should be None.
cleanup (bool): True if using Ray in local mode. If cleanup is true,
then this process will be killed by services.cleanup() when the
Python process that imported services exits. This is True by
default.
"""
gcs_ip_address, gcs_port = redis_address.split(":")
command = [RAYLET_MONITOR_EXECUTABLE,
gcs_ip_address,
gcs_port]
p = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file)
if cleanup:
all_processes[PROCESS_TYPE_MONITOR].append(p)


def start_ray_processes(address_info=None,
node_ip_address="127.0.0.1",
redis_port=None,
Expand Down Expand Up @@ -1253,6 +1280,11 @@ def start_ray_processes(address_info=None,
stderr_file=monitor_stderr_file,
cleanup=cleanup,
autoscaling_config=autoscaling_config)
if use_raylet:
start_raylet_monitor(redis_address,
stdout_file=monitor_stdout_file,
stderr_file=monitor_stderr_file,
cleanup=cleanup)

if redis_shards == []:
# Get redis shards from primary redis instance.
Expand Down
1 change: 1 addition & 0 deletions python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"ray/core/src/local_scheduler/local_scheduler",
"ray/core/src/local_scheduler/liblocal_scheduler_library.so",
"ray/core/src/global_scheduler/global_scheduler",
"ray/core/src/ray/raylet/raylet_monitor",
"ray/core/src/ray/raylet/raylet",
"ray/WebUI.ipynb"
]
Expand Down
1 change: 1 addition & 0 deletions src/ray/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ set(RAY_SRCS
object_manager/object_directory.cc
object_manager/transfer_queue.cc
object_manager/object_manager.cc
raylet/monitor.cc
raylet/mock_gcs_client.cc
raylet/task.cc
raylet/task_execution_spec.cc
Expand Down
63 changes: 60 additions & 3 deletions src/ray/gcs/client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -781,20 +781,22 @@ void TestClientTableDisconnect(const JobID &job_id,
client->client_table().RegisterClientAddedCallback(
[](gcs::AsyncGcsClient *client, const UniqueID &id, const ClientTableDataT &data) {
ClientTableNotification(client, id, data, true);
// Disconnect from the client table. We should receive a notification
// for the removal of our own entry.
RAY_CHECK_OK(client->client_table().Disconnect());
});
client->client_table().RegisterClientRemovedCallback(
[](gcs::AsyncGcsClient *client, const UniqueID &id, const ClientTableDataT &data) {
ClientTableNotification(client, id, data, false);
test->Stop();
});
// Connect and disconnect to client table. We should receive notifications
// for the addition and removal of our own entry.
// Connect to the client table. We should receive notification for the
// addition of our own entry.
ClientTableDataT local_client_info = client->client_table().GetLocalClient();
local_client_info.node_manager_address = "127.0.0.1";
local_client_info.node_manager_port = 0;
local_client_info.object_manager_port = 0;
RAY_CHECK_OK(client->client_table().Connect(local_client_info));
RAY_CHECK_OK(client->client_table().Disconnect());
test->Start();
}

Expand All @@ -803,4 +805,59 @@ TEST_F(TestGcsWithAsio, TestClientTableDisconnect) {
TestClientTableDisconnect(job_id_, client_);
}

void TestClientTableImmediateDisconnect(const JobID &job_id,
std::shared_ptr<gcs::AsyncGcsClient> client) {
// Register callbacks for when a client gets added and removed. The latter
// event will stop the event loop.
client->client_table().RegisterClientAddedCallback(
[](gcs::AsyncGcsClient *client, const UniqueID &id, const ClientTableDataT &data) {
ClientTableNotification(client, id, data, true);
});
client->client_table().RegisterClientRemovedCallback(
[](gcs::AsyncGcsClient *client, const UniqueID &id, const ClientTableDataT &data) {
ClientTableNotification(client, id, data, false);
test->Stop();
});
// Connect to then immediately disconnect from the client table. We should
// receive notifications for the addition and removal of our own entry.
ClientTableDataT local_client_info = client->client_table().GetLocalClient();
local_client_info.node_manager_address = "127.0.0.1";
local_client_info.node_manager_port = 0;
local_client_info.object_manager_port = 0;
RAY_CHECK_OK(client->client_table().Connect(local_client_info));
RAY_CHECK_OK(client->client_table().Disconnect());
test->Start();
}

TEST_F(TestGcsWithAsio, TestClientTableImmediateDisconnect) {
test = this;
TestClientTableImmediateDisconnect(job_id_, client_);
}

void TestClientTableMarkDisconnected(const JobID &job_id,
std::shared_ptr<gcs::AsyncGcsClient> client) {
ClientTableDataT local_client_info = client->client_table().GetLocalClient();
local_client_info.node_manager_address = "127.0.0.1";
local_client_info.node_manager_port = 0;
local_client_info.object_manager_port = 0;
// Connect to the client table to start receiving notifications.
RAY_CHECK_OK(client->client_table().Connect(local_client_info));
// Mark a different client as dead.
ClientID dead_client_id = ClientID::from_random();
RAY_CHECK_OK(client->client_table().MarkDisconnected(dead_client_id));
// Make sure we only get a notification for the removal of the client we
// marked as dead.
client->client_table().RegisterClientRemovedCallback([dead_client_id](
gcs::AsyncGcsClient *client, const UniqueID &id, const ClientTableDataT &data) {
ASSERT_EQ(ClientID::from_binary(data.client_id), dead_client_id);
test->Stop();
});
test->Start();
}

TEST_F(TestGcsWithAsio, TestClientTableMarkDisconnected) {
test = this;
TestClientTableMarkDisconnected(job_id_, client_);
}

} // namespace
50 changes: 28 additions & 22 deletions src/ray/gcs/tables.cc
Original file line number Diff line number Diff line change
Expand Up @@ -276,41 +276,40 @@ Status ClientTable::Connect(const ClientTableDataT &local_client) {
RAY_CHECK(local_client.client_id == local_client_.client_id);
local_client_ = local_client;

// Construct the data to add to the client table.
auto data = std::make_shared<ClientTableDataT>(local_client_);
data->is_insertion = true;
// Callback for a notification from the client table.
auto notification_callback = [this](
AsyncGcsClient *client, const UniqueID &log_key,
const std::vector<ClientTableDataT> &notifications) {
RAY_CHECK(log_key == client_log_key_);
for (auto &notification : notifications) {
HandleNotification(client, notification);
}
};
// Callback to handle our own successful connection once we've added
// ourselves.
auto add_callback = [this](AsyncGcsClient *client, const UniqueID &log_key,
std::shared_ptr<ClientTableDataT> data) {
RAY_CHECK(log_key == client_log_key_);
HandleConnected(client, data);

// Callback for a notification from the client table.
auto notification_callback = [this](
AsyncGcsClient *client, const UniqueID &log_key,
const std::vector<ClientTableDataT> &notifications) {
RAY_CHECK(log_key == client_log_key_);
for (auto &notification : notifications) {
HandleNotification(client, notification);
}
};
// Callback to request notifications from the client table once we've
// successfully subscribed.
auto subscription_callback = [this](AsyncGcsClient *c) {
RAY_CHECK_OK(RequestNotifications(JobID::nil(), client_log_key_, client_id_));
};
// Subscribe to the client table.
RAY_CHECK_OK(Subscribe(JobID::nil(), client_id_, notification_callback,
subscription_callback));
};
// Callback to add ourselves once we've successfully subscribed.
auto subscription_callback = [this, data, add_callback](AsyncGcsClient *c) {
// Mark ourselves as deleted if we called Disconnect() since the last
// Connect() call.
if (disconnected_) {
data->is_insertion = false;
}
RAY_CHECK_OK(RequestNotifications(JobID::nil(), client_log_key_, client_id_));
RAY_CHECK_OK(Append(JobID::nil(), client_log_key_, data, add_callback));
};
return Subscribe(JobID::nil(), client_id_, notification_callback,
subscription_callback);
return Append(JobID::nil(), client_log_key_, data, add_callback);
}

Status ClientTable::Disconnect() {
auto data = std::make_shared<ClientTableDataT>(local_client_);
data->is_insertion = true;
data->is_insertion = false;
auto add_callback = [this](AsyncGcsClient *client, const ClientID &id,
std::shared_ptr<ClientTableDataT> data) {
HandleConnected(client, data);
Expand All @@ -322,6 +321,13 @@ Status ClientTable::Disconnect() {
return Status::OK();
}

ray::Status ClientTable::MarkDisconnected(const ClientID &dead_client_id) {
auto data = std::make_shared<ClientTableDataT>();
data->client_id = dead_client_id.binary();
data->is_insertion = false;
return Append(JobID::nil(), client_log_key_, data, nullptr);
}

const ClientTableDataT &ClientTable::GetClient(const ClientID &client_id) {
RAY_CHECK(!client_id.is_nil());
auto entry = client_cache_.find(client_id);
Expand Down
7 changes: 7 additions & 0 deletions src/ray/gcs/tables.h
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,13 @@ class ClientTable : private Log<UniqueID, ClientTableData> {
/// \return Status
ray::Status Disconnect();

/// Mark a different client as disconnected. The client ID should never be
/// reused for a new client.
///
/// \param dead_client_id The ID of the client to mark as dead.
/// \return Status
ray::Status MarkDisconnected(const ClientID &dead_client_id);

/// Register a callback to call when a new client is added.
///
/// \param callback The callback to register.
Expand Down
3 changes: 3 additions & 0 deletions src/ray/raylet/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ target_link_libraries(rayletlib ray_static ${Boost_SYSTEM_LIBRARY})
add_executable(raylet main.cc)
target_link_libraries(raylet rayletlib ${Boost_SYSTEM_LIBRARY} pthread)

add_executable(raylet_monitor monitor_main.cc)
target_link_libraries(raylet_monitor rayletlib ${Boost_SYSTEM_LIBRARY} pthread)

install(FILES
raylet
DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/ray/raylet")
4 changes: 2 additions & 2 deletions src/ray/raylet/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ int main(int argc, char *argv[]) {
node_manager_config.worker_command.push_back(token);
}

// TODO(swang): Set this from a global config.
node_manager_config.heartbeat_period_ms = 100;
node_manager_config.heartbeat_period_ms =
RayConfig::instance().heartbeat_timeout_milliseconds();

// Configuration for the object manager.
ray::ObjectManagerConfig object_manager_config;
Expand Down
66 changes: 66 additions & 0 deletions src/ray/raylet/monitor.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#include "ray/raylet/monitor.h"

#include "ray/status.h"

namespace ray {

namespace raylet {

/// \class Monitor
///
/// The monitor is responsible for listening for heartbeats from Raylets and
/// deciding when a Raylet has died. If the monitor does not hear from a Raylet
/// within heartbeat_timeout_milliseconds * num_heartbeats_timeout (defined in
/// the Ray configuration), then the monitor will mark that Raylet as dead in
/// the client table, which broadcasts the event to all other Raylets.
Monitor::Monitor(boost::asio::io_service &io_service, const std::string &redis_address,
int redis_port)
: gcs_client_(),
heartbeat_timeout_ms_(RayConfig::instance().num_heartbeats_timeout()),
heartbeat_timer_(io_service) {
RAY_CHECK_OK(gcs_client_.Connect(redis_address, redis_port));
RAY_CHECK_OK(gcs_client_.Attach(io_service));
}

void Monitor::HandleHeartbeat(const ClientID &client_id) {
heartbeats_[client_id] = heartbeat_timeout_ms_;
}

void Monitor::Start() {
const auto heartbeat_callback = [this](gcs::AsyncGcsClient *client, const ClientID &id,
const HeartbeatTableDataT &heartbeat_data) {
HandleHeartbeat(id);
};
RAY_CHECK_OK(gcs_client_.heartbeat_table().Subscribe(UniqueID::nil(), UniqueID::nil(),
heartbeat_callback, nullptr));
Tick();
}

/// A periodic timer that checks for timed out clients.
void Monitor::Tick() {
for (auto it = heartbeats_.begin(); it != heartbeats_.end();) {
it->second--;
if (it->second == 0) {
if (dead_clients_.count(it->first) == 0) {
RAY_LOG(WARNING) << "Client timed out: " << it->first;
RAY_CHECK_OK(gcs_client_.client_table().MarkDisconnected(it->first));
dead_clients_.insert(it->first);
}
it = heartbeats_.erase(it);
} else {
it++;
}
}

auto heartbeat_period = boost::posix_time::milliseconds(
RayConfig::instance().heartbeat_timeout_milliseconds());
heartbeat_timer_.expires_from_now(heartbeat_period);
heartbeat_timer_.async_wait([this](const boost::system::error_code &error) {
RAY_CHECK(!error);
Tick();
});
}

} // namespace raylet

} // namespace ray
56 changes: 56 additions & 0 deletions src/ray/raylet/monitor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#ifndef RAY_RAYLET_MONITOR_H
#define RAY_RAYLET_MONITOR_H

#include <memory>
#include <unordered_set>

#include "ray/gcs/client.h"
#include "ray/id.h"

namespace ray {

namespace raylet {

class Monitor {
public:
/// Create a Raylet monitor attached to the given GCS address and port.
///
/// \param io_service The event loop to run the monitor on.
/// \param redis_address The GCS Redis address to connect to.
/// \param redis_port The GCS Redis port to connect to.
Monitor(boost::asio::io_service &io_service, const std::string &redis_address,
int redis_port);

/// Start the monitor. Listen for heartbeats from Raylets and mark Raylets
/// that do not send a heartbeat within a given period as dead.
void Start();

/// A periodic timer that fires on every heartbeat period. Raylets that have
/// not sent a heartbeat within the last num_heartbeats_timeout ticks will be
/// marked as dead in the client table.
void Tick();

/// Handle a heartbeat from a Raylet.
///
/// \param client_id The client ID of the Raylet that sent the heartbeat.
void HandleHeartbeat(const ClientID &client_id);

private:
/// A client to the GCS, through which heartbeats are received.
gcs::AsyncGcsClient gcs_client_;
/// The expected period between heartbeats, for an individual Raylet.
int64_t heartbeat_timeout_ms_;
/// A timer that ticks every heartbeat_timeout_ms_ milliseconds.
boost::asio::deadline_timer heartbeat_timer_;
/// For each Raylet that we receive a heartbeat from, the number of ticks
/// that may pass before the Raylet will be declared dead.
std::unordered_map<ClientID, int64_t, UniqueIDHasher> heartbeats_;
/// The Raylets that have been marked as dead in the client table.
std::unordered_set<ClientID, UniqueIDHasher> dead_clients_;
};

} // namespace raylet

} // namespace ray

#endif // RAY_RAYLET_MONITOR_H
Loading

0 comments on commit cbf3181

Please sign in to comment.