Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 34 additions & 64 deletions datafusion/substrait/src/logical_plan/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -937,72 +937,42 @@ pub async fn from_substrait_rel(
}
}
Some(RelType::Set(set)) => match set_rel::SetOp::try_from(set.op) {
Ok(set_op) => match set_op {
set_rel::SetOp::UnionAll => {
if !set.inputs.is_empty() {
union_rels(&set.inputs, ctx, extensions, true).await
} else {
not_impl_err!("Union relation requires at least one input")
}
}
set_rel::SetOp::UnionDistinct => {
if !set.inputs.is_empty() {
union_rels(&set.inputs, ctx, extensions, false).await
} else {
not_impl_err!("Union relation requires at least one input")
}
}
set_rel::SetOp::IntersectionPrimary => {
if set.inputs.len() >= 2 {
LogicalPlanBuilder::intersect(
from_substrait_rel(ctx, &set.inputs[0], extensions).await?,
union_rels(&set.inputs[1..], ctx, extensions, true).await?,
false,
)
} else {
not_impl_err!(
"Primary Intersect relation requires at least two inputs"
)
}
}
set_rel::SetOp::IntersectionMultiset => {
if set.inputs.len() >= 2 {
intersect_rels(&set.inputs, ctx, extensions, false).await
} else {
not_impl_err!(
"Multiset Intersect relation requires at least two inputs"
)
}
}
set_rel::SetOp::IntersectionMultisetAll => {
if set.inputs.len() >= 2 {
intersect_rels(&set.inputs, ctx, extensions, true).await
} else {
not_impl_err!(
"MultisetAll Intersect relation requires at least two inputs"
)
}
}
set_rel::SetOp::MinusPrimary => {
if set.inputs.len() >= 2 {
except_rels(&set.inputs, ctx, extensions, false).await
} else {
not_impl_err!(
"Primary Minus relation requires at least two inputs"
)
}
}
set_rel::SetOp::MinusPrimaryAll => {
if set.inputs.len() >= 2 {
except_rels(&set.inputs, ctx, extensions, true).await
} else {
not_impl_err!(
"PrimaryAll Minus relation requires at least two inputs"
)
Ok(set_op) => {
if set.inputs.len() < 2 {
substrait_err!("Set operation requires at least two inputs")
} else {
match set_op {
set_rel::SetOp::UnionAll => {
union_rels(&set.inputs, ctx, extensions, true).await
}
set_rel::SetOp::UnionDistinct => {
union_rels(&set.inputs, ctx, extensions, false).await
}
set_rel::SetOp::IntersectionPrimary => {
LogicalPlanBuilder::intersect(
from_substrait_rel(ctx, &set.inputs[0], extensions)
.await?,
union_rels(&set.inputs[1..], ctx, extensions, true)
.await?,
false,
)
}
set_rel::SetOp::IntersectionMultiset => {
intersect_rels(&set.inputs, ctx, extensions, false).await
}
set_rel::SetOp::IntersectionMultisetAll => {
intersect_rels(&set.inputs, ctx, extensions, true).await
}
set_rel::SetOp::MinusPrimary => {
except_rels(&set.inputs, ctx, extensions, false).await
}
set_rel::SetOp::MinusPrimaryAll => {
except_rels(&set.inputs, ctx, extensions, true).await
}
_ => not_impl_err!("Unsupported set operator: {set_op:?}"),
}
}
_ => not_impl_err!("Unsupported set operator: {set_op:?}"),
},
}
Err(e) => not_impl_err!("Invalid set operation type {}: {e}", set.op),
},
Some(RelType::ExtensionLeaf(extension)) => {
Expand Down