Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(recurring-job): avoid intermediate volume detachment during backup #2629

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 99 additions & 25 deletions app/recurring_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"os"
"reflect"
"sort"
"strconv"
"strings"
Expand All @@ -20,6 +21,7 @@ import (
"k8s.io/client-go/tools/record"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
typedv1core "k8s.io/client-go/kubernetes/typed/core/v1"
Expand Down Expand Up @@ -48,6 +50,8 @@ const (
)

type Job struct {
name string

logger logrus.FieldLogger
lhClient lhclientset.Interface
namespace string
Expand Down Expand Up @@ -184,15 +188,18 @@ func startVolumeJob(
job, err := newJob(
logger,
managerURL,
jobName,
volumeName,
snapshotName,
jobLabelMap,
jobRetain,
jobTask)
jobTask,
)
if err != nil {
log.WithError(err).Error("Failed to create new job for volume")
return err
}

err = job.run()
if err != nil {
log.WithError(err).Errorf("Failed to run job for volume")
Expand All @@ -213,7 +220,7 @@ func sliceStringSafely(s string, begin, end int) string {
return s[begin:end]
}

func newJob(logger logrus.FieldLogger, managerURL, volumeName, snapshotName string, labels map[string]string, retain int, task longhorn.RecurringJobType) (*Job, error) {
func newJob(logger logrus.FieldLogger, managerURL, jobName, volumeName, snapshotName string, labels map[string]string, retain int, task longhorn.RecurringJobType) (*Job, error) {
namespace := os.Getenv(types.EnvPodNamespace)
if namespace == "" {
return nil, fmt.Errorf("failed detect pod namespace, environment variable %v is missing", types.EnvPodNamespace)
Expand Down Expand Up @@ -257,6 +264,7 @@ func newJob(logger logrus.FieldLogger, managerURL, volumeName, snapshotName stri
}

return &Job{
name: jobName,
logger: logger,
lhClient: lhClient,
namespace: namespace,
Expand Down Expand Up @@ -303,6 +311,15 @@ func (job *Job) run() (err error) {
return fmt.Errorf("volume %v is in an invalid state for recurring job: %v. Volume must be in state Attached or Detached", volumeName, volume.State)
}

if err := job.handleAttachmentTicketCreation(volumeName); err != nil {
return fmt.Errorf("failed to create attachment ticket for volume %v: %v", volumeName, err)
}
defer func() {
if err := job.handleAttachmentTicketDeletion(volumeName); err != nil {
job.logger.WithError(err).Error("Failed to clean up volume attachment ticket")
}
}()

// only recurring job types `snapshot` and `backup` need to check if old snapshots can be deleted or not before creating
switch job.task {
case longhorn.RecurringJobTypeSnapshot, longhorn.RecurringJobTypeBackup:
Expand Down Expand Up @@ -478,17 +495,6 @@ func (job *Job) eventCreate(eventType, eventReason, message string) error {
return nil
}

func (job *Job) deleteSnapshots(names []string, volume *longhornclient.Volume, volumeAPI longhornclient.VolumeOperations) error {
for _, name := range names {
_, err := volumeAPI.ActionSnapshotDelete(volume, &longhornclient.SnapshotInput{Name: name})
if err != nil {
return err
}
job.logger.WithField("volume", volume.Name).Infof("Deleted snapshot %v", name)
}
return nil
}

func (job *Job) purgeSnapshots(volume *longhornclient.Volume, volumeAPI longhornclient.VolumeOperations) error {
// Trigger snapshot purge of the volume
if _, err := volumeAPI.ActionSnapshotPurge(volume); err != nil {
Expand Down Expand Up @@ -806,22 +812,71 @@ func (job *Job) GetSettingAsBool(name types.SettingName) (bool, error) {
return value, nil
}

// waitForVolumeState timeout in second
func (job *Job) waitForVolumeState(state string, timeout int) (*longhornclient.Volume, error) {
volumeAPI := job.api.Volume
volumeName := job.volumeName
// handleAttachmentTicketCreation check and create attachment so that the source volume doesn't get detached during the job.
func (job *Job) handleAttachmentTicketCreation(volumeName string) (err error) {
defer func() {
err = errors.Wrap(err, "handleAttachmentTicketCreation: failed to create/update attachment")
}()

for i := 0; i < timeout; i++ {
volume, err := volumeAPI.ById(volumeName)
if err == nil {
if volume.State == state {
return volume, nil
}
volumeAttachment, err := job.getLHVolumeAttachmentByVolumeName(volumeName)
if err != nil {
return err
}

existingVolumeAttachment := volumeAttachment.DeepCopy()
defer func() {
if err != nil {
return
}
if reflect.DeepEqual(existingVolumeAttachment.Spec, volumeAttachment.Spec) {
return
}

if _, err = job.updateLHVolumeAttachment(volumeAttachment); err != nil {
return
}
}()

volume, err := job.GetVolume(volumeName)
attachmentTicketID := longhorn.GetAttachmentTicketID(longhorn.AttacherTypeRecurringJobApp, job.name)
createOrUpdateAttachmentTicket(volumeAttachment, attachmentTicketID, volume.Status.OwnerID, longhorn.AnyValue, longhorn.AttacherTypeBackupController)

return nil
}

// handleAttachmentTicketDeletion check and delete attachment so that the source volume can be detached after the job.
func (job *Job) handleAttachmentTicketDeletion(volumeName string) (err error) {
defer func() {
err = errors.Wrap(err, "handleAttachmentTicketDeletion: failed to clean up attachment")
}()

volumeAttachment, err := job.getLHVolumeAttachmentByVolumeName(volumeName)
if err != nil {
if apierrors.IsNotFound(err) {
return nil
}
return err
}

attachmentTicketID := longhorn.GetAttachmentTicketID(longhorn.AttacherTypeRecurringJobApp, job.name)

if _, ok := volumeAttachment.Spec.AttachmentTickets[attachmentTicketID]; ok {
delete(volumeAttachment.Spec.AttachmentTickets, attachmentTicketID)
if _, err = job.updateLHVolumeAttachment(volumeAttachment); err != nil {
return err
}
time.Sleep(1 * time.Second)
}

return nil, fmt.Errorf("timeout waiting for volume %v to be in state %v", volumeName, state)
return nil
}

func (job *Job) getLHVolumeAttachmentByVolumeName(volumeName string) (*longhorn.VolumeAttachment, error) {
volumeAttachmentName := types.GetLHVolumeAttachmentNameFromVolumeName(volumeName)
return job.lhClient.LonghornV1beta2().VolumeAttachments(job.namespace).Get(context.TODO(), volumeAttachmentName, metav1.GetOptions{})
}

func (job *Job) updateLHVolumeAttachment(volumeAttachment *longhorn.VolumeAttachment) (*longhorn.VolumeAttachment, error) {
return job.lhClient.LonghornV1beta2().VolumeAttachments(volumeAttachment.Namespace).Update(context.TODO(), volumeAttachment, metav1.UpdateOptions{})
}

func filterSnapshotCRs(snapshotCRs []longhornclient.SnapshotCR, predicate func(snapshot longhornclient.SnapshotCR) bool) []longhornclient.SnapshotCR {
Expand Down Expand Up @@ -947,3 +1002,22 @@ func getLonghornClientset() (*lhclientset.Clientset, error) {
}
return lhclientset.NewForConfig(config)
}

func createOrUpdateAttachmentTicket(volumeAttachment *longhorn.VolumeAttachment, ticketID, nodeID, disableFrontend string, attacherType longhorn.AttacherType) {
attachmentTicket, ok := volumeAttachment.Spec.AttachmentTickets[ticketID]
if !ok {
// Create new one
attachmentTicket = &longhorn.AttachmentTicket{
ID: ticketID,
Type: attacherType,
NodeID: nodeID,
Parameters: map[string]string{
longhorn.AttachmentParameterDisableFrontend: disableFrontend,
},
}
}
if attachmentTicket.NodeID != nodeID {
attachmentTicket.NodeID = nodeID
}
volumeAttachment.Spec.AttachmentTickets[attachmentTicket.ID] = attachmentTicket
}
1 change: 1 addition & 0 deletions k8s/pkg/apis/longhorn/v1beta2/volumeattachment.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ const (
AttacherTypeVolumeExpansionController = AttacherType("volume-expansion-controller")
AttacherTypeBackingImageDataSourceController = AttacherType("bim-ds-controller")
AttacherTypeVolumeRebuildingController = AttacherType("volume-rebuilding-controller")
AttacherTypeRecurringJobApp = AttacherType("recurring-job-app")
)

const (
Expand Down