chore: add more context for query log (#523)
jiacai2050 authored Dec 30, 2022
1 parent d3f2441 commit 2ae08de
Showing 1 changed file with 20 additions and 17 deletions.

analytic_engine/src/sst/parquet/async_reader.rs
@@ -3,6 +3,7 @@
 //! Sst reader implementation based on parquet.
 
 use std::{
+    fmt,
     ops::Range,
     pin::Pin,
     sync::Arc,
@@ -219,8 +220,6 @@ impl<'a> Reader<'a> {
 
         let meta_data = self.meta_data.as_ref().unwrap();
         let row_projector = self.row_projector.as_ref().unwrap();
-        let object_store_reader =
-            ParquetFileReaderAdapter::new(self.file_reader.clone(), meta_data.clone());
 
         // Get target row groups.
         let filtered_row_groups = self.filter_row_groups(
@@ -230,7 +229,8 @@ impl<'a> Reader<'a> {
         )?;
 
         info!(
-            "Reader fetch record batches, row_groups total:{}, after filter:{}",
+            "Reader fetch record batches, path:{}, row_groups total:{}, after filter:{}",
+            self.path,
             meta_data.parquet().num_row_groups(),
             filtered_row_groups.len(),
         );
@@ -242,25 +242,20 @@ impl<'a> Reader<'a> {
         // Partition the batches by `read_parallelism`.
         let suggest_read_parallelism = read_parallelism;
         let read_parallelism = std::cmp::min(filtered_row_groups.len(), suggest_read_parallelism);
-        // Partition by `max_row_groups_in_a_batch`.
-        debug!(
-            "Reader fetch record batches parallelly, suggest_read_parallelism:{}, read_parallelism:{}",
-            suggest_read_parallelism,
-            read_parallelism,
-        );
 
         // TODO: we only support read parallelly when `batch_size` ==
         // `num_rows_per_row_group`, so this placing method is ok, we should
         // adjust it when supporting it other situations.
         let chunks_num = read_parallelism;
         let chunk_size = filtered_row_groups.len() / read_parallelism + 1;
+        info!(
+            "Reader fetch record batches parallelly, parallelism suggest:{}, real:{}, chunk_size:{}",
+            suggest_read_parallelism, read_parallelism, chunk_size
+        );
         let mut filtered_row_group_chunks = vec![Vec::with_capacity(chunk_size); chunks_num];
         for (row_group_idx, row_group) in filtered_row_groups.into_iter().enumerate() {
             let chunk_idx = row_group_idx % chunks_num;
-            filtered_row_group_chunks
-                .get_mut(chunk_idx)
-                .unwrap()
-                .push(row_group);
+            filtered_row_group_chunks[chunk_idx].push(row_group);
         }
 
         let proj_mask = ProjectionMask::leaves(
@@ -270,7 +265,8 @@ impl<'a> Reader<'a> {
 
         let mut streams = Vec::with_capacity(filtered_row_group_chunks.len());
         for chunk in filtered_row_group_chunks {
-            let object_store_reader = object_store_reader.clone();
+            let object_store_reader =
+                ParquetFileReaderAdapter::new(self.file_reader.clone(), meta_data.clone());
             let builder = ParquetRecordBatchStreamBuilder::new(object_store_reader)
                 .await
                 .with_context(|| ParquetError)?;
@@ -395,13 +391,19 @@ impl ParallelismOptions {
     }
 }
 
-#[derive(Debug, Clone)]
+#[derive(Clone)]
 struct ReaderMetrics {
     bytes_scanned: usize,
     sst_get_range_length_histogram: LocalHistogram,
 }
 
+impl fmt::Debug for ReaderMetrics {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("ReaderMetrics")
+            .field("bytes_scanned", &self.bytes_scanned)
+            .finish()
+    }
+}
+
 struct ParquetFileReaderAdapter {
     file_reader: AsyncFileReaderRef,
     meta_data: MetaData,
@@ -453,6 +455,7 @@ impl AsyncParquetFileReader for ParquetFileReaderAdapter {
         ranges: Vec<Range<usize>>,
     ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
         for range in &ranges {
+            self.metrics.bytes_scanned += range.end - range.start;
             self.metrics
                 .sst_get_range_length_histogram
                 .observe((range.end - range.start) as f64);
@@ -509,7 +512,7 @@ impl RecordBatchProjector {
 impl Drop for RecordBatchProjector {
     fn drop(&mut self) {
         info!(
-            "RecordBatchProjector {}, read {} rows, cost:{}ms",
+            "RecordBatchProjector dropped, path:{} rows:{}, cost:{}ms.",
             self.path,
             self.row_num,
             self.start_time.saturating_elapsed().as_millis(),
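
The round-robin partitioning in the hunk at old lines 242-266 is easiest to see in isolation. Below is a minimal, self-contained sketch of that logic; `partition_row_groups` is an illustrative helper, not a function in this file, and like the original it assumes at least one filtered row group (otherwise `read_parallelism` would be zero).

// Round-robin assignment of filtered row groups to `read_parallelism` chunks,
// mirroring the partitioning in the diff above (illustrative sketch only).
fn partition_row_groups(row_groups: Vec<usize>, suggest_read_parallelism: usize) -> Vec<Vec<usize>> {
    // Never use more parallelism than there are row groups.
    let read_parallelism = std::cmp::min(row_groups.len(), suggest_read_parallelism);
    let chunks_num = read_parallelism;
    let chunk_size = row_groups.len() / read_parallelism + 1;
    let mut chunks = vec![Vec::with_capacity(chunk_size); chunks_num];
    for (row_group_idx, row_group) in row_groups.into_iter().enumerate() {
        // The index is in bounds by construction, which is why plain indexing
        // replaces the old `get_mut(..).unwrap()` chain in the commit.
        chunks[row_group_idx % chunks_num].push(row_group);
    }
    chunks
}

fn main() {
    // Five row groups with a suggested parallelism of 2:
    // chunk 0 gets row groups 0, 2, 4 and chunk 1 gets 1, 3.
    assert_eq!(
        partition_row_groups(vec![0, 1, 2, 3, 4], 2),
        vec![vec![0, 2, 4], vec![1, 3]]
    );
}

Each chunk is then read through its own `ParquetFileReaderAdapter`, which is why the commit moves the adapter construction inside the per-chunk loop.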

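The `ReaderMetrics` change trades the derived `Debug` for a hand-written one, so a query log line shows only the scanned byte count instead of histogram internals. Here is a minimal sketch of the same pattern; the `LocalHistogram` below is a stand-in so the sketch compiles on its own (the real type comes from the prometheus crate).

use std::fmt;

// Stand-in for prometheus' `LocalHistogram`; only here so the sketch compiles.
#[derive(Clone, Default)]
struct LocalHistogram;

#[derive(Clone, Default)]
struct ReaderMetrics {
    bytes_scanned: usize,
    #[allow(dead_code)]
    sst_get_range_length_histogram: LocalHistogram,
}

// Hand-written Debug: expose only the field that is useful in a log line.
impl fmt::Debug for ReaderMetrics {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ReaderMetrics")
            .field("bytes_scanned", &self.bytes_scanned)
            .finish()
    }
}

fn main() {
    let metrics = ReaderMetrics { bytes_scanned: 4096, ..Default::default() };
    // Prints: ReaderMetrics { bytes_scanned: 4096 }
    println!("{metrics:?}");
}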