Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support RelationSubquery PPL #775

Merged
merged 6 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 26 additions & 4 deletions docs/ppl-lang/PPL-Example-Commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,7 @@ source = table | where ispresent(a) |
- `source = table1 | cross join left = l right = r table2`
- `source = table1 | left semi join left = l right = r on l.a = r.a table2`
- `source = table1 | left anti join left = l right = r on l.a = r.a table2`

_- **Limitation: sub-searches is unsupported in join right side now**_
- `source = table1 | join left = l right = r [ source = table2 | where d > 10 | head 5 ]`


#### **Lookup**
Expand All @@ -268,6 +267,8 @@ _- **Limitation: "REPLACE" or "APPEND" clause must contain "AS"**_
- `source = outer | where a not in [ source = inner | fields b ]`
- `source = outer | where (a) not in [ source = inner | fields b ]`
- `source = outer | where (a,b,c) not in [ source = inner | fields d,e,f ]`
- `source = outer a in [ source = inner | fields b ]` (search filtering with subquery)
- `source = outer a not in [ source = inner | fields b ]` (search filtering with subquery)
- `source = outer | where a in [ source = inner1 | where b not in [ source = inner2 | fields c ] | fields b ]` (nested)
- `source = table1 | inner join left = l right = r on l.a = r.a AND r.a in [ source = inner | fields d ] | fields l.a, r.a, b, c` (as join filter)

Expand Down Expand Up @@ -317,6 +318,9 @@ Assumptions: `a`, `b` are fields of table outer, `c`, `d` are fields of table in
- `source = outer | where not exists [ source = inner | where a = c ]`
- `source = outer | where exists [ source = inner | where a = c and b = d ]`
- `source = outer | where not exists [ source = inner | where a = c and b = d ]`
- `source = outer exists [ source = inner | where a = c ]` (search filtering with subquery)
- `source = outer not exists [ source = inner | where a = c ]` (search filtering with subquery)
- `source = table as t1 exists [ source = table as t2 | where t1.a = t2.a ]` (table alias is useful in exists subquery)
- `source = outer | where exists [ source = inner1 | where a = c and exists [ source = inner2 | where c = e ] ]` (nested)
- `source = outer | where exists [ source = inner1 | where a = c | where exists [ source = inner2 | where c = e ] ]` (nested)
- `source = outer | where exists [ source = inner | where c > 10 ]` (uncorrelated exists)
Expand All @@ -332,8 +336,13 @@ Assumptions: `a`, `b` are fields of table outer, `c`, `d` are fields of table in
- `source = outer | eval m = [ source = inner | stats max(c) ] | fields m, a`
- `source = outer | eval m = [ source = inner | stats max(c) ] + b | fields m, a`

**Uncorrelated scalar subquery in Select and Where**
- `source = outer | where a > [ source = inner | stats min(c) ] | eval m = [ source = inner | stats max(c) ] | fields m, a`
**Uncorrelated scalar subquery in Where**
- `source = outer | where a > [ source = inner | stats min(c) ] | fields a`
- `source = outer | where [ source = inner | stats min(c) ] > 0 | fields a`

**Uncorrelated scalar subquery in Search filter**
- `source = outer a > [ source = inner | stats min(c) ] | fields a`
- `source = outer [ source = inner | stats min(c) ] > 0 | fields a`

**Correlated scalar subquery in Select**
- `source = outer | eval m = [ source = inner | where outer.b = inner.d | stats max(c) ] | fields m, a`
Expand All @@ -345,10 +354,23 @@ Assumptions: `a`, `b` are fields of table outer, `c`, `d` are fields of table in
- `source = outer | where a = [ source = inner | where b = d | stats max(c) ]`
- `source = outer | where [ source = inner | where outer.b = inner.d OR inner.d = 1 | stats count() ] > 0 | fields a`

**Correlated scalar subquery in Search filter**
- `source = outer a = [ source = inner | where b = d | stats max(c) ]`
- `source = outer [ source = inner | where outer.b = inner.d OR inner.d = 1 | stats count() ] > 0 | fields a`

**Nested scalar subquery**
- `source = outer | where a = [ source = inner | stats max(c) | sort c ] OR b = [ source = inner | where c = 1 | stats min(d) | sort d ]`
- `source = outer | where a = [ source = inner | where c = [ source = nested | stats max(e) by f | sort f ] | stats max(d) by c | sort c | head 1 ]`

#### **(Relation) Subquery**
[See additional command details](ppl-subquery-command.md)

`InSubquery`, `ExistsSubquery` and `ScalarSubquery` are all subquery expressions. But `RelationSubquery` is not a subquery expression, it is a subquery plan which is common used in Join or Search clause.

- `source = table1 | join left = l right = r [ source = table2 | where d > 10 | head 5 ]` (subquery in join right side)
- `source = [ source = table1 | join left = l right = r [ source = table2 | where d > 10 | head 5 ] | stats count(a) by b ] as outer | head 1`

_- **Limitation: another command usage of (relation) subquery is in `appendcols` commands which is unsupported**_

---
#### Experimental Commands:
Expand Down
2 changes: 1 addition & 1 deletion docs/ppl-lang/ppl-search-command.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ The example show fetch all the document from accounts index with .

PPL query:

os> source=accounts account_number=1 or gender="F";
os> SEARCH source=accounts account_number=1 or gender="F";
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two queries as examples. One example query ignores SEARCH keyword. Keep a SEARCH keyword in this example query.

+------------------+-------------+--------------------+-----------+----------+--------+------------+---------+-------+----------------------+------------+
| account_number | firstname | address | balance | gender | city | employer | state | age | email | lastname |
|------------------+-------------+--------------------+-----------+----------+--------+------------+---------+-------+----------------------+------------|
Expand Down
112 changes: 95 additions & 17 deletions docs/ppl-lang/ppl-subquery-command.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
## PPL SubQuery Commands:

**Syntax**
### Syntax
The subquery command should be implemented using a clean, logical syntax that integrates with existing PPL structure.

```sql
Expand All @@ -21,13 +21,15 @@ For additional info See [Issue](https://github.com/opensearch-project/opensearch

---

**InSubquery usage**
### InSubquery usage
- `source = outer | where a in [ source = inner | fields b ]`
- `source = outer | where (a) in [ source = inner | fields b ]`
- `source = outer | where (a,b,c) in [ source = inner | fields d,e,f ]`
- `source = outer | where a not in [ source = inner | fields b ]`
- `source = outer | where (a) not in [ source = inner | fields b ]`
- `source = outer | where (a,b,c) not in [ source = inner | fields d,e,f ]`
- `source = outer a in [ source = inner | fields b ]` (search filtering with subquery)
- `source = outer a not in [ source = inner | fields b ]` (search filtering with subquery)
- `source = outer | where a in [ source = inner1 | where b not in [ source = inner2 | fields c ] | fields b ]` (nested)
- `source = table1 | inner join left = l right = r on l.a = r.a AND r.a in [ source = inner | fields d ] | fields l.a, r.a, b, c` (as join filter)

Expand Down Expand Up @@ -111,15 +113,19 @@ source = supplier
nation
| sort s_name
```
---

**ExistsSubquery usage**
### ExistsSubquery usage

Assumptions: `a`, `b` are fields of table outer, `c`, `d` are fields of table inner, `e`, `f` are fields of table inner2

- `source = outer | where exists [ source = inner | where a = c ]`
- `source = outer | where not exists [ source = inner | where a = c ]`
- `source = outer | where exists [ source = inner | where a = c and b = d ]`
- `source = outer | where not exists [ source = inner | where a = c and b = d ]`
- `source = outer exists [ source = inner | where a = c ]` (search filtering with subquery)
- `source = outer not exists [ source = inner | where a = c ]` (search filtering with subquery)
- `source = table as t1 exists [ source = table as t2 | where t1.a = t2.a ]` (table alias is useful in exists subquery)
- `source = outer | where exists [ source = inner1 | where a = c and exists [ source = inner2 | where c = e ] ]` (nested)
- `source = outer | where exists [ source = inner1 | where a = c | where exists [ source = inner2 | where c = e ] ]` (nested)
- `source = outer | where exists [ source = inner | where c > 10 ]` (uncorrelated exists)
Expand Down Expand Up @@ -163,17 +169,21 @@ source = orders
| sort o_orderpriority
| fields o_orderpriority, order_count
```
---

**ScalarSubquery usage**
### ScalarSubquery usage

Assumptions: `a`, `b` are fields of table outer, `c`, `d` are fields of table inner, `e`, `f` are fields of table nested

**Uncorrelated scalar subquery in Select**
- `source = outer | eval m = [ source = inner | stats max(c) ] | fields m, a`
- `source = outer | eval m = [ source = inner | stats max(c) ] + b | fields m, a`

**Uncorrelated scalar subquery in Select and Where**
- `source = outer | where a > [ source = inner | stats min(c) ] | eval m = [ source = inner | stats max(c) ] | fields m, a`
**Uncorrelated scalar subquery in Where**
- `source = outer | where a > [ source = inner | stats min(c) ] | fields a`

**Uncorrelated scalar subquery in Search filter**
- `source = outer a > [ source = inner | stats min(c) ] | fields a`

**Correlated scalar subquery in Select**
- `source = outer | eval m = [ source = inner | where outer.b = inner.d | stats max(c) ] | fields m, a`
Expand All @@ -185,6 +195,10 @@ Assumptions: `a`, `b` are fields of table outer, `c`, `d` are fields of table in
- `source = outer | where a = [ source = inner | where b = d | stats max(c) ]`
- `source = outer | where [ source = inner | where outer.b = inner.d OR inner.d = 1 | stats count() ] > 0 | fields a`

**Correlated scalar subquery in Search filter**
- `source = outer a = [ source = inner | where b = d | stats max(c) ]`
- `source = outer [ source = inner | where outer.b = inner.d OR inner.d = 1 | stats count() ] > 0 | fields a`

**Nested scalar subquery**
- `source = outer | where a = [ source = inner | stats max(c) | sort c ] OR b = [ source = inner | where c = 1 | stats min(d) | sort d ]`
- `source = outer | where a = [ source = inner | where c = [ source = nested | stats max(e) by f | sort f ] | stats max(d) by c | sort c | head 1 ]`
Expand Down Expand Up @@ -240,27 +254,77 @@ source = spark_catalog.default.outer
source = spark_catalog.default.inner | where c = 1 | stats min(d) | sort d
]
```
---

### (Relation) Subquery
`InSubquery`, `ExistsSubquery` and `ScalarSubquery` are all subquery expressions. But `RelationSubquery` is not a subquery expression, it is a subquery plan which is common used in Join or From clause.

- `source = table1 | join left = l right = r [ source = table2 | where d > 10 | head 5 ]` (subquery in join right side)
- `source = [ source = table1 | join left = l right = r [ source = table2 | where d > 10 | head 5 ] | stats count(a) by b ] as outer | head 1`

### **Additional Context**
**_SQL Migration examples with Subquery PPL:_**

tpch q13
```sql
select
c_count,
count(*) as custdist
from
(
select
c_custkey,
count(o_orderkey) as c_count
from
customer left outer join orders on
c_custkey = o_custkey
and o_comment not like '%special%requests%'
group by
c_custkey
) as c_orders
group by
c_count
order by
custdist desc,
c_count desc
```
Rewritten by PPL (Relation) Subquery:
```sql
SEARCH source = [
SEARCH source = customer
| LEFT OUTER JOIN left = c right = o ON c_custkey = o_custkey
[
SEARCH source = orders
| WHERE not like(o_comment, '%special%requests%')
]
| STATS COUNT(o_orderkey) AS c_count BY c_custkey
] AS c_orders
| STATS COUNT(o_orderkey) AS c_count BY c_custkey
| STATS COUNT(1) AS custdist BY c_count
| SORT - custdist, - c_count
```
---

`InSubquery`, `ExistsSubquery` and `ScalarSubquery` are all subquery expression. The common usage of subquery expression is in `where` clause:
### Additional Context

The `where` command syntax is:
`InSubquery`, `ExistsSubquery` and `ScalarSubquery` as subquery expressions, their common usage is in `where` clause and `search filter`.

Where command:
```
| where <boolean expression> | ...
```
| where <boolean expression>
Search filter:
```
So the subquery is part of boolean expression, such as
search source=* <boolean expression> | ...
```
A subquery expression could be used in boolean expression, for example

```sql
| where orders.order_id in (subquery source=returns | where return_reason="damaged" | return order_id)
| where orders.order_id in [ source=returns | where return_reason="damaged" | field order_id ]
```

The `orders.order_id in (subquery source=...)` is a `<boolean expression>`.

In general, we name this kind of subquery clause the `InSubquery` expression, it is a `<boolean expression>`, one kind of `subquery expressions`.
The `orders.order_id in [ source=... ]` is a `<boolean expression>`.

PS: there are many kinds of `subquery expressions`, another commonly used one is `ScalarSubquery` expression:
In general, we name this kind of subquery clause the `InSubquery` expression, it is a `<boolean expression>`.

**Subquery with Different Join Types**

Expand Down Expand Up @@ -326,4 +390,18 @@ source = outer
| eval l = "nonEmpty"
| fields l
```
This query just print "nonEmpty" if the inner table is not empty.
This query just print "nonEmpty" if the inner table is not empty.

**Table alias in subquery**

Table alias is useful in query which contains a subquery, for example

```sql
select a, (
select sum(b)
from catalog.schema.table1 as t1
where t1.a = t2.a
) sum_b
from catalog.schema.table2 as t2
```
`t1` and `t2` are table aliases which are used in correlated subquery, `sum_b` are subquery alias.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,44 @@ class FlintSparkPPLExistsSubqueryITSuite
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test simple exists subquery in search filter") {
val frame = sql(s"""
| source = $outerTable exists [ source = $innerTable | where id = uid ]
| | sort - salary
| | fields id, name, salary
| """.stripMargin)
val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] = Array(
Row(1002, "John", 120000),
Row(1003, "David", 120000),
Row(1000, "Jake", 100000),
Row(1005, "Jane", 90000),
Row(1006, "Tommy", 30000))
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Integer](_.getAs[Integer](0))
assert(results.sorted.sameElements(expectedResults.sorted))

val logicalPlan: LogicalPlan = frame.queryExecution.logical

val outer = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1"))
val inner = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2"))
val existsSubquery = Filter(
Exists(Filter(EqualTo(UnresolvedAttribute("id"), UnresolvedAttribute("uid")), inner)),
outer)
val sortedPlan = Sort(
Seq(SortOrder(UnresolvedAttribute("salary"), Descending)),
global = true,
existsSubquery)
val expectedPlan =
Project(
Seq(
UnresolvedAttribute("id"),
UnresolvedAttribute("name"),
UnresolvedAttribute("salary")),
sortedPlan)

comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test not exists subquery") {
val frame = sql(s"""
| source = $outerTable
Expand Down Expand Up @@ -122,6 +160,41 @@ class FlintSparkPPLExistsSubqueryITSuite
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test not exists subquery in search filter") {
val frame = sql(s"""
| source = $outerTable not exists [ source = $innerTable | where id = uid ]
| | sort - salary
| | fields id, name, salary
| """.stripMargin)
val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] = Array(Row(1001, "Hello", 70000), Row(1004, "David", 0))
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Integer](_.getAs[Integer](0))
assert(results.sorted.sameElements(expectedResults.sorted))

val logicalPlan: LogicalPlan = frame.queryExecution.logical

val outer = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1"))
val inner = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2"))
val existsSubquery =
Filter(
Not(
Exists(Filter(EqualTo(UnresolvedAttribute("id"), UnresolvedAttribute("uid")), inner))),
outer)
val sortedPlan = Sort(
Seq(SortOrder(UnresolvedAttribute("salary"), Descending)),
global = true,
existsSubquery)
val expectedPlan =
Project(
Seq(
UnresolvedAttribute("id"),
UnresolvedAttribute("name"),
UnresolvedAttribute("salary")),
sortedPlan)

comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test empty exists subquery") {
var frame = sql(s"""
| source = $outerTable
Expand Down
Loading
Loading