Skip to content

Commit eb3f554

Browse files
Load: when table exists in IoTDB, make sure the existing/incoming ID columns are the prefix of the incoming/existing ID columns (#14341)
1 parent 0d22f2b commit eb3f554

File tree

13 files changed

+377
-10
lines changed

13 files changed

+377
-10
lines changed

integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1254,6 +1254,48 @@ public static void assertDataAlwaysOnEnv(
12541254
}
12551255
}
12561256

1257+
public static void assertDataAlwaysOnEnv(
1258+
BaseEnv env,
1259+
String sql,
1260+
String expectedHeader,
1261+
Set<String> expectedResSet,
1262+
long consistentSeconds,
1263+
String database,
1264+
Consumer<String> handleFailure) {
1265+
try (Connection connection = env.getConnection();
1266+
Statement statement = connection.createStatement()) {
1267+
// Keep retrying if there are execution failures
1268+
await()
1269+
.pollInSameThread()
1270+
.pollDelay(1L, TimeUnit.SECONDS)
1271+
.pollInterval(1L, TimeUnit.SECONDS)
1272+
.atMost(consistentSeconds, TimeUnit.SECONDS)
1273+
.failFast(
1274+
() -> {
1275+
try {
1276+
if (database != null) {
1277+
statement.execute("use " + database);
1278+
}
1279+
TestUtils.assertResultSetEqual(
1280+
executeQueryWithRetry(statement, sql), expectedHeader, expectedResSet);
1281+
} catch (Exception e) {
1282+
if (handleFailure != null) {
1283+
handleFailure.accept(e.getMessage());
1284+
}
1285+
Assert.fail();
1286+
} catch (Error e) {
1287+
if (handleFailure != null) {
1288+
handleFailure.accept(e.getMessage());
1289+
}
1290+
throw e;
1291+
}
1292+
});
1293+
} catch (Exception e) {
1294+
e.printStackTrace();
1295+
fail();
1296+
}
1297+
}
1298+
12571299
public static void restartDataNodes() {
12581300
EnvFactory.getEnv().shutdownAllDataNodes();
12591301
EnvFactory.getEnv().startAllDataNodes();

integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeWithLoadIT.java

Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
2929
import org.apache.iotdb.it.framework.IoTDBTestRunner;
3030
import org.apache.iotdb.itbase.category.MultiClusterIT2TableModel;
31+
import org.apache.iotdb.itbase.env.BaseEnv;
3132
import org.apache.iotdb.rpc.TSStatusCode;
3233

3334
import org.junit.Assert;
@@ -37,10 +38,15 @@
3738
import org.junit.experimental.categories.Category;
3839
import org.junit.runner.RunWith;
3940

41+
import java.sql.Connection;
42+
import java.sql.Statement;
4043
import java.util.HashMap;
4144
import java.util.Map;
45+
import java.util.Set;
4246
import java.util.function.Consumer;
4347

48+
import static org.junit.Assert.fail;
49+
4450
@RunWith(IoTDBTestRunner.class)
4551
@Category({MultiClusterIT2TableModel.class})
4652
public class IoTDBPipeWithLoadIT extends AbstractPipeTableModelTestIT {
@@ -124,4 +130,244 @@ public void testReceiverNotLoadDeletedTimeseries() throws Exception {
124130
TableModelUtils.assertCountData("test", "test", 50, receiverEnv, handleFailure);
125131
}
126132
}
133+
134+
// Test that receiver will not load data when table exists but ID columns mismatch
135+
@Test
136+
public void testReceiverNotLoadWhenIdColumnMismatch() throws Exception {
137+
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
138+
final String receiverIp = receiverDataNode.getIp();
139+
final int receiverPort = receiverDataNode.getPort();
140+
final Consumer<String> handleFailure =
141+
o -> {
142+
TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
143+
TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
144+
};
145+
146+
final Map<String, String> extractorAttributes = new HashMap<>();
147+
final Map<String, String> processorAttributes = new HashMap<>();
148+
final Map<String, String> connectorAttributes = new HashMap<>();
149+
150+
extractorAttributes.put("capture.table", "true");
151+
extractorAttributes.put("extractor.realtime.mode", "file");
152+
153+
connectorAttributes.put("connector.batch.enable", "false");
154+
connectorAttributes.put("connector.ip", receiverIp);
155+
connectorAttributes.put("connector.port", Integer.toString(receiverPort));
156+
157+
try (final SyncConfigNodeIServiceClient client =
158+
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
159+
try (Connection connection = senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
160+
Statement statement = connection.createStatement()) {
161+
statement.execute("create database if not exists db");
162+
statement.execute("use db");
163+
statement.execute(
164+
"create table if not exists t1(id1 STRING ID, id2 STRING ID, s1 TEXT MEASUREMENT, s2 INT32 MEASUREMENT)");
165+
statement.execute("INSERT INTO t1(time,id1,id2,s1,s2) values(1, 'd1', 'd2', 'red', 1)");
166+
statement.execute("INSERT INTO t1(time,id1,id2,s1,s2) values(2, 'd1', 'd2', 'blue', 2)");
167+
statement.execute("flush");
168+
} catch (Exception e) {
169+
fail(e.getMessage());
170+
}
171+
172+
try (Connection connection = receiverEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
173+
Statement statement = connection.createStatement()) {
174+
statement.execute("create database if not exists db");
175+
statement.execute("use db");
176+
statement.execute(
177+
"create table if not exists t1(id3 STRING ID, id4 STRING ID, s3 TEXT MEASUREMENT, s4 INT32 MEASUREMENT)");
178+
statement.execute("INSERT INTO t1(time,id3,id4,s3,s4) values(1, 'd3', 'd4', 'red2', 10)");
179+
statement.execute("INSERT INTO t1(time,id3,id4,s3,s4) values(2, 'd3', 'd4', 'blue2', 20)");
180+
statement.execute("flush");
181+
} catch (Exception e) {
182+
fail(e.getMessage());
183+
}
184+
185+
TSStatus status =
186+
client.createPipe(
187+
new TCreatePipeReq("p1", connectorAttributes)
188+
.setExtractorAttributes(extractorAttributes)
189+
.setProcessorAttributes(processorAttributes));
190+
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
191+
Assert.assertEquals(
192+
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
193+
194+
try {
195+
// wait some time
196+
Thread.sleep(10_000);
197+
} catch (InterruptedException e) {
198+
e.printStackTrace();
199+
}
200+
201+
Set<String> expectedResSet = new java.util.HashSet<>();
202+
expectedResSet.add("1970-01-01T00:00:00.002Z,d3,d4,blue2,20,");
203+
expectedResSet.add("1970-01-01T00:00:00.001Z,d3,d4,red2,10,");
204+
// make sure data are not transferred
205+
TestUtils.assertDataEventuallyOnEnv(
206+
receiverEnv,
207+
"select * from t1",
208+
"time,id3,id4,s3,s4,",
209+
expectedResSet,
210+
"db",
211+
handleFailure);
212+
}
213+
}
214+
215+
// Test that receiver can load data when table exists and existing ID columns are the prefix of
216+
// incoming ID columns
217+
@Test
218+
public void testReceiverAutoExtendIdColumn() throws Exception {
219+
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
220+
final String receiverIp = receiverDataNode.getIp();
221+
final int receiverPort = receiverDataNode.getPort();
222+
final Consumer<String> handleFailure =
223+
o -> {
224+
TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
225+
TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
226+
};
227+
228+
final Map<String, String> extractorAttributes = new HashMap<>();
229+
final Map<String, String> processorAttributes = new HashMap<>();
230+
final Map<String, String> connectorAttributes = new HashMap<>();
231+
232+
extractorAttributes.put("capture.table", "true");
233+
extractorAttributes.put("extractor.realtime.mode", "file");
234+
235+
connectorAttributes.put("connector.batch.enable", "false");
236+
connectorAttributes.put("connector.ip", receiverIp);
237+
connectorAttributes.put("connector.port", Integer.toString(receiverPort));
238+
239+
try (final SyncConfigNodeIServiceClient client =
240+
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
241+
try (Connection connection = senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
242+
Statement statement = connection.createStatement()) {
243+
statement.execute("create database if not exists db");
244+
statement.execute("use db");
245+
statement.execute(
246+
"create table if not exists t1(id1 STRING ID, id2 STRING ID, id3 STRING ID, s1 TEXT MEASUREMENT, s2 INT32 MEASUREMENT)");
247+
statement.execute(
248+
"INSERT INTO t1(time,id1,id2,id3,s1,s2) values(1, 'd1', 'd2', 'd3', 'red', 1)");
249+
statement.execute(
250+
"INSERT INTO t1(time,id1,id2,id3,s1,s2) values(2, 'd1', 'd2', 'd3', 'blue', 2)");
251+
statement.execute("flush");
252+
} catch (Exception e) {
253+
fail(e.getMessage());
254+
}
255+
256+
try (Connection connection = receiverEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
257+
Statement statement = connection.createStatement()) {
258+
statement.execute("create database if not exists db");
259+
statement.execute("use db");
260+
statement.execute(
261+
"create table if not exists t1(id1 STRING ID, id2 STRING ID, s3 TEXT MEASUREMENT, s4 INT32 MEASUREMENT)");
262+
statement.execute("INSERT INTO t1(time,id1,id2,s3,s4) values(1, 'd1', 'd2', 'red2', 10)");
263+
statement.execute("INSERT INTO t1(time,id1,id2,s3,s4) values(2, 'd1', 'd2', 'blue2', 20)");
264+
statement.execute("flush");
265+
} catch (Exception e) {
266+
fail(e.getMessage());
267+
}
268+
269+
TSStatus status =
270+
client.createPipe(
271+
new TCreatePipeReq("p1", connectorAttributes)
272+
.setExtractorAttributes(extractorAttributes)
273+
.setProcessorAttributes(processorAttributes));
274+
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
275+
Assert.assertEquals(
276+
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
277+
278+
Set<String> expectedResSet = new java.util.HashSet<>();
279+
expectedResSet.add("1970-01-01T00:00:00.001Z,d1,d2,null,null,d3,red,1,");
280+
expectedResSet.add("1970-01-01T00:00:00.002Z,d1,d2,null,null,d3,blue,2,");
281+
expectedResSet.add("1970-01-01T00:00:00.001Z,d1,d2,red2,10,null,null,null,");
282+
expectedResSet.add("1970-01-01T00:00:00.002Z,d1,d2,blue2,20,null,null,null,");
283+
// make sure data are transferred and column "id3" is auto extended
284+
TestUtils.assertDataEventuallyOnEnv(
285+
receiverEnv,
286+
"select * from t1",
287+
"time,id1,id2,s3,s4,id3,s1,s2,",
288+
expectedResSet,
289+
"db",
290+
handleFailure);
291+
}
292+
}
293+
294+
// Test that receiver can load data when table exists and incoming ID columns are the prefix of
295+
// existing ID columns
296+
@Test
297+
public void testLoadWhenIncomingIdColumnsArePrefixOfExisting() throws Exception {
298+
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
299+
final String receiverIp = receiverDataNode.getIp();
300+
final int receiverPort = receiverDataNode.getPort();
301+
final Consumer<String> handleFailure =
302+
o -> {
303+
TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
304+
TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
305+
};
306+
307+
final Map<String, String> extractorAttributes = new HashMap<>();
308+
final Map<String, String> processorAttributes = new HashMap<>();
309+
final Map<String, String> connectorAttributes = new HashMap<>();
310+
311+
extractorAttributes.put("capture.table", "true");
312+
extractorAttributes.put("extractor.realtime.mode", "file");
313+
314+
connectorAttributes.put("connector.batch.enable", "false");
315+
connectorAttributes.put("connector.ip", receiverIp);
316+
connectorAttributes.put("connector.port", Integer.toString(receiverPort));
317+
318+
try (final SyncConfigNodeIServiceClient client =
319+
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
320+
try (Connection connection = senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
321+
Statement statement = connection.createStatement()) {
322+
statement.execute("create database if not exists db");
323+
statement.execute("use db");
324+
statement.execute(
325+
"create table if not exists t1(id1 STRING ID, id2 STRING ID, s1 TEXT MEASUREMENT, s2 INT32 MEASUREMENT)");
326+
statement.execute("INSERT INTO t1(time,id1,id2,s1,s2) values(1, 'd1', 'd2', 'red', 1)");
327+
statement.execute("INSERT INTO t1(time,id1,id2,s1,s2) values(2, 'd1', 'd2', 'blue', 2)");
328+
statement.execute("flush");
329+
} catch (Exception e) {
330+
fail(e.getMessage());
331+
}
332+
333+
try (Connection connection = receiverEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
334+
Statement statement = connection.createStatement()) {
335+
statement.execute("create database if not exists db");
336+
statement.execute("use db");
337+
statement.execute(
338+
"create table if not exists t1(id1 STRING ID, id2 STRING ID, id3 STRING ID,s3 TEXT MEASUREMENT, s4 INT32 MEASUREMENT)");
339+
statement.execute(
340+
"INSERT INTO t1(time,id1,id2,id3,s3,s4) values(1, 'd1', 'd2', 'd3', 'red2', 10)");
341+
statement.execute(
342+
"INSERT INTO t1(time,id1,id2,id3,s3,s4) values(2, 'd1', 'd2', 'd3', 'blue2', 20)");
343+
statement.execute("flush");
344+
} catch (Exception e) {
345+
fail(e.getMessage());
346+
}
347+
348+
TSStatus status =
349+
client.createPipe(
350+
new TCreatePipeReq("p1", connectorAttributes)
351+
.setExtractorAttributes(extractorAttributes)
352+
.setProcessorAttributes(processorAttributes));
353+
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
354+
Assert.assertEquals(
355+
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
356+
357+
Set<String> expectedResSet = new java.util.HashSet<>();
358+
expectedResSet.add("1970-01-01T00:00:00.001Z,d1,d2,d3,red2,10,null,null,");
359+
expectedResSet.add("1970-01-01T00:00:00.002Z,d1,d2,d3,blue2,20,null,null,");
360+
expectedResSet.add("1970-01-01T00:00:00.001Z,d1,d2,null,null,null,red,1,");
361+
expectedResSet.add("1970-01-01T00:00:00.002Z,d1,d2,null,null,null,blue,2,");
362+
// make sure data are transferred and column "id3" is null in transferred data
363+
TestUtils.assertDataEventuallyOnEnv(
364+
receiverEnv,
365+
"select * from t1",
366+
"time,id1,id2,id3,s3,s4,s1,s2,",
367+
expectedResSet,
368+
10,
369+
"db",
370+
handleFailure);
371+
}
372+
}
127373
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ private static Object[] truncateNullSuffixesOfDeviceIdSegments(Object[] segments
255255
public void createTable(TableSchema fileSchema, MPPQueryContext context, Metadata metadata)
256256
throws VerifyMetadataException {
257257
final TableSchema realSchema =
258-
metadata.validateTableHeaderSchema(database, fileSchema, context, true).orElse(null);
258+
metadata.validateTableHeaderSchema(database, fileSchema, context, true, true).orElse(null);
259259
if (Objects.isNull(realSchema)) {
260260
throw new VerifyMetadataException(
261261
String.format(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/Metadata.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,9 @@ List<DeviceEntry> indexScan(
9898
* <p>The caller need to recheck the dataType of measurement columns to decide whether to do
9999
* partial insert
100100
*
101+
* @param isStrictIdColumn if true, when the table already exists, the id columns in the existing
102+
* table should be the prefix of those in the input tableSchema, or input id columns be the
103+
* prefix of existing id columns.
101104
* @return If table doesn't exist and the user have no authority to create table, Optional.empty()
102105
* will be returned. The returned table may not include all the columns
103106
* in @param{tableSchema}, if the user have no authority to alter table.
@@ -108,7 +111,8 @@ Optional<TableSchema> validateTableHeaderSchema(
108111
final String database,
109112
final TableSchema tableSchema,
110113
final MPPQueryContext context,
111-
final boolean allowCreateTable);
114+
final boolean allowCreateTable,
115+
final boolean isStrictIdColumn);
112116

113117
/**
114118
* This method is used for table device validation and should be invoked after column validation.

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -705,9 +705,14 @@ public List<DeviceEntry> indexScan(
705705

706706
@Override
707707
public Optional<TableSchema> validateTableHeaderSchema(
708-
String database, TableSchema tableSchema, MPPQueryContext context, boolean allowCreateTable) {
708+
String database,
709+
TableSchema tableSchema,
710+
MPPQueryContext context,
711+
boolean allowCreateTable,
712+
boolean isStrictIdColumn) {
709713
return TableHeaderSchemaValidator.getInstance()
710-
.validateTableHeaderSchema(database, tableSchema, context, allowCreateTable);
714+
.validateTableHeaderSchema(
715+
database, tableSchema, context, allowCreateTable, isStrictIdColumn);
711716
}
712717

713718
@Override

0 commit comments

Comments
 (0)