-
Notifications
You must be signed in to change notification settings - Fork 6.5k
[xray] Monitor for Raylet processes #1831
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
1cd014d
041dc86
1984f45
c515301
1801b31
ef0b180
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -77,7 +77,10 @@ | |
os.path.abspath(os.path.dirname(__file__)), | ||
"core/src/credis/build/src/libmember.so") | ||
|
||
# Location of the raylet executable. | ||
# Location of the raylet executables. | ||
RAYLET_MONITOR_EXECUTABLE = os.path.join( | ||
os.path.abspath(os.path.dirname(__file__)), | ||
"core/src/ray/raylet/raylet_monitor") | ||
RAYLET_EXECUTABLE = os.path.join( | ||
os.path.abspath(os.path.dirname(__file__)), | ||
"core/src/ray/raylet/raylet") | ||
|
@@ -1112,11 +1115,35 @@ def start_monitor(redis_address, node_ip_address, stdout_file=None, | |
command.append("--autoscaling-config=" + str(autoscaling_config)) | ||
p = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file) | ||
if cleanup: | ||
all_processes[PROCESS_TYPE_WORKER].append(p) | ||
all_processes[PROCESS_TYPE_MONITOR].append(p) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nice catch! |
||
record_log_files_in_redis(redis_address, node_ip_address, | ||
[stdout_file, stderr_file]) | ||
|
||
|
||
def start_raylet_monitor(redis_address, stdout_file=None,
                         stderr_file=None, cleanup=True):
    """Launch the raylet monitor executable as a child process.

    Args:
        redis_address (str): The address that the Redis server is listening
            on, formatted as "<ip>:<port>". The two parts are handed to the
            executable as separate command-line arguments.
        stdout_file: A file handle opened for writing to redirect stdout to.
            Pass None if no redirection should happen.
        stderr_file: A file handle opened for writing to redirect stderr to.
            Pass None if no redirection should happen.
        cleanup (bool): If true, the new process is registered under
            PROCESS_TYPE_MONITOR in all_processes; presumably
            services.cleanup() kills the processes registered there when the
            Python process that imported services exits. True by default.
    """
    # Exactly one ":" is expected; anything else fails the unpacking below.
    host, port = redis_address.split(":")
    monitor_process = subprocess.Popen(
        [RAYLET_MONITOR_EXECUTABLE, host, port],
        stdout=stdout_file,
        stderr=stderr_file)
    if not cleanup:
        return
    all_processes[PROCESS_TYPE_MONITOR].append(monitor_process)
|
||
|
||
def start_ray_processes(address_info=None, | ||
node_ip_address="127.0.0.1", | ||
redis_port=None, | ||
|
@@ -1253,6 +1280,11 @@ def start_ray_processes(address_info=None, | |
stderr_file=monitor_stderr_file, | ||
cleanup=cleanup, | ||
autoscaling_config=autoscaling_config) | ||
if use_raylet: | ||
start_raylet_monitor(redis_address, | ||
stdout_file=monitor_stdout_file, | ||
stderr_file=monitor_stderr_file, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we want these to go to the same log files as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it makes sense to go to the same file as |
||
cleanup=cleanup) | ||
|
||
if redis_shards == []: | ||
# Get redis shards from primary redis instance. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -276,41 +276,40 @@ Status ClientTable::Connect(const ClientTableDataT &local_client) { | |
RAY_CHECK(local_client.client_id == local_client_.client_id); | ||
local_client_ = local_client; | ||
|
||
// Construct the data to add to the client table. | ||
auto data = std::make_shared<ClientTableDataT>(local_client_); | ||
data->is_insertion = true; | ||
// Callback for a notification from the client table. | ||
auto notification_callback = [this]( | ||
AsyncGcsClient *client, const UniqueID &log_key, | ||
const std::vector<ClientTableDataT> &notifications) { | ||
RAY_CHECK(log_key == client_log_key_); | ||
for (auto &notification : notifications) { | ||
HandleNotification(client, notification); | ||
} | ||
}; | ||
// Callback to handle our own successful connection once we've added | ||
// ourselves. | ||
auto add_callback = [this](AsyncGcsClient *client, const UniqueID &log_key, | ||
std::shared_ptr<ClientTableDataT> data) { | ||
RAY_CHECK(log_key == client_log_key_); | ||
HandleConnected(client, data); | ||
|
||
// Callback for a notification from the client table. | ||
auto notification_callback = [this]( | ||
AsyncGcsClient *client, const UniqueID &log_key, | ||
const std::vector<ClientTableDataT> &notifications) { | ||
RAY_CHECK(log_key == client_log_key_); | ||
for (auto &notification : notifications) { | ||
HandleNotification(client, notification); | ||
} | ||
}; | ||
// Callback to request notifications from the client table once we've | ||
// successfully subscribed. | ||
auto subscription_callback = [this](AsyncGcsClient *c) { | ||
RAY_CHECK_OK(RequestNotifications(JobID::nil(), client_log_key_, client_id_)); | ||
}; | ||
// Subscribe to the client table. | ||
RAY_CHECK_OK(Subscribe(JobID::nil(), client_id_, notification_callback, | ||
subscription_callback)); | ||
}; | ||
// Callback to add ourselves once we've successfully subscribed. | ||
auto subscription_callback = [this, data, add_callback](AsyncGcsClient *c) { | ||
// Mark ourselves as deleted if we called Disconnect() since the last | ||
// Connect() call. | ||
if (disconnected_) { | ||
data->is_insertion = false; | ||
} | ||
RAY_CHECK_OK(RequestNotifications(JobID::nil(), client_log_key_, client_id_)); | ||
RAY_CHECK_OK(Append(JobID::nil(), client_log_key_, data, add_callback)); | ||
}; | ||
return Subscribe(JobID::nil(), client_id_, notification_callback, | ||
subscription_callback); | ||
return Append(JobID::nil(), client_log_key_, data, add_callback); | ||
} | ||
|
||
Status ClientTable::Disconnect() { | ||
auto data = std::make_shared<ClientTableDataT>(local_client_); | ||
data->is_insertion = true; | ||
data->is_insertion = false; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm this was a bug right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, apparently this path wasn't covered in the tests :( I can add a regression unit test. |
||
auto add_callback = [this](AsyncGcsClient *client, const ClientID &id, | ||
std::shared_ptr<ClientTableDataT> data) { | ||
HandleConnected(client, data); | ||
|
@@ -322,6 +321,13 @@ Status ClientTable::Disconnect() { | |
return Status::OK(); | ||
} | ||
|
||
/// Append a removal entry for the given client to the client table log.
///
/// \param dead_client_id The ID of the client to record as removed.
/// \return The status of the append operation.
ray::Status ClientTable::MarkDisconnected(const ClientID &dead_client_id) {
  // Only the ID and the removal flag are set; every other field keeps the
  // value given by ClientTableDataT's default construction.
  auto removal_entry = std::make_shared<ClientTableDataT>();
  removal_entry->is_insertion = false;
  removal_entry->client_id = dead_client_id.binary();
  // No completion callback is registered for this append.
  return Append(JobID::nil(), client_log_key_, removal_entry, nullptr);
}
|
||
const ClientTableDataT &ClientTable::GetClient(const ClientID &client_id) { | ||
RAY_CHECK(!client_id.is_nil()); | ||
auto entry = client_cache_.find(client_id); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
#include "ray/raylet/monitor.h" | ||
|
||
#include "ray/status.h" | ||
|
||
namespace ray { | ||
|
||
namespace raylet { | ||
|
||
/// \class Monitor
///
/// The monitor listens for heartbeats from Raylets and decides when a Raylet
/// has died. A Raylet that stays silent for
/// heartbeat_timeout_milliseconds * num_heartbeats_timeout (both defined in
/// the Ray configuration) is marked as dead in the client table, which
/// broadcasts the event to all other Raylets.
Monitor::Monitor(boost::asio::io_service &io_service, const std::string &redis_address,
                 int redis_port)
    : gcs_client_(),
      // NOTE: despite the "_ms" suffix, this member is initialised with the
      // number of heartbeat periods (num_heartbeats_timeout), not a duration.
      heartbeat_timeout_ms_(RayConfig::instance().num_heartbeats_timeout()),
      heartbeat_timer_(io_service) {
  // Connect to the GCS first, then attach the client to the event loop. Either
  // failure is fatal.
  const auto connect_status = gcs_client_.Connect(redis_address, redis_port);
  RAY_CHECK_OK(connect_status);
  const auto attach_status = gcs_client_.Attach(io_service);
  RAY_CHECK_OK(attach_status);
}
|
||
/// Reset the countdown for the Raylet that just sent a heartbeat.
///
/// \param client_id The client ID of the Raylet that sent the heartbeat.
void Monitor::HandleHeartbeat(const ClientID &client_id) {
  // operator[] creates the entry on the first heartbeat from this client and
  // overwrites the remaining tick count on every later one.
  auto &remaining_ticks = heartbeats_[client_id];
  remaining_ticks = heartbeat_timeout_ms_;
}
|
||
void Monitor::Start() { | ||
const auto heartbeat_callback = [this](gcs::AsyncGcsClient *client, const ClientID &id, | ||
const HeartbeatTableDataT &heartbeat_data) { | ||
HandleHeartbeat(id); | ||
}; | ||
RAY_CHECK_OK(gcs_client_.heartbeat_table().Subscribe(UniqueID::nil(), UniqueID::nil(), | ||
heartbeat_callback, nullptr)); | ||
Tick(); | ||
} | ||
|
||
/// A periodic timer that checks for timed out clients. | ||
void Monitor::Tick() { | ||
for (auto it = heartbeats_.begin(); it != heartbeats_.end();) { | ||
it->second--; | ||
if (it->second == 0) { | ||
if (dead_clients_.count(it->first) == 0) { | ||
RAY_LOG(WARNING) << "Client timed out: " << it->first; | ||
RAY_CHECK_OK(gcs_client_.client_table().MarkDisconnected(it->first)); | ||
dead_clients_.insert(it->first); | ||
} | ||
it = heartbeats_.erase(it); | ||
} else { | ||
it++; | ||
} | ||
} | ||
|
||
auto heartbeat_period = boost::posix_time::milliseconds( | ||
RayConfig::instance().heartbeat_timeout_milliseconds()); | ||
heartbeat_timer_.expires_from_now(heartbeat_period); | ||
heartbeat_timer_.async_wait([this](const boost::system::error_code &error) { | ||
RAY_CHECK(!error); | ||
Tick(); | ||
}); | ||
} | ||
|
||
} // namespace raylet | ||
|
||
} // namespace ray |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
#ifndef RAY_RAYLET_MONITOR_H | ||
#define RAY_RAYLET_MONITOR_H | ||
|
||
#include <cstdint>
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>

#include "ray/gcs/client.h"
#include "ray/id.h"
|
||
namespace ray { | ||
|
||
namespace raylet { | ||
|
||
class Monitor { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. could we include some documentation about the purpose of this class? |
||
public: | ||
/// Create a Raylet monitor attached to the given GCS address and port. | ||
/// | ||
/// \param io_service The event loop to run the monitor on. | ||
/// \param redis_address The GCS Redis address to connect to. | ||
/// \param redis_port The GCS Redis port to connect to. | ||
Monitor(boost::asio::io_service &io_service, const std::string &redis_address, | ||
int redis_port); | ||
|
||
/// Start the monitor. Listen for heartbeats from Raylets and mark Raylets | ||
/// that do not send a heartbeat within a given period as dead. | ||
void Start(); | ||
|
||
/// A periodic timer that fires on every heartbeat period. Raylets that have | ||
/// not sent a heartbeat within the last num_heartbeats_timeout ticks will be | ||
/// marked as dead in the client table. | ||
void Tick(); | ||
|
||
/// Handle a heartbeat from a Raylet. | ||
/// | ||
/// \param client_id The client ID of the Raylet that sent the heartbeat. | ||
void HandleHeartbeat(const ClientID &client_id); | ||
|
||
private: | ||
/// A client to the GCS, through which heartbeats are received. | ||
gcs::AsyncGcsClient gcs_client_; | ||
/// The expected period between heartbeats, for an individual Raylet. | ||
int64_t heartbeat_timeout_ms_; | ||
/// A timer that ticks every heartbeat_timeout_ms_ milliseconds. | ||
boost::asio::deadline_timer heartbeat_timer_; | ||
/// For each Raylet that we receive a heartbeat from, the number of ticks | ||
/// that may pass before the Raylet will be declared dead. | ||
std::unordered_map<ClientID, int64_t, UniqueIDHasher> heartbeats_; | ||
/// The Raylets that have been marked as dead in the client table. | ||
std::unordered_set<ClientID, UniqueIDHasher> dead_clients_; | ||
}; | ||
|
||
} // namespace raylet | ||
|
||
} // namespace ray | ||
|
||
#endif // RAY_RAYLET_MONITOR_H |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe the executable also needs to be added to the `ray_files = [` list in `python/setup.py`.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!