Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 21 additions & 9 deletions datafusion/src/datasource/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//! Parquet data source

use std::any::{Any, type_name};
use std::any::{type_name, Any};
use std::fs::File;
use std::sync::Arc;

Expand Down Expand Up @@ -221,7 +221,8 @@ impl ParquetTableDescriptor {
if let DataType::$DT = fields[i].data_type() {
let stats = stats
.as_any()
.downcast_ref::<ParquetPrimitiveStatistics<$PRIMITIVE_TYPE>>().ok_or_else(|| {
.downcast_ref::<ParquetPrimitiveStatistics<$PRIMITIVE_TYPE>>()
.ok_or_else(|| {
DataFusionError::Internal(format!(
"Failed to cast stats to {} stats",
type_name::<$PRIMITIVE_TYPE>()
Expand Down Expand Up @@ -254,9 +255,13 @@ impl ParquetTableDescriptor {
match stats.physical_type() {
PhysicalType::Boolean => {
if let DataType::Boolean = fields[i].data_type() {
let stats =
stats.as_any().downcast_ref::<ParquetBooleanStatistics>().ok_or_else(|| {
DataFusionError::Internal("Failed to cast stats to boolean stats".to_owned())
let stats = stats
.as_any()
.downcast_ref::<ParquetBooleanStatistics>()
.ok_or_else(|| {
DataFusionError::Internal(
"Failed to cast stats to boolean stats".to_owned(),
)
})?;
if let Some(max_value) = &mut max_values[i] {
if let Some(v) = stats.max_value {
Expand Down Expand Up @@ -296,9 +301,13 @@ impl ParquetTableDescriptor {
}
PhysicalType::ByteArray => {
if let DataType::Utf8 = fields[i].data_type() {
let stats =
stats.as_any().downcast_ref::<ParquetBinaryStatistics>().ok_or_else(|| {
DataFusionError::Internal("Failed to cast stats to binary stats".to_owned())
let stats = stats
.as_any()
.downcast_ref::<ParquetBinaryStatistics>()
.ok_or_else(|| {
DataFusionError::Internal(
"Failed to cast stats to binary stats".to_owned(),
)
})?;
if let Some(max_value) = &mut max_values[i] {
if let Some(v) = stats.max_value {
Expand Down Expand Up @@ -395,7 +404,10 @@ impl TableDescriptorBuilder for ParquetTableDescriptor {
};

Ok(FileAndSchema {
file: PartitionedFile { path: path.to_owned(), statistics },
file: PartitionedFile {
path: path.to_owned(),
statistics,
},
schema,
})
}
Expand Down
4 changes: 2 additions & 2 deletions datafusion/src/execution/dataframe_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

use std::sync::{Arc, Mutex};

use arrow::io::print;
use arrow::record_batch::RecordBatch;
use crate::error::Result;
use crate::execution::context::{ExecutionContext, ExecutionContextState};
use crate::logical_plan::{
Expand All @@ -31,6 +29,8 @@ use crate::{
dataframe::*,
physical_plan::{collect, collect_partitioned},
};
use arrow::io::print;
use arrow::record_batch::RecordBatch;

use crate::physical_plan::{
execute_stream, execute_stream_partitioned, ExecutionPlan, SendableRecordBatchStream,
Expand Down
7 changes: 2 additions & 5 deletions datafusion/src/physical_plan/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use futures::StreamExt;

use super::{stream::RecordBatchReceiverStream, Distribution, SendableRecordBatchStream};
use async_trait::async_trait;
use arrow::array::MutableUtf8Array;
use async_trait::async_trait;

/// `EXPLAIN ANALYZE` execution plan operator. This operator runs its input,
/// discards the results, and then prints out an annotated plan with metrics
Expand Down Expand Up @@ -182,10 +182,7 @@ impl ExecutionPlan for AnalyzeExec {

let maybe_batch = RecordBatch::try_new(
captured_schema,
vec![
type_builder.into_arc(),
plan_builder.into_arc(),
],
vec![type_builder.into_arc(), plan_builder.into_arc()],
);
// again ignore error
tx.send(maybe_batch).await.ok();
Expand Down
4 changes: 1 addition & 3 deletions datafusion/src/physical_plan/datetime_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ use crate::{
use arrow::{
array::*,
compute::cast,
datatypes::{
DataType, TimeUnit,
},
datatypes::{DataType, TimeUnit},
temporal_conversions::utf8_to_timestamp_ns_scalar,
types::NativeType,
};
Expand Down
6 changes: 2 additions & 4 deletions datafusion/src/physical_plan/expressions/in_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,17 +175,15 @@ fn in_list_primitive<T: NativeType>(
array: &PrimitiveArray<T>,
values: &[T],
) -> Result<BooleanArray> {
compare_primitive_op_scalar!(array, values, |x, v| v
.contains(&x))
compare_primitive_op_scalar!(array, values, |x, v| v.contains(&x))
}

// whether each value on the left (can be null) is contained in the non-null list
fn not_in_list_primitive<T: NativeType>(
array: &PrimitiveArray<T>,
values: &[T],
) -> Result<BooleanArray> {
compare_primitive_op_scalar!(array, values, |x, v| !v
.contains(&x))
compare_primitive_op_scalar!(array, values, |x, v| !v.contains(&x))
}

// whether each value on the left (can be null) is contained in the non-null list
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/physical_plan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pub struct ParquetExec {
pub schema: Arc<Schema>,
/// Projection for which columns to load
projection: Vec<usize>,
/// Batch size
/// Batch size
batch_size: usize,
/// Statistics for the data set (sum of statistics for all partitions)
statistics: Statistics,
Expand Down
Loading