Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion arrow-schema/src/field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,7 @@ impl Field {
nested_fields
.iter()
.chain(from_nested_fields)
.try_for_each(|f| builder.try_merge(f))?;
.try_for_each(|f| builder.try_merge(f, true))?;
*nested_fields = builder.finish().fields;
}
_ => {
Expand Down
98 changes: 86 additions & 12 deletions arrow-schema/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,20 +95,55 @@ impl SchemaBuilder {
/// Appends a [`FieldRef`] to this [`SchemaBuilder`] checking for collision
///
/// If an existing field exists with the same name, calls [`Field::try_merge`]
pub fn try_merge(&mut self, field: &FieldRef) -> Result<(), ArrowError> {
///
/// If `preserve_nullability` is true, the nullability of the field will be preserved,
/// i.e. merging a nullable field into a non-nullable field will result in a nullable field.
/// If `preserve_nullability` is false, the nullability of the field will be set to true.
pub fn try_merge(
&mut self,
field: &FieldRef,
preserve_nullability: bool,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be a breaking API change, which means we would have to wait for the next major release to get it in

) -> Result<(), ArrowError> {
// This could potentially be sped up with a HashMap or similar
let existing = self.fields.iter_mut().find(|f| f.name() == field.name());
match existing {
Some(e) if Arc::ptr_eq(e, field) => {} // Nothing to do
// The existing field is the same as the field to be merged (pointer equality)
Some(e) if Arc::ptr_eq(e, field) => {
if !preserve_nullability {
let mut t = e.as_ref().clone();
t.set_nullable(true);
*e = Arc::new(t);
}
}
// The existing field is different from the field to be merged (pointer inequality)
Some(e) => match Arc::get_mut(e) {
Some(e) => e.try_merge(field.as_ref())?,
// No other pointers to the same field, so we can mutate it
Some(e) => {
let mut merged = e.as_ref().clone();
if !preserve_nullability {
merged.set_nullable(true);
}
merged.try_merge(field.as_ref())?;
*e = merged;
}
// Other pointers to the same field exist, so we need to clone it
None => {
let mut t = e.as_ref().clone();
t.try_merge(field)?;
*e = Arc::new(t)
let mut merged = e.as_ref().clone();
if !preserve_nullability {
merged.set_nullable(true);
}
merged.try_merge(field)?;
*e = Arc::new(merged)
}
},
None => self.fields.push(field.clone()),
// The field is not found, add it to the schema
None => {
let mut new = field.as_ref().clone();
if !preserve_nullability {
new.set_nullable(true);
}
self.fields.push(Arc::new(new))
}
}
Ok(())
}
Expand Down Expand Up @@ -266,6 +301,9 @@ impl Schema {

/// Merge schema into self if it is compatible. Struct fields will be merged recursively.
///
/// If a field is present in all schemas, its nullability will be preserved across merges.
/// Otherwise, it will be set to nullable.
///
/// Example:
///
/// ```
Expand Down Expand Up @@ -293,6 +331,20 @@ impl Schema {
/// );
/// ```
pub fn try_merge(schemas: impl IntoIterator<Item = Self>) -> Result<Self, ArrowError> {
let schemas = schemas.into_iter().collect::<Vec<Self>>();
Copy link
Contributor Author

@vegarsti vegarsti Nov 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This increases the memory consumption of this method. An alternative could be to change the function signature to schemas: &[Self]? But I haven't checked if that is feasible.

let num_schemas = schemas.len();

// Count field occurrences
let mut field_counts: HashMap<String, usize> = HashMap::new();
for schema in &schemas {
for field in schema.fields.iter() {
field_counts
.entry(field.name().to_string())
.and_modify(|count| *count += 1)
.or_insert(1);
}
}

let mut out_meta = HashMap::new();
let mut out_fields = SchemaBuilder::new();
for schema in schemas {
Expand All @@ -312,7 +364,12 @@ impl Schema {
}

// merge fields
fields.iter().try_for_each(|x| out_fields.try_merge(x))?
fields.iter().try_for_each(|field| {
// Only preserve nullability of a field if it shows up in all schemas
let preserve_nullability =
field_counts.get(field.name()).unwrap_or(&0) == &num_schemas;
out_fields.try_merge(field, preserve_nullability)
})?
}

Ok(out_fields.finish().with_metadata(out_meta))
Expand Down Expand Up @@ -1367,17 +1424,17 @@ mod tests {
merged,
Schema::new_with_metadata(
vec![
Field::new("first_name", DataType::Utf8, false),
Field::new("last_name", DataType::Utf8, true),
Field::new("first_name", DataType::Utf8, true), // not present in all schemas, so nullable
Field::new("last_name", DataType::Utf8, true), // present in all schemas, but not nullable in all schemas, so nullable
Field::new(
"address",
DataType::Struct(Fields::from(vec![
Field::new("zip", DataType::UInt16, true),
Field::new("street", DataType::Utf8, false),
])),
false,
false, // present in all schemas as not nullable, so not nullable
),
Field::new("number", DataType::Utf8, true),
Field::new("number", DataType::Utf8, true), // nullable (only present in one schema, as nullable)
],
[("foo".to_string(), "bar".to_string())]
.iter()
Expand Down Expand Up @@ -1502,4 +1559,21 @@ mod tests {
assert_eq!(out.metadata["k"], "v");
assert_eq!(out.metadata["key"], "value");
}

#[test]
fn test_schema_merge_nullability() {
let merged = Schema::try_merge(vec![
Schema::new(vec![Field::new("first_name", DataType::Utf8, false)]),
Schema::new(vec![Field::new("last_name", DataType::Utf8, false)]),
])
.unwrap();

assert_eq!(
merged,
Schema::new(vec![
Field::new("first_name", DataType::Utf8, true),
Field::new("last_name", DataType::Utf8, true),
])
);
}
}
Loading