diff --git a/app/recurring_job.go b/app/recurring_job.go index 6070a5598b..74329af785 100644 --- a/app/recurring_job.go +++ b/app/recurring_job.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "os" + "reflect" "sort" "strconv" "strings" @@ -20,6 +21,7 @@ import ( "k8s.io/client-go/tools/record" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" clientset "k8s.io/client-go/kubernetes" typedv1core "k8s.io/client-go/kubernetes/typed/core/v1" @@ -48,6 +50,8 @@ const ( ) type Job struct { + name string + logger logrus.FieldLogger lhClient lhclientset.Interface namespace string @@ -184,15 +188,18 @@ func startVolumeJob( job, err := newJob( logger, managerURL, + jobName, volumeName, snapshotName, jobLabelMap, jobRetain, - jobTask) + jobTask, + ) if err != nil { log.WithError(err).Error("Failed to create new job for volume") return err } + err = job.run() if err != nil { log.WithError(err).Errorf("Failed to run job for volume") @@ -213,7 +220,7 @@ func sliceStringSafely(s string, begin, end int) string { return s[begin:end] } -func newJob(logger logrus.FieldLogger, managerURL, volumeName, snapshotName string, labels map[string]string, retain int, task longhorn.RecurringJobType) (*Job, error) { +func newJob(logger logrus.FieldLogger, managerURL, jobName, volumeName, snapshotName string, labels map[string]string, retain int, task longhorn.RecurringJobType) (*Job, error) { namespace := os.Getenv(types.EnvPodNamespace) if namespace == "" { return nil, fmt.Errorf("failed detect pod namespace, environment variable %v is missing", types.EnvPodNamespace) @@ -257,6 +264,8 @@ func newJob(logger logrus.FieldLogger, managerURL, volumeName, snapshotName stri } return &Job{ + name: jobName, + logger: logger, lhClient: lhClient, namespace: namespace, @@ -303,6 +312,15 @@ func (job *Job) run() (err error) { return fmt.Errorf("volume %v is in an invalid state for recurring job: %v. Volume must be in state Attached or Detached", volumeName, volume.State) } + if err := job.handleAttachmentTicketCreation(volumeName); err != nil { + return fmt.Errorf("failed to create attachment ticket for volume %v: %v", volumeName, err) + } + defer func() { + if err := job.handleAttachmentTicketDeletion(volumeName); err != nil { + job.logger.WithError(err).Error("Failed to clean up volume attachment ticket") + } + }() + // only recurring job types `snapshot` and `backup` need to check if old snapshots can be deleted or not before creating switch job.task { case longhorn.RecurringJobTypeSnapshot, longhorn.RecurringJobTypeBackup: @@ -824,6 +842,73 @@ func (job *Job) waitForVolumeState(state string, timeout int) (*longhornclient.V return nil, fmt.Errorf("timeout waiting for volume %v to be in state %v", volumeName, state) } +// handleAttachmentTicketCreation check and create attachment so that the source volume is attached if needed +func (job *Job) handleAttachmentTicketCreation(volumeName string) (err error) { + defer func() { + err = errors.Wrap(err, "handleAttachmentTicketCreation: failed to create/update attachment") + }() + + volumeAttachment, err := job.getLHVolumeAttachmentByVolumeName(volumeName) + if err != nil { + return err + } + + existingVA := volumeAttachment.DeepCopy() + defer func() { + if err != nil { + return + } + if reflect.DeepEqual(existingVA.Spec, volumeAttachment.Spec) { + return + } + + if _, err = job.updateLHVolumeAttachment(volumeAttachment); err != nil { + return + } + }() + + volume, err := job.GetVolume(volumeName) + attachmentTicketID := longhorn.GetAttachmentTicketID(longhorn.AttacherTypeRecurringJobApp, job.name) + createOrUpdateAttachmentTicket(volumeAttachment, attachmentTicketID, volume.Status.OwnerID, longhorn.AnyValue, longhorn.AttacherTypeBackupController) + + return nil +} + +// handleAttachmentTicketDeletion check and delete attachment so that the source volume is detached if needed +func (job *Job) handleAttachmentTicketDeletion(volumeName string) (err error) { + defer func() { + err = errors.Wrap(err, "handleAttachmentTicketDeletion: failed to clean up attachment") + }() + + va, err := job.getLHVolumeAttachmentByVolumeName(volumeName) + if err != nil { + if apierrors.IsNotFound(err) { + return nil + } + return err + } + + attachmentTicketID := longhorn.GetAttachmentTicketID(longhorn.AttacherTypeRecurringJobApp, job.name) + + if _, ok := va.Spec.AttachmentTickets[attachmentTicketID]; ok { + delete(va.Spec.AttachmentTickets, attachmentTicketID) + if _, err = job.updateLHVolumeAttachment(va); err != nil { + return err + } + } + + return nil +} + +func (job *Job) getLHVolumeAttachmentByVolumeName(volumeName string) (*longhorn.VolumeAttachment, error) { + volumeAttachmentName := types.GetLHVolumeAttachmentNameFromVolumeName(volumeName) + return job.lhClient.LonghornV1beta2().VolumeAttachments(job.namespace).Get(context.TODO(), volumeAttachmentName, metav1.GetOptions{}) +} + +func (job *Job) updateLHVolumeAttachment(va *longhorn.VolumeAttachment) (*longhorn.VolumeAttachment, error) { + return job.lhClient.LonghornV1beta2().VolumeAttachments(va.Namespace).Update(context.TODO(), va, metav1.UpdateOptions{}) +} + func filterSnapshotCRs(snapshotCRs []longhornclient.SnapshotCR, predicate func(snapshot longhornclient.SnapshotCR) bool) []longhornclient.SnapshotCR { filtered := []longhornclient.SnapshotCR{} for _, snapshotCR := range snapshotCRs { @@ -947,3 +1032,22 @@ func getLonghornClientset() (*lhclientset.Clientset, error) { } return lhclientset.NewForConfig(config) } + +func createOrUpdateAttachmentTicket(va *longhorn.VolumeAttachment, ticketID, nodeID, disableFrontend string, attacherType longhorn.AttacherType) { + attachmentTicket, ok := va.Spec.AttachmentTickets[ticketID] + if !ok { + // Create new one + attachmentTicket = &longhorn.AttachmentTicket{ + ID: ticketID, + Type: attacherType, + NodeID: nodeID, + Parameters: map[string]string{ + longhorn.AttachmentParameterDisableFrontend: disableFrontend, + }, + } + } + if attachmentTicket.NodeID != nodeID { + attachmentTicket.NodeID = nodeID + } + va.Spec.AttachmentTickets[attachmentTicket.ID] = attachmentTicket +} diff --git a/k8s/pkg/apis/longhorn/v1beta2/volumeattachment.go b/k8s/pkg/apis/longhorn/v1beta2/volumeattachment.go index 2cf935c974..25aaa49c32 100644 --- a/k8s/pkg/apis/longhorn/v1beta2/volumeattachment.go +++ b/k8s/pkg/apis/longhorn/v1beta2/volumeattachment.go @@ -52,6 +52,7 @@ const ( AttacherTypeVolumeExpansionController = AttacherType("volume-expansion-controller") AttacherTypeBackingImageDataSourceController = AttacherType("bim-ds-controller") AttacherTypeVolumeRebuildingController = AttacherType("volume-rebuilding-controller") + AttacherTypeRecurringJobApp = AttacherType("recurring-job-app") ) const (