@@ -60,13 +60,15 @@ void JsonRpcConnection::Start()
60
60
61
61
void JsonRpcConnection::HandleIncomingMessages (boost::asio::yield_context yc)
62
62
{
63
+ namespace ch = std::chrono;
64
+
63
65
m_Stream->next_layer ().SetSeen (&m_Seen);
64
66
65
67
for (;;) {
66
- String message ;
68
+ String jsonString ;
67
69
68
70
try {
69
- message = JsonRpc::ReadMessage (m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024 );
71
+ jsonString = JsonRpc::ReadMessage (m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024 );
70
72
} catch (const std::exception& ex) {
71
73
Log (m_ShuttingDown ? LogDebug : LogNotice, " JsonRpcConnection" )
72
74
<< " Error while reading JSON-RPC message for identity '" << m_Identity
@@ -76,20 +78,54 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
76
78
}
77
79
78
80
m_Seen = Utility::GetTime ();
81
+ if (m_Endpoint) {
82
+ m_Endpoint->AddMessageReceived (jsonString.GetLength ());
83
+ }
84
+
85
+ String rpcMethod (" UNKNOWN" );
86
+ String diagnosticInfo; // Contains the diagnostic/debug information in case of an error.
87
+
88
+ ch::steady_clock::duration cpuBoundDuration, totalDuration;
89
+ auto start (ch::steady_clock::now ());
79
90
80
91
try {
92
+ Defer extractTotalDuration ([&start, &totalDuration]() {
93
+ totalDuration = ch::steady_clock::now () - start;
94
+ });
95
+
81
96
CpuBoundWork handleMessage (yc);
82
97
98
+ // Cache the elapsed time to acquire a CPU semaphore used to detect extremely heavy workloads.
99
+ cpuBoundDuration = ch::steady_clock::now () - start;
100
+
101
+ Dictionary::Ptr message = JsonRpc::DecodeMessage (jsonString);
102
+ if (auto method = message->Get (" method" ); !method.IsEmpty ()) {
103
+ rpcMethod = method;
104
+ }
105
+
83
106
MessageHandler (message);
84
107
85
108
l_TaskStats.InsertValue (Utility::GetTime (), 1 );
86
109
} catch (const std::exception& ex) {
87
- Log (m_ShuttingDown ? LogDebug : LogWarning, " JsonRpcConnection" )
88
- << " Error while processing JSON-RPC message for identity '" << m_Identity
89
- << " ': " << DiagnosticInformation (ex);
110
+ diagnosticInfo = DiagnosticInformation (ex);
111
+ }
90
112
91
- break ;
113
+ auto severity = LogDebug;
114
+ if (totalDuration >= ch::seconds (5 ) || (!m_ShuttingDown && !diagnosticInfo.IsEmpty ())) {
115
+ // Either this RPC message took an unexpectedly long time to process, or a fatal error
116
+ // has occurred, so promote the log entry from debug to warning.
117
+ severity = LogWarning;
118
+ }
119
+
120
+ Log statsLog (severity, " JsonRpcConnection" );
121
+ statsLog << " Processing JSON-RPC '" << rpcMethod << " ' message for identity '" << m_Identity << " ' " ;
122
+
123
+ if (cpuBoundDuration >= ch::seconds (1 )) {
124
+ statsLog << " waited " << ch::duration_cast<ch::milliseconds>(cpuBoundDuration).count () << " ms on semaphore and " ;
92
125
}
126
+
127
+ statsLog << " took total " << ch::duration_cast<ch::milliseconds>(totalDuration).count ()
128
+ << " ms" << (diagnosticInfo.IsEmpty () ? " ." : " : Error: " +diagnosticInfo);
93
129
}
94
130
95
131
Disconnect ();
@@ -259,10 +295,8 @@ void JsonRpcConnection::Disconnect()
259
295
}
260
296
}
261
297
262
- void JsonRpcConnection::MessageHandler (const String& jsonString )
298
+ void JsonRpcConnection::MessageHandler (const Dictionary::Ptr& message )
263
299
{
264
- Dictionary::Ptr message = JsonRpc::DecodeMessage (jsonString);
265
-
266
300
if (m_Endpoint && message->Contains (" ts" )) {
267
301
double ts = message->Get (" ts" );
268
302
@@ -281,8 +315,6 @@ void JsonRpcConnection::MessageHandler(const String& jsonString)
281
315
origin->FromZone = m_Endpoint->GetZone ();
282
316
else
283
317
origin->FromZone = Zone::GetByName (message->Get (" originZone" ));
284
-
285
- m_Endpoint->AddMessageReceived (jsonString.GetLength ());
286
318
}
287
319
288
320
Value vmethod;
0 commit comments