Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,15 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
let projection = scan.projection.iter().map(|i| *i as usize).collect();
let statistics = convert_required!(scan.statistics)?;

Ok(Arc::new(ParquetExec::new(
Ok(Arc::new(ParquetExec::new_for_deserialization(
Arc::new(LocalFileSystem {}),
scan.file_groups
.iter()
.map(|p| p.into())
.collect::<Vec<Vec<PartitionedFile>>>(),
statistics,
schema,
Some(projection),
projection,
// TODO predicate should be de-serialized
None,
scan.batch_size as usize,
Expand Down
46 changes: 46 additions & 0 deletions datafusion/src/physical_plan/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,52 @@ impl ParquetExec {
}
}

/// Build a Parquet reader execution plan from parts that have already been
/// resolved, as is the case when a plan is rebuilt from its serialized form.
///
/// `projection` is taken as a plain list of column indices and stored as-is.
/// When `predicate` is `Some`, an attempt is made to turn it into a
/// [`PruningPredicate`] against `schema`. If that fails, the failure is logged
/// at debug level, the `num_predicate_creation_errors` counter is bumped, and
/// the plan is created without a pruning predicate rather than erroring out.
// The constructor mirrors the struct's fields one-to-one, hence the long
// argument list.
#[allow(clippy::too_many_arguments)]
pub fn new_for_deserialization(
    object_store: Arc<dyn ObjectStore>,
    file_groups: Vec<Vec<PartitionedFile>>,
    statistics: Statistics,
    schema: SchemaRef,
    projection: Vec<usize>,
    predicate: Option<Expr>,
    batch_size: usize,
    limit: Option<usize>,
) -> Self {
    debug!("Creating ParquetExec, files: {:?}, projection {:?}, predicate: {:?}, limit: {:?}",
        file_groups, projection, predicate, limit);

    // The counter is registered up front so that it exists on the metrics
    // set even when no predicate was supplied.
    let metrics = ExecutionPlanMetricsSet::new();
    let creation_errors =
        MetricBuilder::new(&metrics).global_counter("num_predicate_creation_errors");

    // A predicate that cannot be converted is not fatal: record it and
    // continue without pruning.
    let predicate_builder = match predicate {
        None => None,
        Some(expr) => PruningPredicate::try_new(&expr, schema.clone())
            .map_err(|err| {
                debug!(
                    "Could not create pruning predicate for {:?}: {}",
                    expr, err
                );
                creation_errors.add(1);
            })
            .ok(),
    };

    Self {
        object_store,
        file_groups,
        schema,
        projection,
        metrics,
        predicate_builder,
        batch_size,
        statistics,
        limit,
    }
}

fn project(
projection: &[usize],
schema: SchemaRef,
Expand Down