Skip to content

Commit f77590c

Browse files
committed
RUST-122 Support mongos pinning for sharded transactions (#383)
1 parent 92a2764 commit f77590c

File tree

20 files changed

+9233
-61
lines changed

20 files changed

+9233
-61
lines changed

src/client/executor.rs

Lines changed: 45 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,13 @@ use crate::{
2626
Retryability,
2727
},
2828
options::SelectionCriteria,
29-
sdam::{HandshakePhase, SelectedServer, SessionSupportStatus, TransactionSupportStatus},
29+
sdam::{
30+
HandshakePhase,
31+
SelectedServer,
32+
ServerType,
33+
SessionSupportStatus,
34+
TransactionSupportStatus,
35+
},
3036
selection_criteria::ReadPreference,
3137
};
3238

@@ -91,6 +97,7 @@ impl Client {
9197
.into());
9298
}
9399
}
100+
94101
self.execute_operation_with_retry(op, Some(session)).await
95102
}
96103
None => {
@@ -135,18 +142,23 @@ impl Client {
135142
}
136143
}
137144

138-
let server = match self.select_server(op.selection_criteria()).await {
145+
let selection_criteria = session
146+
.as_ref()
147+
.and_then(|s| s.transaction.pinned_mongos.as_ref())
148+
.or_else(|| op.selection_criteria());
149+
150+
let server = match self.select_server(selection_criteria).await {
139151
Ok(server) => server,
140152
Err(mut err) => {
141-
err.add_labels(None, &session, None)?;
153+
err.add_labels_and_update_pin(None, &mut session, None)?;
142154
return Err(err);
143155
}
144156
};
145157

146158
let mut conn = match server.pool.check_out().await {
147159
Ok(conn) => conn,
148160
Err(mut err) => {
149-
err.add_labels(None, &session, None)?;
161+
err.add_labels_and_update_pin(None, &mut session, None)?;
150162

151163
if err.is_pool_cleared() {
152164
return self.execute_retry(&mut op, &mut session, None, err).await;
@@ -229,6 +241,8 @@ impl Client {
229241
txn_number: Option<i64>,
230242
first_error: Error,
231243
) -> Result<T::O> {
244+
op.update_for_retry();
245+
232246
let server = match self.select_server(op.selection_criteria()).await {
233247
Ok(server) => server,
234248
Err(_) => {
@@ -246,8 +260,6 @@ impl Client {
246260
return Err(first_error);
247261
}
248262

249-
op.update_for_retry();
250-
251263
match self
252264
.execute_operation_on_connection(op, &mut conn, session, txn_number, &retryability)
253265
.await
@@ -286,7 +298,8 @@ impl Client {
286298
wc.validate()?;
287299
}
288300

289-
let mut cmd = op.build(connection.stream_description()?)?;
301+
let stream_description = connection.stream_description()?;
302+
let mut cmd = op.build(stream_description)?;
290303
self.inner
291304
.topology
292305
.update_command_with_read_pref(connection.address(), &mut cmd, op.selection_criteria())
@@ -324,6 +337,9 @@ impl Client {
324337
cmd.set_start_transaction();
325338
cmd.set_autocommit();
326339
cmd.set_txn_read_concern(*session)?;
340+
if stream_description.initial_server_type == ServerType::Mongos {
341+
session.pin_mongos(connection.address().clone());
342+
}
327343
session.transaction.state = TransactionState::InProgress;
328344
}
329345
TransactionState::InProgress
@@ -471,13 +487,13 @@ impl Client {
471487
handler.handle_command_failed_event(command_failed_event);
472488
});
473489

474-
if let Some(session) = session {
490+
if let Some(ref mut session) = session {
475491
if err.is_network_error() {
476492
session.mark_dirty();
477493
}
478494
}
479495

480-
err.add_labels(Some(connection), session, Some(retryability))?;
496+
err.add_labels_and_update_pin(Some(connection), session, Some(retryability))?;
481497
op.handle_error(err)
482498
}
483499
Ok(response) => {
@@ -504,7 +520,11 @@ impl Client {
504520
match op.handle_response(response.deserialized, connection.stream_description()?) {
505521
Ok(response) => Ok(response),
506522
Err(mut err) => {
507-
err.add_labels(Some(connection), session, Some(retryability))?;
523+
err.add_labels_and_update_pin(
524+
Some(connection),
525+
session,
526+
Some(retryability),
527+
)?;
508528
Err(err)
509529
}
510530
}
@@ -618,7 +638,7 @@ impl Client {
618638
}
619639

620640
impl Error {
621-
/// Adds the necessary labels to this Error.
641+
/// Adds the necessary labels to this Error, and unpins the session if needed.
622642
///
623643
/// A TransientTransactionError label should be added if a transaction is in progress and the
624644
/// error is a network or server selection error.
@@ -628,10 +648,13 @@ impl Error {
628648
/// server version, a label should only be added if the `retry_writes` client option is not set
629649
/// to `false`, the operation during which the error occured is write-retryable, and a
630650
/// TransientTransactionError label has not already been added.
631-
fn add_labels(
651+
///
652+
/// If the TransientTransactionError or UnknownTransactionCommitResult labels are added, the
653+
/// ClientSession should be unpinned.
654+
fn add_labels_and_update_pin(
632655
&mut self,
633656
conn: Option<&Connection>,
634-
session: &Option<&mut ClientSession>,
657+
session: &mut Option<&mut ClientSession>,
635658
retryability: Option<&Retryability>,
636659
) -> Result<()> {
637660
let transaction_state = session.as_ref().map_or(&TransactionState::None, |session| {
@@ -675,6 +698,15 @@ impl Error {
675698
}
676699
}
677700
}
701+
702+
if let Some(ref mut session) = session {
703+
if self.contains_label(TRANSIENT_TRANSACTION_ERROR)
704+
|| self.contains_label(UNKNOWN_TRANSACTION_COMMIT_RESULT)
705+
{
706+
session.unpin_mongos();
707+
}
708+
}
709+
678710
Ok(())
679711
}
680712
}

src/client/session/mod.rs

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ mod test;
55

66
use std::{
77
collections::HashSet,
8+
sync::Arc,
89
time::{Duration, Instant},
910
};
1011

@@ -16,13 +17,16 @@ use crate::{
1617
error::{ErrorKind, Result},
1718
operation::{AbortTransaction, CommitTransaction, Operation},
1819
options::{SessionOptions, TransactionOptions},
19-
sdam::TransactionSupportStatus,
20+
sdam::{ServerInfo, TransactionSupportStatus},
21+
selection_criteria::SelectionCriteria,
2022
Client,
2123
RUNTIME,
2224
};
2325
pub(crate) use cluster_time::ClusterTime;
2426
pub(super) use pool::ServerSessionPool;
2527

28+
use super::options::ServerAddress;
29+
2630
lazy_static! {
2731
pub(crate) static ref SESSIONS_UNSUPPORTED_COMMANDS: HashSet<&'static str> = {
2832
let mut hash_set = HashSet::new();
@@ -95,7 +99,7 @@ lazy_static! {
9599
/// }
96100
/// }
97101
/// ```
98-
// TODO RUST-122 Remove this note and adjust the above description to indicate that sharded
102+
// TODO RUST-734 Remove this note and adjust the above description to indicate that sharded
99103
// transactions are supported on 4.2+
100104
/// Note: the driver does not currently support transactions on sharded clusters.
101105
#[derive(Clone, Debug)]
@@ -113,6 +117,7 @@ pub struct ClientSession {
113117
pub(crate) struct Transaction {
114118
pub(crate) state: TransactionState,
115119
pub(crate) options: Option<TransactionOptions>,
120+
pub(crate) pinned_mongos: Option<SelectionCriteria>,
116121
}
117122

118123
impl Transaction {
@@ -128,11 +133,13 @@ impl Transaction {
128133
pub(crate) fn abort(&mut self) {
129134
self.state = TransactionState::Aborted;
130135
self.options = None;
136+
self.pinned_mongos = None;
131137
}
132138

133139
pub(crate) fn reset(&mut self) {
134140
self.state = TransactionState::None;
135141
self.options = None;
142+
self.pinned_mongos = None;
136143
}
137144
}
138145

@@ -141,6 +148,7 @@ impl Default for Transaction {
141148
Self {
142149
state: TransactionState::None,
143150
options: None,
151+
pinned_mongos: None,
144152
}
145153
}
146154
}
@@ -245,6 +253,17 @@ impl ClientSession {
245253
self.server_session.txn_number
246254
}
247255

256+
/// Pin mongos to session.
257+
pub(crate) fn pin_mongos(&mut self, address: ServerAddress) {
258+
self.transaction.pinned_mongos = Some(SelectionCriteria::Predicate(Arc::new(
259+
move |server_info: &ServerInfo| *server_info.address() == address,
260+
)));
261+
}
262+
263+
pub(crate) fn unpin_mongos(&mut self) {
264+
self.transaction.pinned_mongos = None;
265+
}
266+
248267
/// Whether this session is dirty.
249268
#[cfg(test)]
250269
pub(crate) fn is_dirty(&self) -> bool {
@@ -298,6 +317,9 @@ impl ClientSession {
298317
}
299318
.into());
300319
}
320+
TransactionState::Committed { .. } => {
321+
self.unpin_mongos(); // Unpin session if previous transaction is committed.
322+
}
301323
_ => {}
302324
}
303325
match self.client.transaction_support_status().await? {
@@ -472,7 +494,8 @@ impl ClientSession {
472494
.as_ref()
473495
.and_then(|options| options.write_concern.as_ref())
474496
.cloned();
475-
let abort_transaction = AbortTransaction::new(write_concern);
497+
let selection_criteria = self.transaction.pinned_mongos.clone();
498+
let abort_transaction = AbortTransaction::new(write_concern, selection_criteria);
476499
self.transaction.abort();
477500
// Errors returned from running an abortTransaction command should be ignored.
478501
let _result = self

src/concern/test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ async fn snapshot_read_concern() {
179179
.database(function_name!())
180180
.collection::<Document>(function_name!());
181181

182-
// TODO RUST-122 run this test on sharded clusters
182+
// TODO RUST-734: Run this test on sharded clusters when transactions are complete.
183183
if client.is_replica_set() && client.server_version_gte(4, 0) {
184184
let mut session = client.start_session(None).await.unwrap();
185185
let options = TransactionOptions::builder()

src/operation/abort_transaction/mod.rs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,25 @@ use crate::{
44
error::Result,
55
operation::{Operation, Retryability},
66
options::WriteConcern,
7+
selection_criteria::SelectionCriteria,
78
};
89

910
use super::{CommandResponse, Response, WriteConcernOnlyBody};
1011

1112
pub(crate) struct AbortTransaction {
1213
write_concern: Option<WriteConcern>,
14+
selection_criteria: Option<SelectionCriteria>,
1315
}
1416

1517
impl AbortTransaction {
16-
pub(crate) fn new(write_concern: Option<WriteConcern>) -> Self {
17-
Self { write_concern }
18+
pub(crate) fn new(
19+
write_concern: Option<WriteConcern>,
20+
selection_criteria: Option<SelectionCriteria>,
21+
) -> Self {
22+
Self {
23+
write_concern,
24+
selection_criteria,
25+
}
1826
}
1927
}
2028

@@ -47,11 +55,20 @@ impl Operation for AbortTransaction {
4755
response.validate()
4856
}
4957

58+
fn selection_criteria(&self) -> Option<&SelectionCriteria> {
59+
self.selection_criteria.as_ref()
60+
}
61+
5062
fn write_concern(&self) -> Option<&WriteConcern> {
5163
self.write_concern.as_ref()
5264
}
5365

5466
fn retryability(&self) -> Retryability {
5567
Retryability::Write
5668
}
69+
70+
fn update_for_retry(&mut self) {
71+
// The session must be "unpinned" before server selection for a retry.
72+
self.selection_criteria = None;
73+
}
5774
}

src/sdam/description/topology/mod.rs

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -326,21 +326,15 @@ impl TopologyDescription {
326326
self.transaction_support_status = TransactionSupportStatus::Unsupported;
327327
}
328328
if let Ok(Some(max_wire_version)) = server_description.max_wire_version() {
329-
match self.topology_type {
330-
TopologyType::Sharded => {
331-
// TODO RUST-122: support transactions on sharded clusters
332-
self.transaction_support_status = TransactionSupportStatus::Unsupported;
333-
}
334-
_ => {
335-
if max_wire_version < 7 {
336-
self.transaction_support_status = TransactionSupportStatus::Unsupported;
337-
} else {
338-
self.transaction_support_status = TransactionSupportStatus::Supported;
339-
}
340-
}
329+
// TODO RUST-734: Evaluate whether we should permanently support sharded transactions.
330+
// If we leave the feature as unsupported, we should revert this code.
331+
self.transaction_support_status = if max_wire_version < 7
332+
|| (max_wire_version < 8 && self.topology_type == TopologyType::Sharded)
333+
{
334+
TransactionSupportStatus::Unsupported
335+
} else {
336+
TransactionSupportStatus::Supported
341337
}
342-
} else {
343-
self.transaction_support_status = TransactionSupportStatus::Unsupported;
344338
}
345339
}
346340

@@ -749,7 +743,7 @@ pub(crate) enum TransactionSupportStatus {
749743

750744
/// Transactions are supported by this topology. A topology supports transactions if it
751745
/// supports sessions and its maxWireVersion >= 7. Transactions are not currently supported
752-
/// on sharded clusters (TODO RUST-122).
746+
/// on sharded clusters (TODO RUST-734).
753747
///
754748
/// Note that meeting these conditions does not guarantee that a deployment
755749
/// supports transactions; any other missing qualification will be reported by the server.

src/sync/test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ fn transactions() {
202202

203203
let should_skip = RUNTIME.block_on(async {
204204
let test_client = AsyncTestClient::new().await;
205-
// TODO RUST-122: Unskip this test on sharded clusters
205+
// TODO RUST-734: Unskip this test on sharded clusters when transactions are complete.
206206
!test_client.is_replica_set() || test_client.server_version_lt(4, 0)
207207
});
208208
if should_skip {

0 commit comments

Comments
 (0)