Commit 7bec762

Create SchemaAdapter trait to map table schema to file schemas (#1709)
* Create SchemaAdapter trait to map table schema to file schemas
* Linting fix
* Remove commented code
1 parent d01d8d5 commit 7bec762

6 files changed: +330 -58 lines changed

datafusion/src/physical_plan/file_format/avro.rs

Lines changed: 63 additions & 1 deletion

@@ -165,13 +165,14 @@ impl ExecutionPlan for AvroExec {
 #[cfg(test)]
 #[cfg(feature = "avro")]
 mod tests {
-
     use crate::datasource::file_format::{avro::AvroFormat, FileFormat};
     use crate::datasource::object_store::local::{
         local_object_reader_stream, local_unpartitioned_file, LocalFileSystem,
     };
     use crate::scalar::ScalarValue;
+    use arrow::datatypes::{DataType, Field, Schema};
     use futures::StreamExt;
+    use sqlparser::ast::ObjectType::Schema;
 
     use super::*;
 
@@ -228,6 +229,67 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn avro_exec_missing_column() -> Result<()> {
+        let testdata = crate::test_util::arrow_test_data();
+        let filename = format!("{}/avro/alltypes_plain.avro", testdata);
+        let actual_schema = AvroFormat {}
+            .infer_schema(local_object_reader_stream(vec![filename.clone()]))
+            .await?;
+
+        let mut fields = actual_schema.fields().clone();
+        fields.push(Field::new("missing_col", DataType::Int32, true));
+
+        let file_schema = Arc::new(Schema::new(fields));
+
+        let avro_exec = AvroExec::new(FileScanConfig {
+            object_store: Arc::new(LocalFileSystem {}),
+            file_groups: vec![vec![local_unpartitioned_file(filename.clone())]],
+            file_schema: file_schema.clone(),
+            statistics: Statistics::default(),
+            // Include the missing column in the projection
+            projection: Some(vec![0, 1, 2, file_schema.fields().len()]),
+            limit: None,
+            table_partition_cols: vec![],
+        });
+        assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
+
+        let mut results = avro_exec.execute(0).await.expect("plan execution failed");
+        let batch = results
+            .next()
+            .await
+            .expect("plan iterator empty")
+            .expect("plan iterator returned an error");
+
+        let expected = vec![
+            "+----+----------+-------------+-------------+",
+            "| id | bool_col | tinyint_col | missing_col |",
+            "+----+----------+-------------+-------------+",
+            "| 4  | true     | 0           |             |",
+            "| 5  | false    | 1           |             |",
+            "| 6  | true     | 0           |             |",
+            "| 7  | false    | 1           |             |",
+            "| 2  | true     | 0           |             |",
+            "| 3  | false    | 1           |             |",
+            "| 0  | true     | 0           |             |",
+            "| 1  | false    | 1           |             |",
+            "+----+----------+-------------+-------------+",
+        ];
+
+        crate::assert_batches_eq!(expected, &[batch]);
+
+        let batch = results.next().await;
+        assert!(batch.is_none());
+
+        let batch = results.next().await;
+        assert!(batch.is_none());
+
+        let batch = results.next().await;
+        assert!(batch.is_none());
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn avro_exec_with_partition() -> Result<()> {
         let testdata = crate::test_util::arrow_test_data();

datafusion/src/physical_plan/file_format/csv.rs

Lines changed: 47 additions & 0 deletions

@@ -170,6 +170,7 @@ impl ExecutionPlan for CsvExec {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::test_util::aggr_test_schema_with_missing_col;
     use crate::{
         datasource::object_store::local::{local_unpartitioned_file, LocalFileSystem},
         scalar::ScalarValue,
@@ -269,6 +270,52 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn csv_exec_with_missing_column() -> Result<()> {
+        let runtime = Arc::new(RuntimeEnv::default());
+        let file_schema = aggr_test_schema_with_missing_col();
+        let testdata = crate::test_util::arrow_test_data();
+        let filename = "aggregate_test_100.csv";
+        let path = format!("{}/csv/{}", testdata, filename);
+        let csv = CsvExec::new(
+            FileScanConfig {
+                object_store: Arc::new(LocalFileSystem {}),
+                file_schema,
+                file_groups: vec![vec![local_unpartitioned_file(path)]],
+                statistics: Statistics::default(),
+                projection: None,
+                limit: Some(5),
+                table_partition_cols: vec![],
+            },
+            true,
+            b',',
+        );
+        assert_eq!(14, csv.base_config.file_schema.fields().len());
+        assert_eq!(14, csv.projected_schema.fields().len());
+        assert_eq!(14, csv.schema().fields().len());
+
+        let mut it = csv.execute(0, runtime).await?;
+        let batch = it.next().await.unwrap()?;
+        assert_eq!(14, batch.num_columns());
+        assert_eq!(5, batch.num_rows());
+
+        let expected = vec![
+            "+----+----+-----+--------+------------+----------------------+-----+-------+------------+----------------------+-------------+---------------------+--------------------------------+-------------+",
+            "| c1 | c2 | c3  | c4     | c5         | c6                   | c7  | c8    | c9         | c10                  | c11         | c12                 | c13                            | missing_col |",
+            "+----+----+-----+--------+------------+----------------------+-----+-------+------------+----------------------+-------------+---------------------+--------------------------------+-------------+",
+            "| c  | 2  | 1   | 18109  | 2033001162 | -6513304855495910254 | 25  | 43062 | 1491205016 | 5863949479783605708  | 0.110830784 | 0.9294097332465232  | 6WfVFBVGJSQb7FhA7E0lBwdvjfZnSW |             |",
+            "| d  | 5  | -40 | 22614  | 706441268  | -7542719935673075327 | 155 | 14337 | 3373581039 | 11720144131976083864 | 0.69632107  | 0.3114712539863804  | C2GT5KVyOPZpgKVl110TyZO0NcJ434 |             |",
+            "| b  | 1  | 29  | -18218 | 994303988  | 5983957848665088916  | 204 | 9489  | 3275293996 | 14857091259186476033 | 0.53840446  | 0.17909035118828576 | AyYVExXK6AR2qUTxNZ7qRHQOVGMLcz |             |",
+            "| a  | 1  | -85 | -15154 | 1171968280 | 1919439543497968449  | 77  | 52286 | 774637006  | 12101411955859039553 | 0.12285209  | 0.6864391962767343  | 0keZ5G8BffGwgF2RwQD59TFzMStxCB |             |",
+            "| b  | 5  | -82 | 22080  | 1824882165 | 7373730676428214987  | 208 | 34331 | 3342719438 | 3330177516592499461  | 0.82634634  | 0.40975383525297016 | Ig1QcuKsjHXkproePdERo2w0mYzIqd |             |",
+            "+----+----+-----+--------+------------+----------------------+-----+-------+------------+----------------------+-------------+---------------------+--------------------------------+-------------+",
+        ];
+
+        crate::assert_batches_eq!(expected, &[batch]);
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn csv_exec_with_partition() -> Result<()> {
         let runtime = Arc::new(RuntimeEnv::default());
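
The `aggr_test_schema_with_missing_col` helper lives in `crate::test_util`, one of the six changed files but not shown in this excerpt. Judging from the assertions above (14 fields: the usual thirteen aggregate test columns plus a nullable `missing_col`), a plausible sketch is the following, mirroring how the Avro and JSON tests build their extended schemas; the exact data type chosen for `missing_col` here is an assumption:

    use std::sync::Arc;
    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};

    /// Hypothetical sketch of the new test_util helper: the existing
    /// aggr_test_schema (c1..c13) with a nullable `missing_col` appended.
    pub fn aggr_test_schema_with_missing_col() -> SchemaRef {
        // Start from the 13-column schema used by the other CSV tests.
        let mut fields = aggr_test_schema().fields().clone();
        // The extra column must be nullable so SchemaAdapter can fill it
        // with nulls for files that do not contain it.
        fields.push(Field::new("missing_col", DataType::Utf8, true));
        Arc::new(Schema::new(fields))
    }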

datafusion/src/physical_plan/file_format/json.rs

Lines changed: 43 additions & 0 deletions

@@ -137,6 +137,8 @@ impl ExecutionPlan for NdJsonExec {
 
 #[cfg(test)]
 mod tests {
+    use arrow::array::Array;
+    use arrow::datatypes::{Field, Schema};
     use futures::StreamExt;
 
     use crate::datasource::{
@@ -211,6 +213,47 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn nd_json_exec_file_with_missing_column() -> Result<()> {
+        let runtime = Arc::new(RuntimeEnv::default());
+        use arrow::datatypes::DataType;
+        let path = format!("{}/1.json", TEST_DATA_BASE);
+
+        let actual_schema = infer_schema(path.clone()).await?;
+
+        let mut fields = actual_schema.fields().clone();
+        fields.push(Field::new("missing_col", DataType::Int32, true));
+        let missing_field_idx = fields.len() - 1;
+
+        let file_schema = Arc::new(Schema::new(fields));
+
+        let exec = NdJsonExec::new(FileScanConfig {
+            object_store: Arc::new(LocalFileSystem {}),
+            file_groups: vec![vec![local_unpartitioned_file(path.clone())]],
+            file_schema,
+            statistics: Statistics::default(),
+            projection: None,
+            limit: Some(3),
+            table_partition_cols: vec![],
+        });
+
+        let mut it = exec.execute(0, runtime).await?;
+        let batch = it.next().await.unwrap()?;
+
+        assert_eq!(batch.num_rows(), 3);
+        let values = batch
+            .column(missing_field_idx)
+            .as_any()
+            .downcast_ref::<arrow::array::Int32Array>()
+            .unwrap();
+        assert_eq!(values.len(), 3);
+        assert!(values.is_null(0));
+        assert!(values.is_null(1));
+        assert!(values.is_null(2));
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn nd_json_exec_file_projection() -> Result<()> {
         let runtime = Arc::new(RuntimeEnv::default());
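
The `infer_schema` function called in the test above is a small test-module helper that falls outside this excerpt. It presumably defers to the NDJSON format's schema inference, along these lines (a sketch, assuming the same local object store utilities the other tests use):

    /// Hypothetical sketch of the test helper: infer the schema of an NDJSON
    /// file via JsonFormat, mirroring what the Avro test does inline with
    /// AvroFormat.
    async fn infer_schema(path: String) -> Result<SchemaRef> {
        JsonFormat::default()
            .infer_schema(local_object_reader_stream(vec![path]))
            .await
    }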

datafusion/src/physical_plan/file_format/mod.rs

Lines changed: 140 additions & 0 deletions

@@ -35,11 +35,15 @@ pub use avro::AvroExec;
 pub use csv::CsvExec;
 pub use json::NdJsonExec;
 
+use crate::error::DataFusionError;
 use crate::{
     datasource::{object_store::ObjectStore, PartitionedFile},
+    error::Result,
     scalar::ScalarValue,
 };
+use arrow::array::new_null_array;
 use lazy_static::lazy_static;
+use log::info;
 use std::{
     collections::HashMap,
     fmt::{Display, Formatter, Result as FmtResult},
@@ -165,6 +169,87 @@ impl<'a> Display for FileGroupsDisplay<'a> {
     }
 }
 
+/// A utility which can adapt file-level record batches to a table schema that may have been
+/// obtained from merging multiple file-level schemas.
+///
+/// This is useful for enabling schema evolution in partitioned datasets.
+///
+/// This has to be done in two stages.
+///
+/// 1. Before reading the file, we have to map projected column indexes from the table schema to
+///    the file schema.
+///
+/// 2. After reading a record batch, we need to map the read columns back to the expected column
+///    indexes and insert null-valued columns wherever the file schema was missing a column present
+///    in the table schema.
+#[derive(Clone, Debug)]
+pub(crate) struct SchemaAdapter {
+    /// Schema for the table
+    table_schema: SchemaRef,
+}
+
+impl SchemaAdapter {
+    pub(crate) fn new(table_schema: SchemaRef) -> SchemaAdapter {
+        Self { table_schema }
+    }
+
+    /// Map projected column indexes to the file schema. This will fail if the table schema
+    /// and the file schema contain a field with the same name and different types.
+    pub fn map_projections(
+        &self,
+        file_schema: &Schema,
+        projections: &[usize],
+    ) -> Result<Vec<usize>> {
+        let mut mapped: Vec<usize> = vec![];
+        for idx in projections {
+            let field = self.table_schema.field(*idx);
+            if let Ok(mapped_idx) = file_schema.index_of(field.name().as_str()) {
+                if file_schema.field(mapped_idx).data_type() == field.data_type() {
+                    mapped.push(mapped_idx)
+                } else {
+                    let msg = format!("Failed to map column projection for field {}. Incompatible data types {:?} and {:?}", field.name(), file_schema.field(mapped_idx).data_type(), field.data_type());
+                    info!("{}", msg);
+                    return Err(DataFusionError::Execution(msg));
+                }
+            }
+        }
+        Ok(mapped)
+    }
+
+    /// Re-order projected columns by index in record batch to match table schema column ordering. If the record
+    /// batch does not contain a column for an expected field, insert a null-valued column at the
+    /// required column index.
+    pub fn adapt_batch(
+        &self,
+        batch: RecordBatch,
+        projections: &[usize],
+    ) -> Result<RecordBatch> {
+        let batch_rows = batch.num_rows();
+
+        let batch_schema = batch.schema();
+
+        let mut cols: Vec<ArrayRef> = Vec::with_capacity(batch.columns().len());
+        let batch_cols = batch.columns().to_vec();
+
+        for field_idx in projections {
+            let table_field = &self.table_schema.fields()[*field_idx];
+            if let Some((batch_idx, _name)) =
+                batch_schema.column_with_name(table_field.name().as_str())
+            {
+                cols.push(batch_cols[batch_idx].clone());
+            } else {
+                cols.push(new_null_array(table_field.data_type(), batch_rows))
+            }
+        }
+
+        let projected_schema = Arc::new(self.table_schema.clone().project(projections)?);
+
+        let merged_batch = RecordBatch::try_new(projected_schema, cols)?;
+
+        Ok(merged_batch)
+    }
+}
+
 /// A helper that projects partition columns into the file record batches.
 ///
 /// One interesting trick is the usage of a cache for the key buffers of the partition column
@@ -467,6 +552,61 @@ mod tests {
         crate::assert_batches_eq!(expected, &[projected_batch]);
     }
 
+    #[test]
+    fn schema_adapter_adapt_projections() {
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("c1", DataType::Utf8, true),
+            Field::new("c2", DataType::Int64, true),
+            Field::new("c3", DataType::Int8, true),
+        ]));
+
+        let file_schema = Schema::new(vec![
+            Field::new("c1", DataType::Utf8, true),
+            Field::new("c2", DataType::Int64, true),
+        ]);
+
+        let file_schema_2 = Arc::new(Schema::new(vec![
+            Field::new("c3", DataType::Int8, true),
+            Field::new("c2", DataType::Int64, true),
+        ]));
+
+        let file_schema_3 =
+            Arc::new(Schema::new(vec![Field::new("c3", DataType::Float32, true)]));
+
+        let adapter = SchemaAdapter::new(table_schema);
+
+        let projections1: Vec<usize> = vec![0, 1, 2];
+        let projections2: Vec<usize> = vec![2];
+
+        let mapped = adapter
+            .map_projections(&file_schema, projections1.as_slice())
+            .expect("mapping projections");
+
+        assert_eq!(mapped, vec![0, 1]);
+
+        let mapped = adapter
+            .map_projections(&file_schema, projections2.as_slice())
+            .expect("mapping projections");
+
+        assert!(mapped.is_empty());
+
+        let mapped = adapter
+            .map_projections(&file_schema_2, projections1.as_slice())
+            .expect("mapping projections");
+
+        assert_eq!(mapped, vec![1, 0]);
+
+        let mapped = adapter
+            .map_projections(&file_schema_2, projections2.as_slice())
+            .expect("mapping projections");
+
+        assert_eq!(mapped, vec![0]);
+
+        let mapped = adapter.map_projections(&file_schema_3, projections1.as_slice());
+
+        assert!(mapped.is_err());
+    }
+
     // sets default for configs that play no role in projections
     fn config_for_projection(
         file_schema: SchemaRef,
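
End to end, a file-format reader uses the adapter in the two stages the doc comment describes: map the table-level projection onto the file schema before reading, then pad and re-order each batch after reading. The following is a minimal illustrative sketch, not code from this commit; since SchemaAdapter is pub(crate), such code would have to live inside the datafusion crate, for example as another test in this module:

    #[test]
    fn schema_adapter_round_trip_sketch() -> Result<()> {
        use arrow::array::{Array, ArrayRef, Int32Array};

        // Table schema: the merged view, including a column this file lacks.
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, true),
            Field::new("missing_col", DataType::Int32, true),
        ]));
        // File schema: what this particular file actually stores.
        let file_schema = Schema::new(vec![Field::new("id", DataType::Int32, true)]);

        let adapter = SchemaAdapter::new(table_schema);
        let projections = vec![0, 1]; // project every table column

        // Stage 1: only `id` exists in the file, so only index 0 is read.
        let file_projections = adapter.map_projections(&file_schema, &projections)?;
        assert_eq!(file_projections, vec![0]);

        // The reader yields a batch containing just the file's own columns...
        let file_batch = RecordBatch::try_new(
            Arc::new(file_schema),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
        )?;

        // Stage 2: ...and the adapter inserts an all-null `missing_col` and
        // restores table-schema column order.
        let adapted = adapter.adapt_batch(file_batch, &projections)?;
        assert_eq!(adapted.num_columns(), 2);
        assert_eq!(adapted.num_rows(), 3);
        assert!(adapted.column(1).is_null(0));
        Ok(())
    }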
