Conversation
Instead of sender and receiver callback IDs, which I found confusing, use source and destination callback IDs. This helps when writing callbacks that take in a ZmqResponse message: the source is now clearly the remote host and the destination is clearly the local host. Previously, "sender" was ambiguous because a reply reverses the roles, so the original receiver becomes the host sending a message back to the original sender.
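For illustration, a hedged sketch of the renaming; the type and field names here are hypothetical, not the PR's actual definitions:

```cpp
#include <cstdint>

// Hypothetical sketch (illustrative only): with source/destination naming,
// a callback handling a reply can read both fields unambiguously, even though
// the "receiver" of the original message is now the one sending the reply.
struct ZmqMessageIdsSketch {
  uint64_t source_callback_id_;       // callback on the remote host that produced this message
  uint64_t destination_callback_id_;  // callback on the local host that should consume it
};
```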
…queue. Additionally, change replication to blocking to make it more reliable. However, this is unacceptably slow and should be undone at some point.
…hould check with someone.
More seriously, the messenger listen used to be the last message. Now it is back to the domain socket listen being the last message.
…ing all SendMessage-related callbacks, before the server loop gets a chance to operate on the message.
@jkosh44 Sorry to keep bugging you, could you look at this commit? This came up before in the ModelServerManager. I refactored it so that the Messenger exposed its callbacks and the server loop callback became responsible for invoking the send-message callback, but on further reflection, I think it is preferable for the Messenger to handle it automagically.
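A minimal sketch of the behavior I mean, with hypothetical names (the real Messenger API differs): the Messenger first invokes any pending SendMessage-related callback registered under the message's destination callback ID, and only then hands the message to the server loop's callback.

```cpp
#include <cstdint>
#include <functional>
#include <string>
#include <unordered_map>

// Hypothetical sketch: the Messenger "automagically" runs SendMessage-related
// callbacks before the server loop's callback ever sees the message.
struct ZmqMessageSketch {
  uint64_t destination_callback_id_;
  std::string payload_;
};

class MessengerSketch {
 public:
  using CallbackFn = std::function<void(const ZmqMessageSketch &)>;

  // Registered by SendMessage when it expects a reply.
  void RegisterCallback(uint64_t callback_id, CallbackFn fn) { pending_callbacks_[callback_id] = std::move(fn); }

  void ProcessMessage(const ZmqMessageSketch &msg, const CallbackFn &server_loop_callback) {
    // 1. Invoke the SendMessage-related callback, if any, keyed by the destination ID.
    auto it = pending_callbacks_.find(msg.destination_callback_id_);
    if (it != pending_callbacks_.end()) {
      it->second(msg);
      pending_callbacks_.erase(it);
    }
    // 2. Only afterwards does the server loop get a chance to operate on the message.
    server_loop_callback(msg);
  }

 private:
  std::unordered_map<uint64_t, CallbackFn> pending_callbacks_;
};
```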
```diff
 }

 void Messenger::ListenForConnection(const ConnectionDestination &target, const std::string &identity,
                                     CallbackFn callback) {
-  std::lock_guard lock(routers_add_mutex_);
+  std::unique_lock lock(routers_add_mutex_);
```
I'm a little confused by this part. If we're taking the time to get a latch, why not just have the latch be for routers_ and insert directly into that? Is there something I'm missing here?
Added a comment; does it make sense?
…tead of destination callback ID, in line with the model server manager.
Major Decrease in Performance. STOP: this PR has a major negative performance impact.
Codecov Report
```
@@            Coverage Diff             @@
##           master    #1472      +/-   ##
==========================================
- Coverage   81.53%   81.04%    -0.50%
==========================================
  Files         681      685        +4
  Lines       48251    48640      +389
==========================================
+ Hits        39340    39418       +78
- Misses       8911     9222      +311
```

Continue to review the full report at Codecov.
src/messenger/messenger.cpp (Outdated)
```diff
 }

 void Messenger::ListenForConnection(const ConnectionDestination &target, const std::string &identity,
                                     CallbackFn callback) {
-  std::lock_guard lock(routers_add_mutex_);
+  std::unique_lock lock(routers_add_mutex_);
+  // TODO(WAN): all this copying is stupid.
```
Can you give a bit more detail on what you'd do to fix it (if you know) and what the restriction on implementing it now is? (It's okay if it's just time.)
Oh, just saw this. I think this was an old comment that wanted to avoid the indirection of:

- T1: invokes ListenForConnection to add a new connection endpoint, which enqueues the router to be added.
- Dedicated messenger thread T2: grabs the router off the queue and adds it.

In addressing Joe's comment, I added documentation that notes why this is necessary (IIRC, ZeroMQ requires that sockets are used on the thread where they were created).
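For concreteness, a minimal sketch of that pattern under my reading of the PR (names simplified, not the actual messenger.cpp code); it also shows why the lock_guard in the diff above became a unique_lock, since std::condition_variable::wait requires one:

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>

// Simplified sketch of the blocking ListenForConnection pattern.
// ZeroMQ sockets must be used on the thread that created them, so T1 cannot
// create the router itself; it hands the request to the messenger thread.
class MessengerSketch {
 public:
  // Called from an arbitrary thread (T1).
  void ListenForConnection(const std::string &endpoint) {
    std::unique_lock lock(routers_add_mutex_);  // unique_lock: the cvar wait needs it
    routers_to_add_.push(endpoint);
    // Block until the messenger thread has created and registered the router.
    routers_add_cvar_.wait(lock, [this] { return routers_to_add_.empty(); });
  }

  // Runs on the dedicated messenger thread (T2), once per server loop iteration.
  void ServerLoopAddRouters() {
    std::unique_lock lock(routers_add_mutex_);
    while (!routers_to_add_.empty()) {
      // ... create the ZeroMQ router socket here, on this thread ...
      routers_to_add_.pop();
    }
    routers_add_cvar_.notify_all();  // Wake any waiting ListenForConnection.
  }

 private:
  std::mutex routers_add_mutex_;
  std::condition_variable routers_add_cvar_;
  std::queue<std::string> routers_to_add_;
};
```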
```diff
@@ -153,7 +166,7 @@ class LogManager : public common::DedicatedThreadOwner {
   std::vector<BufferedLogWriter> buffers_;
   // The queue containing empty buffers which the serializer thread will use. We use a blocking queue because the
   // serializer thread should block when requesting a new buffer until it receives an empty buffer
-  common::ConcurrentBlockingQueue<BufferedLogWriter *> empty_buffer_queue_;
```
@mbutrovich This got pulled out to DBMain so that other components, e.g., replication manager, can return buffers to it.
What do we believe the argument for this data structure is? Performance over malloc? Back-pressure on the serializer if the disk consumer falls behind?
*was: I believe it was backpressure. This change more or less leaves things as they are.
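To make the backpressure argument concrete, a minimal sketch of a blocking queue (a plain mutex/cvar version, not noisepage's actual common::ConcurrentBlockingQueue interface): buffers are preallocated, so the hot path avoids malloc, and when the disk consumer falls behind and stops returning empty buffers, the serializer blocks in Dequeue, which throttles it.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>

// Minimal blocking-queue sketch illustrating the backpressure argument.
template <typename T>
class BlockingQueueSketch {
 public:
  // The consumer returns an empty buffer here after flushing it to disk.
  void Enqueue(T item) {
    {
      std::lock_guard lock(mutex_);
      queue_.push(std::move(item));
    }
    cvar_.notify_one();
  }

  // The serializer blocks here until an empty buffer is available.
  T Dequeue() {
    std::unique_lock lock(mutex_);
    cvar_.wait(lock, [this] { return !queue_.empty(); });
    T item = std::move(queue_.front());
    queue_.pop();
    return item;
  }

 private:
  std::mutex mutex_;
  std::condition_variable cvar_;
  std::queue<T> queue_;
};
```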
```cpp
 *
 * @param policy The retention policy that describes the destinations for this BufferedLogWriter.
 */
void PrepareForSerialization(transaction::RetentionPolicy policy) {
```
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mbutrovich The current retention policy thing.
Minor Decrease in Performance. Be warned: this PR may have decreased the throughput of the system slightly.
All numbers on dev9. [Benchmark results for this PR and for the main branch were posted as attachments and are not reproduced here.]

Corrected numbers, since I forgot rate limits were a thing: this branch (no WAL) vs. main (no WAL) is within noise.
Synchronous Replication
This PR adds support for synchronous replication to noisepage. Replication depends on the messenger and recovery manager being enabled.

Summary
```
./noisepage --messenger_enable=true --replication_enable=true --port=15721 --messenger_port=9022 --replication_port=15445 --network_identity=primary
./noisepage --messenger_enable=true --replication_enable=true --port=15722 --messenger_port=9023 --replication_port=15446 --network_identity=replica1
./noisepage --messenger_enable=true --replication_enable=true --port=15723 --messenger_port=9024 --replication_port=15447 --network_identity=replica2
```
To add more primaries/replicas, see build-support/data/replication.config. Note that this file is copied to your build/bin folder as a post-build step of noisepage.

You should be able to run queries on the primary and run basic read-only operations on the replicas.
Note that there is currently no "I'm missing logs xyz" mechanism, so replicas have to have seen all the primary's logs as they were generated and shipped out.
Background
Description
Fixes "jumbotest_messenger_messenger_test timesout on jenkins" #1319. I believe this was caused by the Messenger's ListenForConnection being a non-blocking add to the list of routers. This changes it so that ListenForConnection waits on a cvar which the messenger's main loop signals whenever it grabs and adds new routers.

Adds a new builtin replication_get_last_record_id(). On the primary, this returns the last record ID that was sent out; on replicas, it returns the last record ID that was applied. This is used in testing to see whether the replicas are up to date. It is implemented by pushing a pointer to the replication manager into the execution context.

Most of the PR is in replication/replication_manager and storage/replication_log_provider.

Adds a new test, script/testing/replication/tests_simple, which will run under the simple unittests part of CI as well. No end-to-end oltpbench yet.

Adds the idea of a transaction-wide RetentionPolicy, which is however not currently able to distinguish between "serialize locally, don't replicate" and "serialize locally, also replicate"; we probably have to revisit the implementation. The idea is that at a per-transaction level, you describe whether (1) you serialize and replicate logs, (2) you only serialize logs, or (3) you don't serialize logs at all. A sketch follows.
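As a hedged sketch of that idea (the type and enumerator names here are illustrative, not necessarily what the PR uses):

```cpp
#include <cstdint>

namespace transaction {
// Illustrative sketch of the three per-transaction retention behaviors
// described above; the PR's actual enumerators may differ, and today the
// implementation cannot yet distinguish the first two cases.
enum class RetentionPolicySketch : uint8_t {
  RETENTION_LOCAL_AND_REPLICATE,  // (1) serialize logs and replicate them
  RETENTION_LOCAL_ONLY,           // (2) only serialize logs locally
  RETENTION_NONE,                 // (3) don't serialize logs at all
};
}  // namespace transaction
```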
Future work, future PR

RetentionPolicy is basically just a flag right now; we need to figure out more details on doing it properly.