[xray] Monitor for Raylet processes #1831

Merged
merged 6 commits into from
Apr 6, 2018
36 changes: 34 additions & 2 deletions python/ray/services.py
@@ -77,7 +77,10 @@
os.path.abspath(os.path.dirname(__file__)),
"core/src/credis/build/src/libmember.so")

- # Location of the raylet executable.
+ # Location of the raylet executables.
+ RAYLET_MONITOR_EXECUTABLE = os.path.join(
+ os.path.abspath(os.path.dirname(__file__)),
+ "core/src/ray/raylet/raylet_monitor")
Collaborator

I believe the executable also needs to be added to the `ray_files` list in `python/setup.py`.

Contributor Author

Thanks!

RAYLET_EXECUTABLE = os.path.join(
os.path.abspath(os.path.dirname(__file__)),
"core/src/ray/raylet/raylet")
@@ -1112,11 +1115,35 @@ def start_monitor(redis_address, node_ip_address, stdout_file=None,
command.append("--autoscaling-config=" + str(autoscaling_config))
p = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file)
if cleanup:
- all_processes[PROCESS_TYPE_WORKER].append(p)
+ all_processes[PROCESS_TYPE_MONITOR].append(p)
Collaborator

nice catch!

record_log_files_in_redis(redis_address, node_ip_address,
[stdout_file, stderr_file])


def start_raylet_monitor(redis_address, stdout_file=None,
stderr_file=None, cleanup=True):
"""Run a process to monitor the other processes.

Args:
redis_address (str): The address that the Redis server is listening on.
stdout_file: A file handle opened for writing to redirect stdout to. If
no redirection should happen, then this should be None.
stderr_file: A file handle opened for writing to redirect stderr to. If
no redirection should happen, then this should be None.
cleanup (bool): True if using Ray in local mode. If cleanup is true,
then this process will be killed by services.cleanup() when the
Python process that imported services exits. This is True by
default.
"""
gcs_ip_address, gcs_port = redis_address.split(":")
command = [RAYLET_MONITOR_EXECUTABLE,
gcs_ip_address,
gcs_port]
p = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file)
if cleanup:
all_processes[PROCESS_TYPE_MONITOR].append(p)


def start_ray_processes(address_info=None,
node_ip_address="127.0.0.1",
redis_port=None,
@@ -1253,6 +1280,11 @@ def start_ray_processes(address_info=None,
stderr_file=monitor_stderr_file,
cleanup=cleanup,
autoscaling_config=autoscaling_config)
if use_raylet:
start_raylet_monitor(redis_address,
stdout_file=monitor_stdout_file,
stderr_file=monitor_stderr_file,
Collaborator

Do we want these to go to the same log files as monitor.py or to their own log file?

Contributor Author

I think it makes sense for these to go to the same file as monitor.py, since only one of the monitors should ever be active at a given time (except for the autoscaler part of monitor.py).

cleanup=cleanup)

if redis_shards == []:
# Get redis shards from primary redis instance.
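The launch logic of `start_raylet_monitor` above can be tried in isolation. The following is a minimal sketch, not the code from this PR: `build_monitor_command` and `launch` are hypothetical helpers, the registry is a plain dict, and `sys.executable` stands in for the `raylet_monitor` binary so that the snippet runs without a Ray build.

```python
import subprocess
import sys

PROCESS_TYPE_MONITOR = "monitor"
# Registry of child processes to kill at exit, keyed by process type.
all_processes = {PROCESS_TYPE_MONITOR: []}


def build_monitor_command(executable, redis_address):
    """Split "ip:port" and build the argv list for the monitor binary."""
    gcs_ip_address, gcs_port = redis_address.split(":")
    return [executable, gcs_ip_address, gcs_port]


def launch(command, cleanup=True):
    """Start the process and, if requested, register it for cleanup."""
    p = subprocess.Popen(command)
    if cleanup:
        all_processes[PROCESS_TYPE_MONITOR].append(p)
    return p


command = build_monitor_command("raylet_monitor", "127.0.0.1:6379")
print(command)  # ['raylet_monitor', '127.0.0.1', '6379']

# Stand-in child process (assumption): a Python one-liner, not the real binary.
p = launch([sys.executable, "-c", "pass"])
p.wait()
print(p.returncode, len(all_processes[PROCESS_TYPE_MONITOR]))  # 0 1
```

Note that unpacking `redis_address.split(":")` into two names raises `ValueError` if the address contains more than one colon, so this form assumes a plain `host:port` string.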
1 change: 1 addition & 0 deletions python/setup.py
@@ -23,6 +23,7 @@
"ray/core/src/local_scheduler/local_scheduler",
"ray/core/src/local_scheduler/liblocal_scheduler_library.so",
"ray/core/src/global_scheduler/global_scheduler",
"ray/core/src/ray/raylet/raylet_monitor",
"ray/core/src/ray/raylet/raylet",
"ray/WebUI.ipynb"
]
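Since a binary missing from `ray_files` is easy to overlook, a small check can list the entries that are absent from a build tree. This is a hedged sketch only: `missing_files` is a hypothetical helper that is not part of `setup.py`, and the temporary directory merely simulates a build in which `raylet` exists but `raylet_monitor` does not.

```python
import os
import tempfile


def missing_files(base_dir, relative_paths):
    """Return the listed paths that do not exist as files under base_dir."""
    return [path for path in relative_paths
            if not os.path.isfile(os.path.join(base_dir, path))]


with tempfile.TemporaryDirectory() as base_dir:
    # Simulated build output (assumption): only the raylet binary is present.
    raylet_dir = os.path.join(base_dir, "ray/core/src/ray/raylet")
    os.makedirs(raylet_dir)
    open(os.path.join(raylet_dir, "raylet"), "w").close()
    result = missing_files(base_dir, [
        "ray/core/src/ray/raylet/raylet_monitor",
        "ray/core/src/ray/raylet/raylet",
    ])

print(result)  # ['ray/core/src/ray/raylet/raylet_monitor']
```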
1 change: 1 addition & 0 deletions src/ray/CMakeLists.txt
@@ -44,6 +44,7 @@ set(RAY_SRCS
object_manager/object_directory.cc
object_manager/transfer_queue.cc
object_manager/object_manager.cc
raylet/monitor.cc
raylet/mock_gcs_client.cc
raylet/task.cc
raylet/task_execution_spec.cc
63 changes: 60 additions & 3 deletions src/ray/gcs/client_test.cc
@@ -781,20 +781,22 @@ void TestClientTableDisconnect(const JobID &job_id,
client->client_table().RegisterClientAddedCallback(
[](gcs::AsyncGcsClient *client, const UniqueID &id, const ClientTableDataT &data) {
ClientTableNotification(client, id, data, true);
+ // Disconnect from the client table. We should receive a notification
+ // for the removal of our own entry.
+ RAY_CHECK_OK(client->client_table().Disconnect());
});
client->client_table().RegisterClientRemovedCallback(
[](gcs::AsyncGcsClient *client, const UniqueID &id, const ClientTableDataT &data) {
ClientTableNotification(client, id, data, false);
test->Stop();
});
- // Connect and disconnect to client table. We should receive notifications
- // for the addition and removal of our own entry.
+ // Connect to the client table. We should receive notification for the
+ // addition of our own entry.
ClientTableDataT local_client_info = client->client_table().GetLocalClient();
local_client_info.node_manager_address = "127.0.0.1";
local_client_info.node_manager_port = 0;
local_client_info.object_manager_port = 0;
RAY_CHECK_OK(client->client_table().Connect(local_client_info));
- RAY_CHECK_OK(client->client_table().Disconnect());
test->Start();
}

@@ -803,4 +805,59 @@ TEST_F(TestGcsWithAsio, TestClientTableDisconnect) {
TestClientTableDisconnect(job_id_, client_);
}

void TestClientTableImmediateDisconnect(const JobID &job_id,
std::shared_ptr<gcs::AsyncGcsClient> client) {
// Register callbacks for when a client gets added and removed. The latter
// event will stop the event loop.
client->client_table().RegisterClientAddedCallback(
[](gcs::AsyncGcsClient *client, const UniqueID &id, const ClientTableDataT &data) {
ClientTableNotification(client, id, data, true);
});
client->client_table().RegisterClientRemovedCallback(
[](gcs::AsyncGcsClient *client, const UniqueID &id, const ClientTableDataT &data) {
ClientTableNotification(client, id, data, false);
test->Stop();
});
// Connect to then immediately disconnect from the client table. We should
// receive notifications for the addition and removal of our own entry.
ClientTableDataT local_client_info = client->client_table().GetLocalClient();
local_client_info.node_manager_address = "127.0.0.1";
local_client_info.node_manager_port = 0;
local_client_info.object_manager_port = 0;
RAY_CHECK_OK(client->client_table().Connect(local_client_info));
RAY_CHECK_OK(client->client_table().Disconnect());
test->Start();
}

TEST_F(TestGcsWithAsio, TestClientTableImmediateDisconnect) {
test = this;
TestClientTableImmediateDisconnect(job_id_, client_);
}

void TestClientTableMarkDisconnected(const JobID &job_id,
std::shared_ptr<gcs::AsyncGcsClient> client) {
ClientTableDataT local_client_info = client->client_table().GetLocalClient();
local_client_info.node_manager_address = "127.0.0.1";
local_client_info.node_manager_port = 0;
local_client_info.object_manager_port = 0;
// Connect to the client table to start receiving notifications.
RAY_CHECK_OK(client->client_table().Connect(local_client_info));
// Mark a different client as dead.
ClientID dead_client_id = ClientID::from_random();
RAY_CHECK_OK(client->client_table().MarkDisconnected(dead_client_id));
// Make sure we only get a notification for the removal of the client we
// marked as dead.
client->client_table().RegisterClientRemovedCallback([dead_client_id](
gcs::AsyncGcsClient *client, const UniqueID &id, const ClientTableDataT &data) {
ASSERT_EQ(ClientID::from_binary(data.client_id), dead_client_id);
test->Stop();
});
test->Start();
}

TEST_F(TestGcsWithAsio, TestClientTableMarkDisconnected) {
test = this;
TestClientTableMarkDisconnected(job_id_, client_);
}

} // namespace
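The behaviour these tests exercise can be modelled with a toy append-only log. The sketch below is an illustration under simplifying assumptions, not Ray's implementation: `ToyClientTable` is a hypothetical class, callbacks fire synchronously instead of through Redis notifications, and client IDs are plain strings.

```python
class ToyClientTable:
    """Append-only log of (client_id, is_insertion) entries with callbacks."""

    def __init__(self, client_id):
        self.client_id = client_id
        self.log = []
        self.added_callbacks = []
        self.removed_callbacks = []

    def _append(self, client_id, is_insertion):
        # Record the entry, then notify the matching set of callbacks.
        self.log.append((client_id, is_insertion))
        callbacks = self.added_callbacks if is_insertion else self.removed_callbacks
        for callback in callbacks:
            callback(client_id)

    def connect(self):
        self._append(self.client_id, True)

    def disconnect(self):
        # The entry must be a removal; appending an insertion here is the
        # kind of bug fixed in ClientTable::Disconnect.
        self._append(self.client_id, False)

    def mark_disconnected(self, dead_client_id):
        # Append a removal entry on behalf of a different client.
        self._append(dead_client_id, False)


added, removed = [], []
table = ToyClientTable("local")
table.added_callbacks.append(added.append)
table.removed_callbacks.append(removed.append)

table.connect()
table.mark_disconnected("other")
table.disconnect()

print(added)    # ['local']
print(removed)  # ['other', 'local']
```

In this model a regression check for the `is_insertion` bug reduces to asserting that `disconnect()` triggers only the removal callbacks.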
50 changes: 28 additions & 22 deletions src/ray/gcs/tables.cc
@@ -276,41 +276,40 @@ Status ClientTable::Connect(const ClientTableDataT &local_client) {
RAY_CHECK(local_client.client_id == local_client_.client_id);
local_client_ = local_client;

+ // Construct the data to add to the client table.
  auto data = std::make_shared<ClientTableDataT>(local_client_);
  data->is_insertion = true;
- // Callback for a notification from the client table.
- auto notification_callback = [this](
- AsyncGcsClient *client, const UniqueID &log_key,
- const std::vector<ClientTableDataT> &notifications) {
- RAY_CHECK(log_key == client_log_key_);
- for (auto &notification : notifications) {
- HandleNotification(client, notification);
- }
- };
  // Callback to handle our own successful connection once we've added
  // ourselves.
  auto add_callback = [this](AsyncGcsClient *client, const UniqueID &log_key,
  std::shared_ptr<ClientTableDataT> data) {
  RAY_CHECK(log_key == client_log_key_);
  HandleConnected(client, data);
+
+ // Callback for a notification from the client table.
+ auto notification_callback = [this](
+ AsyncGcsClient *client, const UniqueID &log_key,
+ const std::vector<ClientTableDataT> &notifications) {
+ RAY_CHECK(log_key == client_log_key_);
+ for (auto &notification : notifications) {
+ HandleNotification(client, notification);
+ }
+ };
+ // Callback to request notifications from the client table once we've
+ // successfully subscribed.
+ auto subscription_callback = [this](AsyncGcsClient *c) {
+ RAY_CHECK_OK(RequestNotifications(JobID::nil(), client_log_key_, client_id_));
+ };
+ // Subscribe to the client table.
+ RAY_CHECK_OK(Subscribe(JobID::nil(), client_id_, notification_callback,
+ subscription_callback));
  };
- // Callback to add ourselves once we've successfully subscribed.
- auto subscription_callback = [this, data, add_callback](AsyncGcsClient *c) {
- // Mark ourselves as deleted if we called Disconnect() since the last
- // Connect() call.
- if (disconnected_) {
- data->is_insertion = false;
- }
- RAY_CHECK_OK(RequestNotifications(JobID::nil(), client_log_key_, client_id_));
- RAY_CHECK_OK(Append(JobID::nil(), client_log_key_, data, add_callback));
- };
- return Subscribe(JobID::nil(), client_id_, notification_callback,
- subscription_callback);
+ return Append(JobID::nil(), client_log_key_, data, add_callback);
}

Status ClientTable::Disconnect() {
auto data = std::make_shared<ClientTableDataT>(local_client_);
- data->is_insertion = true;
+ data->is_insertion = false;
Collaborator

Hmm, this was a bug, right?

Contributor Author

@stephanie-wang, Apr 4, 2018

Yeah, apparently this path wasn't covered in the tests :( I can add a regression unit test.

auto add_callback = [this](AsyncGcsClient *client, const ClientID &id,
std::shared_ptr<ClientTableDataT> data) {
HandleConnected(client, data);
@@ -322,6 +321,13 @@ Status ClientTable::Disconnect() {
return Status::OK();
}

ray::Status ClientTable::MarkDisconnected(const ClientID &dead_client_id) {
auto data = std::make_shared<ClientTableDataT>();
data->client_id = dead_client_id.binary();
data->is_insertion = false;
return Append(JobID::nil(), client_log_key_, data, nullptr);
}

const ClientTableDataT &ClientTable::GetClient(const ClientID &client_id) {
RAY_CHECK(!client_id.is_nil());
auto entry = client_cache_.find(client_id);
7 changes: 7 additions & 0 deletions src/ray/gcs/tables.h
@@ -435,6 +435,13 @@ class ClientTable : private Log<UniqueID, ClientTableData> {
/// \return Status
ray::Status Disconnect();

/// Mark a different client as disconnected. The client ID should never be
/// reused for a new client.
///
/// \param dead_client_id The ID of the client to mark as dead.
/// \return Status
ray::Status MarkDisconnected(const ClientID &dead_client_id);

/// Register a callback to call when a new client is added.
///
/// \param callback The callback to register.
3 changes: 3 additions & 0 deletions src/ray/raylet/CMakeLists.txt
@@ -32,6 +32,9 @@ target_link_libraries(rayletlib ray_static ${Boost_SYSTEM_LIBRARY})
add_executable(raylet main.cc)
target_link_libraries(raylet rayletlib ${Boost_SYSTEM_LIBRARY} pthread)

add_executable(raylet_monitor monitor_main.cc)
target_link_libraries(raylet_monitor rayletlib ${Boost_SYSTEM_LIBRARY} pthread)

install(FILES
raylet
DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/ray/raylet")
4 changes: 2 additions & 2 deletions src/ray/raylet/main.cc
@@ -29,8 +29,8 @@ int main(int argc, char *argv[]) {
node_manager_config.worker_command.push_back(token);
}

- // TODO(swang): Set this from a global config.
- node_manager_config.heartbeat_period_ms = 100;
+ node_manager_config.heartbeat_period_ms =
+ RayConfig::instance().heartbeat_timeout_milliseconds();

// Configuration for the object manager.
ray::ObjectManagerConfig object_manager_config;
66 changes: 66 additions & 0 deletions src/ray/raylet/monitor.cc
@@ -0,0 +1,66 @@
#include "ray/raylet/monitor.h"

#include "ray/status.h"

namespace ray {

namespace raylet {

/// \class Monitor
///
/// The monitor is responsible for listening for heartbeats from Raylets and
/// deciding when a Raylet has died. If the monitor does not hear from a Raylet
/// within heartbeat_timeout_milliseconds * num_heartbeats_timeout (defined in
/// the Ray configuration), then the monitor will mark that Raylet as dead in
/// the client table, which broadcasts the event to all other Raylets.
Monitor::Monitor(boost::asio::io_service &io_service, const std::string &redis_address,
int redis_port)
: gcs_client_(),
heartbeat_timeout_ms_(RayConfig::instance().num_heartbeats_timeout()),
heartbeat_timer_(io_service) {
RAY_CHECK_OK(gcs_client_.Connect(redis_address, redis_port));
RAY_CHECK_OK(gcs_client_.Attach(io_service));
}

void Monitor::HandleHeartbeat(const ClientID &client_id) {
heartbeats_[client_id] = heartbeat_timeout_ms_;
}

void Monitor::Start() {
const auto heartbeat_callback = [this](gcs::AsyncGcsClient *client, const ClientID &id,
const HeartbeatTableDataT &heartbeat_data) {
HandleHeartbeat(id);
};
RAY_CHECK_OK(gcs_client_.heartbeat_table().Subscribe(UniqueID::nil(), UniqueID::nil(),
heartbeat_callback, nullptr));
Tick();
}

/// A periodic timer that checks for timed out clients.
void Monitor::Tick() {
for (auto it = heartbeats_.begin(); it != heartbeats_.end();) {
it->second--;
if (it->second == 0) {
if (dead_clients_.count(it->first) == 0) {
RAY_LOG(WARNING) << "Client timed out: " << it->first;
RAY_CHECK_OK(gcs_client_.client_table().MarkDisconnected(it->first));
dead_clients_.insert(it->first);
}
it = heartbeats_.erase(it);
} else {
it++;
}
}

auto heartbeat_period = boost::posix_time::milliseconds(
RayConfig::instance().heartbeat_timeout_milliseconds());
heartbeat_timer_.expires_from_now(heartbeat_period);
heartbeat_timer_.async_wait([this](const boost::system::error_code &error) {
RAY_CHECK(!error);
Tick();
});
}

} // namespace raylet

} // namespace ray
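The countdown in `Monitor::Tick` can be modelled without the GCS or a timer. The sketch below is a simplified illustration, not a port of the C++ code: `ToyMonitor` is a hypothetical class, `marked` stands in for calls to `MarkDisconnected`, and the timeout count of 3 is an arbitrary value chosen for the example. With a tick every 100 ms (the period previously hard-coded in `main.cc`) and that assumed count of 3, a silent Raylet would be declared dead after roughly 100 ms × 3 = 300 ms.

```python
class ToyMonitor:
    """Per-client tick countdown, reset whenever a heartbeat arrives."""

    def __init__(self, num_heartbeats_timeout):
        self.num_heartbeats_timeout = num_heartbeats_timeout
        self.heartbeats = {}       # client_id -> ticks left before timeout
        self.dead_clients = set()  # clients already marked as dead
        self.marked = []           # stand-in for MarkDisconnected() calls

    def handle_heartbeat(self, client_id):
        # A heartbeat resets the countdown for that client.
        self.heartbeats[client_id] = self.num_heartbeats_timeout

    def tick(self):
        # Iterate over a copy of the keys so entries can be deleted safely.
        for client_id in list(self.heartbeats):
            self.heartbeats[client_id] -= 1
            if self.heartbeats[client_id] == 0:
                if client_id not in self.dead_clients:
                    self.marked.append(client_id)
                    self.dead_clients.add(client_id)
                del self.heartbeats[client_id]


monitor = ToyMonitor(num_heartbeats_timeout=3)
monitor.handle_heartbeat("a")
monitor.handle_heartbeat("b")
monitor.tick()
monitor.handle_heartbeat("a")  # "a" stays alive, "b" goes silent
monitor.tick()
monitor.tick()
print(monitor.marked)      # ['b']
print(monitor.heartbeats)  # {'a': 1}

# A late heartbeat from "b" restarts its countdown, but "b" is never
# marked a second time because it is already in dead_clients.
monitor.handle_heartbeat("b")
for _ in range(3):
    monitor.tick()
print(monitor.marked)      # ['b', 'a']
```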
56 changes: 56 additions & 0 deletions src/ray/raylet/monitor.h
@@ -0,0 +1,56 @@
#ifndef RAY_RAYLET_MONITOR_H
#define RAY_RAYLET_MONITOR_H

#include <memory>
#include <unordered_set>

#include "ray/gcs/client.h"
#include "ray/id.h"

namespace ray {

namespace raylet {

class Monitor {
Collaborator

Could we include some documentation about the purpose of this class?

public:
/// Create a Raylet monitor attached to the given GCS address and port.
///
/// \param io_service The event loop to run the monitor on.
/// \param redis_address The GCS Redis address to connect to.
/// \param redis_port The GCS Redis port to connect to.
Monitor(boost::asio::io_service &io_service, const std::string &redis_address,
int redis_port);

/// Start the monitor. Listen for heartbeats from Raylets and mark Raylets
/// that do not send a heartbeat within a given period as dead.
void Start();

/// A periodic timer that fires on every heartbeat period. Raylets that have
/// not sent a heartbeat within the last num_heartbeats_timeout ticks will be
/// marked as dead in the client table.
void Tick();

/// Handle a heartbeat from a Raylet.
///
/// \param client_id The client ID of the Raylet that sent the heartbeat.
void HandleHeartbeat(const ClientID &client_id);

private:
/// A client to the GCS, through which heartbeats are received.
gcs::AsyncGcsClient gcs_client_;
/// The expected period between heartbeats, for an individual Raylet.
int64_t heartbeat_timeout_ms_;
/// A timer that ticks every heartbeat_timeout_ms_ milliseconds.
boost::asio::deadline_timer heartbeat_timer_;
/// For each Raylet that we receive a heartbeat from, the number of ticks
/// that may pass before the Raylet will be declared dead.
std::unordered_map<ClientID, int64_t, UniqueIDHasher> heartbeats_;
/// The Raylets that have been marked as dead in the client table.
std::unordered_set<ClientID, UniqueIDHasher> dead_clients_;
};

} // namespace raylet

} // namespace ray

#endif // RAY_RAYLET_MONITOR_H
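The periodic timer documented above re-arms itself at the end of every tick. As a rough analogue of that pattern, here is a sketch using Python's `asyncio` event loop in place of `boost::asio`; `run_ticks` is a hypothetical helper, and the tick limit exists only so that the example terminates.

```python
import asyncio


def run_ticks(period_s, max_ticks):
    """Run a self-rescheduling timer and return how many times it fired."""
    loop = asyncio.new_event_loop()
    count = 0

    def tick():
        nonlocal count
        count += 1
        if count >= max_ticks:
            loop.stop()
            return
        # Re-arm the timer, much as Tick() does with expires_from_now()
        # followed by async_wait().
        loop.call_later(period_s, tick)

    # The first tick runs as soon as the loop starts.
    loop.call_soon(tick)
    try:
        loop.run_forever()
    finally:
        loop.close()
    return count


print(run_ticks(0.01, 3))  # 3
```

Because each tick schedules the next one only after it finishes its own work, the effective period is the configured delay plus the time spent inside the tick.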