Skip to content

Commit

Permalink
[core] Support comparing VARCHAR/CHAR types in the aggregate merge engine
Browse files Browse the repository at this point in the history
  • Loading branch information
Aitozi authored Nov 29, 2023
1 parent 99a1910 commit 40d373b
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 36 deletions.
2 changes: 1 addition & 1 deletion docs/content/concepts/primary-key-table.md
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ Field `price` will be aggregated by the `max` function, and field `sales` will b
Current supported aggregate functions and data types are:

* `sum`: supports DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT and DOUBLE.
* `min`/`max`: support DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP and TIMESTAMP_LTZ.
* `min`/`max`: support CHAR, VARCHAR, DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP and TIMESTAMP_LTZ.
* `last_value` / `last_non_null_value`: support all data types.
* `listagg`: supports the STRING data type.
* `bool_and` / `bool_or`: support the BOOLEAN data type.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,12 @@ public static int compare(Object x, Object y, DataTypeRoot type) {
case VARBINARY:
ret = byteArrayCompare((byte[]) x, (byte[]) y);
break;
case VARCHAR:
case CHAR:
ret = ((BinaryString) x).compareTo((BinaryString) y);
break;
default:
throw new IllegalArgumentException();
throw new IllegalArgumentException("Incomparable type: " + type);
}
return ret;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.utils;

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
Expand Down Expand Up @@ -134,5 +135,13 @@ public void testCompare() {
// test TIME_WITHOUT_TIME_ZONE data type
assertThat(InternalRowUtils.compare(165, 168, DataTypeRoot.TIME_WITHOUT_TIME_ZONE))
.isLessThan(0);

// test VARCHAR type
assertThat(
InternalRowUtils.compare(
BinaryString.fromString("a"),
BinaryString.fromString("b"),
DataTypeRoot.VARCHAR))
.isLessThan(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,8 @@ protected List<String> ddl() {
+ "h DOUBLE,"
+ "i DATE,"
+ "l TIMESTAMP,"
+ "m CHAR(1),"
+ "n VARCHAR,"
+ "PRIMARY KEY (j,k) NOT ENFORCED)"
+ " WITH ('merge-engine'='aggregation', "
+ "'fields.a.aggregate-function'='min', "
Expand All @@ -374,7 +376,9 @@ protected List<String> ddl() {
+ "'fields.f.aggregate-function'='min',"
+ "'fields.h.aggregate-function'='min',"
+ "'fields.i.aggregate-function'='min',"
+ "'fields.l.aggregate-function'='min'"
+ "'fields.l.aggregate-function'='min',"
+ "'fields.m.aggregate-function'='min',"
+ "'fields.n.aggregate-function'='min'"
+ ");");
}

Expand All @@ -384,13 +388,13 @@ public void testMergeInMemory() {
"INSERT INTO T3 VALUES "
+ "(1, 2, CAST(NULL AS INT), 1.01, CAST(-1 AS TINYINT), CAST(-1 AS SMALLINT), "
+ "CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' AS DATE), "
+ "CAST('2021-01-01 01:01:01' AS TIMESTAMP)),"
+ "CAST('2021-01-01 01:01:01' AS TIMESTAMP), 'a', 'aaa'),"
+ "(1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), "
+ "CAST(100000 AS BIGINT), -1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS DATE), "
+ "CAST('2022-01-01 01:01:01' AS TIMESTAMP)), "
+ "CAST('2022-01-01 01:01:01' AS TIMESTAMP), 'b', 'bbb'), "
+ "(1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), "
+ "CAST(10000000 AS BIGINT), 0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS DATE), "
+ "CAST('2022-01-01 02:00:00' AS TIMESTAMP))");
+ "CAST('2022-01-01 02:00:00' AS TIMESTAMP), 'c', 'ccc')");
List<Row> result = batchSql("SELECT * FROM T3");
assertThat(result)
.containsExactlyInAnyOrder(
Expand All @@ -405,23 +409,25 @@ public void testMergeInMemory() {
(float) -1.11,
-1.11,
LocalDate.of(2020, 1, 1),
LocalDateTime.of(2021, 1, 1, 1, 1, 1)));
LocalDateTime.of(2021, 1, 1, 1, 1, 1),
"a",
"aaa"));
}

@Test
public void testMergeRead() {
batchSql(
"INSERT INTO T3 VALUES "
+ "(1, 2, CAST(NULL AS INT), 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), "
+ "1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP))");
+ "1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP), 'a', 'aaa')");
batchSql(
"INSERT INTO T3 VALUES "
+ "(1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), "
+ "-1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP))");
+ "-1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP), 'b', 'bbb')");
batchSql(
"INSERT INTO T3 VALUES "
+ "(1, 2, 3, 10.00, CAST(-1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), "
+ "0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP))");
+ "0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP), 'c', 'ccc')");

List<Row> result = batchSql("SELECT * FROM T3");
assertThat(result)
Expand All @@ -437,7 +443,9 @@ public void testMergeRead() {
(float) -1.11,
-1.11,
LocalDate.of(2020, 1, 1),
LocalDateTime.of(2021, 1, 1, 1, 1, 1)));
LocalDateTime.of(2021, 1, 1, 1, 1, 1),
"a",
"aaa"));
}

@Test
Expand All @@ -449,29 +457,29 @@ public void testMergeCompaction() {
batchSql(
"INSERT INTO T3 VALUES "
+ "(1, 2, CAST(NULL AS INT), 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), "
+ "1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP))");
+ "1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP), 'a', 'aaa')");
batchSql(
"INSERT INTO T3 VALUES "
+ "(1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), "
+ "-1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP))");
+ "-1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP), 'b', 'bbb')");
batchSql(
"INSERT INTO T3 VALUES "
+ "(1, 2, 3, 10.00, CAST(-1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), "
+ "0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP))");
+ "0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP), 'c', 'ccc')");

// key 1 3
batchSql(
"INSERT INTO T3 VALUES "
+ "(1, 3, CAST(NULL AS INT), 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), "
+ "1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP))");
+ "1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP), 'a', 'aaa')");
batchSql(
"INSERT INTO T3 VALUES "
+ "(1, 3, 6, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), "
+ "-1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP))");
+ "-1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP), 'b', 'bbb')");
batchSql(
"INSERT INTO T3 VALUES "
+ "(1, 3, 3, 10.00, CAST(-1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), "
+ "0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP))");
+ "0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP), 'c', 'ccc')");

assertThat(batchSql("SELECT * FROM T3"))
.containsExactlyInAnyOrder(
Expand All @@ -486,7 +494,9 @@ public void testMergeCompaction() {
(float) -1.11,
-1.11,
LocalDate.of(2020, 1, 1),
LocalDateTime.of(2021, 1, 1, 1, 1, 1)),
LocalDateTime.of(2021, 1, 1, 1, 1, 1),
"a",
"aaa"),
Row.of(
1,
3,
Expand All @@ -498,7 +508,9 @@ public void testMergeCompaction() {
(float) -1.11,
-1.11,
LocalDate.of(2020, 1, 1),
LocalDateTime.of(2021, 1, 1, 1, 1, 1)));
LocalDateTime.of(2021, 1, 1, 1, 1, 1),
"a",
"aaa"));
}

@Test
Expand All @@ -525,6 +537,8 @@ protected List<String> ddl() {
+ "h DOUBLE,"
+ "i DATE,"
+ "l TIMESTAMP,"
+ "m CHAR,"
+ "n VARCHAR,"
+ "PRIMARY KEY (j,k) NOT ENFORCED)"
+ " WITH ('merge-engine'='aggregation', "
+ "'fields.a.aggregate-function'='max', "
Expand All @@ -535,7 +549,9 @@ protected List<String> ddl() {
+ "'fields.f.aggregate-function'='max',"
+ "'fields.h.aggregate-function'='max',"
+ "'fields.i.aggregate-function'='max',"
+ "'fields.l.aggregate-function'='max'"
+ "'fields.l.aggregate-function'='max',"
+ "'fields.m.aggregate-function'='max',"
+ "'fields.n.aggregate-function'='max'"
+ ");");
}

Expand All @@ -545,13 +561,13 @@ public void testMergeInMemory() {
"INSERT INTO T2 VALUES "
+ "(1, 2, CAST(NULL AS INT), 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), "
+ "CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' AS DATE), "
+ "CAST('2021-01-01 01:01:01' AS TIMESTAMP)),"
+ "CAST('2021-01-01 01:01:01' AS TIMESTAMP), 'a', 'aaa'),"
+ "(1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), "
+ "-1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS DATE), "
+ "CAST('2022-01-01 01:01:01' AS TIMESTAMP)), "
+ "CAST('2022-01-01 01:01:01' AS TIMESTAMP), 'b', 'bbb'), "
+ "(1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), "
+ "0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS DATE), "
+ "CAST('2022-01-01 02:00:00' AS TIMESTAMP))");
+ "CAST('2022-01-01 02:00:00' AS TIMESTAMP), 'c', 'ccc')");
List<Row> result = batchSql("SELECT * FROM T2");
assertThat(result)
.containsExactlyInAnyOrder(
Expand All @@ -566,7 +582,9 @@ public void testMergeInMemory() {
(float) 1.11,
1.21,
LocalDate.of(2022, 1, 2),
LocalDateTime.of(2022, 1, 1, 2, 0, 0)));
LocalDateTime.of(2022, 1, 1, 2, 0, 0),
"c",
"ccc"));
}

@Test
Expand All @@ -575,17 +593,17 @@ public void testMergeRead() {
"INSERT INTO T2 VALUES "
+ "(1, 2, CAST(NULL AS INT), 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), "
+ "1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' AS DATE), "
+ "CAST('2021-01-01 01:01:01' AS TIMESTAMP))");
+ "CAST('2021-01-01 01:01:01' AS TIMESTAMP), 'a', 'aaa')");
batchSql(
"INSERT INTO T2 VALUES "
+ "(1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), -1.11, "
+ "CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS DATE), "
+ "CAST('2022-01-01 01:01:01' AS TIMESTAMP))");
+ "CAST('2022-01-01 01:01:01' AS TIMESTAMP), 'b', 'bbb')");
batchSql(
"INSERT INTO T2 VALUES "
+ "(1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), 0, "
+ "CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS DATE), "
+ "CAST('2022-01-01 02:00:00' AS TIMESTAMP))");
+ "CAST('2022-01-01 02:00:00' AS TIMESTAMP), 'c', 'ccc')");

List<Row> result = batchSql("SELECT * FROM T2");
assertThat(result)
Expand All @@ -601,7 +619,9 @@ public void testMergeRead() {
(float) 1.11,
1.21,
LocalDate.of(2022, 1, 2),
LocalDateTime.of(2022, 1, 1, 2, 0, 0)));
LocalDateTime.of(2022, 1, 1, 2, 0, 0),
"c",
"ccc"));
}

@Test
Expand All @@ -613,29 +633,29 @@ public void testMergeCompaction() {
batchSql(
"INSERT INTO T2 VALUES "
+ "(1, 2, CAST(NULL AS INT), 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), "
+ "1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP))");
+ "1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP), 'a', 'aaa')");
batchSql(
"INSERT INTO T2 VALUES "
+ "(1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), -1.11, "
+ "CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP))");
+ "CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP), 'c', 'ccc')");
batchSql(
"INSERT INTO T2 VALUES "
+ "(1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), 0, "
+ "CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP))");
+ "CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP), 'b', 'bbb')");

// key 1 3
batchSql(
"INSERT INTO T2 VALUES "
+ "(1, 3, CAST(NULL AS INT), 1.01, CAST(1 AS TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), "
+ "1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP))");
+ "1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01' AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP), 'a', 'aaa')");
batchSql(
"INSERT INTO T2 VALUES "
+ "(1, 3, 6, 1.10, CAST(2 AS TINYINT), CAST(2 AS SMALLINT), CAST(100000 AS BIGINT), -1.11, "
+ "CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP))");
+ "CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP), 'c', 'ccc')");
batchSql(
"INSERT INTO T2 VALUES "
+ "(1, 3, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS SMALLINT), CAST(10000000 AS BIGINT), 0, "
+ "CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP))");
+ "CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP), 'b', 'bbb')");

assertThat(batchSql("SELECT * FROM T2"))
.containsExactlyInAnyOrder(
Expand All @@ -650,7 +670,9 @@ public void testMergeCompaction() {
(float) 1.11,
1.21,
LocalDate.of(2022, 1, 2),
LocalDateTime.of(2022, 1, 1, 2, 0, 0)),
LocalDateTime.of(2022, 1, 1, 2, 0, 0),
"c",
"ccc"),
Row.of(
1,
3,
Expand All @@ -662,7 +684,9 @@ public void testMergeCompaction() {
(float) 1.11,
1.21,
LocalDate.of(2022, 1, 2),
LocalDateTime.of(2022, 1, 1, 2, 0, 0)));
LocalDateTime.of(2022, 1, 1, 2, 0, 0),
"c",
"ccc" + ""));
}

@Test
Expand Down

0 comments on commit 40d373b

Please sign in to comment.