Skip to content

Commit

Permalink
Flink: Backport apache#9308 to v1.17 and the relevant parts to v1.16 (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
pvary authored Jan 3, 2024
1 parent 38eb1b1 commit 8c7001e
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
Expand All @@ -35,6 +34,7 @@
import org.apache.iceberg.flink.source.split.SerializableComparator;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Queues;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -60,9 +60,18 @@ class IcebergSourceSplitReader<T> implements SplitReader<RecordAndPosition<T>, I
this.openSplitFunction = openSplitFunction;
this.splitComparator = splitComparator;
this.indexOfSubtask = context.getIndexOfSubtask();
this.splits = new ArrayDeque<>();
this.splits = Queues.newArrayDeque();
}

/**
* The method reads a batch of records from the assigned splits. If all the records from the
* current split are returned then it will emit a {@link ArrayBatchRecords#finishedSplit(String)}
* batch to signal this event. In the next fetch loop the reader will continue with the next split
* (if any).
*
* @return The fetched records
* @throws IOException If there is an error during reading
*/
@Override
public RecordsWithSplitIds<RecordAndPosition<T>> fetch() throws IOException {
metrics.incrementSplitReaderFetchCalls(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ protected IcebergSource<RowData> source() {
.project(TestFixtures.TS_SCHEMA)
.splitSize(100L)
.streaming(true)
.monitorInterval(Duration.ofMillis(2))
.monitorInterval(Duration.ofMillis(10))
.streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
Expand All @@ -35,6 +35,7 @@
import org.apache.iceberg.flink.source.split.SerializableComparator;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Queues;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -60,9 +61,18 @@ class IcebergSourceSplitReader<T> implements SplitReader<RecordAndPosition<T>, I
this.openSplitFunction = openSplitFunction;
this.splitComparator = splitComparator;
this.indexOfSubtask = context.getIndexOfSubtask();
this.splits = new ArrayDeque<>();
this.splits = Queues.newArrayDeque();
}

/**
* The method reads a batch of records from the assigned splits. If all the records from the
* current split are returned then it will emit a {@link ArrayBatchRecords#finishedSplit(String)}
* batch to signal this event. In the next fetch loop the reader will continue with the next split
* (if any).
*
* @return The fetched records
* @throws IOException If there is an error during reading
*/
@Override
public RecordsWithSplitIds<RecordAndPosition<T>> fetch() throws IOException {
metrics.incrementSplitReaderFetchCalls(1);
Expand Down Expand Up @@ -123,6 +133,16 @@ public void close() throws Exception {
}
}

@Override
public void pauseOrResumeSplits(
Collection<IcebergSourceSplit> splitsToPause, Collection<IcebergSourceSplit> splitsToResume) {
// IcebergSourceSplitReader only reads splits sequentially. When waiting for watermark alignment
// the SourceOperator will stop processing and recycling the fetched batches. This exhausts the
// {@link ArrayPoolDataIteratorBatcher#pool} and the `currentReader.next()` call will be
// blocked even without split-level watermark alignment. Based on this the
// `pauseOrResumeSplits` and the `wakeUp` are left empty.
}

private long calculateBytes(IcebergSourceSplit split) {
return split.task().files().stream().map(FileScanTask::length).reduce(0L, Long::sum);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.metrics.MetricNames;
Expand Down Expand Up @@ -95,9 +94,7 @@ public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(PARALLELISM)
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
.setConfiguration(
reporter.addToConfiguration(
new Configuration().set(PipelineOptions.ALLOW_UNALIGNED_SOURCE_SPLITS, true)))
.setConfiguration(reporter.addToConfiguration(new Configuration()))
.withHaLeadershipControl()
.build());

Expand Down Expand Up @@ -383,7 +380,7 @@ protected IcebergSource<RowData> source() {
.project(TestFixtures.TS_SCHEMA)
.splitSize(100L)
.streaming(true)
.monitorInterval(Duration.ofMillis(2))
.monitorInterval(Duration.ofMillis(10))
.streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
.build();
}
Expand Down

0 comments on commit 8c7001e

Please sign in to comment.