Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

csi: implement Snapshot functionality #103

Merged
merged 3 commits into from
Nov 15, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
## unreleased

* Add Snapshots functionality
[[GH-103]](https://github.com/digitalocean/csi-digitalocean/pull/103)
* Fix inconsistent usage of the driver name
[[GH-100]](https://github.com/digitalocean/csi-digitalocean/pull/100)
* Use publish_info in ControllerPublishVolume for storing and accessing the
Expand Down
218 changes: 198 additions & 20 deletions driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,16 +547,13 @@ func (d *Driver) ControllerGetCapabilities(ctx context.Context, req *csi.Control
}
}

// TODO(arslan): checkout if the capabilities are worth supporting
var caps []*csi.ControllerServiceCapability
for _, cap := range []csi.ControllerServiceCapability_RPC_Type{
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME,
csi.ControllerServiceCapability_RPC_LIST_VOLUMES,

// TODO(arslan): enable once snapshotting is supported
// csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT,
// csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS,
csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT,
csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS,
} {
caps = append(caps, newCap(cap))
}
Expand All @@ -575,32 +572,194 @@ func (d *Driver) ControllerGetCapabilities(ctx context.Context, req *csi.Control
// CreateSnapshot will be called by the CO to create a new snapshot from a
// source volume on behalf of a user.
func (d *Driver) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
d.log.WithFields(logrus.Fields{
"req": req,
"method": "create_snapshot",
}).Warn("create snapshot is not implemented")
return nil, status.Error(codes.Unimplemented, "")
if req.GetName() == "" {
return nil, status.Error(codes.InvalidArgument, "CreateSnapshot Name must be provided")
}

if req.GetSourceVolumeId() == "" {
return nil, status.Error(codes.InvalidArgument, "CreateSnapshot Source Volume ID must be provided")
}

ll := d.log.WithFields(logrus.Fields{
"req_name": req.GetName(),
"req_source_volume_id": req.GetSourceVolumeId(),
"req_parameters": req.GetParameters(),
"method": "create_snapshot",
})

ll.Info("create snapshot is called")

// get snapshot first, if it's created do no thing
snapshots, _, err := d.storage.ListSnapshots(ctx, req.GetSourceVolumeId(), nil)
if err != nil {
return nil, status.Errorf(codes.Internal, "couldn't fetch snapshots: %s", err.Error())
}

for _, snap := range snapshots {
if snap.Name == req.GetName() && snap.ResourceID == req.GetSourceVolumeId() {
s, err := toCsiSnapshot(&snap)
if err != nil {
return nil, status.Errorf(codes.Internal,
"couldn't convert DO snapshot to CSI snapshot: %s", err.Error())
}

return &csi.CreateSnapshotResponse{
Snapshot: s,
}, nil
}
}

snap, resp, err := d.storage.CreateSnapshot(ctx, &godo.SnapshotCreateRequest{
VolumeID: req.GetSourceVolumeId(),
Name: req.GetName(),
Description: createdByDO,
})
if err != nil {
if resp != nil && resp.StatusCode == http.StatusConflict {
// 409 is returned when we try to snapshot a volume with the same
// name
ll.WithFields(logrus.Fields{
"error": err,
"resp": resp,
}).Warn("snapshot create failed, might be due using an existing name")
return nil, status.Errorf(codes.AlreadyExists, "snapshot with name %s already exists", req.GetName())
}

return nil, status.Error(codes.Internal, err.Error())
}

s, err := toCsiSnapshot(snap)
if err != nil {
return nil, status.Errorf(codes.Internal,
"couldn't convert DO snapshot to CSI snapshot: %s", err.Error())
}

return &csi.CreateSnapshotResponse{
Snapshot: s,
}, nil
}

// DeleteSnapshost will be called by the CO to delete a snapshot.
func (d *Driver) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
d.log.WithFields(logrus.Fields{
"req": req,
"method": "delete_snapshot",
}).Warn("delete snapshot is not implemented")
return nil, status.Error(codes.Unimplemented, "")
ll := d.log.WithFields(logrus.Fields{
"req_snapshot_id": req.GetSnapshotId(),
"method": "delete_snapshot",
})

ll.Info("delete snapshot is called")

if req.GetSnapshotId() == "" {
return nil, status.Error(codes.InvalidArgument, "DeleteSnapshot Snapshot ID must be provided")
}

resp, err := d.storage.DeleteSnapshot(ctx, req.GetSnapshotId())
if err != nil {
if resp != nil && resp.StatusCode == http.StatusNotFound {
// we assume it's deleted already for idempotency
ll.WithFields(logrus.Fields{
"error": err,
"resp": resp,
}).Warn("assuming snapshot is deleted already")
return &csi.DeleteSnapshotResponse{}, nil
}
return nil, err
}

ll.WithField("response", resp).Info("snapshot is deleted")
return &csi.DeleteSnapshotResponse{}, nil
}

// ListSnapshots returns the information about all snapshots on the storage
// system within the given parameters regardless of how they were created.
// ListSnapshots shold not list a snapshot that is being created but has not
// been cut successfully yet.
func (d *Driver) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
d.log.WithFields(logrus.Fields{
"req": req,
"method": "list_snapshots",
}).Warn("list snapshots is not implemented")
return nil, status.Error(codes.Unimplemented, "")
// Pagination in the CSI world works different than at DO. CSI sens the
// `req.MaxEntries` to indicate how much snapshots it wants. The
// req.StartingToken is returned by us, if we somehow need to indicate that
// we couldn't fetch and need to fetch again. But it's NOT the page number.
// Suppose CSI wants us to fetch 50 entries, we only fetch 30, we need to
// return NextToken as 31, to indicate that we want to continue returning
// from the index 31 up to 50.

// Check if it's an integer
var nextToken int
var err error
if req.StartingToken != "" {
nextToken, err = strconv.Atoi(req.StartingToken)
if err != nil {
return nil, status.Errorf(codes.Aborted, "ListSnapshots starting token is not valid : %s",
err.Error())
}
}

if nextToken != 0 && req.MaxEntries != 0 {
return nil, status.Errorf(codes.Aborted,
"ListSnapshots invalid arguments starting token: %d and max entries: %d can't be non null at the same time", nextToken, req.MaxEntries)
}

ll := d.log.WithFields(logrus.Fields{
"req_starting_token": req.StartingToken,
"method": "list_snapshots",
})
ll.Info("list snapshots is called")

// fetch all entries
listOpts := &godo.ListOptions{}
Copy link

@aybabtme aybabtme Nov 14, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe set the listOpts.PerPage = req.MaxEntries value here. not guaranteed to be respected but at least it'll avoid making unnecessary calls if the two align

var snapshots []godo.Snapshot
for {
snaps, resp, err := d.snapshots.ListVolume(ctx, listOpts)
if err != nil {
return nil, status.Errorf(codes.Aborted, "ListSnapshots listing volume snapshots has failed: %s", err.Error())
}

snapshots = append(snapshots, snaps...)

if resp.Links == nil || resp.Links.IsLastPage() {
break
}

page, err := resp.Links.CurrentPage()
if err != nil {
return nil, err
}

listOpts.Page = page + 1

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also set listOpts.PerPage according to the response you got

}

if nextToken > len(snapshots) {
return nil, status.Error(codes.Aborted, "ListSnapshots starting token is greater than total number of snapshots")
}

if nextToken != 0 {
snapshots = snapshots[nextToken:]
}

if req.MaxEntries != 0 {
nextToken = len(snapshots) - int(req.MaxEntries) - 1
snapshots = snapshots[:req.MaxEntries]
}

var entries []*csi.ListSnapshotsResponse_Entry

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

entries := make([]*csi.ListSnapshotsResponse_Entry, 0, len(snapshots))

for _, snapshot := range snapshots {
snap, err := toCsiSnapshot(&snapshot)
if err != nil {
return nil, status.Errorf(codes.Internal,
"couldn't convert DO snapshot to CSI snapshot: %s", err.Error())
}

entries = append(entries, &csi.ListSnapshotsResponse_Entry{
Snapshot: snap,
})
}

listResp := &csi.ListSnapshotsResponse{
Entries: entries,
NextToken: strconv.Itoa(nextToken),
}

ll.WithField("response", listResp).Info("snapshots listed")
return listResp, nil
}

// extractStorage extracts the storage size in GB from the given capacity
Expand Down Expand Up @@ -704,6 +863,25 @@ func (d *Driver) checkLimit(ctx context.Context) error {
return nil
}

// toCsiSnapshot converts a DO Snapshot struct into a csi.Snapshot struct
func toCsiSnapshot(snap *godo.Snapshot) (*csi.Snapshot, error) {
// snapshot already exists, return the information we have
createdAt, err := time.Parse(time.RFC3339, snap.Created)
if err != nil {
return nil, status.Errorf(codes.Internal, "couldn't parse snapshot's created field: %s", err.Error())

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use a fmt.Errorf here, not a grpc error

}

return &csi.Snapshot{
Id: snap.ID,
SourceVolumeId: snap.ResourceID,
SizeBytes: int64(snap.SizeGigaBytes) * GB,
CreatedAt: createdAt.UTC().UnixNano(),
Status: &csi.SnapshotStatus{
Type: csi.SnapshotStatus_READY,
},
}, nil
}

// validateCapabilities validates the requested capabilities. It returns false
// if it doesn't satisfy the currently supported modes of DigitalOcean Block
// Storage
Expand Down
2 changes: 2 additions & 0 deletions driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type Driver struct {
storage godo.StorageService
storageActions godo.StorageActionsService
droplets godo.DropletsService
snapshots godo.SnapshotsService
account godo.AccountService

// ready defines whether the driver is ready to function. This value will
Expand Down Expand Up @@ -114,6 +115,7 @@ func NewDriver(ep, token, url string) (*Driver, error) {
storage: doClient.Storage,
storageActions: doClient.StorageActions,
droplets: doClient.Droplets,
snapshots: doClient.Snapshots,
account: doClient.Account,
}, nil
}
Expand Down
Loading