Skip to content

Commit 4dc662e

Browse files
authored
[FLINK-31866] Start metric window with timestamp of first observation (apache#573)
The autoscaler uses a ConfigMap to store past metric observations which is used to re-initialize the autoscaler state in case of failures or upgrades. Whenever trimming of the ConfigMap occurs, we need to make sure we also update the timestamp for the start of the metric collection, so any removed observations can be compensated for by collecting new ones. If we don't do this, the metric window will effectively shrink due to the removed observations. This can lead to triggering scaling decisions when the operator gets redeployed because of the removed items. The solution we are opting for here is to treat the first metric observation timestamp as the start of the metric collection.
1 parent d9b3437 commit 4dc662e

File tree

6 files changed

+84
-61
lines changed

6 files changed

+84
-61
lines changed

flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ public class AutoScalerInfo {
6565

6666
protected static final String COLLECTED_METRICS_KEY = "collectedMetrics";
6767
protected static final String SCALING_HISTORY_KEY = "scalingHistory";
68-
protected static final String JOB_UPDATE_TS_KEY = "jobUpdateTs";
6968

7069
protected static final int MAX_CM_BYTES = 1000000;
7170

@@ -104,12 +103,10 @@ public SortedMap<Instant, CollectedMetrics> getMetricHistory() {
104103
}
105104

106105
@SneakyThrows
107-
public void updateMetricHistory(
108-
Instant jobUpdateTs, SortedMap<Instant, CollectedMetrics> history) {
106+
public void updateMetricHistory(SortedMap<Instant, CollectedMetrics> history) {
109107
configMap
110108
.getData()
111109
.put(COLLECTED_METRICS_KEY, compress(YAML_MAPPER.writeValueAsString(history)));
112-
configMap.getData().put(JOB_UPDATE_TS_KEY, jobUpdateTs.toString());
113110
}
114111

115112
@SneakyThrows
@@ -124,11 +121,6 @@ public void updateVertexList(List<JobVertexID> vertexList) {
124121

125122
public void clearMetricHistory() {
126123
configMap.getData().remove(COLLECTED_METRICS_KEY);
127-
configMap.getData().remove(JOB_UPDATE_TS_KEY);
128-
}
129-
130-
public Optional<Instant> getJobUpdateTs() {
131-
return Optional.ofNullable(configMap.getData().get(JOB_UPDATE_TS_KEY)).map(Instant::parse);
132124
}
133125

134126
@SneakyThrows

flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java

Lines changed: 30 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@
5757
import java.util.Optional;
5858
import java.util.Set;
5959
import java.util.SortedMap;
60-
import java.util.TreeMap;
6160
import java.util.concurrent.ConcurrentHashMap;
6261
import java.util.stream.Collectors;
6362

@@ -77,50 +76,41 @@ public abstract class ScalingMetricCollector {
7776

7877
public CollectedMetricHistory updateMetrics(
7978
AbstractFlinkResource<?, ?> cr,
80-
AutoScalerInfo scalingInformation,
79+
AutoScalerInfo autoscalerInfo,
8180
FlinkService flinkService,
8281
Configuration conf)
8382
throws Exception {
8483

84+
var topology = getJobTopology(flinkService, cr, conf, autoscalerInfo);
8585
var resourceID = ResourceID.fromResource(cr);
86-
var jobStatus = cr.getStatus().getJobStatus();
87-
var currentJobUpdateTs = Instant.ofEpochMilli(Long.parseLong(jobStatus.getUpdateTime()));
8886
var now = clock.instant();
8987

90-
if (!currentJobUpdateTs.equals(
91-
scalingInformation.getJobUpdateTs().orElse(currentJobUpdateTs))) {
92-
scalingInformation.clearMetricHistory();
88+
var metricHistory =
89+
histories.computeIfAbsent(resourceID, (k) -> autoscalerInfo.getMetricHistory());
90+
91+
// The timestamp of the first metric observation marks the start
92+
// If we haven't collected any metrics, we are starting now
93+
var metricCollectionStartTs = metricHistory.isEmpty() ? now : metricHistory.firstKey();
94+
var jobUpdateTs = getJobUpdateTs(cr);
95+
if (jobUpdateTs.isAfter(metricCollectionStartTs)) {
96+
LOG.info("Job updated. Clearing metrics.");
97+
autoscalerInfo.clearMetricHistory();
9398
cleanup(cr);
99+
metricHistory.clear();
100+
metricCollectionStartTs = now;
94101
}
95102

96-
var topology = getJobTopology(flinkService, cr, conf, scalingInformation);
103+
// Trim metrics outside the metric window from metrics history
104+
var metricWindowSize = getMetricWindowSize(conf);
105+
metricHistory.headMap(now.minus(metricWindowSize)).clear();
97106

98-
var stabilizationDuration = conf.get(AutoScalerOptions.STABILIZATION_INTERVAL);
99-
var stableTime = currentJobUpdateTs.plus(stabilizationDuration);
107+
var stableTime = jobUpdateTs.plus(conf.get(AutoScalerOptions.STABILIZATION_INTERVAL));
100108
if (now.isBefore(stableTime)) {
101109
// As long as we are stabilizing, collect no metrics at all
102110
LOG.info("Skipping metric collection during stabilization period until {}", stableTime);
103111
return new CollectedMetricHistory(topology, Collections.emptySortedMap());
104112
}
105113

106-
// Adjust the window size until it reaches the max size
107-
var metricsWindowSize =
108-
Duration.ofMillis(
109-
Math.min(
110-
now.toEpochMilli() - stableTime.toEpochMilli(),
111-
conf.get(AutoScalerOptions.METRICS_WINDOW).toMillis()));
112-
113-
// Extract metrics history for metric window size
114-
var scalingMetricHistory =
115-
histories.compute(
116-
resourceID,
117-
(k, h) -> {
118-
if (h == null) {
119-
h = scalingInformation.getMetricHistory();
120-
}
121-
return new TreeMap<>(h.tailMap(now.minus(metricsWindowSize)));
122-
});
123-
124114
// The filtered list of metrics we want to query for each vertex
125115
var filteredVertexMetricNames = queryFilteredMetricNames(flinkService, cr, conf, topology);
126116

@@ -133,10 +123,10 @@ public CollectedMetricHistory updateMetrics(
133123
convertToScalingMetrics(resourceID, collectedVertexMetrics, topology, conf);
134124

135125
// Add scaling metrics to history if they were computed successfully
136-
scalingMetricHistory.put(clock.instant(), scalingMetrics);
137-
scalingInformation.updateMetricHistory(currentJobUpdateTs, scalingMetricHistory);
126+
metricHistory.put(now, scalingMetrics);
127+
autoscalerInfo.updateMetricHistory(metricHistory);
138128

139-
var windowFullTime = stableTime.plus(conf.get(AutoScalerOptions.METRICS_WINDOW));
129+
var windowFullTime = metricCollectionStartTs.plus(metricWindowSize);
140130
if (now.isBefore(windowFullTime)) {
141131
// As long as we haven't had time to collect a full window,
142132
// collect metrics but do not return any metrics
@@ -146,7 +136,15 @@ public CollectedMetricHistory updateMetrics(
146136
return new CollectedMetricHistory(topology, Collections.emptySortedMap());
147137
}
148138

149-
return new CollectedMetricHistory(topology, scalingMetricHistory);
139+
return new CollectedMetricHistory(topology, metricHistory);
140+
}
141+
142+
protected Duration getMetricWindowSize(Configuration conf) {
143+
return conf.get(AutoScalerOptions.METRICS_WINDOW);
144+
}
145+
146+
private static Instant getJobUpdateTs(AbstractFlinkResource<?, ?> cr) {
147+
return Instant.ofEpochMilli(Long.parseLong(cr.getStatus().getJobStatus().getUpdateTime()));
150148
}
151149

152150
protected JobTopology getJobTopology(

flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,6 @@ public void testCompressionMigration() throws JsonProcessingException {
136136
data.put(
137137
AutoScalerInfo.COLLECTED_METRICS_KEY,
138138
AutoScalerInfo.YAML_MAPPER.writeValueAsString(metricHistory));
139-
data.put(AutoScalerInfo.JOB_UPDATE_TS_KEY, jobUpdateTs.toString());
140139
data.put(
141140
AutoScalerInfo.SCALING_HISTORY_KEY,
142141
AutoScalerInfo.YAML_MAPPER.writeValueAsString(scalingHistory));
@@ -147,7 +146,7 @@ public void testCompressionMigration() throws JsonProcessingException {
147146

148147
// Override with compressed data
149148
var newTs = Instant.now();
150-
info.updateMetricHistory(newTs, metricHistory);
149+
info.updateMetricHistory(metricHistory);
151150
info.addToScalingHistory(newTs, Map.of(), new Configuration());
152151

153152
// Make sure we can still access everything
@@ -191,7 +190,7 @@ public void testMetricsTrimming() throws Exception {
191190
1, 2, Map.of(ScalingMetric.LAG, EvaluatedScalingMetric.of(2.)))),
192191
new Configuration());
193192

194-
info.updateMetricHistory(Instant.now(), metricHistory);
193+
info.updateMetricHistory(metricHistory);
195194

196195
assertFalse(
197196
data.get(AutoScalerInfo.COLLECTED_METRICS_KEY).length()

flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -109,14 +109,21 @@ public void setup() {
109109
autoscaler =
110110
new JobAutoScalerImpl(
111111
kubernetesClient, metricsCollector, evaluator, scalingExecutor);
112+
113+
// Reset custom window size to default
114+
metricsCollector.setTestMetricWindowSize(null);
112115
}
113116

114117
@Test
115118
public void test() throws Exception {
116119
var ctx = createAutoscalerTestContext();
120+
121+
/* Test scaling up. */
117122
var now = Instant.ofEpochMilli(0);
118123
setClocksTo(now);
119124
redeployJob(now);
125+
// Adjust metric window size, so we can fill the metric window with two metrics
126+
metricsCollector.setTestMetricWindowSize(Duration.ofSeconds(1));
120127
metricsCollector.setCurrentMetrics(
121128
Map.of(
122129
source1,
@@ -139,16 +146,22 @@ public void test() throws Exception {
139146
"", Double.NaN, Double.NaN, Double.NaN, 500.))));
140147

141148
autoscaler.scale(getResourceContext(app, ctx));
142-
assertFalse(AutoScalerInfo.forResource(app, kubernetesClient).getMetricHistory().isEmpty());
149+
assertEquals(
150+
1, AutoScalerInfo.forResource(app, kubernetesClient).getMetricHistory().size());
143151

144152
now = now.plus(Duration.ofSeconds(1));
145153
setClocksTo(now);
146154
autoscaler.scale(getResourceContext(app, ctx));
155+
assertEquals(
156+
2, AutoScalerInfo.forResource(app, kubernetesClient).getMetricHistory().size());
147157

148158
var scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
149159
assertEquals(4, scaledParallelism.get(source1));
150160
assertEquals(4, scaledParallelism.get(sink));
151161

162+
/* Test stability while processing pending records. */
163+
164+
// Update topology to reflect updated parallelisms
152165
metricsCollector.setJobTopology(
153166
new JobTopology(
154167
new VertexInfo(source1, Set.of(), 4, 24),
@@ -173,12 +186,16 @@ public void test() throws Exception {
173186
FlinkMetric.NUM_RECORDS_IN_PER_SEC,
174187
new AggregatedMetric(
175188
"", Double.NaN, Double.NaN, Double.NaN, 1800.))));
176-
177-
now = now.plus(Duration.ofSeconds(1));
189+
now = now.plusSeconds(1);
178190
setClocksTo(now);
191+
// Redeploying which erases metric history
179192
redeployJob(now);
193+
// Adjust metric window size, so we can fill the metric window with three metrics
194+
metricsCollector.setTestMetricWindowSize(Duration.ofSeconds(2));
195+
180196
autoscaler.scale(getResourceContext(app, ctx));
181-
assertFalse(AutoScalerInfo.forResource(app, kubernetesClient).getMetricHistory().isEmpty());
197+
assertEquals(
198+
1, AutoScalerInfo.forResource(app, kubernetesClient).getMetricHistory().size());
182199
scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
183200
assertEquals(4, scaledParallelism.get(source1));
184201
assertEquals(4, scaledParallelism.get(sink));
@@ -207,11 +224,14 @@ public void test() throws Exception {
207224
now = now.plus(Duration.ofSeconds(1));
208225
setClocksTo(now);
209226
autoscaler.scale(getResourceContext(app, ctx));
210-
assertFalse(AutoScalerInfo.forResource(app, kubernetesClient).getMetricHistory().isEmpty());
227+
assertEquals(
228+
2, AutoScalerInfo.forResource(app, kubernetesClient).getMetricHistory().size());
211229
scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
212230
assertEquals(4, scaledParallelism.get(source1));
213231
assertEquals(4, scaledParallelism.get(sink));
214232

233+
/* Test scaling down. */
234+
215235
// We have finally caught up to our original lag, time to scale down
216236
metricsCollector.setCurrentMetrics(
217237
Map.of(
@@ -235,15 +255,20 @@ public void test() throws Exception {
235255
now = now.plus(Duration.ofSeconds(1));
236256
setClocksTo(now);
237257
autoscaler.scale(getResourceContext(app, ctx));
258+
assertEquals(
259+
3, AutoScalerInfo.forResource(app, kubernetesClient).getMetricHistory().size());
238260

239261
scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
240262
assertEquals(2, scaledParallelism.get(source1));
241263
assertEquals(2, scaledParallelism.get(sink));
264+
242265
metricsCollector.setJobTopology(
243266
new JobTopology(
244267
new VertexInfo(source1, Set.of(), 2, 24),
245268
new VertexInfo(sink, Set.of(source1), 2, 720)));
246269

270+
/* Test stability while processing backlog. */
271+
247272
metricsCollector.setCurrentMetrics(
248273
Map.of(
249274
source1,
@@ -349,13 +374,7 @@ public void testMetricsPersistedAfterRedeploy() {
349374
}
350375

351376
private void redeployJob(Instant now) {
352-
// Offset the update time by one metrics window to simulate collecting one entire window
353-
app.getStatus()
354-
.getJobStatus()
355-
.setUpdateTime(
356-
String.valueOf(
357-
now.minus(AutoScalerOptions.METRICS_WINDOW.defaultValue())
358-
.toEpochMilli()));
377+
app.getStatus().getJobStatus().setUpdateTime(String.valueOf(now.toEpochMilli()));
359378
}
360379

361380
private void setClocksTo(Instant time) {

flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -331,15 +331,19 @@ public void testMetricCollectorWindow() throws Exception {
331331
metricsHistory = metricsCollector.updateMetrics(app, scalingInfo, service, conf);
332332
assertEquals(3, metricsHistory.getMetricHistory().size());
333333

334-
// Completely new metric window with just the currently connected metric
334+
// Complete new metric window with just the currently collected metric
335335
metricsCollector.setClock(
336336
Clock.offset(clock, conf.get(AutoScalerOptions.METRICS_WINDOW).plusDays(1)));
337337
metricsHistory = metricsCollector.updateMetrics(app, scalingInfo, service, conf);
338338
assertEquals(1, metricsHistory.getMetricHistory().size());
339339

340-
// Everything should reset on job updates
341-
app.getStatus().getJobStatus().setUpdateTime("0");
342-
assertEquals(1, metricsHistory.getMetricHistory().size());
340+
// Existing metrics should be cleared on job updates
341+
app.getStatus()
342+
.getJobStatus()
343+
.setUpdateTime(
344+
String.valueOf(clock.instant().plus(Duration.ofDays(10)).toEpochMilli()));
345+
metricsHistory = metricsCollector.updateMetrics(app, scalingInfo, service, conf);
346+
assertEquals(0, metricsHistory.getMetricHistory().size());
343347
}
344348

345349
@Test
@@ -435,7 +439,7 @@ private CollectedMetricHistory collectMetrics() throws Exception {
435439
conf.set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ZERO);
436440
conf.set(AutoScalerOptions.METRICS_WINDOW, Duration.ofSeconds(2));
437441

438-
metricsCollector.setClock(Clock.offset(clock, Duration.ofSeconds(1)));
442+
metricsCollector.setClock(clock);
439443

440444
var collectedMetrics = metricsCollector.updateMetrics(app, scalingInfo, service, conf);
441445
assertTrue(collectedMetrics.getMetricHistory().isEmpty());

flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
import lombok.Setter;
3131

32+
import java.time.Duration;
3233
import java.util.Collection;
3334
import java.util.Collections;
3435
import java.util.HashMap;
@@ -39,6 +40,8 @@ public class TestingMetricsCollector extends ScalingMetricCollector {
3940

4041
@Setter private JobTopology jobTopology;
4142

43+
@Setter private Duration testMetricWindowSize;
44+
4245
@Setter
4346
private Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> currentMetrics = new HashMap<>();
4447

@@ -76,4 +79,12 @@ protected Collection<AggregatedMetric> queryAggregatedMetricNames(
7679
RestClusterClient<?> restClient, JobID jobID, JobVertexID jobVertexID) {
7780
return metricNames.getOrDefault(jobVertexID, Collections.emptyList());
7881
}
82+
83+
@Override
84+
protected Duration getMetricWindowSize(Configuration conf) {
85+
if (testMetricWindowSize != null) {
86+
return testMetricWindowSize;
87+
}
88+
return super.getMetricWindowSize(conf);
89+
}
7990
}

0 commit comments

Comments
 (0)