Skip to content

Commit 48593d7

Browse files
authored
Add last_by and first_by aggregation support for table model
1 parent 9ea87df commit 48593d7

File tree

17 files changed

+1208
-82
lines changed

17 files changed

+1208
-82
lines changed

integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -989,6 +989,67 @@ public void lastFirstMaxMinTest() {
989989
tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);
990990
}
991991

992+
@Test
993+
public void lastByFirstByTest() {
994+
String[] expectedHeader1 = buildHeaders(13);
995+
String[] expectedHeader2 = buildHeaders(14);
996+
997+
sql =
998+
"select last_by(time,time),last_by(device,time),last_by(level,time),last_by(attr1,time),last_by(attr2,time),last_by(num,time),last_by(bignum,time),last_by(floatnum,time),last_by(str,time),last_by(bool,time),last_by(date,time),last_by(ts,time),last_by(stringv,time) from table0 where device='d2'";
999+
retArray =
1000+
new String[] {
1001+
"1971-08-20T11:33:20.000Z,d2,l5,null,null,15,3147483648,235.213,watermelon,true,2023-01-01,null,null,",
1002+
};
1003+
tableResultSetEqualTest(sql, expectedHeader1, retArray, DATABASE_NAME);
1004+
sql =
1005+
"select last_by(time,time),last_by(device,time),last_by(level,time),last_by(attr1,time),last_by(attr2,time),last_by(num,time),last_by(bignum,time),last_by(floatnum,time),last_by(str,time),last_by(bool,time),last_by(date,time),last_by(ts,time),last_by(stringv,time),last_by(blob,time) from table0 where device='d2'";
1006+
retArray =
1007+
new String[] {
1008+
"1971-08-20T11:33:20.000Z,d2,l5,null,null,15,3147483648,235.213,watermelon,true,2023-01-01,null,null,null,",
1009+
};
1010+
tableResultSetEqualTest(sql, expectedHeader2, retArray, DATABASE_NAME);
1011+
1012+
sql =
1013+
"select last_by(time,time),last_by(time,device),last_by(time,level),last_by(time,attr1),last_by(time,attr2),last_by(time,num),last_by(time,bignum),last_by(time,floatnum),last_by(time,str),last_by(time,bool),last_by(time,date),last_by(time,ts),last_by(time,stringv) from table0 where device='d2'";
1014+
retArray =
1015+
new String[] {
1016+
"1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-04-26T17:46:40.000Z,1971-01-01T00:01:40.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-01-01T00:01:40.000Z,1971-01-01T00:01:40.000Z,",
1017+
};
1018+
tableResultSetEqualTest(sql, expectedHeader1, retArray, DATABASE_NAME);
1019+
sql =
1020+
"select last_by(time,time),last_by(time,device),last_by(time,level),last_by(time,attr1),last_by(time,attr2),last_by(time,num),last_by(time,bignum),last_by(time,floatnum),last_by(time,str),last_by(time,bool),last_by(time,date),last_by(time,ts),last_by(time,stringv),last_by(time,blob) from table0 where device='d2'";
1021+
retArray =
1022+
new String[] {
1023+
"1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-04-26T17:46:40.000Z,1971-01-01T00:01:40.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-01-01T00:01:40.000Z,1971-01-01T00:01:40.000Z,1970-01-01T00:00:00.080Z,",
1024+
};
1025+
tableResultSetEqualTest(sql, expectedHeader2, retArray, DATABASE_NAME);
1026+
1027+
String[] expectedHeader11 = buildHeaders(expectedHeader1.length * 2);
1028+
sql =
1029+
"select last_by(time,time),last_by(device,time),last_by(level,time),last_by(attr1,time),last_by(attr2,time),last_by(num,time),last_by(bignum,time),last_by(floatnum,time),last_by(str,time),last_by(bool,time),last_by(date,time),last_by(ts,time),last_by(stringv,time),last_by(time,time),last_by(time,device),last_by(time,level),last_by(time,attr1),last_by(time,attr2),last_by(time,num),last_by(time,bignum),last_by(time,floatnum),last_by(time,str),last_by(time,bool),last_by(time,date),last_by(time,ts),last_by(time,stringv) from table0 where device='d2'";
1030+
retArray =
1031+
new String[] {
1032+
"1971-08-20T11:33:20.000Z,d2,l5,null,null,15,3147483648,235.213,watermelon,true,2023-01-01,null,null,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-04-26T17:46:40.000Z,1971-01-01T00:01:40.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-01-01T00:01:40.000Z,1971-01-01T00:01:40.000Z,",
1033+
};
1034+
tableResultSetEqualTest(sql, expectedHeader11, retArray, DATABASE_NAME);
1035+
1036+
sql =
1037+
"select first_by(time,time),first_by(device,time),first_by(level,time),first_by(attr1,time),first_by(attr2,time),first_by(num,time),first_by(bignum,time),first_by(floatnum,time),first_by(str,time),first_by(bool,time),first_by(date,time),first_by(ts,time),first_by(stringv,time) from table0 where device='d2' and time>80";
1038+
retArray =
1039+
new String[] {
1040+
"1970-01-01T00:00:00.100Z,d2,l5,null,null,8,2147483964,4654.231,papaya,true,null,null,null,",
1041+
};
1042+
tableResultSetEqualTest(sql, expectedHeader1, retArray, DATABASE_NAME);
1043+
1044+
sql =
1045+
"select first_by(time,time),first_by(time,device),first_by(time,level),first_by(time,attr1),first_by(time,attr2),first_by(time,num),first_by(time,bignum),first_by(time,floatnum),first_by(time,str),first_by(time,bool),first_by(time,date),first_by(time,ts),first_by(time,stringv) from table0 where device='d2' and time>80";
1046+
retArray =
1047+
new String[] {
1048+
"1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.100Z,1971-01-01T00:00:00.000Z,1971-01-01T00:00:00.000Z,1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.100Z,1971-08-20T11:33:20.000Z,1971-01-01T00:01:40.000Z,1971-01-01T00:01:40.000Z,",
1049+
};
1050+
tableResultSetEqualTest(sql, expectedHeader1, retArray, DATABASE_NAME);
1051+
}
1052+
9921053
// ==================================================================
9931054
// ============================ Join Test ===========================
9941055
// ==================================================================
@@ -1274,4 +1335,12 @@ public void fullOuterJoinTest2() {
12741335
+ "ORDER BY time, t1.device, t2.device";
12751336
tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);
12761337
}
1338+
1339+
public static String[] buildHeaders(int length) {
1340+
String[] expectedHeader = new String[length];
1341+
for (int i = 0; i < length; i++) {
1342+
expectedHeader[i] = "_col" + i;
1343+
}
1344+
return expectedHeader;
1345+
}
12771346
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java

Lines changed: 29 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,18 @@
2323
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedAccumulator;
2424
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedAvgAccumulator;
2525
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedCountAccumulator;
26-
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
27-
import org.apache.iotdb.db.queryengine.plan.expression.binary.CompareBinaryExpression;
28-
import org.apache.iotdb.db.queryengine.plan.expression.leaf.ConstantOperand;
26+
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
27+
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SymbolReference;
2928

3029
import org.apache.tsfile.enums.TSDataType;
3130

3231
import java.util.List;
3332
import java.util.Map;
3433

3534
import static com.google.common.base.Preconditions.checkState;
36-
import static org.apache.iotdb.db.queryengine.plan.relational.metadata.TableBuiltinAggregationFunction.LAST;
35+
import static org.apache.iotdb.commons.schema.table.TsTable.TIME_COLUMN_NAME;
36+
import static org.apache.iotdb.db.queryengine.plan.relational.metadata.TableBuiltinAggregationFunction.FIRST_BY;
37+
import static org.apache.iotdb.db.queryengine.plan.relational.metadata.TableBuiltinAggregationFunction.LAST_BY;
3738

3839
public class AccumulatorFactory {
3940

@@ -47,6 +48,23 @@ public static TableAccumulator createAccumulator(
4748
if (aggregationType == TAggregationType.UDAF) {
4849
// If UDAF accumulator receives raw input, it needs to check input's attribute
4950
throw new UnsupportedOperationException();
51+
} else if ((LAST_BY.getFunctionName().equals(functionName)
52+
|| FIRST_BY.getFunctionName().equals(functionName))
53+
&& inputExpressions.size() > 1) {
54+
boolean xIsTimeColumn = false;
55+
boolean yIsTimeColumn = false;
56+
if (isTimeColumn(inputExpressions.get(1))) {
57+
yIsTimeColumn = true;
58+
} else if (isTimeColumn(inputExpressions.get(0))) {
59+
xIsTimeColumn = true;
60+
}
61+
if (LAST_BY.getFunctionName().equals(functionName)) {
62+
return new LastByAccumulator(
63+
inputDataTypes.get(0), inputDataTypes.get(1), xIsTimeColumn, yIsTimeColumn);
64+
} else {
65+
return new FirstByAccumulator(
66+
inputDataTypes.get(0), inputDataTypes.get(1), xIsTimeColumn, yIsTimeColumn);
67+
}
5068
} else {
5169
return createBuiltinAccumulator(
5270
aggregationType, inputDataTypes, inputExpressions, inputAttributes, ascending);
@@ -106,6 +124,10 @@ public static TableAccumulator createBuiltinAccumulator(
106124
return new MaxAccumulator(inputDataTypes.get(0));
107125
case MIN:
108126
return new MinAccumulator(inputDataTypes.get(0));
127+
case LAST_BY:
128+
return new LastByAccumulator(inputDataTypes.get(0), inputDataTypes.get(1), false, false);
129+
case FIRST_BY:
130+
return new FirstByAccumulator(inputDataTypes.get(0), inputDataTypes.get(1), false, false);
109131
default:
110132
throw new IllegalArgumentException("Invalid Aggregation function: " + aggregationType);
111133
}
@@ -219,34 +241,8 @@ public interface KeepEvaluator {
219241
boolean apply(long keep);
220242
}
221243

222-
public static KeepEvaluator initKeepEvaluator(Expression keepExpression) {
223-
// We have checked semantic in FE,
224-
// keep expression must be ConstantOperand or CompareBinaryExpression here
225-
if (keepExpression instanceof ConstantOperand) {
226-
return keep -> keep >= Long.parseLong(keepExpression.getExpressionString());
227-
} else {
228-
long constant =
229-
Long.parseLong(
230-
((CompareBinaryExpression) keepExpression)
231-
.getRightExpression()
232-
.getExpressionString());
233-
switch (keepExpression.getExpressionType()) {
234-
case LESS_THAN:
235-
return keep -> keep < constant;
236-
case LESS_EQUAL:
237-
return keep -> keep <= constant;
238-
case GREATER_THAN:
239-
return keep -> keep > constant;
240-
case GREATER_EQUAL:
241-
return keep -> keep >= constant;
242-
case EQUAL_TO:
243-
return keep -> keep == constant;
244-
case NON_EQUAL:
245-
return keep -> keep != constant;
246-
default:
247-
throw new IllegalArgumentException(
248-
"unsupported expression type: " + keepExpression.getExpressionType());
249-
}
250-
}
244+
public static boolean isTimeColumn(Expression expression) {
245+
return expression instanceof SymbolReference
246+
&& TIME_COLUMN_NAME.equals(((SymbolReference) expression).getName());
251247
}
252248
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AvgAccumulator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ public boolean hasFinalResult() {
138138

139139
@Override
140140
public void addStatistics(Statistics[] statistics) {
141-
if (statistics == null) {
141+
if (statistics == null || statistics[0] == null) {
142142
return;
143143
}
144144
initResult = true;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/CountAccumulator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,10 @@ public boolean hasFinalResult() {
8282

8383
@Override
8484
public void addStatistics(Statistics[] statistics) {
85-
if (statistics[0] == null) {
85+
if (statistics == null || statistics[0] == null) {
8686
return;
8787
}
88+
8889
countState += statistics[0].getCount();
8990
}
9091

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstAccumulator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -196,12 +196,12 @@ public void evaluateFinal(ColumnBuilder columnBuilder) {
196196

197197
@Override
198198
public boolean hasFinalResult() {
199-
return initResult;
199+
return false;
200200
}
201201

202202
@Override
203203
public void addStatistics(Statistics[] statistics) {
204-
if (statistics[0] == null) {
204+
if (statistics == null || statistics[0] == null) {
205205
return;
206206
}
207207
switch (seriesDataType) {

0 commit comments

Comments
 (0)