Skip to content

Commit 6902da1

Browse files
NBSquareNathan Blinn
authored andcommitted
RUST-122 Support mongos pinning for sharded transactions (#383)
1 parent 25ea495 commit 6902da1

File tree

20 files changed

+9293
-61
lines changed

20 files changed

+9293
-61
lines changed

src/client/executor.rs

Lines changed: 45 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,13 @@ use crate::{
2626
Retryability,
2727
},
2828
options::SelectionCriteria,
29-
sdam::{HandshakePhase, SelectedServer, SessionSupportStatus, TransactionSupportStatus},
29+
sdam::{
30+
HandshakePhase,
31+
SelectedServer,
32+
ServerType,
33+
SessionSupportStatus,
34+
TransactionSupportStatus,
35+
},
3036
selection_criteria::ReadPreference,
3137
};
3238

@@ -82,6 +88,7 @@ impl Client {
8288
.into());
8389
}
8490
}
91+
8592
self.execute_operation_with_retry(op, Some(session)).await
8693
}
8794
None => {
@@ -126,18 +133,23 @@ impl Client {
126133
}
127134
}
128135

129-
let server = match self.select_server(op.selection_criteria()).await {
136+
let selection_criteria = session
137+
.as_ref()
138+
.and_then(|s| s.transaction.pinned_mongos.as_ref())
139+
.or_else(|| op.selection_criteria());
140+
141+
let server = match self.select_server(selection_criteria).await {
130142
Ok(server) => server,
131143
Err(mut err) => {
132-
err.add_labels(None, &session, None)?;
144+
err.add_labels_and_update_pin(None, &mut session, None)?;
133145
return Err(err);
134146
}
135147
};
136148

137149
let mut conn = match server.pool.check_out().await {
138150
Ok(conn) => conn,
139151
Err(mut err) => {
140-
err.add_labels(None, &session, None)?;
152+
err.add_labels_and_update_pin(None, &mut session, None)?;
141153

142154
if err.is_pool_cleared() {
143155
return self.execute_retry(&mut op, &mut session, None, err).await;
@@ -220,6 +232,8 @@ impl Client {
220232
txn_number: Option<u64>,
221233
first_error: Error,
222234
) -> Result<T::O> {
235+
op.update_for_retry();
236+
223237
let server = match self.select_server(op.selection_criteria()).await {
224238
Ok(server) => server,
225239
Err(_) => {
@@ -237,8 +251,6 @@ impl Client {
237251
return Err(first_error);
238252
}
239253

240-
op.update_for_retry();
241-
242254
match self
243255
.execute_operation_on_connection(op, &mut conn, session, txn_number, &retryability)
244256
.await
@@ -277,7 +289,8 @@ impl Client {
277289
wc.validate()?;
278290
}
279291

280-
let mut cmd = op.build(connection.stream_description()?)?;
292+
let stream_description = connection.stream_description()?;
293+
let mut cmd = op.build(stream_description)?;
281294
self.inner
282295
.topology
283296
.update_command_with_read_pref(connection.address(), &mut cmd, op.selection_criteria())
@@ -315,6 +328,9 @@ impl Client {
315328
cmd.set_start_transaction();
316329
cmd.set_autocommit();
317330
cmd.set_txn_read_concern(*session)?;
331+
if stream_description.initial_server_type == ServerType::Mongos {
332+
session.pin_mongos(connection.address().clone());
333+
}
318334
session.transaction.state = TransactionState::InProgress;
319335
}
320336
TransactionState::InProgress
@@ -460,13 +476,13 @@ impl Client {
460476
handler.handle_command_failed_event(command_failed_event);
461477
});
462478

463-
if let Some(session) = session {
479+
if let Some(ref mut session) = session {
464480
if err.is_network_error() {
465481
session.mark_dirty();
466482
}
467483
}
468484

469-
err.add_labels(Some(connection), session, Some(retryability))?;
485+
err.add_labels_and_update_pin(Some(connection), session, Some(retryability))?;
470486
op.handle_error(err)
471487
}
472488
Ok(response) => {
@@ -495,7 +511,11 @@ impl Client {
495511
match op.handle_response(response.deserialized, connection.stream_description()?) {
496512
Ok(response) => Ok(response),
497513
Err(mut err) => {
498-
err.add_labels(Some(connection), session, Some(retryability))?;
514+
err.add_labels_and_update_pin(
515+
Some(connection),
516+
session,
517+
Some(retryability),
518+
)?;
499519
Err(err)
500520
}
501521
}
@@ -609,7 +629,7 @@ impl Client {
609629
}
610630

611631
impl Error {
612-
/// Adds the necessary labels to this Error.
632+
/// Adds the necessary labels to this Error, and unpins the session if needed.
613633
///
614634
/// A TransientTransactionError label should be added if a transaction is in progress and the
615635
/// error is a network or server selection error.
@@ -619,10 +639,13 @@ impl Error {
619639
/// server version, a label should only be added if the `retry_writes` client option is not set
620640
/// to `false`, the operation during which the error occured is write-retryable, and a
621641
/// TransientTransactionError label has not already been added.
622-
fn add_labels(
642+
///
643+
/// If the TransientTransactionError or UnknownTransactionCommitResult labels are added, the
644+
/// ClientSession should be unpinned.
645+
fn add_labels_and_update_pin(
623646
&mut self,
624647
conn: Option<&Connection>,
625-
session: &Option<&mut ClientSession>,
648+
session: &mut Option<&mut ClientSession>,
626649
retryability: Option<&Retryability>,
627650
) -> Result<()> {
628651
let transaction_state = session.as_ref().map_or(&TransactionState::None, |session| {
@@ -666,6 +689,15 @@ impl Error {
666689
}
667690
}
668691
}
692+
693+
if let Some(ref mut session) = session {
694+
if self.contains_label(TRANSIENT_TRANSACTION_ERROR)
695+
|| self.contains_label(UNKNOWN_TRANSACTION_COMMIT_RESULT)
696+
{
697+
session.unpin_mongos();
698+
}
699+
}
700+
669701
Ok(())
670702
}
671703
}

src/client/session/mod.rs

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ mod test;
55

66
use std::{
77
collections::HashSet,
8+
sync::Arc,
89
time::{Duration, Instant},
910
};
1011

@@ -16,13 +17,16 @@ use crate::{
1617
error::{ErrorKind, Result},
1718
operation::{AbortTransaction, CommitTransaction, Operation},
1819
options::{SessionOptions, TransactionOptions},
19-
sdam::TransactionSupportStatus,
20+
sdam::{ServerInfo, TransactionSupportStatus},
21+
selection_criteria::SelectionCriteria,
2022
Client,
2123
RUNTIME,
2224
};
2325
pub(crate) use cluster_time::ClusterTime;
2426
pub(super) use pool::ServerSessionPool;
2527

28+
use super::options::ServerAddress;
29+
2630
lazy_static! {
2731
pub(crate) static ref SESSIONS_UNSUPPORTED_COMMANDS: HashSet<&'static str> = {
2832
let mut hash_set = HashSet::new();
@@ -95,7 +99,7 @@ lazy_static! {
9599
/// }
96100
/// }
97101
/// ```
98-
// TODO RUST-122 Remove this note and adjust the above description to indicate that sharded
102+
// TODO RUST-734 Remove this note and adjust the above description to indicate that sharded
99103
// transactions are supported on 4.2+
100104
/// Note: the driver does not currently support transactions on sharded clusters.
101105
#[derive(Clone, Debug)]
@@ -113,6 +117,7 @@ pub struct ClientSession {
113117
pub(crate) struct Transaction {
114118
pub(crate) state: TransactionState,
115119
pub(crate) options: Option<TransactionOptions>,
120+
pub(crate) pinned_mongos: Option<SelectionCriteria>,
116121
}
117122

118123
impl Transaction {
@@ -128,11 +133,13 @@ impl Transaction {
128133
pub(crate) fn abort(&mut self) {
129134
self.state = TransactionState::Aborted;
130135
self.options = None;
136+
self.pinned_mongos = None;
131137
}
132138

133139
pub(crate) fn reset(&mut self) {
134140
self.state = TransactionState::None;
135141
self.options = None;
142+
self.pinned_mongos = None;
136143
}
137144
}
138145

@@ -141,6 +148,7 @@ impl Default for Transaction {
141148
Self {
142149
state: TransactionState::None,
143150
options: None,
151+
pinned_mongos: None,
144152
}
145153
}
146154
}
@@ -245,6 +253,17 @@ impl ClientSession {
245253
self.server_session.txn_number
246254
}
247255

256+
/// Pin mongos to session.
257+
pub(crate) fn pin_mongos(&mut self, address: ServerAddress) {
258+
self.transaction.pinned_mongos = Some(SelectionCriteria::Predicate(Arc::new(
259+
move |server_info: &ServerInfo| *server_info.address() == address,
260+
)));
261+
}
262+
263+
pub(crate) fn unpin_mongos(&mut self) {
264+
self.transaction.pinned_mongos = None;
265+
}
266+
248267
/// Whether this session is dirty.
249268
#[cfg(test)]
250269
pub(crate) fn is_dirty(&self) -> bool {
@@ -298,6 +317,9 @@ impl ClientSession {
298317
}
299318
.into());
300319
}
320+
TransactionState::Committed { .. } => {
321+
self.unpin_mongos(); // Unpin session if previous transaction is committed.
322+
}
301323
_ => {}
302324
}
303325
match self.client.transaction_support_status().await? {
@@ -472,7 +494,8 @@ impl ClientSession {
472494
.as_ref()
473495
.and_then(|options| options.write_concern.as_ref())
474496
.cloned();
475-
let abort_transaction = AbortTransaction::new(write_concern);
497+
let selection_criteria = self.transaction.pinned_mongos.clone();
498+
let abort_transaction = AbortTransaction::new(write_concern, selection_criteria);
476499
self.transaction.abort();
477500
// Errors returned from running an abortTransaction command should be ignored.
478501
let _result = self

src/concern/test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ async fn snapshot_read_concern() {
179179
.database(function_name!())
180180
.collection::<Document>(function_name!());
181181

182-
// TODO RUST-122 run this test on sharded clusters
182+
// TODO RUST-734: Run this test on sharded clusters when transactions are complete.
183183
if client.is_replica_set() && client.server_version_gte(4, 0) {
184184
let mut session = client.start_session(None).await.unwrap();
185185
let options = TransactionOptions::builder()

src/operation/abort_transaction/mod.rs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,25 @@ use crate::{
44
error::Result,
55
operation::{Operation, Retryability},
66
options::WriteConcern,
7+
selection_criteria::SelectionCriteria,
78
};
89

910
use super::{CommandResponse, Response, WriteConcernOnlyBody};
1011

1112
pub(crate) struct AbortTransaction {
1213
write_concern: Option<WriteConcern>,
14+
selection_criteria: Option<SelectionCriteria>,
1315
}
1416

1517
impl AbortTransaction {
16-
pub(crate) fn new(write_concern: Option<WriteConcern>) -> Self {
17-
Self { write_concern }
18+
pub(crate) fn new(
19+
write_concern: Option<WriteConcern>,
20+
selection_criteria: Option<SelectionCriteria>,
21+
) -> Self {
22+
Self {
23+
write_concern,
24+
selection_criteria,
25+
}
1826
}
1927
}
2028

@@ -47,11 +55,20 @@ impl Operation for AbortTransaction {
4755
response.validate()
4856
}
4957

58+
fn selection_criteria(&self) -> Option<&SelectionCriteria> {
59+
self.selection_criteria.as_ref()
60+
}
61+
5062
fn write_concern(&self) -> Option<&WriteConcern> {
5163
self.write_concern.as_ref()
5264
}
5365

5466
fn retryability(&self) -> Retryability {
5567
Retryability::Write
5668
}
69+
70+
fn update_for_retry(&mut self) {
71+
// The session must be "unpinned" before server selection for a retry.
72+
self.selection_criteria = None;
73+
}
5774
}

src/sdam/description/topology/mod.rs

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -329,21 +329,15 @@ impl TopologyDescription {
329329
self.transaction_support_status = TransactionSupportStatus::Unsupported;
330330
}
331331
if let Ok(Some(max_wire_version)) = server_description.max_wire_version() {
332-
match self.topology_type {
333-
TopologyType::Sharded => {
334-
// TODO RUST-122: support transactions on sharded clusters
335-
self.transaction_support_status = TransactionSupportStatus::Unsupported;
336-
}
337-
_ => {
338-
if max_wire_version < 7 {
339-
self.transaction_support_status = TransactionSupportStatus::Unsupported;
340-
} else {
341-
self.transaction_support_status = TransactionSupportStatus::Supported;
342-
}
343-
}
332+
// TODO RUST-734: Evaluate whether we should permanently support sharded transactions.
333+
// If we leave the feature as unsupported, we should revert this code.
334+
self.transaction_support_status = if max_wire_version < 7
335+
|| (max_wire_version < 8 && self.topology_type == TopologyType::Sharded)
336+
{
337+
TransactionSupportStatus::Unsupported
338+
} else {
339+
TransactionSupportStatus::Supported
344340
}
345-
} else {
346-
self.transaction_support_status = TransactionSupportStatus::Unsupported;
347341
}
348342
}
349343

@@ -752,7 +746,7 @@ pub(crate) enum TransactionSupportStatus {
752746

753747
/// Transactions are supported by this topology. A topology supports transactions if it
754748
/// supports sessions and its maxWireVersion >= 7. Transactions are not currently supported
755-
/// on sharded clusters (TODO RUST-122).
749+
/// on sharded clusters (TODO RUST-734).
756750
///
757751
/// Note that meeting these conditions does not guarantee that a deployment
758752
/// supports transactions; any other missing qualification will be reported by the server.

src/sync/test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ fn transactions() {
202202

203203
let should_skip = RUNTIME.block_on(async {
204204
let test_client = AsyncTestClient::new().await;
205-
// TODO RUST-122: Unskip this test on sharded clusters
205+
// TODO RUST-734: Unskip this test on sharded clusters when transactions are complete.
206206
!test_client.is_replica_set() || test_client.server_version_lt(4, 0)
207207
});
208208
if should_skip {

0 commit comments

Comments
 (0)