Skip to content

RUST-951 Enable sessions on load balancer connections #421

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Aug 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ impl Client {
/// available, a new one will be created.
pub(crate) async fn start_session_with_timeout(
&self,
logical_session_timeout: Duration,
logical_session_timeout: Option<Duration>,
options: Option<SessionOptions>,
is_implicit: bool,
) -> ClientSession {
Expand Down
8 changes: 6 additions & 2 deletions src/client/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,8 +600,12 @@ impl ServerSession {
}

/// Determines if this server session is about to expire in a short amount of time (1 minute).
fn is_about_to_expire(&self, logical_session_timeout: Duration) -> bool {
let expiration_date = self.last_use + logical_session_timeout;
fn is_about_to_expire(&self, logical_session_timeout: Option<Duration>) -> bool {
let timeout = match logical_session_timeout {
Some(t) => t,
None => return false,
};
let expiration_date = self.last_use + timeout;
expiration_date < Instant::now() + Duration::from_secs(60)
}
}
11 changes: 9 additions & 2 deletions src/client/session/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ impl ServerSessionPool {
/// Checks out a server session from the pool. Before doing so, it first clears out all the
/// expired sessions. If there are no sessions left in the pool after clearing expired ones
/// out, a new session will be created.
pub(crate) async fn check_out(&self, logical_session_timeout: Duration) -> ServerSession {
pub(crate) async fn check_out(
&self,
logical_session_timeout: Option<Duration>,
) -> ServerSession {
let mut pool = self.pool.lock().await;
while let Some(session) = pool.pop_front() {
// If a session is about to expire within the next minute, remove it from pool.
Expand All @@ -37,7 +40,11 @@ impl ServerSessionPool {
/// discarded.
///
/// This method will also clear out any expired session from the pool before checking in.
pub(crate) async fn check_in(&self, session: ServerSession, logical_session_timeout: Duration) {
pub(crate) async fn check_in(
&self,
session: ServerSession,
logical_session_timeout: Option<Duration>,
) {
let mut pool = self.pool.lock().await;
while let Some(pooled_session) = pool.pop_back() {
if session.is_about_to_expire(logical_session_timeout) {
Expand Down
1 change: 1 addition & 0 deletions src/sdam/description/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ impl ServerType {
| ServerType::RsPrimary
| ServerType::RsSecondary
| ServerType::Mongos
| ServerType::LoadBalancer
)
}
}
Expand Down
22 changes: 16 additions & 6 deletions src/sdam/description/topology/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,22 @@ impl TopologyDescription {
})
.collect();

let session_support_status = if topology_type == TopologyType::LoadBalanced {
SessionSupportStatus::Supported {
logical_session_timeout: None,
}
} else {
SessionSupportStatus::Undetermined
};

Ok(Self {
single_seed: servers.len() == 1,
topology_type,
set_name: options.repl_set_name,
max_set_version: None,
max_election_id: None,
compatibility_error: None,
session_support_status: SessionSupportStatus::Undetermined,
session_support_status,
transaction_support_status: TransactionSupportStatus::Undetermined,
cluster_time: None,
local_threshold: options.local_threshold,
Expand Down Expand Up @@ -279,12 +287,12 @@ impl TopologyDescription {
logical_session_timeout: topology_timeout,
} => {
self.session_support_status = SessionSupportStatus::Supported {
logical_session_timeout: std::cmp::min(timeout, topology_timeout),
logical_session_timeout: std::cmp::min(Some(timeout), topology_timeout),
};
}
SessionSupportStatus::Undetermined => {
self.session_support_status = SessionSupportStatus::Supported {
logical_session_timeout: timeout,
logical_session_timeout: Some(timeout),
}
}
SessionSupportStatus::Unsupported { .. } => {
Expand All @@ -301,7 +309,7 @@ impl TopologyDescription {
match min_timeout {
Some(timeout) => {
self.session_support_status = SessionSupportStatus::Supported {
logical_session_timeout: timeout,
logical_session_timeout: Some(timeout),
}
}
None => {
Expand Down Expand Up @@ -721,7 +729,9 @@ pub(crate) enum SessionSupportStatus {

/// Sessions are supported by this topology. This is the minimum timeout of all data-bearing
/// servers in the deployment.
Supported { logical_session_timeout: Duration },
Supported {
logical_session_timeout: Option<Duration>,
},
}

impl Default for SessionSupportStatus {
Expand All @@ -740,7 +750,7 @@ impl SessionSupportStatus {
} => *logical_session_timeout,
Self::Supported {
logical_session_timeout,
} => Some(*logical_session_timeout),
} => *logical_session_timeout,
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/test/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl TestClient {
// To avoid populating the session pool with leftover implicit sessions, we check out a
// session here and immediately mark it as dirty, then use it with any operations we need.
let mut session = client
.start_session_with_timeout(Duration::from_secs(60 * 60), None, true)
.start_session_with_timeout(Some(Duration::from_secs(60 * 60)), None, true)
.await;
session.mark_dirty();

Expand Down