Skip to content

Commit

Permalink
[CALCITE-4572] Piglet fails if Pig Latin script contains RANK or FILT…
Browse files Browse the repository at this point in the history
…ER operators (Mahesh Kumar Behera)

Close apache#2394
  • Loading branch information
maheshk114 authored and julianhyde committed Apr 15, 2021
1 parent 760714d commit 90530a0
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,6 @@
* algebra plans.
*/
class PigRelOpVisitor extends PigRelOpWalker.PlanPreVisitor {
private static final String RANK_PREFIX = "rank_";

// The relational algebra builder customized for Pig
protected final PigRelBuilder builder;
private Operator currentRoot;
Expand Down Expand Up @@ -648,7 +646,7 @@ private static JoinRelType getJoinType(boolean leftInner, boolean rightInner) {
List<String> fieldNames = new ArrayList<>();

projectedFields.add(rankField);
fieldNames.add(RANK_PREFIX + loRank.getAlias()); // alias of the rank field
fieldNames.add(loRank.getSchema().getField(0).alias); // alias of the rank field
for (int i = 0; i < inputRowType.getFieldCount(); i++) {
projectedFields.add(builder.field(i));
fieldNames.add(inputRowType.getFieldNames().get(i));
Expand Down
34 changes: 28 additions & 6 deletions piglet/src/test/java/org/apache/calcite/test/PigRelOpTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1142,15 +1142,15 @@ private Fluent pig(String script) {
+ " LogicalProject(EMPNO=[$0], JOB=[$2], DEPTNO=[$7])\n"
+ " LogicalTableScan(table=[[scott, EMP]])\n";
final String optimizedPlan = ""
+ "LogicalProject(rank_C=[$3], EMPNO=[$0], JOB=[$1], DEPTNO=[$2])\n"
+ "LogicalProject(rank_B=[$3], EMPNO=[$0], JOB=[$1], DEPTNO=[$2])\n"
+ " LogicalWindow(window#0=[window(order by [2, 1 DESC] "
+ "aggs [RANK()])])\n"
+ " LogicalProject(EMPNO=[$0], JOB=[$2], DEPTNO=[$7])\n"
+ " LogicalTableScan(table=[[scott, EMP]])\n";

final String script = base + "C = RANK B BY DEPTNO ASC, JOB DESC;\n";
final String plan = ""
+ "LogicalProject(rank_C=[RANK() OVER (ORDER BY $2, $1 DESC)], "
+ "LogicalProject(rank_B=[RANK() OVER (ORDER BY $2, $1 DESC)], "
+ "EMPNO=[$0], JOB=[$1], DEPTNO=[$2])\n"
+ basePlan;
final String result = ""
Expand All @@ -1170,7 +1170,7 @@ private Fluent pig(String script) {
+ "(14,7900,CLERK,30)\n";
final String sql = ""
+ "SELECT RANK() OVER (ORDER BY DEPTNO, JOB DESC RANGE BETWEEN "
+ "UNBOUNDED PRECEDING AND CURRENT ROW) AS rank_C, EMPNO, JOB, DEPTNO\n"
+ "UNBOUNDED PRECEDING AND CURRENT ROW) AS rank_B, EMPNO, JOB, DEPTNO\n"
+ "FROM scott.EMP";
pig(script).assertRel(hasTree(plan))
.assertOptimizedRel(hasTree(optimizedPlan))
Expand All @@ -1179,14 +1179,14 @@ private Fluent pig(String script) {

final String script2 = base + "C = RANK B BY DEPTNO ASC, JOB DESC DENSE;\n";
final String optimizedPlan2 = ""
+ "LogicalProject(rank_C=[$3], EMPNO=[$0], JOB=[$1], DEPTNO=[$2])\n"
+ "LogicalProject(rank_B=[$3], EMPNO=[$0], JOB=[$1], DEPTNO=[$2])\n"
+ " LogicalWindow(window#0=[window(order by [2, 1 DESC] "
+ "aggs [DENSE_RANK()])"
+ "])\n"
+ " LogicalProject(EMPNO=[$0], JOB=[$2], DEPTNO=[$7])\n"
+ " LogicalTableScan(table=[[scott, EMP]])\n";
final String plan2 = ""
+ "LogicalProject(rank_C=[DENSE_RANK() OVER (ORDER BY $2, $1 DESC)], "
+ "LogicalProject(rank_B=[DENSE_RANK() OVER (ORDER BY $2, $1 DESC)], "
+ "EMPNO=[$0], JOB=[$1], DEPTNO=[$2])\n"
+ basePlan;
final String result2 = ""
Expand All @@ -1206,7 +1206,7 @@ private Fluent pig(String script) {
+ "(9,7900,CLERK,30)\n";
final String sql2 = ""
+ "SELECT DENSE_RANK() OVER (ORDER BY DEPTNO, JOB DESC RANGE BETWEEN "
+ "UNBOUNDED PRECEDING AND CURRENT ROW) AS rank_C, EMPNO, JOB, DEPTNO\n"
+ "UNBOUNDED PRECEDING AND CURRENT ROW) AS rank_B, EMPNO, JOB, DEPTNO\n"
+ "FROM scott.EMP";
pig(script2).assertRel(hasTree(plan2))
.assertOptimizedRel(hasTree(optimizedPlan2))
Expand Down Expand Up @@ -1621,4 +1621,26 @@ private Fluent pig(String script) {
pig(script).assertRel(hasTree(plan))
.assertSql(is(sql));
}

@Test void testRankAndFilter() {
final String script = ""
+ "A = LOAD 'emp1' USING PigStorage(',') as ("
+ " id:int, name:chararray, age:int, city:chararray);\n"
+ "B = rank A;\n"
+ "C = FILTER B by ($0 > 1);";

final String plan = ""
+ "LogicalFilter(condition=[>($0, 1)])\n"
+ " LogicalProject(rank_A=[RANK() OVER ()], id=[$0],"
+ " name=[$1], age=[$2], city=[$3])\n"
+ " LogicalTableScan(table=[[emp1]])\n";

final String sql = "SELECT w0$o0 AS rank_A, id, name, age, city\n"
+ "FROM (SELECT id, name, age, city, RANK() OVER (RANGE BETWEEN "
+ "UNBOUNDED PRECEDING AND CURRENT ROW)\n"
+ " FROM emp1) AS t\n"
+ "WHERE w0$o0 > 1";
pig(script).assertRel(hasTree(plan))
.assertSql(is(sql));
}
}

0 comments on commit 90530a0

Please sign in to comment.