Skip to content

RUST-981 Merge new load balancer spec tests #480

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 24 commits into from
Oct 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 45 additions & 35 deletions src/client/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,13 @@ use std::{collections::HashSet, sync::Arc, time::Instant};
use super::{session::TransactionState, Client, ClientSession};
use crate::{
bson::Document,
cmap::{conn::PinnedConnectionHandle, Connection, RawCommand, RawCommandResponse},
cmap::{
conn::PinnedConnectionHandle,
Connection,
ConnectionPool,
RawCommand,
RawCommandResponse,
},
cursor::{session::SessionCursor, Cursor, CursorSpecification},
error::{
Error,
Expand Down Expand Up @@ -158,11 +164,13 @@ impl Client {
Op: Operation<O = CursorSpecification<T>>,
T: DeserializeOwned + Unpin + Send + Sync,
{
let mut details = self.execute_operation_with_details(op, session).await?;
let pinned = if details.output.connection.is_pinned() {
// Cursor operations on load-balanced transactions will be pinned via the transaction
// pin.
None
let mut details = self
.execute_operation_with_details(op, &mut *session)
.await?;

let pinned = if let Some(handle) = session.transaction.pinned_connection() {
// Cursor operations on a transaction share the same pinned connection.
Some(handle.replicate())
} else {
self.pin_connection_for_cursor(&mut details.output)?
};
Expand Down Expand Up @@ -225,20 +233,17 @@ impl Client {
}
};

let mut conn = match op.pinned_connection() {
Some(l) => l.take_connection().await?,
None => match server.pool.check_out().await {
Ok(c) => c,
Err(mut err) => {
err.add_labels_and_update_pin(None, &mut session, None)?;
let mut conn = match get_connection(&session, &op, &server.pool).await {
Ok(c) => c,
Err(mut err) => {
err.add_labels_and_update_pin(None, &mut session, None)?;

if err.is_pool_cleared() {
return self.execute_retry(&mut op, &mut session, None, err).await;
} else {
return Err(err);
}
if err.is_pool_cleared() {
return self.execute_retry(&mut op, &mut session, None, err).await;
} else {
return Err(err);
}
},
}
};

let retryability = self.get_retryability(&conn, &op, &session).await?;
Expand Down Expand Up @@ -326,23 +331,9 @@ impl Client {
}
};

let session_pinned = session
.as_ref()
.and_then(|s| s.transaction.pinned_connection());
let mut conn = match (session_pinned, op.pinned_connection()) {
(Some(c), None) | (None, Some(c)) => c.take_connection().await?,
(Some(c), Some(_)) => {
// An operation executing in a transaction should never have a pinned connection,
// but in case it does, prefer the transaction's pin.
if cfg!(debug_assertions) {
panic!("pinned operation executing in pinned transaction");
}
c.take_connection().await?
}
(None, None) => match server.pool.check_out().await {
Ok(c) => c,
Err(_) => return Err(first_error),
},
let mut conn = match get_connection(session, op, &server.pool).await {
Ok(c) => c,
Err(_) => return Err(first_error),
};

let retryability = self.get_retryability(&conn, op, session).await?;
Expand Down Expand Up @@ -776,6 +767,25 @@ impl Client {
}
}

async fn get_connection<T: Operation>(
session: &Option<&mut ClientSession>,
op: &T,
pool: &ConnectionPool,
) -> Result<Connection> {
let session_pinned = session
.as_ref()
.and_then(|s| s.transaction.pinned_connection());
match (session_pinned, op.pinned_connection()) {
(Some(c), None) | (None, Some(c)) => c.take_connection().await,
(Some(session_handle), Some(op_handle)) => {
// An operation executing in a transaction should be sharing the same pinned connection.
debug_assert_eq!(session_handle.id(), op_handle.id());
session_handle.take_connection().await
}
(None, None) => pool.check_out().await,
}
}

impl Error {
/// Adds the necessary labels to this Error, and unpins the session if needed.
///
Expand Down
13 changes: 2 additions & 11 deletions src/cmap/conn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,11 +345,6 @@ impl Connection {
})
}

/// Whether this connection has a live `PinnedConnectionHandle`.
pub(crate) fn is_pinned(&self) -> bool {
self.pinned_sender.is_some()
}

/// Close this connection, emitting a `ConnectionClosedEvent` with the supplied reason.
pub(super) fn close_and_drop(mut self, reason: ConnectionClosedReason) {
self.close(reason);
Expand Down Expand Up @@ -470,12 +465,8 @@ impl PinnedConnectionHandle {
})
}

/// Return the pinned connection to the normal connection pool.
pub(crate) async fn unpin_connection(&self) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was previously called by cursors on completion, but with the connection potentially shared by a cursor and transaction the unpin now happens (implicitly) when the final PinnedConnectionHandle for a given Connection is dropped via the final Arc causing the channel endpoint to drop.

let mut receiver = self.receiver.lock().await;
receiver.close();
// Ensure any connections buffered in the channel are dropped, returning them to the pool.
while receiver.recv().await.is_some() {}
pub(crate) fn id(&self) -> u32 {
self.id
}
}

Expand Down
11 changes: 9 additions & 2 deletions src/cmap/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ const TEST_DESCRIPTIONS_TO_SKIP: &[&str] = &[
// WaitQueueTimeoutMS is not supported
"must aggressively timeout threads enqueued longer than waitQueueTimeoutMS",
"waiting on maxConnecting is limited by WaitQueueTimeoutMS",
// TODO DRIVERS-1785 remove this skip when test event order is fixed
"error during minPoolSize population clears pool",
];

/// Many different types of CMAP events are emitted from tasks spawned in the drop
Expand Down Expand Up @@ -165,9 +167,12 @@ impl Executor {
let manager = pool.manager.clone();
RUNTIME.execute(async move {
while let Some(update) = update_receiver.recv().await {
match update.into_message() {
ServerUpdate::Error { error, .. } => manager.clear(error.cause, None).await,
match update.message() {
ServerUpdate::Error { error, .. } => {
manager.clear(error.cause.clone(), None).await
}
}
drop(update);
}
});

Expand Down Expand Up @@ -426,6 +431,7 @@ async fn cmap_spec_tests() {

let mut options = CLIENT_OPTIONS.clone();
if options.load_balanced.unwrap_or(false) {
println!("skipping due to load balanced topology");
return;
}
options.hosts.drain(1..);
Expand All @@ -434,6 +440,7 @@ async fn cmap_spec_tests() {
if let Some(ref run_on) = test_file.run_on {
let can_run_on = run_on.iter().any(|run_on| run_on.can_run_on(&client));
if !can_run_on {
println!("skipping due to runOn requirements");
return;
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/cmap/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,7 @@ async fn establish_connection(

match establish_result {
Err(ref e) => {
server_updater.handle_error(e.clone()).await;
if let Some(handler) = event_handler {
let event = ConnectionClosedEvent {
address,
Expand All @@ -654,7 +655,6 @@ async fn establish_connection(
};
handler.handle_connection_closed_event(event);
}
server_updater.handle_error(e.clone()).await;
manager.handle_connection_failed();
}
Ok(ref mut connection) => {
Expand Down
2 changes: 1 addition & 1 deletion src/coll/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -920,7 +920,7 @@ pub struct ListIndexesOptions {
pub max_time: Option<Duration>,

/// The number of indexes the server should return per cursor batch.
#[serde(default, serialize_with = "bson_util::serialize_u32_option_as_i32")]
#[serde(default, skip_serializing)]
pub batch_size: Option<u32>,
}

Expand Down
7 changes: 0 additions & 7 deletions src/cursor/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,12 +283,6 @@ impl PinnedConnection {
matches!(self, Self::Invalid(_))
}

async fn unpin(&self) {
if let Some(h) = self.handle() {
h.unpin_connection().await;
}
}

fn invalidate(&mut self) {
take_mut::take(self, |self_| {
if let Self::Valid(c) = self_ {
Expand Down Expand Up @@ -318,6 +312,5 @@ pub(super) fn kill_cursor(
let _ = tx.send(());
}
}
pinned_conn.unpin().await;
});
}
3 changes: 3 additions & 0 deletions src/operation/list_indexes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ impl Operation for ListIndexes {
let mut body = doc! {
"listIndexes": self.ns.coll.clone(),
};
if let Some(size) = self.options.as_ref().and_then(|o| o.batch_size) {
body.insert("cursor", doc! { "batchSize": size });
}
append_options(&mut body, self.options.as_ref())?;

Ok(Command::new(
Expand Down
4 changes: 3 additions & 1 deletion src/operation/list_indexes/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ async fn build() {
doc! {
"listIndexes": "test_coll",
"maxTimeMS": 42,
"batchSize": 4,
"cursor": doc! {
"batchSize": 4,
},
}
);
}
Expand Down
6 changes: 3 additions & 3 deletions src/runtime/acknowledged_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ impl<M, R> AcknowledgedMessage<M, R> {
)
}

/// Get the message.
pub(crate) fn into_message(self) -> M {
self.message
/// Borrow the message.
pub(crate) fn message(&self) -> &M {
&self.message
}

/// Send acknowledgement to the receiver.
Expand Down
8 changes: 2 additions & 6 deletions src/sdam/description/topology/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,12 +261,8 @@ impl TopologyDescription {
match read_preference {
ReadPreference::Secondary { .. }
| ReadPreference::PrimaryPreferred { .. }
| ReadPreference::Nearest { .. } => {
command.set_read_preference(read_preference.clone())
}
ReadPreference::SecondaryPreferred { ref options }
if options.max_staleness.is_some() || options.tag_sets.is_some() =>
{
| ReadPreference::Nearest { .. }
| ReadPreference::SecondaryPreferred { .. } => {
command.set_read_preference(read_preference.clone())
}
_ => {}
Expand Down
11 changes: 9 additions & 2 deletions src/sdam/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,13 +255,20 @@ impl UpdateMonitor {
_ => return,
};

match update.into_message() {
// This needs to borrow the message rather than taking it so the update isn't sent
// until after the topology has processed the error.
match update.message() {
ServerUpdate::Error { error } => {
topology
.handle_application_error(error.cause, error.handshake_phase, &server)
.handle_application_error(
error.cause.clone(),
error.handshake_phase.clone(),
&server,
)
.await;
}
}
drop(update);
}
}
}
2 changes: 1 addition & 1 deletion src/selection_criteria.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ impl<'de> Deserialize<'de> for ReadPreference {
"PrimaryPreferred" => Ok(ReadPreference::PrimaryPreferred {
options: preference.options,
}),
"SecondaryPreferred" => Ok(ReadPreference::SecondaryPreferred {
"SecondaryPreferred" | "secondaryPreferred" => Ok(ReadPreference::SecondaryPreferred {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One of the spec tests uses "secondaryPreferred" as a value; not sure whether this was meant to be case sensitive or not.

options: preference.options,
}),
"Nearest" => Ok(ReadPreference::Nearest {
Expand Down
68 changes: 68 additions & 0 deletions src/test/spec/json/load-balancers/README.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
===========================
Load Balancer Support Tests
===========================

.. contents::

----

Introduction
============

This document describes how drivers should create load balanced clusters for
testing and how tests should be executed for such clusters.

Testing Requirements
====================

For each server version that supports load balanced clusters, drivers MUST
add two Evergreen tasks: one with a sharded cluster with both authentication
and TLS enabled and one with a sharded cluster with authentication and TLS
disabled. In each task, the sharded cluster MUST be configured with two
mongos nodes running on localhost ports 27017 and 27018. The shard and config
servers may run on any free ports. Each task MUST also start up two TCP load
balancers operating in round-robin mode: one fronting both mongos servers and
one fronting a single mongos.

Load Balancer Configuration
---------------------------

Drivers MUST use the ``run-load-balancer.sh`` script in
``drivers-evergreen-tools`` to start the TCP load balancers for Evergreen
tasks. This script MUST be run after the backing sharded cluster has already
been started. The script writes the URIs of the load balancers to a YAML
expansions file, which can be read by drivers via the ``expansions.update``
Evergreen command. This will store the URIs into the ``SINGLE_MONGOS_LB_URI``
and ``MULTI_MONGOS_LB_URI`` environment variables.

Test Runner Configuration
-------------------------

If the backing sharded cluster is configured with TLS enabled, drivers MUST
add the relevant TLS options to both ``SINGLE_MONGOS_LB_URI`` and
``MULTI_MONGOS_LB_URI`` to ensure that test clients can connect to the
cluster. Drivers MUST use the final URI stored in ``SINGLE_MONGOS_LB_URI``
(with additional TLS options if required) to configure internal clients for
test runners (e.g. the internal MongoClient described by the `Unified Test
Format spec <../../unified-test-format/unified-test-format.rst>`__).

In addition to modifying load balancer URIs, drivers MUST also mock server
support for returning a ``serviceId`` field in ``hello`` or legacy ``hello``
command responses when running tests against a load-balanced cluster. This
can be done by using the value of ``topologyVersion.processId`` to set
``serviceId``. This MUST be done for all connections established by the test
runner, including those made by any internal clients.

Tests
======

The YAML and JSON files in this directory contain platform-independent tests
written in the `Unified Test Format
<../../unified-test-format/unified-test-format.rst>`_. Drivers MUST run the
following test suites against a load balanced cluster:

#. All test suites written in the Unified Test Format
#. Retryable Reads
#. Retryable Writes
#. Change Streams
#. Initial DNS Seedlist Discovery
Loading