Skip to content

Commit

Permalink
[flink] Limit max split while continuously scanning to avoid JobManag…
Browse files Browse the repository at this point in the history
…er OOM (apache#2373)
  • Loading branch information
leaves12138 authored Nov 24, 2023
1 parent 3d6a703 commit 737fb19
Show file tree
Hide file tree
Showing 8 changed files with 112 additions and 23 deletions.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,12 @@
<td>Integer</td>
<td>The parallelism of scanning manifest files, default value is the size of cpu processor. Note: Scale-up this parameter will increase memory usage while scanning manifest files. We can consider downsize it when we encounter an out of memory exception while scanning</td>
</tr>
<tr>
<td><h5>scan.max-splits-per-task</h5></td>
<td style="word-wrap: break-word;">10</td>
<td>Integer</td>
<td>Max split size should be cached for one task while scanning. If splits size cached in enumerator are greater than tasks size multiply by this value, scanner will pause scanning.</td>
</tr>
<tr>
<td><h5>scan.mode</h5></td>
<td style="word-wrap: break-word;">default</td>
Expand Down
12 changes: 12 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,14 @@ public class CoreOptions implements Serializable {
.defaultValue(Duration.ofSeconds(10))
.withDescription("The discovery interval of continuous reading.");

public static final ConfigOption<Integer> SCAN_MAX_SPLITS_PER_TASK =
key("scan.max-splits-per-task")
.intType()
.defaultValue(10)
.withDescription(
"Max split size should be cached for one task while scanning. "
+ "If splits size cached in enumerator are greater than tasks size multiply by this value, scanner will pause scanning.");

@Immutable
public static final ConfigOption<MergeEngine> MERGE_ENGINE =
key("merge-engine")
Expand Down Expand Up @@ -1132,6 +1140,10 @@ public Duration continuousDiscoveryInterval() {
return options.get(CONTINUOUS_DISCOVERY_INTERVAL);
}

public int scanSplitMaxPerTask() {
return options.get(SCAN_MAX_SPLITS_PER_TASK);
}

public int localSortMaxNumFileHandles() {
return options.get(LOCAL_SORT_MAX_NUM_FILE_HANDLES);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static org.apache.paimon.utils.Preconditions.checkArgument;
Expand All @@ -57,7 +58,6 @@ public class ContinuousFileSplitEnumerator
implements SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> {

private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileSplitEnumerator.class);
private static final int SPLIT_MAX_NUM = 5_000;

protected final SplitEnumeratorContext<FileStoreSourceSplit> context;

Expand All @@ -73,6 +73,8 @@ public class ContinuousFileSplitEnumerator

protected final ConsumerProgressCalculator consumerProgressCalculator;

private final int splitMaxNum;

@Nullable protected Long nextSnapshotId;

protected boolean finished = false;
Expand All @@ -85,7 +87,8 @@ public ContinuousFileSplitEnumerator(
@Nullable Long nextSnapshotId,
long discoveryInterval,
StreamTableScan scan,
BucketMode bucketMode) {
BucketMode bucketMode,
int splitMaxPerTask) {
checkArgument(discoveryInterval > 0L);
this.context = checkNotNull(context);
this.nextSnapshotId = nextSnapshotId;
Expand All @@ -94,6 +97,7 @@ public ContinuousFileSplitEnumerator(
this.splitGenerator = new FileStoreSourceSplitGenerator();
this.scan = scan;
this.splitAssigner = createSplitAssigner(bucketMode);
this.splitMaxNum = context.currentParallelism() * splitMaxPerTask;
addSplits(remainSplits);

this.consumerProgressCalculator =
Expand Down Expand Up @@ -135,7 +139,7 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname
assignSplits();
// if current task assigned no split, we check conditions to scan one more time
if (readersAwaitingSplit.contains(subtaskId)) {
if (stopTriggerScan || splitAssigner.remainingSplits().size() >= SPLIT_MAX_NUM) {
if (stopTriggerScan) {
return;
}
stopTriggerScan = true;
Expand Down Expand Up @@ -185,17 +189,21 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
// ------------------------------------------------------------------------

// this need to be synchronized because scan object is not thread safe. handleSplitRequest and
// context.callAsync will invoke this.
protected synchronized PlanWithNextSnapshotId scanNextSnapshot() {
// context.callAsync will invoke this. This method runs in workerExecutorThreadPool in
// parallelism.
protected synchronized Optional<PlanWithNextSnapshotId> scanNextSnapshot() {
if (splitAssigner.remainingSplits().size() >= splitMaxNum) {
return Optional.empty();
}
TableScan.Plan plan = scan.plan();
Long nextSnapshotId = scan.checkpoint();
return new PlanWithNextSnapshotId(plan, nextSnapshotId);
return Optional.of(new PlanWithNextSnapshotId(plan, nextSnapshotId));
}

// this mothod could not be synchronized, because it runs in coordinatorThread, which will make
// it serialize.
protected void processDiscoveredSplits(
PlanWithNextSnapshotId planWithNextSnapshotId, Throwable error) {
Optional<PlanWithNextSnapshotId> planWithNextSnapshotIdOptional, Throwable error) {
if (error != null) {
if (error instanceof EndOfScanException) {
// finished
Expand All @@ -208,6 +216,10 @@ protected void processDiscoveredSplits(
return;
}

if (!planWithNextSnapshotIdOptional.isPresent()) {
return;
}
PlanWithNextSnapshotId planWithNextSnapshotId = planWithNextSnapshotIdOptional.get();
nextSnapshotId = planWithNextSnapshotId.nextSnapshotId;
TableScan.Plan plan = planWithNextSnapshotId.plan;
if (plan.equals(SnapshotNotExistPlan.INSTANCE)) {
Expand Down Expand Up @@ -239,7 +251,7 @@ protected synchronized void assignSplits() {
continue;
}
List<FileStoreSourceSplit> splits = splitAssigner.getNext(task, null);
if (splits.size() > 0) {
if (!splits.isEmpty()) {
assignment.put(task, splits);
consumerProgressCalculator.updateAssignInformation(task, splits.get(0));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,14 @@ protected SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> buildEn
Collection<FileStoreSourceSplit> splits,
@Nullable Long nextSnapshotId,
StreamTableScan scan) {
CoreOptions coreOptions = CoreOptions.fromMap(options);
return new ContinuousFileSplitEnumerator(
context,
splits,
nextSnapshotId,
CoreOptions.fromMap(options).continuousDiscoveryInterval().toMillis(),
coreOptions.continuousDiscoveryInterval().toMillis(),
scan,
bucketMode);
bucketMode,
coreOptions.scanSplitMaxPerTask());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;

Expand Down Expand Up @@ -92,8 +93,16 @@ public AlignedContinuousFileSplitEnumerator(
long discoveryInterval,
StreamTableScan scan,
BucketMode bucketMode,
long alignTimeout) {
super(context, remainSplits, nextSnapshotId, discoveryInterval, scan, bucketMode);
long alignTimeout,
int splitPerTaskMax) {
super(
context,
remainSplits,
nextSnapshotId,
discoveryInterval,
scan,
bucketMode,
splitPerTaskMax);
this.pendingPlans = new ArrayBlockingQueue<>(MAX_PENDING_PLAN);
this.alignedAssigner = (AlignedSplitAssigner) super.splitAssigner;
this.nextSnapshotId = nextSnapshotId;
Expand Down Expand Up @@ -193,21 +202,25 @@ public void notifyCheckpointComplete(long checkpointId) {
// ------------------------------------------------------------------------

@Override
protected PlanWithNextSnapshotId scanNextSnapshot() {
protected Optional<PlanWithNextSnapshotId> scanNextSnapshot() {
if (pendingPlans.remainingCapacity() > 0) {
PlanWithNextSnapshotId scannedPlan = super.scanNextSnapshot();
if (!(scannedPlan.plan() instanceof SnapshotNotExistPlan)) {
synchronized (lock) {
pendingPlans.add(scannedPlan);
lock.notifyAll();
Optional<PlanWithNextSnapshotId> scannedPlanOptional = super.scanNextSnapshot();
if (scannedPlanOptional.isPresent()) {
PlanWithNextSnapshotId scannedPlan = scannedPlanOptional.get();
if (!(scannedPlan.plan() instanceof SnapshotNotExistPlan)) {
synchronized (lock) {
pendingPlans.add(scannedPlan);
lock.notifyAll();
}
}
}
}
return null;
return Optional.empty();
}

@Override
protected void processDiscoveredSplits(PlanWithNextSnapshotId ignore, Throwable error) {
protected void processDiscoveredSplits(
Optional<PlanWithNextSnapshotId> ignore, Throwable error) {
if (error != null) {
if (error instanceof EndOfScanException) {
// finished
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ protected SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> buildEn
options.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis(),
scan,
bucketMode,
options.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_TIMEOUT).toMillis());
options.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_TIMEOUT).toMillis(),
options.get(CoreOptions.SCAN_MAX_SPLITS_PER_TASK));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.source.coordinator.ExecutorNotifier;
import org.assertj.core.api.Assertions;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -750,6 +751,48 @@ public void testEnumeratorWithConsumer() throws Exception {
assertThat(scan.getNextSnapshotIdForConsumer()).isEqualTo(2L);
}

@Test
public void testEnumeratorSplitMax() throws Exception {
final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
getSplitEnumeratorContext(2);

TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
StreamTableScan scan = new MockScan(results);
ContinuousFileSplitEnumerator enumerator =
new Builder()
.setSplitEnumeratorContext(context)
.setInitialSplits(Collections.emptyList())
.setDiscoveryInterval(1)
.setScan(scan)
.withBucketMode(BucketMode.UNAWARE)
.build();
enumerator.start();

long snapshot = 0;
List<DataSplit> splits = new ArrayList<>();
for (int i = 0; i < 16; i++) {
splits.add(createDataSplit(snapshot++, i, Collections.emptyList()));
}
results.put(1L, new DataFilePlan(splits));
context.triggerAllActions();

splits = new ArrayList<>();
for (int i = 0; i < 16; i++) {
splits.add(createDataSplit(snapshot++, i, Collections.emptyList()));
}
results.put(2L, new DataFilePlan(splits));
context.triggerAllActions();

splits = new ArrayList<>();
for (int i = 0; i < 16; i++) {
splits.add(createDataSplit(snapshot++, i, Collections.emptyList()));
}
results.put(3L, new DataFilePlan(splits));
context.triggerAllActions();

Assertions.assertThat(enumerator.splitAssigner.remainingSplits().size()).isEqualTo(16 * 2);
}

private void triggerCheckpointAndComplete(
ContinuousFileSplitEnumerator enumerator, long checkpointId) throws Exception {
enumerator.snapshotState(checkpointId);
Expand Down Expand Up @@ -825,7 +868,7 @@ public Builder withBucketMode(BucketMode bucketMode) {

public ContinuousFileSplitEnumerator build() {
return new ContinuousFileSplitEnumerator(
context, initialSplits, null, discoveryInterval, scan, bucketMode);
context, initialSplits, null, discoveryInterval, scan, bucketMode, 10);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ public Builder setAlignedTimeout(long timeout) {

public AlignedContinuousFileSplitEnumerator build() {
return new AlignedContinuousFileSplitEnumerator(
context, initialSplits, null, discoveryInterval, scan, bucketMode, timeout);
context, initialSplits, null, discoveryInterval, scan, bucketMode, timeout, 10);
}
}
}

0 comments on commit 737fb19

Please sign in to comment.