RUST-585 Refactor Topology implementation to actor model #628
Changes from all commits: a02a7b2, 4e40828, cf68eed, 738e39b, caf13a3, f07192b, f779a40, feaf3af, 8a59dd6, d87610c, ace147b, 9461a72, 4e96ecc, 5fd4a1b, 083251c, b217db4, e69c939, 891964a, d7ac989, e25015d, 7f1e962, 752418a, f2c57b3, d80a86d, ed4196d, 9c18b9d, cd418e9, bef78f7, 873f697, 3465eea, 8c2775e, fda2683, 0d2ff38, 10649dc, f05a507
@@ -351,27 +351,15 @@ impl Client {
         let retryability = self.get_retryability(&conn, &op, &session)?;

-        let txn_number = match session {
-            Some(ref mut session) => {
-                if session.transaction.state != TransactionState::None {
-                    Some(session.txn_number())
-                } else {
-                    match retryability {
-                        Retryability::Write => Some(session.get_and_increment_txn_number()),
-                        _ => None,
-                    }
-                }
-            }
-            None => None,
-        };
+        let txn_number = get_txn_number(&mut session, retryability);

         match self
             .execute_operation_on_connection(
                 &mut op,
                 &mut conn,
                 &mut session,
                 txn_number,
-                &retryability,
+                retryability,
             )
             .await
         {
@@ -398,9 +386,9 @@ impl Client {
             self.inner
                 .topology
                 .handle_application_error(
+                    server.address.clone(),
                     err.clone(),
                     HandshakePhase::after_completion(&conn),
-                    &server,
                 )
                 .await;
             // release the connection to be processed by the connection pool
@@ -424,7 +412,7 @@ impl Client {
         &self,
         op: &mut T,
         session: &mut Option<&mut ClientSession>,
-        txn_number: Option<i64>,
+        prior_txn_number: Option<i64>,
         first_error: Error,
     ) -> Result<ExecutionOutput<T>> {
         op.update_for_retry();
@@ -446,8 +434,10 @@ impl Client {
             return Err(first_error);
         }

+        let txn_number = prior_txn_number.or_else(|| get_txn_number(session, retryability));
+
         match self
-            .execute_operation_on_connection(op, &mut conn, session, txn_number, &retryability)
+            .execute_operation_on_connection(op, &mut conn, session, txn_number, retryability)
             .await
         {
             Ok(operation_output) => Ok(ExecutionOutput {

Review thread on the added `get_txn_number` line:
- "I'm not sure why these changes surfaced this problem, but basically if a [comment truncated in source]"
- "So this was actually unrelated, but it seems to be fixed now. Filed RUST-1274 to track this separately."
- "Should we backport this fix to 2.2?"
- "Yep, I added a comment to RUST-1274 as a reminder. Once this is merged I'll do the backport."
@@ -458,9 +448,9 @@ impl Client {
             self.inner
                 .topology
                 .handle_application_error(
+                    server.address.clone(),
                     err.clone(),
                     HandshakePhase::after_completion(&conn),
-                    &server,
                 )
                 .await;
             drop(server);
@@ -481,7 +471,7 @@ impl Client {
         connection: &mut Connection,
         session: &mut Option<&mut ClientSession>,
         txn_number: Option<i64>,
-        retryability: &Retryability,
+        retryability: Retryability,
     ) -> Result<T::O> {
         if let Some(wc) = op.write_concern() {
             wc.validate()?;
@@ -490,10 +480,11 @@ impl Client {
         let stream_description = connection.stream_description()?;
         let is_sharded = stream_description.initial_server_type == ServerType::Mongos;
         let mut cmd = op.build(stream_description)?;
-        self.inner
-            .topology
-            .update_command_with_read_pref(connection.address(), &mut cmd, op.selection_criteria())
-            .await;
+        self.inner.topology.update_command_with_read_pref(
+            connection.address(),
+            &mut cmd,
+            op.selection_criteria(),
+        );

         match session {
             Some(ref mut session) if op.supports_sessions() && op.is_acknowledged() => {
@@ -587,7 +578,7 @@ impl Client {
         }

         let session_cluster_time = session.as_ref().and_then(|session| session.cluster_time());
-        let client_cluster_time = self.inner.topology.cluster_time().await;
+        let client_cluster_time = self.inner.topology.cluster_time();
         let max_cluster_time = std::cmp::max(session_cluster_time, client_cluster_time.as_ref());
         if let Some(cluster_time) = max_cluster_time {
             cmd.set_cluster_time(cluster_time);
@@ -788,7 +779,7 @@ impl Client {
     }

     async fn select_data_bearing_server(&self) -> Result<()> {
-        let topology_type = self.inner.topology.topology_type().await;
+        let topology_type = self.inner.topology.topology_type();
         let criteria = SelectionCriteria::Predicate(Arc::new(move |server_info| {
             let server_type = server_info.server_type();
             (matches!(topology_type, TopologyType::Single) && server_type.is_available())
@@ -802,14 +793,14 @@ impl Client {
     /// session timeout. If it has yet to be determined if the topology supports sessions, this
     /// method will perform a server selection that will force that determination to be made.
     pub(crate) async fn get_session_support_status(&self) -> Result<SessionSupportStatus> {
-        let initial_status = self.inner.topology.session_support_status().await;
+        let initial_status = self.inner.topology.session_support_status();

         // Need to guarantee that we're connected to at least one server that can determine if
         // sessions are supported or not.
         match initial_status {
             SessionSupportStatus::Undetermined => {
                 self.select_data_bearing_server().await?;
-                Ok(self.inner.topology.session_support_status().await)
+                Ok(self.inner.topology.session_support_status())
             }
             _ => Ok(initial_status),
         }
@@ -819,14 +810,14 @@ impl Client {
     /// topology supports transactions, this method will perform a server selection that will force
     /// that determination to be made.
     pub(crate) async fn transaction_support_status(&self) -> Result<TransactionSupportStatus> {
-        let initial_status = self.inner.topology.transaction_support_status().await;
+        let initial_status = self.inner.topology.transaction_support_status();

         // Need to guarantee that we're connected to at least one server that can determine if
         // sessions are supported or not.
         match initial_status {
             TransactionSupportStatus::Undetermined => {
                 self.select_data_bearing_server().await?;
-                Ok(self.inner.topology.transaction_support_status().await)
+                Ok(self.inner.topology.transaction_support_status())
             }
             _ => Ok(initial_status),
         }
@@ -885,7 +876,10 @@ impl Client {
         session: &mut Option<&mut ClientSession>,
     ) {
         if let Some(ref cluster_time) = cluster_time {
-            self.inner.topology.advance_cluster_time(cluster_time).await;
+            self.inner
+                .topology
+                .advance_cluster_time(cluster_time.clone())
+                .await;
             if let Some(ref mut session) = session {
                 session.advance_cluster_time(cluster_time)
             }
@@ -918,6 +912,25 @@ async fn get_connection<T: Operation>(
     }
 }

+fn get_txn_number(
+    session: &mut Option<&mut ClientSession>,
+    retryability: Retryability,
+) -> Option<i64> {
+    match session {
+        Some(ref mut session) => {
+            if session.transaction.state != TransactionState::None {
+                Some(session.txn_number())
+            } else {
+                match retryability {
+                    Retryability::Write => Some(session.get_and_increment_txn_number()),
+                    _ => None,
+                }
+            }
+        }
+        None => None,
+    }
+}
+
 impl Error {
     /// Adds the necessary labels to this Error, and unpins the session if needed.
     ///
@@ -936,7 +949,7 @@ impl Error {
         &mut self,
         conn: Option<&Connection>,
         session: &mut Option<&mut ClientSession>,
-        retryability: Option<&Retryability>,
+        retryability: Option<Retryability>,
     ) -> Result<()> {
         let transaction_state = session.as_ref().map_or(&TransactionState::None, |session| {
             &session.transaction.state
@@ -970,7 +983,7 @@ impl Error {
                 }
             }
             TransactionState::None => {
-                if retryability == Some(&Retryability::Write) {
+                if retryability == Some(Retryability::Write) {
                     if let Some(max_wire_version) = max_wire_version {
                         if self.should_add_retryable_write_label(max_wire_version) {
                             self.add_label(RETRYABLE_WRITE_ERROR);
@@ -34,7 +34,7 @@ use crate::{
         SessionOptions,
     },
     results::DatabaseSpecification,
-    sdam::{SelectedServer, SessionSupportStatus, Topology},
+    sdam::{server_selection, SelectedServer, SessionSupportStatus, Topology},
     ClientSession,
 };
 pub(crate) use executor::{HELLO_COMMAND_NAMES, REDACTED_COMMANDS};
@@ -103,12 +103,6 @@ struct ClientInner {
     session_pool: ServerSessionPool,
 }

-impl Drop for ClientInner {
-    fn drop(&mut self) {
-        self.topology.close()
-    }
-}
-
 impl Client {
     /// Creates a new `Client` connected to the cluster specified by `uri`. `uri` must be a valid
     /// MongoDB connection string.

Review comment on the removed `Drop` impl: "This will happen automatically now."
@@ -323,7 +317,7 @@ impl Client {
     /// If the session is expired or dirty, or the topology no longer supports sessions, the session
     /// will be discarded.
     pub(crate) async fn check_in_server_session(&self, session: ServerSession) {
-        let session_support_status = self.inner.topology.session_support_status().await;
+        let session_support_status = self.inner.topology.session_support_status();
         if let SessionSupportStatus::Supported {
             logical_session_timeout,
         } = session_support_status
@@ -391,40 +385,30 @@ impl Client {
             .server_selection_timeout
             .unwrap_or(DEFAULT_SERVER_SELECTION_TIMEOUT);

+        let mut watcher = self.inner.topology.watch();
         loop {
-            let mut topology_change_subscriber =
-                self.inner.topology.subscribe_to_topology_changes();
-
-            let selected_server = self
-                .inner
-                .topology
-                .attempt_to_select_server(criteria)
-                .await?;
-
-            if let Some(server) = selected_server {
+            let state = watcher.observe_latest();
+
+            if let Some(server) = server_selection::attempt_to_select_server(
+                criteria,
+                &state.description,
+                &state.servers,
+            )? {
                 return Ok(server);
             }

-            self.inner.topology.request_topology_check();
-
-            // If the time that has passed since the start of the loop is greater than the timeout,
-            // then `time_remaining` will be 0, so no change will be found.
-            let time_passed = start_time.elapsed();
-            let time_remaining = timeout
-                .checked_sub(time_passed)
-                .unwrap_or_else(|| Duration::from_millis(0));
-
-            let change_occurred = topology_change_subscriber
-                .wait_for_message(time_remaining)
-                .await;
+            self.inner.topology.request_update();
+
+            let change_occurred = start_time.elapsed() < timeout
+                && watcher
+                    .wait_for_update(timeout - start_time.elapsed())
+                    .await;
             if !change_occurred {
                 return Err(ErrorKind::ServerSelection {
                     message: self
                         .inner
                         .topology
-                        .server_selection_timeout_error_message(criteria)
-                        .await,
+                        .server_selection_timeout_error_message(criteria),
                 }
                 .into());
             }

Review thread on the `wait_for_update` call:
- "Is there a potential race condition here? i.e. if the update happens after the [comment truncated in source]"
- "Each iteration of the loop marks the topology as 'seen' in the [comment truncated in source]"
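On the race question in the thread above: tokio's watch channel makes the observe-then-wait loop safe, because observing the latest state marks it as seen, and a subsequent wait wakes for any value sent after that point. The following is a minimal, self-contained sketch of the same loop shape; the `TopologyState` type and its generation counter are invented for illustration.

```rust
use std::time::Duration;
use tokio::sync::watch;

#[derive(Clone, Debug)]
struct TopologyState {
    generation: u64,
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel(TopologyState { generation: 0 });

    // Stands in for SDAM monitors publishing new topology state.
    tokio::spawn(async move {
        for generation in 1..=3 {
            tokio::time::sleep(Duration::from_millis(20)).await;
            let _ = tx.send(TopologyState { generation });
        }
    });

    let deadline = tokio::time::Instant::now() + Duration::from_millis(500);
    loop {
        // borrow_and_update() marks the current value as seen, so a send that
        // lands between this observation and the changed() below still wakes
        // the waiter -- there is no lost-update window.
        let state = rx.borrow_and_update().clone();
        if state.generation >= 3 {
            println!("selected at generation {}", state.generation);
            break;
        }
        // Wait for the next publish or the deadline, whichever comes first.
        let changed = tokio::time::timeout_at(deadline, rx.changed()).await;
        if !matches!(changed, Ok(Ok(()))) {
            println!("server selection timed out");
            break;
        }
    }
}
```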
@@ -433,9 +417,11 @@ impl Client {

     #[cfg(all(test, not(feature = "sync"), not(feature = "tokio-sync")))]
     pub(crate) async fn get_hosts(&self) -> Vec<String> {
-        let servers = self.inner.topology.servers().await;
+        let watcher = self.inner.topology.watch();
+        let state = watcher.peek_latest();
+
+        let servers = state.servers.keys();
         servers
-            .iter()
             .map(|stream_address| format!("{}", stream_address))
             .collect()
     }
@@ -446,7 +432,12 @@ impl Client {
     }

     #[cfg(test)]
-    pub(crate) async fn topology_description(&self) -> crate::sdam::TopologyDescription {
-        self.inner.topology.description().await
+    pub(crate) fn topology_description(&self) -> crate::sdam::TopologyDescription {
+        self.inner
+            .topology
+            .watch()
+            .peek_latest()
+            .description
+            .clone()
     }
 }
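`peek_latest` here reads the current state without consuming the change notification, whereas the selection loop's `observe_latest` also marks the state seen. Assuming the watcher wraps a `tokio::sync::watch::Receiver` (as the comment below suggests), the two map onto `borrow` and `borrow_and_update`; a hypothetical sketch, not the PR's actual `TopologyWatcher` definition:

```rust
use tokio::sync::watch;

// Hypothetical wrapper mirroring the watcher used in this PR.
struct Watcher<T> {
    receiver: watch::Receiver<T>,
}

impl<T: Clone> Watcher<T> {
    // Read the latest state without marking it seen; good for one-off
    // inspection like topology_description().
    fn peek_latest(&self) -> T {
        self.receiver.borrow().clone()
    }

    // Read the latest state and mark it seen, so a later wait only wakes
    // for genuinely newer state; good for the selection loop.
    fn observe_latest(&mut self) -> T {
        self.receiver.borrow_and_update().clone()
    }
}

fn main() {
    let (tx, rx) = watch::channel(0u64);
    let mut watcher = Watcher { receiver: rx };
    tx.send(1).unwrap();
    assert_eq!(watcher.peek_latest(), 1); // does not consume the notification
    assert_eq!(watcher.observe_latest(), 1); // marks the value seen
}
```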
Review comment: "This was done to get some newer functionality in the `sync::watch` channel. It's not strictly necessary, but I figured there wasn't any risk in this, as I doubt any users have strict tokio dependency requirements."
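For reference, later tokio releases added watch-channel conveniences such as creating receivers from the sender and modifying the value in place; the comment above doesn't say which of these this PR relies on, so the sketch below is illustrative only.

```rust
use tokio::sync::watch;

fn main() {
    let (tx, _rx) = watch::channel(0u64);

    // Sender::subscribe: mint a new receiver after the channel is created,
    // useful when watcher handles are handed out on demand.
    let mut late_rx = tx.subscribe();

    // Sender::send_modify: mutate the value in place and notify receivers,
    // avoiding construction of a whole new value.
    tx.send_modify(|value| *value += 1);

    assert_eq!(*late_rx.borrow_and_update(), 1);
}
```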