Automatically evict replicas while draining (#2138)
Co-authored-by: David Ko <dko@suse.com>
ejweber and innobead authored Nov 23, 2023
1 parent b8abca1 commit dd66388
Showing 19 changed files with 279 additions and 181 deletions.
2 changes: 2 additions & 0 deletions api/model.go
@@ -365,6 +365,7 @@ type Node struct {
Region string `json:"region"`
Zone string `json:"zone"`
InstanceManagerCPURequest int `json:"instanceManagerCPURequest"`
AutoEvicting bool `json:"autoEvicting"`
}

type DiskStatus struct {
@@ -1871,6 +1872,7 @@ func toNodeResource(node *longhorn.Node, address string, apiContext *api.ApiCont
Region: node.Status.Region,
Zone: node.Status.Zone,
InstanceManagerCPURequest: node.Spec.InstanceManagerCPURequest,
AutoEvicting: node.Status.AutoEvicting,
}

disks := map[string]DiskInfo{}
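AutoEvicting is surfaced as read-only status so API consumers can distinguish a drain-triggered eviction from one a user requested on the node or disk. A minimal client-side sketch, assuming a JSON payload shaped by the struct above (this trimmed Node type is a hypothetical stand-in for the generated client type):

package main

import (
	"encoding/json"
	"fmt"
)

// Node mirrors only the two fields this example needs; the full struct is
// generated in client/generated_node.go below.
type Node struct {
	Name         string `json:"name"`
	AutoEvicting bool   `json:"autoEvicting"`
}

func main() {
	// Example payload as the API would serialize it after this change.
	raw := []byte(`{"name":"node-1","autoEvicting":true}`)
	var n Node
	if err := json.Unmarshal(raw, &n); err != nil {
		panic(err)
	}
	if n.AutoEvicting {
		fmt.Printf("node %s is automatically evicting replicas while draining\n", n.Name)
	}
}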
2 changes: 2 additions & 0 deletions client/generated_node.go
@@ -11,6 +11,8 @@ type Node struct {

AllowScheduling bool `json:"allowScheduling,omitempty" yaml:"allow_scheduling,omitempty"`

AutoEvicting bool `json:"autoEvicting,omitempty" yaml:"auto_evicting,omitempty"`

Conditions map[string]interface{} `json:"conditions,omitempty" yaml:"conditions,omitempty"`

Disks map[string]interface{} `json:"disks,omitempty" yaml:"disks,omitempty"`
20 changes: 12 additions & 8 deletions constant/events.go
@@ -33,14 +33,18 @@ const (
EventReasonSucceededExpansion = "SucceededExpansion"
EventReasonCanceledExpansion = "CanceledExpansion"

EventReasonAttached = "Attached"
EventReasonDetached = "Detached"
EventReasonHealthy = "Healthy"
EventReasonFaulted = "Faulted"
EventReasonDegraded = "Degraded"
EventReasonOrphaned = "Orphaned"
EventReasonUnknown = "Unknown"
EventReasonFailedEviction = "FailedEviction"
EventReasonAttached = "Attached"
EventReasonDetached = "Detached"
EventReasonHealthy = "Healthy"
EventReasonFaulted = "Faulted"
EventReasonDegraded = "Degraded"
EventReasonOrphaned = "Orphaned"
EventReasonUnknown = "Unknown"

EventReasonEvictionAutomatic = "EvictionAutomatic"
EventReasonEvictionUserRequested = "EvictionUserRequested"
EventReasonEvictionCanceled = "EvictionCanceled"
EventReasonEvictionFailed = "EvictionFailed"

EventReasonDetachedUnexpectedly = "DetachedUnexpectedly"
EventReasonRemount = "Remount"
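The four new reasons replace the single EventReasonFailedEviction so that emitted events distinguish automatic, user-requested, canceled, and failed evictions. A minimal sketch of emitting one (recordAutomaticEviction is a hypothetical helper; the import paths are assumed from the longhorn-manager tree, and the message mirrors the Eventf calls in controller/node_controller.go below):

package controller

import (
	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/record"

	"github.com/longhorn/longhorn-manager/constant"
	longhorn "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2"
)

// recordAutomaticEviction records that the controller, not a user, asked for
// the eviction.
func recordAutomaticEviction(recorder record.EventRecorder, replica *longhorn.Replica, nodeName string) {
	// EventTypeNormal: an automatic eviction is expected behavior, not an error.
	recorder.Eventf(replica, corev1.EventTypeNormal, constant.EventReasonEvictionAutomatic,
		"Requesting replica %v eviction from node %v and disk %v",
		replica.Name, nodeName, replica.Spec.DiskID)
}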
1 change: 1 addition & 0 deletions controller/controller_test.go
@@ -502,6 +502,7 @@ func newNode(name, namespace string, allowScheduling bool, status longhorn.Condi
StorageReserved: 0,
},
},
Name: name,
},
Status: longhorn.NodeStatus{
Conditions: []longhorn.Condition{
106 changes: 25 additions & 81 deletions controller/instance_manager_controller.go
@@ -566,7 +566,7 @@ func (imc *InstanceManagerController) syncInstanceManagerPDB(im *longhorn.Instan
return err
}

imPDB, err := imc.ds.GetPDBRO(imc.getPDBName(im))
imPDB, err := imc.ds.GetPDBRO(types.GetPDBName(im))
if err != nil && !datastore.ErrorIsNotFound(err) {
return err
}
@@ -666,7 +666,7 @@ func (imc *InstanceManagerController) cleanUpPDBForNonExistingIM() error {
if labelValue != types.LonghornLabelInstanceManager {
continue
}
if _, ok := ims[getIMNameFromPDBName(pdbName)]; ok {
if _, ok := ims[types.GetIMNameFromPDBName(pdbName)]; ok {
continue
}
if err := imc.ds.DeletePDB(pdbName); err != nil {
@@ -680,7 +680,7 @@ }
}

func (imc *InstanceManagerController) deleteInstanceManagerPDB(im *longhorn.InstanceManager) error {
name := imc.getPDBName(im)
name := types.GetPDBName(im)
imc.logger.Infof("Deleting %v PDB", name)
err := imc.ds.DeletePDB(name)
if err != nil && !datastore.ErrorIsNotFound(err) {
@@ -732,6 +732,11 @@ func (imc *InstanceManagerController) canDeleteInstanceManagerPDB(im *longhorn.I
return false, err
}

if nodeDrainingPolicy == string(types.NodeDrainPolicyBlockForEviction) && len(replicasOnCurrentNode) > 0 {
// We must wait for ALL replicas to be evicted before removing the PDB.
return false, nil
}

targetReplicas := []*longhorn.Replica{}
if nodeDrainingPolicy == string(types.NodeDrainPolicyAllowIfReplicaIsStopped) {
for _, replica := range replicasOnCurrentNode {
@@ -743,62 +748,31 @@
targetReplicas = replicasOnCurrentNode
}

// For each replica in the target replica list,
// find out whether there is a PDB protected healthy replica of the same
// volume on another schedulable node.
// For each replica in the target replica list, find out whether there is a PDB protected healthy replica of the
// same volume on another schedulable node.
for _, replica := range targetReplicas {
vol, err := imc.ds.GetVolumeRO(replica.Spec.VolumeName)
if err != nil {
return false, err
}
hasPDBOnAnotherNode := false
isUnusedReplicaOnCurrentNode := false

replicas, err := imc.ds.ListVolumeReplicasRO(vol.Name)
pdbProtectedHealthyReplicas, err := imc.ds.ListVolumePDBProtectedHealthyReplicasRO(replica.Spec.VolumeName)
if err != nil {
return false, err
}

hasPDBOnAnotherNode := false
isUnusedReplicaOnCurrentNode := false
for _, r := range replicas {
hasOtherHealthyReplicas := r.Spec.HealthyAt != "" && r.Spec.FailedAt == "" && r.Spec.NodeID != im.Spec.NodeID
if hasOtherHealthyReplicas {
unschedulable, err := imc.ds.IsKubeNodeUnschedulable(r.Spec.NodeID)
if err != nil {
return false, err
}
if unschedulable {
continue
}

var rIM *longhorn.InstanceManager
rIM, err = imc.getRunningReplicaInstanceManagerRO(r)
if err != nil {
return false, err
}
if rIM == nil {
continue
}

pdb, err := imc.ds.GetPDBRO(imc.getPDBName(rIM))
if err != nil && !datastore.ErrorIsNotFound(err) {
return false, err
}
if pdb != nil {
hasPDBOnAnotherNode = true
break
}
}
// If a replica has never been started, there is no data stored in this replica, and
// retaining it makes no sense for HA.
// Hence Longhorn doesn't need to block the PDB removal for the replica.
// This case typically happens on a newly created volume that hasn't been attached to any node.
// https://github.com/longhorn/longhorn/issues/2673
isUnusedReplicaOnCurrentNode = r.Spec.HealthyAt == "" && r.Spec.FailedAt == "" && r.Spec.NodeID == im.Spec.NodeID
if isUnusedReplicaOnCurrentNode {
for _, pdbProtectedHealthyReplica := range pdbProtectedHealthyReplicas {
if pdbProtectedHealthyReplica.Spec.NodeID != im.Spec.NodeID {
hasPDBOnAnotherNode = true
break
}
}

// If a replica has never been started, there is no data stored in this replica, and retaining it makes no sense
// for HA. Hence Longhorn doesn't need to block the PDB removal for the replica. This case typically happens on
// a newly created volume that hasn't been attached to any node.
// https://github.com/longhorn/longhorn/issues/2673
isUnusedReplicaOnCurrentNode = replica.Spec.HealthyAt == "" &&
replica.Spec.FailedAt == "" &&
replica.Spec.NodeID == im.Spec.NodeID

if !hasPDBOnAnotherNode && !isUnusedReplicaOnCurrentNode {
return false, nil
}
@@ -807,24 +781,6 @@ func (imc *InstanceManagerController) canDeleteInstanceManagerPDB(im *longhorn.I
return true, nil
}

func (imc *InstanceManagerController) getRunningReplicaInstanceManagerRO(r *longhorn.Replica) (im *longhorn.InstanceManager, err error) {
if r.Status.InstanceManagerName == "" {
im, err = imc.ds.GetInstanceManagerByInstanceRO(r)
if err != nil && !types.ErrorIsNotFound(err) {
return nil, err
}
} else {
im, err = imc.ds.GetInstanceManagerRO(r.Status.InstanceManagerName)
if err != nil && !apierrors.IsNotFound(err) {
return nil, err
}
}
if im == nil || im.Status.CurrentState != longhorn.InstanceManagerStateRunning {
return nil, nil
}
return im, nil
}

func (imc *InstanceManagerController) areAllVolumesDetachedFromNode(nodeName string) (bool, error) {
detached, err := imc.areAllInstanceRemovedFromNodeByType(nodeName, longhorn.InstanceManagerTypeEngine)
if err != nil {
@@ -874,7 +830,7 @@ func (imc *InstanceManagerController) createInstanceManagerPDB(im *longhorn.Inst
func (imc *InstanceManagerController) generateInstanceManagerPDBManifest(im *longhorn.InstanceManager) *policyv1.PodDisruptionBudget {
return &policyv1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: imc.getPDBName(im),
Name: types.GetPDBName(im),
Namespace: imc.namespace,
},
Spec: policyv1.PodDisruptionBudgetSpec{
@@ -886,18 +842,6 @@ }
}
}

func (imc *InstanceManagerController) getPDBName(im *longhorn.InstanceManager) string {
return getPDBNameFromIMName(im.Name)
}

func getPDBNameFromIMName(imName string) string {
return imName
}

func getIMNameFromPDBName(pdbName string) string {
return pdbName
}

func (imc *InstanceManagerController) enqueueInstanceManager(instanceManager interface{}) {
key, err := controller.KeyFunc(instanceManager)
if err != nil {
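The rewrite collapses the old per-replica instance-manager walk into one question per replica: does a PDB-protected healthy replica of the same volume exist on another node? Distilled, the new gate keeps the PDB while any replica remains on the node under block-for-eviction, and otherwise blocks deletion only for replicas that hold data and have no protected copy elsewhere. A self-contained sketch of that decision (illustrative only: the field names and policy strings are assumptions, and the allow-if-replica-is-stopped filtering is omitted):

package main

import "fmt"

type replica struct {
	healthyAt, failedAt, nodeID string
	pdbProtectedElsewhere       bool // a PDB-protected healthy replica of the same volume exists on another node
}

func canDeletePDB(policy, nodeID string, replicasOnNode []replica) bool {
	if policy == "block-for-eviction" && len(replicasOnNode) > 0 {
		return false // wait for ALL replicas to be evicted first
	}
	for _, r := range replicasOnNode {
		// A replica that never started holds no data, so it does not block PDB removal.
		unused := r.healthyAt == "" && r.failedAt == "" && r.nodeID == nodeID
		if !r.pdbProtectedElsewhere && !unused {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(canDeletePDB("block-for-eviction", "node-1",
		[]replica{{healthyAt: "2023-11-23", nodeID: "node-1"}})) // false: a replica remains
	fmt.Println(canDeletePDB("block-if-contains-last-replica", "node-1", nil)) // true: nothing left to protect
}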
121 changes: 120 additions & 1 deletion controller/node_controller.go
@@ -181,7 +181,8 @@ func (nc *NodeController) isResponsibleForSetting(obj interface{}) bool {

return types.SettingName(setting.Name) == types.SettingNameStorageMinimalAvailablePercentage ||
types.SettingName(setting.Name) == types.SettingNameBackingImageCleanupWaitInterval ||
types.SettingName(setting.Name) == types.SettingNameOrphanAutoDeletion
types.SettingName(setting.Name) == types.SettingNameOrphanAutoDeletion ||
types.SettingName(setting.Name) == types.SettingNameNodeDrainPolicy
}

func (nc *NodeController) isResponsibleForReplica(obj interface{}) bool {
@@ -552,6 +553,10 @@ func (nc *NodeController) syncNode(key string) (err error) {
return err
}

if err = nc.syncReplicaEvictionRequested(node, kubeNode); err != nil {
return err
}

return nil
}

@@ -565,6 +570,16 @@ func (nc *NodeController) enqueueNode(obj interface{}) {
nc.queue.Add(key)
}

func (nc *NodeController) enqueueNodeRateLimited(obj interface{}) {
key, err := controller.KeyFunc(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", obj, err))
return
}

nc.queue.AddRateLimited(key)
}

func (nc *NodeController) enqueueSetting(obj interface{}) {
nodes, err := nc.ds.ListNodesRO()
if err != nil {
@@ -1441,3 +1456,107 @@ func (nc *NodeController) createSnapshotMonitor() (mon monitor.Monitor, err erro

return mon, nil
}

func (nc *NodeController) syncReplicaEvictionRequested(node *longhorn.Node, kubeNode *corev1.Node) error {
log := getLoggerForNode(nc.logger, node)
node.Status.AutoEvicting = false
nodeDrainPolicy, err := nc.ds.GetSettingValueExisted(types.SettingNameNodeDrainPolicy)
if err != nil {
return errors.Wrapf(err, "failed to get %v setting", types.SettingNameNodeDrainPolicy)
}

type replicaToSync struct {
*longhorn.Replica
syncReason string
}
replicasToSync := []replicaToSync{}

for diskName, diskSpec := range node.Spec.Disks {
diskStatus := node.Status.DiskStatus[diskName]
for replicaName := range diskStatus.ScheduledReplica {
replica, err := nc.ds.GetReplica(replicaName)
if err != nil {
return err
}
shouldEvictReplica, reason, err := nc.shouldEvictReplica(node, kubeNode, &diskSpec, replica,
nodeDrainPolicy)
if err != nil {
return err
}
if replica.Spec.EvictionRequested != shouldEvictReplica {
replica.Spec.EvictionRequested = shouldEvictReplica
replicasToSync = append(replicasToSync, replicaToSync{replica, reason})
}

if replica.Spec.EvictionRequested && !node.Spec.EvictionRequested && !diskSpec.EvictionRequested {
// We don't consider the node to be auto evicting if eviction was manually requested.
node.Status.AutoEvicting = true
}
}
}

for _, replicaToSync := range replicasToSync {
replicaLog := log.WithField("replica", replicaToSync.Name).WithField("disk", replicaToSync.Spec.DiskID)
if replicaToSync.Spec.EvictionRequested {
replicaLog.Infof("Requesting replica eviction")
if _, err := nc.ds.UpdateReplica(replicaToSync.Replica); err != nil {
replicaLog.Warn("Failed to request replica eviction, will enqueue then resync node")
nc.enqueueNodeRateLimited(node)
continue
}
nc.eventRecorder.Eventf(replicaToSync.Replica, corev1.EventTypeNormal, replicaToSync.syncReason, "Requesting replica %v eviction from node %v and disk %v", replicaToSync.Name, node.Spec.Name, replicaToSync.Spec.DiskID)
} else {
replicaLog.Infof("Cancelling replica eviction")
if _, err := nc.ds.UpdateReplica(replicaToSync.Replica); err != nil {
replicaLog.Warn("Failed to cancel replica eviction, will enqueue then resync node")
nc.enqueueNodeRateLimited(node)
continue
}
nc.eventRecorder.Eventf(replicaToSync.Replica, corev1.EventTypeNormal, replicaToSync.syncReason, "Cancelling replica %v eviction from node %v and disk %v", replicaToSync.Name, node.Spec.Name, replicaToSync.Spec.DiskID)
}
}

return nil
}

func (nc *NodeController) shouldEvictReplica(node *longhorn.Node, kubeNode *corev1.Node, diskSpec *longhorn.DiskSpec,
replica *longhorn.Replica, nodeDrainPolicy string) (bool, string, error) {
// Replica eviction was cancelled on down or deleted nodes in previous implementations. It seems safest to continue
// this behavior unless we find a reason to change it.
if isDownOrDeleted, err := nc.ds.IsNodeDownOrDeleted(node.Spec.Name); err != nil {
return false, "", err
} else if isDownOrDeleted {
return false, longhorn.NodeConditionReasonKubernetesNodeNotReady, nil
}

if node.Spec.EvictionRequested || diskSpec.EvictionRequested {
return true, constant.EventReasonEvictionUserRequested, nil
}
if !kubeNode.Spec.Unschedulable {
// Node drain policy only takes effect on cordoned nodes.
return false, constant.EventReasonEvictionCanceled, nil
}
if nodeDrainPolicy == string(types.NodeDrainPolicyBlockForEviction) {
return true, constant.EventReasonEvictionAutomatic, nil
}
if nodeDrainPolicy != string(types.NodeDrainPolicyBlockForEvictionIfContainsLastReplica) {
return false, constant.EventReasonEvictionCanceled, nil
}

pdbProtectedHealthyReplicas, err := nc.ds.ListVolumePDBProtectedHealthyReplicasRO(replica.Spec.VolumeName)
if err != nil {
return false, "", err
}
hasPDBOnAnotherNode := false
for _, pdbProtectedHealthyReplica := range pdbProtectedHealthyReplicas {
if pdbProtectedHealthyReplica.Spec.NodeID != replica.Spec.NodeID {
hasPDBOnAnotherNode = true
break
}
}
if !hasPDBOnAnotherNode {
return true, constant.EventReasonEvictionAutomatic, nil
}

return false, constant.EventReasonEvictionCanceled, nil
}
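The decision in shouldEvictReplica is strictly ordered: node health first, then explicit user requests, then the cordon check, then the drain policy. A condensed restatement of that ladder (a sketch only; the policy strings are assumed to match the values behind types.NodeDrainPolicy*):

package main

import "fmt"

// evict answers "should this replica be evicted, and with which event reason?"
// in the same order of precedence as shouldEvictReplica above.
func evict(nodeDownOrDeleted, userRequested, cordoned, lastPDBProtectedReplicaOnNode bool, policy string) (bool, string) {
	switch {
	case nodeDownOrDeleted:
		return false, "KubernetesNodeNotReady" // keep the historical behavior on down or deleted nodes
	case userRequested:
		return true, "EvictionUserRequested"
	case !cordoned:
		return false, "EvictionCanceled" // the drain policy only applies to cordoned nodes
	case policy == "block-for-eviction":
		return true, "EvictionAutomatic" // evict every replica
	case policy == "block-for-eviction-if-contains-last-replica" && lastPDBProtectedReplicaOnNode:
		return true, "EvictionAutomatic" // evict only data with no PDB-protected copy elsewhere
	default:
		return false, "EvictionCanceled"
	}
}

func main() {
	// A cordoned node under block-for-eviction: evict.
	fmt.Println(evict(false, false, true, false, "block-for-eviction"))
}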
