RUST-585 Refactor Topology implementation to actor model #628
Conversation
@@ -119,7 +119,7 @@ version = "0.11.5"
 optional = true

 [dependencies.tokio]
-version = "1.4.0"
+version = "1.17.0"
This was done to get some newer functionality in the `sync::watch` channel. It's not strictly necessary, but I figured there wasn't any risk in this, as I doubt any users have strict tokio dependency requirements.
@@ -103,12 +103,6 @@ struct ClientInner {
     session_pool: ServerSessionPool,
 }

-impl Drop for ClientInner {
This will happen automatically now.
@@ -446,8 +434,10 @@ impl Client {
             return Err(first_error);
         }

+        let txn_number = prior_txn_number.or_else(|| get_txn_number(session, retryability));
I'm not sure why these changes surfaced this problem, but basically if a `commitTransaction` retry failed during connection acquisition with a retryable error, the `txnNumber` would never get set and the user would see an error. This now checks to see if there was a `txnNumber` from before and, if not, gets a new one from the session.
So this was actually unrelated, but it seems to be fixed now. Filed RUST-1274 to track this separately.
should we backport this fix to 2.2?
Yep, I added a comment to RUST-1274 as a reminder. Once this is merged I'll do the backport.
@@ -696,46 +701,6 @@ impl From<PoolManagementRequest> for PoolTask {
     }
 }

-/// Constructs a new channel for for monitoring whether this pool still has references
-/// to it.
-fn handle_channel() -> (PoolWorkerHandle, HandleListener) {
This was abstracted to a common type in the `runtime` module for use with the topology worker.
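As a rough illustration of that shared abstraction (the names `WorkerHandle` / `WorkerHandleListener` and the mpsc-based implementation below are assumptions, not necessarily what the `runtime` module actually contains), the worker holds a listener that resolves once every handle has been dropped:

```rust
use tokio::sync::mpsc;

/// Cloneable handle; the worker stays alive as long as at least one exists.
#[derive(Clone)]
pub(crate) struct WorkerHandle {
    _sender: mpsc::Sender<()>,
}

/// Held by the worker task to detect when all handles are gone.
pub(crate) struct WorkerHandleListener {
    receiver: mpsc::Receiver<()>,
}

impl WorkerHandleListener {
    /// Constructs a connected handle / listener pair.
    pub(crate) fn channel() -> (WorkerHandle, WorkerHandleListener) {
        let (sender, receiver) = mpsc::channel(1);
        (
            WorkerHandle { _sender: sender },
            WorkerHandleListener { receiver },
        )
    }

    /// Resolves once every `WorkerHandle` (including clones) has been dropped.
    pub(crate) async fn wait_for_all_handle_drops(&mut self) {
        // Nothing is ever sent on this channel, so `recv` only completes
        // (returning `None`) once all senders have been dropped.
        self.receiver.recv().await;
    }
}
```

Both the pool worker and the topology worker can then wait on such a listener to know when to shut down.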
@@ -133,6 +135,7 @@ where
 pub(crate) struct HelloReply {
     pub server_address: ServerAddress,
     pub command_response: HelloCommandResponse,
+    pub raw_command_response: RawDocumentBuf,
This is included as part of the `HelloReply` so that the monitors can emit SDAM events instead of doing it in the handshaker or the common functions in the `hello` module.
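For illustration only, here is a hedged sketch of what this enables; the reply and event structs below are simplified stand-ins (only the `raw_command_response` field comes from the diff), not the driver's real definitions:

```rust
use std::time::Duration;

use bson::RawDocumentBuf;

// Hypothetical, simplified versions of the reply and event types.
struct HelloReply {
    raw_command_response: RawDocumentBuf,
}

struct ServerHeartbeatSucceededEvent {
    duration: Duration,
    reply: RawDocumentBuf,
}

/// In the monitor, a successful check can be turned into an SDAM event
/// directly, without the handshaker or the `hello` helpers emitting anything.
fn heartbeat_succeeded(reply: &HelloReply, duration: Duration) -> ServerHeartbeatSucceededEvent {
    ServerHeartbeatSucceededEvent {
        duration,
        reply: reply.raw_command_response.clone(),
    }
}
```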
/// Handle used to request that monitors perform immediate checks of the topology.
#[derive(Clone, Debug)]
struct TopologyCheckRequester {
This type wasn't mentioned in the design, but it's a channel used to request topology checks. It was necessary to avoid having to go through the topology worker to request checks (the messages go straight to the monitors).
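A minimal sketch of such a requester, assuming it is built on a tokio `watch` channel (the actual implementation in the PR may differ):

```rust
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::watch;

/// Illustrative check requester: server selection calls `request_check`, and
/// every monitor holding a subscribed receiver wakes up.
#[derive(Clone)]
struct TopologyCheckRequester {
    sender: Arc<watch::Sender<()>>,
}

impl TopologyCheckRequester {
    fn new() -> Self {
        let (sender, _receiver) = watch::channel(());
        TopologyCheckRequester {
            sender: Arc::new(sender),
        }
    }

    /// Give a monitor its own receiver to wait on.
    fn subscribe(&self) -> watch::Receiver<()> {
        self.sender.subscribe()
    }

    /// Ask all monitors to perform an immediate check.
    fn request_check(&self) {
        // An error only means no monitor is listening anymore, which is fine.
        let _ = self.sender.send(());
    }
}

/// Monitor side: sleep until either the heartbeat interval elapses or a check
/// is explicitly requested.
async fn wait_for_next_check(receiver: &mut watch::Receiver<()>, heartbeat: Duration) {
    let _ = tokio::time::timeout(heartbeat, receiver.changed()).await;
}
```

This matches the motivation above: the requests go straight to the monitors rather than being queued behind topology updates in the worker.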
///
/// This is used to determine the error handling semantics for certain error types.
#[derive(Debug, Clone)]
pub(crate) enum HandshakePhase {
This type was copy/pasted.
/// If the topology has been closed, events emitted via this handle will not be sent to
/// handlers.
#[derive(Clone)]
pub(crate) struct SdamEventEmitter {
This struct was required, instead of just emitting events to the handlers directly, to ensure no events were emitted after the last `TopologyClosedEvent`. I originally tried using `TopologyWatcher` to detect if the topology was still alive before emitting events (similar to the existing implementation), but that proved to be too racy. This also has the added benefit of preventing users / us from blocking the `TopologyWorker` via our `SdamEventHandler` implementations.
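Roughly, the idea can be sketched like this (hypothetical types and shapes; the real `SdamEventEmitter` is more involved): events are funneled through a channel and delivered by a single task, which stops after the closing event, so nothing can be emitted afterwards and a slow handler never blocks the emitting worker or monitor.

```rust
use std::sync::Arc;

use tokio::sync::mpsc;

// Hypothetical minimal event/handler types for illustration.
enum SdamEvent {
    TopologyClosed,
    Other(String),
}

trait SdamEventHandler: Send + Sync {
    fn handle(&self, event: SdamEvent);
}

#[derive(Clone)]
struct SdamEventEmitter {
    sender: mpsc::UnboundedSender<SdamEvent>,
}

impl SdamEventEmitter {
    fn new(handler: Arc<dyn SdamEventHandler>) -> Self {
        let (sender, mut receiver) = mpsc::unbounded_channel();
        // A single task owns delivery, so events reach the handler in the
        // order they were emitted, and emitting never blocks the caller.
        tokio::spawn(async move {
            while let Some(event) = receiver.recv().await {
                let is_closed = matches!(&event, SdamEvent::TopologyClosed);
                handler.handle(event);
                if is_closed {
                    // Drop the receiver: nothing emitted after the
                    // TopologyClosedEvent will ever be delivered.
                    break;
                }
            }
        });
        SdamEventEmitter { sender }
    }

    fn emit(&self, event: SdamEvent) {
        // Sends after the drain task has stopped simply fail, which is fine.
        let _ = self.sender.send(event);
    }
}
```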
@@ -0,0 +1,949 @@
+use std::{
To summarize this file, we have the following types (a rough sketch of how these handles could fit together follows the ownership list below):

- `Topology`: a complete handle to the topology. Allows the executor to select servers, update the topology based on application errors, request monitor checks, and keep the worker task running. Does so via the other handle types.
- `TopologyWorker`: the aforementioned worker / actor task that processes updates to the topology and publishes new states.
- `TopologyUpdater`: used to send new server descriptions, application errors, and monitor errors to the topology. Accessed by the executor via `Topology` and directly by server monitors, SRV polling monitors, and connection pools.
- `TopologyWatcher`: used to observe the latest published state of the topology. Used by the executor via `Topology` and by server monitors directly.
- `TopologyCheckRequester`: used by the executor to request immediate monitor checks when server selection fails.
- `SdamEventEmitter`: used by the `TopologyWorker` and server monitors to publish SDAM events.
- `TopologyState`: "plain old data" containing a topology description and a hashmap of the servers. These are published whenever the topology is updated.
To summarize the ownership:

- `Topology`
  - `TopologyUpdater`
  - `TopologyWatcher`
  - `TopologyCheckRequester`
  - `WorkerHandle`
- `TopologyWorker`
  - `SdamEventEmitter`
- Server monitors
  - `TopologyWatcher`
  - `TopologyUpdater`
  - `SdamEventEmitter`
- SRV polling monitor
  - `TopologyUpdater`
- Connection pool
  - `TopologyUpdater`
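As mentioned above, here is a rough, assumption-heavy skeleton of how these handles could be wired together with tokio channels; the type names mirror the summary, but the fields and message shapes are illustrative only, not the PR's actual code:

```rust
use std::{collections::HashMap, sync::Arc};

use tokio::sync::{mpsc, watch};

// Stand-ins for the real description types.
struct TopologyDescription;
struct Server;

/// "Plain old data" snapshot published on every update.
#[derive(Clone)]
struct TopologyState {
    description: Arc<TopologyDescription>,
    servers: Arc<HashMap<String, Arc<Server>>>,
}

/// Messages the worker processes (new server descriptions, errors, ...).
enum UpdateMessage {
    ServerUpdate(String),
    ApplicationError(String),
}

/// Sends updates into the worker; held by the executor (via `Topology`),
/// server monitors, the SRV polling monitor, and connection pools.
#[derive(Clone)]
struct TopologyUpdater {
    sender: mpsc::UnboundedSender<UpdateMessage>,
}

/// Observes the latest published state; held by the executor and monitors.
#[derive(Clone)]
struct TopologyWatcher {
    receiver: watch::Receiver<TopologyState>,
}

/// The actor task: owns the mutable state, drains `updates`, and publishes a
/// fresh `TopologyState` through `publisher` whenever something changes.
struct TopologyWorker {
    updates: mpsc::UnboundedReceiver<UpdateMessage>,
    publisher: watch::Sender<TopologyState>,
}

/// Complete handle used by the executor.
struct Topology {
    updater: TopologyUpdater,
    watcher: TopologyWatcher,
    // The check requester and worker handle are omitted for brevity.
}
```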
@@ -1,64 +0,0 @@
-use std::time::Duration;
This is no longer needed now that we have the new handles.
let change_occurred = start_time.elapsed() < timeout
    && watcher
        .wait_for_update(timeout - start_time.elapsed())
Is there a potential race condition here? i.e. if the update happens after the `request_update` call but before the `wait_for_update` call, will this be waiting until some unrelated future update?
Each iteration of the loop marks the topology as "seen" in the `clone_latest` call, so any update after that point (potentially before the `request_update` call, actually) will be accounted for in `wait_for_update`.
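To make that concrete, a small sketch of the loop's shape using a bare tokio `watch` receiver in place of the `TopologyWatcher` (the selection and `request_update` steps are elided; this is an approximation of the logic, not the PR's code):

```rust
use std::time::{Duration, Instant};

use tokio::sync::watch;

/// Each iteration's `borrow_and_update` marks the current state as seen, so
/// any publish after that point (even one arriving before the check request
/// goes out) causes the subsequent `changed()` to return immediately rather
/// than waiting for some later, unrelated update.
async fn selection_loop(mut receiver: watch::Receiver<u64>, timeout: Duration) {
    let start_time = Instant::now();
    loop {
        // Snapshot the latest published state, marking it as seen.
        let _state = *receiver.borrow_and_update();

        // ... attempt server selection; on failure, request an immediate
        // monitor check (elided) and then wait for the topology to change ...

        let remaining = timeout.saturating_sub(start_time.elapsed());
        let change_occurred = !remaining.is_zero()
            && tokio::time::timeout(remaining, receiver.changed())
                .await
                .map(|res| res.is_ok())
                .unwrap_or(false);
        if !change_occurred {
            break;
        }
    }
}
```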
@@ -208,3 +221,23 @@ pub trait SdamEventHandler: Send + Sync {
     /// a server heartbeat fails.
     fn handle_server_heartbeat_failed_event(&self, _event: ServerHeartbeatFailedEvent) {}
 }

+pub(crate) fn handle_sdam_event(handler: &dyn SdamEventHandler, event: SdamEvent) {
Any particular reason this accepts a `&dyn SdamEventHandler` instead of a generic bound?
We accept handlers in client options as `Arc<dyn SdamEventHandler>`, so we can't get their concrete type from there.
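A small sketch of why the type-erased signature falls out of the options type (minimal hypothetical definitions, not the driver's actual ones):

```rust
use std::sync::Arc;

// Hypothetical minimal versions of the types being discussed.
trait SdamEventHandler: Send + Sync {
    fn handle_topology_closed(&self) {}
}

enum SdamEvent {
    TopologyClosed,
}

// Because the handler is stored type-erased in the options, this free function
// can only take `&dyn SdamEventHandler`; a generic bound would require a
// concrete type, which is never known here.
fn handle_sdam_event(handler: &dyn SdamEventHandler, event: SdamEvent) {
    match event {
        SdamEvent::TopologyClosed => handler.handle_topology_closed(),
    }
}

struct Options {
    sdam_event_handler: Option<Arc<dyn SdamEventHandler>>,
}

fn dispatch(options: &Options, event: SdamEvent) {
    if let Some(handler) = &options.sdam_event_handler {
        handle_sdam_event(handler.as_ref(), event);
    }
}
```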
    pub(crate) fn watch(&self) -> TopologyWatcher {
        let mut watcher = self.watcher.clone();
        // mark the latest topology as seen
        watcher.receiver.borrow_and_update();
I don't follow why this is needed.
This is to ensure that any calls to `wait_for_update` on the returned `TopologyWatcher` will block until a new state is published and not return immediately.
    /// Clone the latest state, marking it as seen.
    pub(crate) fn clone_latest(&mut self) -> TopologyState {
        self.receiver.borrow_and_update().clone()
I don't understand the usage of the receiver's "seen" flag. It's set in `clone_latest` but not `borrow_latest` or `server_description`?
The "seen" flag determines whether the currently observed `TopologyState` would be considered "new" or not for the purposes of `wait_for_update`. If we've called `clone_latest` and then immediately call `wait_for_update`, it'll block. If we haven't, it'll return immediately.

Not setting it in `borrow_latest` and `server_description` was more of an ergonomic choice, since it would require those methods to be `&mut self`.

To make this a bit clearer, I renamed the methods to `observe_latest` (formerly `clone_latest`) and `peek_latest` (formerly `borrow_latest`). Let me know if you think these are more helpful / if there are better ones.
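For reference, a sketch of the underlying tokio `watch` behavior these methods lean on (simplified; `String` stands in for the topology state): `borrow_and_update` marks the current value as seen, `borrow` does not, and `changed` resolves as soon as there is a value newer than the last one marked seen.

```rust
use tokio::sync::watch;

struct Watcher {
    receiver: watch::Receiver<String>,
}

impl Watcher {
    /// Clone the latest state, marking it as seen: a subsequent
    /// `wait_for_update` will block until something newer is published.
    fn observe_latest(&mut self) -> String {
        self.receiver.borrow_and_update().clone()
    }

    /// Look at the latest state without marking it as seen, so it does not
    /// affect `wait_for_update`; this is what lets it take `&self`.
    fn peek_latest(&self) -> String {
        self.receiver.borrow().clone()
    }

    /// Returns once a state newer than the last *observed* one is published.
    async fn wait_for_update(&mut self) {
        let _ = self.receiver.changed().await;
    }
}
```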
Much clearer, thank you :)
LGTM! Definitely much easier to follow, and thank you for explaining the bits I didn't get.
few minor questions but besides those LGTM!
        command: &mut Command<T>,
        criteria: Option<&SelectionCriteria>,
    ) {
        let server_type = self
            .get_server_description(address)
When does this return `None`? Would that be an (I think unexpected) case where we somehow selected a server but the `TopologyDescription` doesn't contain it?
Yeah, this would be really rare, but basically it happens if a server is removed from the topology after we had selected it + checked out a connection from it (e.g. if a monitor check comes back that removes it from the topology). This can happen in the normal course of operations, so I think we need to handle it here. We could decide to return an error instead of defaulting to `Unknown` to try to prevent the operation from executing, but it's not entirely clear if that's what we want or not.
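A tiny illustrative sketch of that fallback with simplified, hypothetical types (the real lookup goes through the driver's topology machinery):

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, Debug, PartialEq)]
enum ServerType {
    Standalone,
    Unknown,
}

#[derive(Debug)]
struct ServerDescription {
    server_type: ServerType,
}

struct TopologyDescription {
    servers: HashMap<String, ServerDescription>,
}

impl TopologyDescription {
    /// The server may have been removed (e.g. by a monitor check) between
    /// selection and execution, so treat a missing entry as `Unknown` rather
    /// than returning an error.
    fn effective_server_type(&self, address: &str) -> ServerType {
        self.servers
            .get(address)
            .map(|desc| desc.server_type)
            .unwrap_or(ServerType::Unknown)
    }
}
```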
ah that makes sense. I think considering it unknown makes sense then. thanks!
@@ -580,19 +586,23 @@ async fn topology_closed_event_last() {
     drop(client);

     subscriber
-        .wait_for_event(Duration::from_millis(500), |event| {
+        .wait_for_event(Duration::from_millis(5000), |event| {
was this failing with the lower timeout / any hypotheses as to why?
Oh I think I changed this when I was debugging an earlier version of the test, changed back to 500.
looks good, just one question
src/runtime/acknowledged_message.rs (outdated)
    /// Borrow the message.
    pub(crate) fn message(&self) -> &M {
    /// Send acknowledgement to the receiver.
    #[allow(dead_code)]
any reason for keeping this around even though it's unused?
Oh good catch, we actually do use this method in the connection pool, so this dead_code ignore can just be removed.
To try to preserve the review comment / commit history, I did a regular merge commit to fix the merge conflicts. Will still squash it all down to a single commit at the end though.
RUST-585
This PR refactors the Topology implementation to follow an actor model (similar to the connection pool) rather than a lock-based one, hopefully making the code easier to maintain and reason about.