Skip to content

Commit ef2eada

Browse files
Xiaojian Sunsunxiaojian
authored andcommitted
Introduce version merge engine for primary key table
1 parent 92c6d23 commit ef2eada

File tree

15 files changed

+924
-120
lines changed

15 files changed

+924
-120
lines changed

fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java

Lines changed: 69 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@
7272
import static com.alibaba.fluss.record.TestData.DATA1_TABLE_INFO_PK;
7373
import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH;
7474
import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH_PK;
75+
import static com.alibaba.fluss.record.TestData.DATA3_SCHEMA_PK;
76+
import static com.alibaba.fluss.record.TestData.DATA3_TABLE_PATH_PK;
7577
import static com.alibaba.fluss.testutils.DataTestUtils.assertRowValueEquals;
7678
import static com.alibaba.fluss.testutils.DataTestUtils.compactedRow;
7779
import static com.alibaba.fluss.testutils.DataTestUtils.keyRow;
@@ -884,7 +886,7 @@ void testFirstRowMergeEngine() throws Exception {
884886
TableDescriptor tableDescriptor =
885887
TableDescriptor.builder()
886888
.schema(DATA1_SCHEMA_PK)
887-
.property(ConfigOptions.TABLE_MERGE_ENGINE, MergeEngine.FIRST_ROW)
889+
.property(ConfigOptions.TABLE_MERGE_ENGINE, MergeEngine.Type.FIRST_ROW)
888890
.build();
889891
RowType rowType = DATA1_SCHEMA_PK.toRowType();
890892
createTable(DATA1_TABLE_PATH_PK, tableDescriptor, false);
@@ -901,7 +903,6 @@ void testFirstRowMergeEngine() throws Exception {
901903
expectedRows.add(compactedRow(rowType, new Object[] {id, "value_0"}));
902904
}
903905
upsertWriter.flush();
904-
905906
// now, get rows by lookup
906907
for (int id = 0; id < rows; id++) {
907908
InternalRow gotRow =
@@ -910,6 +911,55 @@ void testFirstRowMergeEngine() throws Exception {
910911
.getRow();
911912
assertThatRow(gotRow).withSchema(rowType).isEqualTo(expectedRows.get(id));
912913
}
914+
// check scan change log
915+
LogScanner logScanner = table.getLogScanner(new LogScan());
916+
logScanner.subscribeFromBeginning(0);
917+
List<ScanRecord> actualLogRecords = new ArrayList<>(0);
918+
while (actualLogRecords.size() < rows) {
919+
ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
920+
scanRecords.forEach(actualLogRecords::add);
921+
}
922+
assertThat(actualLogRecords).hasSize(rows);
923+
for (int i = 0; i < actualLogRecords.size(); i++) {
924+
ScanRecord scanRecord = actualLogRecords.get(i);
925+
assertThat(scanRecord.getRowKind()).isEqualTo(RowKind.INSERT);
926+
assertThatRow(scanRecord.getRow())
927+
.withSchema(rowType)
928+
.isEqualTo(expectedRows.get(i));
929+
}
930+
}
931+
}
932+
933+
@Test
934+
void testMergeEngineWithVersion() throws Exception {
935+
// Create table.
936+
TableDescriptor tableDescriptor =
937+
TableDescriptor.builder()
938+
.schema(DATA3_SCHEMA_PK)
939+
.property(ConfigOptions.TABLE_MERGE_ENGINE, MergeEngine.Type.VERSION)
940+
.property(ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN, "b")
941+
.build();
942+
RowType rowType = DATA3_SCHEMA_PK.toRowType();
943+
createTable(DATA3_TABLE_PATH_PK, tableDescriptor, false);
944+
945+
int rows = 3;
946+
try (Table table = conn.getTable(DATA3_TABLE_PATH_PK)) {
947+
// put rows.
948+
UpsertWriter upsertWriter = table.getUpsertWriter();
949+
List<InternalRow> expectedRows = new ArrayList<>(rows);
950+
// init rows.
951+
for (int row = 0; row < rows; row++) {
952+
upsertWriter.upsert(compactedRow(rowType, new Object[] {row, 1000L}));
953+
expectedRows.add(compactedRow(rowType, new Object[] {row, 1000L}));
954+
}
955+
// update row if id=0 and version < 1000L, will not update
956+
upsertWriter.upsert(compactedRow(rowType, new Object[] {0, 999L}));
957+
958+
// update if version> 1000L
959+
upsertWriter.upsert(compactedRow(rowType, new Object[] {1, 1001L}));
960+
rows = rows + 2;
961+
962+
upsertWriter.flush();
913963

914964
// check scan change log
915965
LogScanner logScanner = table.getLogScanner(new LogScan());
@@ -922,13 +972,29 @@ void testFirstRowMergeEngine() throws Exception {
922972
}
923973

924974
assertThat(actualLogRecords).hasSize(rows);
925-
for (int i = 0; i < actualLogRecords.size(); i++) {
975+
for (int i = 0; i < 3; i++) {
926976
ScanRecord scanRecord = actualLogRecords.get(i);
927977
assertThat(scanRecord.getRowKind()).isEqualTo(RowKind.INSERT);
928978
assertThatRow(scanRecord.getRow())
929979
.withSchema(rowType)
930980
.isEqualTo(expectedRows.get(i));
931981
}
982+
983+
// update_before for id =1
984+
List<ScanRecord> updateActualLogRecords = new ArrayList<>(actualLogRecords);
985+
986+
ScanRecord beforeRecord = updateActualLogRecords.get(3);
987+
assertThat(beforeRecord.getRowKind()).isEqualTo(RowKind.UPDATE_BEFORE);
988+
assertThat(beforeRecord.getRow().getFieldCount()).isEqualTo(2);
989+
assertThat(beforeRecord.getRow().getInt(0)).isEqualTo(1);
990+
assertThat(beforeRecord.getRow().getLong(1)).isEqualTo(1000);
991+
992+
// update_after for id =1
993+
ScanRecord afterRecord = updateActualLogRecords.get(4);
994+
assertThat(afterRecord.getRowKind()).isEqualTo(RowKind.UPDATE_AFTER);
995+
assertThat(afterRecord.getRow().getFieldCount()).isEqualTo(2);
996+
assertThat(afterRecord.getRow().getInt(0)).isEqualTo(1);
997+
assertThat(afterRecord.getRow().getLong(1)).isEqualTo(1001);
932998
}
933999
}
9341000
}

fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -969,12 +969,18 @@ public class ConfigOptions {
969969
+ "When this option is set to ture and the datalake tiering service is up,"
970970
+ " the table will be tiered and compacted into datalake format stored on lakehouse storage.");
971971

972-
public static final ConfigOption<MergeEngine> TABLE_MERGE_ENGINE =
972+
public static final ConfigOption<MergeEngine.Type> TABLE_MERGE_ENGINE =
973973
key("table.merge-engine")
974-
.enumType(MergeEngine.class)
974+
.enumType(MergeEngine.Type.class)
975975
.noDefaultValue()
976976
.withDescription("The merge engine for the primary key table.");
977977

978+
public static final ConfigOption<String> TABLE_MERGE_ENGINE_VERSION_COLUMN =
979+
key("table.merge-engine.version.column")
980+
.stringType()
981+
.noDefaultValue()
982+
.withDescription("The merge engine version column for the primary key table.");
983+
978984
// ------------------------------------------------------------------------
979985
// ConfigOptions for Kv
980986
// ------------------------------------------------------------------------
Lines changed: 121 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
/*
2-
* Licensed to the Apache Software Foundation (ASF) under one
3-
* or more contributor license agreements. See the NOTICE file
4-
* distributed with this work for additional information
5-
* regarding copyright ownership. The ASF licenses this file
6-
* to you under the Apache License, Version 2.0 (the
7-
* "License"); you may not use this file except in compliance
8-
* with the License. You may obtain a copy of the License at
2+
* Copyright (c) 2024 Alibaba Group Holding Ltd.
93
*
10-
* http://www.apache.org/licenses/LICENSE-2.0
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
119
*
1210
* Unless required by applicable law or agreed to in writing, software
1311
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,22 +16,125 @@
1816

1917
package com.alibaba.fluss.metadata;
2018

21-
/**
22-
* The merge engine for primary key table.
23-
*
24-
* @since 0.6
25-
*/
26-
public enum MergeEngine {
27-
FIRST_ROW("first_row");
19+
import com.alibaba.fluss.config.ConfigOptions;
20+
import com.alibaba.fluss.config.Configuration;
21+
import com.alibaba.fluss.shaded.guava32.com.google.common.collect.Sets;
22+
import com.alibaba.fluss.types.BigIntType;
23+
import com.alibaba.fluss.types.DataType;
24+
import com.alibaba.fluss.types.IntType;
25+
import com.alibaba.fluss.types.LocalZonedTimestampType;
26+
import com.alibaba.fluss.types.RowType;
27+
import com.alibaba.fluss.types.TimeType;
28+
import com.alibaba.fluss.types.TimestampType;
29+
30+
import java.util.Map;
31+
import java.util.Objects;
32+
import java.util.Set;
2833

29-
private final String value;
34+
/** The merge engine for primary key table. */
35+
public class MergeEngine {
36+
37+
public static final Set<String> VERSION_SUPPORTED_DATA_TYPES =
38+
Sets.newHashSet(
39+
BigIntType.class.getName(),
40+
IntType.class.getName(),
41+
TimestampType.class.getName(),
42+
TimeType.class.getName(),
43+
LocalZonedTimestampType.class.getName());
44+
private final Type type;
45+
private final String column;
46+
47+
private MergeEngine(Type type) {
48+
this(type, null);
49+
}
50+
51+
private MergeEngine(Type type, String column) {
52+
this.type = type;
53+
this.column = column;
54+
}
55+
56+
public static MergeEngine create(Map<String, String> properties) {
57+
return create(properties, null);
58+
}
59+
60+
public static MergeEngine create(Map<String, String> properties, RowType rowType) {
61+
return create(Configuration.fromMap(properties), rowType);
62+
}
3063

31-
MergeEngine(String value) {
32-
this.value = value;
64+
public static MergeEngine create(Configuration options, RowType rowType) {
65+
if (options == null) {
66+
return null;
67+
}
68+
MergeEngine.Type type = options.get(ConfigOptions.TABLE_MERGE_ENGINE);
69+
if (type == null) {
70+
return null;
71+
}
72+
73+
switch (type) {
74+
case FIRST_ROW:
75+
return new MergeEngine(Type.FIRST_ROW);
76+
case VERSION:
77+
String column = options.get(ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN);
78+
if (column == null) {
79+
throw new IllegalArgumentException(
80+
"When the merge engine is set to version, the 'table.merge-engine.version.column' cannot be empty.");
81+
}
82+
if (rowType != null) {
83+
int fieldIndex = rowType.getFieldIndex(column);
84+
if (fieldIndex == -1) {
85+
throw new IllegalArgumentException(
86+
String.format(
87+
"When the merge engine is set to version, the column %s does not exist.",
88+
column));
89+
}
90+
DataType dataType = rowType.getTypeAt(fieldIndex);
91+
if (!VERSION_SUPPORTED_DATA_TYPES.contains(dataType.getClass().getName())) {
92+
throw new IllegalArgumentException(
93+
String.format(
94+
"The merge engine column is not support type %s .",
95+
dataType.asSummaryString()));
96+
}
97+
}
98+
return new MergeEngine(Type.VERSION, column);
99+
default:
100+
throw new UnsupportedOperationException("Unsupported merge engine: " + type);
101+
}
102+
}
103+
104+
public Type getType() {
105+
return type;
106+
}
107+
108+
public String getColumn() {
109+
return column;
110+
}
111+
112+
public enum Type {
113+
FIRST_ROW("first_row"),
114+
VERSION("version");
115+
private final String value;
116+
117+
Type(String value) {
118+
this.value = value;
119+
}
120+
121+
@Override
122+
public String toString() {
123+
return value;
124+
}
125+
}
126+
127+
@Override
128+
public boolean equals(Object o) {
129+
if (o == null || getClass() != o.getClass()) {
130+
return false;
131+
}
132+
MergeEngine that = (MergeEngine) o;
133+
return type == that.type && Objects.equals(column, that.column);
33134
}
34135

35136
@Override
36-
public String toString() {
37-
return value;
137+
public int hashCode() {
138+
return Objects.hash(type, column);
38139
}
39140
}

fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ public boolean isDataLakeEnabled() {
281281
}
282282

283283
public @Nullable MergeEngine getMergeEngine() {
284-
return configuration().get(ConfigOptions.TABLE_MERGE_ENGINE);
284+
return MergeEngine.create(configuration(), schema.toRowType());
285285
}
286286

287287
public TableDescriptor copy(Map<String, String> newProperties) {

fluss-common/src/test/java/com/alibaba/fluss/record/TestData.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,4 +187,18 @@ public final class TestData {
187187
TableDescriptor.builder().schema(DATA2_SCHEMA).distributedBy(3, "a").build(),
188188
1);
189189
// -------------------------------- data2 info end ------------------------------------
190+
191+
// ------------------- data3 and related table info begin ----------------------
192+
public static final Schema DATA3_SCHEMA_PK =
193+
Schema.newBuilder()
194+
.column("a", DataTypes.INT())
195+
.withComment("a is first column")
196+
.column("b", DataTypes.BIGINT())
197+
.withComment("b is second column")
198+
.primaryKey("a")
199+
.build();
200+
public static final TablePath DATA3_TABLE_PATH_PK =
201+
TablePath.of("test_db_3", "test_pk_table_3");
202+
// ---------------------------- data3 table info end ------------------------------
203+
190204
}

fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import com.alibaba.fluss.connector.flink.sink.FlinkTableSink;
2424
import com.alibaba.fluss.connector.flink.source.FlinkTableSource;
2525
import com.alibaba.fluss.connector.flink.utils.FlinkConnectorOptionsUtils;
26+
import com.alibaba.fluss.connector.flink.utils.FlinkConversions;
27+
import com.alibaba.fluss.metadata.MergeEngine;
2628
import com.alibaba.fluss.metadata.TablePath;
2729

2830
import org.apache.flink.api.common.RuntimeExecutionMode;
@@ -129,7 +131,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
129131
cache,
130132
partitionDiscoveryIntervalMs,
131133
tableOptions.get(toFlinkOption(ConfigOptions.TABLE_DATALAKE_ENABLED)),
132-
tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)));
134+
MergeEngine.create(helper.getOptions().toMap()));
133135
}
134136

135137
@Override
@@ -150,7 +152,8 @@ public DynamicTableSink createDynamicTableSink(Context context) {
150152
rowType,
151153
context.getPrimaryKeyIndexes(),
152154
isStreamingMode,
153-
tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)));
155+
MergeEngine.create(
156+
helper.getOptions().toMap(), FlinkConversions.toFlussRowType(rowType)));
154157
}
155158

156159
@Override

0 commit comments

Comments
 (0)