Skip to content

Commit

Permalink
Improve accuracy of local control plane migration e2e tests (gardener…
Browse files Browse the repository at this point in the history
…#7981)

* Decouple certain functions from generic `Worker` actuator

This way, they become reusable even when no `genericActuator` object is available

Co-Authored-By: Jens Schneider <schneider@23technologies.cloud>
Co-Authored-By: mreiger <michael@rauschpfeife.net>

* Move restoration logic in generic `Worker` actuator into dedicated function

Earlier, `genericactuator.Restore()` performed the restoration and always called `Reconcile()` at the end. It still does this; however, the restoration logic has now been separated into `RestoreWithoutReconcile`. This allows extensions to optionally perform custom operations before they call the `Reconcile()` function.

Co-Authored-By: Jens Schneider <schneider@23technologies.cloud>
Co-Authored-By: mreiger <michael@rauschpfeife.net>

* [provider-local] Drop `Migrate` overwrite in `Worker` controller

Earlier, the `Worker` controller of `provider-local` has overwritten the `Migrate` function which caused the essential migration logic in the generic `Worker` actuator to not get executed. This limited the accuracy of the CPM e2e test since the essential logic was not executed.
Similarly, `Restore` was not executing the essential logic of the generic `Worker` actuator. Now it calls `RestoreWithoutReconcile()` and `Reconcile()` afterwards.

In the next commit, it will perform some custom logic to allow making use of the essential CPM logic in generic `Worker` actuator.

Co-Authored-By: Jens Schneider <schneider@23technologies.cloud>
Co-Authored-By: mreiger <michael@rauschpfeife.net>

* [provider-local] Delete stale `Machine`s after CPM

Co-Authored-By: Jens Schneider <schneider@23technologies.cloud>
Co-Authored-By: mreiger <michael@rauschpfeife.net>

* Address PR review feedback

---------

Co-authored-by: Jens Schneider <schneider@23technologies.cloud>
Co-authored-by: mreiger <michael@rauschpfeife.net>
  • Loading branch information
3 people authored May 26, 2023
1 parent 0ce7c25 commit 64b1b85
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (a *genericActuator) Delete(ctx context.Context, log logr.Logger, worker *e

// Mark all existing machines to become forcefully deleted.
log.Info("Marking all machines to become forcefully deleted")
if err := a.markAllMachinesForcefulDeletion(ctx, log, worker.Namespace); err != nil {
if err := markAllMachinesForcefulDeletion(ctx, log, a.client, worker.Namespace); err != nil {
return fmt.Errorf("marking all machines for forceful deletion failed: %w", err)
}

Expand Down Expand Up @@ -124,19 +124,19 @@ func (a *genericActuator) Delete(ctx context.Context, log logr.Logger, worker *e
}

// Mark all existing machines to become forcefully deleted.
func (a *genericActuator) markAllMachinesForcefulDeletion(ctx context.Context, log logr.Logger, namespace string) error {
func markAllMachinesForcefulDeletion(ctx context.Context, log logr.Logger, cl client.Client, namespace string) error {
log.Info("Marking all machines for forceful deletion")
// Mark all existing machines to become forcefully deleted.
existingMachines := &machinev1alpha1.MachineList{}
if err := a.client.List(ctx, existingMachines, client.InNamespace(namespace)); err != nil {
if err := cl.List(ctx, existingMachines, client.InNamespace(namespace)); err != nil {
return err
}

var tasks []flow.TaskFn
for _, machine := range existingMachines.Items {
m := machine
tasks = append(tasks, func(ctx context.Context) error {
return a.markMachineForcefulDeletion(ctx, &m)
return markMachineForcefulDeletion(ctx, cl, &m)
})
}

Expand All @@ -148,7 +148,7 @@ func (a *genericActuator) markAllMachinesForcefulDeletion(ctx context.Context, l
}

// markMachineForcefulDeletion labels a machine object to become forcefully deleted.
func (a *genericActuator) markMachineForcefulDeletion(ctx context.Context, machine *machinev1alpha1.Machine) error {
func markMachineForcefulDeletion(ctx context.Context, cl client.Client, machine *machinev1alpha1.Machine) error {
if machine.Labels == nil {
machine.Labels = map[string]string{}
}
Expand All @@ -158,7 +158,7 @@ func (a *genericActuator) markMachineForcefulDeletion(ctx context.Context, machi
}

machine.Labels[forceDeletionLabelKey] = forceDeletionLabelValue
return a.client.Update(ctx, machine)
return cl.Update(ctx, machine)
}

// waitUntilMachineResourcesDeleted waits for a maximum of 30 minutes until all machine resources have been properly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (a *genericActuator) Reconcile(ctx context.Context, log logr.Logger, worker
}

// Generate machine deployment configuration based on previously computed list of deployments and deploy them.
if err := a.deployMachineDeployments(ctx, log, cluster, worker, existingMachineDeployments, wantedMachineDeployments, workerDelegate.MachineClassKind(), clusterAutoscalerUsed); err != nil {
if err := deployMachineDeployments(ctx, log, a.client, cluster, worker, existingMachineDeployments, wantedMachineDeployments, workerDelegate.MachineClassKind(), clusterAutoscalerUsed); err != nil {
return fmt.Errorf("failed to generate the machine deployment config: %w", err)
}

Expand Down Expand Up @@ -193,7 +193,7 @@ func (a *genericActuator) Reconcile(ctx context.Context, log logr.Logger, worker

// Scale down machine-controller-manager if shoot is hibernated.
if isHibernationEnabled {
if err := a.scaleMachineControllerManager(ctx, log, worker, 0); err != nil {
if err := scaleMachineControllerManager(ctx, log, a.client, worker, 0); err != nil {
return err
}
}
Expand All @@ -217,7 +217,17 @@ func (a *genericActuator) scaleClusterAutoscaler(ctx context.Context, log logr.L
return client.IgnoreNotFound(kubernetes.ScaleDeployment(ctx, a.client, kubernetesutils.Key(worker.Namespace, v1beta1constants.DeploymentNameClusterAutoscaler), replicas))
}

func (a *genericActuator) deployMachineDeployments(ctx context.Context, log logr.Logger, cluster *extensionscontroller.Cluster, worker *extensionsv1alpha1.Worker, existingMachineDeployments *machinev1alpha1.MachineDeploymentList, wantedMachineDeployments extensionsworkercontroller.MachineDeployments, classKind string, clusterAutoscalerUsed bool) error {
func deployMachineDeployments(
ctx context.Context,
log logr.Logger,
cl client.Client,
cluster *extensionscontroller.Cluster,
worker *extensionsv1alpha1.Worker,
existingMachineDeployments *machinev1alpha1.MachineDeploymentList,
wantedMachineDeployments extensionsworkercontroller.MachineDeployments,
classKind string,
clusterAutoscalerUsed bool,
) error {
log.Info("Deploying machine deployments")
for _, deployment := range wantedMachineDeployments {
var (
Expand All @@ -231,7 +241,7 @@ func (a *genericActuator) deployMachineDeployments(ctx context.Context, log logr
// Also mark all machines for forceful deletion to avoid respecting of PDBs/SLAs in case of cluster hibernation.
case controller.IsHibernationEnabled(cluster):
replicas = 0
if err := a.markAllMachinesForcefulDeletion(ctx, log, worker.Namespace); err != nil {
if err := markAllMachinesForcefulDeletion(ctx, log, cl, worker.Namespace); err != nil {
return fmt.Errorf("marking all machines for forceful deletion failed: %w", err)
}
// If the cluster autoscaler is not enabled then min=max (as per API validation), hence
Expand Down Expand Up @@ -277,7 +287,7 @@ func (a *genericActuator) deployMachineDeployments(ctx context.Context, log logr
},
}

if _, err := controllerutils.GetAndCreateOrMergePatch(ctx, a.client, machineDeployment, func() error {
if _, err := controllerutils.GetAndCreateOrMergePatch(ctx, cl, machineDeployment, func() error {
machineDeployment.Spec = machinev1alpha1.MachineDeploymentSpec{
Replicas: replicas,
MinReadySeconds: 500,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,18 @@ import (
kubernetesutils "github.com/gardener/gardener/pkg/utils/kubernetes"
)

// Restore uses the Worker's spec to figure out the wanted MachineDeployments. Then it parses the Worker's state.
// If there is a record in the state corresponding to a wanted deployment then the Restore function
// deploys that MachineDeployment with all related MachineSet and Machines.
func (a *genericActuator) Restore(ctx context.Context, log logr.Logger, worker *extensionsv1alpha1.Worker, cluster *extensionscontroller.Cluster) error {
// RestoreWithoutReconcile restores the worker state without calling 'Reconcile'.
func RestoreWithoutReconcile(
ctx context.Context,
log logr.Logger,
cl client.Client,
delegateFactory DelegateFactory,
worker *extensionsv1alpha1.Worker,
cluster *extensionscontroller.Cluster,
) error {
log = log.WithValues("operation", "restore")

workerDelegate, err := a.delegateFactory.WorkerDelegate(ctx, worker, cluster)
workerDelegate, err := delegateFactory.WorkerDelegate(ctx, worker, cluster)
if err != nil {
return fmt.Errorf("could not instantiate actuator context: %w", err)
}
Expand All @@ -51,49 +56,57 @@ func (a *genericActuator) Restore(ctx context.Context, log logr.Logger, worker *

// Get the list of all existing machine deployments.
existingMachineDeployments := &machinev1alpha1.MachineDeploymentList{}
if err := a.client.List(ctx, existingMachineDeployments, client.InNamespace(worker.Namespace)); err != nil {
if err := cl.List(ctx, existingMachineDeployments, client.InNamespace(worker.Namespace)); err != nil {
return err
}

// Parse the worker state to a separate machineDeployment states and attach them to
// the corresponding machineDeployments which are to be deployed later
log.Info("Extracting state from worker status")
if err := a.addStateToMachineDeployment(worker, wantedMachineDeployments); err != nil {
if err := addStateToMachineDeployment(worker, wantedMachineDeployments); err != nil {
return err
}

wantedMachineDeployments = removeWantedDeploymentWithoutState(wantedMachineDeployments)

// Scale the machine-controller-manager to 0. During restoration MCM must not be working
if err := a.scaleMachineControllerManager(ctx, log, worker, 0); err != nil {
return fmt.Errorf("failed scale down machine-controller-manager: %w", err)
if err := scaleMachineControllerManager(ctx, log, cl, worker, 0); err != nil {
return fmt.Errorf("failed to scale down machine-controller-manager: %w", err)
}

// Deploy generated machine classes.
if err := workerDelegate.DeployMachineClasses(ctx); err != nil {
return fmt.Errorf("failed to deploy the machine classes: %w", err)
}

if err := kubernetes.WaitUntilDeploymentScaledToDesiredReplicas(ctx, a.client, kubernetesutils.Key(worker.Namespace, McmDeploymentName), 0); err != nil && !apierrors.IsNotFound(err) {
if err := kubernetes.WaitUntilDeploymentScaledToDesiredReplicas(ctx, cl, kubernetesutils.Key(worker.Namespace, McmDeploymentName), 0); err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("deadline exceeded while scaling down machine-controller-manager: %w", err)
}

// Do the actual restoration
if err := a.restoreMachineSetsAndMachines(ctx, log, wantedMachineDeployments); err != nil {
if err := restoreMachineSetsAndMachines(ctx, log, cl, wantedMachineDeployments); err != nil {
return fmt.Errorf("failed restoration of the machineSet and the machines: %w", err)
}

// Generate machine deployment configuration based on previously computed list of deployments and deploy them.
if err := a.deployMachineDeployments(ctx, log, cluster, worker, existingMachineDeployments, wantedMachineDeployments, workerDelegate.MachineClassKind(), true); err != nil {
if err := deployMachineDeployments(ctx, log, cl, cluster, worker, existingMachineDeployments, wantedMachineDeployments, workerDelegate.MachineClassKind(), true); err != nil {
return fmt.Errorf("failed to restore the machine deployment config: %w", err)
}

// Finally reconcile the worker so that the machine-controller-manager gets scaled up and OwnerReferences between
// machinedeployments, machinesets and machines are added properly.
return nil
}

// Restore restores the worker state via RestoreWithoutReconcile and afterwards runs a
// regular reconciliation on top of it. The reconciliation ensures that the
// machine-controller-manager is scaled up again and that MachineDeployments,
// MachineSets, and Machines end up properly wired (e.g. OwnerReferences).
func (a *genericActuator) Restore(ctx context.Context, log logr.Logger, worker *extensionsv1alpha1.Worker, cluster *extensionscontroller.Cluster) error {
	err := RestoreWithoutReconcile(ctx, log, a.client, a.delegateFactory, worker, cluster)
	if err != nil {
		return err
	}
	return a.Reconcile(ctx, log, worker, cluster)
}

func (a *genericActuator) addStateToMachineDeployment(worker *extensionsv1alpha1.Worker, wantedMachineDeployments extensionsworkercontroller.MachineDeployments) error {
func addStateToMachineDeployment(worker *extensionsv1alpha1.Worker, wantedMachineDeployments extensionsworkercontroller.MachineDeployments) error {
if worker.Status.State == nil || len(worker.Status.State.Raw) <= 0 {
return nil
}
Expand All @@ -115,17 +128,17 @@ func (a *genericActuator) addStateToMachineDeployment(worker *extensionsv1alpha1
return nil
}

func (a *genericActuator) restoreMachineSetsAndMachines(ctx context.Context, log logr.Logger, wantedMachineDeployments extensionsworkercontroller.MachineDeployments) error {
func restoreMachineSetsAndMachines(ctx context.Context, log logr.Logger, cl client.Client, wantedMachineDeployments extensionsworkercontroller.MachineDeployments) error {
log.Info("Deploying Machines and MachineSets")
for _, wantedMachineDeployment := range wantedMachineDeployments {
for _, machineSet := range wantedMachineDeployment.State.MachineSets {
if err := a.client.Create(ctx, &machineSet); client.IgnoreAlreadyExists(err) != nil {
if err := cl.Create(ctx, &machineSet); client.IgnoreAlreadyExists(err) != nil {
return err
}
}

for _, machine := range wantedMachineDeployment.State.Machines {
if err := a.client.Create(ctx, &machine); err != nil {
if err := cl.Create(ctx, &machine); err != nil {
if !apierrors.IsAlreadyExists(err) {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,15 +299,15 @@ var _ = Describe("Actuator", func() {
mockClient.EXPECT().Create(ctx, &expectedMachine1)
mockClient.EXPECT().Create(ctx, &expectedMachine2)

Expect(a.restoreMachineSetsAndMachines(ctx, logger, machineDeployments)).To(Succeed())
Expect(restoreMachineSetsAndMachines(ctx, logger, a.client, machineDeployments)).To(Succeed())
})

It("should not return error if machineset and machines already exist", func() {
mockClient.EXPECT().Create(ctx, &expectedMachineSet).Return(alreadyExistsError)
mockClient.EXPECT().Create(ctx, &expectedMachine1).Return(alreadyExistsError)
mockClient.EXPECT().Create(ctx, &expectedMachine2).Return(alreadyExistsError)

Expect(a.restoreMachineSetsAndMachines(ctx, logger, machineDeployments)).To(Succeed())
Expect(restoreMachineSetsAndMachines(ctx, logger, a.client, machineDeployments)).To(Succeed())
})
})
})
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ func (a *genericActuator) waitUntilMachineControllerManagerIsDeleted(ctx context
}, ctx.Done())
}

func (a *genericActuator) scaleMachineControllerManager(ctx context.Context, logger logr.Logger, worker *extensionsv1alpha1.Worker, replicas int32) error {
func scaleMachineControllerManager(ctx context.Context, logger logr.Logger, cl client.Client, worker *extensionsv1alpha1.Worker, replicas int32) error {
logger.Info("Scaling machine-controller-manager", "replicas", replicas)
return client.IgnoreNotFound(kubernetes.ScaleDeployment(ctx, a.client, kubernetesutils.Key(worker.Namespace, McmDeploymentName), replicas))
return client.IgnoreNotFound(kubernetes.ScaleDeployment(ctx, cl, kubernetesutils.Key(worker.Namespace, McmDeploymentName), replicas))
}

func (a *genericActuator) applyMachineControllerManagerShootChart(ctx context.Context, workerDelegate WorkerDelegate, workerObj *extensionsv1alpha1.Worker, cluster *controller.Cluster) error {
Expand Down
84 changes: 71 additions & 13 deletions pkg/provider-local/controller/worker/actuator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,18 @@ package worker

import (
"context"
"fmt"
"strings"

machinev1alpha1 "github.com/gardener/machine-controller-manager/pkg/apis/machine/v1alpha1"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"

extensionsconfig "github.com/gardener/gardener/extensions/pkg/apis/config"
extensionscontroller "github.com/gardener/gardener/extensions/pkg/controller"
"github.com/gardener/gardener/extensions/pkg/controller/common"
"github.com/gardener/gardener/extensions/pkg/controller/worker"
Expand All @@ -40,42 +47,93 @@ type delegateFactory struct {

// actuator is the provider-local Worker actuator. It embeds the generic Worker
// actuator for the default flows and additionally keeps a reference to the
// delegate factory so that the custom Restore logic can access the seed client
// and the delegate factory directly.
type actuator struct {
	// Actuator is the embedded generic Worker actuator providing the default behavior.
	worker.Actuator
	// workerDelegate gives access to the delegate factory (and via Client() to the seed client).
	workerDelegate *delegateFactory
}

// NewActuator creates a new Actuator that updates the status of the handled WorkerPoolConfigs.
func NewActuator() worker.Actuator {
delegateFactory := &delegateFactory{}
workerDelegate := &delegateFactory{}

return &actuator{
genericactuator.NewActuator(
delegateFactory,
Actuator: genericactuator.NewActuator(
workerDelegate,
local.MachineControllerManagerName,
mcmChart,
mcmShootChart,
imagevector.ImageVector(),
extensionscontroller.ChartRendererFactoryFunc(util.NewChartRendererForShoot),
),
workerDelegate: workerDelegate,
}
}

// InjectFunc applies the given injection function to the embedded generic
// actuator so that its dependencies are injected as well.
func (a *actuator) InjectFunc(f inject.Func) error {
	return f(a.Actuator)
}

// Migrate implements the migrate step of the control plane migration for the local provider
// by delegating to the generic actuator's Delete.
func (a *actuator) Migrate(ctx context.Context, log logr.Logger, worker *extensionsv1alpha1.Worker, cluster *extensionscontroller.Cluster) error {
	// Migrate must call Delete so that the `Node` object registered in the shoot is also deleted. This is necessary because the shoot's
	// worker nodes are Pods which reside in the shoot's control plane namespace. This namespace will be deleted at the end of the migrate
	// step together with all objects inside of it. During the restore phase a new `Machine` object and therefore a new `Node` will be created.
	// If the old `Node` object was not previously deleted, the shoot will have two nodes registered, but only one of them will be backed by an
	// actual Pod inside the shoot's control plane.
	return a.Actuator.Delete(ctx, log, worker, cluster)
}

// Restore restores the worker state for the local provider: it first runs the generic
// restoration (without reconcile), then removes machines that lost their backing pods
// during the migration, and finally triggers a regular reconciliation.
func (a *actuator) Restore(ctx context.Context, log logr.Logger, worker *extensionsv1alpha1.Worker, cluster *extensionscontroller.Cluster) error {
	// Restore must call Reconcile because the worker nodes of the shoot cluster are deleted during the create phase. Therefore they must be recreated.
	err := genericactuator.RestoreWithoutReconcile(ctx, log, a.workerDelegate.Client(), a.workerDelegate, worker, cluster)
	if err != nil {
		return fmt.Errorf("failed restoring the worker state: %w", err)
	}

	// By now, the generic actuator has restored all Machine objects into the shoot namespace
	// of the new destination seed. In the local scenario, however, the shoot worker nodes are
	// not external machines but "internal" pods running next to the control plane in the seed.
	// Those pods cannot be migrated from the source seed to the destination seed, hence the
	// shoot worker node pods cannot be restored — they must be recreated instead.
	// To trigger the recreation, all (restored) machines which are no longer backed by any pod
	// are deleted below, together with the corresponding Node objects in the shoot. The MCM's
	// MachineSet controller then automatically creates new Machines, which in turn results in
	// new pods and nodes.
	// This still does not simulate the exact CPM scenario of real clouds (where the nodes/VMs
	// are external and survive the migration), but it is as close as the local scenario gets.
	if err := a.deleteNoLongerNeededMachines(ctx, log, worker.Namespace); err != nil {
		return fmt.Errorf("failed deleting no longer existing machines after restoration: %w", err)
	}

	return a.Actuator.Reconcile(ctx, log, worker, cluster)
}

// deleteNoLongerNeededMachines deletes all Machine objects in the given namespace which are
// not backed by a pod anymore, together with the corresponding Node objects in the shoot.
// In the local scenario, worker "machines" are pods (labeled app=machine) running next to
// the control plane; after a control plane migration such pods no longer exist, so their
// restored Machine objects are stale and must be removed to let MCM recreate fresh ones.
func (a *actuator) deleteNoLongerNeededMachines(ctx context.Context, log logr.Logger, namespace string) error {
	_, shootClient, err := util.NewClientForShoot(ctx, a.workerDelegate.Client(), namespace, client.Options{}, extensionsconfig.RESTOptions{})
	if err != nil {
		return fmt.Errorf("failed creating client for shoot cluster: %w", err)
	}

	machineList := &machinev1alpha1.MachineList{}
	if err := a.workerDelegate.Client().List(ctx, machineList, client.InNamespace(namespace)); err != nil {
		return fmt.Errorf("failed listing machines: %w", err)
	}

	podList := &corev1.PodList{}
	if err := a.workerDelegate.Client().List(ctx, podList, client.InNamespace(namespace), client.MatchingLabels{"app": "machine"}); err != nil {
		return fmt.Errorf("failed listing pods: %w", err)
	}

	// Pods backing machines are named "machine-<machine-name>". Build a set of machine names
	// that still have a backing pod (the pod name itself is not needed, hence a set).
	machineNamesWithPod := make(map[string]struct{}, len(podList.Items))
	for _, pod := range podList.Items {
		machineNamesWithPod[strings.TrimPrefix(pod.Name, "machine-")] = struct{}{}
	}

	for _, machine := range machineList.Items {
		if _, ok := machineNamesWithPod[machine.Name]; ok {
			continue
		}

		// Copy once: the copy is used both for logging the key and for the Delete call,
		// avoiding taking the address of the (reused) loop variable.
		m := machine.DeepCopy()
		log.Info("Deleting machine since it is not backed by any pod", "machine", client.ObjectKeyFromObject(m))

		// The corresponding Node object in the shoot carries the pod's name ("machine-" prefix).
		nodeName := "machine-" + m.Name
		if err := shootClient.Delete(ctx, &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}}); client.IgnoreNotFound(err) != nil {
			return fmt.Errorf("failed deleting node %q for machine %q: %w", nodeName, m.Name, err)
		}

		if err := a.workerDelegate.Client().Delete(ctx, m); err != nil {
			return fmt.Errorf("failed deleting machine %q: %w", m.Name, err)
		}
	}

	return nil
}

func (d *delegateFactory) WorkerDelegate(_ context.Context, worker *extensionsv1alpha1.Worker, cluster *extensionscontroller.Cluster) (genericactuator.WorkerDelegate, error) {
clientset, err := kubernetes.NewForConfig(d.RESTConfig())
if err != nil {
Expand Down

0 comments on commit 64b1b85

Please sign in to comment.