-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Description
Describe the bug
The schema adaption logic added in #1709 misbehaves for CSV data. In particular it incorrectly assumes that it can create a schema for the entire dataset that is a superset of those of the individual files, and that the CSV reader will pad any missing columns with nulls, and reorder those that appear in a different order.
In reality the CSV reader does not handle missing or reordered columns, it only accidentally works when the file schema happens to be an exact prefix of the aggregate schema. This was relying on an accidental quirk of the arrow reader prior to 30.0.0, after this the arrow reader returns an error as the schema does not match.
To Reproduce
Both of the following tests fail on current master
#[tokio::test]
async fn csv_schema_reordered() -> Result<()> {
use object_store::path::Path;
let session_ctx = SessionContext::new();
let store = InMemory::new();
let data = bytes::Bytes::from("a,b\n1,2\n3,4");
store.put(&Path::from("a.csv"), data).await.unwrap();
let data = bytes::Bytes::from("b,a\n1,2\n3,4");
store.put(&Path::from("b.csv"), data).await.unwrap();
session_ctx
.runtime_env()
.register_object_store("memory", "", Arc::new(store));
let df = session_ctx
.read_csv("memory:///", CsvReadOptions::new())
.await
.unwrap();
let result = df.collect().await.unwrap();
let expected = vec![
"+---+---+",
"| a | b |",
"+---+---+",
"| 1 | 2 |",
"| 2 | 1 |",
"| 3 | 4 |",
"| 4 | 3 |",
"+---+---+",
];
crate::assert_batches_eq!(expected, &result);
Ok(())
}
#[tokio::test]
async fn csv_schema_extra_column() -> Result<()> {
use object_store::path::Path;
let session_ctx = SessionContext::new();
let store = InMemory::new();
let data = bytes::Bytes::from("a,b\n1,2\n3,4");
store.put(&Path::from("a.csv"), data).await.unwrap();
let data = bytes::Bytes::from("a,c\n5,6\n7,8");
store.put(&Path::from("b.csv"), data).await.unwrap();
session_ctx
.runtime_env()
.register_object_store("memory", "", Arc::new(store));
let df = session_ctx
.read_csv("memory:///", CsvReadOptions::new())
.await
.unwrap();
let result = df.collect().await.unwrap();
let expected = vec![
"+---+---+---+",
"| a | b | c |",
"+---+---+---+",
"| 1 | 2 | |",
"| 3 | 4 | |",
"| 5 | | 6 |",
"| 7 | | 8 |",
"+---+---+---+",
];
crate::assert_batches_eq!(expected, &result);
Ok(())
}
Expected behavior
I think both of the following would be valid:
- Don't perform schema adaption for CSV, as they aren't a self-describing format like JSON or parquet, instead returning an error if the schema don't match
- Correctly infer the schema on a per-file basis, and use this when reading
Additional context
#4818 (comment)