
Commit 2173b45

[FLINK-28138][runtime] Add metric numSlowExecutionVertices for speculative execution
zhuzhurk committed Jul 22, 2022
1 parent 3b1165b commit 2173b45
Showing 4 changed files with 40 additions and 2 deletions.
MetricNames.java
@@ -109,4 +109,7 @@ public static String currentInputWatermarkName(int index) {
    public static final String MAILBOX_THROUGHPUT = "mailboxMailsPerSecond";
    public static final String MAILBOX_LATENCY = "mailboxLatencyMs";
    public static final String MAILBOX_SIZE = "mailboxQueueSize";
+
+   // speculative execution
+   public static final String NUM_SLOW_EXECUTION_VERTICES = "numSlowExecutionVertices";
}
SchedulerBase.java
@@ -141,7 +141,7 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling

    private final CheckpointIDCounter checkpointIdCounter;

-   private final JobManagerJobMetricGroup jobManagerJobMetricGroup;
+   protected final JobManagerJobMetricGroup jobManagerJobMetricGroup;

    protected final ExecutionVertexVersioner executionVertexVersioner;

SpeculativeScheduler.java
@@ -19,9 +19,11 @@

package org.apache.flink.runtime.scheduler.adaptivebatch;

+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.blocklist.BlockedNode;
import org.apache.flink.runtime.blocklist.BlocklistOperations;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
@@ -37,6 +39,7 @@
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
import org.apache.flink.runtime.io.network.partition.PartitionException;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
import org.apache.flink.runtime.scheduler.ExecutionOperations;
@@ -86,6 +89,8 @@ public class SpeculativeScheduler extends AdaptiveBatchScheduler

    private final SlowTaskDetector slowTaskDetector;

+   private long numSlowExecutionVertices;
+
    public SpeculativeScheduler(
            final Logger log,
            final JobGraph jobGraph,
@@ -157,10 +162,16 @@ public SpeculativeScheduler(

    @Override
    protected void startSchedulingInternal() {
+       registerMetrics(jobManagerJobMetricGroup);
+
        super.startSchedulingInternal();
        slowTaskDetector.start(getExecutionGraph(), this, getMainThreadExecutor());
    }

+   private void registerMetrics(MetricGroup metricGroup) {
+       metricGroup.gauge(MetricNames.NUM_SLOW_EXECUTION_VERTICES, () -> numSlowExecutionVertices);
+   }
+
    @Override
    public CompletableFuture<Void> closeAsync() {
        slowTaskDetector.stop();
@@ -272,6 +283,7 @@ private static boolean isExecutionVertexPossibleToFinish(

    @Override
    public void notifySlowTasks(Map<ExecutionVertexID, Collection<ExecutionAttemptID>> slowTasks) {
+       numSlowExecutionVertices = slowTasks.size();

        // add slow nodes to blocklist before scheduling new speculative executions
        blockSlowNodes(slowTasks);
@@ -344,4 +356,9 @@ private Set<String> getSlowNodeIds(
                .map(TaskManagerLocation::getNodeId)
                .collect(Collectors.toSet());
    }
+
+   @VisibleForTesting
+   long getNumSlowExecutionVertices() {
+       return numSlowExecutionVertices;
+   }
}
SpeculativeSchedulerTest.java
@@ -338,7 +338,7 @@ void testLocalExecutionAttemptFailureAndForbiddenRestartWillFailJob() {
    }

    @Test
-   public void testSpeculativeExecutionCombinedWithAdaptiveScheduling() throws Exception {
+   void testSpeculativeExecutionCombinedWithAdaptiveScheduling() throws Exception {
        final JobVertex source = createNoOpVertex("source", 1);
        final JobVertex sink = createNoOpVertex("sink", -1);
        sink.connectNewDataSetAsInput(
@@ -385,6 +385,24 @@ public void testSpeculativeExecutionCombinedWithAdaptiveScheduling() throws Exception {
        assertThat(sinkExecutionVertex.getCurrentExecutions()).hasSize(2);
    }

+   @Test
+   void testNumSlowExecutionVerticesMetric() {
+       final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling();
+       final ExecutionVertex ev = getOnlyExecutionVertex(scheduler);
+       final Execution attempt1 = ev.getCurrentExecutionAttempt();
+
+       notifySlowTask(scheduler, attempt1);
+       assertThat(scheduler.getNumSlowExecutionVertices()).isEqualTo(1);
+
+       // notify a slow vertex twice
+       notifySlowTask(scheduler, attempt1);
+       assertThat(scheduler.getNumSlowExecutionVertices()).isEqualTo(1);
+
+       // vertex no longer slow
+       scheduler.notifySlowTasks(Collections.emptyMap());
+       assertThat(scheduler.getNumSlowExecutionVertices()).isZero();
+   }
+
    private static Execution getExecution(ExecutionVertex executionVertex, int attemptNumber) {
        return executionVertex.getCurrentExecutions().stream()
                .filter(e -> e.getAttemptNumber() == attemptNumber)
