Skip to content

Commit 43bd239

Browse files
abr-egnAndrew Witten
authored and committed
RUST-954 Pin cursor connections in load balancer mode (mongodb#446)
1 parent c930f6a commit 43bd239

File tree

14 files changed

+450
-131
lines changed

14 files changed

+450
-131
lines changed

src/client/executor.rs

Lines changed: 109 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
use bson::doc;
22
use lazy_static::lazy_static;
3+
use serde::de::DeserializeOwned;
34

45
use std::{collections::HashSet, sync::Arc, time::Instant};
56

67
use super::{session::TransactionState, Client, ClientSession};
78
use crate::{
89
bson::Document,
9-
cmap::{Connection, RawCommand, RawCommandResponse},
10+
cmap::{conn::PinnedConnectionHandle, Connection, RawCommand, RawCommandResponse},
11+
cursor::{session::SessionCursor, Cursor, CursorSpecification},
1012
error::{
1113
Error,
1214
ErrorKind,
@@ -70,6 +72,16 @@ impl Client {
7072
op: T,
7173
session: impl Into<Option<&mut ClientSession>>,
7274
) -> Result<T::O> {
75+
self.execute_operation_with_details(op, session)
76+
.await
77+
.map(|details| details.output.operation_output)
78+
}
79+
80+
async fn execute_operation_with_details<T: Operation>(
81+
&self,
82+
op: T,
83+
session: impl Into<Option<&mut ClientSession>>,
84+
) -> Result<ExecutionDetails<T>> {
7385
Box::pin(async {
7486
// TODO RUST-9: allow unacknowledged write concerns
7587
if !op.is_acknowledged() {
@@ -78,7 +90,8 @@ impl Client {
7890
}
7991
.into());
8092
}
81-
match session.into() {
93+
let mut implicit_session = None;
94+
let session = match session.into() {
8295
Some(session) => {
8396
if !Arc::ptr_eq(&self.inner, &session.client().inner) {
8497
return Err(ErrorKind::InvalidArgument {
@@ -99,41 +112,83 @@ impl Client {
99112
.into());
100113
}
101114
}
102-
self.execute_operation_with_retry(op, Some(session)).await
115+
Some(session)
103116
}
104117
None => {
105-
let mut implicit_session = self.start_implicit_session(&op).await?;
106-
self.execute_operation_with_retry(op, implicit_session.as_mut())
107-
.await
118+
implicit_session = self.start_implicit_session(&op).await?;
119+
implicit_session.as_mut()
108120
}
109-
}
121+
};
122+
let output = self.execute_operation_with_retry(op, session).await?;
123+
Ok(ExecutionDetails {
124+
output,
125+
implicit_session,
126+
})
110127
})
111128
.await
112129
}
113130

114-
/// Execute the given operation, returning the implicit session created for it if one was.
131+
/// Execute the given operation, returning the cursor created by the operation.
115132
///
116133
/// Server selection will be performed using the criteria specified on the operation, if any.
117-
pub(crate) async fn execute_cursor_operation<T: Operation>(
118-
&self,
119-
op: T,
120-
) -> Result<(T::O, Option<ClientSession>)> {
134+
pub(crate) async fn execute_cursor_operation<Op, T>(&self, op: Op) -> Result<Cursor<T>>
135+
where
136+
Op: Operation<O = CursorSpecification<T>>,
137+
T: DeserializeOwned + Unpin + Send + Sync,
138+
{
121139
Box::pin(async {
122-
let mut implicit_session = self.start_implicit_session(&op).await?;
123-
self.execute_operation_with_retry(op, implicit_session.as_mut())
124-
.await
125-
.map(|result| (result, implicit_session))
140+
let mut details = self.execute_operation_with_details(op, None).await?;
141+
let pinned = self.pin_connection_for_cursor(&mut details.output)?;
142+
Ok(Cursor::new(
143+
self.clone(),
144+
details.output.operation_output,
145+
details.implicit_session,
146+
pinned,
147+
))
126148
})
127149
.await
128150
}
129151

152+
pub(crate) async fn execute_session_cursor_operation<Op, T>(
153+
&self,
154+
op: Op,
155+
session: &mut ClientSession,
156+
) -> Result<SessionCursor<T>>
157+
where
158+
Op: Operation<O = CursorSpecification<T>>,
159+
T: DeserializeOwned + Unpin + Send + Sync,
160+
{
161+
let mut details = self.execute_operation_with_details(op, session).await?;
162+
let pinned = self.pin_connection_for_cursor(&mut details.output)?;
163+
Ok(SessionCursor::new(
164+
self.clone(),
165+
details.output.operation_output,
166+
pinned,
167+
))
168+
}
169+
170+
fn pin_connection_for_cursor<Op, T>(
171+
&self,
172+
details: &mut ExecutionOutput<Op>,
173+
) -> Result<Option<PinnedConnectionHandle>>
174+
where
175+
Op: Operation<O = CursorSpecification<T>>,
176+
{
177+
let is_load_balanced = self.inner.options.load_balanced.unwrap_or(false);
178+
if is_load_balanced && details.operation_output.info.id != 0 {
179+
Ok(Some(details.connection.pin()?))
180+
} else {
181+
Ok(None)
182+
}
183+
}
184+
130185
/// Selects a server and executes the given operation on it, optionally using a provided
131186
/// session. Retries the operation upon failure if retryability is supported.
132187
async fn execute_operation_with_retry<T: Operation>(
133188
&self,
134189
mut op: T,
135190
mut session: Option<&mut ClientSession>,
136-
) -> Result<T::O> {
191+
) -> Result<ExecutionOutput<T>> {
137192
// If the current transaction has been committed/aborted and it is not being
138193
// re-committed/re-aborted, reset the transaction's state to TransactionState::None.
139194
if let Some(ref mut session) = session {
@@ -161,17 +216,20 @@ impl Client {
161216
}
162217
};
163218

164-
let mut conn = match server.pool.check_out().await {
165-
Ok(conn) => conn,
166-
Err(mut err) => {
167-
err.add_labels_and_update_pin(None, &mut session, None)?;
219+
let mut conn = match op.pinned_connection() {
220+
Some(l) => l.take_connection().await?,
221+
None => match server.pool.check_out().await {
222+
Ok(c) => c,
223+
Err(mut err) => {
224+
err.add_labels_and_update_pin(None, &mut session, None)?;
168225

169-
if err.is_pool_cleared() {
170-
return self.execute_retry(&mut op, &mut session, None, err).await;
171-
} else {
172-
return Err(err);
226+
if err.is_pool_cleared() {
227+
return self.execute_retry(&mut op, &mut session, None, err).await;
228+
} else {
229+
return Err(err);
230+
}
173231
}
174-
}
232+
},
175233
};
176234

177235
let retryability = self.get_retryability(&conn, &op, &session).await?;
@@ -200,7 +258,10 @@ impl Client {
200258
)
201259
.await
202260
{
203-
Ok(result) => Ok(result),
261+
Ok(operation_output) => Ok(ExecutionOutput {
262+
operation_output,
263+
connection: conn,
264+
}),
204265
Err(mut err) => {
205266
// Retryable writes are only supported by storage engines with document-level
206267
// locking, so users need to disable retryable writes if using mmapv1.
@@ -246,7 +307,7 @@ impl Client {
246307
session: &mut Option<&mut ClientSession>,
247308
txn_number: Option<i64>,
248309
first_error: Error,
249-
) -> Result<T::O> {
310+
) -> Result<ExecutionOutput<T>> {
250311
op.update_for_retry();
251312

252313
let server = match self.select_server(op.selection_criteria()).await {
@@ -256,9 +317,12 @@ impl Client {
256317
}
257318
};
258319

259-
let mut conn = match server.pool.check_out().await {
260-
Ok(c) => c,
261-
Err(_) => return Err(first_error),
320+
let mut conn = match op.pinned_connection() {
321+
Some(c) => c.take_connection().await?,
322+
None => match server.pool.check_out().await {
323+
Ok(c) => c,
324+
Err(_) => return Err(first_error),
325+
},
262326
};
263327

264328
let retryability = self.get_retryability(&conn, op, session).await?;
@@ -270,7 +334,10 @@ impl Client {
270334
.execute_operation_on_connection(op, &mut conn, session, txn_number, &retryability)
271335
.await
272336
{
273-
Ok(result) => Ok(result),
337+
Ok(operation_output) => Ok(ExecutionOutput {
338+
operation_output,
339+
connection: conn,
340+
}),
274341
Err(err) => {
275342
self.inner
276343
.topology
@@ -761,3 +828,13 @@ struct CommandResult<T> {
761828
raw: RawCommandResponse,
762829
deserialized: T,
763830
}
831+
832+
struct ExecutionDetails<T: Operation> {
833+
output: ExecutionOutput<T>,
834+
implicit_session: Option<ClientSession>,
835+
}
836+
837+
struct ExecutionOutput<T: Operation> {
838+
operation_output: T::O,
839+
connection: Connection,
840+
}

src/cmap/conn/mod.rs

Lines changed: 95 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use std::{
88
};
99

1010
use derivative::Derivative;
11+
use tokio::sync::{mpsc, Mutex};
1112

1213
use self::wire::Message;
1314
use super::manager::PoolManager;
@@ -17,7 +18,7 @@ use crate::{
1718
options::{ConnectionOptions, StreamOptions},
1819
PoolGeneration,
1920
},
20-
error::{load_balanced_mode_mismatch, ErrorKind, Result},
21+
error::{load_balanced_mode_mismatch, Error, ErrorKind, Result},
2122
event::cmap::{
2223
CmapEventHandler,
2324
ConnectionCheckedInEvent,
@@ -75,6 +76,10 @@ pub(crate) struct Connection {
7576

7677
stream: AsyncStream,
7778

79+
/// If the connection is pinned to a cursor or transaction, the channel sender to return this
80+
/// connection to the pin holder.
81+
pinned_sender: Option<mpsc::Sender<Connection>>,
82+
7883
#[derivative(Debug = "ignore")]
7984
handler: Option<Arc<dyn CmapEventHandler>>,
8085
}
@@ -103,6 +108,7 @@ impl Connection {
103108
handler: options.and_then(|options| options.event_handler),
104109
stream_description: None,
105110
error: false,
111+
pinned_sender: None,
106112
};
107113

108114
Ok(conn)
@@ -286,6 +292,27 @@ impl Connection {
286292
})
287293
}
288294

295+
/// Pin the connection, removing it from the normal connection pool.
296+
pub(crate) fn pin(&mut self) -> Result<PinnedConnectionHandle> {
297+
if self.pinned_sender.is_some() {
298+
return Err(Error::internal(format!(
299+
"cannot pin an already-pinned connection (id = {})",
300+
self.id
301+
)));
302+
}
303+
if self.pool_manager.is_none() {
304+
return Err(Error::internal(format!(
305+
"cannot pin a checked-in connection (id = {})",
306+
self.id
307+
)));
308+
}
309+
let (tx, rx) = mpsc::channel(1);
310+
self.pinned_sender = Some(tx);
311+
Ok(PinnedConnectionHandle {
312+
receiver: Arc::new(Mutex::new(rx)),
313+
})
314+
}
315+
289316
/// Close this connection, emitting a `ConnectionClosedEvent` with the supplied reason.
290317
pub(super) fn close_and_drop(mut self, reason: ConnectionClosedReason) {
291318
self.close(reason);
@@ -313,6 +340,7 @@ impl Connection {
313340
error: self.error,
314341
pool_manager: None,
315342
ready_and_available_time: None,
343+
pinned_sender: self.pinned_sender.clone(),
316344
}
317345
}
318346
}
@@ -330,8 +358,36 @@ impl Drop for Connection {
330358
// the `close_and_drop` helper explicitly, so we don't add it back to the
331359
// pool or emit any events.
332360
if let Some(pool_manager) = self.pool_manager.take() {
333-
let dropped_connection = self.take();
334-
if let Err(mut conn) = pool_manager.check_in(dropped_connection) {
361+
let mut dropped_connection = self.take();
362+
let result = if let Some(sender) = self.pinned_sender.as_mut() {
363+
// Preserve the timestamp for pinned connections.
364+
dropped_connection.ready_and_available_time = self.ready_and_available_time;
365+
match sender.try_send(dropped_connection) {
366+
Ok(()) => Ok(()),
367+
// The connection has been unpinned and should be checked back in.
368+
Err(mpsc::error::TrySendError::Closed(mut conn)) => {
369+
conn.pinned_sender = None;
370+
conn.ready_and_available_time = None;
371+
pool_manager.check_in(conn)
372+
}
373+
// The connection is being returned to the pin holder while another connection
374+
// is in the pin buffer; this should never happen. Only possible action is to
375+
// check the connection back in.
376+
Err(mpsc::error::TrySendError::Full(mut conn)) => {
377+
// Panic in debug mode
378+
if cfg!(debug_assertions) {
379+
panic!("buffer full when attempting to return a pinned connection")
380+
}
381+
// TODO RUST-230 log an error in non-debug mode.
382+
conn.pinned_sender = None;
383+
conn.ready_and_available_time = None;
384+
pool_manager.check_in(conn)
385+
}
386+
}
387+
} else {
388+
pool_manager.check_in(dropped_connection)
389+
};
390+
if let Err(mut conn) = result {
335391
// the check in failed because the pool has been dropped, so we emit the event
336392
// here and drop the connection.
337393
conn.close(ConnectionClosedReason::PoolClosed);
@@ -340,6 +396,42 @@ impl Drop for Connection {
340396
}
341397
}
342398

399+
/// A handle to a pinned connection - the connection itself can be retrieved or returned to the
400+
/// normal pool via this handle.
401+
#[derive(Debug)]
402+
pub(crate) struct PinnedConnectionHandle {
403+
receiver: Arc<Mutex<mpsc::Receiver<Connection>>>,
404+
}
405+
406+
impl PinnedConnectionHandle {
407+
/// Make a new `PinnedConnectionHandle` that refers to the same connection as this one.
408+
/// Use with care and only when "lending" a handle in a way that can't be expressed as a
409+
/// normal borrow.
410+
pub(crate) fn replicate(&self) -> Self {
411+
Self {
412+
receiver: self.receiver.clone(),
413+
}
414+
}
415+
416+
/// Retrieve the pinned connection, blocking until it's available for use. Will fail if the
417+
/// connection has been unpinned.
418+
pub(crate) async fn take_connection(&self) -> Result<Connection> {
419+
let mut receiver = self.receiver.lock().await;
420+
receiver
421+
.recv()
422+
.await
423+
.ok_or_else(|| Error::internal("cannot take connection after unpin"))
424+
}
425+
426+
/// Return the pinned connection to the normal connection pool.
427+
pub(crate) async fn unpin_connection(&self) {
428+
let mut receiver = self.receiver.lock().await;
429+
receiver.close();
430+
// Ensure any connections buffered in the channel are dropped, returning them to the pool.
431+
while receiver.recv().await.is_some() {}
432+
}
433+
}
434+
343435
#[derive(Debug, Clone)]
344436
pub(crate) enum ConnectionGeneration {
345437
Normal(u32),

0 commit comments

Comments
 (0)