Skip to content

Commit bf7859e

Browse files
kosiew and alamb authored
Schema adapter helper (#16108)
* Add field casting utility functions and refactor schema mapping logic * Fix tests for field casting and schema mapping functionality * refactor: simplify SchemaMapping instantiation in DefaultSchemaAdapter * refactor: improve documentation for create_field_mapping and SchemaMapping::new functions * test: rename schema mapping test and add happy path scenario * trigger ci --------- Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent abbf73d commit bf7859e

File tree

1 file changed

+265
-27
lines changed

1 file changed

+265
-27
lines changed

datafusion/datasource/src/schema_adapter.rs

Lines changed: 265 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
2424
use arrow::array::{new_null_array, RecordBatch, RecordBatchOptions};
2525
use arrow::compute::{can_cast_types, cast};
26-
use arrow::datatypes::{Schema, SchemaRef};
26+
use arrow::datatypes::{Field, Schema, SchemaRef};
2727
use datafusion_common::{plan_err, ColumnStatistics};
2828
use std::fmt::Debug;
2929
use std::sync::Arc;
@@ -225,6 +225,25 @@ pub(crate) struct DefaultSchemaAdapter {
225225
projected_table_schema: SchemaRef,
226226
}
227227

228+
/// Checks if a file field can be cast to a table field
229+
///
230+
/// Returns Ok(true) if casting is possible, or an error explaining why casting is not possible
231+
pub(crate) fn can_cast_field(
232+
file_field: &Field,
233+
table_field: &Field,
234+
) -> datafusion_common::Result<bool> {
235+
if can_cast_types(file_field.data_type(), table_field.data_type()) {
236+
Ok(true)
237+
} else {
238+
plan_err!(
239+
"Cannot cast file schema field {} of type {:?} to table schema field of type {:?}",
240+
file_field.name(),
241+
file_field.data_type(),
242+
table_field.data_type()
243+
)
244+
}
245+
}
246+
228247
impl SchemaAdapter for DefaultSchemaAdapter {
229248
/// Map a column index in the table schema to a column index in a particular
230249
/// file schema
@@ -248,40 +267,53 @@ impl SchemaAdapter for DefaultSchemaAdapter {
248267
&self,
249268
file_schema: &Schema,
250269
) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
251-
let mut projection = Vec::with_capacity(file_schema.fields().len());
252-
let mut field_mappings = vec![None; self.projected_table_schema.fields().len()];
253-
254-
for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
255-
if let Some((table_idx, table_field)) =
256-
self.projected_table_schema.fields().find(file_field.name())
257-
{
258-
match can_cast_types(file_field.data_type(), table_field.data_type()) {
259-
true => {
260-
field_mappings[table_idx] = Some(projection.len());
261-
projection.push(file_idx);
262-
}
263-
false => {
264-
return plan_err!(
265-
"Cannot cast file schema field {} of type {:?} to table schema field of type {:?}",
266-
file_field.name(),
267-
file_field.data_type(),
268-
table_field.data_type()
269-
)
270-
}
271-
}
272-
}
273-
}
270+
let (field_mappings, projection) = create_field_mapping(
271+
file_schema,
272+
&self.projected_table_schema,
273+
can_cast_field,
274+
)?;
274275

275276
Ok((
276-
Arc::new(SchemaMapping {
277-
projected_table_schema: Arc::clone(&self.projected_table_schema),
277+
Arc::new(SchemaMapping::new(
278+
Arc::clone(&self.projected_table_schema),
278279
field_mappings,
279-
}),
280+
)),
280281
projection,
281282
))
282283
}
283284
}
284285

286+
/// Helper function that creates field mappings between file schema and table schema
287+
///
288+
/// Maps columns from the file schema to their corresponding positions in the table schema,
289+
/// applying type compatibility checking via the provided predicate function.
290+
///
291+
/// Returns field mappings (for column reordering) and a projection (for field selection).
292+
pub(crate) fn create_field_mapping<F>(
293+
file_schema: &Schema,
294+
projected_table_schema: &SchemaRef,
295+
can_map_field: F,
296+
) -> datafusion_common::Result<(Vec<Option<usize>>, Vec<usize>)>
297+
where
298+
F: Fn(&Field, &Field) -> datafusion_common::Result<bool>,
299+
{
300+
let mut projection = Vec::with_capacity(file_schema.fields().len());
301+
let mut field_mappings = vec![None; projected_table_schema.fields().len()];
302+
303+
for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
304+
if let Some((table_idx, table_field)) =
305+
projected_table_schema.fields().find(file_field.name())
306+
{
307+
if can_map_field(file_field, table_field)? {
308+
field_mappings[table_idx] = Some(projection.len());
309+
projection.push(file_idx);
310+
}
311+
}
312+
}
313+
314+
Ok((field_mappings, projection))
315+
}
316+
285317
/// The SchemaMapping struct holds a mapping from the file schema to the table
286318
/// schema and any necessary type conversions.
287319
///
@@ -304,6 +336,21 @@ pub struct SchemaMapping {
304336
field_mappings: Vec<Option<usize>>,
305337
}
306338

339+
impl SchemaMapping {
340+
/// Creates a new SchemaMapping instance
341+
///
342+
/// Initializes the field mappings needed to transform file data to the projected table schema
343+
pub fn new(
344+
projected_table_schema: SchemaRef,
345+
field_mappings: Vec<Option<usize>>,
346+
) -> Self {
347+
Self {
348+
projected_table_schema,
349+
field_mappings,
350+
}
351+
}
352+
}
353+
307354
impl SchemaMapper for SchemaMapping {
308355
/// Adapts a `RecordBatch` to match the `projected_table_schema` using the stored mapping and
309356
/// conversions.
@@ -462,4 +509,195 @@ mod tests {
462509
assert_eq!(table_col_stats[0], ColumnStatistics::new_unknown(),);
463510
assert_eq!(table_col_stats[1], ColumnStatistics::new_unknown(),);
464511
}
512+
513+
#[test]
fn test_can_cast_field() {
    // Small helper: a nullable single column named "col" of the given type.
    let field = |dt: DataType| Field::new("col", dt, true);

    // Identical types are trivially castable.
    assert!(can_cast_field(&field(DataType::Int32), &field(DataType::Int32)).unwrap());

    // Widening numeric cast (Int32 -> Float64) is allowed.
    assert!(can_cast_field(&field(DataType::Int32), &field(DataType::Float64)).unwrap());

    // Numeric -> string (Float64 -> Utf8) is supported by Arrow's cast kernel.
    assert!(can_cast_field(&field(DataType::Float64), &field(DataType::Utf8)).unwrap());

    // Binary -> Decimal128 is an unsupported cast, so a descriptive error is
    // expected. (Many seemingly invalid casts, e.g. Utf8 -> Int32, are in
    // fact supported by Arrow, hence this particular pair.)
    let result = can_cast_field(&field(DataType::Binary), &field(DataType::Decimal128(10, 2)));
    assert!(result.is_err());
    let error_msg = result.unwrap_err().to_string();
    assert!(error_msg.contains("Cannot cast file schema field col"));
}
539+
540+
#[test]
fn test_create_field_mapping() {
    // Table schema: a (Int32), b (Utf8), c (Float64).
    let table_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Utf8, true),
        Field::new("c", DataType::Float64, true),
    ]));

    // File schema: columns reordered, `c` missing, `d` extra, and `b` stored
    // with a different (but castable) type.
    let file_schema = Schema::new(vec![
        Field::new("b", DataType::Float64, true),
        Field::new("a", DataType::Int32, true),
        Field::new("d", DataType::Boolean, true),
    ]);

    // Permissive predicate: every name match is considered mappable.
    let accept_everything = |_: &Field, _: &Field| Ok(true);
    let (mappings, projection) =
        create_field_mapping(&file_schema, &table_schema, accept_everything).unwrap();

    // `a` -> projected column 1, `b` -> projected column 0, `c` absent from file.
    assert_eq!(mappings, vec![Some(1), Some(0), None]);
    // File columns 0 (`b`) and 1 (`a`) are selected; `d` is dropped.
    assert_eq!(projection, vec![0, 1]);

    // A predicate that rejects everything must leave no mappings at all.
    let reject_everything = |_: &Field, _: &Field| Ok(false);
    let (mappings, projection) =
        create_field_mapping(&file_schema, &table_schema, reject_everything).unwrap();
    assert_eq!(mappings, vec![None, None, None]);
    assert_eq!(projection, Vec::<usize>::new());

    // A predicate error must propagate out of the helper unchanged.
    let always_error = |_: &Field, _: &Field| plan_err!("Test error");
    let result = create_field_mapping(&file_schema, &table_schema, always_error);
    assert!(result.is_err());
    assert!(result.unwrap_err().to_string().contains("Test error"));
}
585+
586+
#[test]
fn test_schema_mapping_new() {
    // The projected (table) schema the mapping should produce.
    let projected_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Utf8, true),
    ]));

    // Table column 0 comes from projected file column 1 and vice versa.
    let field_mappings = vec![Some(1), Some(0)];

    let mapping =
        SchemaMapping::new(Arc::clone(&projected_schema), field_mappings.clone());

    // The constructor must store its arguments unchanged.
    assert_eq!(*mapping.projected_table_schema, *projected_schema);
    assert_eq!(mapping.field_mappings, field_mappings);

    // Build a file batch whose columns are in the opposite order of the table.
    let file_batch = RecordBatch::try_new(
        Arc::new(Schema::new(vec![
            Field::new("b_file", DataType::Utf8, true),
            Field::new("a_file", DataType::Int32, true),
        ])),
        vec![
            Arc::new(arrow::array::StringArray::from(vec!["hello", "world"])),
            Arc::new(arrow::array::Int32Array::from(vec![1, 2])),
        ],
    )
    .unwrap();

    // map_batch should reorder the columns into table order.
    let mapped_batch = mapping.map_batch(file_batch).unwrap();

    assert_eq!(*mapped_batch.schema(), *projected_schema);
    assert_eq!(mapped_batch.num_columns(), 2);
    assert_eq!(mapped_batch.column(0).len(), 2); // column `a`
    assert_eq!(mapped_batch.column(1).len(), 2); // column `b`
}
627+
628+
#[test]
fn test_map_schema_error_path() {
    // Decimal128 is chosen because Arrow rejects Binary -> Decimal128 casts,
    // which gives us a reliable failure case.
    let table_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Utf8, true),
        Field::new("c", DataType::Decimal128(10, 2), true),
    ]));

    // `a` matches exactly, `b` is castable, but `c` (Binary) cannot become Decimal128.
    let file_schema = Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Float64, true),
        Field::new("c", DataType::Binary, true),
    ]);

    let adapter = DefaultSchemaAdapter {
        projected_table_schema: Arc::clone(&table_schema),
    };

    // The incompatible `c` column must make schema mapping fail.
    let result = adapter.map_schema(&file_schema);
    assert!(result.is_err());
    let error_msg = result.unwrap_err().to_string();
    assert!(error_msg.contains("Cannot cast file schema field c"));
}
655+
656+
#[test]
fn test_map_schema_happy_path() {
    let table_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Utf8, true),
        Field::new("c", DataType::Decimal128(10, 2), true),
    ]));

    let adapter = DefaultSchemaAdapter {
        projected_table_schema: Arc::clone(&table_schema),
    };

    // File schema is missing `c` entirely; `a` and `b` need (legal) casts.
    let compatible_file_schema = Schema::new(vec![
        Field::new("a", DataType::Int64, true),   // Int64 -> Int32 is castable
        Field::new("b", DataType::Float64, true), // Float64 -> Utf8 is castable
    ]);

    let (mapper, projection) = adapter.map_schema(&compatible_file_schema).unwrap();

    // Both file columns survive the projection.
    assert_eq!(projection, vec![0, 1]);

    // Push a real batch through the mapper to confirm end-to-end behavior.
    let file_batch = RecordBatch::try_new(
        Arc::new(compatible_file_schema.clone()),
        vec![
            Arc::new(arrow::array::Int64Array::from(vec![100, 200])),
            Arc::new(arrow::array::Float64Array::from(vec![1.5, 2.5])),
        ],
    )
    .unwrap();

    let mapped_batch = mapper.map_batch(file_batch).unwrap();

    // The output carries the full table schema, including the absent `c`.
    assert_eq!(*mapped_batch.schema(), *table_schema);
    assert_eq!(mapped_batch.num_columns(), 3); // a, b, c

    // `c` was not in the file, so it must be materialized as all-null.
    let c_array = mapped_batch.column(2);
    assert_eq!(c_array.len(), 2);
    assert_eq!(c_array.null_count(), 2);
}
465703
}

0 commit comments

Comments
 (0)