Skip to content

RUST-954 Pin cursor connections in load balancer mode #446

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
1db15ce
fold Cursor construction into execute_cursor_operation
abr-egn Aug 16, 2021
e884e19
tidy
abr-egn Aug 16, 2021
ad879cd
pass connection out from execute
abr-egn Aug 16, 2021
b8ef956
attach connection to cursor
abr-egn Aug 16, 2021
4fc93aa
don't pin for cursor id 0
abr-egn Aug 16, 2021
7247301
thread connection through to cursor get more execution
abr-egn Aug 16, 2021
b1ffc5f
thread connection to execute_operation
abr-egn Aug 17, 2021
324c0ec
continue plumbing pinned connection
abr-egn Aug 17, 2021
28e00c1
name shuffling
abr-egn Aug 18, 2021
1cea507
shift pinned connection to operation
abr-egn Aug 18, 2021
6dff912
move PinnedConnection from GenericCursor to GetMoreProviders
abr-egn Aug 19, 2021
2dff7c1
finish pinned connection migration to GetMoreProvider
abr-egn Aug 19, 2021
8fdbf43
pass connection through execute; only use arc mutex for session cursor
abr-egn Aug 20, 2021
2dd05a4
use pinned connection in execute_retry
abr-egn Aug 20, 2021
178cf17
drop pinned connection on cursor exhaustion
abr-egn Aug 20, 2021
f727226
use wrapped arc mutex everywhere
abr-egn Aug 30, 2021
0ac80e9
flag pinned connections invalid on network error
abr-egn Aug 30, 2021
c2383ab
rustfmt
abr-egn Aug 30, 2021
4b66b58
lint
abr-egn Aug 30, 2021
a5e3d14
merge
abr-egn Aug 30, 2021
e7f98f1
review tidy
abr-egn Sep 1, 2021
c47d2f1
pin-by-id machinery
abr-egn Sep 2, 2021
4722ff0
pinning via handle
abr-egn Sep 2, 2021
ad5e3a9
pass pinned connection via operation
abr-egn Sep 2, 2021
c7991cc
format and lint
abr-egn Sep 2, 2021
b0b76fe
review updates
abr-egn Sep 8, 2021
b465438
rustfmt
abr-egn Sep 8, 2021
e2c380a
further review revisions
abr-egn Sep 9, 2021
55e8949
simpler pinning
abr-egn Sep 10, 2021
9513eb2
whoops
abr-egn Sep 10, 2021
a2ac863
rustfmt
abr-egn Sep 10, 2021
516f463
remove now-unused pinned connection data
abr-egn Sep 10, 2021
4f496cb
drain channel
abr-egn Sep 13, 2021
429a396
brevity
abr-egn Sep 13, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 109 additions & 32 deletions src/client/executor.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use bson::doc;
use lazy_static::lazy_static;
use serde::de::DeserializeOwned;

use std::{collections::HashSet, sync::Arc, time::Instant};

use super::{session::TransactionState, Client, ClientSession};
use crate::{
bson::Document,
cmap::{Connection, RawCommand, RawCommandResponse},
cmap::{conn::PinnedConnectionHandle, Connection, RawCommand, RawCommandResponse},
cursor::{session::SessionCursor, Cursor, CursorSpecification},
error::{
Error,
ErrorKind,
Expand Down Expand Up @@ -70,6 +72,16 @@ impl Client {
op: T,
session: impl Into<Option<&mut ClientSession>>,
) -> Result<T::O> {
self.execute_operation_with_details(op, session)
.await
.map(|details| details.output.operation_output)
}

async fn execute_operation_with_details<T: Operation>(
&self,
op: T,
session: impl Into<Option<&mut ClientSession>>,
) -> Result<ExecutionDetails<T>> {
Box::pin(async {
// TODO RUST-9: allow unacknowledged write concerns
if !op.is_acknowledged() {
Expand All @@ -78,7 +90,8 @@ impl Client {
}
.into());
}
match session.into() {
let mut implicit_session = None;
let session = match session.into() {
Some(session) => {
if !Arc::ptr_eq(&self.inner, &session.client().inner) {
return Err(ErrorKind::InvalidArgument {
Expand All @@ -99,41 +112,83 @@ impl Client {
.into());
}
}
self.execute_operation_with_retry(op, Some(session)).await
Some(session)
}
None => {
let mut implicit_session = self.start_implicit_session(&op).await?;
self.execute_operation_with_retry(op, implicit_session.as_mut())
.await
implicit_session = self.start_implicit_session(&op).await?;
implicit_session.as_mut()
}
}
};
let output = self.execute_operation_with_retry(op, session).await?;
Ok(ExecutionDetails {
output,
implicit_session,
})
})
.await
}

/// Execute the given operation, returning the implicit session created for it if one was.
/// Execute the given operation, returning the cursor created by the operation.
///
/// Server selection be will performed using the criteria specified on the operation, if any.
pub(crate) async fn execute_cursor_operation<T: Operation>(
&self,
op: T,
) -> Result<(T::O, Option<ClientSession>)> {
pub(crate) async fn execute_cursor_operation<Op, T>(&self, op: Op) -> Result<Cursor<T>>
where
Op: Operation<O = CursorSpecification<T>>,
T: DeserializeOwned + Unpin + Send + Sync,
{
Box::pin(async {
let mut implicit_session = self.start_implicit_session(&op).await?;
self.execute_operation_with_retry(op, implicit_session.as_mut())
.await
.map(|result| (result, implicit_session))
let mut details = self.execute_operation_with_details(op, None).await?;
let pinned = self.pin_connection_for_cursor(&mut details.output)?;
Ok(Cursor::new(
self.clone(),
details.output.operation_output,
details.implicit_session,
pinned,
))
})
.await
}

pub(crate) async fn execute_session_cursor_operation<Op, T>(
&self,
op: Op,
session: &mut ClientSession,
) -> Result<SessionCursor<T>>
where
Op: Operation<O = CursorSpecification<T>>,
T: DeserializeOwned + Unpin + Send + Sync,
{
let mut details = self.execute_operation_with_details(op, session).await?;
let pinned = self.pin_connection_for_cursor(&mut details.output)?;
Ok(SessionCursor::new(
self.clone(),
details.output.operation_output,
pinned,
))
}

fn pin_connection_for_cursor<Op, T>(
&self,
details: &mut ExecutionOutput<Op>,
) -> Result<Option<PinnedConnectionHandle>>
where
Op: Operation<O = CursorSpecification<T>>,
{
let is_load_balanced = self.inner.options.load_balanced.unwrap_or(false);
if is_load_balanced && details.operation_output.info.id != 0 {
Ok(Some(details.connection.pin()?))
} else {
Ok(None)
}
}

/// Selects a server and executes the given operation on it, optionally using a provided
/// session. Retries the operation upon failure if retryability is supported.
async fn execute_operation_with_retry<T: Operation>(
&self,
mut op: T,
mut session: Option<&mut ClientSession>,
) -> Result<T::O> {
) -> Result<ExecutionOutput<T>> {
// If the current transaction has been committed/aborted and it is not being
// re-committed/re-aborted, reset the transaction's state to TransactionState::None.
if let Some(ref mut session) = session {
Expand Down Expand Up @@ -161,17 +216,20 @@ impl Client {
}
};

let mut conn = match server.pool.check_out().await {
Ok(conn) => conn,
Err(mut err) => {
err.add_labels_and_update_pin(None, &mut session, None)?;
let mut conn = match op.pinned_connection() {
Some(l) => l.take_connection().await?,
None => match server.pool.check_out().await {
Ok(c) => c,
Err(mut err) => {
err.add_labels_and_update_pin(None, &mut session, None)?;

if err.is_pool_cleared() {
return self.execute_retry(&mut op, &mut session, None, err).await;
} else {
return Err(err);
if err.is_pool_cleared() {
return self.execute_retry(&mut op, &mut session, None, err).await;
} else {
return Err(err);
}
}
}
},
};

let retryability = self.get_retryability(&conn, &op, &session).await?;
Expand Down Expand Up @@ -200,7 +258,10 @@ impl Client {
)
.await
{
Ok(result) => Ok(result),
Ok(operation_output) => Ok(ExecutionOutput {
operation_output,
connection: conn,
}),
Err(mut err) => {
// Retryable writes are only supported by storage engines with document-level
// locking, so users need to disable retryable writes if using mmapv1.
Expand Down Expand Up @@ -246,7 +307,7 @@ impl Client {
session: &mut Option<&mut ClientSession>,
txn_number: Option<i64>,
first_error: Error,
) -> Result<T::O> {
) -> Result<ExecutionOutput<T>> {
op.update_for_retry();

let server = match self.select_server(op.selection_criteria()).await {
Expand All @@ -256,9 +317,12 @@ impl Client {
}
};

let mut conn = match server.pool.check_out().await {
Ok(c) => c,
Err(_) => return Err(first_error),
let mut conn = match op.pinned_connection() {
Some(c) => c.take_connection().await?,
None => match server.pool.check_out().await {
Ok(c) => c,
Err(_) => return Err(first_error),
},
};

let retryability = self.get_retryability(&conn, op, session).await?;
Expand All @@ -270,7 +334,10 @@ impl Client {
.execute_operation_on_connection(op, &mut conn, session, txn_number, &retryability)
.await
{
Ok(result) => Ok(result),
Ok(operation_output) => Ok(ExecutionOutput {
operation_output,
connection: conn,
}),
Err(err) => {
self.inner
.topology
Expand Down Expand Up @@ -761,3 +828,13 @@ struct CommandResult<T> {
raw: RawCommandResponse,
deserialized: T,
}

struct ExecutionDetails<T: Operation> {
output: ExecutionOutput<T>,
implicit_session: Option<ClientSession>,
}

struct ExecutionOutput<T: Operation> {
operation_output: T::O,
connection: Connection,
}
98 changes: 95 additions & 3 deletions src/cmap/conn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{
};

use derivative::Derivative;
use tokio::sync::{mpsc, Mutex};

use self::wire::Message;
use super::manager::PoolManager;
Expand All @@ -17,7 +18,7 @@ use crate::{
options::{ConnectionOptions, StreamOptions},
PoolGeneration,
},
error::{load_balanced_mode_mismatch, ErrorKind, Result},
error::{load_balanced_mode_mismatch, Error, ErrorKind, Result},
event::cmap::{
CmapEventHandler,
ConnectionCheckedInEvent,
Expand Down Expand Up @@ -75,6 +76,10 @@ pub(crate) struct Connection {

stream: AsyncStream,

/// If the connection is pinned to a cursor or transaction, the channel sender to return this
/// connection to the pin holder.
pinned_sender: Option<mpsc::Sender<Connection>>,

#[derivative(Debug = "ignore")]
handler: Option<Arc<dyn CmapEventHandler>>,
}
Expand Down Expand Up @@ -103,6 +108,7 @@ impl Connection {
handler: options.and_then(|options| options.event_handler),
stream_description: None,
error: false,
pinned_sender: None,
};

Ok(conn)
Expand Down Expand Up @@ -286,6 +292,27 @@ impl Connection {
})
}

/// Pin the connection, removing it from the normal connection pool.
pub(crate) fn pin(&mut self) -> Result<PinnedConnectionHandle> {
if self.pinned_sender.is_some() {
return Err(Error::internal(format!(
"cannot pin an already-pinned connection (id = {})",
self.id
)));
}
if self.pool_manager.is_none() {
return Err(Error::internal(format!(
"cannot pin a checked-in connection (id = {})",
self.id
)));
}
let (tx, rx) = mpsc::channel(1);
self.pinned_sender = Some(tx);
Ok(PinnedConnectionHandle {
receiver: Arc::new(Mutex::new(rx)),
})
}

/// Close this connection, emitting a `ConnectionClosedEvent` with the supplied reason.
pub(super) fn close_and_drop(mut self, reason: ConnectionClosedReason) {
self.close(reason);
Expand Down Expand Up @@ -313,6 +340,7 @@ impl Connection {
error: self.error,
pool_manager: None,
ready_and_available_time: None,
pinned_sender: self.pinned_sender.clone(),
}
}
}
Expand All @@ -330,8 +358,36 @@ impl Drop for Connection {
// the `close_and_drop` helper explicitly, so we don't add it back to the
// pool or emit any events.
if let Some(pool_manager) = self.pool_manager.take() {
let dropped_connection = self.take();
if let Err(mut conn) = pool_manager.check_in(dropped_connection) {
let mut dropped_connection = self.take();
let result = if let Some(sender) = self.pinned_sender.as_mut() {
// Preserve the timestamp for pinned connections.
dropped_connection.ready_and_available_time = self.ready_and_available_time;
match sender.try_send(dropped_connection) {
Ok(()) => Ok(()),
// The connection has been unpinned and should be checked back in.
Err(mpsc::error::TrySendError::Closed(mut conn)) => {
conn.pinned_sender = None;
conn.ready_and_available_time = None;
pool_manager.check_in(conn)
}
// The connection is being returned to the pin holder while another connection
// is in the pin buffer; this should never happen. Only possible action is to
// check the connection back in.
Err(mpsc::error::TrySendError::Full(mut conn)) => {
// Panic in debug mode
if cfg!(debug_assertions) {
panic!("buffer full when attempting to return a pinned connection")
}
// TODO RUST-230 log an error in non-debug mode.
conn.pinned_sender = None;
conn.ready_and_available_time = None;
pool_manager.check_in(conn)
}
}
} else {
pool_manager.check_in(dropped_connection)
};
if let Err(mut conn) = result {
// the check in failed because the pool has been dropped, so we emit the event
// here and drop the connection.
conn.close(ConnectionClosedReason::PoolClosed);
Expand All @@ -340,6 +396,42 @@ impl Drop for Connection {
}
}

/// A handle to a pinned connection - the connection itself can be retrieved or returned to the
/// normal pool via this handle.
#[derive(Debug)]
pub(crate) struct PinnedConnectionHandle {
receiver: Arc<Mutex<mpsc::Receiver<Connection>>>,
}

impl PinnedConnectionHandle {
/// Make a new `PinnedConnectionHandle` that refers to the same connection as this one.
/// Use with care and only when "lending" a handle in a way that can't be expressed as a
/// normal borrow.
pub(crate) fn replicate(&self) -> Self {
Self {
receiver: self.receiver.clone(),
}
}

/// Retrieve the pinned connection, blocking until it's available for use. Will fail if the
/// connection has been unpinned.
pub(crate) async fn take_connection(&self) -> Result<Connection> {
let mut receiver = self.receiver.lock().await;
receiver
.recv()
.await
.ok_or_else(|| Error::internal("cannot take connection after unpin"))
}

/// Return the pinned connection to the normal connection pool.
pub(crate) async fn unpin_connection(&self) {
let mut receiver = self.receiver.lock().await;
receiver.close();
// Ensure any connections buffered in the channel are dropped, returning them to the pool.
while receiver.recv().await.is_some() {}
}
}

#[derive(Debug, Clone)]
pub(crate) enum ConnectionGeneration {
Normal(u32),
Expand Down
Loading