Skip to content

Commit be0b921

Browse files
Pipe: Add retry for tablet batch req to avoid retransmission when memory is insufficient (#15715)
1 parent: 921d9e2 · commit: be0b921

File tree

1 file changed

+39
-24
lines changed

1 file changed

+39
-24
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java

Lines changed: 39 additions & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -473,10 +473,10 @@ private TPipeTransferResp handleTransferTabletBatch(final PipeTransferTabletBatc
473473
Stream.of(
474474
statementPair.getLeft().isEmpty()
475475
? RpcUtils.SUCCESS_STATUS
476-
: executeStatementAndAddRedirectInfo(statementPair.getLeft()),
476+
: executeBatchStatementAndAddRedirectInfo(statementPair.getLeft()),
477477
statementPair.getRight().isEmpty()
478478
? RpcUtils.SUCCESS_STATUS
479-
: executeStatementAndAddRedirectInfo(statementPair.getRight()))
479+
: executeBatchStatementAndAddRedirectInfo(statementPair.getRight()))
480480
.collect(Collectors.toList())));
481481
}
482482

@@ -486,7 +486,7 @@ private TPipeTransferResp handleTransferTabletBatchV2(final PipeTransferTabletBa
486486
PipeReceiverStatusHandler.getPriorStatus(
487487
(statementSet.isEmpty()
488488
? Stream.of(RpcUtils.SUCCESS_STATUS)
489-
: statementSet.stream().map(this::executeStatementAndAddRedirectInfo))
489+
: statementSet.stream().map(this::executeBatchStatementAndAddRedirectInfo))
490490
.collect(Collectors.toList())));
491491
}
492492

@@ -734,8 +734,8 @@ private TPipeTransferResp handleTransferSlice(final PipeTransferSliceReq pipeTra
734734
* request. So for each sub-status which needs to redirect, we record the device path using the
735735
* message field.
736736
*/
737-
private TSStatus executeStatementAndAddRedirectInfo(final InsertBaseStatement statement) {
738-
final TSStatus result = executeStatementAndClassifyExceptions(statement);
737+
private TSStatus executeBatchStatementAndAddRedirectInfo(final InsertBaseStatement statement) {
738+
final TSStatus result = executeStatementAndClassifyExceptions(statement, 5);
739739

740740
if (result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
741741
&& result.getSubStatusSize() > 0) {
@@ -771,15 +771,46 @@ private TSStatus executeStatementAndAddRedirectInfo(final InsertBaseStatement st
771771
}
772772

773773
private TSStatus executeStatementAndClassifyExceptions(final Statement statement) {
774+
return executeStatementAndClassifyExceptions(statement, 1);
775+
}
776+
777+
private TSStatus executeStatementAndClassifyExceptions(
778+
final Statement statement, final int tryCount) {
774779
long estimatedMemory = 0L;
775780
final double pipeReceiverActualToEstimatedMemoryRatio =
776781
PIPE_CONFIG.getPipeReceiverActualToEstimatedMemoryRatio();
777782
try {
778783
if (statement instanceof InsertBaseStatement) {
779784
estimatedMemory = ((InsertBaseStatement) statement).ramBytesUsed();
780-
allocatedMemoryBlock =
781-
PipeDataNodeResourceManager.memory()
782-
.forceAllocate((long) (estimatedMemory * pipeReceiverActualToEstimatedMemoryRatio));
785+
for (int i = 0; i < tryCount; ++i) {
786+
try {
787+
allocatedMemoryBlock =
788+
PipeDataNodeResourceManager.memory()
789+
.forceAllocate(
790+
(long) (estimatedMemory * pipeReceiverActualToEstimatedMemoryRatio));
791+
break;
792+
} catch (final PipeRuntimeOutOfMemoryCriticalException e) {
793+
if (i == tryCount - 1) {
794+
final String message =
795+
String.format(
796+
"Temporarily out of memory when executing statement %s, Requested memory: %s, "
797+
+ "used memory: %s, free memory: %s, total non-floating memory: %s",
798+
statement,
799+
estimatedMemory * pipeReceiverActualToEstimatedMemoryRatio,
800+
PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes(),
801+
PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes(),
802+
PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes());
803+
if (LOGGER.isDebugEnabled()) {
804+
LOGGER.debug("Receiver id = {}: {}", receiverId.get(), message, e);
805+
}
806+
return new TSStatus(
807+
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
808+
.setMessage(message);
809+
} else {
810+
Thread.sleep(100L * (i + 1));
811+
}
812+
}
813+
}
783814
}
784815

785816
final TSStatus result =
@@ -795,22 +826,6 @@ private TSStatus executeStatementAndClassifyExceptions(final Statement statement
795826
result);
796827
return statement.accept(STATEMENT_STATUS_VISITOR, result);
797828
}
798-
} catch (final PipeRuntimeOutOfMemoryCriticalException e) {
799-
final String message =
800-
String.format(
801-
"Temporarily out of memory when executing statement %s, Requested memory: %s, "
802-
+ "used memory: %s, free memory: %s, total non-floating memory: %s",
803-
statement,
804-
estimatedMemory * pipeReceiverActualToEstimatedMemoryRatio,
805-
PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes(),
806-
PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes(),
807-
PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes());
808-
if (LOGGER.isDebugEnabled()) {
809-
LOGGER.debug("Receiver id = {}: {}", receiverId.get(), message, e);
810-
}
811-
return new TSStatus(
812-
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
813-
.setMessage(message);
814829
} catch (final Exception e) {
815830
LOGGER.warn(
816831
"Receiver id = {}: Exception encountered while executing statement {}: ",

0 commit comments

Comments (0)