
RUST-1712 Provide a connection pool warmup method #932


Merged: 5 commits, Aug 17, 2023
18 changes: 18 additions & 0 deletions src/client.rs
@@ -612,6 +612,24 @@ impl Client {
self.inner.shutdown.executed.store(true, Ordering::SeqCst);
}

/// Add connections to the connection pool up to `min_pool_size`. This is normally not needed -
/// the connection pool will be filled in the background, and new connections created as needed
/// up to `max_pool_size`. However, it can sometimes be preferable to pay the (larger) cost of
/// creating new connections up-front so that individual operations execute as quickly as
/// possible.
///
/// Note that topology changes require rebuilding the connection pool, so this method cannot
/// guarantee that the pool will always be filled for the lifetime of the `Client`.
///
/// Does nothing if `min_pool_size` is unset or zero.
pub async fn warm_connection_pool(&self) {
Contributor (Author):

I'm open to naming suggestions here. "warm" seems a little vague but I couldn't think of a better one that wasn't super verbose.

Contributor:

Yeah, this seems fine to me. Can you note in the comment for this method that nothing will happen if `min_pool_size` is 0?

if !self.inner.options.min_pool_size.map_or(false, |s| s > 0) {
// No-op when `min_pool_size` is unset or zero.
return;
}
self.inner.topology.warm_pool().await;
}
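For context, a minimal usage sketch (hypothetical application code; it assumes a reachable deployment at `uri` and mirrors the test added at the bottom of this PR):

```rust
use mongodb::{error::Result, options::ClientOptions, Client};

async fn connect_warm(uri: &str) -> Result<Client> {
    let mut options = ClientOptions::parse(uri).await?;
    // Without a nonzero `min_pool_size`, `warm_connection_pool` is a no-op.
    options.min_pool_size = Some(10);
    let client = Client::with_options(options)?;
    // Pay the connection-establishment cost up front rather than on the first operations.
    client.warm_connection_pool().await;
    Ok(client)
}
```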

/// Check in a server session to the server session pool. The session will be discarded if it is
/// expired or dirty.
pub(crate) async fn check_in_server_session(&self, session: ServerSession) {
3 changes: 3 additions & 0 deletions src/cmap.rs
@@ -134,6 +134,9 @@ impl ConnectionPool {
ConnectionRequestResult::PoolCleared(e) => {
Err(Error::pool_cleared_error(&self.address, &e))
}
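// A regular check-out never sets `warm_pool`, so receiving `PoolWarmed` here indicates a bug.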
ConnectionRequestResult::PoolWarmed => {
Err(Error::internal("Invalid result from connection requester"))
}
};

match conn {
54 changes: 47 additions & 7 deletions src/cmap/connection_requester.rs
@@ -23,7 +23,7 @@ pub(super) fn channel(handle: WorkerHandle) -> (ConnectionRequester, ConnectionR
/// the pool will stop servicing requests, drop its available connections, and close.
#[derive(Clone, Debug)]
pub(super) struct ConnectionRequester {
sender: mpsc::UnboundedSender<oneshot::Sender<ConnectionRequestResult>>,
sender: mpsc::UnboundedSender<ConnectionRequest>,
_handle: WorkerHandle,
}

@@ -34,33 +34,66 @@ impl ConnectionRequester {

// this only errors if the receiver end is dropped, which can't happen because
// we own a handle to the worker, keeping it alive.
self.sender.send(sender).unwrap();
self.sender
.send(ConnectionRequest {
sender,
warm_pool: false,
})
.unwrap();

// similarly, the receiver only returns an error if the sender is dropped, which
// can't happen due to the handle.
receiver.await.unwrap()
}

pub(super) fn weak(&self) -> WeakConnectionRequester {
WeakConnectionRequester {
sender: self.sender.clone(),
}
}
}

/// Handle for requesting Connections from the pool. This does *not* keep the
/// pool alive.
#[derive(Clone, Debug)]
pub(super) struct WeakConnectionRequester {
sender: mpsc::UnboundedSender<ConnectionRequest>,
}

impl WeakConnectionRequester {
pub(super) async fn request_warm_pool(&self) -> Option<ConnectionRequestResult> {
let (sender, receiver) = oneshot::channel();
if self
.sender
.send(ConnectionRequest {
sender,
warm_pool: true,
})
.is_err()
{
return None;
}
receiver.await.ok()
}
}
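To illustrate the strong-vs-weak split in isolation: a strong requester bundles a guard that keeps the worker alive, while a weak one carries only the request channel. A self-contained sketch (the `StrongRequester`/`WeakRequester` types below are hypothetical, not the driver's internals):

```rust
use tokio::sync::{mpsc, oneshot};

// Strong handle: cloning it clones the guard, and the worker shuts down only
// once every strong handle (and thus every guard sender) has been dropped.
#[derive(Clone)]
struct StrongRequester {
    requests: mpsc::UnboundedSender<oneshot::Sender<u32>>,
    _guard: mpsc::Sender<()>,
}

// Weak handle: same request channel, no guard, so it cannot keep the worker alive.
#[derive(Clone)]
struct WeakRequester {
    requests: mpsc::UnboundedSender<oneshot::Sender<u32>>,
}

async fn worker(
    mut requests: mpsc::UnboundedReceiver<oneshot::Sender<u32>>,
    mut guards: mpsc::Receiver<()>,
) {
    loop {
        tokio::select! {
            // Guards never send; `recv` resolves (with `None`) only when the
            // last strong handle is dropped, which is the shutdown signal.
            _ = guards.recv() => break,
            Some(reply) = requests.recv() => {
                let _ = reply.send(1);
            }
            else => break,
        }
    }
}
```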

/// Receiving end of a given ConnectionRequester.
#[derive(Debug)]
pub(super) struct ConnectionRequestReceiver {
receiver: mpsc::UnboundedReceiver<oneshot::Sender<ConnectionRequestResult>>,
receiver: mpsc::UnboundedReceiver<ConnectionRequest>,
}

impl ConnectionRequestReceiver {
pub(super) async fn recv(&mut self) -> Option<ConnectionRequest> {
self.receiver
.recv()
.await
.map(|sender| ConnectionRequest { sender })
self.receiver.recv().await
}
}

/// Struct encapsulating a request for a connection.
#[derive(Debug)]
pub(super) struct ConnectionRequest {
sender: oneshot::Sender<ConnectionRequestResult>,
warm_pool: bool,
}

impl ConnectionRequest {
Expand All @@ -72,6 +105,10 @@ impl ConnectionRequest {
) -> std::result::Result<(), ConnectionRequestResult> {
self.sender.send(result)
}

pub(super) fn is_warm_pool(&self) -> bool {
self.warm_pool
}
}

#[derive(Debug)]
Expand All @@ -86,6 +123,9 @@ pub(super) enum ConnectionRequestResult {
/// The request was rejected because the pool was cleared before it could
/// be fulfilled. The error that caused the pool to be cleared is returned.
PoolCleared(Error),

/// The request set `warm_pool: true` and the pool has reached `min_pool_size`.
PoolWarmed,
}

impl ConnectionRequestResult {
85 changes: 65 additions & 20 deletions src/cmap/worker.rs
@@ -8,6 +8,7 @@ use super::{
ConnectionRequestReceiver,
ConnectionRequestResult,
ConnectionRequester,
WeakConnectionRequester,
},
establish::ConnectionEstablisher,
manager,
@@ -110,6 +111,10 @@ pub(crate) struct ConnectionPoolWorker {
/// sender ends of this receiver drop, this worker will be notified and drop too.
handle_listener: WorkerHandleListener,

/// Sender for connection check out requests. Does not keep the worker alive the way
/// a `ConnectionRequester` would since it doesn't hold a `WorkerHandle`.
weak_requester: WeakConnectionRequester,

/// Receiver for incoming connection check out requests.
request_receiver: ConnectionRequestReceiver,

@@ -218,6 +223,7 @@ impl ConnectionPoolWorker {
service_connection_count: HashMap::new(),
available_connections: VecDeque::new(),
max_pool_size,
weak_requester: connection_requester.weak(),
request_receiver,
wait_queue: Default::default(),
management_receiver,
@@ -312,6 +318,12 @@ impl ConnectionPoolWorker {
shutdown_ack = Some(ack);
break;
}
BroadcastMessage::FillPool => {
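// Run `fill_pool` on the executor rather than awaiting it inline: it sends check-out
// requests back to this worker's own queue, so blocking here would deadlock.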
crate::runtime::execute(fill_pool(
self.weak_requester.clone(),
ack,
));
}
#[cfg(test)]
BroadcastMessage::SyncWorkers => {
ack.acknowledge(());
@@ -363,30 +375,39 @@ impl ConnectionPoolWorker {
}

fn check_out(&mut self, request: ConnectionRequest) {
// first attempt to check out an available connection
while let Some(mut conn) = self.available_connections.pop_back() {
// Close the connection if it's stale.
if conn.generation.is_stale(&self.generation) {
self.close_connection(conn, ConnectionClosedReason::Stale);
continue;
if request.is_warm_pool() {
if self.total_connection_count >= self.min_pool_size.unwrap_or(0) {
let _ = request.fulfill(ConnectionRequestResult::PoolWarmed);
return;
}
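// Still below `min_pool_size`: fall through to the connection-creation path below.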
} else {
// first attempt to check out an available connection
while let Some(mut conn) = self.available_connections.pop_back() {
// Close the connection if it's stale.
if conn.generation.is_stale(&self.generation) {
self.close_connection(conn, ConnectionClosedReason::Stale);
continue;
}

// Close the connection if it's idle.
if conn.is_idle(self.max_idle_time) {
self.close_connection(conn, ConnectionClosedReason::Idle);
continue;
}
// Close the connection if it's idle.
if conn.is_idle(self.max_idle_time) {
self.close_connection(conn, ConnectionClosedReason::Idle);
continue;
}

conn.mark_as_in_use(self.manager.clone());
if let Err(request) = request.fulfill(ConnectionRequestResult::Pooled(Box::new(conn))) {
// checking out thread stopped listening, indicating it hit the WaitQueue
// timeout, so we put connection back into pool.
let mut connection = request.unwrap_pooled_connection();
connection.mark_as_available();
self.available_connections.push_back(connection);
}
conn.mark_as_in_use(self.manager.clone());
if let Err(request) =
request.fulfill(ConnectionRequestResult::Pooled(Box::new(conn)))
{
// checking out thread stopped listening, indicating it hit the WaitQueue
// timeout, so we put connection back into pool.
let mut connection = request.unwrap_pooled_connection();
connection.mark_as_available();
self.available_connections.push_back(connection);
}

return;
return;
}
}

// otherwise, attempt to create a connection.
@@ -669,6 +690,30 @@ async fn establish_connection(
establish_result.map_err(|e| e.cause)
}

async fn fill_pool(
requester: WeakConnectionRequester,
ack: crate::runtime::AcknowledgmentSender<()>,
) {
let mut establishing = vec![];
loop {
let result = requester.request_warm_pool().await;
match result {
None => break,
Some(ConnectionRequestResult::Establishing(handle)) => {
// Let connections finish establishing in parallel.
establishing.push(crate::runtime::spawn(async move {
let _ = handle.await;
// The connection is dropped here, returning it to the pool.
}));
}
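// `PoolWarmed` means the pool has reached `min_pool_size`; that or any other result ends the loop.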
_ => break,
};
}
// Wait for all connections to finish establishing before reporting completion.
futures_util::future::join_all(establishing).await;
ack.acknowledge(());
}

/// Enum modeling the possible pool states as described in the CMAP spec.
///
/// The "closed" state is omitted here because the pool considered closed only
2 changes: 1 addition & 1 deletion src/runtime.rs
@@ -21,7 +21,7 @@ mod worker_handle;
use std::{future::Future, net::SocketAddr, time::Duration};

pub(crate) use self::{
acknowledged_message::{AcknowledgedMessage, AcknowledgmentReceiver},
acknowledged_message::{AcknowledgedMessage, AcknowledgmentReceiver, AcknowledgmentSender},
join_handle::AsyncJoinHandle,
resolver::AsyncResolver,
stream::AsyncStream,
10 changes: 10 additions & 0 deletions src/sdam/topology.rs
@@ -214,6 +214,10 @@ impl Topology {
self.updater.shutdown().await;
}

pub(crate) async fn warm_pool(&self) {
self.updater.fill_pool().await;
}

/// Gets the addresses of the servers in the cluster.
#[cfg(test)]
pub(crate) fn server_addresses(&mut self) -> HashSet<ServerAddress> {
@@ -278,6 +282,7 @@ pub(crate) enum UpdateMessage {
#[derive(Debug, Clone)]
pub(crate) enum BroadcastMessage {
Shutdown,
FillPool,
#[cfg(test)]
SyncWorkers,
}
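For readers unfamiliar with the acknowledged-broadcast pattern these messages use: the broadcaster hands every worker a copy of the message plus a one-shot acknowledgment channel, and the `fill_pool` task above only acknowledges after all connections finish establishing. A minimal sketch of that pattern under stated assumptions (the `AckMessage` and `broadcast` names are hypothetical, not the driver's `AcknowledgedMessage` API):

```rust
use tokio::sync::{mpsc, oneshot};

// Hypothetical acknowledged broadcast: every worker receives the message plus
// a oneshot sender and signals completion by sending on it.
struct AckMessage<M> {
    message: M,
    ack: oneshot::Sender<()>,
}

async fn broadcast<M: Clone>(message: M, workers: &[mpsc::UnboundedSender<AckMessage<M>>]) {
    let mut pending = Vec::new();
    for worker in workers {
        let (ack, done) = oneshot::channel();
        // A send error just means that worker already shut down; skip it.
        if worker
            .send(AckMessage {
                message: message.clone(),
                ack,
            })
            .is_ok()
        {
            pending.push(done);
        }
    }
    // Resolves once every live worker has acknowledged (or dropped its sender).
    let _ = futures_util::future::join_all(pending).await;
}
```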
@@ -877,6 +882,11 @@ impl TopologyUpdater {
.await;
}

pub(crate) async fn fill_pool(&self) {
self.send_message(UpdateMessage::Broadcast(BroadcastMessage::FillPool))
.await;
}

#[cfg(test)]
pub(crate) async fn sync_workers(&self) {
self.send_message(UpdateMessage::Broadcast(BroadcastMessage::SyncWorkers))
19 changes: 19 additions & 0 deletions src/test/client.rs
@@ -968,3 +968,22 @@ async fn manual_shutdown_immediate_with_resources() {
.is_empty());
assert!(events.get_command_started_events(&["delete"]).is_empty());
}

// Verifies that `Client::warm_connection_pool` succeeds.
#[cfg_attr(feature = "tokio-runtime", tokio::test)]
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
async fn warm_connection_pool() {
let _guard = LOCK.run_exclusively().await;
let client = Client::test_builder()
.options({
let mut opts = CLIENT_OPTIONS.get().await.clone();
opts.min_pool_size = Some(10);
opts
})
.build()
.await;

client.warm_connection_pool().await;
// Validate that a command executes.
client.list_database_names(None, None).await.unwrap();
}