Skip to content

Commit

Permalink
Support RelationSubquery PPL (#775)
Browse files Browse the repository at this point in the history
* Support RelationSubquery PPL

Signed-off-by: Lantao Jin <ltjin@amazon.com>

* fix doc

Signed-off-by: Lantao Jin <ltjin@amazon.com>

* revert the FROM alias

Signed-off-by: Lantao Jin <ltjin@amazon.com>

* add the case for subquery in search filter

Signed-off-by: Lantao Jin <ltjin@amazon.com>

---------

Signed-off-by: Lantao Jin <ltjin@amazon.com>
  • Loading branch information
LantaoJin authored Oct 17, 2024
1 parent 73a6b77 commit 9d3909b
Show file tree
Hide file tree
Showing 11 changed files with 798 additions and 58 deletions.
30 changes: 26 additions & 4 deletions docs/ppl-lang/PPL-Example-Commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,7 @@ source = table | where ispresent(a) |
- `source = table1 | cross join left = l right = r table2`
- `source = table1 | left semi join left = l right = r on l.a = r.a table2`
- `source = table1 | left anti join left = l right = r on l.a = r.a table2`

_- **Limitation: sub-searches is unsupported in join right side now**_
- `source = table1 | join left = l right = r [ source = table2 | where d > 10 | head 5 ]`


#### **Lookup**
Expand All @@ -268,6 +267,8 @@ _- **Limitation: "REPLACE" or "APPEND" clause must contain "AS"**_
- `source = outer | where a not in [ source = inner | fields b ]`
- `source = outer | where (a) not in [ source = inner | fields b ]`
- `source = outer | where (a,b,c) not in [ source = inner | fields d,e,f ]`
- `source = outer a in [ source = inner | fields b ]` (search filtering with subquery)
- `source = outer a not in [ source = inner | fields b ]` (search filtering with subquery)
- `source = outer | where a in [ source = inner1 | where b not in [ source = inner2 | fields c ] | fields b ]` (nested)
- `source = table1 | inner join left = l right = r on l.a = r.a AND r.a in [ source = inner | fields d ] | fields l.a, r.a, b, c` (as join filter)

Expand Down Expand Up @@ -317,6 +318,9 @@ Assumptions: `a`, `b` are fields of table outer, `c`, `d` are fields of table in
- `source = outer | where not exists [ source = inner | where a = c ]`
- `source = outer | where exists [ source = inner | where a = c and b = d ]`
- `source = outer | where not exists [ source = inner | where a = c and b = d ]`
- `source = outer exists [ source = inner | where a = c ]` (search filtering with subquery)
- `source = outer not exists [ source = inner | where a = c ]` (search filtering with subquery)
- `source = table as t1 exists [ source = table as t2 | where t1.a = t2.a ]` (table alias is useful in exists subquery)
- `source = outer | where exists [ source = inner1 | where a = c and exists [ source = inner2 | where c = e ] ]` (nested)
- `source = outer | where exists [ source = inner1 | where a = c | where exists [ source = inner2 | where c = e ] ]` (nested)
- `source = outer | where exists [ source = inner | where c > 10 ]` (uncorrelated exists)
Expand All @@ -332,8 +336,13 @@ Assumptions: `a`, `b` are fields of table outer, `c`, `d` are fields of table in
- `source = outer | eval m = [ source = inner | stats max(c) ] | fields m, a`
- `source = outer | eval m = [ source = inner | stats max(c) ] + b | fields m, a`

**Uncorrelated scalar subquery in Select and Where**
- `source = outer | where a > [ source = inner | stats min(c) ] | eval m = [ source = inner | stats max(c) ] | fields m, a`
**Uncorrelated scalar subquery in Where**
- `source = outer | where a > [ source = inner | stats min(c) ] | fields a`
- `source = outer | where [ source = inner | stats min(c) ] > 0 | fields a`

**Uncorrelated scalar subquery in Search filter**
- `source = outer a > [ source = inner | stats min(c) ] | fields a`
- `source = outer [ source = inner | stats min(c) ] > 0 | fields a`

**Correlated scalar subquery in Select**
- `source = outer | eval m = [ source = inner | where outer.b = inner.d | stats max(c) ] | fields m, a`
Expand All @@ -345,10 +354,23 @@ Assumptions: `a`, `b` are fields of table outer, `c`, `d` are fields of table in
- `source = outer | where a = [ source = inner | where b = d | stats max(c) ]`
- `source = outer | where [ source = inner | where outer.b = inner.d OR inner.d = 1 | stats count() ] > 0 | fields a`

**Correlated scalar subquery in Search filter**
- `source = outer a = [ source = inner | where b = d | stats max(c) ]`
- `source = outer [ source = inner | where outer.b = inner.d OR inner.d = 1 | stats count() ] > 0 | fields a`

**Nested scalar subquery**
- `source = outer | where a = [ source = inner | stats max(c) | sort c ] OR b = [ source = inner | where c = 1 | stats min(d) | sort d ]`
- `source = outer | where a = [ source = inner | where c = [ source = nested | stats max(e) by f | sort f ] | stats max(d) by c | sort c | head 1 ]`

#### **(Relation) Subquery**
[See additional command details](ppl-subquery-command.md)

`InSubquery`, `ExistsSubquery` and `ScalarSubquery` are all subquery expressions. But `RelationSubquery` is not a subquery expression, it is a subquery plan which is common used in Join or Search clause.

- `source = table1 | join left = l right = r [ source = table2 | where d > 10 | head 5 ]` (subquery in join right side)
- `source = [ source = table1 | join left = l right = r [ source = table2 | where d > 10 | head 5 ] | stats count(a) by b ] as outer | head 1`

_- **Limitation: another command usage of (relation) subquery is in `appendcols` commands which is unsupported**_

---
#### Experimental Commands:
Expand Down
2 changes: 1 addition & 1 deletion docs/ppl-lang/ppl-search-command.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ The example show fetch all the document from accounts index with .

PPL query:

os> source=accounts account_number=1 or gender="F";
os> SEARCH source=accounts account_number=1 or gender="F";
+------------------+-------------+--------------------+-----------+----------+--------+------------+---------+-------+----------------------+------------+
| account_number | firstname | address | balance | gender | city | employer | state | age | email | lastname |
|------------------+-------------+--------------------+-----------+----------+--------+------------+---------+-------+----------------------+------------|
Expand Down
112 changes: 95 additions & 17 deletions docs/ppl-lang/ppl-subquery-command.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
## PPL SubQuery Commands:

**Syntax**
### Syntax
The subquery command should be implemented using a clean, logical syntax that integrates with existing PPL structure.

```sql
Expand All @@ -21,13 +21,15 @@ For additional info See [Issue](https://github.com/opensearch-project/opensearch

---

**InSubquery usage**
### InSubquery usage
- `source = outer | where a in [ source = inner | fields b ]`
- `source = outer | where (a) in [ source = inner | fields b ]`
- `source = outer | where (a,b,c) in [ source = inner | fields d,e,f ]`
- `source = outer | where a not in [ source = inner | fields b ]`
- `source = outer | where (a) not in [ source = inner | fields b ]`
- `source = outer | where (a,b,c) not in [ source = inner | fields d,e,f ]`
- `source = outer a in [ source = inner | fields b ]` (search filtering with subquery)
- `source = outer a not in [ source = inner | fields b ]` (search filtering with subquery)
- `source = outer | where a in [ source = inner1 | where b not in [ source = inner2 | fields c ] | fields b ]` (nested)
- `source = table1 | inner join left = l right = r on l.a = r.a AND r.a in [ source = inner | fields d ] | fields l.a, r.a, b, c` (as join filter)

Expand Down Expand Up @@ -111,15 +113,19 @@ source = supplier
nation
| sort s_name
```
---

**ExistsSubquery usage**
### ExistsSubquery usage

Assumptions: `a`, `b` are fields of table outer, `c`, `d` are fields of table inner, `e`, `f` are fields of table inner2

- `source = outer | where exists [ source = inner | where a = c ]`
- `source = outer | where not exists [ source = inner | where a = c ]`
- `source = outer | where exists [ source = inner | where a = c and b = d ]`
- `source = outer | where not exists [ source = inner | where a = c and b = d ]`
- `source = outer exists [ source = inner | where a = c ]` (search filtering with subquery)
- `source = outer not exists [ source = inner | where a = c ]` (search filtering with subquery)
- `source = table as t1 exists [ source = table as t2 | where t1.a = t2.a ]` (table alias is useful in exists subquery)
- `source = outer | where exists [ source = inner1 | where a = c and exists [ source = inner2 | where c = e ] ]` (nested)
- `source = outer | where exists [ source = inner1 | where a = c | where exists [ source = inner2 | where c = e ] ]` (nested)
- `source = outer | where exists [ source = inner | where c > 10 ]` (uncorrelated exists)
Expand Down Expand Up @@ -163,17 +169,21 @@ source = orders
| sort o_orderpriority
| fields o_orderpriority, order_count
```
---

**ScalarSubquery usage**
### ScalarSubquery usage

Assumptions: `a`, `b` are fields of table outer, `c`, `d` are fields of table inner, `e`, `f` are fields of table nested

**Uncorrelated scalar subquery in Select**
- `source = outer | eval m = [ source = inner | stats max(c) ] | fields m, a`
- `source = outer | eval m = [ source = inner | stats max(c) ] + b | fields m, a`

**Uncorrelated scalar subquery in Select and Where**
- `source = outer | where a > [ source = inner | stats min(c) ] | eval m = [ source = inner | stats max(c) ] | fields m, a`
**Uncorrelated scalar subquery in Where**
- `source = outer | where a > [ source = inner | stats min(c) ] | fields a`

**Uncorrelated scalar subquery in Search filter**
- `source = outer a > [ source = inner | stats min(c) ] | fields a`

**Correlated scalar subquery in Select**
- `source = outer | eval m = [ source = inner | where outer.b = inner.d | stats max(c) ] | fields m, a`
Expand All @@ -185,6 +195,10 @@ Assumptions: `a`, `b` are fields of table outer, `c`, `d` are fields of table in
- `source = outer | where a = [ source = inner | where b = d | stats max(c) ]`
- `source = outer | where [ source = inner | where outer.b = inner.d OR inner.d = 1 | stats count() ] > 0 | fields a`

**Correlated scalar subquery in Search filter**
- `source = outer a = [ source = inner | where b = d | stats max(c) ]`
- `source = outer [ source = inner | where outer.b = inner.d OR inner.d = 1 | stats count() ] > 0 | fields a`

**Nested scalar subquery**
- `source = outer | where a = [ source = inner | stats max(c) | sort c ] OR b = [ source = inner | where c = 1 | stats min(d) | sort d ]`
- `source = outer | where a = [ source = inner | where c = [ source = nested | stats max(e) by f | sort f ] | stats max(d) by c | sort c | head 1 ]`
Expand Down Expand Up @@ -240,27 +254,77 @@ source = spark_catalog.default.outer
source = spark_catalog.default.inner | where c = 1 | stats min(d) | sort d
]
```
---

### (Relation) Subquery
`InSubquery`, `ExistsSubquery` and `ScalarSubquery` are all subquery expressions. But `RelationSubquery` is not a subquery expression, it is a subquery plan which is common used in Join or From clause.

- `source = table1 | join left = l right = r [ source = table2 | where d > 10 | head 5 ]` (subquery in join right side)
- `source = [ source = table1 | join left = l right = r [ source = table2 | where d > 10 | head 5 ] | stats count(a) by b ] as outer | head 1`

### **Additional Context**
**_SQL Migration examples with Subquery PPL:_**

tpch q13
```sql
select
c_count,
count(*) as custdist
from
(
select
c_custkey,
count(o_orderkey) as c_count
from
customer left outer join orders on
c_custkey = o_custkey
and o_comment not like '%special%requests%'
group by
c_custkey
) as c_orders
group by
c_count
order by
custdist desc,
c_count desc
```
Rewritten by PPL (Relation) Subquery:
```sql
SEARCH source = [
SEARCH source = customer
| LEFT OUTER JOIN left = c right = o ON c_custkey = o_custkey
[
SEARCH source = orders
| WHERE not like(o_comment, '%special%requests%')
]
| STATS COUNT(o_orderkey) AS c_count BY c_custkey
] AS c_orders
| STATS COUNT(o_orderkey) AS c_count BY c_custkey
| STATS COUNT(1) AS custdist BY c_count
| SORT - custdist, - c_count
```
---

`InSubquery`, `ExistsSubquery` and `ScalarSubquery` are all subquery expression. The common usage of subquery expression is in `where` clause:
### Additional Context

The `where` command syntax is:
`InSubquery`, `ExistsSubquery` and `ScalarSubquery` as subquery expressions, their common usage is in `where` clause and `search filter`.

Where command:
```
| where <boolean expression> | ...
```
| where <boolean expression>
Search filter:
```
So the subquery is part of boolean expression, such as
search source=* <boolean expression> | ...
```
A subquery expression could be used in boolean expression, for example

```sql
| where orders.order_id in (subquery source=returns | where return_reason="damaged" | return order_id)
| where orders.order_id in [ source=returns | where return_reason="damaged" | field order_id ]
```

The `orders.order_id in (subquery source=...)` is a `<boolean expression>`.

In general, we name this kind of subquery clause the `InSubquery` expression, it is a `<boolean expression>`, one kind of `subquery expressions`.
The `orders.order_id in [ source=... ]` is a `<boolean expression>`.

PS: there are many kinds of `subquery expressions`, another commonly used one is `ScalarSubquery` expression:
In general, we name this kind of subquery clause the `InSubquery` expression, it is a `<boolean expression>`.

**Subquery with Different Join Types**

Expand Down Expand Up @@ -326,4 +390,18 @@ source = outer
| eval l = "nonEmpty"
| fields l
```
This query just print "nonEmpty" if the inner table is not empty.
This query just print "nonEmpty" if the inner table is not empty.

**Table alias in subquery**

Table alias is useful in query which contains a subquery, for example

```sql
select a, (
select sum(b)
from catalog.schema.table1 as t1
where t1.a = t2.a
) sum_b
from catalog.schema.table2 as t2
```
`t1` and `t2` are table aliases which are used in correlated subquery, `sum_b` are subquery alias.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,44 @@ class FlintSparkPPLExistsSubqueryITSuite
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test simple exists subquery in search filter") {
val frame = sql(s"""
| source = $outerTable exists [ source = $innerTable | where id = uid ]
| | sort - salary
| | fields id, name, salary
| """.stripMargin)
val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] = Array(
Row(1002, "John", 120000),
Row(1003, "David", 120000),
Row(1000, "Jake", 100000),
Row(1005, "Jane", 90000),
Row(1006, "Tommy", 30000))
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Integer](_.getAs[Integer](0))
assert(results.sorted.sameElements(expectedResults.sorted))

val logicalPlan: LogicalPlan = frame.queryExecution.logical

val outer = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1"))
val inner = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2"))
val existsSubquery = Filter(
Exists(Filter(EqualTo(UnresolvedAttribute("id"), UnresolvedAttribute("uid")), inner)),
outer)
val sortedPlan = Sort(
Seq(SortOrder(UnresolvedAttribute("salary"), Descending)),
global = true,
existsSubquery)
val expectedPlan =
Project(
Seq(
UnresolvedAttribute("id"),
UnresolvedAttribute("name"),
UnresolvedAttribute("salary")),
sortedPlan)

comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test not exists subquery") {
val frame = sql(s"""
| source = $outerTable
Expand Down Expand Up @@ -122,6 +160,41 @@ class FlintSparkPPLExistsSubqueryITSuite
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test not exists subquery in search filter") {
val frame = sql(s"""
| source = $outerTable not exists [ source = $innerTable | where id = uid ]
| | sort - salary
| | fields id, name, salary
| """.stripMargin)
val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] = Array(Row(1001, "Hello", 70000), Row(1004, "David", 0))
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Integer](_.getAs[Integer](0))
assert(results.sorted.sameElements(expectedResults.sorted))

val logicalPlan: LogicalPlan = frame.queryExecution.logical

val outer = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1"))
val inner = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2"))
val existsSubquery =
Filter(
Not(
Exists(Filter(EqualTo(UnresolvedAttribute("id"), UnresolvedAttribute("uid")), inner))),
outer)
val sortedPlan = Sort(
Seq(SortOrder(UnresolvedAttribute("salary"), Descending)),
global = true,
existsSubquery)
val expectedPlan =
Project(
Seq(
UnresolvedAttribute("id"),
UnresolvedAttribute("name"),
UnresolvedAttribute("salary")),
sortedPlan)

comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test empty exists subquery") {
var frame = sql(s"""
| source = $outerTable
Expand Down
Loading

0 comments on commit 9d3909b

Please sign in to comment.