Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion docs/content/docs/ops/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -1026,7 +1026,7 @@ Metrics related to data exchange between task executors using netty network comm
</thead>
<tbody>
<tr>
<th rowspan="5"><strong>JobManager</strong></th>
<th rowspan="7"><strong>JobManager</strong></th>
<td>numRegisteredTaskManagers</td>
<td>The number of registered taskmanagers.</td>
<td>Gauge</td>
Expand All @@ -1051,6 +1051,16 @@ Metrics related to data exchange between task executors using netty network comm
<td>The total number of task slots.</td>
<td>Gauge</td>
</tr>
<tr>
<td>totalRequestTaskManagers</td>
<td>The total number of task manager create request</td>
<td>Counter</td>
</tr>
<tr>
<td>totalReleaseTaskManagers</td>
<td>The total number of task manager release request</td>
<td>Counter</td>
</tr>
</tbody>
</table>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ private MetricNames() {}
public static final String TASK_SLOTS_TOTAL = "taskSlotsTotal";
public static final String NUM_REGISTERED_TASK_MANAGERS = "numRegisteredTaskManagers";
public static final String NUM_PENDING_TASK_MANAGERS = "numPendingTaskManagers";
public static final String TOTAL_REQUEST_TASK_MANAGERS = "totalRequestTaskManagers";
public static final String TOTAL_RELEASE_TASK_MANAGERS = "totalReleaseTaskManagers";

public static final String NUM_RESTARTS = "numRestarts";
public static final String NUM_RESCALES = "numRescales";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.blocklist.BlocklistHandler;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
Expand Down Expand Up @@ -96,6 +97,10 @@ public class ActiveResourceManager<WorkerType extends ResourceIDRetrievable>

/** Number of requested or registered recovered workers per worker resource spec. */
private final WorkerCounter totalWorkerCounter;

private final Counter requestWorkerTotal;

private final Counter releaseWorkerTotal;

/** Identifiers and worker resource spec of all allocated workers. */
private final Map<ResourceID, WorkerResourceSpec> workerResourceSpecs;
Expand Down Expand Up @@ -169,6 +174,8 @@ public ActiveResourceManager(
this.workerNodeMap = new HashMap<>();
this.pendingWorkerCounter = new WorkerCounter();
this.totalWorkerCounter = new WorkerCounter();
this.requestWorkerTotal = resourceManagerMetricGroup.counter(MetricNames.TOTAL_REQUEST_TASK_MANAGERS);
this.releaseWorkerTotal = resourceManagerMetricGroup.counter(MetricNames.TOTAL_RELEASE_TASK_MANAGERS);
this.workerResourceSpecs = new HashMap<>();
this.unallocatedWorkerFutures = new HashMap<>();
this.currentAttemptUnregisteredWorkers = new HashSet<>();
Expand Down Expand Up @@ -340,6 +347,7 @@ private void checkResourceDeclarations() {
releaseOrRequestWorkerNumber,
totalWorkerCounter.getNum(workerResourceSpec),
declaredWorkerNumber);
releaseWorkerTotal.inc(releaseOrRequestWorkerNumber);

// release unwanted workers.
int remainingReleasingWorkerNumber =
Expand Down Expand Up @@ -494,6 +502,7 @@ public void requestNewWorker(WorkerResourceSpec workerResourceSpec) {
flinkConfig, workerResourceSpec);
final int pendingCount = pendingWorkerCounter.increaseAndGet(workerResourceSpec);
totalWorkerCounter.increaseAndGet(workerResourceSpec);
requestWorkerTotal.inc();

log.info(
"Requesting new worker with resource spec {}, current pending count: {}.",
Expand Down