Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/content/maintenance/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,12 @@ When using Flink to read and write, Paimon has implemented some key standard Fli
<td>Gauge</td>
<td>Time difference between reading the data file and file creation.</td>
</tr>
<tr>
<td>sourceParallelismUpperBound</td>
<td>Flink Source Enumerator</td>
<td>Gauge</td>
<td>Recommended upper bound on source parallelism for auto-scaling systems. This value is advisory only, not a hard limit.</td>
</tr>
</tbody>
</table>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,14 @@ public class ContinuousFileSplitEnumerator

private final int maxSnapshotCount;

private final int sourceParallelismUpperBound;

/**
* Metric name for source scaling max parallelism. This metric provides a recommended upper
* bound of parallelism for auto-scaling systems.
*/
public static final String SOURCE_PARALLELISM_UPPER_BOUND = "sourceParallelismUpperBound";

public ContinuousFileSplitEnumerator(
SplitEnumeratorContext<FileStoreSourceSplit> context,
Collection<FileStoreSourceSplit> remainSplits,
Expand All @@ -101,7 +109,8 @@ public ContinuousFileSplitEnumerator(
boolean unordered,
int splitMaxPerTask,
boolean shuffleBucketWithPartition,
int maxSnapshotCount) {
int maxSnapshotCount,
int sourceParallelismUpperBound) {
checkArgument(discoveryInterval > 0L);
this.context = checkNotNull(context);
this.nextSnapshotId = nextSnapshotId;
Expand All @@ -118,6 +127,7 @@ public ContinuousFileSplitEnumerator(
this.consumerProgressCalculator =
new ConsumerProgressCalculator(context.currentParallelism());
this.maxSnapshotCount = maxSnapshotCount;
this.sourceParallelismUpperBound = sourceParallelismUpperBound;
}

@VisibleForTesting
Expand All @@ -135,10 +145,20 @@ private void addSplit(FileStoreSourceSplit split) {

@Override
public void start() {
    // Register gauges before scheduling the discovery loop so the very first
    // scan is already observable through metrics.
    registerMetrics();

    final long initialDelayMillis = 0L;
    context.callAsync(
            this::scanNextSnapshot,
            this::processDiscoveredSplits,
            initialDelayMillis,
            discoveryInterval);
}

/**
 * Registers enumerator-level gauges on the coordinator metric group.
 *
 * <p>Registration is best-effort: a failure here must never prevent the enumerator from
 * starting, so any exception is only logged.
 */
private void registerMetrics() {
    // Capture the value locally so the gauge closure does not depend on `this`.
    final int upperBound = sourceParallelismUpperBound;
    try {
        context.metricGroup().gauge(SOURCE_PARALLELISM_UPPER_BOUND, () -> upperBound);
    } catch (Exception e) {
        LOG.warn("Failed to register enumerator metrics.", e);
    }
}

@Override
public void close() throws IOException {
// no resources to close
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ public class ContinuousFileStoreSource extends FlinkSource {
protected final Map<String, String> options;
protected final boolean unordered;

/** Default upper bound; matches org.apache.flink.configuration.PipelineOptions#MAX_PARALLELISM (32768). */
protected static final int MAX_PARALLELISM_OF_SOURCE = 32768;

public ContinuousFileStoreSource(
ReadBuilder readBuilder, Map<String, String> options, @Nullable Long limit) {
this(readBuilder, options, limit, false, null);
Expand Down Expand Up @@ -108,6 +111,9 @@ protected SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> buildEn
@Nullable Long nextSnapshotId,
StreamTableScan scan) {
Options options = Options.fromMap(this.options);
int bucketNum = options.get(CoreOptions.BUCKET);
int sourceParallelismUpperBound = bucketNum < 0 ? MAX_PARALLELISM_OF_SOURCE : bucketNum;

return new ContinuousFileSplitEnumerator(
context,
splits,
Expand All @@ -117,6 +123,7 @@ protected SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> buildEn
unordered,
options.get(CoreOptions.SCAN_MAX_SPLITS_PER_TASK),
options.get(FlinkConnectorOptions.READ_SHUFFLE_BUCKET_WITH_PARTITION),
options.get(FlinkConnectorOptions.SCAN_MAX_SNAPSHOT_COUNT));
options.get(FlinkConnectorOptions.SCAN_MAX_SNAPSHOT_COUNT),
sourceParallelismUpperBound);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ public AlignedContinuousFileSplitEnumerator(
long alignTimeout,
int splitPerTaskMax,
boolean shuffleBucketWithPartition,
int maxSnapshotCount) {
int maxSnapshotCount,
int sourceParallelismUpperBound) {
super(
context,
remainSplits,
Expand All @@ -105,7 +106,8 @@ public AlignedContinuousFileSplitEnumerator(
unawareBucket,
splitPerTaskMax,
shuffleBucketWithPartition,
maxSnapshotCount);
maxSnapshotCount,
sourceParallelismUpperBound);
this.pendingPlans = new ArrayBlockingQueue<>(MAX_PENDING_PLAN);
this.alignedAssigner = (AlignedSplitAssigner) super.splitAssigner;
this.nextSnapshotId = nextSnapshotId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ protected SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> buildEn
@Nullable Long nextSnapshotId,
StreamTableScan scan) {
Options options = Options.fromMap(this.options);
int bucketNum = options.get(CoreOptions.BUCKET);
int sourceParallelismUpperBound = bucketNum < 0 ? MAX_PARALLELISM_OF_SOURCE : bucketNum;

return new AlignedContinuousFileSplitEnumerator(
context,
splits,
Expand All @@ -92,6 +95,7 @@ protected SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> buildEn
options.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_TIMEOUT).toMillis(),
options.get(CoreOptions.SCAN_MAX_SPLITS_PER_TASK),
options.get(FlinkConnectorOptions.READ_SHUFFLE_BUCKET_WITH_PARTITION),
options.get(FlinkConnectorOptions.SCAN_MAX_SNAPSHOT_COUNT));
options.get(FlinkConnectorOptions.SCAN_MAX_SNAPSHOT_COUNT),
sourceParallelismUpperBound);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -880,6 +880,7 @@ private static class Builder {
private int maxSnapshotCount = -1;

private int splitMaxPerTask = 10;
private int sourceParallelismUpperBound = 10;

public Builder setSplitEnumeratorContext(
SplitEnumeratorContext<FileStoreSourceSplit> context) {
Expand Down Expand Up @@ -927,7 +928,8 @@ public ContinuousFileSplitEnumerator build() {
unawareBucket,
this.splitMaxPerTask,
false,
maxSnapshotCount);
maxSnapshotCount,
sourceParallelismUpperBound);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,32 +54,48 @@
/** Tests for file store sources with metrics. */
public class FileStoreSourceMetricsTest {
private FileStoreTable table;
private FileStoreTable fixBucketTable;
private TestingSplitEnumeratorContextWithRegisteringGroup context;
private MetricGroup scanMetricGroup;
private MetricGroup enumeratorMetricGroup;

@BeforeEach
public void before(@TempDir java.nio.file.Path path) throws Exception {
    FileIO fileIO = LocalFileIO.create();

    // Dynamic-bucket table located at the temp-dir root.
    Path tableLocation = new Path(path.toString());
    TableSchema tableSchema =
            new SchemaManager(fileIO, tableLocation)
                    .createTable(
                            Schema.newBuilder()
                                    .column("a", DataTypes.INT())
                                    .column("b", DataTypes.BIGINT())
                                    .build());
    table = FileStoreTableFactory.create(fileIO, tableLocation, tableSchema);

    // Fixed-bucket table (bucket = 2) located in a sub-directory.
    Path fixBucketLocation = new Path(path.toString(), "fix_bucket");
    TableSchema fixBucketSchema =
            new SchemaManager(fileIO, fixBucketLocation)
                    .createTable(
                            Schema.newBuilder()
                                    .column("a", DataTypes.INT())
                                    .column("b", DataTypes.BIGINT())
                                    .primaryKey("a")
                                    .option("bucket", "2")
                                    .option("bucket-key", "a")
                                    .build());
    fixBucketTable = FileStoreTableFactory.create(fileIO, fixBucketLocation, fixBucketSchema);

    context = new TestingSplitEnumeratorContextWithRegisteringGroup(1);
    scanMetricGroup =
            context.metricGroup()
                    .addGroup("paimon")
                    .addGroup("table", table.name())
                    .addGroup("scan");
    enumeratorMetricGroup = context.metricGroup();
}

@Test
public void staticFileStoreSourceScanMetricsTest() throws Exception {
writeOnce();
writeOnce(table);
StaticFileStoreSource staticFileStoreSource =
new StaticFileStoreSource(
table.newReadBuilder(),
Expand All @@ -98,7 +114,7 @@ public void staticFileStoreSourceScanMetricsTest() throws Exception {

@Test
public void continuousFileStoreSourceScanMetricsTest() throws Exception {
writeOnce();
writeOnce(table);
ContinuousFileStoreSource continuousFileStoreSource =
new ContinuousFileStoreSource(table.newReadBuilder(), table.options(), null);
ContinuousFileSplitEnumerator enumerator =
Expand All @@ -114,7 +130,7 @@ public void continuousFileStoreSourceScanMetricsTest() throws Exception {
.getValue())
.isEqualTo(1L);

writeAgain();
writeAgain(table);
enumerator.scanNextSnapshot();
assertThat(TestingMetricUtils.getHistogram(scanMetricGroup, "scanDuration").getCount())
.isEqualTo(2);
Expand All @@ -126,7 +142,50 @@ public void continuousFileStoreSourceScanMetricsTest() throws Exception {
.isEqualTo(1L);
}

private void writeOnce() throws Exception {
@Test
public void continuousFileStoreFixBucketEnumeratorMetricsTest() throws Exception {
    writeOnce(fixBucketTable);

    ContinuousFileStoreSource source =
            new ContinuousFileStoreSource(
                    fixBucketTable.newReadBuilder(), fixBucketTable.options(), null);
    ContinuousFileSplitEnumerator enumerator =
            (ContinuousFileSplitEnumerator) source.restoreEnumerator(context, null);
    enumerator.start();

    // With a fixed bucket count (bucket > 0), the recommended parallelism upper
    // bound equals the bucket count itself.
    Object reportedUpperBound =
            TestingMetricUtils.getGauge(
                            enumeratorMetricGroup,
                            ContinuousFileSplitEnumerator.SOURCE_PARALLELISM_UPPER_BOUND)
                    .getValue();
    assertThat(reportedUpperBound).isEqualTo(2);
}

@Test
public void continuousFileStoreDynBucketEnumeratorMetricsTest() throws Exception {
    writeOnce(table);

    ContinuousFileStoreSource source =
            new ContinuousFileStoreSource(table.newReadBuilder(), table.options(), null);
    ContinuousFileSplitEnumerator enumerator =
            (ContinuousFileSplitEnumerator) source.restoreEnumerator(context, null);
    enumerator.start();

    // With a dynamic bucket count (bucket < 0), the recommended parallelism upper
    // bound falls back to the maximum source parallelism.
    Object reportedUpperBound =
            TestingMetricUtils.getGauge(
                            enumeratorMetricGroup,
                            ContinuousFileSplitEnumerator.SOURCE_PARALLELISM_UPPER_BOUND)
                    .getValue();
    assertThat(reportedUpperBound).isEqualTo(ContinuousFileStoreSource.MAX_PARALLELISM_OF_SOURCE);
}

private void writeOnce(FileStoreTable table) throws Exception {
InnerTableWrite writer = table.newWrite("test");
TableCommitImpl commit = table.newCommit("test");
writer.write(GenericRow.of(1, 2L));
Expand All @@ -140,7 +199,7 @@ private void writeOnce() throws Exception {
writer.close();
}

private void writeAgain() throws Exception {
private void writeAgain(FileStoreTable table) throws Exception {
InnerTableWrite writer = table.newWrite("test");
TableCommitImpl commit = table.newCommit("test");
writer.write(GenericRow.of(10, 2L));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,8 @@ public AlignedContinuousFileSplitEnumerator build() {
timeout,
10,
false,
-1);
-1,
10);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup;

import java.lang.reflect.Field;
import java.util.Map;
Expand All @@ -46,6 +47,14 @@ public static Histogram getHistogram(MetricGroup group, String metricName) {
@SuppressWarnings("unchecked")
private static Metric getMetric(MetricGroup group, String metricName) {
try {
// Handle ProxyMetricGroup wrapper class
if (ProxyMetricGroup.class.isAssignableFrom(group.getClass())) {
Field parentField =
group.getClass().getSuperclass().getDeclaredField("parentMetricGroup");
parentField.setAccessible(true);
group = (MetricGroup) parentField.get(group);
}

Field field = AbstractMetricGroup.class.getDeclaredField("metrics");
field.setAccessible(true);
return ((Map<String, Metric>) field.get(group)).get(metricName);
Expand Down