Skip to content

Commit 9f530dd

Browse files
Update substrait requirement from 0.49 to 0.50 (#13808)
* Update substrait requirement from 0.49 to 0.50

  Updates the requirements on [substrait](https://github.com/substrait-io/substrait-rs) to permit the latest version.
  - [Release notes](https://github.com/substrait-io/substrait-rs/releases)
  - [Changelog](https://github.com/substrait-io/substrait-rs/blob/main/CHANGELOG.md)
  - [Commits](https://github.com/substrait-io/substrait-rs/compare/v0.49.0...v0.50.0)

  ---
  updated-dependencies:
  - dependency-name: substrait
    dependency-type: direct:production
  ...

  Signed-off-by: dependabot[bot] <support@github.com>

* Fix compilation

* Add expr test

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: jonahgao <jonahgao@msn.com>
1 parent 71f157f commit 9f530dd

File tree

4 files changed

+54
-27
lines changed

4 files changed

+54
-27
lines changed

datafusion/substrait/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ object_store = { workspace = true }
4242
pbjson-types = "0.7"
4343
# TODO use workspace version
4444
prost = "0.13"
45-
substrait = { version = "0.49", features = ["serde"] }
45+
substrait = { version = "0.50", features = ["serde"] }
4646
url = { workspace = true }
4747

4848
[dev-dependencies]

datafusion/substrait/src/logical_plan/consumer.rs

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ use datafusion::logical_expr::{
6262
col, expr, Cast, Extension, GroupingSet, Like, LogicalPlanBuilder, Partitioning,
6363
Repartition, Subquery, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition,
6464
};
65-
use datafusion::prelude::JoinType;
65+
use datafusion::prelude::{lit, JoinType};
6666
use datafusion::sql::TableReference;
6767
use datafusion::{
6868
error::Result, logical_expr::utils::split_conjunction, prelude::Column,
@@ -98,7 +98,7 @@ use substrait::proto::{
9898
sort_field::{SortDirection, SortKind::*},
9999
AggregateFunction, Expression, NamedStruct, Plan, Rel, RelCommon, Type,
100100
};
101-
use substrait::proto::{ExtendedExpression, FunctionArgument, SortField};
101+
use substrait::proto::{fetch_rel, ExtendedExpression, FunctionArgument, SortField};
102102

103103
use super::state::SubstraitPlanningState;
104104

@@ -640,14 +640,27 @@ pub async fn from_substrait_rel(
640640
let input = LogicalPlanBuilder::from(
641641
from_substrait_rel(state, input, extensions).await?,
642642
);
643-
let offset = fetch.offset as usize;
644-
// -1 means that ALL records should be returned
645-
let count = if fetch.count == -1 {
646-
None
647-
} else {
648-
Some(fetch.count as usize)
643+
let empty_schema = DFSchemaRef::new(DFSchema::empty());
644+
let offset = match &fetch.offset_mode {
645+
Some(fetch_rel::OffsetMode::Offset(offset)) => Some(lit(*offset)),
646+
Some(fetch_rel::OffsetMode::OffsetExpr(expr)) => Some(
647+
from_substrait_rex(state, expr, &empty_schema, extensions)
648+
.await?,
649+
),
650+
None => None,
651+
};
652+
let count = match &fetch.count_mode {
653+
Some(fetch_rel::CountMode::Count(count)) => {
654+
// -1 means that ALL records should be returned, equivalent to None
655+
(*count != -1).then(|| lit(*count))
656+
}
657+
Some(fetch_rel::CountMode::CountExpr(expr)) => Some(
658+
from_substrait_rex(state, expr, &empty_schema, extensions)
659+
.await?,
660+
),
661+
None => None,
649662
};
650-
input.limit(offset, count)?.build()
663+
input.limit_by_expr(offset, count)?.build()
651664
} else {
652665
not_impl_err!("Fetch without an input is not valid")
653666
}

datafusion/substrait/src/logical_plan/producer.rs

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,7 @@ use std::sync::Arc;
2222
use substrait::proto::expression_reference::ExprType;
2323

2424
use datafusion::arrow::datatypes::{Field, IntervalUnit};
25-
use datafusion::logical_expr::{
26-
Distinct, FetchType, Like, Partitioning, SkipType, TryCast, WindowFrameUnits,
27-
};
25+
use datafusion::logical_expr::{Distinct, Like, Partitioning, TryCast, WindowFrameUnits};
2826
use datafusion::{
2927
arrow::datatypes::{DataType, TimeUnit},
3028
error::{DataFusionError, Result},
@@ -45,7 +43,7 @@ use datafusion::arrow::array::{Array, GenericListArray, OffsetSizeTrait};
4543
use datafusion::arrow::temporal_conversions::NANOSECONDS;
4644
use datafusion::common::{
4745
exec_err, internal_err, not_impl_err, plan_err, substrait_datafusion_err,
48-
substrait_err, DFSchemaRef, ToDFSchema,
46+
substrait_err, DFSchema, DFSchemaRef, ToDFSchema,
4947
};
5048
#[allow(unused_imports)]
5149
use datafusion::logical_expr::expr::{
@@ -69,7 +67,8 @@ use substrait::proto::read_rel::VirtualTable;
6967
use substrait::proto::rel_common::EmitKind;
7068
use substrait::proto::rel_common::EmitKind::Emit;
7169
use substrait::proto::{
72-
rel_common, ExchangeRel, ExpressionReference, ExtendedExpression, RelCommon,
70+
fetch_rel, rel_common, ExchangeRel, ExpressionReference, ExtendedExpression,
71+
RelCommon,
7372
};
7473
use substrait::{
7574
proto::{
@@ -333,19 +332,31 @@ pub fn to_substrait_rel(
333332
}
334333
LogicalPlan::Limit(limit) => {
335334
let input = to_substrait_rel(limit.input.as_ref(), state, extensions)?;
336-
let FetchType::Literal(fetch) = limit.get_fetch_type()? else {
337-
return not_impl_err!("Non-literal limit fetch");
338-
};
339-
let SkipType::Literal(skip) = limit.get_skip_type()? else {
340-
return not_impl_err!("Non-literal limit skip");
341-
};
335+
let empty_schema = Arc::new(DFSchema::empty());
336+
let offset_mode = limit
337+
.skip
338+
.as_ref()
339+
.map(|expr| {
340+
to_substrait_rex(state, expr.as_ref(), &empty_schema, 0, extensions)
341+
})
342+
.transpose()?
343+
.map(Box::new)
344+
.map(fetch_rel::OffsetMode::OffsetExpr);
345+
let count_mode = limit
346+
.fetch
347+
.as_ref()
348+
.map(|expr| {
349+
to_substrait_rex(state, expr.as_ref(), &empty_schema, 0, extensions)
350+
})
351+
.transpose()?
352+
.map(Box::new)
353+
.map(fetch_rel::CountMode::CountExpr);
342354
Ok(Box::new(Rel {
343355
rel_type: Some(RelType::Fetch(Box::new(FetchRel {
344356
common: None,
345357
input: Some(input),
346-
offset: skip as i64,
347-
// use -1 to signal that ALL records should be returned
348-
count: fetch.map(|f| f as i64).unwrap_or(-1),
358+
offset_mode,
359+
count_mode,
349360
advanced_extension: None,
350361
}))),
351362
}))

datafusion/substrait/tests/cases/roundtrip_logical_plan.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -240,17 +240,20 @@ async fn select_with_filter_bool_expr() -> Result<()> {
240240

241241
#[tokio::test]
242242
async fn select_with_limit() -> Result<()> {
243-
roundtrip_fill_na("SELECT * FROM data LIMIT 100").await
243+
roundtrip_fill_na("SELECT * FROM data LIMIT 100").await?;
244+
roundtrip_fill_na("SELECT * FROM data LIMIT 98+100/50").await
244245
}
245246

246247
#[tokio::test]
247248
async fn select_without_limit() -> Result<()> {
248-
roundtrip_fill_na("SELECT * FROM data OFFSET 10").await
249+
roundtrip_fill_na("SELECT * FROM data OFFSET 10").await?;
250+
roundtrip_fill_na("SELECT * FROM data OFFSET 5+7-2").await
249251
}
250252

251253
#[tokio::test]
252254
async fn select_with_limit_offset() -> Result<()> {
253-
roundtrip("SELECT * FROM data LIMIT 200 OFFSET 10").await
255+
roundtrip("SELECT * FROM data LIMIT 200 OFFSET 10").await?;
256+
roundtrip("SELECT * FROM data LIMIT 100+100 OFFSET 20/2").await
254257
}
255258

256259
#[tokio::test]

0 commit comments

Comments
 (0)