Skip to content

Commit

Permalink
Avoid per-batch field lookups in SchemaMapping (#6563)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold authored Jun 7, 2023
1 parent 25b6556 commit 8f7f76d
Showing 1 changed file with 39 additions and 83 deletions.
122 changes: 39 additions & 83 deletions datafusion/core/src/datasource/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ use crate::{
use datafusion_common::tree_node::{TreeNode, VisitRecursion};
use datafusion_physical_expr::expressions::Column;

use arrow::compute::cast;
use log::{debug, warn};
use object_store::path::Path;
use object_store::ObjectMeta;
Expand Down Expand Up @@ -425,84 +426,51 @@ impl SchemaAdapter {
file_schema: &Schema,
) -> Option<usize> {
let field = self.table_schema.field(index);
file_schema.index_of(field.name()).ok()
}

/// Re-order projected columns by index in record batch to match table schema column ordering. If the record
/// batch does not contain a column for an expected field, insert a null-valued column at the
/// required column index.
#[allow(dead_code)]
pub fn adapt_batch(
&self,
batch: RecordBatch,
projections: &[usize],
) -> Result<RecordBatch> {
let batch_rows = batch.num_rows();

let batch_schema = batch.schema();

let mut cols: Vec<ArrayRef> = Vec::with_capacity(batch.columns().len());
let batch_cols = batch.columns().to_vec();

for field_idx in projections {
let table_field = &self.table_schema.fields()[*field_idx];
if let Some((batch_idx, _name)) =
batch_schema.column_with_name(table_field.name().as_str())
{
cols.push(batch_cols[batch_idx].clone());
} else {
cols.push(new_null_array(table_field.data_type(), batch_rows))
}
}

let projected_schema = Arc::new(self.table_schema.clone().project(projections)?);

// Necessary to handle empty batches
let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));

Ok(RecordBatch::try_new_with_options(
projected_schema,
cols,
&options,
)?)
Some(file_schema.fields.find(field.name())?.0)
}

/// Creates a `SchemaMapping` that can be used to cast or map the columns from the file schema to the table schema.
///
/// If the provided `file_schema` contains columns of a different type to the expected
/// `table_schema`, the method will attempt to cast the array data from the file schema
/// to the table schema where possible.
///
/// Returns a [`SchemaMapping`] that can be applied to the output batch,
/// along with an ordered list of column indices to project from the file.
pub fn map_schema(
&self,
file_schema: &Schema,
) -> Result<(SchemaMapping, Vec<usize>)> {
let mut field_mappings: Vec<bool> = vec![false; self.table_schema.fields().len()];
let mut mapped: Vec<usize> = vec![];

for (idx, field) in self.table_schema.fields().iter().enumerate() {
if let Ok(mapped_idx) = file_schema.index_of(field.name().as_str()) {
if can_cast_types(
file_schema.field(mapped_idx).data_type(),
field.data_type(),
) {
field_mappings[idx] = true;
mapped.push(mapped_idx);
} else {
return Err(DataFusionError::Plan(format!(
"Cannot cast file schema field {} of type {:?} to table schema field of type {:?}",
field.name(),
file_schema.field(mapped_idx).data_type(),
field.data_type()
)));
let mut projection = Vec::with_capacity(file_schema.fields().len());
let mut field_mappings = vec![None; self.table_schema.fields().len()];

for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
if let Some((table_idx, table_field)) =
self.table_schema.fields().find(file_field.name())
{
match can_cast_types(file_field.data_type(), table_field.data_type()) {
true => {
field_mappings[table_idx] = Some(projection.len());
projection.push(file_idx);
}
false => {
return Err(DataFusionError::Plan(format!(
"Cannot cast file schema field {} of type {:?} to table schema field of type {:?}",
file_field.name(),
file_field.data_type(),
table_field.data_type()
)))
}
}
}
}

Ok((
SchemaMapping {
table_schema: self.table_schema.clone(),
field_mappings,
},
mapped,
projection,
))
}
}
Expand All @@ -513,44 +481,32 @@ impl SchemaAdapter {
pub struct SchemaMapping {
/// The schema of the table. This is the expected schema after conversion and it should match the schema of the query result.
table_schema: SchemaRef,
/// In `field_mappings`, a `true` value indicates that the corresponding field in `table_schema` exists in `file_schema`,
/// while a `false` value indicates that the corresponding field does not exist.
field_mappings: Vec<bool>,
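/// Mapping from field index in `table_schema` to column index in the projected `file_schema`;
/// `None` when the table field has no matching column in the file (it is filled with nulls).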
field_mappings: Vec<Option<usize>>,
}

impl SchemaMapping {
/// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions.
fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
let batch_rows = batch.num_rows();
let batch_cols = batch.columns().to_vec();
let batch_schema = batch.schema();

let cols = self
.table_schema
.fields()
.iter()
.enumerate()
.map(|(idx, field)| {
if self.field_mappings[idx] {
match batch_schema.index_of(field.name()) {
Ok(batch_idx) => arrow::compute::cast(
&batch_cols[batch_idx],
field.data_type(),
)
.map_err(DataFusionError::ArrowError),
Err(_) => Ok(new_null_array(field.data_type(), batch_rows)),
}
} else {
Ok(new_null_array(field.data_type(), batch_rows))
}
.zip(&self.field_mappings)
.map(|(field, file_idx)| match file_idx {
Some(batch_idx) => cast(&batch_cols[*batch_idx], field.data_type()),
None => Ok(new_null_array(field.data_type(), batch_rows)),
})
.collect::<Result<Vec<_>>>()?;
.collect::<Result<Vec<_>, _>>()?;

// Necessary to handle empty batches
let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));

let record_batch =
RecordBatch::try_new_with_options(self.table_schema.clone(), cols, &options)?;
let schema = self.table_schema.clone();
let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
Ok(record_batch)
}
}
Expand Down Expand Up @@ -1246,7 +1202,7 @@ mod tests {
let indices = vec![1, 2, 4];
let schema = SchemaRef::from(table_schema.project(&indices).unwrap());
let adapter = SchemaAdapter::new(schema);
let (mapping, _) = adapter.map_schema(&file_schema).unwrap();
let (mapping, projection) = adapter.map_schema(&file_schema).unwrap();

let id = Int32Array::from(vec![Some(1), Some(2), Some(3)]);
let c1 = BooleanArray::from(vec![Some(true), Some(false), Some(true)]);
Expand All @@ -1268,9 +1224,9 @@ mod tests {
],
)
.unwrap();

let rows_num = batch.num_rows();
let mapped_batch = mapping.map_batch(batch).unwrap();
let projected = batch.project(&projection).unwrap();
let mapped_batch = mapping.map_batch(projected).unwrap();

assert_eq!(
mapped_batch.schema(),
Expand Down

0 comments on commit 8f7f76d

Please sign in to comment.