Skip to content

Commit

Permalink
Fixed message update race
Browse files Browse the repository at this point in the history
  • Loading branch information
gregzaitsev committed Aug 20, 2019
1 parent 2220212 commit 94b5c6a
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 7 deletions.
22 changes: 15 additions & 7 deletions src/jsonrpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ Json CJsonRpc::request(Json jsonMap, long timeout_s) {

// build request
Json request = Json::object{
{"id", query.id}, {"jsonrpc", _jsonrpcVersion}, {"method", jsonMap["method"]}, {"params", jsonMap["params"]},
{"id", query.id},
{"jsonrpc", _jsonrpcVersion},
{"method", jsonMap["method"]},
{"params", jsonMap["params"]},
};

// Send the command
Expand All @@ -59,6 +62,13 @@ Json CJsonRpc::request(Json jsonMap, long timeout_s) {
return move(result);
}

void CJsonRpc::delayedUpdateThread(Json message, int subscriptionId) {
usleep(1000000);
if (_wsSubscribers.count(subscriptionId) != 0) {
_wsSubscribers[subscriptionId]->handleWsMessage(subscriptionId, message["params"]["result"]);
}
}

void CJsonRpc::handleMessage(const string &payload) {
string err;
_logger->info(string("Message received: ") + payload);
Expand Down Expand Up @@ -90,13 +100,11 @@ void CJsonRpc::handleMessage(const string &payload) {
bool observerFound = (_wsSubscribers.count(subscriptionId) != 0);
_queryMtx.unlock();
if (!observerFound) {
usleep(500000);
}

_queryMtx.lock();
if (_wsSubscribers.count(subscriptionId))
std::thread t(&CJsonRpc::delayedUpdateThread, this, json, subscriptionId);
t.detach();
} else {
_wsSubscribers[subscriptionId]->handleWsMessage(subscriptionId, json["params"]["result"]);
_queryMtx.unlock();
}
} else {
_logger->error("Unknown type of response: " + payload);
}
Expand Down
2 changes: 2 additions & 0 deletions src/jsonrpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ class CJsonRpc : public IMessageObserver, public IJsonRpc {
// Map between subscription IDs and subscribers
map<int, IWebSocketMessageObserver *> _wsSubscribers;

void delayedUpdateThread(Json message, int subscriptionId);

int getNextId();

public:
Expand Down

0 comments on commit 94b5c6a

Please sign in to comment.