Update daemonSet status even if syncDaemonSet fails
This ensures that the daemonset controller updates daemonset statuses in
a best-effort manner even if syncDaemonSet fails.

In order to add an integration test, this also replaces
`cmd/kube-apiserver/app/testing.StartTestServer` with
`test/integration/framework.StartTestServer` and adds
`setupWithServerSetup` to configure the admission control of the
apiserver.
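
Distilled from the diff below, the new error-handling shape of `syncDaemonSet`: always attempt the status update, and when both the sync and the status update fail, log the status error and return the original sync error. A minimal, self-contained sketch of the pattern (the function names are illustrative, not the controller's):

package main

import (
	"errors"
	"fmt"
	"log"
)

// bestEffortSync mirrors the pattern syncDaemonSet adopts below: run the
// sync, then attempt the status update regardless, and prefer the sync
// error when both fail.
func bestEffortSync(sync, updateStatus func() error) error {
	err := sync()
	statusErr := updateStatus() // attempted even when sync failed
	switch {
	case err != nil && statusErr != nil:
		// Log the secondary failure, surface the original one.
		log.Printf("failed to update status: %v", statusErr)
		return err
	case err != nil:
		return err
	default:
		return statusErr // nil when both succeeded
	}
}

func main() {
	err := bestEffortSync(
		func() error { return errors.New("pod creation failed") },
		func() error { return nil }, // the status update still runs
	)
	fmt.Println(err) // pod creation failed
}
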
gjkim42 committed Jan 4, 2023
1 parent 67f5701 commit bfda7f6
Showing 5 changed files with 259 additions and 31 deletions.
60 changes: 39 additions & 21 deletions pkg/controller/daemon/daemon_controller.go
@@ -887,6 +887,32 @@ func (dsc *DaemonSetsController) podsShouldBeOnNode(
return nodesNeedingDaemonPods, podsToDelete
}

func (dsc *DaemonSetsController) updateDaemonSet(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash, key string, old []*apps.ControllerRevision) error {
err := dsc.manage(ctx, ds, nodeList, hash)
if err != nil {
return err
}

// Process rolling updates if we're ready.
if dsc.expectations.SatisfiedExpectations(key) {
switch ds.Spec.UpdateStrategy.Type {
case apps.OnDeleteDaemonSetStrategyType:
case apps.RollingUpdateDaemonSetStrategyType:
err = dsc.rollingUpdate(ctx, ds, nodeList, hash)
}
if err != nil {
return err
}
}

err = dsc.cleanupHistory(ctx, ds, old)
if err != nil {
return fmt.Errorf("failed to clean up revisions of DaemonSet: %w", err)
}

return nil
}

// manage manages the scheduling and running of Pods of ds on nodes.
// After figuring out which nodes should run a Pod of ds but not yet running one and
// which nodes should not run a Pod of ds but currently running one, it calls function
@@ -1136,7 +1162,7 @@ func (dsc *DaemonSetsController) updateDaemonSetStatus(ctx context.Context, ds *

err = storeDaemonSetStatus(ctx, dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace), ds, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable, numberUnavailable, updateObservedGen)
if err != nil {
return fmt.Errorf("error storing status for daemon set %#v: %v", ds, err)
return fmt.Errorf("error storing status for daemon set %#v: %w", ds, err)
}

// Resync the DaemonSet after MinReadySeconds as a last line of defense to guard against clock-skew.
@@ -1210,29 +1236,21 @@ func (dsc *DaemonSetsController) syncDaemonSet(ctx context.Context, key string)
return dsc.updateDaemonSetStatus(ctx, ds, nodeList, hash, false)
}

- err = dsc.manage(ctx, ds, nodeList, hash)
- if err != nil {
- return err
- }
-
- // Process rolling updates if we're ready.
- if dsc.expectations.SatisfiedExpectations(dsKey) {
- switch ds.Spec.UpdateStrategy.Type {
- case apps.OnDeleteDaemonSetStrategyType:
- case apps.RollingUpdateDaemonSetStrategyType:
- err = dsc.rollingUpdate(ctx, ds, nodeList, hash)
- }
- if err != nil {
- return err
- }
- }
-
- err = dsc.cleanupHistory(ctx, ds, old)
- if err != nil {
- return fmt.Errorf("failed to clean up revisions of DaemonSet: %v", err)
- }
-
- return dsc.updateDaemonSetStatus(ctx, ds, nodeList, hash, true)
+ err = dsc.updateDaemonSet(ctx, ds, nodeList, hash, dsKey, old)
+ statusErr := dsc.updateDaemonSetStatus(ctx, ds, nodeList, hash, true)
+ switch {
+ case err != nil && statusErr != nil:
+ // If there was an error, and we failed to update status,
+ // log it and return the original error.
+ klog.ErrorS(statusErr, "Failed to update status", "daemonSet", klog.KObj(ds))
+ return err
+ case err != nil:
+ return err
+ case statusErr != nil:
+ return statusErr
+ }
+
+ return nil
}

// NodeShouldRunDaemonPod checks a set of preconditions against a (node,daemonset) and returns a
107 changes: 104 additions & 3 deletions pkg/controller/daemon/daemon_controller_test.go
@@ -18,6 +18,7 @@ package daemon

import (
"context"
"errors"
"fmt"
"reflect"
"sort"
@@ -255,7 +256,7 @@ func (f *fakePodControl) CreatePods(ctx context.Context, namespace string, templ
f.Lock()
defer f.Unlock()
if err := f.FakePodControl.CreatePods(ctx, namespace, template, object, controllerRef); err != nil {
return fmt.Errorf("failed to create pod for DaemonSet")
return fmt.Errorf("failed to create pod for DaemonSet: %w", err)
}

pod := &v1.Pod{
@@ -387,14 +388,23 @@ func validateSyncDaemonSets(manager *daemonSetsController, fakePodControl *fakeP
}

func expectSyncDaemonSets(t *testing.T, manager *daemonSetsController, ds *apps.DaemonSet, podControl *fakePodControl, expectedCreates, expectedDeletes int, expectedEvents int) {
t.Helper()
expectSyncDaemonSetsWithError(t, manager, ds, podControl, expectedCreates, expectedDeletes, expectedEvents, nil)
}

func expectSyncDaemonSetsWithError(t *testing.T, manager *daemonSetsController, ds *apps.DaemonSet, podControl *fakePodControl, expectedCreates, expectedDeletes int, expectedEvents int, expectedError error) {
t.Helper()
key, err := controller.KeyFunc(ds)
if err != nil {
t.Fatal("could not get key for daemon")
}

err = manager.syncHandler(context.TODO(), key)
- if err != nil {
+ if expectedError != nil && !errors.Is(err, expectedError) {
+ t.Fatalf("Unexpected error returned from syncHandler: %v", err)
+ }
+
+ if expectedError == nil && err != nil {
t.Log(err)
}
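
A note on the two `%v` to `%w` changes in this diff: the new `expectSyncDaemonSetsWithError` matches sentinel errors with `errors.Is`, which only traverses error chains built with `%w`. A minimal standalone sketch of the difference:

package main

import (
	"errors"
	"fmt"
)

var errSync = errors.New("sync error")

func main() {
	wrapped := fmt.Errorf("failed to create pod for DaemonSet: %w", errSync)
	plain := fmt.Errorf("failed to create pod for DaemonSet: %v", errSync)

	fmt.Println(errors.Is(wrapped, errSync)) // true: %w keeps the error chain
	fmt.Println(errors.Is(plain, errSync))   // false: %v flattens to a string
}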

@@ -771,7 +781,7 @@ func TestSimpleDaemonSetPodCreateErrors(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
- manager, podControl, _, err := newTestController(ds)
+ manager, podControl, clientset, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
@@ -782,6 +792,17 @@
t.Fatal(err)
}

var updated *apps.DaemonSet
clientset.PrependReactor("update", "daemonsets", func(action core.Action) (handled bool, ret runtime.Object, err error) {
if action.GetSubresource() != "status" {
return false, nil, nil
}
if u, ok := action.(core.UpdateAction); ok {
updated = u.GetObject().(*apps.DaemonSet)
}
return false, nil, nil
})

expectSyncDaemonSets(t, manager, ds, podControl, podControl.FakePodControl.CreateLimit, 0, 0)

expectedLimit := 0
@@ -791,6 +812,18 @@
if podControl.FakePodControl.CreateCallCount > expectedLimit {
t.Errorf("Unexpected number of create calls. Expected <= %d, saw %d\n", podControl.FakePodControl.CreateLimit*2, podControl.FakePodControl.CreateCallCount)
}
if updated == nil {
t.Fatalf("Failed to get updated status")
}
if got, want := updated.Status.DesiredNumberScheduled, int32(podControl.FakePodControl.CreateLimit)*10; got != want {
t.Errorf("Status.DesiredNumberScheduled = %v, want %v", got, want)
}
if got, want := updated.Status.CurrentNumberScheduled, int32(podControl.FakePodControl.CreateLimit); got != want {
t.Errorf("Status.CurrentNumberScheduled = %v, want %v", got, want)
}
if got, want := updated.Status.UpdatedNumberScheduled, int32(podControl.FakePodControl.CreateLimit); got != want {
t.Errorf("Status.UpdatedNumberScheduled = %v, want %v", got, want)
}
}
}
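
The status-capturing reactor above relies on client-go's fake clientset: `PrependReactor` installs an interceptor ahead of the default object tracker, and returning `handled == false` lets the action fall through to it, which is how the test observes the update without changing its outcome. A self-contained sketch of the mechanism, with an illustrative injected error:

package main

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/kubernetes/fake"
	core "k8s.io/client-go/testing"
)

func main() {
	client := fake.NewSimpleClientset()

	// Runs before the default object tracker; handled=true short-circuits
	// it, while handled=false would fall through (as the test above does).
	client.PrependReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) {
		return true, nil, fmt.Errorf("injected create failure")
	})

	_, err := client.CoreV1().Pods("default").Create(context.TODO(),
		&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p"}}, metav1.CreateOptions{})
	fmt.Println(err) // injected create failure
}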

@@ -856,6 +889,74 @@ func TestSimpleDaemonSetUpdatesStatusAfterLaunchingPods(t *testing.T) {
}
}

func TestSimpleDaemonSetUpdatesStatusError(t *testing.T) {
var (
syncErr = fmt.Errorf("sync error")
statusErr = fmt.Errorf("status error")
)

testCases := []struct {
desc string

hasSyncErr bool
hasStatusErr bool

expectedErr error
}{
{
desc: "sync error",
hasSyncErr: true,
hasStatusErr: false,
expectedErr: syncErr,
},
{
desc: "status error",
hasSyncErr: false,
hasStatusErr: true,
expectedErr: statusErr,
},
{
desc: "sync and status error",
hasSyncErr: true,
hasStatusErr: true,
expectedErr: syncErr,
},
}

for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
manager, podControl, clientset, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}

if tc.hasSyncErr {
podControl.FakePodControl.Err = syncErr
}

clientset.PrependReactor("update", "daemonsets", func(action core.Action) (handled bool, ret runtime.Object, err error) {
if action.GetSubresource() != "status" {
return false, nil, nil
}

if tc.hasStatusErr {
return true, nil, statusErr
} else {
return false, nil, nil
}
})

manager.dsStore.Add(ds)
addNodes(manager.nodeStore, 0, 1, nil)
expectSyncDaemonSetsWithError(t, manager, ds, podControl, 1, 0, 0, tc.expectedErr)
}
})
}
}

// DaemonSets should do nothing if there aren't any nodes
func TestNoNodesDoesNothing(t *testing.T) {
for _, strategy := range updateStrategies() {
65 changes: 58 additions & 7 deletions test/integration/daemonset/daemonset_test.go
@@ -39,10 +39,12 @@ import (
"k8s.io/client-go/tools/events"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/client-go/util/retry"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/daemon"
"k8s.io/kubernetes/pkg/controlplane"
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/profile"
labelsutil "k8s.io/kubernetes/pkg/util/labels"
@@ -52,14 +54,26 @@ import (
var zero = int64(0)

func setup(t *testing.T) (context.Context, kubeapiservertesting.TearDownFunc, *daemon.DaemonSetsController, informers.SharedInformerFactory, clientset.Interface) {
- // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
- server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount,TaintNodesByCondition"}, framework.SharedEtcd())
-
- config := restclient.CopyConfig(server.ClientConfig)
- clientSet, err := clientset.NewForConfig(config)
- if err != nil {
- t.Fatalf("Error in creating clientset: %v", err)
- }
+ return setupWithServerSetup(t, framework.TestServerSetup{})
+ }
+
+ func setupWithServerSetup(t *testing.T, serverSetup framework.TestServerSetup) (context.Context, kubeapiservertesting.TearDownFunc, *daemon.DaemonSetsController, informers.SharedInformerFactory, clientset.Interface) {
+ modifyServerRunOptions := serverSetup.ModifyServerRunOptions
+ serverSetup.ModifyServerRunOptions = func(opts *options.ServerRunOptions) {
+ if modifyServerRunOptions != nil {
+ modifyServerRunOptions(opts)
+ }
+
+ opts.Admission.GenericAdmission.DisablePlugins = append(opts.Admission.GenericAdmission.DisablePlugins,
+ // Disable ServiceAccount admission plugin as we don't have
+ // serviceaccount controller running.
+ "ServiceAccount",
+ "TaintNodesByCondition",
+ )
+ }
+
+ clientSet, config, closeFn := framework.StartTestServer(t, serverSetup)

resyncPeriod := 12 * time.Hour
informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "daemonset-informers")), resyncPeriod)
dc, err := daemon.NewDaemonSetsController(
@@ -96,7 +110,7 @@ func setup(t *testing.T) (context.Context, kubeapiservertesting.TearDownFunc, *d

tearDownFn := func() {
cancel()
- server.TearDownFn()
+ closeFn()
eventBroadcaster.Shutdown()
}

@@ -999,3 +1013,40 @@ func TestUnschedulableNodeDaemonDoesLaunchPod(t *testing.T) {
validateDaemonSetStatus(dsClient, ds.Name, 2, t)
})
}

func TestUpdateStatusDespitePodCreationFailure(t *testing.T) {
forEachStrategy(t, func(t *testing.T, strategy *apps.DaemonSetUpdateStrategy) {
limitedPodNumber := 2
ctx, closeFn, dc, informers, clientset := setupWithServerSetup(t, framework.TestServerSetup{
ModifyServerConfig: func(config *controlplane.Config) {
config.GenericConfig.AdmissionControl = &fakePodFailAdmission{
limitedPodNumber: limitedPodNumber,
}
},
})
defer closeFn()
ns := framework.CreateNamespaceOrDie(clientset, "update-status-despite-pod-failure", t)
defer framework.DeleteNamespaceOrDie(clientset, ns, t)

dsClient := clientset.AppsV1().DaemonSets(ns.Name)
podClient := clientset.CoreV1().Pods(ns.Name)
nodeClient := clientset.CoreV1().Nodes()
podInformer := informers.Core().V1().Pods().Informer()

informers.Start(ctx.Done())
go dc.Run(ctx, 2)

ds := newDaemonSet("foo", ns.Name)
ds.Spec.UpdateStrategy = *strategy
_, err := dsClient.Create(context.TODO(), ds, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Failed to create DaemonSet: %v", err)
}
defer cleanupDaemonSets(t, clientset, ds)

addNodes(nodeClient, 0, 5, nil, t)

validateDaemonSetPodsAndMarkReady(podClient, podInformer, limitedPodNumber, t)
validateDaemonSetStatus(dsClient, ds.Name, int32(limitedPodNumber), t)
})
}
53 changes: 53 additions & 0 deletions test/integration/daemonset/util.go
@@ -0,0 +1,53 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package daemonset

import (
"context"
"fmt"
"sync"

"k8s.io/apiserver/pkg/admission"
api "k8s.io/kubernetes/pkg/apis/core"
)

var _ admission.ValidationInterface = &fakePodFailAdmission{}

type fakePodFailAdmission struct {
lock sync.Mutex
limitedPodNumber int
succeedPodsCount int
}

func (f *fakePodFailAdmission) Handles(operation admission.Operation) bool {
return operation == admission.Create
}

func (f *fakePodFailAdmission) Validate(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces) (err error) {
if attr.GetKind().GroupKind() != api.Kind("Pod") {
return nil
}

f.lock.Lock()
defer f.lock.Unlock()

if f.succeedPodsCount >= f.limitedPodNumber {
return fmt.Errorf("fakePodFailAdmission error")
}
f.succeedPodsCount++
return nil
}
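
For readers who want the stub's contract in isolation: a hypothetical unit test (not part of the commit) that drives `fakePodFailAdmission` directly through the `admission.ValidationInterface` it implements; the attribute values are illustrative:

package daemonset

import (
	"context"
	"testing"

	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apiserver/pkg/admission"
	api "k8s.io/kubernetes/pkg/apis/core"
)

// The stub admits exactly limitedPodNumber pod creations, then rejects
// every subsequent one.
func TestFakePodFailAdmissionDirectly(t *testing.T) {
	plugin := &fakePodFailAdmission{limitedPodNumber: 2}
	podKind := api.Kind("Pod").WithVersion("")
	podResource := schema.GroupVersionResource{Resource: "pods"}

	for i := 0; i < 4; i++ {
		attrs := admission.NewAttributesRecord(nil, nil, podKind, "ns", "pod",
			podResource, "", admission.Create, nil, false, nil)
		err := plugin.Validate(context.TODO(), attrs, nil)
		if i < 2 && err != nil {
			t.Errorf("pod %d: unexpected rejection: %v", i, err)
		}
		if i >= 2 && err == nil {
			t.Errorf("pod %d: expected rejection, got nil", i)
		}
	}
}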
