Skip to content

Commit 23d91c5

Browse files
Support WHERE, ORDER BY, LIMIT, SELECT, EXTEND pipe operators (#17278)
* support WHERE pipe operator
* support order by
* support limit
* select pipe
* extend support
* document supported pipe operators in user guide
* fmt
* fix where pipe before extend
* don't rebind
* remove clone
* move docs into select.md
* avoid confusion by removing `>` in examples

---------

Co-authored-by: Jeffrey Vo <jeffrey.vo.australia@gmail.com>
1 parent 22a1eab commit 23d91c5

File tree

4 files changed

+268
-7
lines changed

4 files changed

+268
-7
lines changed

datafusion/sql/src/query.rs

Lines changed: 74 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,15 @@ use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
2121

2222
use crate::stack::StackGuard;
2323
use datafusion_common::{not_impl_err, Constraints, DFSchema, Result};
24-
use datafusion_expr::expr::Sort;
24+
use datafusion_expr::expr::{Sort, WildcardOptions};
2525

26+
use datafusion_expr::select_expr::SelectExpr;
2627
use datafusion_expr::{
2728
CreateMemoryTable, DdlStatement, Distinct, Expr, LogicalPlan, LogicalPlanBuilder,
2829
};
2930
use sqlparser::ast::{
30-
Expr as SQLExpr, Ident, LimitClause, OrderBy, OrderByExpr, OrderByKind, Query,
31-
SelectInto, SetExpr,
31+
Expr as SQLExpr, Ident, LimitClause, Offset, OffsetRows, OrderBy, OrderByExpr,
32+
OrderByKind, PipeOperator, Query, SelectInto, SetExpr,
3233
};
3334
use sqlparser::tokenizer::Span;
3435

@@ -49,7 +50,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
4950
}
5051

5152
let set_expr = *query.body;
52-
match set_expr {
53+
let plan = match set_expr {
5354
SetExpr::Select(mut select) => {
5455
let select_into = select.into.take();
5556
let plan =
@@ -78,6 +79,75 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
7879
let plan = self.order_by(plan, order_by_rex)?;
7980
self.limit(plan, query.limit_clause, planner_context)
8081
}
82+
}?;
83+
84+
self.pipe_operators(plan, query.pipe_operators, planner_context)
85+
}
86+
87+
/// Apply each pipe operator to the plan in order, feeding the result of one into the next
88+
fn pipe_operators(
89+
&self,
90+
mut plan: LogicalPlan,
91+
pipe_operators: Vec<PipeOperator>,
92+
planner_context: &mut PlannerContext,
93+
) -> Result<LogicalPlan> {
94+
for pipe_operator in pipe_operators {
95+
plan = self.pipe_operator(plan, pipe_operator, planner_context)?;
96+
}
97+
Ok(plan)
98+
}
99+
100+
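/// Apply a single pipe operator to a plan; operators other than WHERE, ORDER BY, LIMIT, SELECT and EXTEND return a "not implemented" error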
101+
fn pipe_operator(
102+
&self,
103+
plan: LogicalPlan,
104+
pipe_operator: PipeOperator,
105+
planner_context: &mut PlannerContext,
106+
) -> Result<LogicalPlan> {
107+
match pipe_operator {
108+
PipeOperator::Where { expr } => {
109+
self.plan_selection(Some(expr), plan, planner_context)
110+
}
111+
PipeOperator::OrderBy { exprs } => {
112+
let sort_exprs = self.order_by_to_sort_expr(
113+
exprs,
114+
plan.schema(),
115+
planner_context,
116+
true,
117+
None,
118+
)?;
119+
self.order_by(plan, sort_exprs)
120+
}
121+
PipeOperator::Limit { expr, offset } => self.limit(
122+
plan,
123+
Some(LimitClause::LimitOffset {
124+
limit: Some(expr),
125+
offset: offset.map(|offset| Offset {
126+
value: offset,
127+
rows: OffsetRows::None,
128+
}),
129+
limit_by: vec![],
130+
}),
131+
planner_context,
132+
),
133+
PipeOperator::Select { exprs } => {
134+
let empty_from = matches!(plan, LogicalPlan::EmptyRelation(_));
135+
let select_exprs =
136+
self.prepare_select_exprs(&plan, exprs, empty_from, planner_context)?;
137+
self.project(plan, select_exprs)
138+
}
139+
PipeOperator::Extend { exprs } => {
140+
let empty_from = matches!(plan, LogicalPlan::EmptyRelation(_));
141+
let extend_exprs =
142+
self.prepare_select_exprs(&plan, exprs, empty_from, planner_context)?;
143+
let all_exprs =
144+
std::iter::once(SelectExpr::Wildcard(WildcardOptions::default()))
145+
.chain(extend_exprs)
146+
.collect();
147+
self.project(plan, all_exprs)
148+
}
149+
150+
x => not_impl_err!("`{x}` pipe operator is not supported yet"),
81151
}
82152
}
83153

datafusion/sql/src/select.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -585,7 +585,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
585585
Ok((intermediate_plan, intermediate_select_exprs))
586586
}
587587

588-
fn plan_selection(
588+
pub(crate) fn plan_selection(
589589
&self,
590590
selection: Option<SQLExpr>,
591591
plan: LogicalPlan,
@@ -666,7 +666,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
666666
}
667667

668668
/// Returns the `Expr`'s corresponding to a SQL query's SELECT expressions.
669-
fn prepare_select_exprs(
669+
pub(crate) fn prepare_select_exprs(
670670
&self,
671671
plan: &LogicalPlan,
672672
projection: Vec<SelectItem>,
@@ -826,7 +826,11 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
826826
}
827827

828828
/// Wrap a plan in a projection
829-
fn project(&self, input: LogicalPlan, expr: Vec<SelectExpr>) -> Result<LogicalPlan> {
829+
pub(crate) fn project(
830+
&self,
831+
input: LogicalPlan,
832+
expr: Vec<SelectExpr>,
833+
) -> Result<LogicalPlan> {
830834
// convert to Expr for validate_schema_satisfies_exprs
831835
let exprs = expr
832836
.iter()
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
# BigQuery supports the pipe operator syntax
19+
# TODO: Make the Generic dialect support the pipe operator syntax
20+
statement ok
21+
set datafusion.sql_parser.dialect = 'BigQuery';
22+
23+
statement ok
24+
CREATE TABLE test(
25+
a INT,
26+
b FLOAT,
27+
c VARCHAR,
28+
n VARCHAR
29+
) AS VALUES
30+
(1, 1.1, 'a', NULL),
31+
(2, 2.2, 'b', NULL),
32+
(3, 3.3, 'c', NULL)
33+
;
34+
35+
# WHERE pipe
36+
query IRTT
37+
SELECT *
38+
FROM test
39+
|> WHERE a > 1
40+
----
41+
2 2.2 b NULL
42+
3 3.3 c NULL
43+
44+
# ORDER BY pipe
45+
query IRTT
46+
SELECT *
47+
FROM test
48+
|> ORDER BY a DESC
49+
----
50+
3 3.3 c NULL
51+
2 2.2 b NULL
52+
1 1.1 a NULL
53+
54+
# ORDER BY pipe followed by LIMIT pipe
55+
query IRTT
56+
SELECT *
57+
FROM test
58+
|> ORDER BY a DESC
59+
|> LIMIT 1
60+
----
61+
3 3.3 c NULL
62+
63+
# SELECT pipe
64+
query I
65+
SELECT *
66+
FROM test
67+
|> SELECT a
68+
----
69+
1
70+
2
71+
3
72+
73+
# EXTEND pipe
74+
query IRR
75+
SELECT *
76+
FROM test
77+
|> SELECT a, b
78+
|> EXTEND a + b AS a_plus_b
79+
----
80+
1 1.1 2.1
81+
2 2.2 4.2
82+
3 3.3 6.3
83+
84+
query IRR
85+
SELECT *
86+
FROM test
87+
|> SELECT a, b
88+
|> where a = 1
89+
|> EXTEND a + b AS a_plus_b
90+
----
91+
1 1.1 2.1

docs/source/user-guide/sql/select.md

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ DataFusion supports the following syntax for queries:
4040
[ [ORDER BY](#order-by-clause) expression [ ASC | DESC ][, ...] ] <br/>
4141
[ [LIMIT](#limit-clause) count ] <br/>
4242
[ [EXCLUDE | EXCEPT](#exclude-and-except-clause) ] <br/>
43+
[Pipe operators](#pipe-operators) <br/>
4344

4445
</code>
4546

@@ -327,3 +328,98 @@ FROM table;
327328
SELECT * EXCLUDE(age, person)
328329
FROM table;
329330
```
331+
332+
## Pipe operators
333+
334+
Some SQL dialects (e.g. BigQuery) support the pipe operator `|>`.
335+
To use pipe operators, first set the SQL dialect to one that supports them:
336+
337+
```sql
338+
set datafusion.sql_parser.dialect = 'BigQuery';
339+
```
340+
341+
DataFusion currently supports the following pipe operators:
342+
343+
- [WHERE](#pipe_where)
344+
- [ORDER BY](#pipe_order_by)
345+
- [LIMIT](#pipe_limit)
346+
- [SELECT](#pipe_select)
347+
- [EXTEND](#pipe_extend)
348+
349+
(pipe_where)=
350+
351+
### WHERE
352+
353+
```sql
354+
select * from range(0,10)
355+
|> where value < 2;
356+
+-------+
357+
| value |
358+
+-------+
359+
| 0 |
360+
| 1 |
361+
+-------+
362+
```
363+
364+
(pipe_order_by)=
365+
366+
### ORDER BY
367+
368+
```sql
369+
select * from range(0,3)
370+
|> order by value desc;
371+
+-------+
372+
| value |
373+
+-------+
374+
| 2 |
375+
| 1 |
376+
| 0 |
377+
+-------+
378+
```
379+
380+
(pipe_limit)=
381+
382+
### LIMIT
383+
384+
```sql
385+
select * from range(0,3)
386+
|> order by value desc
387+
|> limit 1;
388+
+-------+
389+
| value |
390+
+-------+
391+
| 2 |
392+
+-------+
393+
```
394+
395+
(pipe_select)=
396+
397+
### SELECT
398+
399+
```sql
400+
select * from range(0,3)
401+
|> select value + 10;
402+
+---------------------------+
403+
| range().value + Int64(10) |
404+
+---------------------------+
405+
| 10 |
406+
| 11 |
407+
| 12 |
408+
+---------------------------+
409+
```
410+
411+
(pipe_extend)=
412+
413+
### EXTEND
414+
415+
```sql
416+
select * from range(0,3)
417+
|> extend -value AS minus_value;
418+
+-------+-------------+
419+
| value | minus_value |
420+
+-------+-------------+
421+
| 0 | 0 |
422+
| 1 | -1 |
423+
| 2 | -2 |
424+
+-------+-------------+
425+
```

0 commit comments

Comments
 (0)