Skip to content

Commit

Permalink
Rename getMetadata to prepareMetadata and ensure data access protected
Browse files Browse the repository at this point in the history
Currently the setChanged flag is being cleared/set while the data mutex is not
locked.
As getMessage() already aquires this lock and is only ever used when sending a
message, move the flag clear inside this method and rename to prepareMetadata to
reflect the fact it is no longer simply retrieving data.
Add a lock within the error callback in case sending fails and the flag must be
reset
This looks scary as we have multiple locks being aquired within sendMetadata()
I think it's ok as on the send side we are already aquiring this lock, we're
just adding a data mutation. On the receive side we're adding a lock, but we
will only ever call the success or failure path, never both, so shouldn't be the
case where we might deadlock waiting for two different mutexes - in this way
it's no different to aquiring a lock in any of the setter methods.
  • Loading branch information
rsjbailey committed Oct 15, 2021
1 parent c52615b commit 3cd8d8a
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class ObjectMetadataSender {
private:
void sendMetadata();
void startTimer();
MessageBuffer getMessage();
MessageBuffer prepareMessage();
void handleTimeout(std::error_code ec);
template<typename FunctionT>
void setData(FunctionT&& set) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ void ObjectMetadataSender::connect(const std::string& endpoint,
startTimer();
}

MessageBuffer ObjectMetadataSender::getMessage() {
MessageBuffer ObjectMetadataSender::prepareMessage() {
std::lock_guard<std::mutex> lock(dataMutex_);
MessageBuffer buffer = allocBuffer(data_.ByteSizeLong());
data_.SerializeToArray(buffer.data(), buffer.size());
data_.set_changed(false);
return buffer;
}

Expand All @@ -65,16 +66,18 @@ void ObjectMetadataSender::sendMetadata() {
if (!connectionId_.isValid()) {
return;
}
auto msg = getMessage();
data_.set_changed(false);
auto msg = prepareMessage();
socket_.asyncSend(
msg, [this](std::error_code ec, const nng::Message& ignored) {
if (!ec) {
std::lock_guard<std::mutex> lock(timeoutMutex_);
lastSendTimestamp_ = std::chrono::system_clock::now();
} else {
data_.set_changed(true);
EAR_LOGGER_WARN(logger_, "Metadata sending failed: {}", ec.message());
{
std::lock_guard<std::mutex> dataLock(dataMutex_);
data_.set_changed(true);
}
EAR_LOGGER_WARN(logger_, "Metadata sending failed: {}", ec.message())
}
});
}
Expand Down

0 comments on commit 3cd8d8a

Please sign in to comment.