Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/util/provider/machinecontroller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func NewController(
nodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "node"),
machineClassQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "machineclass"),
machineQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "machine"),
machineTerminationQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "machinetermination"),
machineSafetyOrphanVMsQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "machinesafetyorphanvms"),
machineSafetyAPIServerQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "machinesafetyapiserver"),
safetyOptions: safetyOptions,
Expand Down Expand Up @@ -253,6 +254,7 @@ type controller struct {
nodeQueue workqueue.RateLimitingInterface
machineClassQueue workqueue.RateLimitingInterface
machineQueue workqueue.RateLimitingInterface
machineTerminationQueue workqueue.RateLimitingInterface
machineSafetyOrphanVMsQueue workqueue.RateLimitingInterface
machineSafetyAPIServerQueue workqueue.RateLimitingInterface
// syncs
Expand All @@ -279,6 +281,7 @@ func (c *controller) Run(workers int, stopCh <-chan struct{}) {
defer c.secretQueue.ShutDown()
defer c.machineClassQueue.ShutDown()
defer c.machineQueue.ShutDown()
defer c.machineTerminationQueue.ShutDown()
defer c.machineSafetyOrphanVMsQueue.ShutDown()
defer c.machineSafetyAPIServerQueue.ShutDown()

Expand Down Expand Up @@ -307,6 +310,7 @@ func (c *controller) Run(workers int, stopCh <-chan struct{}) {
worker.Run(c.machineClassQueue, "ClusterMachineClass", worker.DefaultMaxRetries, true, c.reconcileClusterMachineClassKey, stopCh, &waitGroup)
worker.Run(c.nodeQueue, "ClusterNode", worker.DefaultMaxRetries, true, c.reconcileClusterNodeKey, stopCh, &waitGroup)
worker.Run(c.machineQueue, "ClusterMachine", worker.DefaultMaxRetries, true, c.reconcileClusterMachineKey, stopCh, &waitGroup)
worker.Run(c.machineTerminationQueue, "ClusterMachineTermination", worker.DefaultMaxRetries, true, c.reconcileClusterMachineTermination, stopCh, &waitGroup)
worker.Run(c.machineSafetyOrphanVMsQueue, "ClusterMachineSafetyOrphanVMs", worker.DefaultMaxRetries, true, c.reconcileClusterMachineSafetyOrphanVMs, stopCh, &waitGroup)
worker.Run(c.machineSafetyAPIServerQueue, "ClusterMachineAPIServer", worker.DefaultMaxRetries, true, c.reconcileClusterMachineSafetyAPIServer, stopCh, &waitGroup)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,7 @@ func createController(
secretQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "secret"),
nodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "node"),
machineQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "machine"),
machineTerminationQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "machinetermination"),
machineSafetyOrphanVMsQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "machinesafetyorphanvms"),
machineSafetyAPIServerQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "machinesafetyapiserver"),
recorder: record.NewBroadcaster().NewRecorder(nil, corev1.EventSource{Component: ""}),
Expand Down
171 changes: 112 additions & 59 deletions pkg/util/provider/machinecontroller/machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
Expand All @@ -35,7 +36,19 @@ SECTION
Machine controller - Machine add, update, delete watches
*/
// addMachine handles the ADD event of a machine watch and routes the machine
// to the queue that matches its lifecycle state.
//
// obj is expected to be a *v1alpha1.Machine; any other type is logged and
// dropped. The machine is enqueued exactly once: on the termination queue
// when its DeletionTimestamp is set, on the regular machine queue otherwise.
func (c *controller) addMachine(obj interface{}) {
	machine, ok := obj.(*v1alpha1.Machine)
	if !ok {
		klog.Errorf("couldn't convert to machine resource from object")
		return
	}

	// On restart of the controller process, a machine that was already marked
	// for deletion is processed as part of an ADD event. This check ensures
	// that it is enqueued in the termination queue and not in the regular one.
	if machine.DeletionTimestamp != nil {
		c.enqueueMachineTermination(machine, "handling terminating machine object ADD event")
		return
	}

	c.enqueueMachine(obj, "handling machine obj ADD event")
}

func (c *controller) updateMachine(oldObj, newObj interface{}) {
Expand All @@ -52,11 +65,27 @@ func (c *controller) updateMachine(oldObj, newObj interface{}) {
return
}

c.enqueueMachine(newObj, "handling machine object UPDATE event")
if newMachine.DeletionTimestamp != nil {
c.enqueueMachineTermination(newMachine, "handling terminating machine object UPDATE event")
} else {
c.enqueueMachine(newObj, "handling machine object UPDATE event")
}
}

// deleteMachine handles the DELETE event of a machine watch by adding the
// machine to the termination queue.
//
// obj is either a *v1alpha1.Machine or a cache.DeletedFinalStateUnknown
// tombstone wrapping one. Anything else is reported through
// utilruntime.HandleError and dropped without enqueueing.
func (c *controller) deleteMachine(obj interface{}) {
	machine, ok := obj.(*v1alpha1.Machine)
	if !ok {
		// The object may arrive wrapped in a tombstone instead of as the
		// machine itself; unwrap it before enqueueing.
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
			return
		}
		machine, ok = tombstone.Obj.(*v1alpha1.Machine)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Machine %#v", obj))
			return
		}
	}

	c.enqueueMachineTermination(machine, "handling terminating machine object DELETE event")
}

// getKeyForObj returns key for object, else returns false
Expand All @@ -83,6 +112,27 @@ func (c *controller) enqueueMachineAfter(obj interface{}, after time.Duration, r
}
}

// enqueueMachineTermination adds the key of the given machine to the machine
// termination queue. reason is used only in the log line. If no key can be
// derived for the machine, the error is handed to utilruntime.HandleError and
// nothing is enqueued.
func (c *controller) enqueueMachineTermination(machine *v1alpha1.Machine, reason string) {
	queueKey, keyErr := cache.DeletionHandlingMetaNamespaceKeyFunc(machine)
	if keyErr != nil {
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", machine, keyErr))
		return
	}

	klog.V(3).Infof("Adding machine object to termination queue %q, reason: %s", queueKey, reason)
	c.machineTerminationQueue.Add(queueKey)
}

// enqueueMachineTerminationAfter adds the key of the given machine to the
// machine termination queue once the duration `after` has elapsed. reason is
// used only in the log line. If no key can be derived for the machine, the
// error is handed to utilruntime.HandleError and nothing is enqueued.
func (c *controller) enqueueMachineTerminationAfter(machine *v1alpha1.Machine, after time.Duration, reason string) {
	queueKey, keyErr := cache.DeletionHandlingMetaNamespaceKeyFunc(machine)
	if keyErr != nil {
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", machine, keyErr))
		return
	}

	klog.V(3).Infof("Adding machine object to termination queue %q after %s, reason: %s", queueKey, after, reason)
	c.machineTerminationQueue.AddAfter(queueKey, after)
}

func (c *controller) reconcileClusterMachineKey(key string) error {
ctx := context.Background()

Expand All @@ -101,12 +151,18 @@ func (c *controller) reconcileClusterMachineKey(key string) error {
return err
}

if machine.DeletionTimestamp != nil {
klog.Errorf("Machine %q should be in machine termination queue", machine.Name)
return nil
}

retryPeriod, err := c.reconcileClusterMachine(ctx, machine)

var reEnqueReason = "periodic reconcile"
if err != nil {
reEnqueReason = err.Error()
}

c.enqueueMachineAfter(machine, time.Duration(retryPeriod), reEnqueReason)

return nil
Expand Down Expand Up @@ -144,18 +200,6 @@ func (c *controller) reconcileClusterMachine(ctx context.Context, machine *v1alp
return retry, err
}

if machine.DeletionTimestamp != nil {
// Process a delete event
return c.triggerDeletionFlow(
ctx,
&driver.DeleteMachineRequest{
Machine: machine,
MachineClass: machineClass,
Secret: &corev1.Secret{Data: secretData},
},
)
}

// Add finalizers if not present on machine object
retry, err = c.addMachineFinalizers(ctx, machine)
if err != nil {
Expand Down Expand Up @@ -193,6 +237,58 @@ func (c *controller) reconcileClusterMachine(ctx context.Context, machine *v1alp
return machineutils.LongRetry, nil
}

// reconcileClusterMachineTermination processes one key taken from the machine
// termination queue.
//
// It looks the machine up in the lister by the name part of key, validates
// its machine class and then triggers the deletion flow. A machine that is no
// longer found is treated as done and nil is returned. On a validation or
// deletion error the machine is re-added to the termination queue after the
// retry period reported by the failing step, and the error is returned. On
// success the machine is re-added as well (see the comment at the end) and
// nil is returned.
func (c *controller) reconcileClusterMachineTermination(key string) error {
	_, machineName, splitErr := cache.SplitMetaNamespaceKey(key)
	if splitErr != nil {
		return splitErr
	}

	machine, getErr := c.machineLister.Machines(c.namespace).Get(machineName)
	switch {
	case apierrors.IsNotFound(getErr):
		klog.V(4).Infof("Machine %q: Not doing work because it is not found", key)
		return nil
	case getErr != nil:
		klog.Errorf("Machine %q: Unable to retrieve object from store: %v", key, getErr)
		return getErr
	}

	klog.V(2).Infof("reconcileClusterMachineTermination: Start for %q with phase:%q, description:%q",
		machine.Name, machine.Status.CurrentStatus.Phase, machine.Status.LastOperation.Description)
	defer klog.V(2).Infof("reconcileClusterMachineTermination: Stop for %q", machine.Name)

	ctx := context.Background()

	machineClass, secretData, validateRetry, validateErr := c.ValidateMachineClass(ctx, &machine.Spec.Class)
	if validateErr != nil {
		klog.Errorf("cannot reconcile machine %q: %s", machine.Name, validateErr)
		c.enqueueMachineTerminationAfter(machine, time.Duration(validateRetry), validateErr.Error())
		return validateErr
	}

	// Run the deletion flow for this machine.
	deleteRequest := &driver.DeleteMachineRequest{
		Machine:      machine,
		MachineClass: machineClass,
		Secret:       &corev1.Secret{Data: secretData},
	}
	deleteRetry, deleteErr := c.triggerDeletionFlow(ctx, deleteRequest)

	// The machine is requeued whether or not the deletion flow failed. If the
	// informer loses its connection to the API server it may need to resync,
	// and a resource deleted while the watch is down produces no delete event
	// because the object is already gone. Scheduling a requeue after a
	// successful deletion as well covers that edge case.
	requeueReason := "post-deletion reconcile"
	if deleteErr != nil {
		requeueReason = deleteErr.Error()
	}
	c.enqueueMachineTerminationAfter(machine, time.Duration(deleteRetry), requeueReason)

	return deleteErr
}

/*
SECTION
Machine controller - nodeToMachine
Expand Down Expand Up @@ -465,30 +561,6 @@ func (c *controller) triggerCreationFlow(ctx context.Context, createMachineReque
nodeName = machine.Labels[v1alpha1.NodeLabelKey]
}

case codes.Unknown, codes.DeadlineExceeded, codes.Aborted, codes.Unavailable:
// GetMachineStatus() returned with one of the above error codes.
// Retry operation.
updateRetryPeriod, updateErr := c.machineStatusUpdate(
ctx,
machine,
v1alpha1.LastOperation{
Description: "Cloud provider message - " + err.Error(),
State: v1alpha1.MachineStateFailed,
Type: v1alpha1.MachineOperationCreate,
LastUpdateTime: metav1.Now(),
},
v1alpha1.CurrentStatus{
Phase: c.getCreateFailurePhase(machine),
LastUpdateTime: metav1.Now(),
},
machine.Status.LastKnownState,
)
if updateErr != nil {
return updateRetryPeriod, updateErr
}

return machineutils.ShortRetry, err

case codes.Uninitialized:
uninitializedMachine = true
klog.Infof("VM instance associated with machine %s was created but not initialized.", machine.Name)
Expand All @@ -498,26 +570,7 @@ func (c *controller) triggerCreationFlow(ctx context.Context, createMachineReque
providerID = getMachineStatusResponse.ProviderID

default:
updateRetryPeriod, updateErr := c.machineStatusUpdate(
ctx,
machine,
v1alpha1.LastOperation{
Description: "Cloud provider message - " + err.Error(),
State: v1alpha1.MachineStateFailed,
Type: v1alpha1.MachineOperationCreate,
LastUpdateTime: metav1.Now(),
},
v1alpha1.CurrentStatus{
Phase: c.getCreateFailurePhase(machine),
LastUpdateTime: metav1.Now(),
},
machine.Status.LastKnownState,
)
if updateErr != nil {
return updateRetryPeriod, updateErr
}

return machineutils.MediumRetry, err
return c.machineCreateErrorHandler(ctx, machine, nil, err)
}
} else {
if machine.Labels[v1alpha1.NodeLabelKey] == "" || machine.Spec.ProviderID == "" {
Expand Down
4 changes: 3 additions & 1 deletion pkg/util/provider/machinecontroller/machine_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,8 @@ func SyncMachineTaints(
return toBeUpdated
}

// machineCreateErrorHandler updates the machine status based on the
// CreateMachineResponse and the error returned during machine creation.
func (c *controller) machineCreateErrorHandler(ctx context.Context, machine *v1alpha1.Machine, createMachineResponse *driver.CreateMachineResponse, err error) (machineutils.RetryPeriod, error) {
var (
retryRequired = machineutils.MediumRetry
Expand All @@ -509,6 +510,7 @@ func (c *controller) machineCreateErrorHandler(ctx context.Context, machine *v1a
switch machineErr.Code() {
case codes.Unknown, codes.DeadlineExceeded, codes.Aborted, codes.Unavailable:
retryRequired = machineutils.ShortRetry
lastKnownState = machine.Status.LastKnownState
}
}

Expand Down