9 changes: 9 additions & 0 deletions datafusion/common/src/config.rs
@@ -691,6 +691,15 @@ config_namespace! {
/// then the output will be coerced to a non-view.
/// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
pub expand_views_at_output: bool, default = false

/// Number of input streams to prefetch for ProgressiveEvalExec.
/// Since ProgressiveEvalExec polls only one stream at a time, in stream order,
/// we do not need to prefetch all streams at once; prefetching fewer saves resources.
/// However, if the streams' IO time far exceeds their CPU/processing time, prefetching
/// them improves performance.
/// The default is 2, which means one extra stream is prefetched while the current one is polled.
/// Increase this value if the IO time to read a stream is often much greater than the CPU time
/// needed to process the previous one.
pub progressive_eval_num_prefetch_input_streams: usize, default = 2
}
}
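As a hedged illustration (not part of this diff), here is how a user might raise the prefetch depth when stream IO dominates per-stream processing time. The fully qualified key is an assumption based on the option being declared alongside `expand_views_at_output` in the `execution` namespace; `SessionConfig::set_usize` and `SessionContext::new_with_config` are existing DataFusion APIs.

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

// Assumed key: the option sits next to `expand_views_at_output`,
// so it presumably surfaces as `datafusion.execution.*`.
let config = SessionConfig::new().set_usize(
    "datafusion.execution.progressive_eval_num_prefetch_input_streams",
    4, // prefetch three streams ahead of the one currently being polled
);
let ctx = SessionContext::new_with_config(config);
```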

126 changes: 103 additions & 23 deletions datafusion/core/src/datasource/listing/table.rs
@@ -17,18 +17,16 @@

//! The table implementation.

use std::collections::HashMap;
use std::{any::Any, str::FromStr, sync::Arc};

use super::helpers::{expr_applicable_for_cols, pruned_partition_list};
use super::{ListingTableUrl, PartitionedFile};
use std::collections::HashMap;
use std::{any::Any, str::FromStr, sync::Arc};

use crate::datasource::{
create_ordering,
file_format::{
file_compression_type::FileCompressionType, FileFormat, FilePushdownSupport,
},
get_statistics_with_limit,
physical_plan::FileSinkConfig,
};
use crate::execution::context::SessionState;
@@ -53,11 +51,13 @@ use datafusion_physical_expr::{
create_physical_expr, LexOrdering, PhysicalSortRequirement,
};

use crate::datasource::statistics::compute_all_files_statistics;
use async_trait::async_trait;
use datafusion_catalog::Session;
use datafusion_common::stats::Precision;
use datafusion_datasource::file_groups::FileGroup;
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use futures::{future, stream, StreamExt, TryStreamExt};
use futures::{future, stream, Stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use object_store::ObjectStore;

@@ -1114,32 +1114,26 @@ impl ListingTable {
let files = file_list
.map(|part_file| async {
let part_file = part_file?;
if self.options.collect_stat {
let statistics =
self.do_collect_statistics(ctx, &store, &part_file).await?;
Ok((part_file, statistics))
let statistics = if self.options.collect_stat {
self.do_collect_statistics(ctx, &store, &part_file).await?
} else {
Ok((
part_file,
Arc::new(Statistics::new_unknown(&self.file_schema)),
))
}
Arc::new(Statistics::new_unknown(&self.file_schema))
};
Ok(part_file.with_statistics(statistics))
})
.boxed()
.buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency);

let (files, statistics) = get_statistics_with_limit(
files,
let (file_group, inexact_stats) =
get_files_with_limit(files, limit, self.options.collect_stat).await?;

let file_groups = file_group.split_files(self.options.target_partitions);
compute_all_files_statistics(
file_groups,
self.schema(),
limit,
self.options.collect_stat,
inexact_stats,
)
.await?;

Ok((
files.split_files(self.options.target_partitions),
statistics,
))
}

/// Collects statistics for a given partitioned file.
@@ -1181,6 +1175,92 @@ impl ListingTable {
}
}

/// Processes a stream of partitioned files and returns a `FileGroup` containing the files.
///
/// This function collects files from the provided stream until either:
/// 1. The stream is exhausted
/// 2. The accumulated number of rows exceeds the provided `limit` (if specified)
///
/// # Arguments
/// * `files` - A stream of `Result<PartitionedFile>` items to process
/// * `limit` - An optional row count limit. If provided, the function will stop collecting files
/// once the accumulated number of rows exceeds this limit
/// * `collect_stats` - Whether to collect and accumulate statistics from the files
///
/// # Returns
/// A `Result` containing a `FileGroup` with the collected files
/// and a boolean indicating whether the statistics are inexact.
///
/// # Note
/// The function will continue processing files if statistics are not available or if the
/// limit is not provided. If `collect_stats` is false, statistics won't be accumulated
/// but files will still be collected.
async fn get_files_with_limit(
files: impl Stream<Item = Result<PartitionedFile>>,
limit: Option<usize>,
collect_stats: bool,
) -> Result<(FileGroup, bool)> {
let mut file_group = FileGroup::default();
// Fusing the stream allows us to call next safely even once it is finished.
let mut all_files = Box::pin(files.fuse());
let mut num_rows = Precision::<usize>::Absent;
if let Some(first_file) = all_files.next().await {
let file = first_file?;
if let Some(file_statistic) = &file.statistics {
num_rows = file_statistic.num_rows;
}
file_group.push(file);

// If the number of rows exceeds the limit, we can stop processing
// files. This only applies when we know the number of rows. It also
// currently ignores tables that have no statistics regarding the
// number of rows.
let conservative_num_rows = match num_rows {
Precision::Exact(nr) => nr,
_ => usize::MIN,
};
if conservative_num_rows <= limit.unwrap_or(usize::MAX) {
while let Some(current) = all_files.next().await {
let file = current?;
if !collect_stats {
file_group.push(file);
continue;
}

// We accumulate the number of rows, total byte size and null
// counts across all the files in question. If any file does not
// provide any information or provides an inexact value, we demote
// the statistic precision to inexact.
if let Some(file_stats) = &file.statistics {
num_rows = crate::datasource::statistics::add_row_stats(
num_rows,
file_stats.num_rows,
);
}
file_group.push(file);

// If the number of rows exceeds the limit, we can stop processing
// files. This only applies when we know the number of rows. It also
// currently ignores tables that have no statistics regarding the
// number of rows.
if num_rows.get_value().unwrap_or(&usize::MIN)
> &limit.unwrap_or(usize::MAX)
{
break;
}
}
}
}
let mut inexact_stats = false;
if all_files.next().await.is_some() {
// If we still have files in the stream, it means that the limit kicked
// in, and the statistic could have been different had we processed the
// files in a different order.
inexact_stats = true;
}
Ok((file_group, inexact_stats))
}
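For illustration only (not part of the diff), a minimal sketch of the early-exit semantics, assuming `FileGroup::len` is available and that files without statistics leave the row count `Absent`, so the limit never triggers and the stream is fully drained:

```rust
use futures::stream;

// Two hypothetical files with no statistics attached.
let files = stream::iter(vec![
    Ok(PartitionedFile::new("a.parquet", 1024)),
    Ok(PartitionedFile::new("b.parquet", 2048)),
]);

// Row counts are unknown (`Precision::Absent`), so the limit cannot kick in:
// both files are collected, and the inexact flag stays false because the
// stream was exhausted rather than cut short.
let (group, inexact_stats) = get_files_with_limit(files, Some(10), true).await?;
assert_eq!(group.len(), 2);
assert!(!inexact_stats);
```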

#[cfg(test)]
mod tests {
use super::*;
1 change: 0 additions & 1 deletion datafusion/core/src/datasource/mod.rs
@@ -46,7 +46,6 @@ pub use datafusion_datasource::schema_adapter
pub use datafusion_datasource::source;
pub use datafusion_execution::object_store;
pub use datafusion_physical_expr::create_ordering;
pub use statistics::get_statistics_with_limit;

#[cfg(all(test, feature = "parquet"))]
mod tests {
157 changes: 147 additions & 10 deletions datafusion/core/src/datasource/statistics.rs
@@ -15,25 +15,27 @@
// specific language governing permissions and limitations
// under the License.

use std::mem;
use std::sync::Arc;

use futures::{Stream, StreamExt};

use crate::arrow::datatypes::SchemaRef;
use crate::error::Result;
use crate::physical_plan::{ColumnStatistics, Statistics};
use datafusion_common::stats::Precision;
use datafusion_common::ScalarValue;
use datafusion_datasource::file_groups::FileGroup;

use super::listing::PartitionedFile;
use datafusion_datasource::PartitionedFile;
use futures::{Stream, StreamExt};
use std::mem;
use std::sync::Arc;

/// Get all files as well as the file level summary statistics (no statistics for partition columns).
/// If the optional `limit` is provided, includes only enough files to read up to `limit` rows.
/// `collect_stats` is passed down from the configuration parameter on `ListingTable`. If it is
/// false we only construct bare statistics and skip a potentially expensive call to `multiunzip`
/// for constructing file level summary statistics.
#[deprecated(
since = "47.0.0",
note = "Please use `get_files_with_limit` and `compute_all_files_statistics` instead"
)]
#[allow(unused)]
pub async fn get_statistics_with_limit(
all_files: impl Stream<Item = Result<(PartitionedFile, Arc<Statistics>)>>,
file_schema: SchemaRef,
@@ -57,7 +59,7 @@ pub async fn get_statistics_with_limit(

if let Some(first_file) = all_files.next().await {
let (mut file, file_stats) = first_file?;
file.statistics = Some(file_stats.as_ref().clone());
file.statistics = Some(Arc::clone(&file_stats));
result_files.push(file);

// First file, we set them directly from the file statistics.
@@ -83,7 +85,7 @@
if conservative_num_rows <= limit.unwrap_or(usize::MAX) {
while let Some(current) = all_files.next().await {
let (mut file, file_stats) = current?;
file.statistics = Some(file_stats.as_ref().clone());
file.statistics = Some(Arc::clone(&file_stats));
result_files.push(file);
if !collect_stats {
continue;
@@ -145,7 +147,142 @@ pub async fn get_statistics_with_limit(
Ok((result_files, statistics))
}

fn add_row_stats(
/// Generic function to compute statistics across multiple items that have statistics
fn compute_summary_statistics<T, I>(
items: I,
file_schema: &SchemaRef,
stats_extractor: impl Fn(&T) -> Option<&Statistics>,
) -> Statistics
where
I: IntoIterator<Item = T>,
{
let size = file_schema.fields().len();
let mut col_stats_set = vec![ColumnStatistics::default(); size];
let mut num_rows = Precision::<usize>::Absent;
let mut total_byte_size = Precision::<usize>::Absent;

for (idx, item) in items.into_iter().enumerate() {
if let Some(item_stats) = stats_extractor(&item) {
if idx == 0 {
// First item, set values directly
num_rows = item_stats.num_rows;
total_byte_size = item_stats.total_byte_size;
for (index, column_stats) in
item_stats.column_statistics.iter().enumerate()
{
col_stats_set[index].null_count = column_stats.null_count;
col_stats_set[index].max_value = column_stats.max_value.clone();
col_stats_set[index].min_value = column_stats.min_value.clone();
col_stats_set[index].sum_value = column_stats.sum_value.clone();
}
continue;
}

// Accumulate statistics for subsequent items
num_rows = add_row_stats(item_stats.num_rows, num_rows);
total_byte_size = add_row_stats(item_stats.total_byte_size, total_byte_size);

for (item_col_stats, col_stats) in item_stats
.column_statistics
.iter()
.zip(col_stats_set.iter_mut())
{
col_stats.null_count =
add_row_stats(item_col_stats.null_count, col_stats.null_count);
set_max_if_greater(&item_col_stats.max_value, &mut col_stats.max_value);
set_min_if_lesser(&item_col_stats.min_value, &mut col_stats.min_value);
col_stats.sum_value = item_col_stats.sum_value.add(&col_stats.sum_value);
}
}
}

Statistics {
num_rows,
total_byte_size,
column_statistics: col_stats_set,
}
}

/// Computes the summary statistics for a group of files (`FileGroup`-level statistics).
///
/// This function combines statistics from all files in the file group to create
/// summary statistics. It handles the following aspects:
/// - Merges row counts and byte sizes across files
/// - Computes column-level statistics like min/max values
/// - Maintains appropriate precision information (exact, inexact, absent)
///
/// # Parameters
/// * `file_group` - The group of files to process
/// * `file_schema` - Schema of the files
/// * `collect_stats` - Whether to collect statistics (if false, returns original file group)
///
/// # Returns
/// A new file group with summary statistics attached
pub fn compute_file_group_statistics(
file_group: FileGroup,
file_schema: SchemaRef,
collect_stats: bool,
) -> Result<FileGroup> {
if !collect_stats {
return Ok(file_group);
}

let statistics =
compute_summary_statistics(file_group.iter(), &file_schema, |file| {
file.statistics.as_ref().map(|stats| stats.as_ref())
});

Ok(file_group.with_statistics(statistics))
}

/// Computes statistics for all files across multiple file groups.
///
/// This function:
/// 1. Computes statistics for each individual file group
/// 2. Computes summary statistics across all file groups
/// 3. Optionally marks statistics as inexact
///
/// # Parameters
/// * `file_groups` - Vector of file groups to process
/// * `file_schema` - Schema of the files
/// * `collect_stats` - Whether to collect statistics
/// * `inexact_stats` - Whether to mark the resulting statistics as inexact
///
/// # Returns
/// A tuple containing:
/// * The processed file groups with their individual statistics attached
/// * The summary statistics across all file groups, i.e. the summary statistics over all files
pub fn compute_all_files_statistics(
file_groups: Vec<FileGroup>,
file_schema: SchemaRef,
collect_stats: bool,
inexact_stats: bool,
) -> Result<(Vec<FileGroup>, Statistics)> {
let mut file_groups = file_groups;

// First compute statistics for each file group
for file_group in file_groups.iter_mut() {
*file_group = compute_file_group_statistics(
file_group.clone(),
Arc::clone(&file_schema),
collect_stats,
)?;
}

// Then compute summary statistics across all file groups
let mut statistics =
compute_summary_statistics(&file_groups, &file_schema, |file_group| {
file_group.statistics()
});

if inexact_stats {
statistics = statistics.to_inexact()
}

Ok((file_groups, statistics))
}
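A hedged usage sketch of the two-step flow (per-group statistics, then a table-level summary); the schema and file names are placeholders, while `FileGroup::new` and `PartitionedFile::new` are the existing constructors:

```rust
use std::sync::Arc;
use datafusion::arrow::datatypes::{DataType, Field, Schema};

// Placeholder schema; real statistics would come from file metadata
// (e.g. Parquet footers) attached to each PartitionedFile.
let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int64, true)]));
let file_groups = vec![
    FileGroup::new(vec![PartitionedFile::new("a.parquet", 1024)]),
    FileGroup::new(vec![PartitionedFile::new("b.parquet", 2048)]),
];

// Compute per-group statistics first, then the summary across all groups;
// passing `inexact_stats = true` would demote the summary when listing stopped early.
let (groups_with_stats, summary) =
    compute_all_files_statistics(file_groups, schema, true, false)?;
assert_eq!(groups_with_stats.len(), 2);
assert_eq!(summary.column_statistics.len(), 1);
```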

pub(crate) fn add_row_stats(
file_num_rows: Precision<usize>,
num_rows: Precision<usize>,
) -> Precision<usize> {
Expand Down