@@ -60,13 +60,19 @@ void JsonRpcConnection::Start()
60
60
61
61
void JsonRpcConnection::HandleIncomingMessages (boost::asio::yield_context yc)
62
62
{
63
+ namespace ch = std::chrono;
64
+
65
+ auto toMilliseconds ([](ch::steady_clock::duration d) {
66
+ return ch::duration_cast<ch::milliseconds>(d).count ();
67
+ });
68
+
63
69
m_Stream->next_layer ().SetSeen (&m_Seen);
64
70
65
71
while (!m_ShuttingDown) {
66
- String message ;
72
+ String jsonString ;
67
73
68
74
try {
69
- message = JsonRpc::ReadMessage (m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024 );
75
+ jsonString = JsonRpc::ReadMessage (m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024 );
70
76
} catch (const std::exception& ex) {
71
77
Log (m_ShuttingDown ? LogDebug : LogNotice, " JsonRpcConnection" )
72
78
<< " Error while reading JSON-RPC message for identity '" << m_Identity
@@ -76,17 +82,50 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
76
82
}
77
83
78
84
m_Seen = Utility::GetTime ();
85
+ if (m_Endpoint) {
86
+ m_Endpoint->AddMessageReceived (jsonString.GetLength ());
87
+ }
88
+
89
+ String rpcMethod (" UNKNOWN" );
90
+ ch::steady_clock::duration cpuBoundDuration (0 );
91
+ auto start (ch::steady_clock::now ());
79
92
80
93
try {
81
94
CpuBoundWork handleMessage (yc);
82
95
96
+ // Cache the elapsed time to acquire a CPU semaphore used to detect extremely heavy workloads.
97
+ cpuBoundDuration = ch::steady_clock::now () - start;
98
+
99
+ Dictionary::Ptr message = JsonRpc::DecodeMessage (jsonString);
100
+ if (String method = message->Get (" method" ); !method.IsEmpty ()) {
101
+ rpcMethod = std::move (method);
102
+ }
103
+
83
104
MessageHandler (message);
84
105
85
106
l_TaskStats.InsertValue (Utility::GetTime (), 1 );
107
+
108
+ auto total = ch::steady_clock::now () - start;
109
+
110
+ Log msg (total >= ch::seconds (5 ) ? LogWarning : LogDebug, " JsonRpcConnection" );
111
+ msg << " Processed JSON-RPC '" << rpcMethod << " ' message for identity '" << m_Identity
112
+ << " ' (took total " << toMilliseconds (total) << " ms" ;
113
+
114
+ if (cpuBoundDuration >= ch::seconds (1 )) {
115
+ msg << " , waited " << toMilliseconds (cpuBoundDuration) << " ms on semaphore" ;
116
+ }
117
+ msg << " )." ;
86
118
} catch (const std::exception& ex) {
87
- Log (m_ShuttingDown ? LogDebug : LogWarning, " JsonRpcConnection" )
88
- << " Error while processing JSON-RPC message for identity '" << m_Identity
89
- << " ': " << DiagnosticInformation (ex);
119
+ auto total = ch::steady_clock::now () - start;
120
+
121
+ Log msg (m_ShuttingDown ? LogDebug : LogWarning, " JsonRpcConnection" );
122
+ msg << " Error while processing JSON-RPC '" << rpcMethod << " ' message for identity '"
123
+ << m_Identity << " ' (took total " << toMilliseconds (total) << " ms" ;
124
+
125
+ if (cpuBoundDuration >= ch::seconds (1 )) {
126
+ msg << " , waited " << toMilliseconds (cpuBoundDuration) << " ms on semaphore" ;
127
+ }
128
+ msg << " ): " << DiagnosticInformation (ex);
90
129
91
130
break ;
92
131
}
@@ -259,10 +298,19 @@ void JsonRpcConnection::Disconnect()
259
298
}
260
299
}
261
300
262
- void JsonRpcConnection::MessageHandler (const String& jsonString)
301
+ /* *
302
+ * Route the provided message to its corresponding handler (if any).
303
+ *
304
+ * This will first verify the timestamp of that RPC message (if any) and subsequently, rejects any message whose
305
+ * timestamp is less than the remote log position of the client Endpoint; otherwise, the endpoint's remote log
306
+ * position is updated to that timestamp. It is not expected to happen, but any message lacking an RPC method or
307
+ * referring to a non-existent one is also discarded. Afterward, the RPC handler is then called for that message
308
+ * and sends it's result back to the sender if the message contains an ID.
309
+ *
310
+ * @param message The RPC message you want to process.
311
+ */
312
+ void JsonRpcConnection::MessageHandler (const Dictionary::Ptr& message)
263
313
{
264
- Dictionary::Ptr message = JsonRpc::DecodeMessage (jsonString);
265
-
266
314
if (m_Endpoint && message->Contains (" ts" )) {
267
315
double ts = message->Get (" ts" );
268
316
@@ -281,8 +329,6 @@ void JsonRpcConnection::MessageHandler(const String& jsonString)
281
329
origin->FromZone = m_Endpoint->GetZone ();
282
330
else
283
331
origin->FromZone = Zone::GetByName (message->Get (" originZone" ));
284
-
285
- m_Endpoint->AddMessageReceived (jsonString.GetLength ());
286
332
}
287
333
288
334
Value vmethod;
0 commit comments