Skip to content

Commit 4bd629e

Browse files
Load: verify measurement datatype & register table schema in split file (#13827)
Co-authored-by: Steve Yurong Su <rong@apache.org>
1 parent b3a2142 commit 4bd629e

File tree

10 files changed

+170
-36
lines changed

10 files changed

+170
-36
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTableModelAnalyzer.java

Lines changed: 71 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.iotdb.db.queryengine.plan.analyze.load;
2121

2222
import org.apache.iotdb.commons.conf.CommonDescriptor;
23+
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
2324
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
2425
import org.apache.iotdb.db.exception.LoadEmptyFileException;
2526
import org.apache.iotdb.db.exception.LoadReadOnlyException;
@@ -30,8 +31,10 @@
3031
import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
3132
import org.apache.iotdb.db.queryengine.plan.execution.config.executor.ClusterConfigTaskExecutor;
3233
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.CreateDBTask;
34+
import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
3335
import org.apache.iotdb.db.queryengine.plan.relational.metadata.ITableDeviceSchemaValidation;
3436
import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
37+
import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema;
3538
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile;
3639
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
3740
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -44,17 +47,19 @@
4447
import com.google.common.util.concurrent.ListenableFuture;
4548
import org.apache.commons.io.FileUtils;
4649
import org.apache.tsfile.file.metadata.IDeviceID;
47-
import org.apache.tsfile.file.metadata.TableSchema;
4850
import org.apache.tsfile.file.metadata.TimeseriesMetadata;
4951
import org.apache.tsfile.read.TsFileSequenceReader;
5052
import org.apache.tsfile.read.TsFileSequenceReaderTimeseriesMetadataIterator;
53+
import org.apache.tsfile.utils.Pair;
5154
import org.slf4j.Logger;
5255
import org.slf4j.LoggerFactory;
5356

5457
import java.io.File;
5558
import java.io.IOException;
59+
import java.util.ArrayList;
5660
import java.util.Arrays;
5761
import java.util.Collections;
62+
import java.util.HashMap;
5863
import java.util.List;
5964
import java.util.Map;
6065
import java.util.Objects;
@@ -70,6 +75,9 @@ public class LoadTsFileToTableModelAnalyzer extends LoadTsFileAnalyzer {
7075

7176
private final Metadata metadata;
7277

78+
// tableName -> Pair<device column count, device column mapping>
79+
private final Map<String, Pair<Integer, List<Integer>>> tableIdColumnMapper = new HashMap<>();
80+
7381
public LoadTsFileToTableModelAnalyzer(
7482
LoadTsFileStatement loadTsFileStatement, Metadata metadata, MPPQueryContext context) {
7583
super(loadTsFileStatement, context);
@@ -137,26 +145,24 @@ protected void analyzeSingleTsFile(final File tsFile)
137145
// construct tsfile resource
138146
final TsFileResource tsFileResource = constructTsFileResource(reader, tsFile);
139147

140-
for (Map.Entry<String, TableSchema> name2Schema :
148+
for (Map.Entry<String, org.apache.tsfile.file.metadata.TableSchema> name2Schema :
141149
reader.readFileMetadata().getTableSchemaMap().entrySet()) {
150+
final TableSchema fileSchema =
151+
TableSchema.fromTsFileTableSchema(name2Schema.getKey(), name2Schema.getValue());
152+
final TableSchema realSchema;
142153
// TODO: remove this synchronized block after the metadata is thread-safe
143154
synchronized (metadata) {
144-
org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema realSchema =
145-
metadata
146-
.validateTableHeaderSchema(
147-
database,
148-
org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema
149-
.fromTsFileTableSchema(name2Schema.getKey(), name2Schema.getValue()),
150-
context,
151-
true)
152-
.orElse(null);
155+
realSchema =
156+
metadata.validateTableHeaderSchema(database, fileSchema, context, true).orElse(null);
153157
if (Objects.isNull(realSchema)) {
154158
throw new VerifyMetadataException(
155159
String.format(
156160
"Failed to validate schema for table {%s, %s}",
157161
name2Schema.getKey(), name2Schema.getValue()));
158162
}
159163
}
164+
tableIdColumnMapper.clear();
165+
verifyTableDataTypeAndGenerateIdColumnMapper(fileSchema, realSchema);
160166
}
161167

162168
long writePointCount = 0;
@@ -193,6 +199,42 @@ protected void analyzeSingleTsFile(final File tsFile)
193199
}
194200
}
195201

202+
private void verifyTableDataTypeAndGenerateIdColumnMapper(
203+
TableSchema fileSchema, TableSchema realSchema) throws VerifyMetadataException {
204+
final int realIdColumnCount = realSchema.getIdColumns().size();
205+
final List<Integer> idColumnMapping =
206+
tableIdColumnMapper
207+
.computeIfAbsent(
208+
realSchema.getTableName(), k -> new Pair<>(realIdColumnCount, new ArrayList<>()))
209+
.getRight();
210+
for (int i = 0; i < fileSchema.getColumns().size(); i++) {
211+
final ColumnSchema fileColumn = fileSchema.getColumns().get(i);
212+
if (fileColumn.getColumnCategory() == TsTableColumnCategory.ID) {
213+
final int realIndex = realSchema.getIndexAmongIdColumns(fileColumn.getName());
214+
if (realIndex != -1) {
215+
idColumnMapping.add(realIndex);
216+
} else {
217+
throw new VerifyMetadataException(
218+
String.format(
219+
"Id column %s in TsFile is not found in IoTDB table %s",
220+
fileColumn.getName(), realSchema.getTableName()));
221+
}
222+
} else if (fileColumn.getColumnCategory() == TsTableColumnCategory.MEASUREMENT) {
223+
final ColumnSchema realColumn =
224+
realSchema.getColumn(fileColumn.getName(), fileColumn.getColumnCategory());
225+
if (!fileColumn.getType().equals(realColumn.getType())) {
226+
throw new VerifyMetadataException(
227+
String.format(
228+
"Data type mismatch for column %s in table %s, type in TsFile: %s, type in IoTDB: %s",
229+
realColumn.getName(),
230+
realSchema.getTableName(),
231+
fileColumn.getType(),
232+
realColumn.getType()));
233+
}
234+
}
235+
}
236+
}
237+
196238
private void autoCreateDatabase(final String database) throws VerifyMetadataException {
197239
validateDatabaseName(database);
198240
final CreateDBTask task =
@@ -228,8 +270,24 @@ public String getTableName() {
228270

229271
@Override
230272
public List<Object[]> getDeviceIdList() {
231-
return Collections.singletonList(
232-
Arrays.copyOfRange(deviceId.getSegments(), 1, deviceId.getSegments().length));
273+
final Pair<Integer, List<Integer>> idColumnCountAndMapper =
274+
analyzer.tableIdColumnMapper.get(deviceId.getTableName());
275+
if (Objects.isNull(idColumnCountAndMapper)) {
276+
// This should not happen
277+
LOGGER.warn(
278+
"Failed to find id column mapping for table {}, deviceId: {}",
279+
deviceId.getTableName(),
280+
deviceId);
281+
return Collections.singletonList(
282+
Arrays.copyOfRange(deviceId.getSegments(), 1, deviceId.getSegments().length));
283+
}
284+
285+
final Object[] deviceIdArray = new String[idColumnCountAndMapper.getLeft()];
286+
for (int i = 0; i < idColumnCountAndMapper.getRight().size(); i++) {
287+
final int j = idColumnCountAndMapper.getRight().get(i);
288+
deviceIdArray[j] = deviceId.getSegments()[i + 1];
289+
}
290+
return Collections.singletonList(deviceIdArray);
233291
}
234292

235293
@Override

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableSchema.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,31 @@ public List<ColumnSchema> getColumns() {
5454
return columns;
5555
}
5656

57+
/** Get the column with the specified name and category, return null if not found. */
58+
public ColumnSchema getColumn(String columnName, TsTableColumnCategory columnCategory) {
59+
for (final ColumnSchema column : columns) {
60+
if (column.getName().equals(columnName) && column.getColumnCategory() == columnCategory) {
61+
return column;
62+
}
63+
}
64+
return null;
65+
}
66+
67+
/**
68+
* Given the name of an ID column, return the index of this column among all ID columns, return -1
69+
* if not found.
70+
*/
71+
public int getIndexAmongIdColumns(String idColumnName) {
72+
int index = 0;
73+
for (ColumnSchema column : getIdColumns()) {
74+
if (column.getName().equals(idColumnName)) {
75+
return index;
76+
}
77+
index++;
78+
}
79+
return -1;
80+
}
81+
5782
public static TableSchema of(TsTable tsTable) {
5883
String tableName = tsTable.getTableName();
5984
List<ColumnSchema> columns = new ArrayList<>();
@@ -112,6 +137,12 @@ public static TableSchema fromTsFileTableSchema(
112137
continue;
113138
}
114139

140+
// TsFile should not contain attribute columns by design.
141+
final ColumnType columnType = tsFileTableSchema.getColumnTypes().get(i);
142+
if (columnType == ColumnType.ATTRIBUTE) {
143+
continue;
144+
}
145+
115146
final TSDataType dataType = tsFileTableSchema.getColumnSchemas().get(i).getType();
116147
if (dataType == TSDataType.VECTOR) {
117148
continue;
@@ -122,8 +153,7 @@ public static TableSchema fromTsFileTableSchema(
122153
columnName,
123154
InternalTypeManager.fromTSDataType(dataType),
124155
false,
125-
TsTableColumnCategory.fromTsFileColumnType(
126-
tsFileTableSchema.getColumnTypes().get(i))));
156+
TsTableColumnCategory.fromTsFileColumnType(columnType)));
127157
}
128158
return new TableSchema(tableName, columns);
129159
} catch (Exception e) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryManager;
6262
import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet;
6363
import org.apache.iotdb.db.storageengine.load.splitter.ChunkData;
64+
import org.apache.iotdb.db.storageengine.load.splitter.DeletionData;
6465
import org.apache.iotdb.db.storageengine.load.splitter.TsFileData;
6566
import org.apache.iotdb.db.storageengine.load.splitter.TsFileSplitter;
6667
import org.apache.iotdb.metrics.utils.MetricLevel;
@@ -525,9 +526,15 @@ public TsFileDataManager(
525526
}
526527

527528
private boolean addOrSendTsFileData(TsFileData tsFileData) {
528-
return tsFileData.isModification()
529-
? addOrSendDeletionData(tsFileData)
530-
: addOrSendChunkData((ChunkData) tsFileData);
529+
switch (tsFileData.getType()) {
530+
case CHUNK:
531+
return addOrSendChunkData((ChunkData) tsFileData);
532+
case DELETION:
533+
return addOrSendDeletionData((DeletionData) tsFileData);
534+
default:
535+
throw new UnsupportedOperationException(
536+
String.format("Unsupported TsFileDataType %s.", tsFileData.getType()));
537+
}
531538
}
532539

533540
private boolean isMemoryEnough() {
@@ -605,7 +612,7 @@ private void routeChunkData() {
605612
nonDirectionalChunkData.clear();
606613
}
607614

608-
private boolean addOrSendDeletionData(TsFileData deletionData) {
615+
private boolean addOrSendDeletionData(DeletionData deletionData) {
609616
routeChunkData(); // ensure chunk data will be added before deletion
610617

611618
for (Map.Entry<TRegionReplicaSet, LoadTsFilePieceNode> entry : replicaSet2Piece.entrySet()) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -220,12 +220,17 @@ public void writeToDataRegion(DataRegion dataRegion, LoadTsFilePieceNode pieceNo
220220
}
221221

222222
for (TsFileData tsFileData : pieceNode.getAllTsFileData()) {
223-
if (!tsFileData.isModification()) {
224-
ChunkData chunkData = (ChunkData) tsFileData;
225-
writerManager.write(
226-
new DataPartitionInfo(dataRegion, chunkData.getTimePartitionSlot()), chunkData);
227-
} else {
228-
writerManager.writeDeletion(dataRegion, (DeletionData) tsFileData);
223+
switch (tsFileData.getType()) {
224+
case CHUNK:
225+
ChunkData chunkData = (ChunkData) tsFileData;
226+
writerManager.write(
227+
new DataPartitionInfo(dataRegion, chunkData.getTimePartitionSlot()), chunkData);
228+
break;
229+
case DELETION:
230+
writerManager.writeDeletion(dataRegion, (DeletionData) tsFileData);
231+
break;
232+
default:
233+
throw new IOException("Unsupported TsFileData type: " + tsFileData.getType());
229234
}
230235
}
231236
} finally {
@@ -398,7 +403,9 @@ private void write(DataPartitionInfo partitionInfo, ChunkData chunkData) throws
398403
return;
399404
}
400405

401-
dataPartition2Writer.put(partitionInfo, new TsFileIOWriter(newTsFile));
406+
final TsFileIOWriter writer = new TsFileIOWriter(newTsFile);
407+
writer.setGenerateTableSchema(true);
408+
dataPartition2Writer.put(partitionInfo, writer);
402409
}
403410
TsFileIOWriter writer = dataPartition2Writer.get(partitionInfo);
404411
if (!chunkData.getDevice().equals(dataPartition2LastDevice.getOrDefault(partitionInfo, ""))) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ public void addValueChunk(final ChunkHeader chunkHeader) {
162162

163163
@Override
164164
public void serialize(final DataOutputStream stream) throws IOException {
165-
ReadWriteIOUtils.write(isModification(), stream);
165+
ReadWriteIOUtils.write(getType().ordinal(), stream);
166166
ReadWriteIOUtils.write(isAligned(), stream);
167167
serializeAttr(stream);
168168
byteStream.writeTo(stream);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/ChunkData.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@ public interface ChunkData extends TsFileData {
5050
void writeToFileWriter(TsFileIOWriter writer) throws IOException;
5151

5252
@Override
53-
default boolean isModification() {
54-
return false;
53+
default TsFileDataType getType() {
54+
return TsFileDataType.CHUNK;
5555
}
5656

5757
static ChunkData deserialize(InputStream stream) throws PageException, IOException {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/DeletionData.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,13 @@ public void writeToModificationFile(ModificationFile modificationFile, long file
4949
}
5050

5151
@Override
52-
public boolean isModification() {
53-
return true;
52+
public TsFileDataType getType() {
53+
return TsFileDataType.DELETION;
5454
}
5555

5656
@Override
5757
public void serialize(DataOutputStream stream) throws IOException {
58-
ReadWriteIOUtils.write(isModification(), stream);
58+
ReadWriteIOUtils.write(getType().ordinal(), stream);
5959
deletion.serializeWithoutFileOffset(stream);
6060
}
6161

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/NonAlignedChunkData.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ public void writeToFileWriter(final TsFileIOWriter writer) throws IOException {
120120

121121
@Override
122122
public void serialize(final DataOutputStream stream) throws IOException {
123-
ReadWriteIOUtils.write(isModification(), stream);
123+
ReadWriteIOUtils.write(getType().ordinal(), stream);
124124
ReadWriteIOUtils.write(isAligned(), stream);
125125
serializeAttr(stream);
126126
byteStream.writeTo(stream);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileData.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,20 @@
3131
public interface TsFileData {
3232
long getDataSize();
3333

34-
boolean isModification();
34+
TsFileDataType getType();
3535

3636
void serialize(DataOutputStream stream) throws IOException;
3737

3838
static TsFileData deserialize(InputStream stream)
3939
throws IOException, PageException, IllegalPathException {
40-
boolean isModification = ReadWriteIOUtils.readBool(stream);
41-
return isModification ? DeletionData.deserialize(stream) : ChunkData.deserialize(stream);
40+
final TsFileDataType type = TsFileDataType.values()[ReadWriteIOUtils.readInt(stream)];
41+
switch (type) {
42+
case CHUNK:
43+
return ChunkData.deserialize(stream);
44+
case DELETION:
45+
return DeletionData.deserialize(stream);
46+
default:
47+
throw new UnsupportedOperationException("Unknown TsFileData type: " + type);
48+
}
4249
}
4350
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.storageengine.load.splitter;
21+
22+
public enum TsFileDataType {
23+
CHUNK,
24+
DELETION
25+
}

0 commit comments

Comments
 (0)