Skip to content

RUST-97 Support sharded transactions recovery token #398

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jul 28, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
RUST-122 Support mongos pinning for sharded transactions (#383)
  • Loading branch information
NBSquare authored and Nathan Blinn committed Jul 27, 2021
commit 75b9253acc2e2341628cacfd8c9ff14e2093a520
58 changes: 45 additions & 13 deletions src/client/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,13 @@ use crate::{
Retryability,
},
options::SelectionCriteria,
sdam::{HandshakePhase, SelectedServer, SessionSupportStatus, TransactionSupportStatus},
sdam::{
HandshakePhase,
SelectedServer,
ServerType,
SessionSupportStatus,
TransactionSupportStatus,
},
selection_criteria::ReadPreference,
};

Expand Down Expand Up @@ -82,6 +88,7 @@ impl Client {
.into());
}
}

self.execute_operation_with_retry(op, Some(session)).await
}
None => {
Expand Down Expand Up @@ -126,18 +133,23 @@ impl Client {
}
}

let server = match self.select_server(op.selection_criteria()).await {
let selection_criteria = session
.as_ref()
.and_then(|s| s.transaction.pinned_mongos.as_ref())
.or_else(|| op.selection_criteria());

let server = match self.select_server(selection_criteria).await {
Ok(server) => server,
Err(mut err) => {
err.add_labels(None, &session, None)?;
err.add_labels_and_update_pin(None, &mut session, None)?;
return Err(err);
}
};

let mut conn = match server.pool.check_out().await {
Ok(conn) => conn,
Err(mut err) => {
err.add_labels(None, &session, None)?;
err.add_labels_and_update_pin(None, &mut session, None)?;

if err.is_pool_cleared() {
return self.execute_retry(&mut op, &mut session, None, err).await;
Expand Down Expand Up @@ -220,6 +232,8 @@ impl Client {
txn_number: Option<i64>,
first_error: Error,
) -> Result<T::O> {
op.update_for_retry();

let server = match self.select_server(op.selection_criteria()).await {
Ok(server) => server,
Err(_) => {
Expand All @@ -237,8 +251,6 @@ impl Client {
return Err(first_error);
}

op.update_for_retry();

match self
.execute_operation_on_connection(op, &mut conn, session, txn_number, &retryability)
.await
Expand Down Expand Up @@ -277,7 +289,8 @@ impl Client {
wc.validate()?;
}

let mut cmd = op.build(connection.stream_description()?)?;
let stream_description = connection.stream_description()?;
let mut cmd = op.build(stream_description)?;
self.inner
.topology
.update_command_with_read_pref(connection.address(), &mut cmd, op.selection_criteria())
Expand Down Expand Up @@ -315,6 +328,9 @@ impl Client {
cmd.set_start_transaction();
cmd.set_autocommit();
cmd.set_txn_read_concern(*session)?;
if stream_description.initial_server_type == ServerType::Mongos {
session.pin_mongos(connection.address().clone());
}
session.transaction.state = TransactionState::InProgress;
}
TransactionState::InProgress
Expand Down Expand Up @@ -460,13 +476,13 @@ impl Client {
handler.handle_command_failed_event(command_failed_event);
});

if let Some(session) = session {
if let Some(ref mut session) = session {
if err.is_network_error() {
session.mark_dirty();
}
}

err.add_labels(Some(connection), session, Some(retryability))?;
err.add_labels_and_update_pin(Some(connection), session, Some(retryability))?;
op.handle_error(err)
}
Ok(response) => {
Expand Down Expand Up @@ -495,7 +511,11 @@ impl Client {
match op.handle_response(response.deserialized, connection.stream_description()?) {
Ok(response) => Ok(response),
Err(mut err) => {
err.add_labels(Some(connection), session, Some(retryability))?;
err.add_labels_and_update_pin(
Some(connection),
session,
Some(retryability),
)?;
Err(err)
}
}
Expand Down Expand Up @@ -609,7 +629,7 @@ impl Client {
}

impl Error {
/// Adds the necessary labels to this Error.
/// Adds the necessary labels to this Error, and unpins the session if needed.
///
/// A TransientTransactionError label should be added if a transaction is in progress and the
/// error is a network or server selection error.
Expand All @@ -619,10 +639,13 @@ impl Error {
/// server version, a label should only be added if the `retry_writes` client option is not set
/// to `false`, the operation during which the error occured is write-retryable, and a
/// TransientTransactionError label has not already been added.
fn add_labels(
///
/// If the TransientTransactionError or UnknownTransactionCommitResult labels are added, the
/// ClientSession should be unpinned.
fn add_labels_and_update_pin(
&mut self,
conn: Option<&Connection>,
session: &Option<&mut ClientSession>,
session: &mut Option<&mut ClientSession>,
retryability: Option<&Retryability>,
) -> Result<()> {
let transaction_state = session.as_ref().map_or(&TransactionState::None, |session| {
Expand Down Expand Up @@ -666,6 +689,15 @@ impl Error {
}
}
}

if let Some(ref mut session) = session {
if self.contains_label(TRANSIENT_TRANSACTION_ERROR)
|| self.contains_label(UNKNOWN_TRANSACTION_COMMIT_RESULT)
{
session.unpin_mongos();
}
}

Ok(())
}
}
Expand Down
29 changes: 26 additions & 3 deletions src/client/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod test;

use std::{
collections::HashSet,
sync::Arc,
time::{Duration, Instant},
};

Expand All @@ -16,13 +17,16 @@ use crate::{
error::{ErrorKind, Result},
operation::{AbortTransaction, CommitTransaction, Operation},
options::{SessionOptions, TransactionOptions},
sdam::TransactionSupportStatus,
sdam::{ServerInfo, TransactionSupportStatus},
selection_criteria::SelectionCriteria,
Client,
RUNTIME,
};
pub(crate) use cluster_time::ClusterTime;
pub(super) use pool::ServerSessionPool;

use super::options::ServerAddress;

lazy_static! {
pub(crate) static ref SESSIONS_UNSUPPORTED_COMMANDS: HashSet<&'static str> = {
let mut hash_set = HashSet::new();
Expand Down Expand Up @@ -95,7 +99,7 @@ lazy_static! {
/// }
/// }
/// ```
// TODO RUST-122 Remove this note and adjust the above description to indicate that sharded
// TODO RUST-734 Remove this note and adjust the above description to indicate that sharded
// transactions are supported on 4.2+
/// Note: the driver does not currently support transactions on sharded clusters.
#[derive(Clone, Debug)]
Expand All @@ -113,6 +117,7 @@ pub struct ClientSession {
pub(crate) struct Transaction {
pub(crate) state: TransactionState,
pub(crate) options: Option<TransactionOptions>,
pub(crate) pinned_mongos: Option<SelectionCriteria>,
}

impl Transaction {
Expand All @@ -128,11 +133,13 @@ impl Transaction {
pub(crate) fn abort(&mut self) {
self.state = TransactionState::Aborted;
self.options = None;
self.pinned_mongos = None;
}

pub(crate) fn reset(&mut self) {
self.state = TransactionState::None;
self.options = None;
self.pinned_mongos = None;
}
}

Expand All @@ -141,6 +148,7 @@ impl Default for Transaction {
Self {
state: TransactionState::None,
options: None,
pinned_mongos: None,
}
}
}
Expand Down Expand Up @@ -245,6 +253,17 @@ impl ClientSession {
self.server_session.txn_number
}

/// Pin mongos to session.
pub(crate) fn pin_mongos(&mut self, address: ServerAddress) {
self.transaction.pinned_mongos = Some(SelectionCriteria::Predicate(Arc::new(
move |server_info: &ServerInfo| *server_info.address() == address,
)));
}

pub(crate) fn unpin_mongos(&mut self) {
self.transaction.pinned_mongos = None;
}

/// Whether this session is dirty.
#[cfg(test)]
pub(crate) fn is_dirty(&self) -> bool {
Expand Down Expand Up @@ -298,6 +317,9 @@ impl ClientSession {
}
.into());
}
TransactionState::Committed { .. } => {
self.unpin_mongos(); // Unpin session if previous transaction is committed.
}
_ => {}
}
match self.client.transaction_support_status().await? {
Expand Down Expand Up @@ -472,7 +494,8 @@ impl ClientSession {
.as_ref()
.and_then(|options| options.write_concern.as_ref())
.cloned();
let abort_transaction = AbortTransaction::new(write_concern);
let selection_criteria = self.transaction.pinned_mongos.clone();
let abort_transaction = AbortTransaction::new(write_concern, selection_criteria);
self.transaction.abort();
// Errors returned from running an abortTransaction command should be ignored.
let _result = self
Expand Down
2 changes: 1 addition & 1 deletion src/concern/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ async fn snapshot_read_concern() {
.database(function_name!())
.collection::<Document>(function_name!());

// TODO RUST-122 run this test on sharded clusters
// TODO RUST-734: Run this test on sharded clusters when transactions are complete.
if client.is_replica_set() && client.server_version_gte(4, 0) {
let mut session = client.start_session(None).await.unwrap();
let options = TransactionOptions::builder()
Expand Down
21 changes: 19 additions & 2 deletions src/operation/abort_transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,25 @@ use crate::{
error::Result,
operation::{Operation, Retryability},
options::WriteConcern,
selection_criteria::SelectionCriteria,
};

use super::{CommandResponse, Response, WriteConcernOnlyBody};

pub(crate) struct AbortTransaction {
write_concern: Option<WriteConcern>,
selection_criteria: Option<SelectionCriteria>,
}

impl AbortTransaction {
pub(crate) fn new(write_concern: Option<WriteConcern>) -> Self {
Self { write_concern }
pub(crate) fn new(
write_concern: Option<WriteConcern>,
selection_criteria: Option<SelectionCriteria>,
) -> Self {
Self {
write_concern,
selection_criteria,
}
}
}

Expand Down Expand Up @@ -47,11 +55,20 @@ impl Operation for AbortTransaction {
response.validate()
}

fn selection_criteria(&self) -> Option<&SelectionCriteria> {
self.selection_criteria.as_ref()
}

fn write_concern(&self) -> Option<&WriteConcern> {
self.write_concern.as_ref()
}

fn retryability(&self) -> Retryability {
Retryability::Write
}

fn update_for_retry(&mut self) {
// The session must be "unpinned" before server selection for a retry.
self.selection_criteria = None;
}
}
24 changes: 9 additions & 15 deletions src/sdam/description/topology/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,21 +326,15 @@ impl TopologyDescription {
self.transaction_support_status = TransactionSupportStatus::Unsupported;
}
if let Ok(Some(max_wire_version)) = server_description.max_wire_version() {
match self.topology_type {
TopologyType::Sharded => {
// TODO RUST-122: support transactions on sharded clusters
self.transaction_support_status = TransactionSupportStatus::Unsupported;
}
_ => {
if max_wire_version < 7 {
self.transaction_support_status = TransactionSupportStatus::Unsupported;
} else {
self.transaction_support_status = TransactionSupportStatus::Supported;
}
}
// TODO RUST-734: Evaluate whether we should permanently support sharded transactions.
// If we leave the feature as unsupported, we should revert this code.
self.transaction_support_status = if max_wire_version < 7
|| (max_wire_version < 8 && self.topology_type == TopologyType::Sharded)
{
TransactionSupportStatus::Unsupported
} else {
TransactionSupportStatus::Supported
}
} else {
self.transaction_support_status = TransactionSupportStatus::Unsupported;
}
}

Expand Down Expand Up @@ -749,7 +743,7 @@ pub(crate) enum TransactionSupportStatus {

/// Transactions are supported by this topology. A topology supports transactions if it
/// supports sessions and its maxWireVersion >= 7. Transactions are not currently supported
/// on sharded clusters (TODO RUST-122).
/// on sharded clusters (TODO RUST-734).
///
/// Note that meeting these conditions does not guarantee that a deployment
/// supports transactions; any other missing qualification will be reported by the server.
Expand Down
2 changes: 1 addition & 1 deletion src/sync/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ fn transactions() {

let should_skip = RUNTIME.block_on(async {
let test_client = AsyncTestClient::new().await;
// TODO RUST-122: Unskip this test on sharded clusters
// TODO RUST-734: Unskip this test on sharded clusters when transactions are complete.
!test_client.is_replica_set() || test_client.server_version_lt(4, 0)
});
if should_skip {
Expand Down
Loading