Skip to content

Commit 1adc74d

Browse files
authored
fix: memtable enhancement issues (#14994)
* perf: improve seq inserting * rename MergeSortTvListIterator to MergeSortTVListIterator * fix: concurrent indices modification during query sort and flush sort
1 parent 0db8987 commit 1adc74d

File tree

13 files changed

+37
-67
lines changed

13 files changed

+37
-67
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -375,15 +375,6 @@ public boolean chunkNotExist(IDeviceID deviceId, String measurement) {
375375
return !memChunkGroup.contains(measurement);
376376
}
377377

378-
@Override
379-
public long getMeasurementSize(IDeviceID deviceId, String measurement) {
380-
IWritableMemChunkGroup memChunkGroup = memTableMap.get(deviceId);
381-
if (null == memChunkGroup) {
382-
return 0;
383-
}
384-
return memChunkGroup.getMeasurementSize(measurement);
385-
}
386-
387378
@Override
388379
public IWritableMemChunk getWritableMemChunk(IDeviceID deviceId, String measurement) {
389380
IWritableMemChunkGroup memChunkGroup = memTableMap.get(deviceId);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunkGroup.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -137,11 +137,6 @@ public long deleteTime(ModEntry modEntry) {
137137
return memChunk.deleteTime(modEntry.getStartTime(), modEntry.getEndTime());
138138
}
139139

140-
@Override
141-
public long getMeasurementSize(String measurement) {
142-
return memChunk.rowCount();
143-
}
144-
145140
@Override
146141
public IWritableMemChunk getWritableMemChunk(String measurement) {
147142
return memChunk;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,8 +169,6 @@ void queryForDeviceRegionScan(
169169
boolean chunkNotExist(IDeviceID deviceId, String measurement);
170170

171171
/** only used when mem control enabled */
172-
long getMeasurementSize(IDeviceID deviceId, String measurement);
173-
174172
IWritableMemChunk getWritableMemChunk(IDeviceID deviceId, String measurement);
175173

176174
/** only used when mem control enabled */

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunkGroup.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,6 @@ void writeTablet(
5656

5757
long deleteTime(ModEntry modEntry);
5858

59-
long getMeasurementSize(String measurement);
60-
6159
IWritableMemChunk getWritableMemChunk(String measurement);
6260

6361
long getMaxTime();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
2525
import org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk.MemChunkLoader;
2626
import org.apache.iotdb.db.utils.MathUtils;
27-
import org.apache.iotdb.db.utils.datastructure.MergeSortTvListIterator;
27+
import org.apache.iotdb.db.utils.datastructure.MergeSortTVListIterator;
2828
import org.apache.iotdb.db.utils.datastructure.TVList;
2929

3030
import org.apache.tsfile.common.conf.TSFileDescriptor;
@@ -85,7 +85,7 @@ public class ReadOnlyMemChunk {
8585
// TVList and its rowCount during query
8686
private Map<TVList, Integer> tvListQueryMap;
8787

88-
private MergeSortTvListIterator timeValuePairIterator;
88+
private MergeSortTVListIterator timeValuePairIterator;
8989

9090
protected final int MAX_NUMBER_OF_POINTS_IN_PAGE =
9191
TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
@@ -148,7 +148,7 @@ public void initChunkMetaFromTvLists() {
148148
int cnt = 0;
149149
int[] deleteCursor = {0};
150150
List<TVList> tvLists = new ArrayList<>(tvListQueryMap.keySet());
151-
timeValuePairIterator = new MergeSortTvListIterator(tvLists, floatPrecision, encoding);
151+
timeValuePairIterator = new MergeSortTVListIterator(tvLists, floatPrecision, encoding);
152152
int[] tvListOffsets = timeValuePairIterator.getTVListOffsets();
153153
while (timeValuePairIterator.hasNextTimeValuePair()) {
154154
if (cnt % MAX_NUMBER_OF_POINTS_IN_PAGE == 0) {
@@ -246,7 +246,7 @@ private void writeValidValuesIntoTsBlock(TsBlockBuilder builder) throws IOExcept
246246
int[] deleteCursor = {0};
247247
List<TVList> tvLists = new ArrayList<>(tvListQueryMap.keySet());
248248
IPointReader timeValuePairIterator =
249-
new MergeSortTvListIterator(tvLists, floatPrecision, encoding);
249+
new MergeSortTVListIterator(tvLists, floatPrecision, encoding);
250250

251251
while (timeValuePairIterator.hasNextTimeValuePair()) {
252252
TimeValuePair tvPair = timeValuePairIterator.nextTimeValuePair();
@@ -334,7 +334,7 @@ public TsBlock getTsBlock() {
334334
return null;
335335
}
336336

337-
public MergeSortTvListIterator getMergeSortTVListIterator() {
337+
public MergeSortTVListIterator getMergeSortTVListIterator() {
338338
return timeValuePairIterator;
339339
}
340340

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -658,8 +658,8 @@ private long[] checkMemCostAndAddToTspInfoForRow(
658658
memTableIncrement += TVList.tvListArrayMemCost(dataTypes[i]);
659659
} else {
660660
// here currentChunkPointNum >= 1
661-
long currentChunkPointNum = workMemTable.getMeasurementSize(deviceId, measurements[i]);
662661
IWritableMemChunk memChunk = workMemTable.getWritableMemChunk(deviceId, measurements[i]);
662+
long currentChunkPointNum = memChunk != null ? memChunk.rowCount() : 0;
663663
if (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE == 0) {
664664
memTableIncrement +=
665665
memChunk != null ? memChunk.getWorkingTVList().tvListArrayMemCost() : 0;
@@ -704,12 +704,12 @@ private long[] checkMemCostAndAddToTspInfoForRows(List<InsertRowNode> insertRowN
704704
.putIfAbsent(measurements[i], 1);
705705
} else {
706706
// here currentChunkPointNum >= 1
707-
long currentChunkPointNum = workMemTable.getMeasurementSize(deviceId, measurements[i]);
708707
IWritableMemChunk memChunk = workMemTable.getWritableMemChunk(deviceId, measurements[i]);
709708
int addingPointNum =
710709
increasingMemTableInfo
711710
.computeIfAbsent(deviceId, k -> new HashMap<>())
712711
.computeIfAbsent(measurements[i], k -> 0);
712+
long currentChunkPointNum = memChunk != null ? memChunk.rowCount() : 0;
713713
if ((currentChunkPointNum + addingPointNum) % PrimitiveArrayManager.ARRAY_SIZE == 0) {
714714
memTableIncrement +=
715715
memChunk != null
@@ -976,8 +976,8 @@ private void updateMemCost(
976976
((end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1)
977977
* TVList.tvListArrayMemCost(dataType);
978978
} else {
979-
long currentChunkPointNum = workMemTable.getMeasurementSize(deviceId, measurement);
980979
IWritableMemChunk memChunk = workMemTable.getWritableMemChunk(deviceId, measurement);
980+
long currentChunkPointNum = memChunk != null ? memChunk.rowCount() : 0;
981981
if (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE == 0) {
982982
memIncrements[0] +=
983983
((end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1)
@@ -989,11 +989,7 @@ private void updateMemCost(
989989
(end - start - 1 + (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE))
990990
/ PrimitiveArrayManager.ARRAY_SIZE;
991991
if (acquireArray != 0) {
992-
memIncrements[0] +=
993-
acquireArray
994-
* (memChunk != null
995-
? memChunk.getWorkingTVList().tvListArrayMemCost()
996-
: TVList.tvListArrayMemCost(dataType));
992+
memIncrements[0] += acquireArray * memChunk.getWorkingTVList().tvListArrayMemCost();
997993
}
998994
}
999995
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
2828
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
2929
import org.apache.iotdb.db.utils.ModificationUtils;
30-
import org.apache.iotdb.db.utils.datastructure.MergeSortTvListIterator;
30+
import org.apache.iotdb.db.utils.datastructure.MergeSortTVListIterator;
3131
import org.apache.iotdb.db.utils.datastructure.TVList;
3232

3333
import org.apache.tsfile.enums.TSDataType;
@@ -641,7 +641,7 @@ public synchronized void encode(BlockingQueue<Object> ioTaskQueue) {
641641
// create MergeSortTvListIterator. It need not handle float/double precision here.
642642
List<TVList> tvLists = new ArrayList<>(sortedList);
643643
tvLists.add(list);
644-
MergeSortTvListIterator timeValuePairIterator = new MergeSortTvListIterator(tvLists);
644+
MergeSortTVListIterator timeValuePairIterator = new MergeSortTVListIterator(tvLists);
645645

646646
TimeValuePair prevTvPair = null;
647647
while (timeValuePairIterator.hasNextTimeValuePair()) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -145,14 +145,6 @@ public long deleteTime(ModEntry modEntry) {
145145
return delete(modEntry);
146146
}
147147

148-
@Override
149-
public long getMeasurementSize(String measurement) {
150-
if (!memChunkMap.containsKey(measurement)) {
151-
return 0;
152-
}
153-
return memChunkMap.get(measurement).rowCount();
154-
}
155-
156148
@Override
157149
public IWritableMemChunk getWritableMemChunk(String measurement) {
158150
if (!memChunkMap.containsKey(measurement)) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemChunkReader.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
package org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk;
2121

2222
import org.apache.iotdb.db.storageengine.dataregion.memtable.ReadOnlyMemChunk;
23-
import org.apache.iotdb.db.utils.datastructure.MergeSortTvListIterator;
23+
import org.apache.iotdb.db.utils.datastructure.MergeSortTVListIterator;
2424

2525
import org.apache.tsfile.enums.TSDataType;
2626
import org.apache.tsfile.file.metadata.IChunkMetadata;
@@ -47,7 +47,7 @@
4747
public class MemChunkReader implements IChunkReader, IPointReader {
4848

4949
private final ReadOnlyMemChunk readableChunk;
50-
private final MergeSortTvListIterator timeValuePairIterator;
50+
private final MergeSortTVListIterator timeValuePairIterator;
5151
private final Filter globalTimeFilter;
5252
private final List<IPageReader> pageReaderList;
5353

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemPageReader.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
package org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk;
2121

2222
import org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk.metadata.PageMetadata;
23-
import org.apache.iotdb.db.utils.datastructure.MergeSortTvListIterator;
23+
import org.apache.iotdb.db.utils.datastructure.MergeSortTVListIterator;
2424

2525
import org.apache.tsfile.block.column.Column;
2626
import org.apache.tsfile.block.column.ColumnBuilder;
@@ -51,7 +51,7 @@ public class MemPageReader implements IPageReader {
5151
private TsBlock tsBlock;
5252
private Filter recordFilter;
5353

54-
private final MergeSortTvListIterator mergeSortTvListIterator;
54+
private final MergeSortTVListIterator mergeSortTvListIterator;
5555
// MemPage range - [pageStartOffsets, pageEndOffsets)
5656
private final int[] pageStartOffsets;
5757
private final int[] pageEndOffsets;
@@ -63,7 +63,7 @@ public class MemPageReader implements IPageReader {
6363

6464
public MemPageReader(
6565
Supplier<TsBlock> tsBlockSupplier,
66-
MergeSortTvListIterator mergeSortTvListIterator,
66+
MergeSortTVListIterator mergeSortTvListIterator,
6767
int[] pageStartOffsets,
6868
int[] pageEndOffSets,
6969
TSDataType tsDataType,

0 commit comments

Comments
 (0)