Skip to content

JsonRpcConnection#Send*(): discard messages ASAP once shutting down #9991

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Sep 3, 2024
7 changes: 7 additions & 0 deletions lib/base/defer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ class Defer
{
}

Defer() = default;

Defer(const Defer&) = delete;
Defer(Defer&&) = delete;
Defer& operator=(const Defer&) = delete;
Expand All @@ -39,6 +41,11 @@ class Defer
}
}

/**
 * Replaces the callable stored in this Defer with the given one.
 *
 * NOTE(review): presumably m_Func is what gets run when the Defer goes out of
 * scope unless Cancel() was called - confirm against the rest of the class,
 * which is not visible here.
 *
 * @param func Callable taking no arguments; it is moved into m_Func.
 */
inline void SetFunc(std::function<void()> func)
{
m_Func = std::move(func);
}

inline
void Cancel()
{
Expand Down
44 changes: 24 additions & 20 deletions lib/remote/apilistener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1021,17 +1021,22 @@ void ApiListener::ApiTimerHandler()
maxTs = client->GetTimestamp();
}

Log(LogNotice, "ApiListener")
<< "Setting log position for identity '" << endpoint->GetName() << "': "
<< Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", ts);

for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
if (client->GetTimestamp() == maxTs) {
client->SendMessage(lmessage);
try {
client->SendMessage(lmessage);
} catch (const std::runtime_error& ex) {
Log(LogNotice, "ApiListener")
<< "Error while setting log position for identity '" << endpoint->GetName() << "': " << DiagnosticInformation(ex, false);
}
} else {
client->Disconnect();
}
}

Log(LogNotice, "ApiListener")
<< "Setting log position for identity '" << endpoint->GetName() << "': "
<< Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", ts);
}
}

Expand Down Expand Up @@ -1195,7 +1200,12 @@ void ApiListener::SyncSendMessage(const Endpoint::Ptr& endpoint, const Dictionar
if (client->GetTimestamp() != maxTs)
continue;

client->SendMessage(message);
try {
client->SendMessage(message);
} catch (const std::runtime_error& ex) {
Log(LogNotice, "ApiListener")
<< "Error while sending message to endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex, false);
}
}
}
}
Expand Down Expand Up @@ -1437,10 +1447,12 @@ void ApiListener::LogGlobHandler(std::vector<int>& files, const String& file)
void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client)
{
Endpoint::Ptr endpoint = client->GetEndpoint();
Defer resetEndpointSyncing ([&endpoint]() {
ObjectLock olock(endpoint);
endpoint->SetSyncing(false);
});

if (endpoint->GetLogDuration() == 0) {
ObjectLock olock2(endpoint);
endpoint->SetSyncing(false);
return;
}

Expand All @@ -1457,21 +1469,21 @@ void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client)
Zone::Ptr target_zone = target_endpoint->GetZone();

if (!target_zone) {
ObjectLock olock2(endpoint);
endpoint->SetSyncing(false);
return;
}

for (;;) {
std::unique_lock<std::mutex> lock(m_LogLock);

CloseLogFile();
Defer reopenLog;

if (count == -1 || count > 50000) {
OpenLogFile();
lock.unlock();
} else {
last_sync = true;
reopenLog.SetFunc([this]() { OpenLogFile(); });
}

count = 0;
Expand Down Expand Up @@ -1544,8 +1556,7 @@ void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client)

Log(LogDebug, "ApiListener")
<< "Error while replaying log for endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex);

break;
return;
}

peer_ts = pmessage->Get("timestamp");
Expand Down Expand Up @@ -1578,14 +1589,7 @@ void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client)
}

if (last_sync) {
{
ObjectLock olock2(endpoint);
endpoint->SetSyncing(false);
}

OpenLogFile();

break;
return;
}
}
}
Expand Down
28 changes: 21 additions & 7 deletions lib/remote/jsonrpcconnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,23 +163,39 @@ ConnectionRole JsonRpcConnection::GetRole() const

void JsonRpcConnection::SendMessage(const Dictionary::Ptr& message)
{
if (m_ShuttingDown) {
BOOST_THROW_EXCEPTION(std::runtime_error("Cannot send message to already disconnected API client '" + GetIdentity() + "'!"));
Copy link
Member Author

@Al2Klimov Al2Klimov Mar 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a big fan of this, to be honest. All callers catch exceptions – e.g. SyncSendMessage(), which I haven't finished checking yet. But so far I see that SyncSendMessage() is called by RelayMessageOne(), which is called by SyncRelayMessage(). The latter misses PersistMessage() if RelayMessageOne() throws. Not good. To be fair, you patched this to catch them in 0c7c7ba.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But so far I see that SyncSendMessage() is called by RelayMessageOne() which is called by SyncRelayMessage(). The latter misses PersistMessage() if RelayMessageOne() throws.

This is never going to happen! Nothing has changed in the previous synchronisation behaviour. The only thing that has changed is that we have added error handling here.

try {
client->SendMessage(message);
} catch (const std::runtime_error& ex) {
Log(LogInformation, "ApiListener")
<< "Error while sending message to endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex, false);
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I opt for getting the memory-leak fix ready without changing too much for now. It shouldn't depend on the exceptions part.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#9896 (comment) stated the reason for splitting that PR. Tracing the callers of these two functions, which now throw exceptions, until one reaches exception handling and verifying that it's in the right place is annoying, but feasible in a rather short amount of time. Currently, when a client disconnects, both the CPU time spent generating the messages and the RAM storing them are wasted, and your suggestion would only fix one of the two (that was actually one of the first things I noticed when looking at this PR: #9991 (comment)).

}

Ptr keepAlive (this);

m_IoStrand.post([this, keepAlive, message]() { SendMessageInternal(message); });
}

void JsonRpcConnection::SendRawMessage(const String& message)
{
if (m_ShuttingDown) {
BOOST_THROW_EXCEPTION(std::runtime_error("Cannot send message to already disconnected API client '" + GetIdentity() + "'!"));
}

Ptr keepAlive (this);

m_IoStrand.post([this, keepAlive, message]() {
if (m_ShuttingDown) {
return;
}

m_OutgoingMessagesQueue.emplace_back(message);
m_OutgoingMessagesQueued.Set();
});
}

/**
 * JSON-encodes the given message and appends it to the outgoing queue,
 * then signals m_OutgoingMessagesQueued. Does nothing if the connection
 * is shutting down.
 *
 * @param message The message to encode and enqueue.
 */
void JsonRpcConnection::SendMessageInternal(const Dictionary::Ptr& message)
{
	// Encoding is skipped entirely once shutdown has started.
	if (!m_ShuttingDown) {
		m_OutgoingMessagesQueue.emplace_back(JsonEncode(message));
		m_OutgoingMessagesQueued.Set();
	}
}
Expand All @@ -188,12 +204,10 @@ void JsonRpcConnection::Disconnect()
{
namespace asio = boost::asio;

JsonRpcConnection::Ptr keepAlive (this);

IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) {
if (!m_ShuttingDown) {
m_ShuttingDown = true;
if (!m_ShuttingDown.exchange(true)) {
JsonRpcConnection::Ptr keepAlive (this);

IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) {
Log(LogWarning, "JsonRpcConnection")
<< "API client disconnected for identity '" << m_Identity << "'";

Expand Down Expand Up @@ -241,8 +255,8 @@ void JsonRpcConnection::Disconnect()
shutdownTimeout->Cancel();

m_Stream->lowest_layer().shutdown(m_Stream->lowest_layer().shutdown_both, ec);
}
});
});
}
}

void JsonRpcConnection::MessageHandler(const String& jsonString)
Expand Down
3 changes: 2 additions & 1 deletion lib/remote/jsonrpcconnection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "remote/i2-remote.hpp"
#include "remote/endpoint.hpp"
#include "base/atomic.hpp"
#include "base/io-engine.hpp"
#include "base/tlsstream.hpp"
#include "base/timer.hpp"
Expand Down Expand Up @@ -77,7 +78,7 @@ class JsonRpcConnection final : public Object
std::vector<String> m_OutgoingMessagesQueue;
AsioConditionVariable m_OutgoingMessagesQueued;
AsioConditionVariable m_WriterDone;
bool m_ShuttingDown;
Atomic<bool> m_ShuttingDown;
boost::asio::deadline_timer m_CheckLivenessTimer, m_HeartbeatTimer;

JsonRpcConnection(const String& identity, bool authenticated, const Shared<AsioTlsStream>::Ptr& stream, ConnectionRole role, boost::asio::io_context& io);
Expand Down
Loading