Skip to content

Commit

Permalink
[FLINK-22594][Kubernetes] Add handler to catch pod oomKilled message and report to resource manager metric
Browse files Browse the repository at this point in the history
  • Loading branch information
Aitozi committed Jul 19, 2021
1 parent 7261cba commit cf7500f
Show file tree
Hide file tree
Showing 9 changed files with 248 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.kubeclient.factory.KubernetesTaskManagerFactory;
import org.apache.flink.kubernetes.kubeclient.handlers.PodOOMHandler;
import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesTaskManagerParameters;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesTooOldResourceVersionException;
Expand All @@ -47,6 +48,8 @@
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;

import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;

import javax.annotation.Nullable;

import java.io.File;
Expand Down Expand Up @@ -83,6 +86,8 @@ public class KubernetesResourceManagerDriver

private FlinkPod taskManagerPodTemplate;

private List<FlinkKubeClient.PodModifyEventHandler> podModifyEventHandlers;

public KubernetesResourceManagerDriver(
Configuration flinkConfig,
FlinkKubeClient flinkKubeClient,
Expand All @@ -92,6 +97,7 @@ public KubernetesResourceManagerDriver(
this.flinkKubeClient = Preconditions.checkNotNull(flinkKubeClient);
this.requestResourceFutures = new HashMap<>();
this.running = false;
this.podModifyEventHandlers = new ArrayList<>();
}

// ------------------------------------------------------------------------
Expand All @@ -101,6 +107,7 @@ public KubernetesResourceManagerDriver(
@Override
protected void initializeInternal() throws Exception {
podsWatchOpt = watchTaskManagerPods();
podModifyEventHandlers = initPodModifyEventHandlers();
final File podTemplateFile = KubernetesUtils.getTaskManagerPodTemplateFileInPod();
if (podTemplateFile.exists()) {
taskManagerPodTemplate =
Expand Down Expand Up @@ -209,6 +216,10 @@ public void releaseResource(KubernetesWorkerNode worker) {
// Internal
// ------------------------------------------------------------------------

/**
 * Creates the handlers that are run for pod modify events.
 *
 * <p>At present the list holds a single {@link PodOOMHandler}, constructed with this driver's
 * resource manager metric group.
 *
 * @return an immutable list of pod modify event handlers
 */
private List<FlinkKubeClient.PodModifyEventHandler> initPodModifyEventHandlers() {
    final FlinkKubeClient.PodModifyEventHandler oomHandler =
            new PodOOMHandler(resourceManagerMetricGroup);
    return ImmutableList.of(oomHandler);
}

private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerException {
List<KubernetesPod> podList =
flinkKubeClient.getPodsWithLabels(KubernetesUtils.getTaskManagerLabels(clusterId));
Expand Down Expand Up @@ -266,6 +277,10 @@ private KubernetesTaskManagerParameters createKubernetesTaskManagerParameters(
KubernetesConfigOptions.EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX));
}

/**
 * Submits a task to the main thread executor that passes {@code pods} to every registered pod
 * modify event handler, in list order.
 *
 * @param pods the pods reported by the modify event
 */
private void handlePodModifyEventsInMainThread(List<KubernetesPod> pods) {
    getMainThreadExecutor()
            .execute(
                    () -> {
                        // The handler list is read when the task runs, not when it is submitted.
                        for (FlinkKubeClient.PodModifyEventHandler handler :
                                podModifyEventHandlers) {
                            handler.handle(pods);
                        }
                    });
}

private void handlePodEventsInMainThread(List<KubernetesPod> pods) {
getMainThreadExecutor()
.execute(
Expand Down Expand Up @@ -339,13 +354,15 @@ private Optional<KubernetesWatch> watchTaskManagerPods() {

private class PodCallbackHandlerImpl
implements FlinkKubeClient.WatchCallbackHandler<KubernetesPod> {

@Override
public void onAdded(List<KubernetesPod> pods) {
// Added pods only go through the regular pod event processing; the pod modify event
// handlers are not invoked for them.
handlePodEventsInMainThread(pods);
}

@Override
public void onModified(List<KubernetesPod> pods) {
// Modified pods are handed to the pod modify event handlers first and to the regular pod
// event processing second; both calls submit their work to the main thread executor.
handlePodModifyEventsInMainThread(pods);
handlePodEventsInMainThread(pods);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,4 +214,12 @@ interface WatchCallbackHandler<T> {

void handleError(Throwable throwable);
}

/** Handler that is notified about pod modify events. */
interface PodModifyEventHandler {

/**
 * Handles a modify event that covers several pods.
 *
 * @param pods the pods reported by the event
 */
void handle(List<KubernetesPod> pods);

/**
 * Handles a modify event for a single pod.
 *
 * @param pod the pod reported by the event
 */
void handle(KubernetesPod pod);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.kubernetes.kubeclient.handlers;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;

import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;

import io.fabric8.kubernetes.api.model.ContainerStatus;
import io.fabric8.kubernetes.api.model.Pod;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
 * Pod modify event handler that looks for containers terminated with an {@code OOMKilled}
 * reason and reports every newly observed kill through the {@code kubernetes.OOMKilled} counter.
 *
 * <p>Kills that were already counted are remembered in a cache keyed by the container finish
 * time, so that repeated events for the same pod and finish time are counted only once. Cache
 * entries expire one hour after they were last written.
 */
public class PodOOMHandler implements FlinkKubeClient.PodModifyEventHandler {

    private static final Logger LOG = LoggerFactory.getLogger(PodOOMHandler.class);
    private static final String OOM_METRIC_NAME = "kubernetes.OOMKilled";
    private static final String OOM_KILLED_KEYWORD = "OOMKilled";

    /** Counts the OOM kills observed by this handler. */
    private final Counter oomCounter;

    /**
     * Maps a container finish time to the names of the pods already counted as OOM killed at that
     * time. This avoids counting duplicate event messages from the API server.
     */
    private final Cache<String, Set<String>> tombstones;

    /**
     * Creates the handler and registers its counter on the given metric group.
     *
     * @param metricGroup metric group on which the {@code kubernetes.OOMKilled} counter is
     *     registered
     */
    public PodOOMHandler(MetricGroup metricGroup) {
        this.oomCounter = metricGroup.counter(OOM_METRIC_NAME);
        this.tombstones = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.HOURS).build();
    }

    /**
     * Inspects every terminated pod in {@code pods} and counts containers that were OOM killed
     * and have not been counted before.
     *
     * @param pods the pods reported by the modify event
     */
    @Override
    public void handle(List<KubernetesPod> pods) {
        for (KubernetesPod kubernetesPod : pods) {
            if (kubernetesPod.isTerminated()) {
                reportOomKilledContainers(kubernetesPod.getInternalResource());
            }
        }
    }

    /**
     * Single-pod variant of {@link #handle(List)}.
     *
     * @param pod the pod reported by the modify event
     */
    @Override
    public void handle(KubernetesPod pod) {
        handle(Collections.singletonList(pod));
    }

    /** Exposes the de-duplication cache so that tests can inspect it. */
    @VisibleForTesting
    public Cache<String, Set<String>> getTombstones() {
        return tombstones;
    }

    /** Counts and logs every not-yet-seen OOM kill among the container statuses of the pod. */
    private void reportOomKilledContainers(Pod pod) {
        // Defensive: without a status or container statuses there is nothing to inspect.
        if (pod.getStatus() == null || pod.getStatus().getContainerStatuses() == null) {
            return;
        }

        final String podName = pod.getMetadata().getName();
        for (ContainerStatus containerStatus : pod.getStatus().getContainerStatuses()) {
            if (!isOomKilled(containerStatus)) {
                continue;
            }
            final String finishTime = containerStatus.getState().getTerminated().getFinishedAt();
            if (recordTombstone(finishTime, podName)) {
                oomCounter.inc();
                LOG.info(
                        "pod {} oom killed at {}, totally: {}",
                        podName,
                        finishTime,
                        oomCounter.getCount());
            }
        }
    }

    /**
     * Returns whether the container has a terminated state with both a finish time and a reason
     * containing the OOM killed keyword.
     */
    private static boolean isOomKilled(ContainerStatus containerStatus) {
        return containerStatus.getState() != null
                && containerStatus.getState().getTerminated() != null
                && containerStatus.getState().getTerminated().getReason() != null
                && containerStatus.getState().getTerminated().getFinishedAt() != null
                && containerStatus
                        .getState()
                        .getTerminated()
                        .getReason()
                        .contains(OOM_KILLED_KEYWORD);
    }

    /**
     * Remembers that {@code podName} was OOM killed at {@code finishTime}.
     *
     * @return {@code true} if this kill was not recorded before, {@code false} for a duplicate
     */
    private boolean recordTombstone(String finishTime, String podName) {
        Set<String> podsKilledAtThisTime = tombstones.getIfPresent(finishTime);
        if (podsKilledAtThisTime == null) {
            podsKilledAtThisTime = new HashSet<>();
        }
        if (!podsKilledAtThisTime.add(podName)) {
            // Duplicate event: leave the cache entry, and therefore its expiry, untouched.
            return false;
        }
        // Writing the entry (again) restarts its one hour expiry.
        tombstones.put(finishTime, podsKilledAtThisTime);
        return true;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.kubernetes.kubeclient.handlers;

import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;

import io.fabric8.kubernetes.api.model.ContainerStateBuilder;
import io.fabric8.kubernetes.api.model.ContainerStatus;
import io.fabric8.kubernetes.api.model.ContainerStatusBuilder;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.PodSpecBuilder;
import io.fabric8.kubernetes.api.model.PodStatusBuilder;
import org.junit.Assert;
import org.junit.Test;

import java.util.Collections;

/** Tests the de-duplication bookkeeping of {@link PodOOMHandler}. */
public class PodOOMHandlerTest {

    /**
     * Feeds the handler the same OOM killed pod twice and then a second pod with the same finish
     * time, and checks the tombstone cache after each step.
     */
    @Test
    public void testOOMCount() {
        final String finishTime = "2020-03-19T13:14:48Z";
        final PodOOMHandler podOOMHandler =
                new PodOOMHandler(
                        UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup());

        // A single container status that terminated with the OOMKilled reason.
        final ContainerStatus oomKilledStatus =
                new ContainerStatusBuilder()
                        .withState(
                                new ContainerStateBuilder()
                                        .withNewTerminated()
                                        .withFinishedAt(finishTime)
                                        .withReason("OOMKilled")
                                        .endTerminated()
                                        .build())
                        .build();

        final KubernetesPod kubernetesPod =
                new KubernetesPod(
                        new PodBuilder()
                                .editOrNewMetadata()
                                .withName("pod-1")
                                .endMetadata()
                                .withSpec(
                                        new PodSpecBuilder()
                                                .withContainers(Collections.emptyList())
                                                .build())
                                .withStatus(
                                        new PodStatusBuilder()
                                                .withContainerStatuses(
                                                        new ContainerStatus[] {oomKilledStatus})
                                                .build())
                                .build());

        // The first event records the pod under its finish time.
        podOOMHandler.handle(kubernetesPod);
        Assert.assertEquals(1, podOOMHandler.getTombstones().size());
        Assert.assertTrue(podOOMHandler.getTombstones().getIfPresent(finishTime).contains("pod-1"));

        // The same event again must be ignored.
        podOOMHandler.handle(kubernetesPod);
        Assert.assertEquals(1, podOOMHandler.getTombstones().getIfPresent(finishTime).size());

        // A different pod with the same finish time is recorded next to the first one.
        kubernetesPod.getInternalResource().getMetadata().setName("pod-2");
        podOOMHandler.handle(kubernetesPod);
        Assert.assertEquals(2, podOOMHandler.getTombstones().getIfPresent(finishTime).size());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ScheduledExecutor;

Expand All @@ -40,6 +41,7 @@ public abstract class AbstractResourceManagerDriver<WorkerType extends ResourceI
private ResourceEventHandler<WorkerType> resourceEventHandler = null;
private ScheduledExecutor mainThreadExecutor = null;
private Executor ioExecutor = null;
protected ResourceManagerMetricGroup resourceManagerMetricGroup = null;

public AbstractResourceManagerDriver(
final Configuration flinkConfig, final Configuration flinkClientConfig) {
Expand Down Expand Up @@ -72,11 +74,13 @@ protected final Executor getIoExecutor() {
public final void initialize(
ResourceEventHandler<WorkerType> resourceEventHandler,
ScheduledExecutor mainThreadExecutor,
Executor ioExecutor)
Executor ioExecutor,
ResourceManagerMetricGroup resourceManagerMetricGroup)
throws Exception {
this.resourceEventHandler = Preconditions.checkNotNull(resourceEventHandler);
this.mainThreadExecutor = Preconditions.checkNotNull(mainThreadExecutor);
this.ioExecutor = Preconditions.checkNotNull(ioExecutor);
this.resourceManagerMetricGroup = resourceManagerMetricGroup;

initializeInternal();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ public ActiveResourceManager(
@Override
protected void initialize() throws ResourceManagerException {
try {
resourceManagerDriver.initialize(this, new GatewayMainThreadExecutor(), ioExecutor);
resourceManagerDriver.initialize(
this, new GatewayMainThreadExecutor(), ioExecutor, resourceManagerMetricGroup);
} catch (Exception e) {
throw new ResourceManagerException("Cannot initialize resource provider.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
import org.apache.flink.util.concurrent.ScheduledExecutor;

import javax.annotation.Nullable;
Expand All @@ -44,7 +45,8 @@ public interface ResourceManagerDriver<WorkerType extends ResourceIDRetrievable>
void initialize(
ResourceEventHandler<WorkerType> resourceEventHandler,
ScheduledExecutor mainThreadExecutor,
Executor ioExecutor)
Executor ioExecutor,
ResourceManagerMetricGroup resourceManagerMetricGroup)
throws Exception;

/** Terminate the deployment specific components. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.ScheduledExecutor;
Expand Down Expand Up @@ -177,7 +178,8 @@ protected final void runTest(RunnableWithException testMethod) throws Exception
driver.initialize(
resourceEventHandlerBuilder.build(),
mainThreadExecutor,
ForkJoinPool.commonPool());
ForkJoinPool.commonPool(),
UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup());

testMethod.run();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.util.function.BiConsumerWithException;
Expand Down Expand Up @@ -69,7 +70,8 @@ private TestingResourceManagerDriver(
public void initialize(
ResourceEventHandler<ResourceID> resourceEventHandler,
ScheduledExecutor mainThreadExecutor,
Executor ioExecutor)
Executor ioExecutor,
ResourceManagerMetricGroup resourceManagerMetricGroup)
throws Exception {
initializeFunction.apply(resourceEventHandler, mainThreadExecutor, ioExecutor);
}
Expand Down

0 comments on commit cf7500f

Please sign in to comment.