Skip to content

Commit

Permalink
Change DataExportSource type (#14)
Browse files Browse the repository at this point in the history
Signed-off-by: Serhii Aheienko <serhii.aheienko@gmail.com>
  • Loading branch information
saheienko authored Jun 16, 2020
1 parent 5603ba5 commit 0f67c66
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 176 deletions.
34 changes: 18 additions & 16 deletions pkg/apis/kdmp/v1alpha1/dataexport.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package v1alpha1

import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -78,23 +77,26 @@ type DataExport struct {

// DataExportSpec defines configuration parameters for DataExport.
type DataExportSpec struct {
Type DataExportType `json:"type,omitempty"`
ClusterPair string `json:"clusterPair,omitempty"`
SnapshotStorageClass string `json:"snapshotStorageClass,omitempty"`
Source DataExportSource `json:"source,omitempty"`
Destination DataExportDestination `json:"destination,omitempty"`
Type DataExportType `json:"type,omitempty"`
ClusterPair string `json:"clusterPair,omitempty"`
SnapshotStorageClass string `json:"snapshotStorageClass,omitempty"`
Source DataExportObjectReference `json:"source,omitempty"`
Destination DataExportObjectReference `json:"destination,omitempty"`
}

// DataExportSource defines a PVC name and namespace that should be processed.
type DataExportSource struct {
PersistentVolumeClaim *corev1.PersistentVolumeClaim `json:"persistentVolumeClaim,omitempty"`
}

// DataExportDestination defines a backend for data transfer.
type DataExportDestination struct {
// PersistentVolumeClaim defines a PVC backend for data transfer. If provided PVC doesn't exist
// a new one will be created using the spec configuration.
PersistentVolumeClaim *corev1.PersistentVolumeClaim `json:"persistentVolumeClaim,omitempty"`
// DataExportObjectReference contains enough information to let you inspect the referred object.
type DataExportObjectReference struct {
// API version of the referent.
APIVersion string `json:"apiVersion,omitempty"`
// Kind of the referent.
// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
Kind string `json:"kind,omitempty"`
// Namespace of the referent.
// More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/
Namespace string `json:"namespace,omitempty"`
// Name of the referent.
// More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
Name string `json:"name,omitempty"`
}

// ExportStatus indicates a current state of the data transfer.
Expand Down
41 changes: 7 additions & 34 deletions pkg/apis/kdmp/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

146 changes: 20 additions & 126 deletions pkg/controllers/dataexport/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/portworx/sched-ops/k8s/core"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"
)

Expand Down Expand Up @@ -98,23 +97,18 @@ func (c *Controller) sync(ctx context.Context, in *kdmpapi.DataExport) (bool, er
return true, c.client.Update(ctx, setStatus(dataExport, kdmpapi.DataExportStatusInitial, ""))
}

if _, err := c.ensureDestinationPVC(ctx, dataExport); err != nil {
msg := fmt.Sprintf("destination PVC check failed: %s", err)
return false, c.updateStatus(dataExport, kdmpapi.DataExportStatusFailed, msg)
}

srcPVC := dataExport.Spec.Source.PersistentVolumeClaim.Name
srcPVCName := dataExport.Spec.Source.Name

// use snapshot pvc in the dst namespace if it's available
if dataExport.Status.SnapshotPVCName != "" {
srcPVC = dataExport.Status.SnapshotPVCName
srcPVCName = dataExport.Status.SnapshotPVCName
}

// start data transfer
id, err := driver.StartJob(
drivers.WithSourcePVC(srcPVC),
drivers.WithDestinationPVC(dataExport.Spec.Destination.PersistentVolumeClaim.GetName()),
drivers.WithNamespace(dataExport.Spec.Destination.PersistentVolumeClaim.GetNamespace()),
drivers.WithSourcePVC(srcPVCName),
drivers.WithDestinationPVC(dataExport.Spec.Destination.Name),
drivers.WithNamespace(dataExport.Spec.Destination.Namespace),
drivers.WithLabels(jobLabels(dataExport.GetName())),
)
if err != nil {
Expand Down Expand Up @@ -172,11 +166,10 @@ func (c *Controller) stageSnapshotScheduled(ctx context.Context, snapshotter sna
return false, fmt.Errorf("snapshot driver is nil")
}

srcPVC, dstPVC := dataExport.Spec.Source.PersistentVolumeClaim, dataExport.Spec.Destination.PersistentVolumeClaim
name, namespace, err := snapshotter.CreateSnapshot(
snapshots.PVCName(srcPVC.Name),
snapshots.PVCNamespace(srcPVC.Namespace),
snapshots.RestoreNamespaces(dstPVC.Namespace),
snapshots.PVCName(dataExport.Spec.Source.Name),
snapshots.PVCNamespace(dataExport.Spec.Source.Namespace),
snapshots.RestoreNamespaces(dataExport.Spec.Destination.Namespace),
)
if err != nil {
msg := fmt.Sprintf("failed to create a snapshot: %s", err)
Expand All @@ -199,8 +192,7 @@ func (c *Controller) stageSnapshotInProgress(ctx context.Context, snapshotter sn
return false, fmt.Errorf("snapshot driver is nil")
}

srcPvc := dataExport.Spec.Source.PersistentVolumeClaim
status, err := snapshotter.SnapshotStatus(dataExport.Status.SnapshotID, srcPvc.Namespace)
status, err := snapshotter.SnapshotStatus(dataExport.Status.SnapshotID, dataExport.Spec.Source.Namespace)
if err != nil {
msg := fmt.Sprintf("failed to get a snapshot status: %s", err)
return false, c.updateStatus(dataExport, kdmpapi.DataExportStatusFailed, msg)
Expand Down Expand Up @@ -288,8 +280,8 @@ func (c *Controller) restoreSnapshot(ctx context.Context, snapshotter snapshots.
return nil, fmt.Errorf("snapshot driver is nil")
}

srcPvc := de.Spec.Source.PersistentVolumeClaim
srcPvc, err := core.Instance().GetPersistentVolumeClaim(srcPvc.Name, srcPvc.Namespace)
src := de.Spec.Source
srcPvc, err := core.Instance().GetPersistentVolumeClaim(src.Name, src.Namespace)
if err != nil {
return nil, err
}
Expand All @@ -303,7 +295,7 @@ func (c *Controller) restoreSnapshot(ctx context.Context, snapshotter snapshots.
snapshots.Name(de.Status.SnapshotID),
snapshots.Namespace(de.Status.SnapshotNamespace),
snapshots.PVCName(toSnapshotPVCName(srcPvc.Name)),
snapshots.PVCNamespace(de.Spec.Destination.PersistentVolumeClaim.Namespace),
snapshots.PVCNamespace(de.Spec.Destination.Namespace),
snapshots.PVCSpec(restoreSpec),
)
if err != nil {
Expand All @@ -316,50 +308,32 @@ func (c *Controller) restoreSnapshot(ctx context.Context, snapshotter snapshots.
}

func (c *Controller) checkClaims(de *kdmpapi.DataExport) error {
srcPVC := de.Spec.Source.PersistentVolumeClaim
if srcPVC == nil {
return fmt.Errorf("source pvc should be provided")
if !hasSnapshotStage(de) && de.Spec.Source.Namespace != de.Spec.Destination.Namespace {
return fmt.Errorf("source and destination volume claims should be in the same namespace if no snapshot class is provided")
}

// ignore a check for mounted pods if a source pvc has a snapshot (data will be copied from the snapshot)
src, err := c.checkPVC(srcPVC, !hasSnapshotStage(de))
srcPVC, err := c.checkPVC(de.Spec.Source, !hasSnapshotStage(de))
if err != nil {
return fmt.Errorf("source pvc: %v", err)
}
srcPVC = src

dstPVC := de.Spec.Destination.PersistentVolumeClaim
if dstPVC == nil {
return fmt.Errorf("destination pvc should be provided")
}
dstPVCexits := true
dst, err := c.checkPVC(dstPVC, true)
dstPVC, err := c.checkPVC(de.Spec.Destination, true)
if err != nil {
if !errors.IsNotFound(err) {
return fmt.Errorf("destination pvc: %v", err)
}

// check provided pvc spec if destination pvc doesn't exist
if err = isValidDestinationSpec(dstPVC.Spec); err != nil {
return fmt.Errorf("destination pvc: %v", err)
}

dstPVCexits = false
}
if dst != nil {
dstPVC = dst
return fmt.Errorf("destination pvc: %v", err)
}

srcReq := srcPVC.Spec.Resources.Requests[corev1.ResourceStorage]
dstReq := dstPVC.Spec.Resources.Requests[corev1.ResourceStorage]
// dstReq < srcReq
if dstPVCexits && dstReq.Cmp(srcReq) == -1 {
if dstReq.Cmp(srcReq) == -1 {
return fmt.Errorf("size of the destination pvc (%s) is less than of the source one (%s)", dstReq.String(), srcReq.String())
}

return nil
}

func (c *Controller) checkPVC(in *corev1.PersistentVolumeClaim, checkMounts bool) (*corev1.PersistentVolumeClaim, error) {
func (c *Controller) checkPVC(in kdmpapi.DataExportObjectReference, checkMounts bool) (*corev1.PersistentVolumeClaim, error) {
if in.Name == "" || in.Namespace == "" {
return nil, fmt.Errorf("name and namespace should be provided")
}
Expand All @@ -384,47 +358,6 @@ func (c *Controller) checkPVC(in *corev1.PersistentVolumeClaim, checkMounts bool
return pvc, nil
}

func (c *Controller) ensureDestinationPVC(ctx context.Context, de *kdmpapi.DataExport) (*corev1.PersistentVolumeClaim, error) {
dst := de.Spec.Destination.PersistentVolumeClaim
dstPVC, err := core.Instance().GetPersistentVolumeClaim(dst.Name, dst.Namespace)
if err != nil {
if !errors.IsNotFound(err) {
return nil, fmt.Errorf("get destination pvc: %s", err)
}
// create a volume otherwise
}
if err == nil {
return dstPVC, nil
}

// Create a volume claim if it's not found
spec := dst.Spec
if err := isValidClaimSpec(spec); err != nil {
// use source spec parameters if destination on is invalid
src := de.Spec.Source.PersistentVolumeClaim
srcPVC, err := core.Instance().GetPersistentVolumeClaim(src.Name, src.Namespace)
if err != nil {
return nil, fmt.Errorf("get source pvc: %s", err)
}
spec = mergeSpec(spec, srcPVC.Spec)
}

// TODO: copy annotations and labels?
dstPVC = &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: dst.Name,
Namespace: dst.Namespace,
},
Spec: corev1.PersistentVolumeClaimSpec{
AccessModes: spec.AccessModes,
Resources: spec.Resources,
StorageClassName: spec.StorageClassName,
VolumeMode: spec.VolumeMode,
},
}
return core.Instance().CreatePersistentVolumeClaim(dstPVC)
}

func toPodNames(objs []corev1.Pod) []string {
out := make([]string, 0)
for _, o := range objs {
Expand All @@ -441,45 +374,6 @@ func hasSnapshotStage(de *kdmpapi.DataExport) bool {
return de.Spec.SnapshotStorageClass != ""
}

func isValidClaimSpec(spec corev1.PersistentVolumeClaimSpec) error {
if spec.StorageClassName == nil || *spec.StorageClassName == "" {
return fmt.Errorf("storageClassName should be set")
}
if spec.Resources.Requests == nil {
return fmt.Errorf("requests should be set")
}
if len(spec.AccessModes) == 0 {
return fmt.Errorf("accessModes should be set")
}
return nil
}

func isValidDestinationSpec(spec corev1.PersistentVolumeClaimSpec) error {
if spec.StorageClassName == nil || *spec.StorageClassName == "" {
return fmt.Errorf("storageClassName should be set")
}
return nil
}

func mergeSpec(dst, src corev1.PersistentVolumeClaimSpec) corev1.PersistentVolumeClaimSpec {
if dst.StorageClassName == nil {
dst.StorageClassName = src.StorageClassName
}
if dst.Resources.Requests == nil {
dst.Resources.Requests = src.Resources.Requests
}
if dst.Resources.Limits == nil {
dst.Resources.Limits = src.Resources.Limits
}
if dst.AccessModes == nil {
dst.AccessModes = src.AccessModes
}
if dst.VolumeMode == nil {
dst.VolumeMode = src.VolumeMode
}
return dst
}

func setStatus(de *kdmpapi.DataExport, status kdmpapi.DataExportStatus, reason string) *kdmpapi.DataExport {
de.Status.Status = status
de.Status.Reason = reason
Expand Down

0 comments on commit 0f67c66

Please sign in to comment.