Skip to content

Commit 7ad757f

Browse files
Pipe: Add retry when TsFile parsing failed to avoid race among processor threads (#15624)
1 parent 293d6eb commit 7ad757f

File tree

1 file changed

+27
-2
lines changed

1 file changed

+27
-2
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.iotdb.db.pipe.agent.task.connection;
2121

22+
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
2223
import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
2324
import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
2425
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
@@ -41,6 +42,7 @@
4142
import org.slf4j.Logger;
4243
import org.slf4j.LoggerFactory;
4344

45+
import java.util.Iterator;
4446
import java.util.concurrent.atomic.AtomicInteger;
4547

4648
public class PipeEventCollector implements EventCollector {
@@ -142,8 +144,31 @@ private void parseAndCollectEvent(final PipeTsFileInsertionEvent sourceEvent) th
142144
}
143145

144146
try {
145-
for (final TabletInsertionEvent parsedEvent : sourceEvent.toTabletInsertionEvents()) {
146-
collectParsedRawTableEvent((PipeRawTabletInsertionEvent) parsedEvent);
147+
final Iterable<TabletInsertionEvent> iterable = sourceEvent.toTabletInsertionEvents();
148+
final Iterator<TabletInsertionEvent> iterator = iterable.iterator();
149+
while (iterator.hasNext()) {
150+
final TabletInsertionEvent parsedEvent = iterator.next();
151+
int retryCount = 0;
152+
while (true) {
153+
try {
154+
collectParsedRawTableEvent((PipeRawTabletInsertionEvent) parsedEvent);
155+
break;
156+
} catch (final PipeRuntimeOutOfMemoryCriticalException e) {
157+
if (retryCount++ % 100 == 0) {
158+
LOGGER.warn(
159+
"parseAndCollectEvent: failed to allocate memory for parsing TsFile {}, retry count is {}, will keep retrying.",
160+
sourceEvent.getTsFile(),
161+
retryCount,
162+
e);
163+
} else if (LOGGER.isDebugEnabled()) {
164+
LOGGER.debug(
165+
"parseAndCollectEvent: failed to allocate memory for parsing TsFile {}, retry count is {}, will keep retrying.",
166+
sourceEvent.getTsFile(),
167+
retryCount,
168+
e);
169+
}
170+
}
171+
}
147172
}
148173
} finally {
149174
sourceEvent.close();

0 commit comments

Comments
 (0)