Skip to content

Commit 438e986

Browse files
author
bosiew.tian
committed
[flink] Add sourceScalingMaxParallelism metric for auto-scaling systems
1 parent a61f3fb commit 438e986

File tree

10 files changed

+171
-15
lines changed

10 files changed

+171
-15
lines changed

docs/content/maintenance/metrics.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,12 @@ When using Flink to read and write, Paimon has implemented some key standard Fli
391391
<td>Gauge</td>
392392
<td>Time difference between reading the data file and file creation.</td>
393393
</tr>
394+
<tr>
395+
<td>sourceScalingMaxParallelism</td>
396+
<td>Flink Source Enumerator</td>
397+
<td>Gauge</td>
398+
<td>Recommended upper bound of parallelism for auto-scaling systems. For fixed bucket tables, this equals the bucket number. For dynamic bucket tables (bucket = -1), this equals the current parallelism. Note: This is a recommendation, not a hard limit.</td>
399+
</tr>
394400
</tbody>
395401
</table>
396402

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.paimon.flink.source.assigners.FIFOSplitAssigner;
2323
import org.apache.paimon.flink.source.assigners.PreAssignSplitAssigner;
2424
import org.apache.paimon.flink.source.assigners.SplitAssigner;
25+
import org.apache.paimon.flink.source.metrics.FileStoreSourceEnumeratorMetrics;
2526
import org.apache.paimon.postpone.PostponeBucketFileStoreWrite;
2627
import org.apache.paimon.table.BucketMode;
2728
import org.apache.paimon.table.sink.ChannelComputer;
@@ -92,6 +93,9 @@ public class ContinuousFileSplitEnumerator
9293

9394
private final int maxSnapshotCount;
9495

96+
// Currently unused; serves as a placeholder for future metric extensions or updates.
97+
@Nullable private final FileStoreSourceEnumeratorMetrics enumeratorMetrics;
98+
9599
public ContinuousFileSplitEnumerator(
96100
SplitEnumeratorContext<FileStoreSourceSplit> context,
97101
Collection<FileStoreSourceSplit> remainSplits,
@@ -101,7 +105,8 @@ public ContinuousFileSplitEnumerator(
101105
boolean unordered,
102106
int splitMaxPerTask,
103107
boolean shuffleBucketWithPartition,
104-
int maxSnapshotCount) {
108+
int maxSnapshotCount,
109+
@Nullable FileStoreSourceEnumeratorMetrics enumeratorMetrics) {
105110
checkArgument(discoveryInterval > 0L);
106111
this.context = checkNotNull(context);
107112
this.nextSnapshotId = nextSnapshotId;
@@ -118,6 +123,7 @@ public ContinuousFileSplitEnumerator(
118123
this.consumerProgressCalculator =
119124
new ConsumerProgressCalculator(context.currentParallelism());
120125
this.maxSnapshotCount = maxSnapshotCount;
126+
this.enumeratorMetrics = enumeratorMetrics;
121127
}
122128

123129
@VisibleForTesting

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.paimon.flink.FlinkConnectorOptions;
2323
import org.apache.paimon.flink.NestedProjectedRowData;
2424
import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
25+
import org.apache.paimon.flink.source.metrics.FileStoreSourceEnumeratorMetrics;
2526
import org.apache.paimon.options.Options;
2627
import org.apache.paimon.table.source.ReadBuilder;
2728
import org.apache.paimon.table.source.StreamDataTableScan;
@@ -84,12 +85,14 @@ public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnu
8485
splits = checkpoint.splits();
8586
}
8687
StreamTableScan scan = readBuilder.newStreamScan();
88+
FileStoreSourceEnumeratorMetrics enumeratorMetrics = null;
8789
if (metricGroup(context) != null) {
90+
enumeratorMetrics = new FileStoreSourceEnumeratorMetrics(context, options);
8891
((StreamDataTableScan) scan)
8992
.withMetricRegistry(new FlinkMetricRegistry(context.metricGroup()));
9093
}
9194
scan.restore(nextSnapshotId);
92-
return buildEnumerator(context, splits, nextSnapshotId, scan);
95+
return buildEnumerator(context, splits, nextSnapshotId, scan, enumeratorMetrics);
9396
}
9497

9598
@Nullable
@@ -106,7 +109,8 @@ protected SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> buildEn
106109
SplitEnumeratorContext<FileStoreSourceSplit> context,
107110
Collection<FileStoreSourceSplit> splits,
108111
@Nullable Long nextSnapshotId,
109-
StreamTableScan scan) {
112+
StreamTableScan scan,
113+
@Nullable FileStoreSourceEnumeratorMetrics enumeratorMetrics) {
110114
Options options = Options.fromMap(this.options);
111115
return new ContinuousFileSplitEnumerator(
112116
context,
@@ -117,6 +121,7 @@ protected SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> buildEn
117121
unordered,
118122
options.get(CoreOptions.SCAN_MAX_SPLITS_PER_TASK),
119123
options.get(FlinkConnectorOptions.READ_SHUFFLE_BUCKET_WITH_PARTITION),
120-
options.get(FlinkConnectorOptions.SCAN_MAX_SNAPSHOT_COUNT));
124+
options.get(FlinkConnectorOptions.SCAN_MAX_SNAPSHOT_COUNT),
125+
enumeratorMetrics);
121126
}
122127
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.paimon.flink.source.PendingSplitsCheckpoint;
2424
import org.apache.paimon.flink.source.assigners.AlignedSplitAssigner;
2525
import org.apache.paimon.flink.source.assigners.SplitAssigner;
26+
import org.apache.paimon.flink.source.metrics.FileStoreSourceEnumeratorMetrics;
2627
import org.apache.paimon.table.source.DataSplit;
2728
import org.apache.paimon.table.source.EndOfScanException;
2829
import org.apache.paimon.table.source.SnapshotNotExistPlan;
@@ -95,7 +96,8 @@ public AlignedContinuousFileSplitEnumerator(
9596
long alignTimeout,
9697
int splitPerTaskMax,
9798
boolean shuffleBucketWithPartition,
98-
int maxSnapshotCount) {
99+
int maxSnapshotCount,
100+
@Nullable FileStoreSourceEnumeratorMetrics enumeratorMetrics) {
99101
super(
100102
context,
101103
remainSplits,
@@ -105,7 +107,8 @@ public AlignedContinuousFileSplitEnumerator(
105107
unawareBucket,
106108
splitPerTaskMax,
107109
shuffleBucketWithPartition,
108-
maxSnapshotCount);
110+
maxSnapshotCount,
111+
enumeratorMetrics);
109112
this.pendingPlans = new ArrayBlockingQueue<>(MAX_PENDING_PLAN);
110113
this.alignedAssigner = (AlignedSplitAssigner) super.splitAssigner;
111114
this.nextSnapshotId = nextSnapshotId;

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.paimon.flink.source.ContinuousFileStoreSource;
2626
import org.apache.paimon.flink.source.FileStoreSourceSplit;
2727
import org.apache.paimon.flink.source.PendingSplitsCheckpoint;
28+
import org.apache.paimon.flink.source.metrics.FileStoreSourceEnumeratorMetrics;
2829
import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
2930
import org.apache.paimon.options.Options;
3031
import org.apache.paimon.table.source.ReadBuilder;
@@ -80,7 +81,8 @@ protected SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> buildEn
8081
SplitEnumeratorContext<FileStoreSourceSplit> context,
8182
Collection<FileStoreSourceSplit> splits,
8283
@Nullable Long nextSnapshotId,
83-
StreamTableScan scan) {
84+
StreamTableScan scan,
85+
@Nullable FileStoreSourceEnumeratorMetrics enumeratorMetrics) {
8486
Options options = Options.fromMap(this.options);
8587
return new AlignedContinuousFileSplitEnumerator(
8688
context,
@@ -92,6 +94,7 @@ protected SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> buildEn
9294
options.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_TIMEOUT).toMillis(),
9395
options.get(CoreOptions.SCAN_MAX_SPLITS_PER_TASK),
9496
options.get(FlinkConnectorOptions.READ_SHUFFLE_BUCKET_WITH_PARTITION),
95-
options.get(FlinkConnectorOptions.SCAN_MAX_SNAPSHOT_COUNT));
97+
options.get(FlinkConnectorOptions.SCAN_MAX_SNAPSHOT_COUNT),
98+
enumeratorMetrics);
9699
}
97100
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.paimon.flink.source.metrics;

import org.apache.paimon.CoreOptions;

import org.apache.flink.api.connector.source.SplitEnumeratorContext;

import java.util.Map;

/**
 * Source enumerator metrics.
 *
 * <p>This class manages metrics for the source split enumerator.
 */
public class FileStoreSourceEnumeratorMetrics {

    /**
     * Metric name for source scaling max parallelism. This metric provides a recommended upper
     * bound of parallelism for auto-scaling systems. For fixed bucket tables, this equals the
     * bucket number; for dynamic bucket tables, this equals the current parallelism. Note: This is
     * a recommendation, not a hard limit - users can configure higher parallelism manually if
     * needed.
     */
    public static final String SCALING_MAX_PARALLELISM = "sourceScalingMaxParallelism";

    // Value reported by the gauge; fixed at construction time.
    private final int recommendedMaxParallelism;

    /**
     * Creates enumerator metrics and registers them with the given metric group.
     *
     * @param context the split enumerator context
     * @param options the source options
     */
    public FileStoreSourceEnumeratorMetrics(
            SplitEnumeratorContext<?> context, Map<String, String> options) {
        int configuredBucketCount = CoreOptions.fromMap(options).bucket();
        if (configuredBucketCount < 0) {
            // Dynamic bucket mode is configured with a negative bucket count (-1),
            // so fall back to the enumerator's current parallelism.
            this.recommendedMaxParallelism = context.currentParallelism();
        } else {
            this.recommendedMaxParallelism = configuredBucketCount;
        }

        context.metricGroup().gauge(SCALING_MAX_PARALLELISM, () -> recommendedMaxParallelism);
    }

    public int getScalingMaxParallelism() {
        return recommendedMaxParallelism;
    }
}

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -927,7 +927,8 @@ public ContinuousFileSplitEnumerator build() {
927927
unawareBucket,
928928
this.splitMaxPerTask,
929929
false,
930-
maxSnapshotCount);
930+
maxSnapshotCount,
931+
null);
931932
}
932933
}
933934

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java

Lines changed: 63 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.paimon.data.GenericRow;
2222
import org.apache.paimon.flink.FlinkConnectorOptions;
23+
import org.apache.paimon.flink.source.metrics.FileStoreSourceEnumeratorMetrics;
2324
import org.apache.paimon.flink.utils.TestingMetricUtils;
2425
import org.apache.paimon.fs.FileIO;
2526
import org.apache.paimon.fs.Path;
@@ -54,32 +55,48 @@
5455
/** Tests for file store sources with metrics. */
5556
public class FileStoreSourceMetricsTest {
5657
private FileStoreTable table;
58+
private FileStoreTable fixBucketTable;
5759
private TestingSplitEnumeratorContextWithRegisteringGroup context;
5860
private MetricGroup scanMetricGroup;
61+
private MetricGroup enumeratorMetricGroup;
5962

6063
@BeforeEach
6164
public void before(@TempDir java.nio.file.Path path) throws Exception {
6265
FileIO fileIO = LocalFileIO.create();
6366
Path tablePath = new Path(path.toString());
67+
Path fixBucketTablePath = new Path(path.toString(), "fix_bucket");
6468
SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
69+
SchemaManager fixBucketSchemaManager = new SchemaManager(fileIO, fixBucketTablePath);
6570
TableSchema tableSchema =
6671
schemaManager.createTable(
6772
Schema.newBuilder()
6873
.column("a", DataTypes.INT())
6974
.column("b", DataTypes.BIGINT())
7075
.build());
76+
TableSchema fixBucketTableSchema =
77+
fixBucketSchemaManager.createTable(
78+
Schema.newBuilder()
79+
.column("a", DataTypes.INT())
80+
.column("b", DataTypes.BIGINT())
81+
.primaryKey("a")
82+
.option("bucket", "2")
83+
.option("bucket-key", "a")
84+
.build());
7185
table = FileStoreTableFactory.create(fileIO, tablePath, tableSchema);
86+
fixBucketTable =
87+
FileStoreTableFactory.create(fileIO, fixBucketTablePath, fixBucketTableSchema);
7288
context = new TestingSplitEnumeratorContextWithRegisteringGroup(1);
7389
scanMetricGroup =
7490
context.metricGroup()
7591
.addGroup("paimon")
7692
.addGroup("table", table.name())
7793
.addGroup("scan");
94+
enumeratorMetricGroup = context.metricGroup();
7895
}
7996

8097
@Test
8198
public void staticFileStoreSourceScanMetricsTest() throws Exception {
82-
writeOnce();
99+
writeOnce(table);
83100
StaticFileStoreSource staticFileStoreSource =
84101
new StaticFileStoreSource(
85102
table.newReadBuilder(),
@@ -98,7 +115,7 @@ public void staticFileStoreSourceScanMetricsTest() throws Exception {
98115

99116
@Test
100117
public void continuousFileStoreSourceScanMetricsTest() throws Exception {
101-
writeOnce();
118+
writeOnce(table);
102119
ContinuousFileStoreSource continuousFileStoreSource =
103120
new ContinuousFileStoreSource(table.newReadBuilder(), table.options(), null);
104121
ContinuousFileSplitEnumerator enumerator =
@@ -114,7 +131,7 @@ public void continuousFileStoreSourceScanMetricsTest() throws Exception {
114131
.getValue())
115132
.isEqualTo(1L);
116133

117-
writeAgain();
134+
writeAgain(table);
118135
enumerator.scanNextSnapshot();
119136
assertThat(TestingMetricUtils.getHistogram(scanMetricGroup, "scanDuration").getCount())
120137
.isEqualTo(2);
@@ -126,7 +143,48 @@ public void continuousFileStoreSourceScanMetricsTest() throws Exception {
126143
.isEqualTo(1L);
127144
}
128145

129-
private void writeOnce() throws Exception {
146+
@Test
147+
public void continuousFileStoreFixBucketEnumeratorMetricsTest() throws Exception {
148+
writeOnce(fixBucketTable);
149+
150+
ContinuousFileStoreSource continuousFileStoreSource =
151+
new ContinuousFileStoreSource(
152+
fixBucketTable.newReadBuilder(), fixBucketTable.options(), null);
153+
ContinuousFileSplitEnumerator enumerator =
154+
(ContinuousFileSplitEnumerator)
155+
continuousFileStoreSource.restoreEnumerator(context, null);
156+
enumerator.scanNextSnapshot();
157+
158+
// equal bucketNum when bucket > 0
159+
assertThat(
160+
TestingMetricUtils.getGauge(
161+
enumeratorMetricGroup,
162+
FileStoreSourceEnumeratorMetrics.SCALING_MAX_PARALLELISM)
163+
.getValue())
164+
.isEqualTo(2);
165+
}
166+
167+
@Test
168+
public void continuousFileStoreDynBucketEnumeratorMetricsTest() throws Exception {
169+
writeOnce(table);
170+
171+
ContinuousFileStoreSource continuousFileStoreSource =
172+
new ContinuousFileStoreSource(table.newReadBuilder(), table.options(), null);
173+
ContinuousFileSplitEnumerator enumerator =
174+
(ContinuousFileSplitEnumerator)
175+
continuousFileStoreSource.restoreEnumerator(context, null);
176+
enumerator.scanNextSnapshot();
177+
178+
// equal parallelism when bucket < 0
179+
assertThat(
180+
TestingMetricUtils.getGauge(
181+
enumeratorMetricGroup,
182+
FileStoreSourceEnumeratorMetrics.SCALING_MAX_PARALLELISM)
183+
.getValue())
184+
.isEqualTo(1);
185+
}
186+
187+
private void writeOnce(FileStoreTable table) throws Exception {
130188
InnerTableWrite writer = table.newWrite("test");
131189
TableCommitImpl commit = table.newCommit("test");
132190
writer.write(GenericRow.of(1, 2L));
@@ -140,7 +198,7 @@ private void writeOnce() throws Exception {
140198
writer.close();
141199
}
142200

143-
private void writeAgain() throws Exception {
201+
private void writeAgain(FileStoreTable table) throws Exception {
144202
InnerTableWrite writer = table.newWrite("test");
145203
TableCommitImpl commit = table.newCommit("test");
146204
writer.write(GenericRow.of(10, 2L));

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,8 @@ public AlignedContinuousFileSplitEnumerator build() {
252252
timeout,
253253
10,
254254
false,
255-
-1);
255+
-1,
256+
null);
256257
}
257258
}
258259

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/TestingMetricUtils.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.flink.metrics.Metric;
2525
import org.apache.flink.metrics.MetricGroup;
2626
import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
27+
import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup;
2728

2829
import java.lang.reflect.Field;
2930
import java.util.Map;
@@ -46,6 +47,14 @@ public static Histogram getHistogram(MetricGroup group, String metricName) {
4647
@SuppressWarnings("unchecked")
4748
private static Metric getMetric(MetricGroup group, String metricName) {
4849
try {
50+
// Handle ProxyMetricGroup wrapper class
51+
if (ProxyMetricGroup.class.isAssignableFrom(group.getClass())) {
52+
Field parentField =
53+
group.getClass().getSuperclass().getDeclaredField("parentMetricGroup");
54+
parentField.setAccessible(true);
55+
group = (MetricGroup) parentField.get(group);
56+
}
57+
4958
Field field = AbstractMetricGroup.class.getDeclaredField("metrics");
5059
field.setAccessible(true);
5160
return ((Map<String, Metric>) field.get(group)).get(metricName);

0 commit comments

Comments
 (0)