Skip to content

Commit

Permalink
Allow adding and subtracting timestamp types in the multi-stage engine
Browse files Browse the repository at this point in the history
  • Loading branch information
yashmayya committed Jan 9, 2025
1 parent 05bd0b9 commit 4e904d4
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,17 @@
import java.util.Map;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.calcite.sql.SqlBinaryOperator;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlOperatorTable;
import org.apache.calcite.sql.SqlSyntax;
import org.apache.calcite.sql.fun.SqlMonotonicBinaryOperator;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.type.InferTypes;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlTypeFamily;
Expand Down Expand Up @@ -68,6 +71,30 @@ public static PinotOperatorTable instance() {
return INSTANCE.get();
}

// The standard Calcite + and - operators don't support operations on TIMESTAMP types. However, Pinot supports these
// operations, so we need to define our own operators. Note that Postgres supports - on TIMESTAMP types, but not +.
// Calcite only supports such operations if the second operand is an interval (similar to Postgres for the +
// operator).
/**
 * Pinot's replacement for the standard binary {@code +} operator.
 *
 * <p>It keeps the {@code "+"} name and {@link SqlKind#PLUS} kind, and differs only in its operand type checker,
 * which additionally accepts a (TIMESTAMP, TIMESTAMP) operand pair. Within this class it is listed among the
 * supported operators in place of {@code SqlStdOperatorTable.PLUS} and is aliased to the function names
 * {@code ADD} and {@code PLUS}.
 *
 * <p>NOTE(review): the result type for a (TIMESTAMP, TIMESTAMP) pair is whatever {@code ReturnTypes.NULLABLE_SUM}
 * derives for those operands; confirm that it matches what the runtime function actually returns.
 */
public static final SqlBinaryOperator PINOT_PLUS =
new SqlMonotonicBinaryOperator(
"+",
SqlKind.PLUS,
// NOTE(review): 40 and true are presumably the operator precedence and the left-associativity flag, copied
// from the standard operator definition; confirm against the SqlMonotonicBinaryOperator constructor.
40,
true,
ReturnTypes.NULLABLE_SUM,
InferTypes.FIRST_KNOWN,
// Accept everything the stock PLUS_OPERATOR checker accepts, or two operands from the TIMESTAMP family.
OperandTypes.PLUS_OPERATOR.or(OperandTypes.family(SqlTypeFamily.TIMESTAMP, SqlTypeFamily.TIMESTAMP)));

/**
 * Pinot's replacement for the standard binary {@code -} operator.
 *
 * <p>It keeps the {@code "-"} name and {@link SqlKind#MINUS} kind, and differs only in its operand type checker,
 * which additionally accepts a (TIMESTAMP, TIMESTAMP) operand pair. Within this class it is listed among the
 * supported operators in place of {@code SqlStdOperatorTable.MINUS} and is aliased to the function names
 * {@code SUB} and {@code MINUS}.
 *
 * <p>NOTE(review): the result type for a (TIMESTAMP, TIMESTAMP) pair is whatever {@code ReturnTypes.NULLABLE_SUM}
 * derives for those operands; confirm that it matches what the runtime function actually returns.
 */
public static final SqlBinaryOperator PINOT_MINUS =
new SqlMonotonicBinaryOperator(
"-",
SqlKind.MINUS,
// NOTE(review): 40 and true are presumably the operator precedence and the left-associativity flag, copied
// from the standard operator definition; confirm against the SqlMonotonicBinaryOperator constructor.
40,
true,
// The same return-type inference as PINOT_PLUS is used for subtraction.
ReturnTypes.NULLABLE_SUM,
InferTypes.FIRST_KNOWN,
// Accept everything the stock MINUS_OPERATOR checker accepts, or two operands from the TIMESTAMP family.
OperandTypes.MINUS_OPERATOR.or(OperandTypes.family(SqlTypeFamily.TIMESTAMP, SqlTypeFamily.TIMESTAMP)));

/**
* This list includes the supported standard {@link SqlOperator}s defined in {@link SqlStdOperatorTable}.
* NOTE: The operator order follows the same order as defined in {@link SqlStdOperatorTable} for easier search.
Expand Down Expand Up @@ -104,12 +131,12 @@ public static PinotOperatorTable instance() {
SqlStdOperatorTable.SEARCH,
SqlStdOperatorTable.LESS_THAN,
SqlStdOperatorTable.LESS_THAN_OR_EQUAL,
SqlStdOperatorTable.MINUS,
SqlStdOperatorTable.MULTIPLY,
SqlStdOperatorTable.NOT_EQUALS,
SqlStdOperatorTable.OR,
SqlStdOperatorTable.PLUS,
SqlStdOperatorTable.INTERVAL,
PINOT_MINUS,
PINOT_PLUS,

// POSTFIX OPERATORS
SqlStdOperatorTable.DESC,
Expand Down Expand Up @@ -225,8 +252,8 @@ public static PinotOperatorTable instance() {
Pair.of(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, List.of("GREATER_THAN_OR_EQUAL")),
Pair.of(SqlStdOperatorTable.LESS_THAN, List.of("LESS_THAN")),
Pair.of(SqlStdOperatorTable.LESS_THAN_OR_EQUAL, List.of("LESS_THAN_OR_EQUAL")),
Pair.of(SqlStdOperatorTable.MINUS, List.of("SUB", "MINUS")),
Pair.of(SqlStdOperatorTable.PLUS, List.of("ADD", "PLUS")),
Pair.of(PINOT_MINUS, List.of("SUB", "MINUS")),
Pair.of(PINOT_PLUS, List.of("ADD", "PLUS")),
Pair.of(SqlStdOperatorTable.MULTIPLY, List.of("MULT", "TIMES"))
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ static Schema.SchemaBuilder getSchemaBuilder(String schemaName) {
.addSingleValueDimension("col2", FieldSpec.DataType.STRING, "")
.addSingleValueDimension("col5", FieldSpec.DataType.BOOLEAN, false)
.addDateTime("ts", FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:HOURS")
.addDateTime("ts_timestamp", FieldSpec.DataType.TIMESTAMP, "1:MILLISECONDS:EPOCH", "1:HOURS")
.addMetric("col3", FieldSpec.DataType.INT, 0)
.addMetric("col4", FieldSpec.DataType.BIG_DECIMAL, 0)
.addMetric("col6", FieldSpec.DataType.INT, 0)
Expand Down Expand Up @@ -231,6 +232,10 @@ protected Object[][] provideQueries() {
"SELECT /*+ aggOptions(is_skip_leaf_stage_group_by='true') */ a.col2, a.col3 FROM a JOIN b "
+ "ON a.col1 = b.col1 WHERE a.col3 >= 0 GROUP BY a.col2, a.col3"
},
new Object[]{"SELECT ts_timestamp - CAST(123456789 AS TIMESTAMP) FROM a"},
new Object[]{"SELECT SUB(ts_timestamp, CAST(123456789 AS TIMESTAMP)) FROM a"},
new Object[]{"SELECT ts_timestamp + CAST(123456789 AS TIMESTAMP) FROM a"},
new Object[]{"SELECT ADD(ts_timestamp, CAST(123456789 AS TIMESTAMP)) FROM a"}
};
}

Expand Down
18 changes: 9 additions & 9 deletions pinot-query-planner/src/test/resources/queries/JoinPlans.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
"sql": "EXPLAIN PLAN FOR SELECT * FROM a JOIN b ON a.col1 = b.col2",
"output": [
"Execution Plan",
"\nLogicalJoin(condition=[=($0, $8)], joinType=[inner])",
"\nLogicalJoin(condition=[=($0, $9)], joinType=[inner])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalTableScan(table=[[default, a]])",
"\n PinotLogicalExchange(distribution=[hash[1]])",
Expand All @@ -55,7 +55,7 @@
"sql": "EXPLAIN PLAN FOR SELECT * FROM a JOIN b ON a.col1 = b.col2 WHERE a.col3 >= 0",
"output": [
"Execution Plan",
"\nLogicalJoin(condition=[=($0, $8)], joinType=[inner])",
"\nLogicalJoin(condition=[=($0, $9)], joinType=[inner])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalFilter(condition=[>=($2, 0)])",
"\n LogicalTableScan(table=[[default, a]])",
Expand All @@ -69,7 +69,7 @@
"sql": "EXPLAIN PLAN FOR SELECT * FROM a JOIN b ON a.col1 = b.col2 WHERE a.col3 >= 0 AND a.col3 > b.col3",
"output": [
"Execution Plan",
"\nLogicalJoin(condition=[AND(=($0, $8), >($2, $9))], joinType=[inner])",
"\nLogicalJoin(condition=[AND(=($0, $9), >($2, $10))], joinType=[inner])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalFilter(condition=[>=($2, 0)])",
"\n LogicalTableScan(table=[[default, a]])",
Expand All @@ -83,7 +83,7 @@
"sql": "EXPLAIN PLAN FOR SELECT * FROM a JOIN b on a.col1 = b.col1 AND a.col2 = b.col2",
"output": [
"Execution Plan",
"\nLogicalJoin(condition=[AND(=($0, $7), =($1, $8))], joinType=[inner])",
"\nLogicalJoin(condition=[AND(=($0, $8), =($1, $9))], joinType=[inner])",
"\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalTableScan(table=[[default, a]])",
"\n PinotLogicalExchange(distribution=[hash[0, 1]])",
Expand Down Expand Up @@ -431,9 +431,9 @@
"sql": "EXPLAIN PLAN FOR WITH tmp1 AS ( SELECT * FROM a WHERE col2 NOT IN ('foo', 'bar') ) SELECT * FROM a WHERE col2 IN (SELECT col1 FROM tmp1) AND col2 IN (SELECT col1 FROM b WHERE col3 > 100) AND col3 IN (SELECT col3 from b WHERE col3 < 100)",
"output": [
"Execution Plan",
"\nLogicalJoin(condition=[=($2, $7)], joinType=[semi])",
"\n LogicalJoin(condition=[=($1, $7)], joinType=[semi])",
"\n LogicalJoin(condition=[=($1, $7)], joinType=[semi])",
"\nLogicalJoin(condition=[=($2, $8)], joinType=[semi])",
"\n LogicalJoin(condition=[=($1, $8)], joinType=[semi])",
"\n LogicalJoin(condition=[=($1, $8)], joinType=[semi])",
"\n LogicalTableScan(table=[[default, a]])",
"\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
"\n LogicalProject(col1=[$0])",
Expand Down Expand Up @@ -505,8 +505,8 @@
"sql": "EXPLAIN PLAN FOR WITH tmp1 AS ( SELECT * FROM a WHERE col2 NOT IN ('foo', 'bar') ), tmp2 AS ( SELECT * FROM b WHERE col1 IN (SELECT col1 FROM tmp1) AND col3 < 100 ) SELECT * FROM tmp2 WHERE col3 IN (SELECT col3 from tmp1)",
"output": [
"Execution Plan",
"\nLogicalJoin(condition=[=($2, $7)], joinType=[semi])",
"\n LogicalJoin(condition=[=($0, $7)], joinType=[semi])",
"\nLogicalJoin(condition=[=($2, $8)], joinType=[semi])",
"\n LogicalJoin(condition=[=($0, $8)], joinType=[semi])",
"\n LogicalFilter(condition=[<($2, 100)])",
"\n LogicalTableScan(table=[[default, b]])",
"\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
Expand Down

0 comments on commit 4e904d4

Please sign in to comment.