Skip to content

RUST-978 Unified test format additions in support of load balanced tests #465

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 35 commits into from
Sep 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
2a743db
use enum for observed events
abr-egn Sep 13, 2021
bd5c283
allow more than just command events
abr-egn Sep 13, 2021
6e2e057
record and match against cmap events
abr-egn Sep 13, 2021
edb929e
expected command event changes
abr-egn Sep 13, 2021
7e8aa81
createFindCursor test operation
abr-egn Sep 15, 2021
df28cc3
deserialize_op helper
abr-egn Sep 15, 2021
eb297a1
iterateUntilDocumentOrError
abr-egn Sep 15, 2021
919ecba
close cursor test operation
abr-egn Sep 15, 2021
fef89f9
use alternate uris for load balanced setup
abr-egn Sep 15, 2021
9b32d9a
expected cmap events
abr-egn Sep 16, 2021
691dd4f
refactor to support ignoring test operation results
abr-egn Sep 16, 2021
a36a360
assertNumberConnectionsCheckedOut test operation
abr-egn Sep 16, 2021
6627311
unified test format invalid checker and first test in such
abr-egn Sep 16, 2021
75854e7
more invalid tests
abr-egn Sep 16, 2021
cf3f07f
invalid tests, continued
abr-egn Sep 16, 2021
3b49f57
invalid testing continues, now with a skip
abr-egn Sep 16, 2021
e336f64
one last invalid test
abr-egn Sep 16, 2021
c5f7399
a valid test, and fix for ExpectedCommandEvent
abr-egn Sep 16, 2021
3022636
another valid test, and actually match expected cmap events
abr-egn Sep 17, 2021
72e0280
remove debug prints
abr-egn Sep 17, 2021
c04c464
remove more debug
abr-egn Sep 17, 2021
04e2d3d
fix count name
abr-egn Sep 17, 2021
ca69b9a
skip some more invalid tests
abr-egn Sep 17, 2021
10a3ad9
wip
abr-egn Sep 17, 2021
f6da6dd
wait for kill on test operation cursor close
abr-egn Sep 20, 2021
99fecef
last two tests
abr-egn Sep 20, 2021
9c6fe57
remove debug prints
abr-egn Sep 20, 2021
2d7b6c8
rustfmt and clippy
abr-egn Sep 20, 2021
00b3cb8
more clippy
abr-egn Sep 20, 2021
e70ea7e
sync a few more tests
abr-egn Sep 20, 2021
919315c
comments
abr-egn Sep 20, 2021
6e58800
introduce syncpoint for test events
abr-egn Sep 20, 2021
09832c6
review
abr-egn Sep 20, 2021
66d1df8
embed => replicate
abr-egn Sep 21, 2021
a4f16ba
populate has_service_id
abr-egn Sep 23, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,4 +360,9 @@ impl Client {
.map(|stream_address| format!("{}", stream_address))
.collect()
}

/// Test-only hook: synchronize with the topology's background workers.
/// Resolves once each server's pool worker has acknowledged a sync request,
/// so events produced by already-queued work are observable to the test.
#[cfg(test)]
pub(crate) async fn sync_workers(&self) {
    self.inner.topology.sync_workers().await;
}
}
14 changes: 14 additions & 0 deletions src/cmap/manager.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use tokio::sync::mpsc;
#[cfg(test)]
use tokio::sync::oneshot;

use super::Connection;
use crate::{bson::oid::ObjectId, error::Error, runtime::AcknowledgedMessage};
Expand Down Expand Up @@ -72,6 +74,14 @@ impl PoolManager {
.sender
.send(PoolManagementRequest::HandleConnectionSucceeded(conn));
}

/// Create a synchronization point for the pool's worker: queues a `Sync`
/// request and hands back a receiver that is signalled when the worker
/// reaches it. Test-only.
#[cfg(test)]
pub(super) fn sync_worker(&self) -> oneshot::Receiver<()> {
    let (notify, listener) = oneshot::channel();
    let _ = self.sender.send(PoolManagementRequest::Sync(notify));
    listener
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -108,6 +118,10 @@ pub(super) enum PoolManagementRequest {
/// Update the pool after a successful connection, optionally populating the pool
/// with the successful connection.
HandleConnectionSucceeded(ConnectionSucceeded),

/// Synchronize the worker queue state with an external caller, i.e. a test.
#[cfg(test)]
Sync(oneshot::Sender<()>),
}

impl PoolManagementRequest {
Expand Down
7 changes: 7 additions & 0 deletions src/cmap/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ mod worker;
use std::sync::Arc;

use derivative::Derivative;
#[cfg(test)]
use tokio::sync::oneshot;

pub use self::conn::ConnectionInfo;
pub(crate) use self::{
Expand Down Expand Up @@ -172,4 +174,9 @@ impl ConnectionPool {
pub(crate) fn generation(&self) -> PoolGeneration {
self.generation_subscriber.generation()
}

/// Test-only: request a synchronization point from the pool's worker.
/// Awaiting the returned receiver blocks until the worker has reached the
/// request, i.e. has processed everything queued before it.
#[cfg(test)]
pub(crate) fn sync_worker(&self) -> oneshot::Receiver<()> {
    self.manager.sync_worker()
}
}
4 changes: 4 additions & 0 deletions src/cmap/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,10 @@ impl ConnectionPoolWorker {
PoolTask::HandleManagementRequest(
PoolManagementRequest::HandleConnectionFailed,
) => self.handle_connection_failed(),
#[cfg(test)]
PoolTask::HandleManagementRequest(PoolManagementRequest::Sync(tx)) => {
let _ = tx.send(());
}
PoolTask::Maintenance => {
self.perform_maintenance();
}
Expand Down
7 changes: 7 additions & 0 deletions src/cursor/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use std::{
use derivative::Derivative;
use futures_core::{Future, Stream};
use serde::de::DeserializeOwned;
#[cfg(test)]
use tokio::sync::oneshot;

use crate::{
bson::Document,
Expand Down Expand Up @@ -303,13 +305,18 @@ pub(super) fn kill_cursor(
ns: &Namespace,
cursor_id: i64,
pinned_conn: PinnedConnection,
#[cfg(test)] kill_watcher: Option<oneshot::Sender<()>>,
) {
let coll = client
.database(ns.db.as_str())
.collection::<Document>(ns.coll.as_str());
RUNTIME.execute(async move {
if !pinned_conn.is_invalid() {
let _ = coll.kill_cursor(cursor_id, pinned_conn.handle()).await;
#[cfg(test)]
if let Some(tx) = kill_watcher {
let _ = tx.send(());
}
}
pinned_conn.unpin().await;
});
Expand Down
21 changes: 21 additions & 0 deletions src/cursor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use std::{

use futures_core::{future::BoxFuture, Stream};
use serde::de::DeserializeOwned;
#[cfg(test)]
use tokio::sync::oneshot;

use crate::{
cmap::conn::PinnedConnectionHandle,
Expand Down Expand Up @@ -84,6 +86,8 @@ where
{
client: Client,
wrapped_cursor: ImplicitSessionCursor<T>,
#[cfg(test)]
kill_watcher: Option<oneshot::Sender<()>>,
_phantom: std::marker::PhantomData<T>,
}

Expand All @@ -107,9 +111,24 @@ where
PinnedConnection::new(pin),
provider,
),
#[cfg(test)]
kill_watcher: None,
_phantom: Default::default(),
}
}

/// Attach a test-only "kill watcher" to this cursor. The cursor's kill
/// command runs asynchronously on `drop`, so a test can otherwise finish
/// before the corresponding events are published; the provided one-shot
/// sender receives `()` after the kill has executed, giving the test
/// something to wait on.
///
/// Panics if a watcher has already been set for this cursor.
#[cfg(test)]
pub(crate) fn set_kill_watcher(&mut self, tx: oneshot::Sender<()>) {
    assert!(
        self.kill_watcher.is_none(),
        "cursor already has a kill_watcher"
    );
    self.kill_watcher = Some(tx);
}
}

impl<T> Stream for Cursor<T>
Expand Down Expand Up @@ -137,6 +156,8 @@ where
self.wrapped_cursor.namespace(),
self.wrapped_cursor.id(),
self.wrapped_cursor.pinned_connection().replicate(),
#[cfg(test)]
self.kill_watcher.take(),
);
}
}
Expand Down
21 changes: 21 additions & 0 deletions src/cursor/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use std::{
use futures_core::{future::BoxFuture, Stream};
use futures_util::StreamExt;
use serde::de::DeserializeOwned;
#[cfg(test)]
use tokio::sync::oneshot;

use super::common::{
kill_cursor,
Expand Down Expand Up @@ -64,6 +66,8 @@ where
info: CursorInformation,
buffer: VecDeque<T>,
pinned_connection: PinnedConnection,
#[cfg(test)]
kill_watcher: Option<oneshot::Sender<()>>,
}

impl<T> SessionCursor<T>
Expand All @@ -83,6 +87,8 @@ where
info: spec.info,
buffer: spec.initial_buffer,
pinned_connection: PinnedConnection::new(pinned),
#[cfg(test)]
kill_watcher: None,
}
}

Expand Down Expand Up @@ -177,6 +183,19 @@ where
pub async fn next(&mut self, session: &mut ClientSession) -> Option<Result<T>> {
self.stream(session).next().await
}

/// Attach a test-only "kill watcher" to this session cursor. Because the
/// cursor kill happens asynchronously on `drop`, a test runner could
/// conclude before the resulting events are published; the given one-shot
/// sender is pushed `()` once the kill has run, letting the test block
/// until then.
///
/// Panics if a watcher has already been set for this cursor.
#[cfg(test)]
pub(crate) fn set_kill_watcher(&mut self, tx: oneshot::Sender<()>) {
    assert!(
        self.kill_watcher.is_none(),
        "cursor already has a kill_watcher"
    );
    self.kill_watcher = Some(tx);
}
}

impl<T> Drop for SessionCursor<T>
Expand All @@ -193,6 +212,8 @@ where
&self.info.ns,
self.info.id,
self.pinned_connection.replicate(),
#[cfg(test)]
self.kill_watcher.take(),
);
}
}
Expand Down
8 changes: 3 additions & 5 deletions src/operation/count/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ impl Operation for Count {
const NAME: &'static str = "count";

fn build(&mut self, description: &StreamDescription) -> Result<Command> {
let mut name = Self::NAME.to_string();
let mut body = match description.max_wire_version {
Some(v) if v >= SERVER_4_9_0_WIRE_VERSION => {
name = "aggregate".to_string();
doc! {
"aggregate": self.ns.coll.clone(),
"pipeline": [
Expand All @@ -74,11 +76,7 @@ impl Operation for Count {

append_options(&mut body, self.options.as_ref())?;

Ok(Command::new(
Self::NAME.to_string(),
self.ns.db.clone(),
body,
))
Ok(Command::new(name, self.ns.db.clone(), body))
}

fn handle_response(
Expand Down
17 changes: 17 additions & 0 deletions src/sdam/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use std::{
},
};

#[cfg(test)]
use futures_util::stream::{FuturesUnordered, StreamExt};
use tokio::sync::{RwLock, RwLockWriteGuard};

use self::server::Server;
Expand Down Expand Up @@ -532,6 +534,11 @@ impl Topology {
.map(|(addr, server)| (addr.clone(), Arc::downgrade(server)))
.collect()
}

/// Test-only: synchronize with the pool worker of every server in the
/// current topology state (delegates under a read lock on the state).
#[cfg(test)]
pub(crate) async fn sync_workers(&self) {
    self.state.read().await.sync_workers().await;
}
}

impl WeakTopology {
Expand Down Expand Up @@ -680,6 +687,16 @@ impl TopologyState {

self.servers.retain(|host, _| hosts.contains(host));
}

/// Test-only: ask every server's pool worker for a sync acknowledgment and
/// wait for all of them concurrently.
#[cfg(test)]
async fn sync_workers(&self) {
    let receivers = self
        .servers
        .values()
        .map(|server| server.pool.sync_worker())
        .collect::<FuturesUnordered<_>>();
    // Drain the stream; individual receive errors are deliberately ignored,
    // matching the fire-and-forget nature of the sync request.
    let _ = receivers.collect::<Vec<_>>().await;
}
}

/// Enum describing a point in time during an operation's execution relative to when the MongoDB
Expand Down
4 changes: 4 additions & 0 deletions src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ lazy_static! {
};
pub(crate) static ref SERVERLESS: bool =
matches!(std::env::var("SERVERLESS"), Ok(s) if s == "serverless");
pub(crate) static ref LOAD_BALANCED_SINGLE_URI: Option<String> =
std::env::var("MONGODB_LOAD_BALANCED_SINGLE_URI").ok();
pub(crate) static ref LOAD_BALANCED_MULTIPLE_URI: Option<String> =
std::env::var("MONGODB_LOAD_BALANCED_MULTIPLE_URI").ok();
}

fn get_default_uri() -> String {
Expand Down
15 changes: 14 additions & 1 deletion src/test/spec/command_monitoring/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ impl TestOperation for DeleteOne {
#[serde(deny_unknown_fields)]
pub(super) struct Find {
filter: Option<Document>,
// `FindOptions` cannot be embedded directly because serde doesn't support combining `flatten`
// and `deny_unknown_fields`, so its fields are replicated here.
#[serde(default)]
sort: Option<Document>,
#[serde(default)]
Expand Down Expand Up @@ -149,6 +151,8 @@ impl TestOperation for Find {

fn execute(&self, collection: Collection<Document>) -> BoxFuture<Result<()>> {
async move {
// `FindOptions` is constructed without the use of `..Default::default()` to enforce at
// compile-time that any new fields added there need to be considered here.
let options = FindOptions {
sort: self.sort.clone(),
skip: self.skip,
Expand All @@ -161,7 +165,16 @@ impl TestOperation for Find {
max: self.max.clone(),
return_key: self.return_key,
show_record_id: self.show_record_id,
..Default::default()
allow_disk_use: None,
allow_partial_results: None,
cursor_type: None,
max_await_time: None,
max_scan: None,
no_cursor_timeout: None,
projection: None,
read_concern: None,
selection_criteria: None,
collation: None,
};

let mut cursor = collection.find(self.filter.clone(), options).await?;
Expand Down
10 changes: 5 additions & 5 deletions src/test/spec/connection_stepdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ async fn get_more() {
}

RUNTIME.delay_for(Duration::from_millis(250)).await;
assert!(client.get_pool_cleared_events().is_empty());
assert_eq!(client.count_pool_cleared_events(), 0);
}

run_test(function_name!(), get_more_test).await;
Expand Down Expand Up @@ -155,7 +155,7 @@ async fn not_master_keep_pool() {
.expect("insert should have succeeded");

RUNTIME.delay_for(Duration::from_millis(250)).await;
assert!(client.get_pool_cleared_events().is_empty());
assert_eq!(client.count_pool_cleared_events(), 0);
}

run_test(function_name!(), not_master_keep_pool_test).await;
Expand Down Expand Up @@ -201,7 +201,7 @@ async fn not_master_reset_pool() {
);

RUNTIME.delay_for(Duration::from_millis(250)).await;
assert!(client.get_pool_cleared_events().len() == 1);
assert_eq!(client.count_pool_cleared_events(), 1);

coll.insert_one(doc! { "test": 1 }, None)
.await
Expand Down Expand Up @@ -250,7 +250,7 @@ async fn shutdown_in_progress() {
);

RUNTIME.delay_for(Duration::from_millis(250)).await;
assert!(client.get_pool_cleared_events().len() == 1);
assert_eq!(client.count_pool_cleared_events(), 1);

coll.insert_one(doc! { "test": 1 }, None)
.await
Expand Down Expand Up @@ -299,7 +299,7 @@ async fn interrupted_at_shutdown() {
);

RUNTIME.delay_for(Duration::from_millis(250)).await;
assert!(client.get_pool_cleared_events().len() == 1);
assert_eq!(client.count_pool_cleared_events(), 1);

coll.insert_one(doc! { "test": 1 }, None)
.await
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"description": "expectedCmapEvent-connectionCheckOutFailedEvent-reason-type",
"schemaVersion": "1.3",
"tests": [
{
"description": "foo",
"operations": [],
"expectEvents": [
{
"client": "client0",
"eventType": "cmap",
"events": [
{
"connectionCheckOutFailedEvent": {
"reason": 10
}
}
]
}
]
}
]
}
Loading