
Minor: Avoid need for PartitionedFile default #11829

Closed · wants to merge 9 commits

18 changes: 14 additions & 4 deletions datafusion/core/src/datasource/listing/helpers.rs
@@ -18,6 +18,7 @@
//! Helper functions for the table implementation

use std::collections::HashMap;
use std::mem;
use std::sync::Arc;

use super::PartitionedFile;
@@ -138,10 +139,19 @@ pub fn split_files(

// effectively this is div with rounding up instead of truncating
let chunk_size = (partitioned_files.len() + n - 1) / n;
partitioned_files
.chunks(chunk_size)
.map(|c| c.to_vec())
.collect()
let mut chunks = Vec::with_capacity(n);

let mut current_chunk = Vec::with_capacity(chunk_size);
for file in partitioned_files.drain(..) {
current_chunk.push(file);
if current_chunk.len() == chunk_size {
chunks.push(mem::take(&mut current_chunk));
}
}
if !current_chunk.is_empty() {
chunks.push(current_chunk)
}
chunks
}

struct Partition {
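For readers skimming the diff, here is a minimal standalone sketch of the drain-based chunking used in `split_files` above; the helper name and `main` below are illustrative, not DataFusion APIs. The point is that elements are moved out of the source vector rather than cloned, so no `Clone` or `Default` bound is needed on the element type.

```rust
use std::mem;

// Split `items` into at most `n` groups, moving (not cloning) each element.
fn split_into_n<T>(mut items: Vec<T>, n: usize) -> Vec<Vec<T>> {
    if items.is_empty() {
        return vec![];
    }
    // effectively div with rounding up instead of truncating
    let chunk_size = (items.len() + n - 1) / n;

    let mut chunks = Vec::with_capacity(n);
    let mut current_chunk = Vec::with_capacity(chunk_size);
    for item in items.drain(..) {
        current_chunk.push(item);
        if current_chunk.len() == chunk_size {
            chunks.push(mem::take(&mut current_chunk));
        }
    }
    if !current_chunk.is_empty() {
        chunks.push(current_chunk);
    }
    chunks
}

fn main() {
    let files = vec!["a".to_string(), "b".to_string(), "c".to_string()];
    let groups = split_into_n(files, 2);
    // chunk_size rounds up to 2, so three files become groups of 2 and 1
    assert_eq!(groups.len(), 2);
    assert_eq!(groups[0], ["a", "b"]);
    assert_eq!(groups[1], ["c"]);
}
```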
3 changes: 2 additions & 1 deletion datafusion/core/src/datasource/listing/mod.rs
@@ -78,10 +78,11 @@ pub struct PartitionedFile {
///
/// DataFusion relies on these statistics for planning (in particular to sort file groups),
/// so if they are incorrect, incorrect answers may result.
pub statistics: Option<Statistics>,
pub statistics: Option<Arc<Statistics>>,
/// An optional field for user defined per object metadata
pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
}

impl PartitionedFile {
/// Create a simple file without metadata or partition
pub fn new(path: impl Into<String>, size: u64) -> Self {
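A hedged sketch of what the `Option<Arc<Statistics>>` field change enables: per-file statistics can be shared by reference count instead of deep-cloned whenever a `PartitionedFile` is copied into file groups or plans. The struct definitions below are simplified stand-ins, not the real DataFusion types.

```rust
use std::sync::Arc;

#[derive(Clone, Debug, Default)]
struct Statistics {
    // stand-in for datafusion_common::Statistics
    num_rows: Option<usize>,
}

#[derive(Clone, Debug)]
struct PartitionedFile {
    path: String,
    statistics: Option<Arc<Statistics>>, // was Option<Statistics>
}

fn main() {
    let stats = Arc::new(Statistics { num_rows: Some(1_000) });

    // Many files can share one Statistics allocation; cloning a file only
    // bumps the reference count instead of copying the statistics.
    let a = PartitionedFile {
        path: "part-0.parquet".into(),
        statistics: Some(Arc::clone(&stats)),
    };
    let b = PartitionedFile {
        path: "part-1.parquet".into(),
        statistics: Some(Arc::clone(&stats)),
    };

    assert_eq!(Arc::strong_count(&stats), 3); // `stats` plus the two files
    println!("{:?} {:?}", a, b);
}
```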
19 changes: 11 additions & 8 deletions datafusion/core/src/datasource/listing/table.rs
@@ -978,10 +978,12 @@ impl ListingTable {
let statistics =
self.do_collect_statistics(ctx, &store, &part_file).await?;
part_file.statistics = Some(statistics.clone());
Ok((part_file, statistics)) as Result<(PartitionedFile, Statistics)>
Ok((part_file, statistics))
} else {
Ok((part_file, Statistics::new_unknown(&self.file_schema)))
as Result<(PartitionedFile, Statistics)>
Ok((
part_file,
Arc::new(Statistics::new_unknown(&self.file_schema)),
))
}
})
.boxed()
@@ -1011,12 +1013,12 @@ impl ListingTable {
ctx: &SessionState,
store: &Arc<dyn ObjectStore>,
part_file: &PartitionedFile,
) -> Result<Statistics> {
) -> Result<Arc<Statistics>> {
let statistics_cache = self.collected_statistics.clone();
return match statistics_cache
match statistics_cache
.get_with_extra(&part_file.object_meta.location, &part_file.object_meta)
{
Some(statistics) => Ok(statistics.as_ref().clone()),
Some(statistics) => Ok(statistics.clone()),
None => {
let statistics = self
.options
@@ -1028,14 +1030,15 @@
&part_file.object_meta,
)
.await?;
let statistics = Arc::new(statistics);
statistics_cache.put_with_extra(
&part_file.object_meta.location,
statistics.clone().into(),
statistics.clone(),
&part_file.object_meta,
);
Ok(statistics)
}
};
}
}
}

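The cache interaction above now stores and returns `Arc<Statistics>`, so a cache hit no longer needs `statistics.as_ref().clone()`. Below is a simplified sketch of that pattern with a plain `HashMap` standing in for DataFusion's statistics cache (the `get_with_extra`/`put_with_extra` extras are elided, and all names here are illustrative).

```rust
use std::collections::HashMap;
use std::sync::Arc;

#[derive(Debug, Default)]
struct Statistics; // stand-in for datafusion_common::Statistics

#[derive(Default)]
struct StatsCache {
    inner: HashMap<String, Arc<Statistics>>,
}

impl StatsCache {
    fn get_or_compute(
        &mut self,
        path: &str,
        compute: impl FnOnce() -> Statistics, // e.g. infer_stats over the object store
    ) -> Arc<Statistics> {
        match self.inner.get(path) {
            // Cache hit: clone the Arc, not the statistics themselves.
            Some(statistics) => Arc::clone(statistics),
            None => {
                let statistics = Arc::new(compute());
                self.inner.insert(path.to_string(), Arc::clone(&statistics));
                statistics
            }
        }
    }
}

fn main() {
    let mut cache = StatsCache::default();
    let first = cache.get_or_compute("part-0.parquet", || Statistics);
    let second = cache.get_or_compute("part-0.parquet", || unreachable!("cache hit"));
    assert!(Arc::ptr_eq(&first, &second));
}
```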
@@ -1193,7 +1193,7 @@ mod tests {
},
partition_values: vec![ScalarValue::from(file.date)],
range: None,
statistics: Some(Statistics {
statistics: Some(Arc::new(Statistics {
num_rows: Precision::Absent,
total_byte_size: Precision::Absent,
column_statistics: file
@@ -1213,7 +1213,7 @@
.unwrap_or_default()
})
.collect::<Vec<_>>(),
}),
})),
extensions: None,
}
}
150 changes: 71 additions & 79 deletions datafusion/core/src/datasource/statistics.rs
@@ -15,6 +15,9 @@
// specific language governing permissions and limitations
// under the License.

use std::mem;
use std::sync::Arc;

use super::listing::PartitionedFile;
use crate::arrow::datatypes::{Schema, SchemaRef};
use crate::error::Result;
@@ -26,16 +29,14 @@ use datafusion_common::stats::Precision;
use datafusion_common::ScalarValue;

use futures::{Stream, StreamExt};
use itertools::izip;
use itertools::multiunzip;

/// Get all files as well as the file level summary statistics (no statistic for partition columns).
/// If the optional `limit` is provided, includes only sufficient files. Needed to read up to
/// `limit` number of rows. `collect_stats` is passed down from the configuration parameter on
/// `ListingTable`. If it is false we only construct bare statistics and skip a potentially expensive
/// call to `multiunzip` for constructing file level summary statistics.
pub async fn get_statistics_with_limit(
all_files: impl Stream<Item = Result<(PartitionedFile, Statistics)>>,
all_files: impl Stream<Item = Result<(PartitionedFile, Arc<Statistics>)>>,
file_schema: SchemaRef,
limit: Option<usize>,
collect_stats: bool,
@@ -48,9 +49,7 @@ pub async fn get_statistics_with_limit(
// - zero for summations, and
// - neutral element for extreme points.
let size = file_schema.fields().len();
let mut null_counts: Vec<Precision<usize>> = vec![Precision::Absent; size];
let mut max_values: Vec<Precision<ScalarValue>> = vec![Precision::Absent; size];
let mut min_values: Vec<Precision<ScalarValue>> = vec![Precision::Absent; size];
let mut col_stats_set = vec![ColumnStatistics::default(); size];
let mut num_rows = Precision::<usize>::Absent;
let mut total_byte_size = Precision::<usize>::Absent;

@@ -62,12 +61,14 @@
result_files.push(file);

// First file, we set them directly from the file statistics.
num_rows = file_stats.num_rows;
total_byte_size = file_stats.total_byte_size;
for (index, file_column) in file_stats.column_statistics.into_iter().enumerate() {
null_counts[index] = file_column.null_count;
max_values[index] = file_column.max_value;
min_values[index] = file_column.min_value;
num_rows = file_stats.num_rows.clone();
total_byte_size = file_stats.total_byte_size.clone();
for (index, file_column) in
file_stats.column_statistics.clone().into_iter().enumerate()
{
col_stats_set[index].null_count = file_column.null_count;
col_stats_set[index].max_value = file_column.max_value;
col_stats_set[index].min_value = file_column.min_value;
}

// If the number of rows exceeds the limit, we can stop processing
@@ -90,38 +91,28 @@
// counts across all the files in question. If any file does not
// provide any information or provides an inexact value, we demote
// the statistic precision to inexact.
num_rows = add_row_stats(file_stats.num_rows, num_rows);
num_rows = add_row_stats(file_stats.num_rows.clone(), num_rows);

total_byte_size =
add_row_stats(file_stats.total_byte_size, total_byte_size);
add_row_stats(file_stats.total_byte_size.clone(), total_byte_size);

(null_counts, max_values, min_values) = multiunzip(
izip!(
file_stats.column_statistics.into_iter(),
null_counts.into_iter(),
max_values.into_iter(),
min_values.into_iter()
)
.map(
|(
ColumnStatistics {
null_count: file_nc,
max_value: file_max,
min_value: file_min,
distinct_count: _,
},
null_count,
max_value,
min_value,
)| {
(
add_row_stats(file_nc, null_count),
set_max_if_greater(file_max, max_value),
set_min_if_lesser(file_min, min_value),
)
},
),
);
for (file_col_stats, col_stats) in file_stats
.column_statistics
.iter()
.zip(col_stats_set.iter_mut())
{
let ColumnStatistics {
null_count: file_nc,
max_value: file_max,
min_value: file_min,
distinct_count: _,
} = file_col_stats;

col_stats.null_count =
add_row_stats(file_nc.clone(), col_stats.null_count.clone());
set_max_if_greater(file_max, &mut col_stats.max_value);
set_min_if_lesser(file_min, &mut col_stats.min_value)
}

// If the number of rows exceeds the limit, we can stop processing
// files. This only applies when we know the number of rows. It also
@@ -139,7 +130,7 @@
let mut statistics = Statistics {
num_rows,
total_byte_size,
column_statistics: get_col_stats_vec(null_counts, max_values, min_values),
column_statistics: col_stats_set,
};
if all_files.next().await.is_some() {
// If we still have files in the stream, it means that the limit kicked
@@ -182,21 +173,6 @@ fn add_row_stats(
}
}

pub(crate) fn get_col_stats_vec(
null_counts: Vec<Precision<usize>>,
max_values: Vec<Precision<ScalarValue>>,
min_values: Vec<Precision<ScalarValue>>,
) -> Vec<ColumnStatistics> {
izip!(null_counts, max_values, min_values)
.map(|(null_count, max_value, min_value)| ColumnStatistics {
null_count,
max_value,
min_value,
distinct_count: Precision::Absent,
})
.collect()
}

pub(crate) fn get_col_stats(
schema: &Schema,
null_counts: Vec<Precision<usize>>,
@@ -238,45 +214,61 @@ fn min_max_aggregate_data_type(input_type: &DataType) -> &DataType {
/// If the given value is numerically greater than the original maximum value,
/// return the new maximum value with appropriate exactness information.
fn set_max_if_greater(
max_nominee: Precision<ScalarValue>,
max_values: Precision<ScalarValue>,
) -> Precision<ScalarValue> {
match (&max_values, &max_nominee) {
(Precision::Exact(val1), Precision::Exact(val2)) if val1 < val2 => max_nominee,
max_nominee: &Precision<ScalarValue>,
max_value: &mut Precision<ScalarValue>,
) {
match (&max_value, max_nominee) {
(Precision::Exact(val1), Precision::Exact(val2)) if val1 < val2 => {
*max_value = max_nominee.clone();
}
(Precision::Exact(val1), Precision::Inexact(val2))
| (Precision::Inexact(val1), Precision::Inexact(val2))
| (Precision::Inexact(val1), Precision::Exact(val2))
if val1 < val2 =>
{
max_nominee.to_inexact()
*max_value = max_nominee.clone().to_inexact();
}
(Precision::Exact(_), Precision::Absent) => {
let exact_max = mem::take(max_value);
*max_value = exact_max.to_inexact();
}
(Precision::Absent, Precision::Exact(_)) => {
*max_value = max_nominee.clone().to_inexact();
}
(Precision::Absent, Precision::Inexact(_)) => {
*max_value = max_nominee.clone();
}
(Precision::Exact(_), Precision::Absent) => max_values.to_inexact(),
(Precision::Absent, Precision::Exact(_)) => max_nominee.to_inexact(),
(Precision::Absent, Precision::Inexact(_)) => max_nominee,
(Precision::Absent, Precision::Absent) => Precision::Absent,
_ => max_values,
_ => {}
}
}

/// If the given value is numerically lesser than the original minimum value,
/// return the new minimum value with appropriate exactness information.
fn set_min_if_lesser(
min_nominee: Precision<ScalarValue>,
min_values: Precision<ScalarValue>,
) -> Precision<ScalarValue> {
match (&min_values, &min_nominee) {
(Precision::Exact(val1), Precision::Exact(val2)) if val1 > val2 => min_nominee,
min_nominee: &Precision<ScalarValue>,
min_value: &mut Precision<ScalarValue>,
) {
match (&min_value, min_nominee) {
(Precision::Exact(val1), Precision::Exact(val2)) if val1 > val2 => {
*min_value = min_nominee.clone();
}
(Precision::Exact(val1), Precision::Inexact(val2))
| (Precision::Inexact(val1), Precision::Inexact(val2))
| (Precision::Inexact(val1), Precision::Exact(val2))
if val1 > val2 =>
{
min_nominee.to_inexact()
*min_value = min_nominee.clone().to_inexact();
}
(Precision::Exact(_), Precision::Absent) => {
let exact_min = mem::take(min_value);
*min_value = exact_min.to_inexact();
}
(Precision::Absent, Precision::Exact(_)) => {
*min_value = min_nominee.clone().to_inexact();
}
(Precision::Absent, Precision::Inexact(_)) => {
*min_value = min_nominee.clone();
}
(Precision::Exact(_), Precision::Absent) => min_values.to_inexact(),
(Precision::Absent, Precision::Exact(_)) => min_nominee.to_inexact(),
(Precision::Absent, Precision::Inexact(_)) => min_nominee,
(Precision::Absent, Precision::Absent) => Precision::Absent,
_ => min_values,
_ => {}
}
}
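To make the new in-place signatures easier to follow, here is a self-contained sketch of the merge semantics of `set_max_if_greater`, using a simplified `Precision` over `i64` rather than the real `Precision<ScalarValue>` from `datafusion_common`. The `main` shows the two common transitions: a larger exact maximum replaces the running value and stays exact, while a file with absent statistics demotes an exact running maximum to inexact.

```rust
use std::mem;

#[derive(Clone, Debug, Default, PartialEq)]
enum Precision {
    Exact(i64),
    Inexact(i64),
    #[default]
    Absent,
}

impl Precision {
    fn to_inexact(self) -> Self {
        match self {
            Precision::Exact(v) => Precision::Inexact(v),
            other => other,
        }
    }
}

/// Update `max_value` in place with `max_nominee`, demoting to `Inexact`
/// whenever either side is not exactly known.
fn set_max_if_greater(max_nominee: &Precision, max_value: &mut Precision) {
    match (&max_value, max_nominee) {
        (Precision::Exact(v1), Precision::Exact(v2)) if v1 < v2 => {
            *max_value = max_nominee.clone();
        }
        (Precision::Exact(v1), Precision::Inexact(v2))
        | (Precision::Inexact(v1), Precision::Inexact(v2))
        | (Precision::Inexact(v1), Precision::Exact(v2))
            if v1 < v2 =>
        {
            *max_value = max_nominee.clone().to_inexact();
        }
        (Precision::Exact(_), Precision::Absent) => {
            let exact_max = mem::take(max_value);
            *max_value = exact_max.to_inexact();
        }
        (Precision::Absent, Precision::Exact(_)) => {
            *max_value = max_nominee.clone().to_inexact();
        }
        (Precision::Absent, Precision::Inexact(_)) => {
            *max_value = max_nominee.clone();
        }
        _ => {}
    }
}

fn main() {
    // Two exact file maxima: the larger one wins and stays exact.
    let mut max = Precision::Exact(10);
    set_max_if_greater(&Precision::Exact(20), &mut max);
    assert_eq!(max, Precision::Exact(20));

    // A file with no statistics demotes the exact running maximum to inexact.
    set_max_if_greater(&Precision::Absent, &mut max);
    assert_eq!(max, Precision::Inexact(20));
}
```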
6 changes: 5 additions & 1 deletion datafusion/proto/src/physical_plan/from_proto.rs
@@ -559,7 +559,11 @@ impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
.map(|v| v.try_into())
.collect::<Result<Vec<_>, _>>()?,
range: val.range.as_ref().map(|v| v.try_into()).transpose()?,
statistics: val.statistics.as_ref().map(|v| v.try_into()).transpose()?,
statistics: val
.statistics
.as_ref()
.map(|v| v.try_into().map(Arc::new))
.transpose()?,
extensions: None,
})
}
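The decoding change above uses the `Option<Result<_>>` to `Result<Option<_>>` `transpose` idiom to wrap the decoded statistics in an `Arc`. A small sketch of that idiom with stand-in types (`ProtoStats` and `Statistics` here are illustrative, not the real protobuf-generated messages):

```rust
use std::sync::Arc;

struct ProtoStats {
    num_rows: i64,
}

#[derive(Debug)]
struct Statistics {
    num_rows: usize,
}

impl<'a> TryFrom<&'a ProtoStats> for Statistics {
    type Error = String;

    fn try_from(v: &'a ProtoStats) -> Result<Self, Self::Error> {
        let num_rows = usize::try_from(v.num_rows).map_err(|e| e.to_string())?;
        Ok(Statistics { num_rows })
    }
}

fn decode(statistics: Option<&ProtoStats>) -> Result<Option<Arc<Statistics>>, String> {
    // Option<Result<Arc<Statistics>, _>> -> Result<Option<Arc<Statistics>>, _>
    statistics.map(|v| v.try_into().map(Arc::new)).transpose()
}

fn main() {
    assert!(matches!(decode(None), Ok(None)));
    let decoded = decode(Some(&ProtoStats { num_rows: 3 })).unwrap().unwrap();
    assert_eq!(decoded.num_rows, 3);
    assert!(decode(Some(&ProtoStats { num_rows: -1 })).is_err());
}
```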
2 changes: 1 addition & 1 deletion datafusion/proto/src/physical_plan/to_proto.rs
@@ -527,7 +527,7 @@ impl TryFrom<&PartitionedFile> for protobuf::PartitionedFile {
.map(|v| v.try_into())
.collect::<Result<Vec<_>, _>>()?,
range: pf.range.as_ref().map(|r| r.try_into()).transpose()?,
statistics: pf.statistics.as_ref().map(|s| s.into()),
statistics: pf.statistics.as_ref().map(|s| s.as_ref().into()),
})
}
}