RUST-585 Refactor Topology implementation to actor model #628

Merged on May 2, 2022 (35 commits).
Commits (all by patrickfreed):

a02a7b2  wip (Apr 4, 2022)
4e40828  wip (Apr 5, 2022)
cf68eed  new topology hooked up to server selection (Apr 7, 2022)
738e39b  use new topology everywhere (Apr 8, 2022)
caf13a3  update tests (Apr 8, 2022)
f07192b  crud passing (Apr 8, 2022)
f779a40  wip sdam tests (Apr 8, 2022)
feaf3af  wait for acknowledgment in updater methods, sdam tests more wip (Apr 12, 2022)
8a59dd6  wip fix dropping (Apr 12, 2022)
d87610c  sdam tests passing (Apr 13, 2022)
ace147b  all standalone tests passing (Apr 13, 2022)
9461a72  remove debug print, fix srv tests, disable cs doc test (Apr 13, 2022)
4e96ecc  fix clippy (Apr 14, 2022)
5fd4a1b  add comments, more cleanup (Apr 14, 2022)
083251c  delete old sdam/state module (Apr 14, 2022)
b217db4  cleanup (Apr 14, 2022)
e69c939  only emit events if topology is alive, add better debug (Apr 14, 2022)
891964a  drop the broadcaster before emitting closed event (Apr 14, 2022)
d7ac989  serialize sdam event emission, get txn number if retrying connection (Apr 15, 2022)
e25015d  fix clippy (Apr 15, 2022)
7f1e962  Revert serialize sdam event emission (Apr 15, 2022)
752418a  Revert "Revert serialize sdam event emission" (Apr 15, 2022)
f2c57b3  unify publisher terminology (Apr 15, 2022)
d80a86d  acknowledge event emission (Apr 18, 2022)
ed4196d  dont acknowledge event emission when emitting events from topology (Apr 18, 2022)
9c18b9d  cleanup (Apr 18, 2022)
cd418e9  fix racy test (Apr 18, 2022)
bef78f7  fix sdam auth_error test on replsets (Apr 18, 2022)
873f697  remove unused message manager (Apr 18, 2022)
3465eea  make watcher api clearer (Apr 20, 2022)
8c2775e  fmt (Apr 21, 2022)
fda2683  remove unneeded dead_code ignore (May 2, 2022)
0d2ff38  revert event timeout (May 2, 2022)
10649dc  verify we saw both types of events (May 2, 2022)
f05a507  Merge branch 'main' into RUST-585/refactor-sdam (May 2, 2022)
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -120,7 +120,7 @@ version = "0.11.5"
optional = true

[dependencies.tokio]
version = "1.4.0"
version = "1.17.0"
Contributor Author (patrickfreed): This was done to get some newer functionality in the sync::watch channel. It's not strictly necessary, but I figured there wasn't any risk in this, as I doubt any users have strict tokio dependency requirements.
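For context, a minimal illustrative sketch of the tokio::sync::watch pattern the refactored Topology builds on. It uses only long-standing watch APIs (channel, borrow_and_update, changed) plus a placeholder TopologyState type, and it does not try to pinpoint which newer watch feature the comment above refers to.

```rust
use std::time::Duration;
use tokio::sync::watch;

// Placeholder stand-in for the driver's published topology state.
#[derive(Clone, Debug, Default)]
struct TopologyState {
    generation: u64,
}

#[tokio::main]
async fn main() {
    let (publisher, mut watcher) = watch::channel(TopologyState::default());

    // Worker side: publish a new state whenever the topology changes.
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(50)).await;
        let _ = publisher.send(TopologyState { generation: 1 });
    });

    // Handle side: take a snapshot of the latest state (marking it seen),
    // then wait a bounded amount of time for the next published update.
    let _snapshot = watcher.borrow_and_update().clone();
    match tokio::time::timeout(Duration::from_secs(1), watcher.changed()).await {
        Ok(Ok(())) => println!("updated: {:?}", *watcher.borrow()),
        Ok(Err(_)) => println!("publisher dropped"),
        Err(_) => println!("no update before the timeout"),
    }
}
```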

features = ["io-util", "sync", "macros"]

[dependencies.tokio-rustls]
77 changes: 45 additions & 32 deletions src/client/executor.rs
@@ -351,27 +351,15 @@ impl Client {

let retryability = self.get_retryability(&conn, &op, &session)?;

let txn_number = match session {
Some(ref mut session) => {
if session.transaction.state != TransactionState::None {
Some(session.txn_number())
} else {
match retryability {
Retryability::Write => Some(session.get_and_increment_txn_number()),
_ => None,
}
}
}
None => None,
};
let txn_number = get_txn_number(&mut session, retryability);

match self
.execute_operation_on_connection(
&mut op,
&mut conn,
&mut session,
txn_number,
&retryability,
retryability,
)
.await
{
@@ -398,9 +386,9 @@
self.inner
.topology
.handle_application_error(
server.address.clone(),
err.clone(),
HandshakePhase::after_completion(&conn),
&server,
)
.await;
// release the connection to be processed by the connection pool
@@ -424,7 +412,7 @@
&self,
op: &mut T,
session: &mut Option<&mut ClientSession>,
txn_number: Option<i64>,
prior_txn_number: Option<i64>,
first_error: Error,
) -> Result<ExecutionOutput<T>> {
op.update_for_retry();
@@ -446,8 +434,10 @@
return Err(first_error);
}

let txn_number = prior_txn_number.or_else(|| get_txn_number(session, retryability));
Contributor Author (patrickfreed): I'm not sure why these changes surfaced this problem, but basically if a commitTransaction retry failed during connection acquisition with a retryable error, the txnNumber would never get set and the user would see an error. This now checks to see if there was a txnNumber from before and, if not, gets a new one from the session.

Contributor Author (patrickfreed): So this was actually unrelated, but it seems to be fixed now. Filed RUST-1274 to track this separately.

Contributor: Should we backport this fix to 2.2?

Contributor Author (patrickfreed): Yep, I added a comment to RUST-1274 as a reminder. Once this is merged I'll do the backport.

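To make the fallback described above concrete, a tiny illustrative sketch; txn_number_for_retry and its arguments are placeholders rather than the driver's actual API.

```rust
// Illustrative only: prefer the txnNumber used on the first attempt, and
// only derive one from the session when the first attempt never got far
// enough to assign it (e.g. it failed while acquiring a connection).
fn txn_number_for_retry(
    prior_txn_number: Option<i64>,
    session_txn_number: Option<i64>,
) -> Option<i64> {
    prior_txn_number.or(session_txn_number)
}

fn main() {
    // First attempt failed before a txnNumber was assigned: derive one now.
    assert_eq!(txn_number_for_retry(None, Some(1)), Some(1));
    // First attempt already used a txnNumber: the retry must reuse it.
    assert_eq!(txn_number_for_retry(Some(1), Some(2)), Some(1));
}
```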
match self
.execute_operation_on_connection(op, &mut conn, session, txn_number, &retryability)
.execute_operation_on_connection(op, &mut conn, session, txn_number, retryability)
.await
{
Ok(operation_output) => Ok(ExecutionOutput {
@@ -458,9 +448,9 @@
self.inner
.topology
.handle_application_error(
server.address.clone(),
err.clone(),
HandshakePhase::after_completion(&conn),
&server,
)
.await;
drop(server);
@@ -481,7 +471,7 @@
connection: &mut Connection,
session: &mut Option<&mut ClientSession>,
txn_number: Option<i64>,
retryability: &Retryability,
retryability: Retryability,
) -> Result<T::O> {
if let Some(wc) = op.write_concern() {
wc.validate()?;
@@ -490,10 +480,11 @@
let stream_description = connection.stream_description()?;
let is_sharded = stream_description.initial_server_type == ServerType::Mongos;
let mut cmd = op.build(stream_description)?;
self.inner
.topology
.update_command_with_read_pref(connection.address(), &mut cmd, op.selection_criteria())
.await;
self.inner.topology.update_command_with_read_pref(
connection.address(),
&mut cmd,
op.selection_criteria(),
);

match session {
Some(ref mut session) if op.supports_sessions() && op.is_acknowledged() => {
@@ -587,7 +578,7 @@ impl Client {
}

let session_cluster_time = session.as_ref().and_then(|session| session.cluster_time());
let client_cluster_time = self.inner.topology.cluster_time().await;
let client_cluster_time = self.inner.topology.cluster_time();
let max_cluster_time = std::cmp::max(session_cluster_time, client_cluster_time.as_ref());
if let Some(cluster_time) = max_cluster_time {
cmd.set_cluster_time(cluster_time);
@@ -788,7 +779,7 @@ impl Client {
}

async fn select_data_bearing_server(&self) -> Result<()> {
let topology_type = self.inner.topology.topology_type().await;
let topology_type = self.inner.topology.topology_type();
let criteria = SelectionCriteria::Predicate(Arc::new(move |server_info| {
let server_type = server_info.server_type();
(matches!(topology_type, TopologyType::Single) && server_type.is_available())
@@ -802,14 +793,14 @@ impl Client {
/// session timeout. If it has yet to be determined if the topology supports sessions, this
/// method will perform a server selection that will force that determination to be made.
pub(crate) async fn get_session_support_status(&self) -> Result<SessionSupportStatus> {
let initial_status = self.inner.topology.session_support_status().await;
let initial_status = self.inner.topology.session_support_status();

// Need to guarantee that we're connected to at least one server that can determine if
// sessions are supported or not.
match initial_status {
SessionSupportStatus::Undetermined => {
self.select_data_bearing_server().await?;
Ok(self.inner.topology.session_support_status().await)
Ok(self.inner.topology.session_support_status())
}
_ => Ok(initial_status),
}
@@ -819,14 +810,14 @@ impl Client {
/// topology supports transactions, this method will perform a server selection that will force
/// that determination to be made.
pub(crate) async fn transaction_support_status(&self) -> Result<TransactionSupportStatus> {
let initial_status = self.inner.topology.transaction_support_status().await;
let initial_status = self.inner.topology.transaction_support_status();

// Need to guarantee that we're connected to at least one server that can determine if
// sessions are supported or not.
match initial_status {
TransactionSupportStatus::Undetermined => {
self.select_data_bearing_server().await?;
Ok(self.inner.topology.transaction_support_status().await)
Ok(self.inner.topology.transaction_support_status())
}
_ => Ok(initial_status),
}
@@ -885,7 +876,10 @@ impl Client {
session: &mut Option<&mut ClientSession>,
) {
if let Some(ref cluster_time) = cluster_time {
self.inner.topology.advance_cluster_time(cluster_time).await;
self.inner
.topology
.advance_cluster_time(cluster_time.clone())
.await;
if let Some(ref mut session) = session {
session.advance_cluster_time(cluster_time)
}
@@ -918,6 +912,25 @@ async fn get_connection<T: Operation>(
}
}

fn get_txn_number(
session: &mut Option<&mut ClientSession>,
retryability: Retryability,
) -> Option<i64> {
match session {
Some(ref mut session) => {
if session.transaction.state != TransactionState::None {
Some(session.txn_number())
} else {
match retryability {
Retryability::Write => Some(session.get_and_increment_txn_number()),
_ => None,
}
}
}
None => None,
}
}

impl Error {
/// Adds the necessary labels to this Error, and unpins the session if needed.
///
@@ -936,7 +949,7 @@ impl Error {
&mut self,
conn: Option<&Connection>,
session: &mut Option<&mut ClientSession>,
retryability: Option<&Retryability>,
retryability: Option<Retryability>,
) -> Result<()> {
let transaction_state = session.as_ref().map_or(&TransactionState::None, |session| {
&session.transaction.state
@@ -970,7 +983,7 @@
}
}
TransactionState::None => {
if retryability == Some(&Retryability::Write) {
if retryability == Some(Retryability::Write) {
if let Some(max_wire_version) = max_wire_version {
if self.should_add_retryable_write_label(max_wire_version) {
self.add_label(RETRYABLE_WRITE_ERROR);
61 changes: 26 additions & 35 deletions src/client/mod.rs
@@ -34,7 +34,7 @@ use crate::{
SessionOptions,
},
results::DatabaseSpecification,
sdam::{SelectedServer, SessionSupportStatus, Topology},
sdam::{server_selection, SelectedServer, SessionSupportStatus, Topology},
ClientSession,
};
pub(crate) use executor::{HELLO_COMMAND_NAMES, REDACTED_COMMANDS};
@@ -103,12 +103,6 @@ struct ClientInner {
session_pool: ServerSessionPool,
}

impl Drop for ClientInner {
Contributor Author (patrickfreed): This will happen automatically now.

fn drop(&mut self) {
self.topology.close()
}
}
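A rough sketch of why the explicit cleanup above is no longer needed under the actor model: the topology worker owns the receiving half of a channel, so once every handle (sender) is dropped, the worker's receive loop ends and it shuts itself down. The Message enum and worker shape below are placeholders, not the driver's actual types.

```rust
use tokio::sync::mpsc;

enum Message {
    RequestUpdate,
}

#[tokio::main]
async fn main() {
    let (handle, mut receiver) = mpsc::unbounded_channel::<Message>();

    // The topology "worker": runs until every handle (sender) is dropped.
    let worker = tokio::spawn(async move {
        while let Some(message) = receiver.recv().await {
            match message {
                Message::RequestUpdate => { /* refresh server state here */ }
            }
        }
        // Channel closed: all handles are gone, so clean up (emit closed
        // events, stop monitors, ...) without any explicit close() call.
        println!("worker shut down");
    });

    let _ = handle.send(Message::RequestUpdate);
    drop(handle); // dropping the last handle ends the worker loop
    worker.await.unwrap();
}
```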

impl Client {
/// Creates a new `Client` connected to the cluster specified by `uri`. `uri` must be a valid
/// MongoDB connection string.
@@ -323,7 +317,7 @@ impl Client {
/// If the session is expired or dirty, or the topology no longer supports sessions, the session
/// will be discarded.
pub(crate) async fn check_in_server_session(&self, session: ServerSession) {
let session_support_status = self.inner.topology.session_support_status().await;
let session_support_status = self.inner.topology.session_support_status();
if let SessionSupportStatus::Supported {
logical_session_timeout,
} = session_support_status
@@ -391,40 +385,30 @@ impl Client {
.server_selection_timeout
.unwrap_or(DEFAULT_SERVER_SELECTION_TIMEOUT);

let mut watcher = self.inner.topology.watch();
loop {
let mut topology_change_subscriber =
self.inner.topology.subscribe_to_topology_changes();
let state = watcher.observe_latest();

let selected_server = self
.inner
.topology
.attempt_to_select_server(criteria)
.await?;

if let Some(server) = selected_server {
if let Some(server) = server_selection::attempt_to_select_server(
criteria,
&state.description,
&state.servers,
)? {
return Ok(server);
}

self.inner.topology.request_topology_check();

// If the time that has passed since the start of the loop is greater than the timeout,
// then `time_remaining` will be 0, so no change will be found.
let time_passed = start_time.elapsed();
let time_remaining = timeout
.checked_sub(time_passed)
.unwrap_or_else(|| Duration::from_millis(0));

let change_occurred = topology_change_subscriber
.wait_for_message(time_remaining)
.await;
self.inner.topology.request_update();

let change_occurred = start_time.elapsed() < timeout
&& watcher
.wait_for_update(timeout - start_time.elapsed())
Contributor: Is there a potential race condition here? I.e. if the update happens after the request_update call but before the wait_for_update call, will this be waiting until some unrelated future update?

Contributor Author (patrickfreed): Each iteration of the loop marks the topology as "seen" in the clone_latest call, so any update after that point (potentially before the request_update call, actually) will be accounted for in wait_for_update. (A minimal sketch of this behavior follows this hunk.)

.await;
if !change_occurred {
return Err(ErrorKind::ServerSelection {
message: self
.inner
.topology
.server_selection_timeout_error_message(criteria)
.await,
.server_selection_timeout_error_message(criteria),
}
.into());
}
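The reply above relies on a property that plain tokio::sync::watch channels provide; here is a minimal illustrative sketch of it, using raw watch types rather than the driver's TopologyWatcher: once the current value has been marked as seen, any later send wakes the next changed() call, even if the send happens before the wait starts, so no update is lost.

```rust
use std::time::Duration;
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (updater, mut watcher) = watch::channel(0u64);

    // Mark the current state as "seen" (the analogue of observe_latest).
    let _seen = *watcher.borrow_and_update();

    // An update lands before we even start waiting...
    updater.send(1).unwrap();

    // ...and changed() still resolves immediately, because the stored value
    // is newer than the one we marked as seen, so no wakeup is lost.
    let woke = tokio::time::timeout(Duration::from_millis(100), watcher.changed())
        .await
        .is_ok();
    assert!(woke);
    println!("saw update: {}", *watcher.borrow());
}
```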
@@ -433,9 +417,11 @@

#[cfg(all(test, not(feature = "sync"), not(feature = "tokio-sync")))]
pub(crate) async fn get_hosts(&self) -> Vec<String> {
let servers = self.inner.topology.servers().await;
let watcher = self.inner.topology.watch();
let state = watcher.peek_latest();

let servers = state.servers.keys();
servers
.iter()
.map(|stream_address| format!("{}", stream_address))
.collect()
}
@@ -446,7 +432,12 @@
}

#[cfg(test)]
pub(crate) async fn topology_description(&self) -> crate::sdam::TopologyDescription {
self.inner.topology.description().await
pub(crate) fn topology_description(&self) -> crate::sdam::TopologyDescription {
self.inner
.topology
.watch()
.peek_latest()
.description
.clone()
}
}
2 changes: 1 addition & 1 deletion src/client/session/test/mod.rs
@@ -532,7 +532,7 @@ async fn find_and_getmore_share_session() {
assert_eq!(getmore_session_id, session_id);
}

let topology_description = client.topology_description().await;
let topology_description = client.topology_description();
for (addr, server) in topology_description.servers {
if !server.server_type.is_data_bearing() {
continue;
3 changes: 2 additions & 1 deletion src/cmap/conn/command.rs
@@ -244,7 +244,7 @@ impl RawCommandResponse {
.map_err(|_| Error::invalid_authentication_response(mechanism_name))
}

pub(crate) fn to_hello_reply(&self, round_trip_time: Duration) -> Result<HelloReply> {
pub(crate) fn into_hello_reply(self, round_trip_time: Duration) -> Result<HelloReply> {
match self.body::<CommandResponse<HelloCommandResponse>>() {
Ok(response) if response.is_success() => {
let server_address = self.source_address().clone();
@@ -254,6 +254,7 @@
command_response: response.body,
round_trip_time,
cluster_time,
raw_command_response: self.into_raw_document_buf(),
})
}
_ => match self.body::<CommandResponse<CommandErrorBody>>() {
10 changes: 4 additions & 6 deletions src/cmap/connection_requester.rs
@@ -1,15 +1,13 @@
use tokio::sync::{mpsc, oneshot};

use super::{worker::PoolWorkerHandle, Connection};
use super::Connection;
use crate::{
error::{Error, Result},
runtime::AsyncJoinHandle,
runtime::{AsyncJoinHandle, WorkerHandle},
};

/// Returns a new requester/receiver pair.
pub(super) fn channel(
handle: PoolWorkerHandle,
) -> (ConnectionRequester, ConnectionRequestReceiver) {
pub(super) fn channel(handle: WorkerHandle) -> (ConnectionRequester, ConnectionRequestReceiver) {
let (sender, receiver) = mpsc::unbounded_channel();
(
ConnectionRequester {
@@ -26,7 +24,7 @@ pub(super) fn channel(
#[derive(Clone, Debug)]
pub(super) struct ConnectionRequester {
sender: mpsc::UnboundedSender<oneshot::Sender<ConnectionRequestResult>>,
_handle: PoolWorkerHandle,
_handle: WorkerHandle,
}

impl ConnectionRequester {
2 changes: 1 addition & 1 deletion src/cmap/establish/test.rs
@@ -43,7 +43,7 @@ async fn speculative_auth_test(
);
pool_options.tls_options = CLIENT_OPTIONS.tls_options();

let description = client.topology_description().await;
let description = client.topology_description();

// if running against a replica set, use the primary to ensure the user creation has propagated.
let addr = match description.topology_type {