Skip to content

Commit

Permalink
fix parallelly reading but and add tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachelint committed Dec 13, 2022
1 parent 21b1c66 commit 27fafff
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions analytic_engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,4 @@ common_types = { workspace = true, features = ["test"] }
common_util = { workspace = true, features = ["test"] }
env_logger = { workspace = true }
wal = { workspace = true, features = ["test"] }
rand = "0.8.5"
119 changes: 116 additions & 3 deletions analytic_engine/src/sst/parquet/async_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use common_types::{
use common_util::{runtime::Runtime, time::InstantExt};
use datafusion::datasource::file_format;
use futures::{future::BoxFuture, FutureExt, Stream, StreamExt, TryFutureExt};
use log::{debug, error, info};
use log::{error, info};
use object_store::{ObjectMeta, ObjectStoreRef, Path};
use parquet::{
arrow::{async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder, ProjectionMask},
Expand Down Expand Up @@ -457,8 +457,13 @@ impl Stream for RecordBatchReceiver {
let cur_rx = self.rx_group.get_mut(cur_rx_idx).unwrap();
let poll_result = cur_rx.poll_recv(cx);

self.cur_rx_idx = (self.cur_rx_idx + 1) % self.rx_group.len();
poll_result
match poll_result {
Poll::Ready(result) => {
self.cur_rx_idx = (self.cur_rx_idx + 1) % self.rx_group.len();
Poll::Ready(result)
}
Poll::Pending => Poll::Pending,
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
Expand Down Expand Up @@ -536,3 +541,111 @@ impl<'a> SstReader for ThreadedReader<'a> {
}) as _)
}
}

#[cfg(test)]
mod tests {
use std::{
pin::Pin,
task::{Context, Poll},
time::Duration,
};

use futures::{Stream, StreamExt};
use tokio::sync::mpsc::{self, Receiver, Sender};

struct MockReceivers {
rx_group: Vec<Receiver<u32>>,
cur_rx_idx: usize,
}

impl Stream for MockReceivers {
type Item = u32;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let cur_rx_idx = self.cur_rx_idx;
// `cur_rx_idx` is impossible to be out-of-range, because it is got by round
// robin.
let cur_rx = self.rx_group.get_mut(cur_rx_idx).unwrap();
let poll_result = cur_rx.poll_recv(cx);

match poll_result {
Poll::Ready(result) => {
self.cur_rx_idx = (self.cur_rx_idx + 1) % self.rx_group.len();
Poll::Ready(result)
}
Poll::Pending => Poll::Pending,
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
(0, None)
}
}

struct MockRandomSenders {
tx_group: Vec<Sender<u32>>,
test_datas: Vec<Vec<u32>>,
}

impl MockRandomSenders {
fn start_to_send(&mut self) {
while !self.tx_group.is_empty() {
let tx = self.tx_group.pop().unwrap();
let test_data = self.test_datas.pop().unwrap();
tokio::spawn(async move {
for datum in test_data {
let random_millis = rand::random::<u64>() % 30;
tokio::time::sleep(Duration::from_millis(random_millis)).await;
tx.send(datum).await.unwrap();
}
});
}
}
}

fn gen_test_data(amount: usize) -> Vec<u32> {
(0..amount)
.into_iter()
.map(|_| rand::random::<u32>())
.collect()
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_simulated_threaded_reader() {
let test_data = gen_test_data(123);
let expected = test_data.clone();
let channel_cap_per_sub_reader = 10;
let reader_num = 5;
let (tx_group, rx_group): (Vec<_>, Vec<_>) = (0..reader_num)
.into_iter()
.map(|_| mpsc::channel::<u32>(channel_cap_per_sub_reader))
.unzip();

// Partition datas.
let chunk_len = reader_num;
let mut test_data_chunks = vec![Vec::new(); chunk_len];
for (idx, datum) in test_data.into_iter().enumerate() {
let chunk_idx = idx % chunk_len;
test_data_chunks.get_mut(chunk_idx).unwrap().push(datum);
}

// Start senders.
let mut mock_senders = MockRandomSenders {
tx_group,
test_datas: test_data_chunks,
};
mock_senders.start_to_send();

// Poll receivers.
let mut actual = Vec::new();
let mut mock_receivers = MockReceivers {
rx_group,
cur_rx_idx: 0,
};
while let Some(datum) = mock_receivers.next().await {
actual.push(datum);
}

assert_eq!(actual, expected);
}
}

0 comments on commit 27fafff

Please sign in to comment.