Skip to content

Commit ecacaa1

Browse files
Load: Use IDeviceID in ChunkData (#13887)
1 parent e363388 commit ecacaa1

File tree

8 files changed

+52
-41
lines changed

8 files changed

+52
-41
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -591,11 +591,7 @@ private void routeChunkData() {
591591
List<TRegionReplicaSet> replicaSets =
592592
scheduler.partitionFetcher.queryDataPartition(
593593
nonDirectionalChunkData.stream()
594-
.map(
595-
data ->
596-
new Pair<>(
597-
IDeviceID.Factory.DEFAULT_FACTORY.create(data.getDevice()),
598-
data.getTimePartitionSlot()))
594+
.map(data -> new Pair<>(data.getDevice(), data.getTimePartitionSlot()))
599595
.collect(Collectors.toList()),
600596
scheduler.queryContext.getSession().getUserName());
601597
IntStream.range(0, nonDirectionalChunkData.size())

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,7 @@ private static class TsFileWriterManager {
361361

362362
private final File taskDir;
363363
private Map<DataPartitionInfo, TsFileIOWriter> dataPartition2Writer;
364-
private Map<DataPartitionInfo, String> dataPartition2LastDevice;
364+
private Map<DataPartitionInfo, IDeviceID> dataPartition2LastDevice;
365365
private Map<DataPartitionInfo, ModificationFile> dataPartition2ModificationFile;
366366
private boolean isClosed;
367367

@@ -408,11 +408,11 @@ private void write(DataPartitionInfo partitionInfo, ChunkData chunkData) throws
408408
dataPartition2Writer.put(partitionInfo, writer);
409409
}
410410
TsFileIOWriter writer = dataPartition2Writer.get(partitionInfo);
411-
if (!chunkData.getDevice().equals(dataPartition2LastDevice.getOrDefault(partitionInfo, ""))) {
411+
if (!Objects.equals(chunkData.getDevice(), dataPartition2LastDevice.get(partitionInfo))) {
412412
if (dataPartition2LastDevice.containsKey(partitionInfo)) {
413413
writer.endChunkGroup();
414414
}
415-
writer.startChunkGroup(IDeviceID.Factory.DEFAULT_FACTORY.create(chunkData.getDevice()));
415+
writer.startChunkGroup(chunkData.getDevice());
416416
dataPartition2LastDevice.put(partitionInfo, chunkData.getDevice());
417417
}
418418
chunkData.writeToFileWriter(writer);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,18 @@
2222
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
2323
import org.apache.iotdb.commons.utils.TimePartitionUtils;
2424

25-
import org.apache.tsfile.common.conf.TSFileConfig;
2625
import org.apache.tsfile.enums.TSDataType;
2726
import org.apache.tsfile.exception.write.PageException;
2827
import org.apache.tsfile.file.header.ChunkHeader;
2928
import org.apache.tsfile.file.header.PageHeader;
3029
import org.apache.tsfile.file.metadata.IChunkMetadata;
30+
import org.apache.tsfile.file.metadata.IDeviceID;
31+
import org.apache.tsfile.file.metadata.PlainDeviceID;
32+
import org.apache.tsfile.file.metadata.StringArrayDeviceID;
3133
import org.apache.tsfile.file.metadata.statistics.Statistics;
3234
import org.apache.tsfile.read.common.Chunk;
3335
import org.apache.tsfile.utils.Binary;
3436
import org.apache.tsfile.utils.PublicBAOS;
35-
import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
3637
import org.apache.tsfile.utils.ReadWriteIOUtils;
3738
import org.apache.tsfile.utils.TsPrimitiveType;
3839
import org.apache.tsfile.write.UnSupportedDataTypeException;
@@ -41,6 +42,8 @@
4142
import org.apache.tsfile.write.schema.MeasurementSchema;
4243
import org.apache.tsfile.write.writer.TsFileIOWriter;
4344

45+
import javax.annotation.Nonnull;
46+
4447
import java.io.DataOutputStream;
4548
import java.io.IOException;
4649
import java.io.InputStream;
@@ -60,7 +63,7 @@ public class AlignedChunkData implements ChunkData {
6063
protected static final Binary DEFAULT_BINARY = null;
6164

6265
protected final TTimePartitionSlot timePartitionSlot;
63-
protected final String device;
66+
protected final IDeviceID device;
6467
protected List<ChunkHeader> chunkHeaderList;
6568

6669
protected final PublicBAOS byteStream;
@@ -75,7 +78,7 @@ public class AlignedChunkData implements ChunkData {
7578
protected List<Chunk> chunkList;
7679

7780
public AlignedChunkData(
78-
final String device,
81+
@Nonnull final IDeviceID device,
7982
final ChunkHeader chunkHeader,
8083
final TTimePartitionSlot timePartitionSlot) {
8184
this(device, timePartitionSlot);
@@ -84,14 +87,15 @@ public AlignedChunkData(
8487
addAttrDataSize();
8588
}
8689

87-
protected AlignedChunkData(AlignedChunkData alignedChunkData) {
90+
protected AlignedChunkData(final AlignedChunkData alignedChunkData) {
8891
this(alignedChunkData.device, alignedChunkData.timePartitionSlot);
8992
this.satisfiedLengthQueue = new LinkedList<>(alignedChunkData.satisfiedLengthQueue);
9093
this.needDecodeChunk = alignedChunkData.needDecodeChunk;
9194
addAttrDataSize();
9295
}
9396

94-
protected AlignedChunkData(String device, TTimePartitionSlot timePartitionSlot) {
97+
protected AlignedChunkData(
98+
@Nonnull final IDeviceID device, final TTimePartitionSlot timePartitionSlot) {
9599
this.dataSize = 0;
96100
this.device = device;
97101
this.chunkHeaderList = new ArrayList<>();
@@ -106,17 +110,15 @@ protected AlignedChunkData(String device, TTimePartitionSlot timePartitionSlot)
106110
private void addAttrDataSize() { // Should be init before serialize, corresponding serializeAttr
107111
dataSize += 2 * Byte.BYTES; // isModification and isAligned
108112
dataSize += Long.BYTES; // timePartitionSlot
109-
final int deviceLength = device.getBytes(TSFileConfig.STRING_CHARSET).length;
110-
dataSize += ReadWriteForEncodingUtils.varIntSize(deviceLength);
111-
dataSize += deviceLength; // device
113+
dataSize += device.serializedSize(); // device
112114
dataSize += Integer.BYTES; // chunkHeaderListSize
113115
if (!chunkHeaderList.isEmpty()) {
114116
dataSize += chunkHeaderList.get(0).getSerializedSize(); // timeChunkHeader
115117
}
116118
}
117119

118120
@Override
119-
public String getDevice() {
121+
public IDeviceID getDevice() {
120122
return device;
121123
}
122124

@@ -171,7 +173,10 @@ public void serialize(final DataOutputStream stream) throws IOException {
171173

172174
private void serializeAttr(final DataOutputStream stream) throws IOException {
173175
ReadWriteIOUtils.write(timePartitionSlot.getStartTime(), stream);
174-
ReadWriteIOUtils.write(device, stream);
176+
177+
ReadWriteIOUtils.write(device instanceof StringArrayDeviceID, stream);
178+
device.serialize(stream);
179+
175180
ReadWriteIOUtils.write(dataSize, stream);
176181
ReadWriteIOUtils.write(needDecodeChunk, stream);
177182
ReadWriteIOUtils.write(chunkHeaderList.size(), stream);
@@ -421,7 +426,11 @@ public static AlignedChunkData deserialize(final InputStream stream)
421426
throws IOException, PageException {
422427
final TTimePartitionSlot timePartitionSlot =
423428
TimePartitionUtils.getTimePartitionSlot(ReadWriteIOUtils.readLong(stream));
424-
final String device = ReadWriteIOUtils.readString(stream);
429+
final boolean isStringArrayDeviceID = ReadWriteIOUtils.readBool(stream);
430+
final IDeviceID device =
431+
isStringArrayDeviceID
432+
? StringArrayDeviceID.deserialize(stream)
433+
: PlainDeviceID.deserialize(stream).convertToStringArrayDeviceId();
425434
final long dataSize = ReadWriteIOUtils.readLong(stream);
426435
final boolean needDecodeChunk = ReadWriteIOUtils.readBool(stream);
427436
final int chunkHeaderListSize = ReadWriteIOUtils.readInt(stream);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/BatchedAlignedValueChunkData.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.tsfile.exception.write.PageException;
2727
import org.apache.tsfile.file.header.ChunkHeader;
2828
import org.apache.tsfile.file.header.PageHeader;
29+
import org.apache.tsfile.file.metadata.IDeviceID;
2930
import org.apache.tsfile.file.metadata.statistics.Statistics;
3031
import org.apache.tsfile.read.common.Chunk;
3132
import org.apache.tsfile.utils.Binary;
@@ -59,7 +60,7 @@ public BatchedAlignedValueChunkData(AlignedChunkData alignedChunkData) {
5960
}
6061

6162
// Used for deserialize
62-
public BatchedAlignedValueChunkData(String device, TTimePartitionSlot timePartitionSlot) {
63+
public BatchedAlignedValueChunkData(IDeviceID device, TTimePartitionSlot timePartitionSlot) {
6364
super(device, timePartitionSlot);
6465
valueChunkWriters = new ArrayList<>();
6566
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/ChunkData.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.tsfile.file.header.ChunkHeader;
2626
import org.apache.tsfile.file.header.PageHeader;
2727
import org.apache.tsfile.file.metadata.IChunkMetadata;
28+
import org.apache.tsfile.file.metadata.IDeviceID;
2829
import org.apache.tsfile.utils.ReadWriteIOUtils;
2930
import org.apache.tsfile.write.writer.TsFileIOWriter;
3031

@@ -33,7 +34,7 @@
3334
import java.nio.ByteBuffer;
3435

3536
public interface ChunkData extends TsFileData {
36-
String getDevice();
37+
IDeviceID getDevice();
3738

3839
TTimePartitionSlot getTimePartitionSlot();
3940

@@ -63,7 +64,7 @@ static ChunkData deserialize(InputStream stream) throws PageException, IOExcepti
6364

6465
static ChunkData createChunkData(
6566
boolean isAligned,
66-
String device,
67+
IDeviceID device,
6768
ChunkHeader chunkHeader,
6869
TTimePartitionSlot timePartitionSlot) {
6970
return isAligned

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/NonAlignedChunkData.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,22 +22,25 @@
2222
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
2323
import org.apache.iotdb.commons.utils.TimePartitionUtils;
2424

25-
import org.apache.tsfile.common.conf.TSFileConfig;
2625
import org.apache.tsfile.exception.write.PageException;
2726
import org.apache.tsfile.file.header.ChunkHeader;
2827
import org.apache.tsfile.file.header.PageHeader;
2928
import org.apache.tsfile.file.metadata.IChunkMetadata;
29+
import org.apache.tsfile.file.metadata.IDeviceID;
30+
import org.apache.tsfile.file.metadata.PlainDeviceID;
31+
import org.apache.tsfile.file.metadata.StringArrayDeviceID;
3032
import org.apache.tsfile.file.metadata.statistics.Statistics;
3133
import org.apache.tsfile.read.common.Chunk;
3234
import org.apache.tsfile.utils.Binary;
3335
import org.apache.tsfile.utils.PublicBAOS;
34-
import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
3536
import org.apache.tsfile.utils.ReadWriteIOUtils;
3637
import org.apache.tsfile.write.UnSupportedDataTypeException;
3738
import org.apache.tsfile.write.chunk.ChunkWriterImpl;
3839
import org.apache.tsfile.write.schema.MeasurementSchema;
3940
import org.apache.tsfile.write.writer.TsFileIOWriter;
4041

42+
import javax.annotation.Nonnull;
43+
4144
import java.io.DataOutputStream;
4245
import java.io.IOException;
4346
import java.io.InputStream;
@@ -47,7 +50,7 @@
4750
public class NonAlignedChunkData implements ChunkData {
4851

4952
private final TTimePartitionSlot timePartitionSlot;
50-
private final String device;
53+
private final IDeviceID device;
5154
private final ChunkHeader chunkHeader;
5255

5356
private final PublicBAOS byteStream;
@@ -60,7 +63,7 @@ public class NonAlignedChunkData implements ChunkData {
6063
private Chunk chunk;
6164

6265
public NonAlignedChunkData(
63-
final String device,
66+
@Nonnull final IDeviceID device,
6467
final ChunkHeader chunkHeader,
6568
final TTimePartitionSlot timePartitionSlot) {
6669
this.dataSize = 0;
@@ -78,14 +81,12 @@ public NonAlignedChunkData(
7881
private void addAttrDataSize() { // should be init before serialize, corresponding serializeAttr
7982
dataSize += 2 * Byte.BYTES; // isModification and isAligned
8083
dataSize += Long.BYTES; // timePartitionSlot
81-
final int deviceLength = device.getBytes(TSFileConfig.STRING_CHARSET).length;
82-
dataSize += ReadWriteForEncodingUtils.varIntSize(deviceLength);
83-
dataSize += deviceLength; // device
84+
dataSize += device.serializedSize(); // device
8485
dataSize += chunkHeader.getSerializedSize(); // timeChunkHeader
8586
}
8687

8788
@Override
88-
public String getDevice() {
89+
public IDeviceID getDevice() {
8990
return device;
9091
}
9192

@@ -129,7 +130,10 @@ public void serialize(final DataOutputStream stream) throws IOException {
129130

130131
private void serializeAttr(final DataOutputStream stream) throws IOException {
131132
ReadWriteIOUtils.write(timePartitionSlot.getStartTime(), stream);
132-
ReadWriteIOUtils.write(device, stream);
133+
134+
ReadWriteIOUtils.write(device instanceof StringArrayDeviceID, stream);
135+
device.serialize(stream);
136+
133137
ReadWriteIOUtils.write(dataSize, stream);
134138
ReadWriteIOUtils.write(needDecodeChunk, stream);
135139
chunkHeader.serializeTo(stream); // chunk header already serialize chunk type
@@ -275,7 +279,11 @@ public static NonAlignedChunkData deserialize(final InputStream stream)
275279
throws IOException, PageException {
276280
final TTimePartitionSlot timePartitionSlot =
277281
TimePartitionUtils.getTimePartitionSlot(ReadWriteIOUtils.readLong(stream));
278-
final String device = ReadWriteIOUtils.readString(stream);
282+
final boolean isStringArrayDeviceID = ReadWriteIOUtils.readBool(stream);
283+
final IDeviceID device =
284+
isStringArrayDeviceID
285+
? StringArrayDeviceID.deserialize(stream)
286+
: PlainDeviceID.deserialize(stream).convertToStringArrayDeviceId();
279287
final long dataSize = ReadWriteIOUtils.readLong(stream);
280288
final boolean needDecodeChunk = ReadWriteIOUtils.readBool(stream);
281289
final byte chunkType = ReadWriteIOUtils.readByte(stream);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ private void processTimeChunkOrNonAlignedChunk(TsFileSequenceReader reader, byte
173173
TTimePartitionSlot timePartitionSlot =
174174
TimePartitionUtils.getTimePartitionSlot(chunkMetadata.getStartTime());
175175
ChunkData chunkData =
176-
ChunkData.createChunkData(isAligned, curDevice.toString(), header, timePartitionSlot);
176+
ChunkData.createChunkData(isAligned, curDevice, header, timePartitionSlot);
177177

178178
if (!needDecodeChunk(chunkMetadata)) {
179179
chunkData.setNotDecode();
@@ -230,8 +230,7 @@ private void decodeAndWriteTimeChunkOrNonAlignedChunk(
230230
consumeChunkData(measurementId, chunkOffset, chunkData);
231231
}
232232
timePartitionSlot = pageTimePartitionSlot;
233-
chunkData =
234-
ChunkData.createChunkData(isAligned, curDevice.toString(), header, timePartitionSlot);
233+
chunkData = ChunkData.createChunkData(isAligned, curDevice, header, timePartitionSlot);
235234
}
236235
if (isAligned) {
237236
pageIndex2ChunkData
@@ -267,9 +266,7 @@ private void decodeAndWriteTimeChunkOrNonAlignedChunk(
267266
satisfiedLength = 0;
268267
endTime =
269268
timePartitionSlot.getStartTime() + TimePartitionUtils.getTimePartitionInterval();
270-
chunkData =
271-
ChunkData.createChunkData(
272-
isAligned, curDevice.toString(), header, timePartitionSlot);
269+
chunkData = ChunkData.createChunkData(isAligned, curDevice, header, timePartitionSlot);
273270
}
274271
satisfiedLength += 1;
275272
}

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/BatchedCompactionWithTsFileSplitterTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,8 +278,7 @@ private void consumeChunkDataAndValidate(TsFileResource resource)
278278
}
279279
});
280280
try {
281-
IDeviceID deviceID =
282-
IDeviceID.Factory.DEFAULT_FACTORY.create(alignedChunkData.getDevice());
281+
final IDeviceID deviceID = alignedChunkData.getDevice();
283282
if (!deviceID.equals(writer.currentDevice)) {
284283
if (writer.currentDevice != null) {
285284
writer.endChunkGroup();

0 commit comments

Comments
 (0)