Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/common/state/ray_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,18 @@ class RayConfig {

/// Accessor for the Ray protocol version held by this config.
int64_t ray_protocol_version() const {
  return ray_protocol_version_;
}

/// Accessor for the handler warning threshold, in milliseconds. A handler that
/// runs longer than this gets a "taking too long" warning logged.
uint64_t handler_warning_timeout_ms() const { return handler_warning_timeout_ms_; }

/// Accessor for the configured duration between heartbeats, in milliseconds.
int64_t heartbeat_timeout_milliseconds() const { return heartbeat_timeout_milliseconds_; }

/// Accessor for the number of heartbeat intervals that may elapse without a
/// heartbeat before a component is reported as dead.
int64_t num_heartbeats_timeout() const {
  return num_heartbeats_timeout_;
}

/// Accessor for the number of heartbeat periods after which a late heartbeat
/// triggers a warning that the heartbeat handler is drifting.
uint64_t num_heartbeats_warning() const {
  return num_heartbeats_warning_;
}

/// Accessor for the configured initial reconstruction timeout, in milliseconds.
int64_t initial_reconstruction_timeout_milliseconds() const { return initial_reconstruction_timeout_milliseconds_; }
Expand Down Expand Up @@ -113,8 +119,10 @@ class RayConfig {
private:
RayConfig()
: ray_protocol_version_(0x0000000000000000),
handler_warning_timeout_ms_(100),
heartbeat_timeout_milliseconds_(100),
num_heartbeats_timeout_(100),
num_heartbeats_warning_(5),
initial_reconstruction_timeout_milliseconds_(200),
get_timeout_milliseconds_(1000),
worker_get_request_size_(10000),
Expand Down Expand Up @@ -154,13 +162,21 @@ class RayConfig {
/// In theory, this is used to detect Ray version mismatches.
int64_t ray_protocol_version_;

/// The duration that a single handler on the event loop can take before a
/// warning is logged that the handler is taking too long.
uint64_t handler_warning_timeout_ms_;

/// The duration between heartbeats. These are sent by the plasma manager and
/// local scheduler.
int64_t heartbeat_timeout_milliseconds_;
/// If a component has not sent a heartbeat in the last num_heartbeats_timeout
/// heartbeat intervals, the global scheduler or monitor process will report
/// it as dead to the db_client table.
int64_t num_heartbeats_timeout_;
/// For a raylet, if the last heartbeat was sent more than this many
/// heartbeat periods ago, then a warning will be logged that the heartbeat
/// handler is drifting.
uint64_t num_heartbeats_warning_;

/// The initial period for a task execution lease. The lease will expire this
/// many milliseconds after the first acquisition of the lease. Nodes that
Expand Down
19 changes: 15 additions & 4 deletions src/ray/common/client_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "common.h"
#include "ray/raylet/format/node_manager_generated.h"
#include "ray/util/util.h"

namespace ray {

Expand Down Expand Up @@ -93,18 +94,21 @@ ray::Status ServerConnection<T>::WriteMessage(int64_t type, int64_t length,
template <class T>
std::shared_ptr<ClientConnection<T>> ClientConnection<T>::Create(
ClientHandler<T> &client_handler, MessageHandler<T> &message_handler,
boost::asio::basic_stream_socket<T> &&socket) {
boost::asio::basic_stream_socket<T> &&socket, const std::string &debug_label) {
std::shared_ptr<ClientConnection<T>> self(
new ClientConnection(message_handler, std::move(socket)));
new ClientConnection(message_handler, std::move(socket), debug_label));
// Let our manager process our new connection.
client_handler(*self);
return self;
}

template <class T>
ClientConnection<T>::ClientConnection(MessageHandler<T> &message_handler,
boost::asio::basic_stream_socket<T> &&socket)
: ServerConnection<T>(std::move(socket)), message_handler_(message_handler) {}
boost::asio::basic_stream_socket<T> &&socket,
const std::string &debug_label)
: ServerConnection<T>(std::move(socket)),
message_handler_(message_handler),
debug_label_(debug_label) {}

template <class T>
const ClientID &ClientConnection<T>::GetClientID() {
Expand Down Expand Up @@ -156,7 +160,14 @@ void ClientConnection<T>::ProcessMessage(const boost::system::error_code &error)
if (error) {
read_type_ = static_cast<int64_t>(protocol::MessageType::DisconnectClient);
}

uint64_t start_ms = current_time_ms();
message_handler_(this->shared_from_this(), read_type_, read_message_.data());
uint64_t interval = current_time_ms() - start_ms;
if (interval > RayConfig::instance().handler_warning_timeout_ms()) {
RAY_LOG(WARNING) << "[" << debug_label_ << "]ProcessMessage with type " << read_type_
<< " took " << interval << " ms ";
}
}

template class ServerConnection<boost::asio::local::stream_protocol>;
Expand Down
7 changes: 5 additions & 2 deletions src/ray/common/client_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class ClientConnection : public ServerConnection<T>,
/// \return std::shared_ptr<ClientConnection>.
static std::shared_ptr<ClientConnection<T>> Create(
ClientHandler<T> &new_client_handler, MessageHandler<T> &message_handler,
boost::asio::basic_stream_socket<T> &&socket);
boost::asio::basic_stream_socket<T> &&socket, const std::string &debug_label);

/// \return The ClientID of the remote client.
const ClientID &GetClientID();
Expand All @@ -100,7 +100,8 @@ class ClientConnection : public ServerConnection<T>,
private:
/// A private constructor for a node client connection.
ClientConnection(MessageHandler<T> &message_handler,
boost::asio::basic_stream_socket<T> &&socket);
boost::asio::basic_stream_socket<T> &&socket,
const std::string &debug_label);
/// Process an error from the last operation, then process the message
/// header from the client.
void ProcessMessageHeader(const boost::system::error_code &error);
Expand All @@ -112,6 +113,8 @@ class ClientConnection : public ServerConnection<T>,
ClientID client_id_;
/// The handler for a message from the client.
MessageHandler<T> message_handler_;
/// A label used for debug messages.
const std::string debug_label_;
/// Buffers for the current message being read from the client.
int64_t read_version_;
int64_t read_type_;
Expand Down
5 changes: 3 additions & 2 deletions src/ray/object_manager/test/object_manager_stress_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,9 @@ class MockServer {
object_manager_.ProcessClientMessage(client, message_type, message);
};
// Accept a new local client and dispatch it to the node manager.
auto new_connection = TcpClientConnection::Create(client_handler, message_handler,
std::move(object_manager_socket_));
auto new_connection =
TcpClientConnection::Create(client_handler, message_handler,
std::move(object_manager_socket_), "object manager");
DoAcceptObjectManager();
}

Expand Down
5 changes: 3 additions & 2 deletions src/ray/object_manager/test/object_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ class MockServer {
object_manager_.ProcessClientMessage(client, message_type, message);
};
// Accept a new local client and dispatch it to the node manager.
auto new_connection = TcpClientConnection::Create(client_handler, message_handler,
std::move(object_manager_socket_));
auto new_connection =
TcpClientConnection::Create(client_handler, message_handler,
std::move(object_manager_socket_), "object manager");
DoAcceptObjectManager();
}

Expand Down
16 changes: 12 additions & 4 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
object_manager_(object_manager),
gcs_client_(gcs_client),
heartbeat_timer_(io_service),
heartbeat_period_ms_(config.heartbeat_period_ms),
heartbeat_period_(std::chrono::milliseconds(config.heartbeat_period_ms)),
local_resources_(config.resource_config),
local_available_resources_(config.resource_config),
worker_pool_(config.num_initial_workers, config.num_workers_per_process,
Expand All @@ -106,7 +106,7 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
remote_clients_(),
remote_server_connections_(),
actor_registry_() {
RAY_CHECK(heartbeat_period_ms_ > 0);
RAY_CHECK(heartbeat_period_.count() > 0);
// Initialize the resource map with own cluster resource configuration.
ClientID local_client_id = gcs_client_->client_table().GetLocalClientId();
cluster_resource_map_.emplace(local_client_id,
Expand Down Expand Up @@ -194,12 +194,21 @@ ray::Status NodeManager::RegisterGcs() {
}));

// Start sending heartbeats to the GCS.
last_heartbeat_at_ms_ = current_time_ms();
Heartbeat();

return ray::Status::OK();
}

void NodeManager::Heartbeat() {
uint64_t now_ms = current_time_ms();
uint64_t interval = now_ms - last_heartbeat_at_ms_;
if (interval > RayConfig::instance().num_heartbeats_warning() *
RayConfig::instance().heartbeat_timeout_milliseconds()) {
RAY_LOG(WARNING) << "Last heartbeat was sent " << interval << " ms ago ";
}
last_heartbeat_at_ms_ = now_ms;

RAY_LOG(DEBUG) << "[Heartbeat] sending heartbeat.";
auto &heartbeat_table = gcs_client_->heartbeat_table();
auto heartbeat_data = std::make_shared<HeartbeatTableDataT>();
Expand Down Expand Up @@ -232,8 +241,7 @@ void NodeManager::Heartbeat() {
RAY_CHECK_OK(status);

// Reset the timer.
auto heartbeat_period = boost::posix_time::milliseconds(heartbeat_period_ms_);
heartbeat_timer_.expires_from_now(heartbeat_period);
heartbeat_timer_.expires_from_now(heartbeat_period_);
heartbeat_timer_.async_wait([this](const boost::system::error_code &error) {
RAY_CHECK(!error);
Heartbeat();
Expand Down
11 changes: 9 additions & 2 deletions src/ray/raylet/node_manager.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#ifndef RAY_RAYLET_NODE_MANAGER_H
#define RAY_RAYLET_NODE_MANAGER_H

#include <boost/asio/steady_timer.hpp>

// clang-format off
#include "ray/raylet/task.h"
#include "ray/object_manager/object_manager.h"
Expand Down Expand Up @@ -152,8 +154,13 @@ class NodeManager {
plasma::PlasmaClient store_client_;
/// A client connection to the GCS.
std::shared_ptr<gcs::AsyncGcsClient> gcs_client_;
boost::asio::deadline_timer heartbeat_timer_;
uint64_t heartbeat_period_ms_;
/// The timer used to send heartbeats.
boost::asio::steady_timer heartbeat_timer_;
/// The period used for the heartbeat timer.
std::chrono::milliseconds heartbeat_period_;
/// The time that the last heartbeat was sent at. Used to make sure we are
/// keeping up with heartbeats.
uint64_t last_heartbeat_at_ms_;
/// The resources local to this node.
const SchedulingResources local_resources_;
/// The resources (and specific resource IDs) that are currently available.
Expand Down
15 changes: 8 additions & 7 deletions src/ray/raylet/raylet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ void Raylet::HandleAcceptNodeManager(const boost::system::error_code &error) {
const uint8_t *message) {
node_manager_.ProcessNodeManagerMessage(*client, message_type, message);
};
// Accept a new local client and dispatch it to the node manager.
auto new_connection = TcpClientConnection::Create(client_handler, message_handler,
std::move(node_manager_socket_));
// Accept a new TCP client and dispatch it to the node manager.
auto new_connection = TcpClientConnection::Create(
client_handler, message_handler, std::move(node_manager_socket_), "node manager");
}
// We're ready to accept another client.
DoAcceptNodeManager();
Expand All @@ -115,9 +115,10 @@ void Raylet::HandleAcceptObjectManager(const boost::system::error_code &error) {
const uint8_t *message) {
object_manager_.ProcessClientMessage(client, message_type, message);
};
// Accept a new local client and dispatch it to the node manager.
auto new_connection = TcpClientConnection::Create(client_handler, message_handler,
std::move(object_manager_socket_));
// Accept a new TCP client and dispatch it to the object manager.
auto new_connection =
TcpClientConnection::Create(client_handler, message_handler,
std::move(object_manager_socket_), "object manager");
DoAcceptObjectManager();
}

Expand All @@ -138,7 +139,7 @@ void Raylet::HandleAccept(const boost::system::error_code &error) {
};
// Accept a new local client and dispatch it to the node manager.
auto new_connection = LocalClientConnection::Create(client_handler, message_handler,
std::move(socket_));
std::move(socket_), "worker");
}
// We're ready to accept another client.
DoAccept();
Expand Down
4 changes: 2 additions & 2 deletions src/ray/raylet/worker_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ class WorkerPoolTest : public ::testing::Test {
HandleMessage(client, message_type, message);
};
boost::asio::local::stream_protocol::socket socket(io_service_);
auto client =
LocalClientConnection::Create(client_handler, message_handler, std::move(socket));
auto client = LocalClientConnection::Create(client_handler, message_handler,
std::move(socket), "worker");
return std::shared_ptr<Worker>(new Worker(pid, client));
}

Expand Down