Merge pull request kubernetes#117412 from smarterclayton/volume_cancel
kubelet: pass context to VolumeManager.WaitFor*
k8s-ci-robot authored Apr 17, 2023
2 parents 29fe2c7 + 453f81d commit 89462b8
Showing 7 changed files with 278 additions and 42 deletions.
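
The VolumeManager interface change itself lives in pkg/kubelet/volumemanager/volume_manager.go, one of the files not expanded in this view. Based on the call sites in the diffs below, the two wait helpers now take a context as their first argument so callers can cancel or bound the wait. A minimal sketch, assuming the rest of the interface is unchanged:

// Sketch of the updated methods on the kubelet's VolumeManager interface
// (pkg/kubelet/volumemanager); inferred from the call sites below, not
// copied from the actual file.
type VolumeManager interface {
	// WaitForAttachAndMount blocks until the pod's volumes are attached and
	// mounted, the ctx is cancelled, or an internal timeout expires.
	WaitForAttachAndMount(ctx context.Context, pod *v1.Pod) error

	// WaitForUnmount blocks until the pod's volumes are unmounted, the ctx
	// is cancelled, or an internal timeout expires.
	WaitForUnmount(ctx context.Context, pod *v1.Pod) error

	// (remaining methods unchanged)
}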
40 changes: 23 additions & 17 deletions pkg/kubelet/kubelet.go
@@ -1641,10 +1641,8 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
 // This operation writes all events that are dispatched in order to provide
 // the most accurate information possible about an error situation to aid debugging.
 // Callers should not write an event if this operation returns an error.
-func (kl *Kubelet) SyncPod(_ context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
-	// TODO(#113606): connect this with the incoming context parameter, which comes from the pod worker.
-	// Currently, using that context causes test failures.
-	ctx, otelSpan := kl.tracer.Start(context.TODO(), "syncPod", trace.WithAttributes(
+func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
+	ctx, otelSpan := kl.tracer.Start(ctx, "syncPod", trace.WithAttributes(
 		attribute.String("k8s.pod.uid", string(pod.UID)),
 		attribute.String("k8s.pod", klog.KObj(pod).String()),
 		attribute.String("k8s.pod.name", pod.Name),
@@ -1739,13 +1737,15 @@ func (kl *Kubelet) SyncPod(_ context.Context, updateType kubetypes.SyncPodType,
 		var syncErr error
 		p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
 		if err := kl.killPod(ctx, pod, p, nil); err != nil {
-			kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
-			syncErr = fmt.Errorf("error killing pod: %v", err)
-			utilruntime.HandleError(syncErr)
+			if !wait.Interrupted(err) {
+				kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
+				syncErr = fmt.Errorf("error killing pod: %w", err)
+				utilruntime.HandleError(syncErr)
+			}
 		} else {
 			// There was no error killing the pod, but the pod cannot be run.
 			// Return an error to signal that the sync loop should back off.
-			syncErr = fmt.Errorf("pod cannot be run: %s", runnable.Message)
+			syncErr = fmt.Errorf("pod cannot be run: %v", runnable.Message)
 		}
 		return false, syncErr
 	}
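
wait.Interrupted, from k8s.io/apimachinery/pkg/util/wait, reports whether an error means the wait machinery was cut short (context.Canceled, context.DeadlineExceeded, or the wait package's own timeout errors) rather than the operation itself failing; the hunk above uses it to avoid recording a FailedToKillPod event for a deliberate cancellation. Note also the switch from %v to %w when wrapping the kill error: %w preserves the error chain, so callers of SyncPod can run the same errors.Is-based check on the returned syncErr. A small illustration, using only the public wait API:

package main

import (
	"context"
	"errors"
	"fmt"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate the pod worker cancelling a sync

	fmt.Println(wait.Interrupted(ctx.Err()))                 // true (context.Canceled)
	fmt.Println(wait.Interrupted(errors.New("kill failed"))) // false (a real failure)
}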
@@ -1791,6 +1791,9 @@ func (kl *Kubelet) SyncPod(_ context.Context, updateType kubetypes.SyncPodType,
 		if !pcm.Exists(pod) && !firstSync {
 			p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
 			if err := kl.killPod(ctx, pod, p, nil); err == nil {
+				if wait.Interrupted(err) {
+					return false, err
+				}
 				podKilled = true
 			} else {
 				klog.ErrorS(err, "KillPod failed", "pod", klog.KObj(pod), "podStatus", podStatus)
@@ -1854,15 +1857,13 @@ func (kl *Kubelet) SyncPod(_ context.Context, updateType kubetypes.SyncPodType,
 		return false, err
 	}
 
-	// Volume manager will not mount volumes for terminating pods
-	// TODO: once context cancellation is added this check can be removed
-	if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
-		// Wait for volumes to attach/mount
-		if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
+	// Wait for volumes to attach/mount
+	if err := kl.volumeManager.WaitForAttachAndMount(ctx, pod); err != nil {
+		if !wait.Interrupted(err) {
 			kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err)
 			klog.ErrorS(err, "Unable to attach or mount volumes for pod; skipping pod", "pod", klog.KObj(pod))
-			return false, err
 		}
+		return false, err
 	}
 
 	// Fetch the pull secrets for the pod
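
Two behaviors change in this hunk. The IsPodTerminationRequested guard (and its TODO) disappears, since a terminating pod now simply has its context cancelled and WaitForAttachAndMount returns early. And on an interrupted wait the error is still returned, so the sync still aborts, but the FailedMount event and error log are suppressed: cancellation is deliberate, not a real mount problem worth surfacing to the user.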
@@ -1881,8 +1882,13 @@ func (kl *Kubelet) SyncPod(_ context.Context, updateType kubetypes.SyncPodType,
 		}
 	}
 
+	// TODO(#113606): connect this with the incoming context parameter, which comes from the pod worker.
+	// Currently, using that context causes test failures. To remove this todoCtx, any wait.Interrupted
+	// errors need to be filtered from result and bypass the reasonCache - cancelling the context for
+	// SyncPod is a known and deliberate error, not a generic error.
+	todoCtx := context.TODO()
 	// Call the container runtime's SyncPod callback
-	result := kl.containerRuntime.SyncPod(ctx, pod, podStatus, pullSecrets, kl.backOff)
+	result := kl.containerRuntime.SyncPod(todoCtx, pod, podStatus, pullSecrets, kl.backOff)
 	kl.reasonCache.Update(pod.UID, result)
 	if err := result.Error(); err != nil {
 		// Do not return error if the only failures were pods in backoff
@@ -2056,7 +2062,7 @@ func (kl *Kubelet) SyncTerminatingRuntimePod(_ context.Context, runningPod *kubecontainer.Pod) error {
 // This typically occurs when a pod is force deleted from configuration (local disk or API) and the
 // kubelet restarts in the middle of the action.
 func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
-	_, otelSpan := kl.tracer.Start(context.Background(), "syncTerminatedPod", trace.WithAttributes(
+	ctx, otelSpan := kl.tracer.Start(ctx, "syncTerminatedPod", trace.WithAttributes(
 		attribute.String("k8s.pod.uid", string(pod.UID)),
 		attribute.String("k8s.pod", klog.KObj(pod).String()),
 		attribute.String("k8s.pod.name", pod.Name),
@@ -2074,7 +2080,7 @@ func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
 
 	// volumes are unmounted after the pod worker reports ShouldPodRuntimeBeRemoved (which is satisfied
 	// before syncTerminatedPod is invoked)
-	if err := kl.volumeManager.WaitForUnmount(pod); err != nil {
+	if err := kl.volumeManager.WaitForUnmount(ctx, pod); err != nil {
 		return err
 	}
 	klog.V(4).InfoS("Pod termination unmounted volumes", "pod", klog.KObj(pod), "podUID", pod.UID)
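
The termination path gets the same treatment: WaitForUnmount can now be abandoned when the caller's context ends, rather than holding SyncTerminatedPod for the volume manager's full internal timeout.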
2 changes: 2 additions & 0 deletions pkg/kubelet/kubelet_test.go
@@ -94,6 +94,7 @@ import (
 	"k8s.io/kubernetes/pkg/volume"
 	"k8s.io/kubernetes/pkg/volume/gcepd"
 	_ "k8s.io/kubernetes/pkg/volume/hostpath"
+	volumesecret "k8s.io/kubernetes/pkg/volume/secret"
 	volumetest "k8s.io/kubernetes/pkg/volume/testing"
 	"k8s.io/kubernetes/pkg/volume/util"
 	"k8s.io/kubernetes/pkg/volume/util/hostutil"
@@ -367,6 +368,7 @@ func newTestKubeletWithImageList(
 		allPlugins = append(allPlugins, plug)
 	} else {
 		allPlugins = append(allPlugins, gcepd.ProbeVolumePlugins()...)
+		allPlugins = append(allPlugins, volumesecret.ProbeVolumePlugins()...)
 	}
 
 	var prober volume.DynamicPluginProber // TODO (#51147) inject mock
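
Registering the real secret volume plugin here is what lets the new TestPodVolumeDeadlineAttachAndMount in kubelet_volumes_test.go (below) build a pod whose secret volume references a non-existent secret: the plugin is found, but the mount can never complete, so WaitForAttachAndMount blocks until its context deadline fires.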
141 changes: 134 additions & 7 deletions pkg/kubelet/kubelet_volumes_test.go
@@ -17,8 +17,10 @@ limitations under the License.
 package kubelet
 
 import (
+	"context"
 	"fmt"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 	v1 "k8s.io/api/core/v1"
@@ -80,7 +82,7 @@ func TestListVolumesForPod(t *testing.T) {
 	defer close(stopCh)
 
 	kubelet.podManager.SetPods([]*v1.Pod{pod})
-	err := kubelet.volumeManager.WaitForAttachAndMount(pod)
+	err := kubelet.volumeManager.WaitForAttachAndMount(context.Background(), pod)
 	assert.NoError(t, err)
 
 	podName := util.GetUniquePodName(pod)
@@ -199,7 +201,7 @@ func TestPodVolumesExist(t *testing.T) {
 
 	kubelet.podManager.SetPods(pods)
 	for _, pod := range pods {
-		err := kubelet.volumeManager.WaitForAttachAndMount(pod)
+		err := kubelet.volumeManager.WaitForAttachAndMount(context.Background(), pod)
 		assert.NoError(t, err)
 	}
 
@@ -209,6 +211,131 @@
 	}
 }
 
+func TestPodVolumeDeadlineAttachAndMount(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping test in short mode.")
+	}
+
+	testKubelet := newTestKubeletWithImageList(t, nil /*imageList*/, false, /* controllerAttachDetachEnabled */
+		false /*initFakeVolumePlugin*/, true /*localStorageCapacityIsolation*/)
+
+	defer testKubelet.Cleanup()
+	kubelet := testKubelet.kubelet
+
+	// any test cases added here should have volumes that fail to mount
+	pods := []*v1.Pod{
+		{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: "pod1",
+				UID:  "pod1uid",
+			},
+			Spec: v1.PodSpec{
+				Containers: []v1.Container{
+					{
+						Name: "container1",
+						VolumeMounts: []v1.VolumeMount{
+							{
+								Name:      "vol1",
+								MountPath: "/mnt/vol1",
+							},
+						},
+					},
+				},
+				Volumes: []v1.Volume{
+					{
+						Name: "vol1",
+						VolumeSource: v1.VolumeSource{
+							Secret: &v1.SecretVolumeSource{
+								SecretName: "non-existent",
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+
+	stopCh := runVolumeManager(kubelet)
+	defer close(stopCh)
+
+	kubelet.podManager.SetPods(pods)
+	for _, pod := range pods {
+		start := time.Now()
+		// ensure our context times out quickly
+		ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
+		err := kubelet.volumeManager.WaitForAttachAndMount(ctx, pod)
+		delta := time.Since(start)
+		// the standard timeout is 2 minutes, so if it's just a few seconds we know that the context timeout was the cause
+		assert.Lessf(t, delta, 10*time.Second, "WaitForAttachAndMount should timeout when the context is cancelled")
+		assert.ErrorIs(t, err, context.DeadlineExceeded)
+		cancel()
+	}
+}
+
+func TestPodVolumeDeadlineUnmount(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping test in short mode.")
+	}
+
+	testKubelet := newTestKubeletWithImageList(t, nil /*imageList*/, false, /* controllerAttachDetachEnabled */
+		true /*initFakeVolumePlugin*/, true /*localStorageCapacityIsolation*/)
+
+	defer testKubelet.Cleanup()
+	kubelet := testKubelet.kubelet
+
+	// any test cases added here should have volumes that succeed at mounting
+	pods := []*v1.Pod{
+		{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: "pod1",
+				UID:  "pod1uid",
+			},
+			Spec: v1.PodSpec{
+				Containers: []v1.Container{
+					{
+						Name: "container1",
+						VolumeMounts: []v1.VolumeMount{
+							{
+								Name:      "vol1",
+								MountPath: "/mnt/vol1",
+							},
+						},
+					},
+				},
+				Volumes: []v1.Volume{
+					{
+						Name: "vol1",
+						VolumeSource: v1.VolumeSource{
+							RBD: &v1.RBDVolumeSource{
+								RBDImage: "fake-device",
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+
+	stopCh := runVolumeManager(kubelet)
+	defer close(stopCh)
+
+	kubelet.podManager.SetPods(pods)
+	for i, pod := range pods {
+		if err := kubelet.volumeManager.WaitForAttachAndMount(context.Background(), pod); err != nil {
+			t.Fatalf("pod %d failed: %v", i, err)
+		}
+		start := time.Now()
+		// ensure our context times out quickly
+		ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
+		err := kubelet.volumeManager.WaitForUnmount(ctx, pod)
+		delta := time.Since(start)
+		// the standard timeout is 2 minutes, so if it's just a few seconds we know that the context timeout was the cause
+		assert.Lessf(t, delta, 10*time.Second, "WaitForUnmount should timeout when the context is cancelled")
+		assert.ErrorIs(t, err, context.DeadlineExceeded)
+		cancel()
+	}
+}
+
 func TestVolumeAttachAndMountControllerDisabled(t *testing.T) {
 	if testing.Short() {
 		t.Skip("skipping test in short mode.")
@@ -246,7 +373,7 @@ func TestVolumeAttachAndMountControllerDisabled(t *testing.T) {
 	defer close(stopCh)
 
 	kubelet.podManager.SetPods([]*v1.Pod{pod})
-	err := kubelet.volumeManager.WaitForAttachAndMount(pod)
+	err := kubelet.volumeManager.WaitForAttachAndMount(context.Background(), pod)
 	assert.NoError(t, err)
 
 	podVolumes := kubelet.volumeManager.GetMountedVolumesForPod(
@@ -308,7 +435,7 @@ func TestVolumeUnmountAndDetachControllerDisabled(t *testing.T) {
 	kubelet.podManager.SetPods([]*v1.Pod{pod})
 
 	// Verify volumes attached
-	err := kubelet.volumeManager.WaitForAttachAndMount(pod)
+	err := kubelet.volumeManager.WaitForAttachAndMount(context.Background(), pod)
 	assert.NoError(t, err)
 
 	podVolumes := kubelet.volumeManager.GetMountedVolumesForPod(
@@ -335,7 +462,7 @@ func TestVolumeUnmountAndDetachControllerDisabled(t *testing.T) {
 	kubelet.podWorkers.(*fakePodWorkers).setPodRuntimeBeRemoved(pod.UID)
 	kubelet.podManager.SetPods([]*v1.Pod{})
 
-	assert.NoError(t, kubelet.volumeManager.WaitForUnmount(pod))
+	assert.NoError(t, kubelet.volumeManager.WaitForUnmount(context.Background(), pod))
 	if actual := kubelet.volumeManager.GetMountedVolumesForPod(util.GetUniquePodName(pod)); len(actual) > 0 {
 		t.Fatalf("expected volume unmount to wait for no volumes: %v", actual)
 	}
@@ -418,7 +545,7 @@ func TestVolumeAttachAndMountControllerEnabled(t *testing.T) {
 		stopCh,
 		kubelet.volumeManager)
 
-	assert.NoError(t, kubelet.volumeManager.WaitForAttachAndMount(pod))
+	assert.NoError(t, kubelet.volumeManager.WaitForAttachAndMount(context.Background(), pod))
 
 	podVolumes := kubelet.volumeManager.GetMountedVolumesForPod(
 		util.GetUniquePodName(pod))
@@ -504,7 +631,7 @@ func TestVolumeUnmountAndDetachControllerEnabled(t *testing.T) {
 		kubelet.volumeManager)
 
 	// Verify volumes attached
-	assert.NoError(t, kubelet.volumeManager.WaitForAttachAndMount(pod))
+	assert.NoError(t, kubelet.volumeManager.WaitForAttachAndMount(context.Background(), pod))
 
 	podVolumes := kubelet.volumeManager.GetMountedVolumesForPod(
 		util.GetUniquePodName(pod))
(Diffs for the remaining 4 changed files were not loaded in this view.)
