address CR.
Rachelint committed Dec 15, 2022
1 parent a27b335 commit a925730
Showing 4 changed files with 47 additions and 86 deletions.
9 changes: 8 additions & 1 deletion Cargo.lock

(Generated file; diff not rendered.)

1 change: 1 addition & 0 deletions Cargo.toml
@@ -131,6 +131,7 @@ meta_client = { workspace = true }
 query_engine = { workspace = true }
 server = { workspace = true }
 signal-hook = "0.3"
+sort = "0.8.5"
 table_engine = { workspace = true }
 tracing_util = { workspace = true }

2 changes: 1 addition & 1 deletion analytic_engine/Cargo.toml
@@ -51,5 +51,5 @@ wal = { workspace = true }
 common_types = { workspace = true, features = ["test"] }
 common_util = { workspace = true, features = ["test"] }
 env_logger = { workspace = true }
-rand = { workspace = true }
 wal = { workspace = true, features = ["test"] }
+rand = "0.8.5"
121 changes: 37 additions & 84 deletions analytic_engine/src/sst/parquet/async_reader.rs
@@ -95,12 +95,7 @@ impl<'a> Reader<'a> {
         };
 
         self.init_if_necessary().await?;
-        let streams = self
-            .fetch_record_batch_streams(
-                read_parallelism,
-                self.parallelism_options.max_row_groups_in_batch,
-            )
-            .await?;
+        let streams = self.fetch_record_batch_streams(read_parallelism).await?;
         if streams.is_empty() {
             return Ok(Vec::new());
         }
@@ -151,10 +146,8 @@ impl<'a> Reader<'a> {
     async fn fetch_record_batch_streams(
         &mut self,
         read_parallelism: usize,
-        max_row_groups_in_batch: usize,
     ) -> Result<Vec<SendableRecordBatchStream>> {
         assert!(self.meta_data.is_some());
-        assert!(max_row_groups_in_batch > 0);
 
         let meta_data = self.meta_data.as_ref().unwrap();
         let row_projector = self.row_projector.as_ref().unwrap();
@@ -167,41 +160,39 @@
             meta_data.parquet().row_groups(),
             &meta_data.custom().bloom_filter,
         )?;
-        let filtered_row_group_len = filtered_row_groups.len();
 
         info!(
             "Reader fetch record batches, row_groups total:{}, after filter:{}",
             meta_data.parquet().num_row_groups(),
-            filtered_row_group_len,
+            filtered_row_groups.len(),
         );
 
         if filtered_row_groups.is_empty() {
             return Ok(Vec::new());
         }
 
-        // Partition by `max_row_groups_in_a_batch`.
-        let row_group_batches = partition_row_groups(filtered_row_groups, max_row_groups_in_batch);
+        // Partition the batches by `read_parallelism`.
+        let suggest_read_parallelism = read_parallelism;
+        let read_parallelism = std::cmp::min(filtered_row_groups.len(), suggest_read_parallelism);
         debug!(
-            "Reader fetch record batches parallelly,
-            row_groups total:{}, after filter:{}, max_row_groups_in_a_batch:{}, row_group_batches:{}, read_parallelism:{}",
-            meta_data.parquet().num_row_groups(),
-            filtered_row_group_len,
-            max_row_groups_in_batch,
-            row_group_batches.len(),
+            "Reader fetch record batches parallelly, suggest_read_parallelism:{}, read_parallelism:{}",
+            suggest_read_parallelism,
             read_parallelism,
         );
 
-        // Partition the batches by `read_parallelism`.
-        let read_parallelism = std::cmp::min(row_group_batches.len(), read_parallelism);
+        // TODO: we only support reading parallelly when `batch_size` ==
+        // `num_rows_per_row_group`, so this placement method is ok; we should
+        // adjust it when supporting other situations.
         let chunks_num = read_parallelism;
-        let chunk_size = filtered_row_group_len / read_parallelism + 1;
+        let chunk_size = filtered_row_groups.len() / read_parallelism + 1;
        let mut filtered_row_group_chunks = vec![Vec::with_capacity(chunk_size); chunks_num];
-        for (row_group_batch_idx, row_group_batch) in row_group_batches.into_iter().enumerate() {
-            let chunk_idx = row_group_batch_idx % chunks_num;
+        for (row_group_idx, row_group) in filtered_row_groups.into_iter().enumerate() {
+            let chunk_idx = row_group_idx % chunks_num;
             filtered_row_group_chunks
                 .get_mut(chunk_idx)
                 .unwrap()
-                .extend(row_group_batch);
+                .push(row_group);
         }
 
         let proj_mask = ProjectionMask::leaves(
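
To make the new placement concrete, here is a minimal, self-contained sketch of the round-robin chunking performed above (the function name and inputs are hypothetical illustrations, not part of the commit):

```rust
// Sketch of the round-robin placement, assuming `read_parallelism` has
// already been clamped to the number of filtered row groups.
fn chunk_row_groups(filtered_row_groups: Vec<usize>, read_parallelism: usize) -> Vec<Vec<usize>> {
    let chunks_num = read_parallelism;
    let chunk_size = filtered_row_groups.len() / read_parallelism + 1;
    let mut chunks = vec![Vec::with_capacity(chunk_size); chunks_num];
    for (row_group_idx, row_group) in filtered_row_groups.into_iter().enumerate() {
        // Row group i lands in chunk i % chunks_num.
        chunks[row_group_idx % chunks_num].push(row_group);
    }
    chunks
}

fn main() {
    // Five filtered row groups spread over two sub-streams:
    // stream 0 gets [0, 2, 4], stream 1 gets [1, 3].
    assert_eq!(
        chunk_row_groups(vec![0, 1, 2, 3, 4], 2),
        vec![vec![0, 2, 4], vec![1, 3]]
    );
}
```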
@@ -307,48 +298,30 @@ struct ParallelismOptions {
     /// (surely, `num_rows_per_row_group` > 0).
     // TODO: maybe we should support `read_parallelly` in all situations.
     enable_read_parallelly: bool,
-
-    /// The max row groups in a batch
-    max_row_groups_in_batch: usize,
+    // TODO: more configs will be added.
 }
 
 impl ParallelismOptions {
     fn new(read_batch_row_num: usize, num_rows_per_row_group: usize) -> Self {
-        // Check if `read_batch_row_num` % `num_rows_per_row_group` != 0.
-        // If `false`, parallelly reading is not allowed.
-        let enable_read_parallelly = read_batch_row_num % num_rows_per_row_group == 0;
-        if !enable_read_parallelly {
+        let enable_read_parallelly = if read_batch_row_num != num_rows_per_row_group {
             warn!(
-                "Reader new parallelism options not enable, don't allow read parallelly,
+                "Reader new parallelism options not enable, don't allow read parallelly because
+                read_batch_row_num != num_rows_per_row_group,
                 read_batch_row_num:{}, num_rows_per_row_group:{}",
                 read_batch_row_num, num_rows_per_row_group
             );
-        }
-
-        let max_row_groups_in_batch = if read_batch_row_num >= num_rows_per_row_group {
-            read_batch_row_num / num_rows_per_row_group
+            false
         } else {
-            1
+            true
         };
 
         Self {
             enable_read_parallelly,
-            max_row_groups_in_batch,
         }
     }
 }
 
-#[inline]
-fn partition_row_groups(
-    filtered_row_groups: Vec<usize>,
-    max_row_groups_in_a_batch: usize,
-) -> Vec<Vec<usize>> {
-    filtered_row_groups
-        .chunks(max_row_groups_in_a_batch)
-        .map(|chunk| chunk.to_vec())
-        .collect()
-}
 
 #[derive(Debug, Clone)]
 struct ReaderMetrics {
     bytes_scanned: usize,
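
After this change, `ParallelismOptions` reduces to a single flag: parallel reading is enabled exactly when `read_batch_row_num == num_rows_per_row_group`. A standalone sketch of that rule (the `warn!` logging is elided; this is an illustration, not the commit's code):

```rust
// Sketch: parallel reading is allowed only when the read batch size
// matches the row-group size exactly.
struct ParallelismOptions {
    enable_read_parallelly: bool,
}

impl ParallelismOptions {
    fn new(read_batch_row_num: usize, num_rows_per_row_group: usize) -> Self {
        Self {
            enable_read_parallelly: read_batch_row_num == num_rows_per_row_group,
        }
    }
}

fn main() {
    assert!(ParallelismOptions::new(4, 4).enable_read_parallelly); // equal: parallel ok
    assert!(!ParallelismOptions::new(2, 4).enable_read_parallelly); // smaller: serial
    assert!(!ParallelismOptions::new(8, 4).enable_read_parallelly); // larger: serial
}
```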
@@ -558,6 +531,16 @@ impl Stream for RecordBatchReceiver {
 
         match poll_result {
             Poll::Ready(result) => {
+                // On `Poll::Pending`, keep polling the current rx until it
+                // returns `Poll::Ready`, to preserve record batch order,
+                // because batches are placed into the streams in round robin:
+                // +------+    +------+    +------+
+                // |  1   |    |  2   |    |  3   |
+                // +------+    +------+    +------+
+                // |  4   |    |  5   |    |  6   |
+                // +------+    +------+    +------+
+                // | ...  |    | ...  |    | ...  |
+                // +------+    +------+    +------+
                 self.cur_rx_idx = (self.cur_rx_idx + 1) % self.rx_group.len();
                 Poll::Ready(result)
             }
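
A simplified, synchronous sketch of why round-robin consumption restores the original batch order (hypothetical data; the real code polls async receivers):

```rust
// Batches 1..=6 were distributed over three streams in round robin;
// consuming one batch per stream in turn yields them back in order.
fn round_robin_merge(mut streams: Vec<Vec<u32>>) -> Vec<u32> {
    let mut merged = Vec::new();
    let mut cur = 0;
    // Stop at the first exhausted stream: round-robin placement guarantees
    // no later batch can sit in a stream after an earlier one runs out.
    while !streams[cur].is_empty() {
        merged.push(streams[cur].remove(0));
        cur = (cur + 1) % streams.len();
    }
    merged
}

fn main() {
    let streams = vec![vec![1, 4], vec![2, 5], vec![3, 6]];
    assert_eq!(round_robin_merge(streams), vec![1, 2, 3, 4, 5, 6]);
}
```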
@@ -629,11 +612,7 @@ impl<'a> SstReader for ThreadedReader<'a> {
            }) as _);
        }
 
-        let read_parallelism = if self.read_parallelism >= sub_readers.len() {
-            sub_readers.len()
-        } else {
-            self.read_parallelism
-        };
+        let read_parallelism = sub_readers.len();
         debug!(
             "ThreadedReader read, suggest read_parallelism:{}, actual:{}",
             self.read_parallelism, read_parallelism
@@ -668,7 +647,7 @@ mod tests {
     use futures::{Stream, StreamExt};
     use tokio::sync::mpsc::{self, Receiver, Sender};
 
-    use super::{partition_row_groups, ParallelismOptions};
+    use super::ParallelismOptions;
 
     struct MockReceivers {
         rx_group: Vec<Receiver<u32>>,
@@ -772,48 +751,22 @@
 
     #[test]
     fn test_parallelism_options() {
-        // `read_batch_row_num` < `num_rows_per_row_group` &&
-        // `read_batch_row_num` % `num_rows_per_row_group` == 0
+        // `read_batch_row_num` < `num_rows_per_row_group`
         let read_batch_row_num = 2;
         let num_rows_per_row_group = 4;
         let options = ParallelismOptions::new(read_batch_row_num, num_rows_per_row_group);
         assert!(!options.enable_read_parallelly);
-        assert_eq!(options.max_row_groups_in_batch, 1);
 
-        // `read_batch_row_num` < `num_rows_per_row_group` &&
-        // `read_batch_row_num` % `num_rows_per_row_group` != 0
-        let read_batch_row_num = 3;
+        // `read_batch_row_num` > `num_rows_per_row_group`
+        let read_batch_row_num = 8;
         let num_rows_per_row_group = 4;
         let options = ParallelismOptions::new(read_batch_row_num, num_rows_per_row_group);
         assert!(!options.enable_read_parallelly);
-        assert_eq!(options.max_row_groups_in_batch, 1);
 
-        // `read_batch_row_num` >= `num_rows_per_row_group` &&
-        // `read_batch_row_num` % `num_rows_per_row_group` == 0
-        let read_batch_row_num = 8;
+        // `read_batch_row_num` == `num_rows_per_row_group`
+        let read_batch_row_num = 4;
         let num_rows_per_row_group = 4;
         let options = ParallelismOptions::new(read_batch_row_num, num_rows_per_row_group);
         assert!(options.enable_read_parallelly);
-        assert_eq!(options.max_row_groups_in_batch, 2);
-
-        // `read_batch_row_num` >= `num_rows_per_row_group` &&
-        // `read_batch_row_num` % `num_rows_per_row_group` != 0
-        let read_batch_row_num = 7;
-        let num_rows_per_row_group = 4;
-        let options = ParallelismOptions::new(read_batch_row_num, num_rows_per_row_group);
-        assert!(!options.enable_read_parallelly);
-        assert_eq!(options.max_row_groups_in_batch, 1);
     }
-
-    #[test]
-    fn test_partition_row_groups() {
-        let test_row_groups = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
-        let test_max_groups_in_batch = 3;
-
-        let expected = vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9], vec![10]];
-        let partitioned_row_group_batches =
-            partition_row_groups(test_row_groups, test_max_groups_in_batch);
-        assert_eq!(partitioned_row_group_batches.len(), 4);
-        assert_eq!(expected, partitioned_row_group_batches);
-    }
 }
