From 568e135daaeb26bc1fe75ec99d4ea0b46678a3a4 Mon Sep 17 00:00:00 2001 From: "xikai.wxk" Date: Wed, 12 Jul 2023 17:11:00 +0800 Subject: [PATCH] fix: bugs found by unit test --- analytic_engine/src/row_iter/chain.rs | 1 + .../src/sst/parquet/async_reader.rs | 2 +- common_util/src/prefetchable_stream.rs | 22 +++++++++++++++++++ 3 files changed, 24 insertions(+), 1 deletion(-) diff --git a/analytic_engine/src/row_iter/chain.rs b/analytic_engine/src/row_iter/chain.rs index 2ea99bae00..8fc2a64033 100644 --- a/analytic_engine/src/row_iter/chain.rs +++ b/analytic_engine/src/row_iter/chain.rs @@ -285,6 +285,7 @@ impl ChainIterator { self.streams[self.next_prefetch_stream_idx] .start_prefetch() .await; + self.next_prefetch_stream_idx += 1; } } } diff --git a/analytic_engine/src/sst/parquet/async_reader.rs b/analytic_engine/src/sst/parquet/async_reader.rs index 9e8b29f030..b0f252bdde 100644 --- a/analytic_engine/src/sst/parquet/async_reader.rs +++ b/analytic_engine/src/sst/parquet/async_reader.rs @@ -585,7 +585,7 @@ impl PrefetchableStream for RecordBatchReceiver { } async fn fetch_next(&mut self) -> Option { - self.fetch_next().await + self.next().await } } diff --git a/common_util/src/prefetchable_stream.rs b/common_util/src/prefetchable_stream.rs index 1e9d7005f6..a41380279d 100644 --- a/common_util/src/prefetchable_stream.rs +++ b/common_util/src/prefetchable_stream.rs @@ -148,3 +148,25 @@ impl PrefetchableStream for NoopPrefetcher { self.0.next().await } } + +#[cfg(test)] +mod tests { + use futures::stream; + + use super::*; + + #[tokio::test] + async fn test_trait_object_prefetchable_stream() { + let numbers = vec![1, 2, 3]; + let stream = stream::iter(numbers.clone()); + let stream = NoopPrefetcher(Box::new(stream)); + let mut stream: Box> = Box::new(stream); + + let mut fetched_numbers = Vec::with_capacity(numbers.len()); + while let Some(v) = stream.fetch_next().await { + fetched_numbers.push(v); + } + + assert_eq!(numbers, fetched_numbers); + } +}