Skip to content
30 changes: 27 additions & 3 deletions datafusion/substrait/src/logical_plan/consumer/rel/read_rel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,37 @@ pub async fn from_read_rel(
.await
}
Some(ReadType::VirtualTable(vt)) => {
if vt.values.is_empty() {
if vt.values.is_empty() && vt.expressions.is_empty() {
return Ok(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: DFSchemaRef::new(substrait_schema),
}));
}

let values = vt
let values = if !vt.expressions.is_empty() {
let mut exprs = vec![];
for row in &vt.expressions {
let mut name_idx = 0;
let mut row_exprs = vec![];
for expression in &row.fields {
name_idx += 1;
let expr = consumer
.consume_expression(expression, &DFSchema::empty())
.await?;
row_exprs.push(expr);
}
if name_idx != named_struct.names.len() {
return substrait_err!(
"Names list must match exactly to nested schema, but found {} uses for {} names",
name_idx,
named_struct.names.len()
);
}
exprs.push(row_exprs);
}
exprs
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does introduce a bit of duplication with the block below, but given that consume_expression is async and from_substrait_literal is not it would be a bit tricky to unify the processing. In the long-term Substrait drop support for the values field which is deprecated so we'll be able to remove the lower block entirely.

} else {
vt
.values
.iter()
.map(|row| {
Expand All @@ -148,7 +171,8 @@ pub async fn from_read_rel(
}
Ok(lits)
})
.collect::<datafusion::common::Result<_>>()?;
.collect::<datafusion::common::Result<_>>()?
};

Ok(LogicalPlan::Values(Values {
schema: DFSchemaRef::new(substrait_schema),
Expand Down
15 changes: 15 additions & 0 deletions datafusion/substrait/tests/cases/consumer_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,21 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_expressions_in_virtual_table() -> Result<()> {
let plan_str =
test_plan_to_string("virtual_table_with_expressions.substrait.json").await?;

assert_snapshot!(
plan_str,
@r#"
Projection: dummy1 AS result1, dummy2 AS result2
Values: (Int64(0), Utf8("temp")), (Int64(1), Utf8("test"))
"#
);
Ok(())
}

#[tokio::test]
async fn test_multiple_joins() -> Result<()> {
let plan_str = test_plan_to_string("multiple_joins.json").await?;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
{
"relations": [
{
"root": {
"input": {
"read": {
"common": {
"direct": {
}
},
"baseSchema": {
"names": [
"dummy1", "dummy2"
],
"struct": {
"types": [
{
"i64": {
"nullability": "NULLABILITY_REQUIRED"
}
},
{
"string": {
"nullability": "NULLABILITY_REQUIRED"
}
}
],
"nullability": "NULLABILITY_REQUIRED"
}
},
"virtualTable": {
"expressions": [
{
"fields": [
{
"literal": {
"i64": "0",
"nullable": false
}
},
{
"literal": {
"string": "temp",
"nullable": false
}
}
]
},
{
"fields": [
{
"literal": {
"i64": "1",
"nullable": false
}
},
{
"literal": {
"string": "test",
"nullable": false
}
}
]
}
]
}
}
},
"names": [
"result1", "result2"
]
}
}
]
}