Skip to content

Commit 538a90c

Browse files
FrankPortman authored and rtyler committed
add merge test
Signed-off-by: Frank Portman <frank1214@gmail.com>
1 parent 74139ff commit 538a90c

File tree

2 files changed

+209
-0
lines changed

2 files changed

+209
-0
lines changed

crates/core/src/operations/merge/mod.rs

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1810,6 +1810,108 @@ mod tests {
18101810

18111811
assert_merge(table, metrics).await;
18121812
}
1813+
#[tokio::test]
1814+
async fn test_merge_preserves_nullability_without_schema_merge() {
1815+
// Test that nullability constraints are preserved when merge_schema is false (default)
1816+
let delta_schema = vec![
1817+
StructField::new(
1818+
"id".to_string(),
1819+
DataType::Primitive(PrimitiveType::String),
1820+
false, // non-nullable
1821+
),
1822+
StructField::new(
1823+
"value".to_string(),
1824+
DataType::Primitive(PrimitiveType::Integer),
1825+
false, // non-nullable
1826+
),
1827+
StructField::new(
1828+
"modified".to_string(),
1829+
DataType::Primitive(PrimitiveType::String),
1830+
true, // nullable
1831+
),
1832+
];
1833+
1834+
let table = DeltaOps::new_in_memory()
1835+
.create()
1836+
.with_save_mode(SaveMode::ErrorIfExists)
1837+
.with_columns(delta_schema)
1838+
.await
1839+
.unwrap();
1840+
1841+
// Verify initial schema nullability
1842+
let initial_fields: Vec<_> = table
1843+
.snapshot()
1844+
.unwrap()
1845+
.schema()
1846+
.fields()
1847+
.cloned()
1848+
.collect();
1849+
assert!(
1850+
!initial_fields[0].is_nullable(),
1851+
"id should be non-nullable"
1852+
);
1853+
assert!(
1854+
!initial_fields[1].is_nullable(),
1855+
"value should be non-nullable"
1856+
);
1857+
assert!(
1858+
initial_fields[2].is_nullable(),
1859+
"modified should be nullable"
1860+
);
1861+
1862+
// Source data with all nullable fields (typical from external sources)
1863+
let source_schema = Arc::new(ArrowSchema::new(vec![
1864+
Field::new("id", ArrowDataType::Utf8, true), // nullable in source
1865+
Field::new("value", ArrowDataType::Int32, true), // nullable in source
1866+
Field::new("modified", ArrowDataType::Utf8, true), // nullable in source
1867+
]));
1868+
1869+
let ctx = SessionContext::new();
1870+
let batch = RecordBatch::try_new(
1871+
source_schema,
1872+
vec![
1873+
Arc::new(arrow::array::StringArray::from(vec![Some("A"), Some("B")])),
1874+
Arc::new(arrow::array::Int32Array::from(vec![Some(1), Some(2)])),
1875+
Arc::new(arrow::array::StringArray::from(vec![
1876+
Some("2021-02-02"),
1877+
None,
1878+
])),
1879+
],
1880+
)
1881+
.unwrap();
1882+
let source = ctx.read_batch(batch).unwrap();
1883+
1884+
let (merged_table, _) = DeltaOps(table)
1885+
.merge(source, col("target.id").eq(col("source.id")))
1886+
.with_source_alias("source")
1887+
.with_target_alias("target")
1888+
// merge_schema is false by default
1889+
.when_not_matched_insert(|insert| {
1890+
insert
1891+
.set("id", col("source.id"))
1892+
.set("value", col("source.value"))
1893+
.set("modified", col("source.modified"))
1894+
})
1895+
.unwrap()
1896+
.await
1897+
.unwrap();
1898+
1899+
// Verify schema nullability is preserved after merge
1900+
let final_fields: Vec<_> = merged_table.snapshot().unwrap().schema().fields().collect();
1901+
assert!(
1902+
!final_fields[0].is_nullable(),
1903+
"id should remain non-nullable after merge"
1904+
);
1905+
assert!(
1906+
!final_fields[1].is_nullable(),
1907+
"value should remain non-nullable after merge"
1908+
);
1909+
assert!(
1910+
final_fields[2].is_nullable(),
1911+
"modified should remain nullable after merge"
1912+
);
1913+
}
1914+
18131915
#[tokio::test]
18141916
async fn test_merge_with_schema_merge_no_change_of_schema() {
18151917
let (table, _) = setup().await;

crates/core/src/operations/write/mod.rs

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2284,4 +2284,111 @@ mod tests {
22842284

22852285
Ok(())
22862286
}
2287+
2288+
#[tokio::test]
2289+
async fn test_schema_preserved_with_replace_where() -> TestResult {
2290+
// Test that schema is preserved when using overwrite with predicate (replaceWhere)
2291+
use arrow_array::{BooleanArray, Int32Array, Int64Array, RecordBatch, StringArray};
2292+
use arrow_schema::{DataType, Field, Schema as ArrowSchema};
2293+
use std::sync::Arc;
2294+
2295+
// Create initial table with mixed nullability
2296+
let initial_schema = Arc::new(ArrowSchema::new(vec![
2297+
Field::new("id", DataType::Int64, false), // non-nullable
2298+
Field::new("name", DataType::Utf8, true), // nullable
2299+
Field::new("active", DataType::Boolean, false), // non-nullable
2300+
Field::new("count", DataType::Int32, false), // non-nullable
2301+
]));
2302+
2303+
let initial_batch = RecordBatch::try_new(
2304+
initial_schema.clone(),
2305+
vec![
2306+
Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5])),
2307+
Arc::new(StringArray::from(vec![
2308+
Some("Alice"),
2309+
Some("Bob"),
2310+
None,
2311+
Some("David"),
2312+
Some("Eve"),
2313+
])),
2314+
Arc::new(BooleanArray::from(vec![true, false, true, false, true])),
2315+
Arc::new(Int32Array::from(vec![10, 20, 30, 40, 50])),
2316+
],
2317+
)?;
2318+
2319+
let table = DeltaOps::new_in_memory()
2320+
.write(vec![initial_batch])
2321+
.with_save_mode(SaveMode::Overwrite)
2322+
.await?;
2323+
2324+
// Capture initial schema
2325+
let initial_fields: Vec<_> = table
2326+
.snapshot()
2327+
.unwrap()
2328+
.schema()
2329+
.fields()
2330+
.cloned()
2331+
.collect();
2332+
2333+
// Create new data with all nullable fields (typical from Pandas)
2334+
let new_schema = Arc::new(ArrowSchema::new(vec![
2335+
Field::new("id", DataType::Int64, true), // nullable in new data
2336+
Field::new("name", DataType::Utf8, true), // nullable
2337+
Field::new("active", DataType::Boolean, true), // nullable
2338+
Field::new("count", DataType::Int32, true), // nullable
2339+
]));
2340+
2341+
let replacement_batch = RecordBatch::try_new(
2342+
new_schema.clone(),
2343+
vec![
2344+
Arc::new(Int64Array::from(vec![Some(2), Some(4)])), // Replace ids 2 and 4
2345+
Arc::new(StringArray::from(vec![Some("Bob2"), Some("David2")])),
2346+
Arc::new(BooleanArray::from(vec![Some(true), Some(true)])),
2347+
Arc::new(Int32Array::from(vec![Some(200), Some(400)])),
2348+
],
2349+
)?;
2350+
2351+
// Use replaceWhere to selectively overwrite
2352+
let table = DeltaOps(table)
2353+
.write(vec![replacement_batch])
2354+
.with_save_mode(SaveMode::Overwrite)
2355+
.with_replace_where("id = 2 OR id = 4")
2356+
.await?;
2357+
2358+
// Verify schema is preserved
2359+
let final_fields: Vec<_> = table.snapshot().unwrap().schema().fields().collect();
2360+
2361+
for (i, field) in final_fields.iter().enumerate() {
2362+
assert_eq!(
2363+
field.is_nullable(),
2364+
initial_fields[i].is_nullable(),
2365+
"Field '{}' nullability should be preserved with replaceWhere",
2366+
field.name()
2367+
);
2368+
}
2369+
2370+
// Now test that constraints are still enforced with replaceWhere
2371+
let invalid_batch = RecordBatch::try_new(
2372+
new_schema,
2373+
vec![
2374+
Arc::new(Int64Array::from(vec![None, Some(3)])), // NULL in non-nullable id!
2375+
Arc::new(StringArray::from(vec![Some("Invalid"), Some("Valid")])),
2376+
Arc::new(BooleanArray::from(vec![Some(false), Some(false)])),
2377+
Arc::new(Int32Array::from(vec![Some(999), Some(333)])),
2378+
],
2379+
)?;
2380+
2381+
let result = DeltaOps(table)
2382+
.write(vec![invalid_batch])
2383+
.with_save_mode(SaveMode::Overwrite)
2384+
.with_replace_where("id = 1 OR id = 3")
2385+
.await;
2386+
2387+
assert!(
2388+
result.is_err(),
2389+
"replaceWhere should still enforce non-nullable constraints"
2390+
);
2391+
2392+
Ok(())
2393+
}
22872394
}

0 commit comments

Comments
 (0)