
RUST-585 Refactor Topology implementation to actor model #628

Merged · 35 commits · May 2, 2022

Conversation

patrickfreed (Contributor)

RUST-585

This PR refactors the Topology implementation to follow an actor model (similar to the connection pool) rather than a lock-based one, hopefully making the code easier to maintain and reason about.

@@ -119,7 +119,7 @@ version = "0.11.5"
optional = true

[dependencies.tokio]
version = "1.4.0"
version = "1.17.0"
Contributor Author

This was done to get some newer functionality in the sync::watch channel. It's not strictly necessary, but I figured there wasn't any risk in this, as I doubt any users have strict tokio dependency requirements.
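
For context, here is a minimal sketch of the `tokio::sync::watch` pattern the refactored Topology leans on (illustrative types, not the driver's, and it doesn't show the specific newer API that motivated the bump):

```rust
// Minimal watch-channel sketch: a worker publishes successive states and a
// consumer awaits each change. `TopologyState` here is a stand-in type.
use tokio::sync::watch;

#[derive(Clone, Debug)]
struct TopologyState {
    generation: u64,
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel(TopologyState { generation: 0 });

    // Worker task: publish new states as the topology changes.
    tokio::spawn(async move {
        for generation in 1..=3 {
            tx.send(TopologyState { generation }).ok();
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        }
    });

    // Consumer: wake whenever a new state is published.
    while rx.changed().await.is_ok() {
        // `borrow_and_update` also marks the current value as seen.
        println!("saw {:?}", *rx.borrow_and_update());
    }
}
```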

@@ -103,12 +103,6 @@ struct ClientInner {
session_pool: ServerSessionPool,
}

impl Drop for ClientInner {
Contributor Author

This will happen automatically now.

@@ -446,8 +434,10 @@ impl Client {
return Err(first_error);
}

let txn_number = prior_txn_number.or_else(|| get_txn_number(session, retryability));
Contributor Author

I'm not sure why these changes surfaced this problem, but basically if a commitTransaction retry failed during connection acquisition with a retryable error, the txnNumber would never get set and the user would see an error. This now checks to see if there was a txnNumber from before and if not, gets a new one from the session.

Contributor Author

So this was actually unrelated, but it seems to be fixed now. Filed RUST-1274 to track this separately.

Contributor

should we backport this fix to 2.2?

Contributor Author

Yep, I added a comment to RUST-1274 as a reminder. Once this is merged I'll do the backport.
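
A plain-form sketch of the fix discussed in this thread; the closure below stands in for the driver's `get_txn_number(session, retryability)` call:

```rust
// Reuse the transaction number assigned by a prior attempt if there is one;
// otherwise lazily obtain a fresh one from the session.
fn resolve_txn_number(
    prior_txn_number: Option<i64>,
    get_txn_number: impl FnOnce() -> Option<i64>,
) -> Option<i64> {
    prior_txn_number.or_else(get_txn_number)
}
```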

@@ -696,46 +701,6 @@ impl From<PoolManagementRequest> for PoolTask {
}
}

/// Constructs a new channel for for monitoring whether this pool still has references
/// to it.
fn handle_channel() -> (PoolWorkerHandle, HandleListener) {
Contributor Author

this was abstracted to a common type in the runtime module for use with the topology worker
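
Roughly, the shared abstraction works like the sketch below: handles hold a sender that is never used to send anything, and the worker's listener resolves once every handle has been dropped. Names and signatures here are illustrative, not the runtime module's exact API.

```rust
// Hedged sketch of a "worker handle" channel: the worker owns the listener
// and shuts down once every handle clone has been dropped.
use tokio::sync::mpsc;

#[derive(Clone)]
struct WorkerHandle {
    _sender: mpsc::Sender<()>,
}

struct HandleListener {
    receiver: mpsc::Receiver<()>,
}

impl HandleListener {
    /// Resolves once all `WorkerHandle` clones have been dropped.
    async fn wait_for_all_handles_dropped(mut self) {
        // Nothing is ever sent; `recv` returns `None` only when every sender
        // (i.e. every handle) has been dropped.
        self.receiver.recv().await;
    }
}

fn handle_channel() -> (WorkerHandle, HandleListener) {
    let (sender, receiver) = mpsc::channel(1);
    (WorkerHandle { _sender: sender }, HandleListener { receiver })
}

#[tokio::main]
async fn main() {
    let (handle, listener) = handle_channel();
    let worker = tokio::spawn(async move {
        listener.wait_for_all_handles_dropped().await;
        println!("all handles dropped; worker shutting down");
    });
    drop(handle);
    worker.await.unwrap();
}
```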

@@ -133,6 +135,7 @@ where
pub(crate) struct HelloReply {
pub server_address: ServerAddress,
pub command_response: HelloCommandResponse,
pub raw_command_response: RawDocumentBuf,
Contributor Author

this is included as part of the HelloReply so that the monitors can emit SDAM events instead of doing it in the handshaker or the common functions in the hello module.
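
Illustrative only: with the raw reply carried on `HelloReply`, a monitor can emit the heartbeat event itself rather than relying on the handshaker. The event and handler shapes below are simplified stand-ins, not the driver's real definitions.

```rust
use std::time::Duration;

// Stand-in for the hello reply carrying the raw server response.
struct HelloReply {
    raw_command_response: Vec<u8>, // stand-in for RawDocumentBuf
}

// Simplified heartbeat event and handler trait.
struct ServerHeartbeatSucceededEvent {
    duration: Duration,
    reply: Vec<u8>,
}

trait SdamEventHandler {
    fn handle_server_heartbeat_succeeded_event(&self, event: ServerHeartbeatSucceededEvent);
}

// The monitor emits the event directly from the reply it received.
fn emit_heartbeat_succeeded(
    handler: &dyn SdamEventHandler,
    reply: &HelloReply,
    round_trip_time: Duration,
) {
    handler.handle_server_heartbeat_succeeded_event(ServerHeartbeatSucceededEvent {
        duration: round_trip_time,
        reply: reply.raw_command_response.clone(),
    });
}
```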


/// Handle used to request that monitors perform immediate checks of the topology.
#[derive(Clone, Debug)]
struct TopologyCheckRequester {
Contributor Author

This type wasn't mentioned in the design, but it's a channel used to request topology checks. It was necessary to avoid having to go through the topology worker to request checks (the messages go straight to the monitors)
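
Conceptually it's a channel the executor pokes and every monitor listens on: a monitor wakes when its heartbeat interval elapses or when a check is requested, whichever comes first. A hedged sketch with stand-in types:

```rust
use std::time::Duration;
use tokio::sync::watch;
use tokio::time::timeout;

// Held by the executor (via the Topology handle in the real code).
struct TopologyCheckRequester {
    sender: watch::Sender<()>,
}

impl TopologyCheckRequester {
    // Ask every listening monitor to check its server now.
    fn request_check(&self) {
        let _ = self.sender.send(());
    }
}

// Monitor side: sleep until the heartbeat interval elapses or a check is
// requested, whichever happens first.
async fn wait_for_next_check(receiver: &mut watch::Receiver<()>, heartbeat_freq: Duration) {
    let _ = timeout(heartbeat_freq, receiver.changed()).await;
}
```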

///
/// This is used to determine the error handling semantics for certain error types.
#[derive(Debug, Clone)]
pub(crate) enum HandshakePhase {
Contributor Author

this type was copy/pasted

/// If the topology has been closed, events emitted via this handle will not be sent to
/// handlers.
#[derive(Clone)]
pub(crate) struct SdamEventEmitter {
Contributor Author

this struct was required, instead of just emitting events directly from handlers, to ensure no events were emitted after the last TopologyClosedEvent. I originally tried using TopologyWatcher to detect if the topology was still alive before emitting events (similar to the existing implementation), but that proved to be too racy.

this also has the added benefit of preventing users / us from blocking the TopologyWorker via our SdamEventHandler implementations.
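
A sketch of the idea (stand-in event and handler types): events are queued to a dedicated task that invokes the handler, so ordering is preserved, nothing is delivered after the closed event, and a slow handler can't block the worker.

```rust
use std::sync::Arc;
use tokio::sync::mpsc;

#[derive(Debug)]
enum SdamEvent {
    ServerDescriptionChanged,
    TopologyClosed,
}

trait SdamEventHandler: Send + Sync {
    fn handle(&self, event: SdamEvent);
}

#[derive(Clone)]
struct SdamEventEmitter {
    sender: mpsc::UnboundedSender<SdamEvent>,
}

impl SdamEventEmitter {
    fn new(handler: Arc<dyn SdamEventHandler>) -> Self {
        let (sender, mut receiver) = mpsc::unbounded_channel();
        tokio::spawn(async move {
            // Events are delivered strictly in the order they were emitted;
            // once the closed event is seen, nothing further is forwarded.
            while let Some(event) = receiver.recv().await {
                let closed = matches!(event, SdamEvent::TopologyClosed);
                handler.handle(event);
                if closed {
                    break;
                }
            }
        });
        SdamEventEmitter { sender }
    }

    fn emit(&self, event: SdamEvent) {
        // Ignore errors: if the forwarding task has stopped, the topology
        // has already been closed.
        let _ = self.sender.send(event);
    }
}
```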

@@ -0,0 +1,949 @@
use std::{
Contributor Author

To summarize this file, we have the following types (a rough sketch of how they fit together follows the ownership outline below):

  • Topology: a complete handle to the topology. Allows the executor to select servers, update the topology based on application errors, request monitor checks, and keep the worker task running. Does so via the other handle types.
  • TopologyWorker: the aforementioned worker / actor task that processes updates to the topology and publishes new states
  • TopologyUpdater: used to send new server descriptions, application errors, and monitor errors to the topology. Accessed by the executor via Topology and directly by server monitors, SRV polling monitors, and connection pools
  • TopologyWatcher: used to observe the latest published state of the topology. Used by the executor via Topology and by server monitors directly.
  • TopologyCheckRequester: used by the executor to request immediate monitor checks when server selection fails
  • SdamEventEmitter: used by the TopologyWorker and server monitors to publish SDAM events
  • TopologyState: "plain old data" containing a topology description and a hashmap of the servers. These are published whenever the topology is updated

To summarize the ownership:

  • Topology
    • TopologyUpdater
    • TopologyWatcher
    • TopologyCheckRequester
    • WorkerHandle
  • TopologyWorker
    • SdamEventEmitter
  • Server monitors
    • TopologyWatcher
    • TopologyUpdater
    • SdamEventEmitter
  • SRV polling monitor
    • TopologyUpdater
  • Connection pool
    • TopologyUpdater
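
A rough sketch of how these pieces fit together (hypothetical message and state types, not the driver's): handles send update messages to the worker over an mpsc channel, the worker applies them to its private state, and each new state is published through a watch channel that watchers can await.

```rust
use std::collections::HashMap;
use tokio::sync::{mpsc, watch};

#[derive(Clone, Debug, Default)]
struct TopologyState {
    // address -> server type, as a stand-in for real server descriptions
    servers: HashMap<String, String>,
}

enum UpdateMessage {
    ServerUpdate { address: String, server_type: String },
    ApplicationError { address: String },
}

struct TopologyWorker {
    updates: mpsc::UnboundedReceiver<UpdateMessage>,
    publisher: watch::Sender<TopologyState>,
    state: TopologyState,
}

impl TopologyWorker {
    async fn run(mut self) {
        // The worker exits once every updater handle has been dropped.
        while let Some(update) = self.updates.recv().await {
            match update {
                UpdateMessage::ServerUpdate { address, server_type } => {
                    self.state.servers.insert(address, server_type);
                }
                UpdateMessage::ApplicationError { address } => {
                    // Mark the server Unknown on an application error.
                    self.state.servers.insert(address, "Unknown".to_string());
                }
            }
            // Publish the new state for TopologyWatcher instances.
            let _ = self.publisher.send(self.state.clone());
        }
    }
}
```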

@@ -1,64 +0,0 @@
use std::time::Duration;
Contributor Author

this is no longer needed now that we have the new handles

@patrickfreed patrickfreed marked this pull request as ready for review April 19, 2022 17:49

let change_occurred = start_time.elapsed() < timeout
&& watcher
.wait_for_update(timeout - start_time.elapsed())
Contributor

Is there a potential race condition here? i.e. if the update happens after the request_update call but before the wait_for_update call, will this be waiting until some unrelated future update?

Contributor Author

each iteration of the loop marks the topology as "seen" in the clone_latest call, so any update after that point (potentially before the request_update call actually) will be accounted for in wait_for_update.
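
A hedged sketch of that loop shape (illustrative types, not the actual server selection code): the state is marked as seen at the top of each iteration, so a publish that lands between the check request and the wait still wakes the waiter.

```rust
use std::time::{Duration, Instant};
use tokio::sync::watch;
use tokio::time::timeout;

#[derive(Clone, Debug)]
struct TopologyState;

// Placeholder selection predicate.
fn suitable(_state: &TopologyState) -> bool {
    false
}

async fn select_with_timeout(
    receiver: &mut watch::Receiver<TopologyState>,
    request_check: impl Fn(),
    deadline: Duration,
) -> Option<TopologyState> {
    let start = Instant::now();
    while start.elapsed() < deadline {
        // Observe (and mark as seen) the latest published state.
        let state = (*receiver.borrow_and_update()).clone();
        if suitable(&state) {
            return Some(state);
        }
        // Ask the monitors for an immediate check, then wait for any state
        // published after the `borrow_and_update` call above.
        request_check();
        let remaining = deadline.saturating_sub(start.elapsed());
        if timeout(remaining, receiver.changed()).await.is_err() {
            break; // timed out without a new state
        }
    }
    None
}
```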

@@ -208,3 +221,23 @@ pub trait SdamEventHandler: Send + Sync {
/// a server heartbeat fails.
fn handle_server_heartbeat_failed_event(&self, _event: ServerHeartbeatFailedEvent) {}
}

pub(crate) fn handle_sdam_event(handler: &dyn SdamEventHandler, event: SdamEvent) {
Contributor

Any particular reason this accepts a &dyn SdamEventHandler instead of a generic bound?

Contributor Author

We accept handlers in client options as Arc<dyn SdamEventHandler> so we can't get their concrete type from there.

pub(crate) fn watch(&self) -> TopologyWatcher {
let mut watcher = self.watcher.clone();
// mark the latest topology as seen
watcher.receiver.borrow_and_update();
Contributor

I don't follow why this is needed.

Contributor Author

This is to ensure that any calls to wait_for_update on the returned TopologyWatcher will block until a new state is published and not return immediately.


/// Clone the latest state, marking it as seen.
pub(crate) fn clone_latest(&mut self) -> TopologyState {
self.receiver.borrow_and_update().clone()
Contributor

I don't understand the usage of the receiver's "seen" flag. It's set in clone_latest but not borrow_latest or server_description?

Contributor Author

The "seen" flag determines whether the currently observed TopologtState would be considered "new" or not for the purposes of wait_for_update. If we've called clone_latest and then immediately call wait_for_update, it'll block. If we haven't, it'll return immediately.

Not setting it in borrow_latest and server_description was more of an ergonomic choice, since it would require those methods to be &mut self.

To make this a bit clearer, I renamed the methods to observe_latest (formerly clone_latest) and peek_latest (formerly borrow_latest). Let me know if you think these are more helpful / if there are better ones.

Contributor

Much clearer, thank you :)
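
For reference, a sketch of the two accessors' semantics using a stand-in wrapper around tokio's watch receiver: `observe_latest` marks the value as seen (so a later `wait_for_update` blocks until something new is published), while `peek_latest` does not.

```rust
use tokio::sync::watch;

#[derive(Clone, Debug)]
struct TopologyState;

struct TopologyWatcher {
    receiver: watch::Receiver<TopologyState>,
}

impl TopologyWatcher {
    /// Clone the latest state, marking it as seen.
    fn observe_latest(&mut self) -> TopologyState {
        self.receiver.borrow_and_update().clone()
    }

    /// Look at the latest state without affecting a later wait.
    fn peek_latest(&self) -> watch::Ref<'_, TopologyState> {
        self.receiver.borrow()
    }
}
```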

@patrickfreed patrickfreed requested a review from abr-egn April 20, 2022 21:09
@abr-egn (Contributor) left a comment

LGTM! Definitely much easier to follow, and thank you for explaining the bits I didn't get.



@kmahar (Contributor) left a comment

few minor questions but besides those LGTM!


command: &mut Command<T>,
criteria: Option<&SelectionCriteria>,
) {
let server_type = self
.get_server_description(address)
Contributor

when does this return None? would that be an (I think unexpected) case where we somehow selected a server but the TopologyDescription doesn't contain it?

Contributor Author

Yeah, this would be really rare, but basically if a server were to be removed from the topology after we had selected it + checked out a connection from it (e.g. if a monitor check comes back that removes it from the topology). This can happen in the normal course of operations, so I think we need to handle it here. We could decide to return an error instead of defaulting to Unknown to try to prevent the operation from executing, but it's not entirely clear if that's what we want or not.

Contributor

ah that makes sense. I think considering it unknown makes sense then. thanks!
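
A minimal sketch of the fallback discussed in this thread (the types below are stand-ins, not the driver's): a server that is no longer present in the topology description is simply treated as Unknown.

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, Debug, PartialEq)]
enum ServerType {
    Standalone,
    Unknown,
}

// If the address was removed from the description after selection, fall back
// to Unknown rather than failing the operation.
fn server_type_for(topology: &HashMap<String, ServerType>, address: &str) -> ServerType {
    topology.get(address).copied().unwrap_or(ServerType::Unknown)
}
```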

@@ -580,19 +586,23 @@ async fn topology_closed_event_last() {
drop(client);

subscriber
-.wait_for_event(Duration::from_millis(500), |event| {
+.wait_for_event(Duration::from_millis(5000), |event| {
Contributor

was this failing with the lower timeout / any hypotheses as to why?

Contributor Author

Oh I think I changed this when I was debugging an earlier version of the test, changed back to 500.

@isabelatkinson (Contributor) left a comment

looks good, just one question

/// Borrow the message.
pub(crate) fn message(&self) -> &M {
/// Send acknowledgement to the receiver.
#[allow(dead_code)]
Contributor

any reason for keeping this around even though it's unused?

Contributor Author

Oh good catch, we actually do use this method in the connection pool, so this dead_code ignore can just be removed.
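
For context, a hedged sketch of the acknowledged-message shape referenced here (illustrative names, not the driver's exact types): the payload travels with a oneshot sender so the receiver can signal back once it has handled the request.

```rust
use tokio::sync::oneshot;

struct AcknowledgedMessage<M, A = ()> {
    message: M,
    acknowledger: oneshot::Sender<A>,
}

impl<M, A> AcknowledgedMessage<M, A> {
    /// Package a message together with a receiver the caller can await.
    fn package(message: M) -> (Self, oneshot::Receiver<A>) {
        let (acknowledger, receiver) = oneshot::channel();
        (Self { message, acknowledger }, receiver)
    }

    /// Borrow the message.
    fn message(&self) -> &M {
        &self.message
    }

    /// Send acknowledgement back to the original requester.
    fn acknowledge(self, result: A) {
        let _ = self.acknowledger.send(result);
    }
}
```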

@patrickfreed (Contributor Author) left a comment

To try to preserve the review comment / commit history, I did a regular merge commit to fix the merge conflicts. Will still squash it all down to a single commit at the end though.

