diff --git a/analytic_engine/src/sst/parquet/async_reader.rs b/analytic_engine/src/sst/parquet/async_reader.rs
index c552c65060..8256002888 100644
--- a/analytic_engine/src/sst/parquet/async_reader.rs
+++ b/analytic_engine/src/sst/parquet/async_reader.rs
@@ -3,6 +3,7 @@
 //! Sst reader implementation based on parquet.
 
 use std::{
+    fmt,
     ops::Range,
     pin::Pin,
     sync::Arc,
@@ -219,8 +220,6 @@ impl<'a> Reader<'a> {
 
         let meta_data = self.meta_data.as_ref().unwrap();
         let row_projector = self.row_projector.as_ref().unwrap();
-        let object_store_reader =
-            ParquetFileReaderAdapter::new(self.file_reader.clone(), meta_data.clone());
 
         // Get target row groups.
         let filtered_row_groups = self.filter_row_groups(
@@ -230,7 +229,8 @@ impl<'a> Reader<'a> {
         )?;
 
         info!(
-            "Reader fetch record batches, row_groups total:{}, after filter:{}",
+            "Reader fetch record batches, path:{}, row_groups total:{}, after filter:{}",
+            self.path,
             meta_data.parquet().num_row_groups(),
             filtered_row_groups.len(),
         );
@@ -242,25 +242,20 @@ impl<'a> Reader<'a> {
         // Partition the batches by `read_parallelism`.
         let suggest_read_parallelism = read_parallelism;
         let read_parallelism = std::cmp::min(filtered_row_groups.len(), suggest_read_parallelism);
-        // Partition by `max_row_groups_in_a_batch`.
-        debug!(
-            "Reader fetch record batches parallelly, suggest_read_parallelism:{}, read_parallelism:{}",
-            suggest_read_parallelism,
-            read_parallelism,
-        );
 
         // TODO: we only support read parallelly when `batch_size` ==
         // `num_rows_per_row_group`, so this placing method is ok, we should
         // adjust it when supporting it other situations.
         let chunks_num = read_parallelism;
         let chunk_size = filtered_row_groups.len() / read_parallelism + 1;
+        info!(
+            "Reader fetch record batches parallelly, parallelism suggest:{}, real:{}, chunk_size:{}",
+            suggest_read_parallelism, read_parallelism, chunk_size
+        );
         let mut filtered_row_group_chunks = vec![Vec::with_capacity(chunk_size); chunks_num];
         for (row_group_idx, row_group) in filtered_row_groups.into_iter().enumerate() {
             let chunk_idx = row_group_idx % chunks_num;
-            filtered_row_group_chunks
-                .get_mut(chunk_idx)
-                .unwrap()
-                .push(row_group);
+            filtered_row_group_chunks[chunk_idx].push(row_group);
         }
 
         let proj_mask = ProjectionMask::leaves(
@@ -270,7 +265,8 @@ impl<'a> Reader<'a> {
 
         let mut streams = Vec::with_capacity(filtered_row_group_chunks.len());
         for chunk in filtered_row_group_chunks {
-            let object_store_reader = object_store_reader.clone();
+            let object_store_reader =
+                ParquetFileReaderAdapter::new(self.file_reader.clone(), meta_data.clone());
             let builder = ParquetRecordBatchStreamBuilder::new(object_store_reader)
                 .await
                 .with_context(|| ParquetError)?;
@@ -395,13 +391,19 @@ impl ParallelismOptions {
     }
 }
 
-#[derive(Debug, Clone)]
 struct ReaderMetrics {
     bytes_scanned: usize,
     sst_get_range_length_histogram: LocalHistogram,
 }
 
-#[derive(Clone)]
+impl fmt::Debug for ReaderMetrics {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("ReaderMetrics")
+            .field("bytes_scanned", &self.bytes_scanned)
+            .finish()
+    }
+}
+
 struct ParquetFileReaderAdapter {
     file_reader: AsyncFileReaderRef,
     meta_data: MetaData,
@@ -453,6 +455,7 @@ impl AsyncParquetFileReader for ParquetFileReaderAdapter {
         ranges: Vec<Range<usize>>,
     ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
         for range in &ranges {
+            self.metrics.bytes_scanned += range.end - range.start;
             self.metrics
                 .sst_get_range_length_histogram
                 .observe((range.end - range.start) as f64);
@@ -509,7 +512,7 @@ impl RecordBatchProjector {
 impl Drop for RecordBatchProjector {
     fn drop(&mut self) {
         info!(
-            "RecordBatchProjector {}, read {} rows, cost:{}ms",
+            "RecordBatchProjector dropped, path:{} rows:{}, cost:{}ms.",
             self.path,
             self.row_num,
             self.start_time.saturating_elapsed().as_millis(),
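
The row-group placement kept by the patch deals the filtered row groups out to `read_parallelism` chunks round-robin (`chunk_idx = row_group_idx % chunks_num`), so each stream gets at most `len / parallelism + 1` row groups. Below is a minimal standalone sketch of that placement, not part of the patch; the function name and the empty-input guard are only for this demo.

```rust
// Standalone sketch of the round-robin placement used in the diff above.
// `partition_row_groups` is a hypothetical helper for illustration only.
fn partition_row_groups(filtered_row_groups: Vec<usize>, read_parallelism: usize) -> Vec<Vec<usize>> {
    // Never spawn more chunks than there are row groups; guard against an
    // empty input so this standalone demo cannot divide by zero.
    let chunks_num = std::cmp::min(filtered_row_groups.len(), read_parallelism).max(1);
    let chunk_size = filtered_row_groups.len() / chunks_num + 1;

    let mut chunks = vec![Vec::with_capacity(chunk_size); chunks_num];
    for (row_group_idx, row_group) in filtered_row_groups.into_iter().enumerate() {
        // Deal row groups out like cards: index modulo number of chunks.
        chunks[row_group_idx % chunks_num].push(row_group);
    }
    chunks
}

fn main() {
    // Seven filtered row groups read with parallelism 3 -> chunk sizes 3, 2, 2.
    let chunks = partition_row_groups((0..7).collect(), 3);
    assert_eq!(chunks, vec![vec![0, 3, 6], vec![1, 4], vec![2, 5]]);
    println!("{chunks:?}");
}
```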
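The patch also swaps the `#[derive(Debug, Clone)]` on `ReaderMetrics` for a hand-written `Debug` that prints only `bytes_scanned`, and drops `Clone` now that `ParquetFileReaderAdapter` is constructed per chunk instead of cloned. The sketch below shows the same `debug_struct(..).finish()` pattern in isolation; `OpaqueHistogram` is a hypothetical stand-in for `LocalHistogram`, used only so the example compiles on its own.

```rust
use std::fmt;

// Hypothetical stand-in for the histogram field; it deliberately has no Debug impl.
struct OpaqueHistogram;

#[allow(dead_code)]
struct ReaderMetrics {
    bytes_scanned: usize,
    sst_get_range_length_histogram: OpaqueHistogram,
}

// Hand-written Debug that exposes only the scalar counter, mirroring the patch.
impl fmt::Debug for ReaderMetrics {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ReaderMetrics")
            .field("bytes_scanned", &self.bytes_scanned)
            .finish()
    }
}

fn main() {
    let metrics = ReaderMetrics {
        bytes_scanned: 4096,
        sst_get_range_length_histogram: OpaqueHistogram,
    };
    // Prints: ReaderMetrics { bytes_scanned: 4096 }
    println!("{metrics:?}");
}
```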