Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion/substrait/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ datafusion = { workspace = true, features = ["nested_expressions"] }
datafusion-functions-aggregate = { workspace = true }
serde_json = "1.0"
tokio = { workspace = true }
insta = { workspace = true }

[features]
default = ["physical"]
Expand Down
608 changes: 323 additions & 285 deletions datafusion/substrait/tests/cases/consumer_integration.rs

Large diffs are not rendered by default.

72 changes: 40 additions & 32 deletions datafusion/substrait/tests/cases/emit_kind_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ mod tests {
use datafusion::prelude::{CsvReadOptions, SessionConfig, SessionContext};
use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
use datafusion_substrait::logical_plan::producer::to_substrait_plan;
use insta::assert_snapshot;

#[tokio::test]
async fn project_respects_direct_emit_kind() -> Result<()> {
Expand All @@ -35,13 +36,13 @@ mod tests {
let ctx = add_plan_schemas_to_ctx(SessionContext::new(), &proto_plan)?;
let plan = from_substrait_plan(&ctx.state(), &proto_plan).await?;

let plan_str = format!("{}", plan);

assert_eq!(
plan_str,
"Projection: DATA.A AS a, DATA.B AS b, DATA.A + Int64(1) AS add1\
\n TableScan: DATA"
);
assert_snapshot!(
plan,
@r#"
Projection: DATA.A AS a, DATA.B AS b, DATA.A + Int64(1) AS add1
TableScan: DATA
"#
);
Ok(())
}

Expand All @@ -53,15 +54,15 @@ mod tests {
let ctx = add_plan_schemas_to_ctx(SessionContext::new(), &proto_plan)?;
let plan = from_substrait_plan(&ctx.state(), &proto_plan).await?;

let plan_str = format!("{}", plan);

assert_eq!(
plan_str,
// Note that duplicate references in the remap are aliased
"Projection: DATA.B, DATA.A AS A1, DATA.A AS DATA.A__temp__0 AS A2\
\n Filter: DATA.B = Int64(2)\
\n TableScan: DATA"
);
assert_snapshot!(
plan,
// Note that duplicate references in the remap are aliased
@r#"
Projection: DATA.B, DATA.A AS A1, DATA.A AS DATA.A__temp__0 AS A2
Filter: DATA.B = Int64(2)
TableScan: DATA
"#
);
Ok(())
}

Expand All @@ -85,21 +86,24 @@ mod tests {
.await?;

let plan = df.into_unoptimized_plan();
assert_eq!(
format!("{}", plan),
"Projection: random() AS c1, data.a + Int64(1) AS c2\
\n TableScan: data"
);
assert_snapshot!(
plan,
@r#"
Projection: random() AS c1, data.a + Int64(1) AS c2
TableScan: data
"# );

let proto = to_substrait_plan(&plan, &ctx.state())?;
let plan2 = from_substrait_plan(&ctx.state(), &proto).await?;
// note how the Projections are not flattened
assert_eq!(
format!("{}", plan2),
"Projection: random() AS c1, data.a + Int64(1) AS c2\
\n Projection: data.a, data.b, data.c, data.d, data.e, data.f, random(), data.a + Int64(1)\
\n TableScan: data"
);
assert_snapshot!(
plan2,
@r#"
Projection: random() AS c1, data.a + Int64(1) AS c2
Projection: data.a, data.b, data.c, data.d, data.e, data.f, random(), data.a + Int64(1)
TableScan: data
"#
);
Ok(())
}

Expand All @@ -109,17 +113,21 @@ mod tests {
let df = ctx.sql("SELECT a + 1, b + 2 FROM data").await?;

let plan = df.into_unoptimized_plan();
assert_eq!(
format!("{}", plan),
"Projection: data.a + Int64(1), data.b + Int64(2)\
\n TableScan: data"
);
assert_snapshot!(
plan,
@r#"
Projection: data.a + Int64(1), data.b + Int64(2)
TableScan: data
"#
);

let proto = to_substrait_plan(&plan, &ctx.state())?;
let plan2 = from_substrait_plan(&ctx.state(), &proto).await?;

let plan1str = format!("{plan}");
let plan2str = format!("{plan2}");
println!("{}", plan1str);
println!("{}", plan2str);
assert_eq!(plan1str, plan2str);

Ok(())
Expand Down
17 changes: 9 additions & 8 deletions datafusion/substrait/tests/cases/function_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,22 @@ mod tests {
use datafusion::common::Result;
use datafusion::prelude::SessionContext;
use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
use insta::assert_snapshot;

#[tokio::test]
async fn contains_function_test() -> Result<()> {
let proto_plan = read_json("tests/testdata/contains_plan.substrait.json");
let ctx = add_plan_schemas_to_ctx(SessionContext::new(), &proto_plan)?;
let plan = from_substrait_plan(&ctx.state(), &proto_plan).await?;

let plan_str = format!("{}", plan);

assert_eq!(
plan_str,
"Projection: nation.n_name\
\n Filter: contains(nation.n_name, Utf8(\"IA\"))\
\n TableScan: nation"
);
assert_snapshot!(
plan,
@r#"
Projection: nation.n_name
Filter: contains(nation.n_name, Utf8("IA"))
TableScan: nation
"#
);
Ok(())
}
}
80 changes: 48 additions & 32 deletions datafusion/substrait/tests/cases/logical_plans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ mod tests {
use datafusion::dataframe::DataFrame;
use datafusion::prelude::SessionContext;
use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
use insta::assert_snapshot;

#[tokio::test]
async fn scalar_function_compound_signature() -> Result<()> {
Expand All @@ -40,11 +41,13 @@ mod tests {
let ctx = add_plan_schemas_to_ctx(SessionContext::new(), &proto_plan)?;
let plan = from_substrait_plan(&ctx.state(), &proto_plan).await?;

assert_eq!(
format!("{}", plan),
"Projection: NOT DATA.D AS EXPR$0\
\n TableScan: DATA"
);
assert_snapshot!(
plan,
@r#"
Projection: NOT DATA.D AS EXPR$0
TableScan: DATA
"#
);

// Trigger execution to ensure plan validity
DataFrame::new(ctx.state(), plan).show().await?;
Expand All @@ -69,12 +72,14 @@ mod tests {
let ctx = add_plan_schemas_to_ctx(SessionContext::new(), &proto_plan)?;
let plan = from_substrait_plan(&ctx.state(), &proto_plan).await?;

assert_eq!(
format!("{}", plan),
"Projection: sum(DATA.D) PARTITION BY [DATA.PART] ORDER BY [DATA.ORD ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING AS LEAD_EXPR\
\n WindowAggr: windowExpr=[[sum(DATA.D) PARTITION BY [DATA.PART] ORDER BY [DATA.ORD ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING]]\
\n TableScan: DATA"
);
assert_snapshot!(
plan,
@r#"
Projection: sum(DATA.D) PARTITION BY [DATA.PART] ORDER BY [DATA.ORD ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING AS LEAD_EXPR
WindowAggr: windowExpr=[[sum(DATA.D) PARTITION BY [DATA.PART] ORDER BY [DATA.ORD ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING]]
TableScan: DATA
"#
);

// Trigger execution to ensure plan validity
DataFrame::new(ctx.state(), plan).show().await?;
Expand All @@ -94,12 +99,14 @@ mod tests {
let ctx = add_plan_schemas_to_ctx(SessionContext::new(), &proto_plan)?;
let plan = from_substrait_plan(&ctx.state(), &proto_plan).await?;

assert_eq!(
format!("{}", plan),
"Projection: row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS EXPR$0, row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW__temp__0 AS ALIASED\
\n WindowAggr: windowExpr=[[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
\n TableScan: DATA"
);
assert_snapshot!(
plan,
@r#"
Projection: row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS EXPR$0, row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW__temp__0 AS ALIASED
WindowAggr: windowExpr=[[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
TableScan: DATA
"#
);

// Trigger execution to ensure plan validity
DataFrame::new(ctx.state(), plan).show().await?;
Expand All @@ -121,13 +128,15 @@ mod tests {
let ctx = add_plan_schemas_to_ctx(SessionContext::new(), &proto_plan)?;
let plan = from_substrait_plan(&ctx.state(), &proto_plan).await?;

assert_eq!(
format!("{}", plan),
"Projection: row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS EXPR$0, row_number() PARTITION BY [DATA.A] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS EXPR$1\
\n WindowAggr: windowExpr=[[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
\n WindowAggr: windowExpr=[[row_number() PARTITION BY [DATA.A] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]\
\n TableScan: DATA"
);
assert_snapshot!(
plan,
@r#"
Projection: row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS EXPR$0, row_number() PARTITION BY [DATA.A] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS EXPR$1
WindowAggr: windowExpr=[[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
WindowAggr: windowExpr=[[row_number() PARTITION BY [DATA.A] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
TableScan: DATA
"#
);

// Trigger execution to ensure plan validity
DataFrame::new(ctx.state(), plan).show().await?;
Expand All @@ -145,7 +154,12 @@ mod tests {
let ctx = add_plan_schemas_to_ctx(SessionContext::new(), &proto_plan)?;
let plan = from_substrait_plan(&ctx.state(), &proto_plan).await?;

assert_eq!(format!("{}", &plan), "Values: (List([1, 2]))");
assert_snapshot!(
&plan,
@r#"
Values: (List([1, 2]))
"#
);

// Trigger execution to ensure plan validity
DataFrame::new(ctx.state(), plan).show().await?;
Expand All @@ -160,13 +174,15 @@ mod tests {
let ctx = add_plan_schemas_to_ctx(SessionContext::new(), &proto_plan)?;
let plan = from_substrait_plan(&ctx.state(), &proto_plan).await?;

assert_eq!(
format!("{}", plan),
"Projection: lower(sales.product) AS lower(product), sum(count(sales.product)) AS product_count\
\n Aggregate: groupBy=[[sales.product]], aggr=[[sum(count(sales.product))]]\
\n Aggregate: groupBy=[[sales.product]], aggr=[[count(sales.product)]]\
\n TableScan: sales"
);
assert_snapshot!(
plan,
@r#"
Projection: lower(sales.product) AS lower(product), sum(count(sales.product)) AS product_count
Aggregate: groupBy=[[sales.product]], aggr=[[sum(count(sales.product))]]
Aggregate: groupBy=[[sales.product]], aggr=[[count(sales.product)]]
TableScan: sales
"#
);

// Trigger execution to ensure plan validity
DataFrame::new(ctx.state(), plan).show().await?;
Expand Down
29 changes: 18 additions & 11 deletions datafusion/substrait/tests/cases/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ mod tests {
use datafusion::error::Result;
use datafusion::prelude::*;

use insta::assert_snapshot;
use std::fs;
use substrait::proto::plan_rel::RelType;
use substrait::proto::rel_common::{Emit, EmitKind};
Expand Down Expand Up @@ -92,11 +93,14 @@ mod tests {
let df = ctx.sql("SELECT b, a + a, a FROM data").await?;
let datafusion_plan = df.into_optimized_plan()?;

assert_eq!(
format!("{}", datafusion_plan),
"Projection: data.b, data.a + data.a, data.a\
\n TableScan: data projection=[a, b]",
);
assert_snapshot!(
format!("{}", datafusion_plan),
@r#"
Projection: data.b, data.a + data.a, data.a
TableScan: data projection=[a, b]
"#
,
);

let plan = to_substrait_plan(&datafusion_plan, &ctx.state())?
.as_ref()
Expand Down Expand Up @@ -136,12 +140,15 @@ mod tests {
.sql("SELECT b, RANK() OVER (PARTITION BY a), c FROM data;")
.await?;
let datafusion_plan = df.into_optimized_plan()?;
assert_eq!(
format!("{}", datafusion_plan),
"Projection: data.b, rank() PARTITION BY [data.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING, data.c\
\n WindowAggr: windowExpr=[[rank() PARTITION BY [data.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]\
\n TableScan: data projection=[a, b, c]",
);
assert_snapshot!(
datafusion_plan,
@r#"
Projection: data.b, rank() PARTITION BY [data.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING, data.c
WindowAggr: windowExpr=[[rank() PARTITION BY [data.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
TableScan: data projection=[a, b, c]
"#
,
);

let plan = to_substrait_plan(&datafusion_plan, &ctx.state())?
.as_ref()
Expand Down
Loading