Skip to content

Commit e480abc

Browse files
authored
Pipe: Fix the inconsistency between schema and values columns in the process of building tsfile (#15625) (#15778)
1 parent 25d51fe commit e480abc

File tree

1 file changed

+31
-10
lines changed

1 file changed

+31
-10
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.tsfile.utils.Pair;
4848
import org.apache.tsfile.write.TsFileWriter;
4949
import org.apache.tsfile.write.record.Tablet;
50+
import org.apache.tsfile.write.schema.IMeasurementSchema;
5051
import org.apache.tsfile.write.schema.MeasurementSchema;
5152
import org.apache.tsfile.write.writer.RestorableTsFileIOWriter;
5253
import org.slf4j.Logger;
@@ -367,6 +368,7 @@ private Tablet tryBestToAggregateTablets(
367368
final Set<MeasurementSchema> seen = new HashSet<>();
368369
final List<Integer> distinctIndices =
369370
IntStream.range(0, aggregatedSchemas.size())
371+
.filter(i -> Objects.nonNull(aggregatedSchemas.get(i)))
370372
.filter(i -> seen.add(aggregatedSchemas.get(i))) // Only keep the first occurrence index
371373
.boxed()
372374
.collect(Collectors.toList());
@@ -548,14 +550,23 @@ private void writeTabletsIntoOneFile(
548550
final IMemTable memTable, final RestorableTsFileIOWriter writer) throws Exception {
549551
for (int i = 0, size = tabletList.size(); i < size; ++i) {
550552
final Tablet tablet = tabletList.get(i);
553+
MeasurementSchema[] measurementSchemas =
554+
tablet.getSchemas().stream()
555+
.map(schema -> (MeasurementSchema) schema)
556+
.toArray(MeasurementSchema[]::new);
557+
Object[] values = Arrays.copyOf(tablet.values, tablet.values.length);
558+
BitMap[] bitMaps = Arrays.copyOf(tablet.bitMaps, tablet.bitMaps.length);
551559

552560
// convert date value to int refer to
553561
// org.apache.iotdb.db.storageengine.dataregion.memtable.WritableMemChunk.writeNonAlignedTablet
554-
final Object[] values = Arrays.copyOf(tablet.values, tablet.values.length);
562+
int validatedIndex = 0;
555563
for (int j = 0; j < tablet.getSchemas().size(); ++j) {
556-
final MeasurementSchema schema = tablet.getSchemas().get(j);
557-
if (Objects.nonNull(schema)
558-
&& Objects.equals(TSDataType.DATE, schema.getType())
564+
final IMeasurementSchema schema = measurementSchemas[j];
565+
if (Objects.isNull(schema)) {
566+
continue;
567+
}
568+
569+
if (Objects.equals(TSDataType.DATE, schema.getType())
559570
&& values[j] instanceof LocalDate[]) {
560571
final LocalDate[] dates = ((LocalDate[]) values[j]);
561572
final int[] dateValues = new int[dates.length];
@@ -564,23 +575,33 @@ private void writeTabletsIntoOneFile(
564575
}
565576
values[j] = dateValues;
566577
}
578+
measurementSchemas[validatedIndex] = measurementSchemas[j];
579+
values[validatedIndex] = values[j];
580+
bitMaps[validatedIndex] = bitMaps[j];
581+
validatedIndex++;
582+
}
583+
584+
if (validatedIndex != measurementSchemas.length) {
585+
values = Arrays.copyOf(values, validatedIndex);
586+
measurementSchemas = Arrays.copyOf(measurementSchemas, validatedIndex);
587+
bitMaps = Arrays.copyOf(bitMaps, validatedIndex);
567588
}
568589

569590
final InsertTabletNode insertTabletNode =
570591
new InsertTabletNode(
571592
PLACEHOLDER_PLAN_NODE_ID,
572593
new PartialPath(tablet.deviceId),
573594
isTabletAlignedList.get(i),
574-
tablet.getSchemas().stream()
575-
.map(m -> Objects.nonNull(m) ? m.getMeasurementId() : null)
595+
Arrays.stream(measurementSchemas)
596+
.map(IMeasurementSchema::getMeasurementId)
576597
.toArray(String[]::new),
577-
tablet.getSchemas().stream()
578-
.map(m -> Objects.nonNull(m) ? m.getType() : null)
598+
Arrays.stream(measurementSchemas)
599+
.map(IMeasurementSchema::getType)
579600
.toArray(TSDataType[]::new),
580601
// TODO: cast
581-
tablet.getSchemas().toArray(new MeasurementSchema[0]),
602+
measurementSchemas,
582603
tablet.timestamps,
583-
tablet.bitMaps,
604+
bitMaps,
584605
values,
585606
tablet.rowSize);
586607

0 commit comments

Comments
 (0)