Skip to content

Commit

Permalink
fix(recurring-job): avoid intermediate detachment during backup
Browse files Browse the repository at this point in the history
longhorn/longhorn-7937

Signed-off-by: Chin-Ya Huang <chin-ya.huang@suse.com>
  • Loading branch information
c3y1huang committed Feb 22, 2024
1 parent 43013ae commit 08e8564
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 2 deletions.
107 changes: 105 additions & 2 deletions app/recurring_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"os"
"reflect"
"sort"
"strconv"
"strings"
Expand All @@ -20,6 +21,7 @@ import (
"k8s.io/client-go/tools/record"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
typedv1core "k8s.io/client-go/kubernetes/typed/core/v1"
Expand Down Expand Up @@ -48,6 +50,8 @@ const (
)

type Job struct {
name string

logger logrus.FieldLogger
lhClient lhclientset.Interface
namespace string
Expand Down Expand Up @@ -184,15 +188,18 @@ func startVolumeJob(
job, err := newJob(
logger,
managerURL,
jobName,
volumeName,
snapshotName,
jobLabelMap,
jobRetain,
jobTask)
jobTask,
)
if err != nil {
log.WithError(err).Error("Failed to create new job for volume")
return err
}

err = job.run()
if err != nil {
log.WithError(err).Errorf("Failed to run job for volume")
Expand All @@ -213,7 +220,7 @@ func sliceStringSafely(s string, begin, end int) string {
return s[begin:end]
}

func newJob(logger logrus.FieldLogger, managerURL, volumeName, snapshotName string, labels map[string]string, retain int, task longhorn.RecurringJobType) (*Job, error) {
func newJob(logger logrus.FieldLogger, managerURL, jobName, volumeName, snapshotName string, labels map[string]string, retain int, task longhorn.RecurringJobType) (*Job, error) {
namespace := os.Getenv(types.EnvPodNamespace)
if namespace == "" {
return nil, fmt.Errorf("failed detect pod namespace, environment variable %v is missing", types.EnvPodNamespace)
Expand Down Expand Up @@ -257,6 +264,7 @@ func newJob(logger logrus.FieldLogger, managerURL, volumeName, snapshotName stri
}

return &Job{
name: jobName,
logger: logger,
lhClient: lhClient,
namespace: namespace,
Expand Down Expand Up @@ -303,6 +311,15 @@ func (job *Job) run() (err error) {
return fmt.Errorf("volume %v is in an invalid state for recurring job: %v. Volume must be in state Attached or Detached", volumeName, volume.State)
}

if err := job.handleAttachmentTicketCreation(volumeName); err != nil {
return fmt.Errorf("failed to create attachment ticket for volume %v: %v", volumeName, err)
}
defer func() {
if err := job.handleAttachmentTicketDeletion(volumeName); err != nil {
job.logger.WithError(err).Error("Failed to clean up volume attachment ticket")
}
}()

// only recurring job types `snapshot` and `backup` need to check if old snapshots can be deleted or not before creating
switch job.task {
case longhorn.RecurringJobTypeSnapshot, longhorn.RecurringJobTypeBackup:
Expand Down Expand Up @@ -824,6 +841,73 @@ func (job *Job) waitForVolumeState(state string, timeout int) (*longhornclient.V
return nil, fmt.Errorf("timeout waiting for volume %v to be in state %v", volumeName, state)
}

// handleAttachmentTicketCreation check and create attachment so that the source volume doesn't get detached during the job.
func (job *Job) handleAttachmentTicketCreation(volumeName string) (err error) {
defer func() {
err = errors.Wrap(err, "handleAttachmentTicketCreation: failed to create/update attachment")
}()

volumeAttachment, err := job.getLHVolumeAttachmentByVolumeName(volumeName)
if err != nil {
return err
}

existingVA := volumeAttachment.DeepCopy()
defer func() {
if err != nil {
return
}
if reflect.DeepEqual(existingVA.Spec, volumeAttachment.Spec) {
return
}

if _, err = job.updateLHVolumeAttachment(volumeAttachment); err != nil {
return
}
}()

volume, err := job.GetVolume(volumeName)
attachmentTicketID := longhorn.GetAttachmentTicketID(longhorn.AttacherTypeRecurringJobApp, job.name)
createOrUpdateAttachmentTicket(volumeAttachment, attachmentTicketID, volume.Status.OwnerID, longhorn.AnyValue, longhorn.AttacherTypeBackupController)

return nil
}

// handleAttachmentTicketDeletion check and delete attachment so that the source volume can be detached after the job.
func (job *Job) handleAttachmentTicketDeletion(volumeName string) (err error) {
defer func() {
err = errors.Wrap(err, "handleAttachmentTicketDeletion: failed to clean up attachment")
}()

va, err := job.getLHVolumeAttachmentByVolumeName(volumeName)
if err != nil {
if apierrors.IsNotFound(err) {
return nil
}
return err
}

attachmentTicketID := longhorn.GetAttachmentTicketID(longhorn.AttacherTypeRecurringJobApp, job.name)

if _, ok := va.Spec.AttachmentTickets[attachmentTicketID]; ok {
delete(va.Spec.AttachmentTickets, attachmentTicketID)
if _, err = job.updateLHVolumeAttachment(va); err != nil {
return err
}
}

return nil
}

func (job *Job) getLHVolumeAttachmentByVolumeName(volumeName string) (*longhorn.VolumeAttachment, error) {
volumeAttachmentName := types.GetLHVolumeAttachmentNameFromVolumeName(volumeName)
return job.lhClient.LonghornV1beta2().VolumeAttachments(job.namespace).Get(context.TODO(), volumeAttachmentName, metav1.GetOptions{})
}

func (job *Job) updateLHVolumeAttachment(va *longhorn.VolumeAttachment) (*longhorn.VolumeAttachment, error) {
return job.lhClient.LonghornV1beta2().VolumeAttachments(va.Namespace).Update(context.TODO(), va, metav1.UpdateOptions{})
}

func filterSnapshotCRs(snapshotCRs []longhornclient.SnapshotCR, predicate func(snapshot longhornclient.SnapshotCR) bool) []longhornclient.SnapshotCR {
filtered := []longhornclient.SnapshotCR{}
for _, snapshotCR := range snapshotCRs {
Expand Down Expand Up @@ -947,3 +1031,22 @@ func getLonghornClientset() (*lhclientset.Clientset, error) {
}
return lhclientset.NewForConfig(config)
}

func createOrUpdateAttachmentTicket(va *longhorn.VolumeAttachment, ticketID, nodeID, disableFrontend string, attacherType longhorn.AttacherType) {
attachmentTicket, ok := va.Spec.AttachmentTickets[ticketID]
if !ok {
// Create new one
attachmentTicket = &longhorn.AttachmentTicket{
ID: ticketID,
Type: attacherType,
NodeID: nodeID,
Parameters: map[string]string{
longhorn.AttachmentParameterDisableFrontend: disableFrontend,
},
}
}
if attachmentTicket.NodeID != nodeID {
attachmentTicket.NodeID = nodeID
}
va.Spec.AttachmentTickets[attachmentTicket.ID] = attachmentTicket
}
1 change: 1 addition & 0 deletions k8s/pkg/apis/longhorn/v1beta2/volumeattachment.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ const (
AttacherTypeVolumeExpansionController = AttacherType("volume-expansion-controller")
AttacherTypeBackingImageDataSourceController = AttacherType("bim-ds-controller")
AttacherTypeVolumeRebuildingController = AttacherType("volume-rebuilding-controller")
AttacherTypeRecurringJobApp = AttacherType("recurring-job-app")
)

const (
Expand Down

0 comments on commit 08e8564

Please sign in to comment.