Skip to content

RUST-1585 Do not perform server selection to determine sessions support #854

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/client/csfle/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
error::{Error, Result},
operation::{RawOutput, RunCommand},
options::ReadConcern,
runtime::{AsyncStream, Process, TlsConfig},
runtime::{process::Process, AsyncStream, TlsConfig},
Client,
Namespace,
};
Expand Down
52 changes: 11 additions & 41 deletions src/client/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,7 @@ use crate::{
Retryability,
},
options::{ChangeStreamOptions, SelectionCriteria},
sdam::{
HandshakePhase,
SelectedServer,
ServerType,
SessionSupportStatus,
TopologyType,
TransactionSupportStatus,
},
sdam::{HandshakePhase, SelectedServer, ServerType, TopologyType, TransactionSupportStatus},
selection_criteria::ReadPreference,
ClusterTime,
};
Expand Down Expand Up @@ -349,8 +342,16 @@ impl Client {
}
};

if session.is_none() {
implicit_session = self.start_implicit_session(&op).await?;
if !conn.supports_sessions() && session.is_some() {
return Err(ErrorKind::SessionsNotSupported.into());
}

if conn.supports_sessions()
&& session.is_none()
&& op.supports_sessions()
&& op.is_acknowledged()
{
implicit_session = Some(ClientSession::new(self.clone(), None, true).await);
session = implicit_session.as_mut();
}

Expand Down Expand Up @@ -790,19 +791,6 @@ impl Client {
})
}

/// Start an implicit session if the operation and write concern are compatible with sessions.
async fn start_implicit_session<T: Operation>(&self, op: &T) -> Result<Option<ClientSession>> {
match self.get_session_support_status().await? {
SessionSupportStatus::Supported {
logical_session_timeout,
} if op.supports_sessions() && op.is_acknowledged() => Ok(Some(
self.start_session_with_timeout(logical_session_timeout, None, true)
.await,
)),
_ => Ok(None),
}
}

async fn select_data_bearing_server(&self, operation_name: &str) -> Result<()> {
let topology_type = self.inner.topology.topology_type();
let criteria = SelectionCriteria::Predicate(Arc::new(move |server_info| {
Expand All @@ -814,24 +802,6 @@ impl Client {
Ok(())
}

/// Gets whether the topology supports sessions, and if so, returns the topology's logical
/// session timeout. If it has yet to be determined if the topology supports sessions, this
/// method will perform a server selection that will force that determination to be made.
pub(crate) async fn get_session_support_status(&self) -> Result<SessionSupportStatus> {
let initial_status = self.inner.topology.session_support_status();

// Need to guarantee that we're connected to at least one server that can determine if
// sessions are supported or not.
match initial_status {
SessionSupportStatus::Undetermined => {
self.select_data_bearing_server(crate::client::SESSIONS_SUPPORT_OP_NAME)
.await?;
Ok(self.inner.topology.session_support_status())
}
_ => Ok(initial_status),
}
}

/// Gets whether the topology supports transactions. If it has yet to be determined if the
/// topology supports transactions, this method will perform a server selection that will force
/// that determination to be made.
Expand Down
54 changes: 7 additions & 47 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use crate::{
SessionOptions,
},
results::DatabaseSpecification,
sdam::{server_selection, SelectedServer, SessionSupportStatus, Topology},
sdam::{server_selection, SelectedServer, Topology},
ClientSession,
};

Expand All @@ -56,8 +56,6 @@ pub(crate) use session::{ClusterTime, SESSIONS_UNSUPPORTED_COMMANDS};
use session::{ServerSession, ServerSessionPool};

const DEFAULT_SERVER_SELECTION_TIMEOUT: Duration = Duration::from_secs(30);
// TODO: RUST-1585 Remove this constant.
pub(crate) const SESSIONS_SUPPORT_OP_NAME: &str = "Check sessions support status";

/// This is the main entry point for the API. A `Client` is used to connect to a MongoDB cluster.
/// By default, it will monitor the topology of the cluster, keeping track of any changes, such
Expand Down Expand Up @@ -367,14 +365,7 @@ impl Client {
if let Some(ref options) = options {
options.validate()?;
}
match self.get_session_support_status().await? {
SessionSupportStatus::Supported {
logical_session_timeout,
} => Ok(self
.start_session_with_timeout(logical_session_timeout, options, false)
.await),
_ => Err(ErrorKind::SessionsNotSupported.into()),
}
Ok(ClientSession::new(self.clone(), options, false).await)
}

/// Starts a new [`ChangeStream`] that receives events for all changes in the cluster. The
Expand Down Expand Up @@ -428,42 +419,11 @@ impl Client {
.await
}

/// Check in a server session to the server session pool.
/// If the session is expired or dirty, or the topology no longer supports sessions, the session
/// will be discarded.
/// Check in a server session to the server session pool. The session will be discarded if it is
/// expired or dirty.
pub(crate) async fn check_in_server_session(&self, session: ServerSession) {
let session_support_status = self.inner.topology.session_support_status();
if let SessionSupportStatus::Supported {
logical_session_timeout,
} = session_support_status
{
self.inner
.session_pool
.check_in(session, logical_session_timeout)
.await;
}
}

/// Starts a `ClientSession`.
///
/// This method will attempt to re-use server sessions from the pool which are not about to
/// expire according to the provided logical session timeout. If no such sessions are
/// available, a new one will be created.
pub(crate) async fn start_session_with_timeout(
&self,
logical_session_timeout: Option<Duration>,
options: Option<SessionOptions>,
is_implicit: bool,
) -> ClientSession {
ClientSession::new(
self.inner
.session_pool
.check_out(logical_session_timeout)
.await,
self.clone(),
options,
is_implicit,
)
let timeout = self.inner.topology.logical_session_timeout();
self.inner.session_pool.check_in(session, timeout).await;
}

#[cfg(test)]
Expand Down Expand Up @@ -601,7 +561,7 @@ impl Client {
}

#[cfg(test)]
pub(crate) fn topology(&self) -> &crate::sdam::Topology {
pub(crate) fn topology(&self) -> &Topology {
&self.inner.topology
}

Expand Down
8 changes: 5 additions & 3 deletions src/client/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,13 +202,15 @@ pub(crate) enum TransactionPin {
}

impl ClientSession {
/// Creates a new `ClientSession` wrapping the provided server session.
pub(crate) fn new(
server_session: ServerSession,
/// Creates a new `ClientSession` by checking out a corresponding `ServerSession` from the
/// provided client's session pool.
pub(crate) async fn new(
client: Client,
options: Option<SessionOptions>,
is_implicit: bool,
) -> Self {
let timeout = client.inner.topology.logical_session_timeout();
let server_session = client.inner.session_pool.check_out(timeout).await;
Self {
client,
server_session,
Expand Down
2 changes: 2 additions & 0 deletions src/client/session/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,8 @@ async fn pool_is_lifo() {
let _guard: RwLockReadGuard<()> = LOCK.run_concurrently().await;

let client = TestClient::new().await;
// Wait for the implicit sessions created in TestClient::new to be returned to the pool.
runtime::delay_for(Duration::from_millis(500)).await;

if client.is_standalone() {
return;
Expand Down
8 changes: 8 additions & 0 deletions src/cmap/conn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,14 @@ impl Connection {
pub(crate) fn is_streaming(&self) -> bool {
self.more_to_come
}

/// Whether the connection supports sessions.
pub(crate) fn supports_sessions(&self) -> bool {
self.stream_description
.as_ref()
.and_then(|sd| sd.logical_session_timeout)
.is_some()
}
}

impl Drop for Connection {
Expand Down
9 changes: 5 additions & 4 deletions src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ mod http;
#[cfg(feature = "async-std-runtime")]
mod interval;
mod join_handle;
#[cfg(feature = "in-use-encryption-unstable")]
mod process;
#[cfg(any(
feature = "in-use-encryption-unstable",
all(test, not(feature = "sync"), not(feature = "tokio-sync"))
))]
pub(crate) mod process;
mod resolver;
pub(crate) mod stream;
mod sync_read_ext;
Expand All @@ -16,8 +19,6 @@ mod worker_handle;

use std::{future::Future, net::SocketAddr, time::Duration};

#[cfg(feature = "in-use-encryption-unstable")]
pub(crate) use self::process::Process;
pub(crate) use self::{
acknowledged_message::AcknowledgedMessage,
join_handle::AsyncJoinHandle,
Expand Down
Loading