-
Notifications
You must be signed in to change notification settings - Fork 108
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
csi: implement Snapshot functionality #103
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -547,16 +547,13 @@ func (d *Driver) ControllerGetCapabilities(ctx context.Context, req *csi.Control | |
} | ||
} | ||
|
||
// TODO(arslan): checkout if the capabilities are worth supporting | ||
var caps []*csi.ControllerServiceCapability | ||
for _, cap := range []csi.ControllerServiceCapability_RPC_Type{ | ||
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME, | ||
csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME, | ||
csi.ControllerServiceCapability_RPC_LIST_VOLUMES, | ||
|
||
// TODO(arslan): enable once snapshotting is supported | ||
// csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT, | ||
// csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS, | ||
csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT, | ||
csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS, | ||
} { | ||
caps = append(caps, newCap(cap)) | ||
} | ||
|
@@ -575,32 +572,194 @@ func (d *Driver) ControllerGetCapabilities(ctx context.Context, req *csi.Control | |
// CreateSnapshot will be called by the CO to create a new snapshot from a | ||
// source volume on behalf of a user. | ||
func (d *Driver) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) { | ||
d.log.WithFields(logrus.Fields{ | ||
"req": req, | ||
"method": "create_snapshot", | ||
}).Warn("create snapshot is not implemented") | ||
return nil, status.Error(codes.Unimplemented, "") | ||
if req.GetName() == "" { | ||
return nil, status.Error(codes.InvalidArgument, "CreateSnapshot Name must be provided") | ||
} | ||
|
||
if req.GetSourceVolumeId() == "" { | ||
return nil, status.Error(codes.InvalidArgument, "CreateSnapshot Source Volume ID must be provided") | ||
} | ||
|
||
ll := d.log.WithFields(logrus.Fields{ | ||
"req_name": req.GetName(), | ||
"req_source_volume_id": req.GetSourceVolumeId(), | ||
"req_parameters": req.GetParameters(), | ||
"method": "create_snapshot", | ||
}) | ||
|
||
ll.Info("create snapshot is called") | ||
|
||
// get snapshot first, if it's created do no thing | ||
snapshots, _, err := d.storage.ListSnapshots(ctx, req.GetSourceVolumeId(), nil) | ||
if err != nil { | ||
return nil, status.Errorf(codes.Internal, "couldn't fetch snapshots: %s", err.Error()) | ||
} | ||
|
||
for _, snap := range snapshots { | ||
if snap.Name == req.GetName() && snap.ResourceID == req.GetSourceVolumeId() { | ||
s, err := toCsiSnapshot(&snap) | ||
if err != nil { | ||
return nil, status.Errorf(codes.Internal, | ||
"couldn't convert DO snapshot to CSI snapshot: %s", err.Error()) | ||
} | ||
|
||
return &csi.CreateSnapshotResponse{ | ||
Snapshot: s, | ||
}, nil | ||
} | ||
} | ||
|
||
snap, resp, err := d.storage.CreateSnapshot(ctx, &godo.SnapshotCreateRequest{ | ||
VolumeID: req.GetSourceVolumeId(), | ||
Name: req.GetName(), | ||
Description: createdByDO, | ||
}) | ||
if err != nil { | ||
if resp != nil && resp.StatusCode == http.StatusConflict { | ||
// 409 is returned when we try to snapshot a volume with the same | ||
// name | ||
ll.WithFields(logrus.Fields{ | ||
"error": err, | ||
"resp": resp, | ||
}).Warn("snapshot create failed, might be due using an existing name") | ||
return nil, status.Errorf(codes.AlreadyExists, "snapshot with name %s already exists", req.GetName()) | ||
} | ||
|
||
return nil, status.Error(codes.Internal, err.Error()) | ||
} | ||
|
||
s, err := toCsiSnapshot(snap) | ||
if err != nil { | ||
return nil, status.Errorf(codes.Internal, | ||
"couldn't convert DO snapshot to CSI snapshot: %s", err.Error()) | ||
} | ||
|
||
return &csi.CreateSnapshotResponse{ | ||
Snapshot: s, | ||
}, nil | ||
} | ||
|
||
// DeleteSnapshost will be called by the CO to delete a snapshot. | ||
func (d *Driver) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) { | ||
d.log.WithFields(logrus.Fields{ | ||
"req": req, | ||
"method": "delete_snapshot", | ||
}).Warn("delete snapshot is not implemented") | ||
return nil, status.Error(codes.Unimplemented, "") | ||
ll := d.log.WithFields(logrus.Fields{ | ||
"req_snapshot_id": req.GetSnapshotId(), | ||
"method": "delete_snapshot", | ||
}) | ||
|
||
ll.Info("delete snapshot is called") | ||
|
||
if req.GetSnapshotId() == "" { | ||
return nil, status.Error(codes.InvalidArgument, "DeleteSnapshot Snapshot ID must be provided") | ||
} | ||
|
||
resp, err := d.storage.DeleteSnapshot(ctx, req.GetSnapshotId()) | ||
if err != nil { | ||
if resp != nil && resp.StatusCode == http.StatusNotFound { | ||
// we assume it's deleted already for idempotency | ||
ll.WithFields(logrus.Fields{ | ||
"error": err, | ||
"resp": resp, | ||
}).Warn("assuming snapshot is deleted already") | ||
return &csi.DeleteSnapshotResponse{}, nil | ||
} | ||
return nil, err | ||
} | ||
|
||
ll.WithField("response", resp).Info("snapshot is deleted") | ||
return &csi.DeleteSnapshotResponse{}, nil | ||
} | ||
|
||
// ListSnapshots returns the information about all snapshots on the storage | ||
// system within the given parameters regardless of how they were created. | ||
// ListSnapshots shold not list a snapshot that is being created but has not | ||
// been cut successfully yet. | ||
func (d *Driver) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) { | ||
d.log.WithFields(logrus.Fields{ | ||
"req": req, | ||
"method": "list_snapshots", | ||
}).Warn("list snapshots is not implemented") | ||
return nil, status.Error(codes.Unimplemented, "") | ||
// Pagination in the CSI world works different than at DO. CSI sens the | ||
// `req.MaxEntries` to indicate how much snapshots it wants. The | ||
// req.StartingToken is returned by us, if we somehow need to indicate that | ||
// we couldn't fetch and need to fetch again. But it's NOT the page number. | ||
// Suppose CSI wants us to fetch 50 entries, we only fetch 30, we need to | ||
// return NextToken as 31, to indicate that we want to continue returning | ||
// from the index 31 up to 50. | ||
|
||
// Check if it's an integer | ||
var nextToken int | ||
var err error | ||
if req.StartingToken != "" { | ||
nextToken, err = strconv.Atoi(req.StartingToken) | ||
if err != nil { | ||
return nil, status.Errorf(codes.Aborted, "ListSnapshots starting token is not valid : %s", | ||
err.Error()) | ||
} | ||
} | ||
|
||
if nextToken != 0 && req.MaxEntries != 0 { | ||
return nil, status.Errorf(codes.Aborted, | ||
"ListSnapshots invalid arguments starting token: %d and max entries: %d can't be non null at the same time", nextToken, req.MaxEntries) | ||
} | ||
|
||
ll := d.log.WithFields(logrus.Fields{ | ||
"req_starting_token": req.StartingToken, | ||
"method": "list_snapshots", | ||
}) | ||
ll.Info("list snapshots is called") | ||
|
||
// fetch all entries | ||
listOpts := &godo.ListOptions{} | ||
var snapshots []godo.Snapshot | ||
for { | ||
snaps, resp, err := d.snapshots.ListVolume(ctx, listOpts) | ||
if err != nil { | ||
return nil, status.Errorf(codes.Aborted, "ListSnapshots listing volume snapshots has failed: %s", err.Error()) | ||
} | ||
|
||
snapshots = append(snapshots, snaps...) | ||
|
||
if resp.Links == nil || resp.Links.IsLastPage() { | ||
break | ||
} | ||
|
||
page, err := resp.Links.CurrentPage() | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
listOpts.Page = page + 1 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. also set |
||
} | ||
|
||
if nextToken > len(snapshots) { | ||
return nil, status.Error(codes.Aborted, "ListSnapshots starting token is greater than total number of snapshots") | ||
} | ||
|
||
if nextToken != 0 { | ||
snapshots = snapshots[nextToken:] | ||
} | ||
|
||
if req.MaxEntries != 0 { | ||
nextToken = len(snapshots) - int(req.MaxEntries) - 1 | ||
snapshots = snapshots[:req.MaxEntries] | ||
} | ||
|
||
var entries []*csi.ListSnapshotsResponse_Entry | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. entries := make([]*csi.ListSnapshotsResponse_Entry, 0, len(snapshots)) |
||
for _, snapshot := range snapshots { | ||
snap, err := toCsiSnapshot(&snapshot) | ||
if err != nil { | ||
return nil, status.Errorf(codes.Internal, | ||
"couldn't convert DO snapshot to CSI snapshot: %s", err.Error()) | ||
} | ||
|
||
entries = append(entries, &csi.ListSnapshotsResponse_Entry{ | ||
Snapshot: snap, | ||
}) | ||
} | ||
|
||
listResp := &csi.ListSnapshotsResponse{ | ||
Entries: entries, | ||
NextToken: strconv.Itoa(nextToken), | ||
} | ||
|
||
ll.WithField("response", listResp).Info("snapshots listed") | ||
return listResp, nil | ||
} | ||
|
||
// extractStorage extracts the storage size in GB from the given capacity | ||
|
@@ -704,6 +863,25 @@ func (d *Driver) checkLimit(ctx context.Context) error { | |
return nil | ||
} | ||
|
||
// toCsiSnapshot converts a DO Snapshot struct into a csi.Snapshot struct | ||
func toCsiSnapshot(snap *godo.Snapshot) (*csi.Snapshot, error) { | ||
// snapshot already exists, return the information we have | ||
createdAt, err := time.Parse(time.RFC3339, snap.Created) | ||
if err != nil { | ||
return nil, status.Errorf(codes.Internal, "couldn't parse snapshot's created field: %s", err.Error()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. use a |
||
} | ||
|
||
return &csi.Snapshot{ | ||
Id: snap.ID, | ||
SourceVolumeId: snap.ResourceID, | ||
SizeBytes: int64(snap.SizeGigaBytes) * GB, | ||
CreatedAt: createdAt.UTC().UnixNano(), | ||
Status: &csi.SnapshotStatus{ | ||
Type: csi.SnapshotStatus_READY, | ||
}, | ||
}, nil | ||
} | ||
|
||
// validateCapabilities validates the requested capabilities. It returns false | ||
// if it doesn't satisfy the currently supported modes of DigitalOcean Block | ||
// Storage | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe set the
listOpts.PerPage = req.MaxEntries
value here. not guaranteed to be respected but at least it'll avoid making unnecessary calls if the two align