Skip to content

RUST-122 Support mongos pinning for sharded transactions #383

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Jul 19, 2021
Merged
58 changes: 45 additions & 13 deletions src/client/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,13 @@ use crate::{
Retryability,
},
options::SelectionCriteria,
sdam::{HandshakePhase, SelectedServer, SessionSupportStatus, TransactionSupportStatus},
sdam::{
HandshakePhase,
SelectedServer,
ServerType,
SessionSupportStatus,
TransactionSupportStatus,
},
selection_criteria::ReadPreference,
};

Expand Down Expand Up @@ -82,6 +88,7 @@ impl Client {
.into());
}
}

self.execute_operation_with_retry(op, Some(session)).await
}
None => {
Expand Down Expand Up @@ -126,18 +133,23 @@ impl Client {
}
}

let server = match self.select_server(op.selection_criteria()).await {
let selection_criteria = session
.as_ref()
.and_then(|s| s.transaction.pinned_mongos.as_ref())
.or_else(|| op.selection_criteria());

let server = match self.select_server(selection_criteria).await {
Ok(server) => server,
Err(mut err) => {
err.add_labels(None, &session, None)?;
err.add_labels_and_update_pin(None, &mut session, None)?;
return Err(err);
}
};

let mut conn = match server.pool.check_out().await {
Ok(conn) => conn,
Err(mut err) => {
err.add_labels(None, &session, None)?;
err.add_labels_and_update_pin(None, &mut session, None)?;

if err.is_pool_cleared() {
return self.execute_retry(&mut op, &mut session, None, err).await;
Expand Down Expand Up @@ -220,6 +232,8 @@ impl Client {
txn_number: Option<u64>,
first_error: Error,
) -> Result<T::O> {
op.update_for_retry();

let server = match self.select_server(op.selection_criteria()).await {
Ok(server) => server,
Err(_) => {
Expand All @@ -237,8 +251,6 @@ impl Client {
return Err(first_error);
}

op.update_for_retry();

match self
.execute_operation_on_connection(op, &mut conn, session, txn_number, &retryability)
.await
Expand Down Expand Up @@ -277,7 +289,8 @@ impl Client {
wc.validate()?;
}

let mut cmd = op.build(connection.stream_description()?)?;
let stream_description = connection.stream_description()?;
let mut cmd = op.build(stream_description)?;
self.inner
.topology
.update_command_with_read_pref(connection.address(), &mut cmd, op.selection_criteria())
Expand Down Expand Up @@ -315,6 +328,9 @@ impl Client {
cmd.set_start_transaction();
cmd.set_autocommit();
cmd.set_txn_read_concern(*session)?;
if stream_description.initial_server_type == ServerType::Mongos {
session.pin_mongos(connection.address().clone());
}
session.transaction.state = TransactionState::InProgress;
}
TransactionState::InProgress
Expand Down Expand Up @@ -460,13 +476,13 @@ impl Client {
handler.handle_command_failed_event(command_failed_event);
});

if let Some(session) = session {
if let Some(ref mut session) = session {
if err.is_network_error() {
session.mark_dirty();
}
}

err.add_labels(Some(connection), session, Some(retryability))?;
err.add_labels_and_update_pin(Some(connection), session, Some(retryability))?;
op.handle_error(err)
}
Ok(response) => {
Expand Down Expand Up @@ -495,7 +511,11 @@ impl Client {
match op.handle_response(response.deserialized, connection.stream_description()?) {
Ok(response) => Ok(response),
Err(mut err) => {
err.add_labels(Some(connection), session, Some(retryability))?;
err.add_labels_and_update_pin(
Some(connection),
session,
Some(retryability),
)?;
Err(err)
}
}
Expand Down Expand Up @@ -609,7 +629,7 @@ impl Client {
}

impl Error {
/// Adds the necessary labels to this Error.
/// Adds the necessary labels to this Error, and unpins the session if needed.
///
/// A TransientTransactionError label should be added if a transaction is in progress and the
/// error is a network or server selection error.
Expand All @@ -619,10 +639,13 @@ impl Error {
/// server version, a label should only be added if the `retry_writes` client option is not set
/// to `false`, the operation during which the error occurred is write-retryable, and a
/// TransientTransactionError label has not already been added.
fn add_labels(
///
/// If the TransientTransactionError or UnknownTransactionCommitResult labels are added, the
/// ClientSession should be unpinned.
fn add_labels_and_update_pin(
&mut self,
conn: Option<&Connection>,
session: &Option<&mut ClientSession>,
session: &mut Option<&mut ClientSession>,
retryability: Option<&Retryability>,
) -> Result<()> {
let transaction_state = session.as_ref().map_or(&TransactionState::None, |session| {
Expand Down Expand Up @@ -666,6 +689,15 @@ impl Error {
}
}
}

if let Some(ref mut session) = session {
if self.contains_label(TRANSIENT_TRANSACTION_ERROR)
|| self.contains_label(UNKNOWN_TRANSACTION_COMMIT_RESULT)
{
session.unpin_mongos();
}
}

Ok(())
}
}
Expand Down
29 changes: 26 additions & 3 deletions src/client/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod test;

use std::{
collections::HashSet,
sync::Arc,
time::{Duration, Instant},
};

Expand All @@ -16,13 +17,16 @@ use crate::{
error::{ErrorKind, Result},
operation::{AbortTransaction, CommitTransaction, Operation},
options::{SessionOptions, TransactionOptions},
sdam::TransactionSupportStatus,
sdam::{ServerInfo, TransactionSupportStatus},
selection_criteria::SelectionCriteria,
Client,
RUNTIME,
};
pub(crate) use cluster_time::ClusterTime;
pub(super) use pool::ServerSessionPool;

use super::options::ServerAddress;

lazy_static! {
pub(crate) static ref SESSIONS_UNSUPPORTED_COMMANDS: HashSet<&'static str> = {
let mut hash_set = HashSet::new();
Expand Down Expand Up @@ -95,7 +99,7 @@ lazy_static! {
/// }
/// }
/// ```
// TODO RUST-122 Remove this note and adjust the above description to indicate that sharded
// TODO RUST-734 Remove this note and adjust the above description to indicate that sharded
// transactions are supported on 4.2+
/// Note: the driver does not currently support transactions on sharded clusters.
#[derive(Clone, Debug)]
Expand All @@ -113,6 +117,7 @@ pub struct ClientSession {
/// Per-session transaction bookkeeping: the current state, the options in effect, and the
/// optional mongos pin.
pub(crate) struct Transaction {
/// Current state of the session's transaction.
pub(crate) state: TransactionState,
/// Options for the transaction, if any; cleared by `abort` and `reset`.
pub(crate) options: Option<TransactionOptions>,
/// Selection criteria pinning the session to a single mongos; `None` when unpinned.
/// Cleared by `abort` and `reset`.
pub(crate) pinned_mongos: Option<SelectionCriteria>,
}

impl Transaction {
Expand All @@ -128,11 +133,13 @@ impl Transaction {
/// Moves the transaction into the `Aborted` state, discarding its options and any mongos pin.
pub(crate) fn abort(&mut self) {
    // The three fields are independent of one another: drop the auxiliary data first, then
    // record the new state.
    self.pinned_mongos = None;
    self.options = None;
    self.state = TransactionState::Aborted;
}

pub(crate) fn reset(&mut self) {
self.state = TransactionState::None;
self.options = None;
self.pinned_mongos = None;
}
}

Expand All @@ -141,6 +148,7 @@ impl Default for Transaction {
Self {
state: TransactionState::None,
options: None,
pinned_mongos: None,
}
}
}
Expand Down Expand Up @@ -245,6 +253,17 @@ impl ClientSession {
self.server_session.txn_number
}

/// Pins this session to the mongos at `address`.
///
/// The pin is stored as a predicate-based selection criteria that accepts only a server whose
/// address equals `address`.
pub(crate) fn pin_mongos(&mut self, address: ServerAddress) {
    // The closure takes ownership of `address` so the predicate can outlive this call.
    let is_pinned_server = move |server_info: &ServerInfo| *server_info.address() == address;
    let criteria = SelectionCriteria::Predicate(Arc::new(is_pinned_server));
    self.transaction.pinned_mongos = Some(criteria);
}

/// Clears the session's pinned mongos, if there is one.
pub(crate) fn unpin_mongos(&mut self) {
    let transaction = &mut self.transaction;
    transaction.pinned_mongos = None;
}

/// Whether this session is dirty.
#[cfg(test)]
pub(crate) fn is_dirty(&self) -> bool {
Expand Down Expand Up @@ -298,6 +317,9 @@ impl ClientSession {
}
.into());
}
TransactionState::Committed { .. } => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to have this here given that we are unpinning the session at the beginning of execute_operation_with_retry?

Copy link
Contributor Author

@NBSquare NBSquare Jul 7, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so. Per the spec, we want to unpin the session when:

A new transaction is started on the ClientSession after the previous transaction has been committed. The session MUST be unpinned before performing server selection for the first operation of the new transaction.

Because `start_transaction` doesn't execute an operation but only changes the transaction state to `Starting`, we want to make sure the session is unpinned there, so that server selection is performed normally when the first operation of the new transaction is executed.

self.unpin_mongos(); // Unpin session if previous transaction is committed.
}
_ => {}
}
match self.client.transaction_support_status().await? {
Expand Down Expand Up @@ -472,7 +494,8 @@ impl ClientSession {
.as_ref()
.and_then(|options| options.write_concern.as_ref())
.cloned();
let abort_transaction = AbortTransaction::new(write_concern);
let selection_criteria = self.transaction.pinned_mongos.clone();
let abort_transaction = AbortTransaction::new(write_concern, selection_criteria);
self.transaction.abort();
// Errors returned from running an abortTransaction command should be ignored.
let _result = self
Expand Down
2 changes: 1 addition & 1 deletion src/concern/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ async fn snapshot_read_concern() {
.database(function_name!())
.collection::<Document>(function_name!());

// TODO RUST-122 run this test on sharded clusters
// TODO RUST-734: Run this test on sharded clusters when transactions are complete.
if client.is_replica_set() && client.server_version_gte(4, 0) {
let mut session = client.start_session(None).await.unwrap();
let options = TransactionOptions::builder()
Expand Down
21 changes: 19 additions & 2 deletions src/operation/abort_transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,25 @@ use crate::{
error::Result,
operation::{Operation, Retryability},
options::WriteConcern,
selection_criteria::SelectionCriteria,
};

use super::{CommandResponse, Response, WriteConcernOnlyBody};

/// Operation that aborts a transaction, carrying the write concern and the server selection
/// criteria to use for it.
pub(crate) struct AbortTransaction {
/// Write concern exposed through `write_concern()`.
write_concern: Option<WriteConcern>,
/// Selection criteria exposed through `selection_criteria()`; reset to `None` by
/// `update_for_retry()`.
selection_criteria: Option<SelectionCriteria>,
}

impl AbortTransaction {
pub(crate) fn new(write_concern: Option<WriteConcern>) -> Self {
Self { write_concern }
pub(crate) fn new(
write_concern: Option<WriteConcern>,
selection_criteria: Option<SelectionCriteria>,
) -> Self {
Self {
write_concern,
selection_criteria,
}
}
}

Expand Down Expand Up @@ -47,11 +55,20 @@ impl Operation for AbortTransaction {
response.validate()
}

/// Returns the stored selection criteria, or `None` if none was supplied or it has been
/// cleared by `update_for_retry`.
fn selection_criteria(&self) -> Option<&SelectionCriteria> {
self.selection_criteria.as_ref()
}

/// Returns the write concern this operation was constructed with, if any.
fn write_concern(&self) -> Option<&WriteConcern> {
self.write_concern.as_ref()
}

/// Classifies this operation's retryability as `Retryability::Write`.
fn retryability(&self) -> Retryability {
Retryability::Write
}

/// Prepares this operation for a retry by discarding its stored selection criteria.
fn update_for_retry(&mut self) {
// The session must be "unpinned" before server selection for a retry, so the criteria
// supplied at construction must not constrain the retry's server selection.
self.selection_criteria = None;
}
}
24 changes: 9 additions & 15 deletions src/sdam/description/topology/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,21 +329,15 @@ impl TopologyDescription {
self.transaction_support_status = TransactionSupportStatus::Unsupported;
}
if let Ok(Some(max_wire_version)) = server_description.max_wire_version() {
match self.topology_type {
TopologyType::Sharded => {
// TODO RUST-122: support transactions on sharded clusters
self.transaction_support_status = TransactionSupportStatus::Unsupported;
}
_ => {
if max_wire_version < 7 {
self.transaction_support_status = TransactionSupportStatus::Unsupported;
} else {
self.transaction_support_status = TransactionSupportStatus::Supported;
}
}
// TODO RUST-734: Evaluate whether we should permanently support sharded transactions.
// If we leave the feature as unsupported, we should revert this code.
self.transaction_support_status = if max_wire_version < 7
|| (max_wire_version < 8 && self.topology_type == TopologyType::Sharded)
{
TransactionSupportStatus::Unsupported
} else {
TransactionSupportStatus::Supported
}
} else {
self.transaction_support_status = TransactionSupportStatus::Unsupported;
}
}

Expand Down Expand Up @@ -752,7 +746,7 @@ pub(crate) enum TransactionSupportStatus {

/// Transactions are supported by this topology. A topology supports transactions if it
/// supports sessions and its maxWireVersion >= 7. Transactions are not currently supported
/// on sharded clusters (TODO RUST-122).
/// on sharded clusters (TODO RUST-734).
///
/// Note that meeting these conditions does not guarantee that a deployment
/// supports transactions; any other missing qualification will be reported by the server.
Expand Down
2 changes: 1 addition & 1 deletion src/sync/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ fn transactions() {

let should_skip = RUNTIME.block_on(async {
let test_client = AsyncTestClient::new().await;
// TODO RUST-122: Unskip this test on sharded clusters
// TODO RUST-734: Unskip this test on sharded clusters when transactions are complete.
!test_client.is_replica_set() || test_client.server_version_lt(4, 0)
});
if should_skip {
Expand Down
Loading