Skip to content

Commit

Permalink
fix bifrost tests
Browse files Browse the repository at this point in the history
  • Loading branch information
AhmedSoliman committed May 28, 2024
1 parent 2f39be2 commit 1ac4c3f
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 6 deletions.
2 changes: 1 addition & 1 deletion crates/bifrost/src/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ mod tests {
.await
}

#[test(tokio::test)]
#[test(tokio::test(flavor = "multi_thread", worker_threads = 2))]
async fn trim_log_smoke_test() -> googletest::Result<()> {
let node_env = TestCoreEnvBuilder::new_with_mock_network()
.set_provider_kind(ProviderKind::Local)
Expand Down
15 changes: 10 additions & 5 deletions crates/bifrost/src/read_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ impl Stream for LogReadStream {
#[cfg(test)]
mod tests {

use std::sync::atomic::AtomicUsize;

use crate::{Bifrost, Record, TrimGap};

use super::*;
Expand All @@ -170,7 +172,7 @@ mod tests {

use restate_types::logs::Payload;

#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[traced_test]
async fn test_basic_readstream() -> anyhow::Result<()> {
// Make sure that panics exits the process.
Expand Down Expand Up @@ -203,13 +205,15 @@ mod tests {
assert!(tail.is_none());
assert_eq!(read_after, reader.current_read_pointer());

let read_counter = Arc::new(AtomicUsize::new(0));
// spawn a reader that reads 5 records and exits.
let counter_clone = read_counter.clone();
let id = tc.spawn(TaskKind::TestRunner, "read-records", None, async move {
for i in 1..=5 {
let record = reader.next().await.expect("to never terminate")?;
let expected_lsn = Lsn::from(i) + read_after;
assert_eq!(expected_lsn, reader.current_read_pointer());
info!(?record, "read record");
counter_clone.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
assert_eq!(expected_lsn, record.offset);
assert_eq!(
Payload::new(format!("record{}", expected_lsn)).body(),
Expand Down Expand Up @@ -238,7 +242,7 @@ mod tests {
// Not finished, we still didn't append records
tokio::task::yield_now().await;
assert!(!reader_bg_handle.is_finished());
assert!(!logs_contain("read record"));
assert!(read_counter.load(std::sync::atomic::Ordering::Relaxed) == 0);

// write 5 more records.
for i in 6..=10 {
Expand All @@ -249,15 +253,16 @@ mod tests {

// reader has finished
reader_bg_handle.await?;
assert!(logs_contain("read record"));
assert!(read_counter.load(std::sync::atomic::Ordering::Relaxed) == 5);

anyhow::Ok(())
})
.await?;
Ok(())
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[traced_test]
async fn test_read_stream_with_trim() -> anyhow::Result<()> {
// Make sure that panics exits the process.
let orig_hook = std::panic::take_hook();
Expand Down

0 comments on commit 1ac4c3f

Please sign in to comment.