Skip to content

Commit

Permalink
Merge pull request #54 from palfrey/parallel
Browse files Browse the repository at this point in the history
Parallel test running support
  • Loading branch information
palfrey authored Jun 4, 2022
2 parents 0960613 + 9c90183 commit c422658
Show file tree
Hide file tree
Showing 13 changed files with 487 additions and 158 deletions.
15 changes: 12 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,20 @@ jobs:
uses: actions-rs/cargo@v1.0.3
with:
command: test
args: --features ${{ matrix.features }}
args: --features ${{ matrix.features }} -- --nocapture
env:
RUST_TEST_THREADS: 3 # So the parallel tests have enough threads
RUST_LOG: debug
if: ${{ matrix.features != 'all' }}
- name: Build and test all features
uses: actions-rs/cargo@v1.0.3
with:
command: test
args: --all-features
args: --all-features -- --nocapture
if: ${{ matrix.features == 'all' }}
env:
RUST_TEST_THREADS: 3 # So the parallel tests have enough threads
RUST_LOG: debug

multi-os-testing:
name: Test suite
Expand All @@ -81,7 +87,10 @@ jobs:
uses: actions-rs/cargo@v1.0.3
with:
command: test
args: --all-features
args: --all-features -- --nocapture
env:
RUST_TEST_THREADS: 3 # So the parallel tests have enough threads
RUST_LOG: debug

minimal-versions:
name: minimal versions check
Expand Down
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ async fn test_serial_another() {
// Do things asynchronously
}
```
Multiple tests with the `serial` attribute are guaranteed to be executed in serial. Ordering of the tests is not guaranteed however.
Tests without the `serial` attribute may run at any time, including in parallel to tests marked as `serial`. Note that if you're using
an async test reactor attribute (e.g. `tokio::test` or `actix_rt::test`) then they should be listed *before* `serial`, otherwise we
Multiple tests with the `serial` attribute are guaranteed to be executed in serial. Ordering of the tests is not guaranteed however. Other tests with the `parallel` attribute may run at the same time as each other, but not at the same time as a test with `serial`. Tests with neither attribute may run at any time and no guarantees are made about their timing!

Note that if you're using an async test reactor attribute (e.g. `tokio::test` or `actix_rt::test`) then they should be listed *before* `serial`, otherwise we
don't get an async function and things break. There's now an error for this case to improve debugging.

For cases like doctests and integration tests where the tests are run as separate processes, we also support `file_serial`, with
similar properties but based off file locking. Note that there are no guarantees about one test with `serial` and another with
`file_serial` as they lock using different methods.
`file_serial` as they lock using different methods, and `parallel` doesn't support `file_serial` yet (patches welcomed!).

## Usage
We require at least Rust 1.51. Upgrades to this will require at least a minor version bump (while in 0.x versions) and a major version bump post-1.0.
Expand Down
1 change: 1 addition & 0 deletions serial_test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ parking_lot = "^0.12"
serial_test_derive = { version = "~0.7.0", path = "../serial_test_derive" }
fslock = {version = "0.2", optional = true}
document-features = {version = "0.2", optional=true}
log = "0.4"

[dev-dependencies]
itertools = "0.10"
Expand Down
169 changes: 48 additions & 121 deletions serial_test/src/code_lock.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,39 @@
use crate::rwlock::{Locks, MutexGuardWrapper};
use lazy_static::lazy_static;
use parking_lot::{Mutex, ReentrantMutex, ReentrantMutexGuard, RwLock};
use log::debug;
use parking_lot::{Mutex, RwLock};
use std::{
cell::RefCell,
collections::HashMap,
ops::{Deref, DerefMut},
sync::{atomic::AtomicU32, Arc},
time::Duration,
time::{Duration, Instant},
};

struct UniqueReentrantMutex {
mutex: ReentrantMutex<()>,
pub(crate) struct UniqueReentrantMutex {
locks: Locks,

// Only actually used for tests
#[allow(dead_code)]
id: u32,
pub(crate) id: u32,
}

impl UniqueReentrantMutex {
fn lock(&self) -> ReentrantMutexGuard<()> {
self.mutex.lock()
pub(crate) fn lock(&self) -> MutexGuardWrapper {
self.locks.serial()
}

pub(crate) fn start_parallel(&self) {
self.locks.start_parallel();
}

pub(crate) fn end_parallel(&self) {
self.locks.end_parallel();
}
}

lazy_static! {
static ref LOCKS: Arc<RwLock<HashMap<String, UniqueReentrantMutex>>> =
pub(crate) static ref LOCKS: Arc<RwLock<HashMap<String, UniqueReentrantMutex>>> =
Arc::new(RwLock::new(HashMap::new()));
static ref MAX_WAIT: Arc<Mutex<RefCell<Duration>>> =
Arc::new(Mutex::new(RefCell::new(Duration::from_secs(60))));
Expand All @@ -33,14 +43,14 @@ lazy_static! {
impl Default for UniqueReentrantMutex {
fn default() -> Self {
Self {
mutex: Default::default(),
locks: Locks::new(),
id: MUTEX_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst),
}
}
}

/// Sets the maximum amount of time the serial locks will wait to unlock
/// By default, this is set to 60 seconds, which is almost always much longer than is needed
/// Sets the maximum amount of time the serial locks will wait to unlock.
/// By default, this is set to 60 seconds, which is almost always much longer than is needed.
/// This is deliberately set high to try and avoid situations where we accidentally hit the limits
/// but is set at all so we can timeout rather than hanging forever.
///
Expand All @@ -50,123 +60,40 @@ pub fn set_max_wait(max_wait: Duration) {
MAX_WAIT.lock().replace(max_wait);
}

fn wait_duration() -> Duration {
pub(crate) fn wait_duration() -> Duration {
*MAX_WAIT.lock().borrow()
}

fn check_new_key(name: &str) {
// Check if a new key is needed. Just need a read lock, which can be done in sync with everyone else
let new_key = {
let unlock = LOCKS
.try_read_recursive_for(wait_duration())
.expect("read lock didn't work");
!unlock.deref().contains_key(name)
};
if new_key {
// This is the rare path, which avoids the multi-writer situation mostly
let mut lock = LOCKS
.try_write_for(wait_duration())
.expect("write lock didn't work");

lock.deref_mut().entry(name.to_string()).or_default();
}
}

#[doc(hidden)]
pub fn local_serial_core_with_return<E>(
name: &str,
function: fn() -> Result<(), E>,
) -> Result<(), E> {
check_new_key(name);

let unlock = LOCKS.read_recursive();
// _guard needs to be named to avoid being instant dropped
let _guard = unlock.deref()[name].lock();
function()
}

#[doc(hidden)]
pub fn local_serial_core(name: &str, function: fn()) {
check_new_key(name);

let unlock = LOCKS.read_recursive();
// _guard needs to be named to avoid being instant dropped
let _guard = unlock.deref()[name].lock();
function();
}

#[doc(hidden)]
pub async fn local_async_serial_core_with_return<E>(
name: &str,
fut: impl std::future::Future<Output = Result<(), E>>,
) -> Result<(), E> {
check_new_key(name);

let unlock = LOCKS.read_recursive();
// _guard needs to be named to avoid being instant dropped
let _guard = unlock.deref()[name].lock();
fut.await
}

#[doc(hidden)]
pub async fn local_async_serial_core(name: &str, fut: impl std::future::Future<Output = ()>) {
check_new_key(name);

let unlock = LOCKS.read_recursive();
// _guard needs to be named to avoid being instant dropped
let _guard = unlock.deref()[name].lock();
fut.await;
}

#[cfg(test)]
mod tests {
use super::{check_new_key, wait_duration, LOCKS};
use itertools::Itertools;
use parking_lot::RwLock;
use std::{
ops::Deref,
sync::{Arc, Barrier},
thread,
};

#[test]
fn test_hammer_check_new_key() {
let ptrs = Arc::new(RwLock::new(Vec::new()));
let mut threads = Vec::new();
pub(crate) fn check_new_key(name: &str) {
let start = Instant::now();
loop {
let duration = Instant::now() - start;
debug!("Waiting for '{}' {:?}", name, duration);
// Check if a new key is needed. Just need a read lock, which can be done in sync with everyone else
let try_unlock = LOCKS.try_read_recursive_for(Duration::from_secs(1));
if let Some(unlock) = try_unlock {
if unlock.deref().contains_key(name) {
return;
}
drop(unlock); // so that we don't hold the read lock and so the writer can maybe succeed
} else {
continue; // wasn't able to get read lock
}

let count = 100;
let barrier = Arc::new(Barrier::new(count));
// This is the rare path, which avoids the multi-writer situation mostly
let try_lock = LOCKS.try_write_for(Duration::from_secs(1));

for _ in 0..count {
let local_locks = LOCKS.clone();
let local_ptrs = ptrs.clone();
let c = barrier.clone();
threads.push(thread::spawn(move || {
c.wait();
check_new_key("foo");
{
let unlock = local_locks
.try_read_recursive_for(wait_duration())
.expect("read lock didn't work");
let mutex = unlock.deref().get("foo").unwrap();
if let Some(mut lock) = try_lock {
lock.deref_mut().entry(name.to_string()).or_default();
return;
}

let mut ptr_guard = local_ptrs
.try_write_for(wait_duration())
.expect("write lock didn't work");
ptr_guard.push(mutex.id);
}
// If the try_lock fails, then go around the loop again
// Odds are another test was also locking on the write and has now written the key

c.wait();
}));
}
for thread in threads {
thread.join().expect("thread join worked");
let duration = Instant::now() - start;
if duration >= wait_duration() {
panic!("check_new_key timed out!");
}
let ptrs_read_lock = ptrs
.try_read_recursive_for(wait_duration())
.expect("ptrs read work");
assert_eq!(ptrs_read_lock.len(), count);
println!("{:?}", ptrs_read_lock);
assert_eq!(ptrs_read_lock.iter().unique().count(), 1);
}
}
27 changes: 23 additions & 4 deletions serial_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,19 @@
//! fn test_serial_another() {
//! // Do things
//! }
//!
//! #[test]
//! #[parallel]
//! fn test_parallel_another() {
//! // Do parallel things
//! }
//! ````
//! Multiple tests with the [serial](macro@serial) attribute are guaranteed to be executed in serial. Ordering
//! of the tests is not guaranteed however. Tests without the `serial` attribute may run at any time, including
//! in parallel to tests marked as `serial`. Note that if you're using an async test reactor attribute (e.g.
//! of the tests is not guaranteed however. Other tests with the [parallel](macro@parallel) attribute may run
//! at the same time as each other, but not at the same time as a test with [serial](macro@serial). Tests with
//! neither attribute may run at any time and no guarantees are made about their timing!
//!
//! Note that if you're using an async test reactor attribute (e.g.
//! `tokio::test` or `actix_rt::test`) then they should be listed *before* `serial`, otherwise we don't get an
//! async function and things break. There's now an error for this case to improve debugging.
//!
Expand All @@ -41,12 +50,21 @@
)]

mod code_lock;
mod parallel_code_lock;
mod rwlock;
mod serial_code_lock;

#[cfg(feature = "file_locks")]
mod file_lock;

pub use code_lock::{
pub use code_lock::set_max_wait;
pub use parallel_code_lock::{
local_async_parallel_core, local_async_parallel_core_with_return, local_parallel_core,
local_parallel_core_with_return,
};
pub use serial_code_lock::{
local_async_serial_core, local_async_serial_core_with_return, local_serial_core,
local_serial_core_with_return, set_max_wait,
local_serial_core_with_return,
};

#[cfg(feature = "file_locks")]
Expand All @@ -56,6 +74,7 @@ pub use file_lock::{
};

// Re-export #[serial/file_serial].
pub use serial_test_derive::parallel;
#[allow(unused_imports)]
pub use serial_test_derive::serial;

Expand Down
Loading

0 comments on commit c422658

Please sign in to comment.