Merged
2 changes: 1 addition & 1 deletion ballista/rust/core/Cargo.toml
@@ -32,7 +32,7 @@ simd = ["datafusion/simd"]
[dependencies]
ahash = { version = "0.7", default-features = false }

arrow-flight = { version = "10.0" }
arrow-flight = { version = "11" }
async-trait = "0.1.41"
chrono = { version = "0.4", default-features = false }
clap = { version = "3", features = ["derive", "cargo"] }
4 changes: 2 additions & 2 deletions ballista/rust/executor/Cargo.toml
@@ -33,8 +33,8 @@ snmalloc = ["snmalloc-rs"]

[dependencies]
anyhow = "1"
arrow = { version = "10.0" }
arrow-flight = { version = "10.0" }
arrow = { version = "11" }
arrow-flight = { version = "11" }
async-trait = "0.1.41"
ballista-core = { path = "../core", version = "0.6.0" }
chrono = { version = "0.4", default-features = false }
3 changes: 2 additions & 1 deletion ballista/rust/executor/src/flight_service.rs
@@ -96,7 +96,8 @@ impl FlightService for BallistaFlightService {
))
})
.map_err(|e| from_ballista_err(&e))?;
let reader = FileReader::try_new(file).map_err(|e| from_arrow_err(&e))?;
let reader =
FileReader::try_new(file, None).map_err(|e| from_arrow_err(&e))?;

let (tx, rx): (FlightDataSender, FlightDataReceiver) = channel(2);

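Note on the change above: arrow 11 adds a second argument to the IPC FileReader::try_new, an optional column projection; passing None keeps the previous behaviour of reading every column. A minimal sketch, assuming the projection is given as a list of column indices (Option<Vec<usize>>):

use std::fs::File;
use arrow::ipc::reader::FileReader;

// Read only the first two columns of an Arrow IPC file using the
// two-argument try_new introduced in arrow 11.
fn read_first_two_columns(path: &str) -> arrow::error::Result<()> {
    let file = File::open(path)?;
    let reader = FileReader::try_new(file, Some(vec![0, 1]))?;
    for batch in reader {
        let batch = batch?;
        println!("{} rows, {} columns", batch.num_rows(), batch.num_columns());
    }
    Ok(())
}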
2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml
@@ -28,7 +28,7 @@ repository = "https://github.com/apache/arrow-datafusion"
rust-version = "1.59"

[dependencies]
arrow = { version = "10.0" }
arrow = { version = "11" }
ballista = { path = "../ballista/rust/client", version = "0.6.0", optional = true }
clap = { version = "3", features = ["derive", "cargo"] }
datafusion = { path = "../datafusion", version = "7.0.0" }
4 changes: 2 additions & 2 deletions datafusion-common/Cargo.toml
@@ -38,10 +38,10 @@ jit = ["cranelift-module"]
pyarrow = ["pyo3"]

[dependencies]
arrow = { version = "10.0", features = ["prettyprint"] }
arrow = { version = "11", features = ["prettyprint"] }
avro-rs = { version = "0.13", features = ["snappy"], optional = true }
cranelift-module = { version = "0.82.0", optional = true }
ordered-float = "2.10"
parquet = { version = "10.0", features = ["arrow"], optional = true }
parquet = { version = "11", features = ["arrow"], optional = true }
pyo3 = { version = "0.16", optional = true }
sqlparser = "0.15"
2 changes: 1 addition & 1 deletion datafusion-examples/Cargo.toml
@@ -34,7 +34,7 @@ path = "examples/avro_sql.rs"
required-features = ["datafusion/avro"]

[dev-dependencies]
arrow-flight = { version = "10.0" }
arrow-flight = { version = "11" }
async-trait = "0.1.41"
datafusion = { path = "../datafusion" }
futures = "0.3"
2 changes: 1 addition & 1 deletion datafusion-expr/Cargo.toml
@@ -36,6 +36,6 @@ path = "src/lib.rs"

[dependencies]
ahash = { version = "0.7", default-features = false }
arrow = { version = "10.0", features = ["prettyprint"] }
arrow = { version = "11", features = ["prettyprint"] }
datafusion-common = { path = "../datafusion-common", version = "7.0.0" }
sqlparser = "0.15"
2 changes: 1 addition & 1 deletion datafusion-physical-expr/Cargo.toml
@@ -40,7 +40,7 @@ unicode_expressions = ["unicode-segmentation"]

[dependencies]
ahash = { version = "0.7", default-features = false }
arrow = { version = "10.0", features = ["prettyprint"] }
arrow = { version = "11", features = ["prettyprint"] }
blake2 = { version = "^0.10.2", optional = true }
blake3 = { version = "1.0", optional = true }
chrono = { version = "0.4", default-features = false }
4 changes: 2 additions & 2 deletions datafusion/Cargo.toml
@@ -55,7 +55,7 @@ unicode_expressions = ["datafusion-physical-expr/regex_expressions"]

[dependencies]
ahash = { version = "0.7", default-features = false }
arrow = { version = "10.0", features = ["prettyprint"] }
arrow = { version = "11", features = ["prettyprint"] }
async-trait = "0.1.41"
avro-rs = { version = "0.13", features = ["snappy"], optional = true }
chrono = { version = "0.4", default-features = false }
@@ -71,7 +71,7 @@ num-traits = { version = "0.2", optional = true }
num_cpus = "1.13.0"
ordered-float = "2.10"
parking_lot = "0.12"
parquet = { version = "10.0", features = ["arrow"] }
parquet = { version = "11", features = ["arrow"] }
paste = "^1.0"
pin-project-lite= "^0.2.7"
pyo3 = { version = "0.16", optional = true }
2 changes: 1 addition & 1 deletion datafusion/fuzz-utils/Cargo.toml
@@ -23,6 +23,6 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
arrow = { version = "10.0", features = ["prettyprint"] }
arrow = { version = "11", features = ["prettyprint"] }
env_logger = "0.9.0"
rand = "0.8"
174 changes: 75 additions & 99 deletions datafusion/src/physical_plan/file_format/parquet.rs
@@ -53,9 +53,8 @@ use arrow::{
use log::{debug, warn};
use parquet::arrow::ArrowWriter;
use parquet::file::{
metadata::RowGroupMetaData,
reader::{FileReader, SerializedFileReader},
statistics::Statistics as ParquetStatistics,
metadata::RowGroupMetaData, reader::SerializedFileReader,
serialized_reader::ReadOptionsBuilder, statistics::Statistics as ParquetStatistics,
};

use fmt::Debug;
@@ -309,7 +308,7 @@ fn send_result(
/// Wraps parquet statistics in a way
/// that implements [`PruningStatistics`]
struct RowGroupPruningStatistics<'a> {
row_group_metadata: &'a [RowGroupMetaData],
row_group_metadata: &'a RowGroupMetaData,
parquet_schema: &'a Schema,
}

@@ -342,33 +341,26 @@ macro_rules! get_statistic {
// Extract the min or max value calling `func` or `bytes_func` on the ParquetStatistics as appropriate
macro_rules! get_min_max_values {
($self:expr, $column:expr, $func:ident, $bytes_func:ident) => {{
let (column_index, field) = if let Some((v, f)) = $self.parquet_schema.column_with_name(&$column.name) {
(v, f)
} else {
// Named column was not present
return None
};
let (column_index, field) =
if let Some((v, f)) = $self.parquet_schema.column_with_name(&$column.name) {
(v, f)
} else {
// Named column was not present
return None;
};

let data_type = field.data_type();
// The result may be None, because DataFusion doesn't have support for ScalarValues of the column type
let null_scalar: ScalarValue = data_type.try_into().ok()?;

let scalar_values : Vec<ScalarValue> = $self.row_group_metadata
.iter()
.flat_map(|meta| {
meta.column(column_index).statistics()
})
.map(|stats| {
get_statistic!(stats, $func, $bytes_func)
})
.map(|maybe_scalar| {
// column either did't have statistics at all or didn't have min/max values
maybe_scalar.unwrap_or_else(|| null_scalar.clone())
})
.collect();

// ignore errors converting to arrays (e.g. different types)
ScalarValue::iter_to_array(scalar_values).ok()
$self.row_group_metadata
.column(column_index)
.statistics()
.map(|stats| get_statistic!(stats, $func, $bytes_func))
.flatten()
// column either didn't have statistics at all or didn't have min/max values
.or_else(|| Some(null_scalar.clone()))
.map(|s| s.to_array())
}}
}

@@ -383,17 +375,14 @@ macro_rules! get_null_count_values {
return None;
};

let scalar_values: Vec<ScalarValue> = $self
.row_group_metadata
.iter()
.flat_map(|meta| meta.column(column_index).statistics())
.map(|stats| {
ScalarValue::UInt64(Some(stats.null_count().try_into().unwrap()))
})
.collect();

// ignore errors converting to arrays (e.g. different types)
ScalarValue::iter_to_array(scalar_values).ok()
let value = ScalarValue::UInt64(
$self
.row_group_metadata
.column(column_index)
.statistics()
.map(|s| s.null_count()),
);
Some(value.to_array())
}};
}

@@ -407,7 +396,7 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
}

fn num_containers(&self) -> usize {
self.row_group_metadata.len()
1
}

fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
@@ -418,31 +407,33 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
fn build_row_group_predicate(
pruning_predicate: &PruningPredicate,
metrics: ParquetFileMetrics,
row_group_metadata: &[RowGroupMetaData],
) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> {
let parquet_schema = pruning_predicate.schema().as_ref();

let pruning_stats = RowGroupPruningStatistics {
row_group_metadata,
parquet_schema,
};
let predicate_values = pruning_predicate.prune(&pruning_stats);

match predicate_values {
Ok(values) => {
// NB: false means don't scan row group
let num_pruned = values.iter().filter(|&v| !*v).count();
metrics.row_groups_pruned.add(num_pruned);
Box::new(move |_, i| values[i])
}
// stats filter array could not be built
// return a closure which will not filter out any row groups
Err(e) => {
debug!("Error evaluating row group predicate values {}", e);
metrics.predicate_evaluation_errors.add(1);
Box::new(|_r, _i| true)
}
}
) -> Box<dyn FnMut(&RowGroupMetaData, usize) -> bool> {
let pruning_predicate = pruning_predicate.clone();
Box::new(
move |row_group_metadata: &RowGroupMetaData, _i: usize| -> bool {
let parquet_schema = pruning_predicate.schema().as_ref();
let pruning_stats = RowGroupPruningStatistics {
row_group_metadata,
parquet_schema,
};
let predicate_values = pruning_predicate.prune(&pruning_stats);
Review thread on this line:

Contributor (PR author): there is probably some overhead here related to calling prune once per row group vs calling it once per file, but I think it will be ok and we can further optimize it in the future if it shows up in traces.

Contributor: Yeah... I just stumbled across this whilst updating #1617 - in IOx we found the prune method had non-trivial overheads when run in a non-columnar fashion as this is doing. Admittedly that was likely with more containers than there are likely to be row groups in a file.

I do wonder if we need to take a step back from extending the parquet arrow-rs interface, and take a more holistic look at what the desired end-state should be. I worry a bit that we're painting ourselves into a corner; I'll see if I can get my thoughts into some tickets.
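To make the trade-off concrete, here is a minimal, self-contained sketch of the two calling patterns being discussed. The types and the prune function below are hypothetical stand-ins for PruningPredicate and RowGroupMetaData, not the real DataFusion or parquet APIs.

// Hypothetical stand-in for row-group statistics; the real PruningPredicate
// evaluates an arbitrary expression against min/max arrays built from its containers.
struct RowGroupMeta {
    max: i32,
}

// Stand-in for PruningPredicate::prune with the predicate `col > 15`:
// one call evaluates all containers in a single, columnar pass.
fn prune(row_groups: &[RowGroupMeta]) -> Vec<bool> {
    row_groups.iter().map(|rg| rg.max > 15).collect()
}

fn main() {
    let row_groups = vec![RowGroupMeta { max: 10 }, RowGroupMeta { max: 20 }];

    // Previous pattern: prune once per file, over all row groups at once.
    let keep_per_file = prune(&row_groups);

    // Pattern in this PR: prune once per row group, each call seeing a single
    // container. Same answers, but the per-call setup cost is paid N times.
    let keep_per_group: Vec<bool> = row_groups
        .iter()
        .map(|rg| prune(std::slice::from_ref(rg))[0])
        .collect();

    assert_eq!(keep_per_file, keep_per_group);
    println!("{:?}", keep_per_group); // [false, true]
}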

Member (@yjshen, Mar 22, 2022): How about we change ReadOptions like:

pub struct ReadOptions {
    predicates: Vec<Box<dyn Fn(&[RowGroupMetaData]) -> Vec<bool>>>,
}

Contributor (@tustvold, Mar 22, 2022): That would definitely be one option, but I'm not sure why it needs to be lazy. SerializedFileReader already exposes the ParquetMetadata which in turn exposes the [RowGroupMetaData]. Why wouldn't the caller just specify the row groups to scan, much like it specifies the column indexes for a projection? Would this not be both simpler and more flexible?
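A rough sketch of what that eager, caller-driven selection could look like; with_row_groups is hypothetical and does not exist in the parquet crate at the time of this PR, it is shown only to illustrate the API shape being discussed.

use parquet::file::metadata::RowGroupMetaData;

// The caller inspects the already-available metadata up front (nothing lazy)
// and picks the row-group indexes to scan, analogous to a column projection.
fn select_row_groups(row_groups: &[RowGroupMetaData]) -> Vec<usize> {
    row_groups
        .iter()
        .enumerate()
        .filter(|(_, rg)| rg.num_rows() > 0) // any caller-side pruning logic
        .map(|(i, _)| i)
        .collect()
}

// Hypothetical usage -- `with_row_groups` is NOT a real ReadOptionsBuilder method:
// let selection = select_row_groups(file_reader.metadata().row_groups());
// let options = ReadOptionsBuilder::new().with_row_groups(selection).build();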

match predicate_values {
Ok(values) => {
// NB: false means don't scan row group
let num_pruned = values.iter().filter(|&v| !*v).count();
metrics.row_groups_pruned.add(num_pruned);
values[0]
}
// stats filter array could not be built
// return a closure which will not filter out any row groups
Err(e) => {
debug!("Error evaluating row group predicate values {}", e);
metrics.predicate_evaluation_errors.add(1);
true
}
}
},
)
}

#[allow(clippy::too_many_arguments)]
@@ -470,17 +461,20 @@ fn read_partition(
);
let object_reader =
object_store.file_reader(partitioned_file.file_meta.sized_file.clone())?;
let mut file_reader =
SerializedFileReader::new(ChunkObjectReader(object_reader))?;

let mut opt = ReadOptionsBuilder::new();
if let Some(pruning_predicate) = pruning_predicate {
let row_group_predicate = build_row_group_predicate(
opt = opt.with_predicate(build_row_group_predicate(
pruning_predicate,
file_metrics,
file_reader.metadata().row_groups(),
);
file_reader.filter_row_groups(&row_group_predicate);
));
}

let file_reader = SerializedFileReader::new_with_options(
ChunkObjectReader(object_reader),
opt.build(),
)?;

let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
let adapted_projections =
schema_adapter.map_projections(&arrow_reader.get_schema()?, projection)?;
@@ -1054,11 +1048,8 @@ mod tests {
vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)],
);
let row_group_metadata = vec![rgm1, rgm2];
let row_group_predicate = build_row_group_predicate(
&pruning_predicate,
parquet_file_metrics(),
&row_group_metadata,
);
let mut row_group_predicate =
build_row_group_predicate(&pruning_predicate, parquet_file_metrics());
let row_group_filter = row_group_metadata
.iter()
.enumerate()
@@ -1087,11 +1078,8 @@ mod tests {
vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)],
);
let row_group_metadata = vec![rgm1, rgm2];
let row_group_predicate = build_row_group_predicate(
&pruning_predicate,
parquet_file_metrics(),
&row_group_metadata,
);
let mut row_group_predicate =
build_row_group_predicate(&pruning_predicate, parquet_file_metrics());
let row_group_filter = row_group_metadata
.iter()
.enumerate()
@@ -1135,11 +1123,8 @@ mod tests {
],
);
let row_group_metadata = vec![rgm1, rgm2];
let row_group_predicate = build_row_group_predicate(
&pruning_predicate,
parquet_file_metrics(),
&row_group_metadata,
);
let mut row_group_predicate =
build_row_group_predicate(&pruning_predicate, parquet_file_metrics());
let row_group_filter = row_group_metadata
.iter()
.enumerate()
@@ -1153,11 +1138,8 @@
// this bypasses the entire predicate expression and no row groups are filtered out
let expr = col("c1").gt(lit(15)).or(col("c2").modulus(lit(2)));
let pruning_predicate = PruningPredicate::try_new(expr, schema)?;
let row_group_predicate = build_row_group_predicate(
&pruning_predicate,
parquet_file_metrics(),
&row_group_metadata,
);
let mut row_group_predicate =
build_row_group_predicate(&pruning_predicate, parquet_file_metrics());
let row_group_filter = row_group_metadata
.iter()
.enumerate()
@@ -1202,11 +1184,8 @@ mod tests {
let pruning_predicate = PruningPredicate::try_new(expr, schema)?;
let row_group_metadata = gen_row_group_meta_data_for_pruning_predicate();

let row_group_predicate = build_row_group_predicate(
&pruning_predicate,
parquet_file_metrics(),
&row_group_metadata,
);
let mut row_group_predicate =
build_row_group_predicate(&pruning_predicate, parquet_file_metrics());
let row_group_filter = row_group_metadata
.iter()
.enumerate()
@@ -1234,11 +1213,8 @@ mod tests {
let pruning_predicate = PruningPredicate::try_new(expr, schema)?;
let row_group_metadata = gen_row_group_meta_data_for_pruning_predicate();

let row_group_predicate = build_row_group_predicate(
&pruning_predicate,
parquet_file_metrics(),
&row_group_metadata,
);
let mut row_group_predicate =
build_row_group_predicate(&pruning_predicate, parquet_file_metrics());
let row_group_filter = row_group_metadata
.iter()
.enumerate()
2 changes: 1 addition & 1 deletion datafusion/src/physical_plan/sorts/sort.rs
@@ -358,7 +358,7 @@ fn write_sorted(

fn read_spill(sender: Sender<ArrowResult<RecordBatch>>, path: &Path) -> Result<()> {
let file = BufReader::new(File::open(&path)?);
let reader = FileReader::try_new(file)?;
let reader = FileReader::try_new(file, None)?;
for batch in reader {
sender
.blocking_send(batch)