Skip to content

Commit 2de9bfc

Browse files
stephanie-wang authored and pcmoritz committed
[xray] Log warnings for asio handlers that take too long (#2601)
* Add fatal check for heartbeat drift
* Log warning messages for handlers that take too long
* Add debug labels to all ClientConnections
1 parent d49b4be commit 2de9bfc

File tree

9 files changed

+72
-25
lines changed

9 files changed

+72
-25
lines changed

src/common/state/ray_config.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,18 @@ class RayConfig {
1212

1313
int64_t ray_protocol_version() const { return ray_protocol_version_; }
1414

15+
uint64_t handler_warning_timeout_ms() const {
16+
return handler_warning_timeout_ms_;
17+
}
18+
1519
int64_t heartbeat_timeout_milliseconds() const {
1620
return heartbeat_timeout_milliseconds_;
1721
}
1822

1923
int64_t num_heartbeats_timeout() const { return num_heartbeats_timeout_; }
2024

25+
uint64_t num_heartbeats_warning() const { return num_heartbeats_warning_; }
26+
2127
int64_t initial_reconstruction_timeout_milliseconds() const {
2228
return initial_reconstruction_timeout_milliseconds_;
2329
}
@@ -113,8 +119,10 @@ class RayConfig {
113119
private:
114120
RayConfig()
115121
: ray_protocol_version_(0x0000000000000000),
122+
handler_warning_timeout_ms_(100),
116123
heartbeat_timeout_milliseconds_(100),
117124
num_heartbeats_timeout_(100),
125+
num_heartbeats_warning_(5),
118126
initial_reconstruction_timeout_milliseconds_(200),
119127
get_timeout_milliseconds_(1000),
120128
worker_get_request_size_(10000),
@@ -154,13 +162,21 @@ class RayConfig {
154162
/// In theory, this is used to detect Ray version mismatches.
155163
int64_t ray_protocol_version_;
156164

165+
/// The duration that a single handler on the event loop can take before a
166+
/// warning is logged that the handler is taking too long.
167+
uint64_t handler_warning_timeout_ms_;
168+
157169
/// The duration between heartbeats. These are sent by the plasma manager and
158170
/// local scheduler.
159171
int64_t heartbeat_timeout_milliseconds_;
160172
/// If a component has not sent a heartbeat in the last num_heartbeats_timeout
161173
/// heartbeat intervals, the global scheduler or monitor process will report
162174
/// it as dead to the db_client table.
163175
int64_t num_heartbeats_timeout_;
176+
/// For a raylet, if the last heartbeat was sent more than this many
177+
/// heartbeat periods ago, then a warning will be logged that the heartbeat
178+
/// handler is drifting.
179+
uint64_t num_heartbeats_warning_;
164180

165181
/// The initial period for a task execution lease. The lease will expire this
166182
/// many milliseconds after the first acquisition of the lease. Nodes that

src/ray/common/client_connection.cc

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,18 +86,21 @@ ray::Status ServerConnection<T>::WriteMessage(int64_t type, int64_t length,
8686
template <class T>
8787
std::shared_ptr<ClientConnection<T>> ClientConnection<T>::Create(
8888
ClientHandler<T> &client_handler, MessageHandler<T> &message_handler,
89-
boost::asio::basic_stream_socket<T> &&socket) {
89+
boost::asio::basic_stream_socket<T> &&socket, const std::string &debug_label) {
9090
std::shared_ptr<ClientConnection<T>> self(
91-
new ClientConnection(message_handler, std::move(socket)));
91+
new ClientConnection(message_handler, std::move(socket), debug_label));
9292
// Let our manager process our new connection.
9393
client_handler(*self);
9494
return self;
9595
}
9696

9797
template <class T>
9898
ClientConnection<T>::ClientConnection(MessageHandler<T> &message_handler,
99-
boost::asio::basic_stream_socket<T> &&socket)
100-
: ServerConnection<T>(std::move(socket)), message_handler_(message_handler) {}
99+
boost::asio::basic_stream_socket<T> &&socket,
100+
const std::string &debug_label)
101+
: ServerConnection<T>(std::move(socket)),
102+
message_handler_(message_handler),
103+
debug_label_(debug_label) {}
101104

102105
template <class T>
103106
const ClientID &ClientConnection<T>::GetClientID() {
@@ -149,7 +152,14 @@ void ClientConnection<T>::ProcessMessage(const boost::system::error_code &error)
149152
if (error) {
150153
read_type_ = static_cast<int64_t>(protocol::MessageType::DisconnectClient);
151154
}
155+
156+
uint64_t start_ms = current_time_ms();
152157
message_handler_(this->shared_from_this(), read_type_, read_message_.data());
158+
uint64_t interval = current_time_ms() - start_ms;
159+
if (interval > RayConfig::instance().handler_warning_timeout_ms()) {
160+
RAY_LOG(WARNING) << "[" << debug_label_ << "]ProcessMessage with type " << read_type_
161+
<< " took " << interval << " ms ";
162+
}
153163
}
154164

155165
template class ServerConnection<boost::asio::local::stream_protocol>;

src/ray/common/client_connection.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ class ClientConnection : public ServerConnection<T>,
8484
/// \return std::shared_ptr<ClientConnection>.
8585
static std::shared_ptr<ClientConnection<T>> Create(
8686
ClientHandler<T> &new_client_handler, MessageHandler<T> &message_handler,
87-
boost::asio::basic_stream_socket<T> &&socket);
87+
boost::asio::basic_stream_socket<T> &&socket, const std::string &debug_label);
8888

8989
/// \return The ClientID of the remote client.
9090
const ClientID &GetClientID();
@@ -100,7 +100,8 @@ class ClientConnection : public ServerConnection<T>,
100100
private:
101101
/// A private constructor for a node client connection.
102102
ClientConnection(MessageHandler<T> &message_handler,
103-
boost::asio::basic_stream_socket<T> &&socket);
103+
boost::asio::basic_stream_socket<T> &&socket,
104+
const std::string &debug_label);
104105
/// Process an error from the last operation, then process the message
105106
/// header from the client.
106107
void ProcessMessageHeader(const boost::system::error_code &error);
@@ -112,6 +113,8 @@ class ClientConnection : public ServerConnection<T>,
112113
ClientID client_id_;
113114
/// The handler for a message from the client.
114115
MessageHandler<T> message_handler_;
116+
/// A label used for debug messages.
117+
const std::string debug_label_;
115118
/// Buffers for the current message being read from the client.
116119
int64_t read_version_;
117120
int64_t read_type_;

src/ray/object_manager/test/object_manager_stress_test.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,9 @@ class MockServer {
7474
object_manager_.ProcessClientMessage(client, message_type, message);
7575
};
7676
// Accept a new local client and dispatch it to the node manager.
77-
auto new_connection = TcpClientConnection::Create(client_handler, message_handler,
78-
std::move(object_manager_socket_));
77+
auto new_connection =
78+
TcpClientConnection::Create(client_handler, message_handler,
79+
std::move(object_manager_socket_), "object manager");
7980
DoAcceptObjectManager();
8081
}
8182

src/ray/object_manager/test/object_manager_test.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,9 @@ class MockServer {
6565
object_manager_.ProcessClientMessage(client, message_type, message);
6666
};
6767
// Accept a new local client and dispatch it to the node manager.
68-
auto new_connection = TcpClientConnection::Create(client_handler, message_handler,
69-
std::move(object_manager_socket_));
68+
auto new_connection =
69+
TcpClientConnection::Create(client_handler, message_handler,
70+
std::move(object_manager_socket_), "object manager");
7071
DoAcceptObjectManager();
7172
}
7273

src/ray/raylet/node_manager.cc

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
8282
object_manager_(object_manager),
8383
gcs_client_(gcs_client),
8484
heartbeat_timer_(io_service),
85-
heartbeat_period_ms_(config.heartbeat_period_ms),
85+
heartbeat_period_(std::chrono::milliseconds(config.heartbeat_period_ms)),
8686
local_resources_(config.resource_config),
8787
local_available_resources_(config.resource_config),
8888
worker_pool_(config.num_initial_workers, config.num_workers_per_process,
@@ -108,7 +108,7 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
108108
remote_clients_(),
109109
remote_server_connections_(),
110110
actor_registry_() {
111-
RAY_CHECK(heartbeat_period_ms_ > 0);
111+
RAY_CHECK(heartbeat_period_.count() > 0);
112112
// Initialize the resource map with own cluster resource configuration.
113113
ClientID local_client_id = gcs_client_->client_table().GetLocalClientId();
114114
cluster_resource_map_.emplace(local_client_id,
@@ -205,6 +205,7 @@ ray::Status NodeManager::RegisterGcs() {
205205
driver_table_handler, nullptr));
206206

207207
// Start sending heartbeats to the GCS.
208+
last_heartbeat_at_ms_ = current_time_ms();
208209
Heartbeat();
209210

210211
return ray::Status::OK();
@@ -223,6 +224,14 @@ void NodeManager::HandleDriverTableUpdate(
223224
}
224225

225226
void NodeManager::Heartbeat() {
227+
uint64_t now_ms = current_time_ms();
228+
uint64_t interval = now_ms - last_heartbeat_at_ms_;
229+
if (interval > RayConfig::instance().num_heartbeats_warning() *
230+
RayConfig::instance().heartbeat_timeout_milliseconds()) {
231+
RAY_LOG(WARNING) << "Last heartbeat was sent " << interval << " ms ago ";
232+
}
233+
last_heartbeat_at_ms_ = now_ms;
234+
226235
RAY_LOG(DEBUG) << "[Heartbeat] sending heartbeat.";
227236
auto &heartbeat_table = gcs_client_->heartbeat_table();
228237
auto heartbeat_data = std::make_shared<HeartbeatTableDataT>();
@@ -255,8 +264,7 @@ void NodeManager::Heartbeat() {
255264
RAY_CHECK_OK(status);
256265

257266
// Reset the timer.
258-
auto heartbeat_period = boost::posix_time::milliseconds(heartbeat_period_ms_);
259-
heartbeat_timer_.expires_from_now(heartbeat_period);
267+
heartbeat_timer_.expires_from_now(heartbeat_period_);
260268
heartbeat_timer_.async_wait([this](const boost::system::error_code &error) {
261269
RAY_CHECK(!error);
262270
Heartbeat();

src/ray/raylet/node_manager.h

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
#ifndef RAY_RAYLET_NODE_MANAGER_H
22
#define RAY_RAYLET_NODE_MANAGER_H
33

4+
#include <boost/asio/steady_timer.hpp>
5+
46
// clang-format off
57
#include "ray/raylet/task.h"
68
#include "ray/object_manager/object_manager.h"
@@ -159,8 +161,13 @@ class NodeManager {
159161
plasma::PlasmaClient store_client_;
160162
/// A client connection to the GCS.
161163
std::shared_ptr<gcs::AsyncGcsClient> gcs_client_;
162-
boost::asio::deadline_timer heartbeat_timer_;
163-
uint64_t heartbeat_period_ms_;
164+
/// The timer used to send heartbeats.
165+
boost::asio::steady_timer heartbeat_timer_;
166+
/// The period used for the heartbeat timer.
167+
std::chrono::milliseconds heartbeat_period_;
168+
/// The time that the last heartbeat was sent at. Used to make sure we are
169+
/// keeping up with heartbeats.
170+
uint64_t last_heartbeat_at_ms_;
164171
/// The resources local to this node.
165172
const SchedulingResources local_resources_;
166173
/// The resources (and specific resource IDs) that are currently available.

src/ray/raylet/raylet.cc

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -93,9 +93,9 @@ void Raylet::HandleAcceptNodeManager(const boost::system::error_code &error) {
9393
const uint8_t *message) {
9494
node_manager_.ProcessNodeManagerMessage(*client, message_type, message);
9595
};
96-
// Accept a new local client and dispatch it to the node manager.
97-
auto new_connection = TcpClientConnection::Create(client_handler, message_handler,
98-
std::move(node_manager_socket_));
96+
// Accept a new TCP client and dispatch it to the node manager.
97+
auto new_connection = TcpClientConnection::Create(
98+
client_handler, message_handler, std::move(node_manager_socket_), "node manager");
9999
}
100100
// We're ready to accept another client.
101101
DoAcceptNodeManager();
@@ -115,9 +115,10 @@ void Raylet::HandleAcceptObjectManager(const boost::system::error_code &error) {
115115
const uint8_t *message) {
116116
object_manager_.ProcessClientMessage(client, message_type, message);
117117
};
118-
// Accept a new local client and dispatch it to the node manager.
119-
auto new_connection = TcpClientConnection::Create(client_handler, message_handler,
120-
std::move(object_manager_socket_));
118+
// Accept a new TCP client and dispatch it to the object manager.
119+
auto new_connection =
120+
TcpClientConnection::Create(client_handler, message_handler,
121+
std::move(object_manager_socket_), "object manager");
121122
DoAcceptObjectManager();
122123
}
123124

@@ -138,7 +139,7 @@ void Raylet::HandleAccept(const boost::system::error_code &error) {
138139
};
139140
// Accept a new local client and dispatch it to the node manager.
140141
auto new_connection = LocalClientConnection::Create(client_handler, message_handler,
141-
std::move(socket_));
142+
std::move(socket_), "worker");
142143
}
143144
// We're ready to accept another client.
144145
DoAccept();

src/ray/raylet/worker_pool_test.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ class WorkerPoolTest : public ::testing::Test {
4242
HandleMessage(client, message_type, message);
4343
};
4444
boost::asio::local::stream_protocol::socket socket(io_service_);
45-
auto client =
46-
LocalClientConnection::Create(client_handler, message_handler, std::move(socket));
45+
auto client = LocalClientConnection::Create(client_handler, message_handler,
46+
std::move(socket), "worker");
4747
return std::shared_ptr<Worker>(new Worker(pid, client));
4848
}
4949

0 commit comments

Comments
 (0)