Skip to content

Commit

Permalink
DRY refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Oct 16, 2024
1 parent 97b686a commit a20ac87
Showing 1 changed file with 35 additions and 47 deletions.
82 changes: 35 additions & 47 deletions datafusion/core/src/datasource/file_format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::{ExecutionPlan, Statistics};

use arrow_schema::{DataType, Field, Schema};
use arrow_schema::{DataType, Field, FieldRef, Schema};
use datafusion_common::file_options::file_type::FileType;
use datafusion_common::{internal_err, not_impl_err, GetExt};
use datafusion_expr::Expr;
Expand Down Expand Up @@ -235,6 +235,12 @@ pub fn file_type_to_format(
}
}

/// Create a new field with the specified data type, copying the other
/// properties from the input field
fn field_with_new_type(field: &FieldRef, new_type: DataType) -> FieldRef {
Arc::new(field.as_ref().clone().with_data_type(new_type))
}

/// Transform a schema to use view types for Utf8 and Binary
///
/// See [parquet::ParquetFormat::force_view_types] for details
Expand All @@ -243,14 +249,12 @@ pub fn transform_schema_to_view(schema: &Schema) -> Schema {
.fields
.iter()
.map(|field| match field.data_type() {
DataType::Utf8 | DataType::LargeUtf8 => Arc::new(
Field::new(field.name(), DataType::Utf8View, field.is_nullable())
.with_metadata(field.metadata().to_owned()),
),
DataType::Binary | DataType::LargeBinary => Arc::new(
Field::new(field.name(), DataType::BinaryView, field.is_nullable())
.with_metadata(field.metadata().to_owned()),
),
DataType::Utf8 | DataType::LargeUtf8 => {
field_with_new_type(field, DataType::Utf8View)
}
DataType::Binary | DataType::LargeBinary => {
field_with_new_type(field, DataType::BinaryView)
}
_ => field.clone(),
})
.collect();
Expand All @@ -276,6 +280,7 @@ pub(crate) fn coerce_file_schema_to_view_type(
(f.name(), dt)
})
.collect();

if !transform {
return None;
}
Expand All @@ -285,14 +290,13 @@ pub(crate) fn coerce_file_schema_to_view_type(
.iter()
.map(
|field| match (table_fields.get(field.name()), field.data_type()) {
(Some(DataType::Utf8View), DataType::Utf8)
| (Some(DataType::Utf8View), DataType::LargeUtf8) => Arc::new(
Field::new(field.name(), DataType::Utf8View, field.is_nullable()),
),
(Some(DataType::BinaryView), DataType::Binary)
| (Some(DataType::BinaryView), DataType::LargeBinary) => Arc::new(
Field::new(field.name(), DataType::BinaryView, field.is_nullable()),
),
(Some(DataType::Utf8View), DataType::Utf8 | DataType::LargeUtf8) => {
field_with_new_type(field, DataType::Utf8View)
}
(
Some(DataType::BinaryView),
DataType::Binary | DataType::LargeBinary,
) => field_with_new_type(field, DataType::BinaryView),
_ => field.clone(),
},
)
Expand All @@ -310,18 +314,9 @@ pub fn transform_binary_to_string(schema: &Schema) -> Schema {
.fields
.iter()
.map(|field| match field.data_type() {
DataType::Binary => Arc::new(
Field::new(field.name(), DataType::Utf8, field.is_nullable())
.with_metadata(field.metadata().to_owned()),
),
DataType::LargeBinary => Arc::new(
Field::new(field.name(), DataType::LargeUtf8, field.is_nullable())
.with_metadata(field.metadata().to_owned()),
),
DataType::BinaryView => Arc::new(
Field::new(field.name(), DataType::BinaryView, field.is_nullable())
.with_metadata(field.metadata().to_owned()),
),
DataType::Binary => field_with_new_type(field, DataType::Utf8),
DataType::LargeBinary => field_with_new_type(field, DataType::LargeUtf8),
DataType::BinaryView => field_with_new_type(field, DataType::Utf8View),
_ => field.clone(),
})
.collect();
Expand All @@ -347,35 +342,28 @@ pub(crate) fn coerce_file_schema_to_string_type(
.map(
|field| match (table_fields.get(field.name()), field.data_type()) {
// table schema uses string type, coerce the file schema to use string type
(Some(DataType::Utf8),
DataType::Binary | DataType::LargeBinary | DataType::BinaryView) => {
(
Some(DataType::Utf8),
DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
) => {
transform = true;
Arc::new(Field::new(
field.name(),
DataType::Utf8,
field.is_nullable(),
))
field_with_new_type(field, DataType::Utf8)
}
// table schema uses large string type, coerce the file schema to use large string type
(Some(DataType::LargeUtf8), DataType::Binary | DataType::LargeBinary | DataType::BinaryView) => {
(
Some(DataType::LargeUtf8),
DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
) => {
transform = true;
Arc::new(Field::new(
field.name(),
DataType::LargeUtf8,
field.is_nullable(),
))
field_with_new_type(field, DataType::LargeUtf8)
}
// table schema uses string view type, coerce the file schema to use view type
(
Some(DataType::Utf8View),
DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
) => {
transform = true;
Arc::new(Field::new(
field.name(),
DataType::Utf8View,
field.is_nullable(),
))
field_with_new_type(field, DataType::Utf8View)
}
_ => field.clone(),
},
Expand Down

0 comments on commit a20ac87

Please sign in to comment.