Skip to content

Commit 08fc8ab

Browse files
Pipe: better memory control for in-memory tablets (#13301)
1 parent 71f765b commit 08fc8ab

File tree

12 files changed

+314
-31
lines changed

12 files changed

+314
-31
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,21 @@ public boolean isAligned() {
190190
return isAligned;
191191
}
192192

193+
public int getCurrentRowSize() {
194+
int rowSize = 0;
195+
rowSize += 8; // timestamp
196+
for (int i = 0; i < valueColumnTypes.length; i++) {
197+
if (valueColumnTypes[i] != null) {
198+
if (valueColumnTypes[i].isBinary()) {
199+
rowSize += getBinary(i) != null ? getBinary(i).getLength() : 0;
200+
} else {
201+
rowSize += valueColumnTypes[i].getDataTypeSize();
202+
}
203+
}
204+
}
205+
return rowSize;
206+
}
207+
193208
public IMeasurementSchema[] getMeasurementSchemaList() {
194209
return measurementSchemaList;
195210
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,16 @@
1919

2020
package org.apache.iotdb.db.pipe.event.common.row;
2121

22-
import org.apache.iotdb.commons.pipe.config.PipeConfig;
2322
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
2423
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
2524
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
25+
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
2626
import org.apache.iotdb.pipe.api.access.Row;
2727
import org.apache.iotdb.pipe.api.collector.RowCollector;
2828
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
2929
import org.apache.iotdb.pipe.api.exception.PipeException;
3030

31+
import org.apache.tsfile.utils.Pair;
3132
import org.apache.tsfile.write.record.Tablet;
3233
import org.apache.tsfile.write.schema.IMeasurementSchema;
3334

@@ -66,13 +67,12 @@ public void collectRow(Row row) {
6667
final String deviceId = pipeRow.getDeviceId();
6768
final List<IMeasurementSchema> measurementSchemaList =
6869
new ArrayList<>(Arrays.asList(measurementSchemaArray));
69-
tablet =
70-
new Tablet(
71-
deviceId,
72-
measurementSchemaList,
73-
PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
74-
isAligned = pipeRow.isAligned();
70+
// Calculate row count and memory size of the tablet based on the first row
71+
Pair<Integer, Integer> rowCountAndMemorySize =
72+
PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(pipeRow);
73+
tablet = new Tablet(deviceId, measurementSchemaList, rowCountAndMemorySize.getLeft());
7574
tablet.initBitMaps();
75+
isAligned = pipeRow.isAligned();
7676
}
7777

7878
final int rowIndex = tablet.rowSize;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
2727
import org.apache.iotdb.commons.utils.TestOnly;
2828
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
29+
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
2930
import org.apache.iotdb.db.pipe.resource.memory.PipeTabletMemoryBlock;
3031
import org.apache.iotdb.pipe.api.access.Row;
3132
import org.apache.iotdb.pipe.api.collector.RowCollector;
@@ -110,7 +111,10 @@ public PipeRawTabletInsertionEvent(
110111

111112
@Override
112113
public boolean internallyIncreaseResourceReferenceCount(final String holderMessage) {
113-
allocatedMemoryBlock = PipeDataNodeResourceManager.memory().forceAllocateWithRetry(tablet);
114+
allocatedMemoryBlock =
115+
PipeDataNodeResourceManager.memory()
116+
.forceAllocateForTabletWithRetry(
117+
PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet));
114118
return true;
115119
}
116120

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
2323
import org.apache.iotdb.commons.pipe.pattern.PipePattern;
2424
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
25+
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
26+
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
2527
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
2628

2729
import org.apache.tsfile.read.TsFileSequenceReader;
@@ -42,6 +44,8 @@ public abstract class TsFileInsertionDataContainer implements AutoCloseable {
4244
protected final PipeTaskMeta pipeTaskMeta; // used to report progress
4345
protected final EnrichedEvent sourceEvent; // used to report progress
4446

47+
protected final PipeMemoryBlock allocatedMemoryBlockForTablet;
48+
4549
protected TsFileSequenceReader tsFileSequenceReader;
4650

4751
protected TsFileInsertionDataContainer(
@@ -58,6 +62,10 @@ protected TsFileInsertionDataContainer(
5862

5963
this.pipeTaskMeta = pipeTaskMeta;
6064
this.sourceEvent = sourceEvent;
65+
66+
// Allocate empty memory block, will be resized later.
67+
this.allocatedMemoryBlockForTablet =
68+
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
6169
}
6270

6371
/**
@@ -74,5 +82,9 @@ public void close() {
7482
} catch (final IOException e) {
7583
LOGGER.warn("Failed to close TsFileSequenceReader", e);
7684
}
85+
86+
if (allocatedMemoryBlockForTablet != null) {
87+
allocatedMemoryBlockForTablet.close();
88+
}
7789
}
7890
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,8 @@ public boolean hasNext() {
280280
measurementDataTypeMap,
281281
entry.getKey(),
282282
entry.getValue(),
283-
timeFilterExpression);
283+
timeFilterExpression,
284+
allocatedMemoryBlockForTablet);
284285
} catch (final Exception e) {
285286
close();
286287
throw new PipeException("failed to create TsFileInsertionDataTabletIterator", e);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataTabletIterator.java

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919

2020
package org.apache.iotdb.db.pipe.event.common.tsfile.container.query;
2121

22-
import org.apache.iotdb.commons.pipe.config.PipeConfig;
22+
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
23+
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
24+
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
2325
import org.apache.iotdb.pipe.api.exception.PipeException;
2426

2527
import org.apache.tsfile.common.constant.TsFileConstant;
@@ -32,6 +34,7 @@
3234
import org.apache.tsfile.read.expression.IExpression;
3335
import org.apache.tsfile.read.expression.QueryExpression;
3436
import org.apache.tsfile.read.query.dataset.QueryDataSet;
37+
import org.apache.tsfile.utils.Pair;
3538
import org.apache.tsfile.write.record.Tablet;
3639
import org.apache.tsfile.write.schema.IMeasurementSchema;
3740
import org.apache.tsfile.write.schema.MeasurementSchema;
@@ -42,6 +45,7 @@
4245
import java.util.List;
4346
import java.util.Map;
4447
import java.util.NoSuchElementException;
48+
import java.util.Objects;
4549
import java.util.stream.Collectors;
4650

4751
public class TsFileInsertionQueryDataTabletIterator implements Iterator<Tablet> {
@@ -56,12 +60,15 @@ public class TsFileInsertionQueryDataTabletIterator implements Iterator<Tablet>
5660

5761
private final QueryDataSet queryDataSet;
5862

63+
private final PipeMemoryBlock allocatedBlockForTablet;
64+
5965
TsFileInsertionQueryDataTabletIterator(
6066
final TsFileReader tsFileReader,
6167
final Map<String, TSDataType> measurementDataTypeMap,
6268
final IDeviceID deviceId,
6369
final List<String> measurements,
64-
final IExpression timeFilterExpression)
70+
final IExpression timeFilterExpression,
71+
final PipeMemoryBlock allocatedBlockForTablet)
6572
throws IOException {
6673
this.tsFileReader = tsFileReader;
6774
this.measurementDataTypeMap = measurementDataTypeMap;
@@ -79,6 +86,8 @@ public class TsFileInsertionQueryDataTabletIterator implements Iterator<Tablet>
7986
this.timeFilterExpression = timeFilterExpression;
8087

8188
this.queryDataSet = buildQueryDataSet();
89+
90+
this.allocatedBlockForTablet = Objects.requireNonNull(allocatedBlockForTablet);
8291
}
8392

8493
private QueryDataSet buildQueryDataSet() throws IOException {
@@ -118,16 +127,35 @@ private Tablet buildNextTablet() throws IOException {
118127
measurementDataTypeMap.get(deviceId + TsFileConstant.PATH_SEPARATOR + measurement);
119128
schemas.add(new MeasurementSchema(measurement, dataType));
120129
}
121-
final Tablet tablet =
122-
new Tablet(
123-
// Used for tree model
124-
deviceId.toString(),
125-
schemas,
126-
PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
127-
tablet.initBitMaps();
128130

131+
Tablet tablet = null;
132+
if (!queryDataSet.hasNext()) {
133+
tablet =
134+
new Tablet(
135+
// Used for tree model
136+
deviceId.toString(), schemas, 1);
137+
tablet.initBitMaps();
138+
// Ignore the memory cost of tablet
139+
PipeDataNodeResourceManager.memory().forceResize(allocatedBlockForTablet, 0);
140+
return tablet;
141+
}
142+
143+
boolean isFirstRow = true;
129144
while (queryDataSet.hasNext()) {
130145
final RowRecord rowRecord = queryDataSet.next();
146+
if (isFirstRow) {
147+
// Calculate row count and memory size of the tablet based on the first row
148+
Pair<Integer, Integer> rowCountAndMemorySize =
149+
PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(rowRecord);
150+
tablet =
151+
new Tablet(
152+
// Used for tree model
153+
deviceId.toString(), schemas, rowCountAndMemorySize.getLeft());
154+
tablet.initBitMaps();
155+
PipeDataNodeResourceManager.memory()
156+
.forceResize(allocatedBlockForTablet, rowCountAndMemorySize.getRight());
157+
isFirstRow = false;
158+
}
131159

132160
final int rowIndex = tablet.rowSize;
133161

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,13 @@
1919

2020
package org.apache.iotdb.db.pipe.event.common.tsfile.container.scan;
2121

22-
import org.apache.iotdb.commons.pipe.config.PipeConfig;
2322
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
2423
import org.apache.iotdb.commons.pipe.pattern.PipePattern;
2524
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
2625
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
2726
import org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
27+
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
28+
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
2829
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
2930
import org.apache.iotdb.pipe.api.exception.PipeException;
3031

@@ -44,6 +45,7 @@
4445
import org.apache.tsfile.utils.Binary;
4546
import org.apache.tsfile.utils.BitMap;
4647
import org.apache.tsfile.utils.DateUtils;
48+
import org.apache.tsfile.utils.Pair;
4749
import org.apache.tsfile.utils.TsPrimitiveType;
4850
import org.apache.tsfile.write.UnSupportedDataTypeException;
4951
import org.apache.tsfile.write.record.Tablet;
@@ -175,15 +177,32 @@ public Tablet next() {
175177

176178
private Tablet getNextTablet() {
177179
try {
178-
final Tablet tablet =
179-
new Tablet(
180-
currentDevice.toString(),
181-
currentMeasurements,
182-
PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
183-
tablet.initBitMaps();
180+
Tablet tablet = null;
184181

182+
if (!data.hasCurrent()) {
183+
tablet = new Tablet(currentDevice.toString(), currentMeasurements, 1);
184+
tablet.initBitMaps();
185+
// Ignore the memory cost of tablet
186+
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForTablet, 0);
187+
return tablet;
188+
}
189+
190+
boolean isFirstRow = true;
185191
while (data.hasCurrent()) {
186192
if (isMultiPage || data.currentTime() >= startTime && data.currentTime() <= endTime) {
193+
if (isFirstRow) {
194+
// Calculate row count and memory size of the tablet based on the first row
195+
Pair<Integer, Integer> rowCountAndMemorySize =
196+
PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(data);
197+
tablet =
198+
new Tablet(
199+
currentDevice.toString(), currentMeasurements, rowCountAndMemorySize.getLeft());
200+
tablet.initBitMaps();
201+
PipeDataNodeResourceManager.memory()
202+
.forceResize(allocatedMemoryBlockForTablet, rowCountAndMemorySize.getRight());
203+
isFirstRow = false;
204+
}
205+
187206
final int rowIndex = tablet.rowSize;
188207

189208
tablet.addTimestamp(rowIndex, data.currentTime());
@@ -197,16 +216,22 @@ private Tablet getNextTablet() {
197216
data = chunkReader.nextPageData();
198217
}
199218

200-
if (tablet.rowSize == tablet.getMaxRowNumber()) {
219+
if (tablet != null && tablet.rowSize == tablet.getMaxRowNumber()) {
201220
break;
202221
}
203222
}
204223

224+
if (tablet == null) {
225+
tablet = new Tablet(currentDevice.toString(), currentMeasurements, 1);
226+
tablet.initBitMaps();
227+
// Ignore the memory cost of tablet
228+
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForTablet, 0);
229+
}
230+
205231
// Switch chunk reader iff current chunk is all consumed
206232
if (!data.hasCurrent()) {
207233
prepareData();
208234
}
209-
210235
return tablet;
211236
} catch (final Exception e) {
212237
close();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java

Lines changed: 55 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.apache.iotdb.db.conf.IoTDBDescriptor;
2525
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
2626

27-
import org.apache.tsfile.write.record.Tablet;
2827
import org.slf4j.Logger;
2928
import org.slf4j.LoggerFactory;
3029

@@ -75,7 +74,7 @@ public synchronized PipeMemoryBlock forceAllocate(long sizeInBytes)
7574
return forceAllocate(sizeInBytes, false);
7675
}
7776

78-
public PipeTabletMemoryBlock forceAllocateWithRetry(Tablet tablet)
77+
public PipeTabletMemoryBlock forceAllocateForTabletWithRetry(long tabletSizeInBytes)
7978
throws PipeRuntimeOutOfMemoryCriticalException {
8079
if (!PIPE_MEMORY_MANAGEMENT_ENABLED) {
8180
// No need to calculate the tablet size, skip it to save time
@@ -107,8 +106,7 @@ public PipeTabletMemoryBlock forceAllocateWithRetry(Tablet tablet)
107106

108107
synchronized (this) {
109108
final PipeTabletMemoryBlock block =
110-
(PipeTabletMemoryBlock)
111-
forceAllocate(PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet), true);
109+
(PipeTabletMemoryBlock) forceAllocate(tabletSizeInBytes, true);
112110
usedMemorySizeInBytesOfTablets += block.getMemoryUsageInBytes();
113111
return block;
114112
}
@@ -147,6 +145,59 @@ private PipeMemoryBlock forceAllocate(long sizeInBytes, boolean isForTablet)
147145
sizeInBytes));
148146
}
149147

148+
public synchronized void forceResize(PipeMemoryBlock block, long targetSize) {
149+
if (block == null || block.isReleased()) {
150+
LOGGER.warn("forceResize: cannot resize a null or released memory block");
151+
return;
152+
}
153+
154+
if (!PIPE_MEMORY_MANAGEMENT_ENABLED) {
155+
block.setMemoryUsageInBytes(targetSize);
156+
return;
157+
}
158+
159+
final long oldSize = block.getMemoryUsageInBytes();
160+
161+
if (oldSize >= targetSize) {
162+
usedMemorySizeInBytes -= oldSize - targetSize;
163+
if (block instanceof PipeTabletMemoryBlock) {
164+
usedMemorySizeInBytesOfTablets -= oldSize - targetSize;
165+
}
166+
block.setMemoryUsageInBytes(targetSize);
167+
return;
168+
}
169+
170+
long sizeInBytes = targetSize - oldSize;
171+
for (int i = 1; i <= MEMORY_ALLOCATE_MAX_RETRIES; i++) {
172+
if (TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes >= sizeInBytes) {
173+
usedMemorySizeInBytes += sizeInBytes;
174+
if (block instanceof PipeTabletMemoryBlock) {
175+
usedMemorySizeInBytesOfTablets += sizeInBytes;
176+
}
177+
block.setMemoryUsageInBytes(targetSize);
178+
return;
179+
}
180+
181+
try {
182+
tryShrink4Allocate(sizeInBytes);
183+
this.wait(MEMORY_ALLOCATE_RETRY_INTERVAL_IN_MS);
184+
} catch (InterruptedException e) {
185+
Thread.currentThread().interrupt();
186+
LOGGER.warn("forceResize: interrupted while waiting for available memory", e);
187+
}
188+
}
189+
190+
throw new PipeRuntimeOutOfMemoryCriticalException(
191+
String.format(
192+
"forceResize: failed to allocate memory after %d retries, "
193+
+ "total memory size %d bytes, used memory size %d bytes, "
194+
+ "requested memory size %d bytes",
195+
MEMORY_ALLOCATE_MAX_RETRIES,
196+
TOTAL_MEMORY_SIZE_IN_BYTES,
197+
usedMemorySizeInBytes,
198+
sizeInBytes));
199+
}
200+
150201
/**
151202
* Allocate a {@link PipeMemoryBlock} for pipe only if memory already used is less than the
152203
* specified threshold.

0 commit comments

Comments
 (0)