Skip to content

Commit 71f765b

Browse files
Pipe: add timely consistency check for pipe memory control (#13354)
1 parent b0501ae commit 71f765b

File tree

1 file changed

+25
-2
lines changed

1 file changed

+25
-2
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public PipeMemoryManager() {
6666
PipeDataNodeAgent.runtime()
6767
.registerPeriodicalJob(
6868
"PipeMemoryManager#tryExpandAll()",
69-
this::tryExpandAll,
69+
this::tryExpandAllAndCheckConsistency,
7070
PipeConfig.getInstance().getPipeMemoryExpanderIntervalSeconds());
7171
}
7272

@@ -290,8 +290,31 @@ private boolean tryShrink4Allocate(long sizeInBytes) {
290290
}
291291
}
292292

293-
public synchronized void tryExpandAll() {
293+
public synchronized void tryExpandAllAndCheckConsistency() {
294294
allocatedBlocks.forEach(PipeMemoryBlock::expand);
295+
296+
long blockSum =
297+
allocatedBlocks.stream().mapToLong(PipeMemoryBlock::getMemoryUsageInBytes).sum();
298+
if (blockSum != usedMemorySizeInBytes) {
299+
LOGGER.warn(
300+
"tryExpandAllAndCheckConsistency: memory usage is not consistent with allocated blocks,"
301+
+ " usedMemorySizeInBytes is {} but sum of all blocks is {}",
302+
usedMemorySizeInBytes,
303+
blockSum);
304+
}
305+
306+
long tabletBlockSum =
307+
allocatedBlocks.stream()
308+
.filter(PipeTabletMemoryBlock.class::isInstance)
309+
.mapToLong(PipeMemoryBlock::getMemoryUsageInBytes)
310+
.sum();
311+
if (tabletBlockSum != usedMemorySizeInBytesOfTablets) {
312+
LOGGER.warn(
313+
"tryExpandAllAndCheckConsistency: memory usage of tablets is not consistent with allocated blocks,"
314+
+ " usedMemorySizeInBytesOfTablets is {} but sum of all tablet blocks is {}",
315+
usedMemorySizeInBytesOfTablets,
316+
tabletBlockSum);
317+
}
295318
}
296319

297320
public synchronized void release(PipeMemoryBlock block) {

0 commit comments

Comments
 (0)