Skip to content

Commit

Permalink
fix: bugs found by unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
ShiKaiWi committed Jul 14, 2023
1 parent 1fe4cbf commit 568e135
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 1 deletion.
1 change: 1 addition & 0 deletions analytic_engine/src/row_iter/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ impl ChainIterator {
self.streams[self.next_prefetch_stream_idx]
.start_prefetch()
.await;
self.next_prefetch_stream_idx += 1;
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion analytic_engine/src/sst/parquet/async_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,7 @@ impl PrefetchableStream for RecordBatchReceiver {
}

async fn fetch_next(&mut self) -> Option<Self::Item> {
self.fetch_next().await
self.next().await
}
}

Expand Down
22 changes: 22 additions & 0 deletions common_util/src/prefetchable_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,3 +148,25 @@ impl<T> PrefetchableStream for NoopPrefetcher<T> {
self.0.next().await
}
}

#[cfg(test)]
mod tests {
use futures::stream;

use super::*;

#[tokio::test]
async fn test_trait_object_prefetchable_stream() {
let numbers = vec![1, 2, 3];
let stream = stream::iter(numbers.clone());
let stream = NoopPrefetcher(Box::new(stream));
let mut stream: Box<dyn PrefetchableStream<Item = i32>> = Box::new(stream);

let mut fetched_numbers = Vec::with_capacity(numbers.len());
while let Some(v) = stream.fetch_next().await {
fetched_numbers.push(v);
}

assert_eq!(numbers, fetched_numbers);
}
}

0 comments on commit 568e135

Please sign in to comment.