Skip to content
This repository has been archived by the owner on Feb 20, 2023. It is now read-only.

Synchronous Replication #1472

Merged
merged 103 commits into from
Mar 9, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
103 commits
Select commit Hold shift + click to select a range
641bf6c
Checkpoint. New replication manager.
lmwnshn Nov 2, 2020
c27168f
tmp unpush
lmwnshn Nov 2, 2020
3854236
Checkpoint. Fix messenger_test.
lmwnshn Nov 4, 2020
b47569b
Refactor the Messenger with Matt's suggestion.
lmwnshn Nov 4, 2020
6e13413
Checkpoint.
lmwnshn Nov 5, 2020
0de754e
Checkpoint. Replication locally via noisepage. Pretty sure tests brok…
lmwnshn Nov 5, 2020
afb1280
Merge branch 'master' of github.com:cmu-db/noisepage into replication
lmwnshn Dec 28, 2020
5584bd4
Switch to sanctionedsharedptr for logger.
lmwnshn Dec 28, 2020
ea976b4
Nuke dead code.
lmwnshn Dec 28, 2020
26c2e87
Eliminate potential race in messenger_test.
lmwnshn Dec 29, 2020
80f7767
Checkpoint.
lmwnshn Dec 29, 2020
375fb14
Checkpoint.
lmwnshn Dec 29, 2020
0d4a470
Checkpoint.
lmwnshn Dec 29, 2020
410cd96
Checkpoint.
lmwnshn Dec 31, 2020
73cf888
Add Tianlei's ReplicationLogProvider port.
tpan496 Jan 4, 2021
ef64dd5
Add support for simple replication.
lmwnshn Jan 4, 2021
bd407cb
Checkpoint. Slight progress. ty matt
lmwnshn Jan 14, 2021
5ca79ba
Fix the replica not returning serialized buffers to the empty buffer …
lmwnshn Feb 4, 2021
4d83efd
unpush me later.
lmwnshn Feb 9, 2021
380abb4
Merge branch 'master' of github.com:cmu-db/noisepage into replication
lmwnshn Feb 11, 2021
ed0c6bd
Checkpoint.
lmwnshn Feb 11, 2021
e9803b6
Checkpoint. Add dummy replication_get_last_record_id builtin.
lmwnshn Feb 14, 2021
0acb22d
Checkpoint. Nuke the ITP legacy stuff. Hook up replication manager an…
lmwnshn Feb 14, 2021
4739b65
Checkpoint. Shutdown, except we had to add some DBMain stuff that I s…
lmwnshn Feb 15, 2021
7d7d2ba
Merge branch 'master' of github.com:cmu-db/noisepage into replication
lmwnshn Feb 15, 2021
730f62c
Checkpoint. recovery_test is failing?
lmwnshn Feb 15, 2021
3bf0b14
Minor cleanup.
lmwnshn Feb 15, 2021
f17cbe8
Simplify cleanup logic.
lmwnshn Feb 15, 2021
e6a738f
Fix RecoveryManager race-y logic. Add documentation and warnings.
lmwnshn Feb 16, 2021
a458409
Merge branch 'master' into replication
lmwnshn Feb 16, 2021
dde1ef6
Documentation.
lmwnshn Feb 17, 2021
6c61675
add retention policy
tpan496 Feb 16, 2021
72fcf94
Checkpoint. Doxygen fixes and reverting to changes that I'm not comfo…
lmwnshn Feb 17, 2021
95f2aa4
Merge branch 'master' into replication
lmwnshn Feb 17, 2021
9ec03e4
Format.
lmwnshn Feb 17, 2021
8741991
Merge branch 'master' of github.com:cmu-db/noisepage into replication
lmwnshn Feb 17, 2021
63fae63
Small fix after removing unused variable.
lmwnshn Feb 17, 2021
05b0d56
Clang-tidy doesn't like excessive padding.
lmwnshn Feb 17, 2021
3e04d56
Update db_main comments.
lmwnshn Feb 17, 2021
8336a14
Update settings comment.
lmwnshn Feb 17, 2021
0d26d03
Update RecoveryManager comment.
lmwnshn Feb 17, 2021
2521f90
Move TODO around.
lmwnshn Feb 17, 2021
b939e0e
Update comment.
lmwnshn Feb 17, 2021
85d8908
Add explicit.
lmwnshn Feb 17, 2021
03f468b
Fix parameter naming. I swear I did this already.
lmwnshn Feb 18, 2021
edfcff9
Fix transaction_manager weirdness. Still a bit iffy on this...
lmwnshn Feb 18, 2021
8d2c807
Checkpoint.
lmwnshn Feb 18, 2021
861cf5f
Checkpoint.
lmwnshn Feb 18, 2021
1b5af2a
Checkpoint.
lmwnshn Feb 18, 2021
f5ef916
gitignore...
lmwnshn Feb 18, 2021
99dc705
Checkpoint.
lmwnshn Feb 19, 2021
51e2ffa
Fix mini runners.
lmwnshn Feb 19, 2021
4d9ff46
Merge branch 'master' into replication
lmwnshn Feb 19, 2021
507df4c
Merge branch 'master' of github.com:cmu-db/noisepage into replication
lmwnshn Feb 19, 2021
0c86727
Merge branch 'replication' of github.com:lmwnshn/noisepage into repli…
lmwnshn Feb 20, 2021
23d3fda
Merge branch 'master' of github.com:cmu-db/noisepage into replication
lmwnshn Feb 20, 2021
5e014f1
Add ninja job pools. Num logical cores = compile pool + link pool (1)…
lmwnshn Feb 21, 2021
c371699
Merge branch 'ninja_link' into replication
lmwnshn Feb 21, 2021
554961e
Merge branch 'master' into replication
lmwnshn Feb 22, 2021
a2e48db
Store Messenger IPC locally instead of in /tmp. Also delete IPC file …
lmwnshn Feb 22, 2021
8cd8c57
Don't hang if the other node dies in sync replication.
lmwnshn Feb 22, 2021
0c515a6
Merge branch 'master' of github.com:cmu-db/noisepage into replication
lmwnshn Feb 25, 2021
b568f14
Format.
lmwnshn Feb 25, 2021
9430157
Fix bplustree.h redefined constant shared with bwtree.h.
lmwnshn Feb 25, 2021
e040415
Merge branch 'bplustree_fix' into replication
lmwnshn Feb 25, 2021
3c81a4f
Lint.
lmwnshn Feb 25, 2021
3a1684f
Merge branch 'bplustree_fix' into replication
lmwnshn Feb 25, 2021
86b2a11
Add logging for enabling/disabling replication.
lmwnshn Feb 26, 2021
1042ae7
Fix bug in replica_sync.
lmwnshn Feb 26, 2021
47a804e
Add (what I hope is) the description of the right message in Replicat…
lmwnshn Feb 26, 2021
6fa6ec2
Remove dead code.
lmwnshn Mar 4, 2021
cf9e5b3
Refactor PrepareForSerialization.
lmwnshn Mar 4, 2021
2b56dee
Remove dead message.
lmwnshn Mar 4, 2021
612d3e5
Remove superfluous unlock.
lmwnshn Mar 4, 2021
107053f
Remove debug logging.
lmwnshn Mar 4, 2021
0e05526
Checkpoint. First pass at separating out primary/replica logic in Rep…
lmwnshn Mar 4, 2021
5e6b544
Checkpoint. Add documentation, including for the BufferedLogWriter mo…
lmwnshn Mar 4, 2021
6414138
Refactor json message keys into a ReplicateBufferMessage container cl…
lmwnshn Mar 4, 2021
5c90e42
Merge branch 'master' of github.com:cmu-db/noisepage into replication
lmwnshn Mar 4, 2021
d24cf2c
Add warning comment on message delivery failure.
lmwnshn Mar 4, 2021
146c903
Fix clang-tidy.
lmwnshn Mar 4, 2021
e89354e
Stash the "primary" constant into the PrimaryReplicationManager.
lmwnshn Mar 5, 2021
e3547bc
Move the replication test to the top.
lmwnshn Mar 5, 2021
caba0b1
Try switching localhost to 127.0.0.1.
lmwnshn Mar 5, 2021
acc2cda
Merge branch 'master' into replication
lmwnshn Mar 6, 2021
8142da4
Add top information to print_docker_info.
lmwnshn Mar 6, 2021
61d9aad
Try disabling ccache.
lmwnshn Mar 6, 2021
04ffc5d
Revert "Try disabling ccache."
lmwnshn Mar 6, 2021
7a13faf
Revert "Stash the "primary" constant into the PrimaryReplicationManag…
lmwnshn Mar 6, 2021
34685bc
Sin by sleeping. CI...
lmwnshn Mar 7, 2021
466deee
Try fixing a potential race between ListenForConnection() and the con…
lmwnshn Mar 7, 2021
851502a
blyat
lmwnshn Mar 7, 2021
4a9936e
Nope. Didn't work. Back to having a 30 second sleep.
lmwnshn Mar 8, 2021
e027b92
It might have been the SNDRCV timeout of 1 second. Try bumping that u…
lmwnshn Mar 8, 2021
f6627fa
I AM A MONKEY OF THE HIGHEST ORDER.
lmwnshn Mar 8, 2021
4d24ade
Avoid signalling cvars if there are no routers to add.
lmwnshn Mar 8, 2021
bd80a73
Merge branch 'master' into replication
mbutrovich Mar 8, 2021
649f051
Switch back to the model where the Messenger is responsible for invok…
lmwnshn Mar 8, 2021
4560cf5
Merge remote-tracking branch 'origin/replication' into replication
lmwnshn Mar 8, 2021
fcba2ca
Rearchitect the server loop structure to rely on key_message_type ins…
lmwnshn Mar 9, 2021
e85036f
Add some messenger documentation.
lmwnshn Mar 9, 2021
8898968
Add documentation on why routers are added with a level of indirection.
lmwnshn Mar 9, 2021
9868193
Delete dead comment.
lmwnshn Mar 9, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Merge branch 'master' of github.com:cmu-db/noisepage into replication
  • Loading branch information
lmwnshn committed Dec 28, 2020
commit afb1280a020e1980d2dc0b8c1e289f34299f4421
17 changes: 17 additions & 0 deletions src/include/main/db_main.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "network/postgres/postgres_protocol_interpreter.h"
#include "optimizer/statistics/stats_storage.h"
#include "replication/replication_manager.h"
#include "self_driving/model_server/model_server_manager.h"
#include "settings/settings_manager.h"
#include "settings/settings_param.h"
#include "storage/garbage_collector_thread.h"
Expand Down Expand Up @@ -433,6 +434,13 @@ class DBMain {
network_port_, connection_thread_count_, uds_file_directory_);
}

std::unique_ptr<modelserver::ModelServerManager> model_server_manager = DISABLED;
if (model_server_enable_) {
NOISEPAGE_ASSERT(use_messenger_, "Pilot requires messenger layer.");
model_server_manager =
std::make_unique<modelserver::ModelServerManager>(model_server_path_, messenger_layer->GetMessenger());
}

db_main->settings_manager_ = std::move(settings_manager);
db_main->metrics_manager_ = std::move(metrics_manager);
db_main->metrics_thread_ = std::move(metrics_thread);
Expand Down Expand Up @@ -783,6 +791,13 @@ class DBMain {
uint16_t messenger_port_ = 9022;
uint16_t replication_port_ = 15445;
std::string replication_hosts_path_ = "replication.conf";
bool model_server_enable_ = false;
/**
* The ModelServer script is located at PROJECT_ROOT/script/model by default, and also assume
* the build binary at PROJECT_ROOT/build/bin/noisepage. This should be override or set explicitly
* in use cases where such assumptions are no longer true.
*/
std::string model_server_path_ = "../../script/model/model_server.py";

/**
* Instantiates the SettingsManager and reads all of the settings to override the Builder's settings.
Expand Down Expand Up @@ -844,6 +859,8 @@ class DBMain {
use_replication_ = settings_manager->GetBool(settings::Param::replication_enable);
replication_port_ = settings_manager->GetInt(settings::Param::replication_port);
replication_hosts_path_ = settings_manager->GetString(settings::Param::replication_hosts_path);
model_server_enable_ = settings_manager->GetBool(settings::Param::model_server_enable);
model_server_path_ = settings_manager->GetString(settings::Param::model_server_path);

return settings_manager;
}
Expand Down
8 changes: 4 additions & 4 deletions src/include/replication/replication_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class Replica {

messenger::ConnectionDestination replica_info_;
messenger::ConnectionId connection_;
uint64_t last_heartbeat_; //< Time (unix epoch) that the replica heartbeat was last successful.
uint64_t last_heartbeat_; ///< Time (unix epoch) that the replica heartbeat was last successful.
};

/**
Expand Down Expand Up @@ -91,9 +91,9 @@ class ReplicationManager {
common::ManagedPointer<messenger::ConnectionId> GetReplicaConnection(const std::string &replica_name);

common::ManagedPointer<messenger::Messenger> messenger_;
std::string identity_; //< The identity of this replica.
uint16_t port_; //< The port that replication runs on.
std::unordered_map<std::string, Replica> replicas_; //< Replica Name -> Connection ID.
std::string identity_; ///< The identity of this replica.
uint16_t port_; ///< The port that replication runs on.
std::unordered_map<std::string, Replica> replicas_; ///< Replica Name -> Connection ID.
std::mutex mutex_;
std::condition_variable cvar_;
};
Expand Down
5 changes: 5 additions & 0 deletions src/include/self_driving/model_server/model_server_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,19 @@

#pragma once

#include <atomic>
#include <condition_variable>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "common/json.h"
#include "common/managed_pointer.h"
#include "messenger/messenger_defs.h"

namespace noisepage::messenger {
class ConnectionRouter;
class Messenger;
} // namespace noisepage::messenger

Expand Down
17 changes: 17 additions & 0 deletions src/include/settings/settings_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -368,4 +368,21 @@ SETTING_string(
false,
noisepage::settings::Callbacks::NoOp
)

SETTING_bool(
model_server_enable,
"Whether to enable the ModelServerManager (default: false)",
false,
false,
noisepage::settings::Callbacks::NoOp
)

// Relative path assuming binary locate at PROJECT_ROOT/build/bin/, and model_server.py at PROJECT_ROOT/script/model
SETTING_string(
model_server_path,
"The python model server script to invoke (default: ../../script/model/model_server.py)",
"../../script/model/model_server.py",
false,
noisepage::settings::Callbacks::NoOp
)
// clang-format on
101 changes: 87 additions & 14 deletions src/messenger/messenger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,81 @@

namespace noisepage::messenger {

///** An abstraction around ZeroMQ messages which explicitly have the sender specified. */
// class ZmqMessage {
// public:
// /**
// * Build a new ZmqMessage from the supplied information.
// * @param send_msg_id The ID of the message on the sender side.
// * @param recv_cb_id The ID of the message on the receiver side.
// * @param sender_id The identity of the sender.
// * @param message The contents of the message.
// * @return A ZmqMessage encapsulating the given message.
// */
// static ZmqMessage Build(uint64_t send_msg_id, uint64_t recv_cb_id, const std::string &sender_id,
// std::string_view message) {
// return ZmqMessage{sender_id, fmt::format("{}-{}-{}", send_msg_id, recv_cb_id, message)};
// }
//
// /**
// * Parse the given payload into a ZmqMessage.
// * @param routing_id The message's routing ID.
// * @param message The message received.
// * @return A ZmqMessage encapsulating the given message.
// */
// static ZmqMessage Parse(const std::string &routing_id, const std::string &message) {
// return ZmqMessage{routing_id, message};
// }
//
// /** @return The ID of this message (sender side). */
// uint64_t GetMessageIdSender() const { return send_msg_id_; }
//
// /** @return The ID of this message (receiver side). */
// uint64_t GetCallbackIdReceiver() const { return recv_cb_id_; }
//
// /** @return The routing ID of this message. */
// std::string_view GetRoutingId() const { return std::string_view(routing_id_); }
//
// /** @return The message itself. */
// std::string_view GetMessage() const { return message_; }
//
// /** @return The raw payload of the message. */
// std::string_view GetRawPayload() const { return std::string_view(payload_); }
//
// private:
// friend Messenger;
//
// /** Construct a new ZmqMessage with the given routing ID and payload. Payload of form ID-MESSAGE. */
// ZmqMessage(std::string routing_id, std::string payload)
// : routing_id_(std::move(routing_id)), payload_(std::move(payload)), message_(payload_) {
// if (payload_.empty()) {
// send_msg_id_ = 0;
// recv_cb_id_ = 0;
// } else {
// // TODO(WAN): atoi, stoull, from_chars, etc? Error checking in general.
// UNUSED_ATTRIBUTE int check =
// std::sscanf(payload_.c_str(), "%" SCNu64 "-%" SCNu64 "-", &send_msg_id_, &recv_cb_id_);
// NOISEPAGE_ASSERT(2 == check, "Couldn't parse the message header.");
//
// // Remove the prefix up to the second '-'
// message_.remove_prefix(message_.find_first_of('-') + 1);
// message_.remove_prefix(message_.find_first_of('-') + 1);
// }
// }
//
// /** The routing ID of the message. */
// std::string routing_id_;
// /** The payload in the message, of form ID-MESSAGE. */
// std::string payload_;
//
// /** The cached id of the message (sender side). */
// uint64_t send_msg_id_;
// /** The cached id of the message (receiver side). */
// uint64_t recv_cb_id_;
// /** The cached actual message. */
// std::string_view message_;
//};

ZmqMessage ZmqMessage::Build(uint64_t source_cb_id, uint64_t dest_cb_id, const std::string &routing_id,
std::string_view message) {
return ZmqMessage{routing_id, fmt::format("{}-{}-{}", source_cb_id, dest_cb_id, message)};
Expand All @@ -112,7 +187,10 @@ ZmqMessage::ZmqMessage(std::string routing_id, std::string payload)
UNUSED_ATTRIBUTE int check =
std::sscanf(payload_.c_str(), "%" SCNu64 "-%" SCNu64 "-", &source_cb_id_, &dest_cb_id_);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is my opinion, so feel free to ignore it if you disagree. I guess this was already here, but it makes me a little nervous seeing the expected format of the message hard coded into this constructor here and on lines 117, 118. It might be more obvious to make these constants and refer to those.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unfortunately it is also hardcoded into the model server message format, off in Python land, if I'm not mistaken. but yea, I can try to refactor -- I seem to recall facing some weird issue which ended with those SCNu64's there. you're right to point out hacks, please keep doing so!

NOISEPAGE_ASSERT(2 == check, "Couldn't parse the message header.");
message_.remove_prefix(message_.find_last_of('-') + 1);

// Remove the prefix up to the second '-'
message_.remove_prefix(message_.find_first_of('-') + 1);
message_.remove_prefix(message_.find_first_of('-') + 1);
}
}

Expand Down Expand Up @@ -276,20 +354,14 @@ ConnectionId::ConnectionId(common::ManagedPointer<Messenger> messenger, const Co
socket_->set(zmq::sockopt::linger, 0); // Discard all pending messages immediately on socket close.
socket_->connect(target.GetDestination());
routing_id_ = ZmqUtil::GetRoutingId(common::ManagedPointer(socket_));
MESSENGER_LOG_INFO(fmt::format("[PID={}] Registered (but haven't opened!) a new connection to {} ({}) as {}.",
MESSENGER_LOG_TRACE(fmt::format("[PID={}] Registered (but haven't opened!) a new connection to {} ({}) as {}.",
::getpid(), target_name_, target.GetDestination(), routing_id_.c_str()));
// Add the new socket to the list of sockets that will be polled by the server loop.
messenger->polled_sockets_->AddPollItem(socket_.get(), nullptr);
}

ConnectionId::~ConnectionId() = default;

ConnectionId::ConnectionId(ConnectionId &&other) {
this->socket_ = std::move(other.socket_);
this->routing_id_ = std::move(other.routing_id_);
this->target_name_ = std::move(other.target_name_);
}

ConnectionRouter::ConnectionRouter(common::ManagedPointer<Messenger> messenger, const ConnectionDestination &target,
const std::string &identity, CallbackFn callback)
: callback_(std::move(callback)), identity_(identity) {
Expand Down Expand Up @@ -392,7 +464,7 @@ ConnectionId Messenger::MakeConnection(const ConnectionDestination &target) {
}

void Messenger::SendMessage(common::ManagedPointer<ConnectionId> connection_id, const std::string &message,
CallbackFn callback, uint64_t remote_cb_id) {
CallbackFn callback, uint64_t recv_cb_id) {
// Note that a ConnectionId cannot be used from multiple threads. This is an inherent ZeroMQ limitation.
// If you need another thread to connect to the same ConnectionDestination, just get a new ConnectionId.
// This allows us to avoid taking latches around the socket that the ConnectionId wraps.
Expand All @@ -404,14 +476,14 @@ void Messenger::SendMessage(common::ManagedPointer<ConnectionId> connection_id,
callbacks_mutex_.unlock();

// Build and send the message.
ZmqMessage msg = ZmqMessage::Build(send_msg_id, remote_cb_id, connection_id->routing_id_, message);
ZmqMessage msg = ZmqMessage::Build(send_msg_id, recv_cb_id, connection_id->routing_id_, message);
ZmqUtil::SendMsgPayload(common::ManagedPointer(connection_id->socket_), msg);
MESSENGER_LOG_TRACE(
fmt::format("[PID={}] Messenger SENT-TO {}: {} ", ::getpid(), connection_id->target_name_, msg.GetRawPayload()));
}

void Messenger::SendMessage(common::ManagedPointer<ConnectionRouter> router_id, const std::string &recv_id,
const std::string &message, CallbackFn callback, uint64_t remote_cb_id) {
const std::string &message, CallbackFn callback, uint64_t recv_cb_id) {
uint64_t send_msg_id = GetNextSendMessageId();

// Register the callback that will be invoked when a response to this message is received.
Expand All @@ -422,7 +494,7 @@ void Messenger::SendMessage(common::ManagedPointer<ConnectionRouter> router_id,
// Build and send the message. Note that ConnectionRouter is a ROUTER socket.
zmq::message_t router_data(recv_id.data(), recv_id.size());
if (router_id->socket_->send(router_data, zmq::send_flags::sndmore).has_value()) {
ZmqMessage reply = ZmqMessage::Build(send_msg_id, remote_cb_id, router_id->identity_, message);
ZmqMessage reply = ZmqMessage::Build(send_msg_id, recv_cb_id, router_id->identity_, message);
ZmqUtil::SendMsgIdentity(common::ManagedPointer(router_id->socket_.get()), router_id->identity_);
ZmqUtil::SendMsgPayload(common::ManagedPointer(router_id->socket_.get()), reply);
MESSENGER_LOG_TRACE(fmt::format("[PID={}] Messenger ({}) SENT-TO {}: {} ", ::getpid(), router_id->identity_,
Expand Down Expand Up @@ -476,8 +548,9 @@ void Messenger::ServerLoop() {
bool has_custom_serverloop = poll_items.server_callbacks_[i] != nullptr;
MESSENGER_LOG_TRACE("[PID={}] Messenger RECV-FR {} (custom serverloop: {}): {}", ::getpid(), msg.GetRoutingId(),
has_custom_serverloop, msg.GetRawPayload());
ProcessMessage(msg);
if (has_custom_serverloop) {
if (!has_custom_serverloop) {
ProcessMessage(msg);
} else {
auto &server_callback = poll_items.server_callbacks_[i];
(*server_callback)(common::ManagedPointer(this), msg);
}
Expand Down
20 changes: 13 additions & 7 deletions src/replication/replication_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ ReplicationManager::ReplicationManager(common::ManagedPointer<noisepage::messeng
[this](common::ManagedPointer<messenger::Messenger> messenger,
const messenger::ZmqMessage &msg) { EventLoop(messenger, msg); });
BuildReplicaList(replication_hosts_path);

// TODO(WAN): development purposes
replication_logger->set_level(spdlog::level::trace);
lmwnshn marked this conversation as resolved.
Show resolved Hide resolved

for (const auto &replica : replicas_) {
ReplicaHeartbeat(replica.first);
}
Expand Down Expand Up @@ -105,7 +109,7 @@ void ReplicationManager::EventLoop(common::ManagedPointer<noisepage::messenger::
const noisepage::messenger::ZmqMessage &msg) {
switch (static_cast<MessageType>(msg.GetDestinationCallbackId())) {
case MessageType::HEARTBEAT:
REPLICATION_LOG_INFO(fmt::format("Heartbeat from: {}", msg.GetRoutingId()));
REPLICATION_LOG_TRACE(fmt::format("Heartbeat from: {}", msg.GetRoutingId()));
break;
default:
break;
Expand All @@ -121,10 +125,12 @@ void ReplicationManager::ReplicaHeartbeat(const std::string &replica_name) {
auto epoch_now_ms = std::chrono::duration_cast<std::chrono::milliseconds>(epoch_now);
if (0 == replica.last_heartbeat_) {
replica.last_heartbeat_ = epoch_now_ms.count();
REPLICATION_LOG_TRACE(
fmt::format("Replica {}: heartbeat initialized at {}.", replica_name, replica.last_heartbeat_));
}
}

REPLICATION_LOG_INFO(fmt::format("Replica {}: heartbeat start.", replica_name));
REPLICATION_LOG_TRACE(fmt::format("Replica {}: heartbeat start.", replica_name));
try {
messenger_->SendMessage(
GetReplicaConnection(replica_name), "",
Expand All @@ -133,21 +139,21 @@ void ReplicationManager::ReplicaHeartbeat(const std::string &replica_name) {
auto epoch_now = std::chrono::system_clock::now().time_since_epoch();
auto epoch_now_ms = std::chrono::duration_cast<std::chrono::milliseconds>(epoch_now);
replica.last_heartbeat_ = epoch_now_ms.count();
REPLICATION_LOG_INFO(fmt::format("Replica {}: last heartbeat {}, heartbeat {} OK.", replica_name,
replica.last_heartbeat_, epoch_now_ms.count()));
REPLICATION_LOG_TRACE(fmt::format("Replica {}: last heartbeat {}, heartbeat {} OK.", replica_name,
replica.last_heartbeat_, epoch_now_ms.count()));
},
static_cast<uint64_t>(MessageType::HEARTBEAT));
} catch (const MessengerException &e) {
REPLICATION_LOG_INFO(
REPLICATION_LOG_TRACE(
fmt::format("Replica {}: last heartbeat {}, heartbeat failed.", replica_name, replica.last_heartbeat_));
auto epoch_now = std::chrono::system_clock::now().time_since_epoch();
auto epoch_now_ms = std::chrono::duration_cast<std::chrono::milliseconds>(epoch_now);
if (epoch_now_ms.count() - replica.last_heartbeat_ >= REPLICATION_CARDIAC_ARREST_MS) {
REPLICATION_LOG_INFO(fmt::format("Replica {}: last heartbeat {}, declared dead {}.", replica_name,
REPLICATION_LOG_WARN(fmt::format("Replica {}: last heartbeat {}, declared dead {}.", replica_name,
replica.last_heartbeat_, epoch_now_ms.count()));
}
}
REPLICATION_LOG_INFO(fmt::format("Replica {}: heartbeat end.", replica_name));
REPLICATION_LOG_TRACE(fmt::format("Replica {}: heartbeat end.", replica_name));
}

common::ManagedPointer<noisepage::messenger::ConnectionId> ReplicationManager::GetReplicaConnection(
Expand Down
26 changes: 15 additions & 11 deletions src/self_driving/model_server/model_server_manager.cpp
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
#include "self_driving/model_server/model_server_manager.h"

#include <sys/wait.h>

#include <csignal>
#include <thread> // NOLINT

#include "common/json.h"
#include "loggers/model_server_logger.h"
#include "messenger/connection_destination.h"
#include "messenger/messenger.h"
#include "self_driving/model_server/model_server_manager.h"

namespace noisepage::modelserver {
static constexpr const char *MODEL_CONN_ID_NAME = "model-server-conn";
Expand Down Expand Up @@ -50,8 +52,11 @@ ModelServerManager::ModelServerManager(const std::string &model_bin,
}
})) {
// Model Initialization handling logic
auto msm_handler = [&](common::ManagedPointer<messenger::Messenger> messenger, std::string_view sender_id,
std::string_view message, uint64_t recv_cb_id) {
auto msm_handler = [&](common::ManagedPointer<messenger::Messenger> messenger, const messenger::ZmqMessage &msg) {
uint64_t sender_id = msg.GetSourceCallbackId();
uint64_t recv_cb_id = msg.GetDestinationCallbackId();
std::string_view message = msg.GetMessage();

// ModelServer connected
MODEL_SERVER_LOG_TRACE("[PID={},SENDER_ID={}] Messenger RECV: {}, {}", ::getpid(), sender_id, message, recv_cb_id);

Expand Down Expand Up @@ -168,10 +173,10 @@ bool ModelServerManager::TrainWith(const std::vector<std::string> &methods, cons
j["data"]["save_path"] = save_path;

// Callback to notify the waiter for result, or failure to parse the result.
auto callback = [&, future](common::ManagedPointer<messenger::Messenger> messenger, std::string_view sender_id,
std::string_view message, uint64_t recv_cb_id) {
MODEL_SERVER_LOG_INFO("Callback :recv_cb_id={}, message={}", recv_cb_id, message);
future->Done(message);
auto callback = [&, future](common::ManagedPointer<messenger::Messenger> messenger,
const messenger::ZmqMessage &msg) {
MODEL_SERVER_LOG_INFO("Callback :recv_cb_id={}, message={}", msg.GetDestinationCallbackId(), msg.GetMessage());
future->Done(msg.GetMessage());
};

return SendMessage(j.dump(), callback);
Expand All @@ -189,10 +194,9 @@ std::pair<std::vector<std::vector<double>>, bool> ModelServerManager::DoInferenc
ModelServerFuture<std::vector<std::vector<double>>> future;

// Callback to notify waiter with result
auto callback = [&](common::ManagedPointer<messenger::Messenger> messenger, std::string_view sender_id,
std::string_view message, uint64_t recv_cb_id) {
MODEL_SERVER_LOG_INFO("Callback :recv_cb_id={}, message={}", recv_cb_id, message);
future.Done(message);
auto callback = [&](common::ManagedPointer<messenger::Messenger> messenger, const messenger::ZmqMessage &msg) {
MODEL_SERVER_LOG_INFO("Callback :recv_cb_id={}, message={}", msg.GetDestinationCallbackId(), msg.GetMessage());
future.Done(msg.GetMessage());
};

// Fail to send the message
Expand Down
You are viewing a condensed version of this merge commit. You can view the full changes here.