Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update cloud provider interface to take in context #45

Merged
merged 1 commit into from
Oct 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Update cloud provider interface to take in context
With the context object populated through the call stack, the operation
could be cancelled properly
  • Loading branch information
Cheng Pan committed Oct 6, 2018
commit af3bcdd6ae95fa11f6644d354ea4ec7070738f83
68 changes: 35 additions & 33 deletions pkg/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package cloud

import (
"context"
"errors"
"fmt"

Expand All @@ -25,6 +26,7 @@ import (
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/credentials/ec2rolecreds"
"github.com/aws/aws-sdk-go/aws/ec2metadata"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/golang/glog"
Expand Down Expand Up @@ -92,23 +94,23 @@ type DiskOptions struct {

// EC2 abstracts aws.EC2 to facilitate its mocking.
type EC2 interface {
DescribeVolumes(input *ec2.DescribeVolumesInput) (*ec2.DescribeVolumesOutput, error)
CreateVolume(input *ec2.CreateVolumeInput) (*ec2.Volume, error)
DeleteVolume(input *ec2.DeleteVolumeInput) (*ec2.DeleteVolumeOutput, error)
DetachVolume(input *ec2.DetachVolumeInput) (*ec2.VolumeAttachment, error)
AttachVolume(input *ec2.AttachVolumeInput) (*ec2.VolumeAttachment, error)
DescribeInstances(input *ec2.DescribeInstancesInput) (*ec2.DescribeInstancesOutput, error)
DescribeVolumesWithContext(ctx aws.Context, input *ec2.DescribeVolumesInput, opts ...request.Option) (*ec2.DescribeVolumesOutput, error)
CreateVolumeWithContext(ctx aws.Context, input *ec2.CreateVolumeInput, opts ...request.Option) (*ec2.Volume, error)
DeleteVolumeWithContext(ctx aws.Context, input *ec2.DeleteVolumeInput, opts ...request.Option) (*ec2.DeleteVolumeOutput, error)
DetachVolumeWithContext(ctx aws.Context, input *ec2.DetachVolumeInput, opts ...request.Option) (*ec2.VolumeAttachment, error)
AttachVolumeWithContext(ctx aws.Context, input *ec2.AttachVolumeInput, opts ...request.Option) (*ec2.VolumeAttachment, error)
DescribeInstancesWithContext(ctx aws.Context, input *ec2.DescribeInstancesInput, opts ...request.Option) (*ec2.DescribeInstancesOutput, error)
}

type Cloud interface {
GetMetadata() MetadataService
CreateDisk(volumeName string, diskOptions *DiskOptions) (disk *Disk, err error)
DeleteDisk(volumeID string) (success bool, err error)
AttachDisk(volumeID string, nodeID string) (devicePath string, err error)
DetachDisk(volumeID string, nodeID string) (err error)
GetDiskByName(name string, capacityBytes int64) (disk *Disk, err error)
GetDiskByID(volumeID string) (disk *Disk, err error)
IsExistInstance(nodeID string) (sucess bool)
CreateDisk(ctx context.Context, volumeName string, diskOptions *DiskOptions) (disk *Disk, err error)
DeleteDisk(ctx context.Context, volumeID string) (success bool, err error)
AttachDisk(ctx context.Context, volumeID string, nodeID string) (devicePath string, err error)
DetachDisk(ctx context.Context, volumeID string, nodeID string) (err error)
GetDiskByName(ctx context.Context, name string, capacityBytes int64) (disk *Disk, err error)
GetDiskByID(ctx context.Context, volumeID string) (disk *Disk, err error)
IsExistInstance(ctx context.Context, nodeID string) (sucess bool)
}

type cloud struct {
Expand Down Expand Up @@ -155,7 +157,7 @@ func (c *cloud) GetMetadata() MetadataService {
return c.metadata
}

func (c *cloud) CreateDisk(volumeName string, diskOptions *DiskOptions) (*Disk, error) {
func (c *cloud) CreateDisk(ctx context.Context, volumeName string, diskOptions *DiskOptions) (*Disk, error) {
var createType string
var iops int64
capacityGiB := util.BytesToGiB(diskOptions.CapacityBytes)
Expand Down Expand Up @@ -198,7 +200,7 @@ func (c *cloud) CreateDisk(volumeName string, diskOptions *DiskOptions) (*Disk,
request.Iops = aws.Int64(iops)
}

response, err := c.ec2.CreateVolume(request)
response, err := c.ec2.CreateVolumeWithContext(ctx, request)
if err != nil {
return nil, fmt.Errorf("could not create volume in EC2: %v", err)
}
Expand All @@ -216,9 +218,9 @@ func (c *cloud) CreateDisk(volumeName string, diskOptions *DiskOptions) (*Disk,
return &Disk{CapacityGiB: size, VolumeID: volumeID}, nil
}

func (c *cloud) DeleteDisk(volumeID string) (bool, error) {
func (c *cloud) DeleteDisk(ctx context.Context, volumeID string) (bool, error) {
request := &ec2.DeleteVolumeInput{VolumeId: &volumeID}
if _, err := c.ec2.DeleteVolume(request); err != nil {
if _, err := c.ec2.DeleteVolumeWithContext(ctx, request); err != nil {
if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == "InvalidVolume.NotFound" {
return false, ErrNotFound
Expand All @@ -229,8 +231,8 @@ func (c *cloud) DeleteDisk(volumeID string) (bool, error) {
return true, nil
}

func (c *cloud) AttachDisk(volumeID, nodeID string) (string, error) {
instance, err := c.getInstance(nodeID)
func (c *cloud) AttachDisk(ctx context.Context, volumeID, nodeID string) (string, error) {
instance, err := c.getInstance(ctx, nodeID)
if err != nil {
return "", err
}
Expand All @@ -248,7 +250,7 @@ func (c *cloud) AttachDisk(volumeID, nodeID string) (string, error) {
VolumeId: aws.String(volumeID),
}

resp, err := c.ec2.AttachVolume(request)
resp, err := c.ec2.AttachVolumeWithContext(ctx, request)
if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == "VolumeInUse" {
Expand Down Expand Up @@ -291,8 +293,8 @@ func (c *cloud) AttachDisk(volumeID, nodeID string) (string, error) {
return device.Path, nil
}

func (c *cloud) DetachDisk(volumeID, nodeID string) error {
instance, err := c.getInstance(nodeID)
func (c *cloud) DetachDisk(ctx context.Context, volumeID, nodeID string) error {
instance, err := c.getInstance(ctx, nodeID)
if err != nil {
return err
}
Expand All @@ -313,15 +315,15 @@ func (c *cloud) DetachDisk(volumeID, nodeID string) error {
VolumeId: aws.String(volumeID),
}

_, err = c.ec2.DetachVolume(request)
_, err = c.ec2.DetachVolumeWithContext(ctx, request)
if err != nil {
return fmt.Errorf("could not detach volume %q from node %q: %v", volumeID, nodeID, err)
}

return nil
}

func (c *cloud) GetDiskByName(name string, capacityBytes int64) (*Disk, error) {
func (c *cloud) GetDiskByName(ctx context.Context, name string, capacityBytes int64) (*Disk, error) {
request := &ec2.DescribeVolumesInput{
Filters: []*ec2.Filter{
{
Expand All @@ -331,7 +333,7 @@ func (c *cloud) GetDiskByName(name string, capacityBytes int64) (*Disk, error) {
},
}

volume, err := c.getVolume(request)
volume, err := c.getVolume(ctx, request)
if err != nil {
return nil, err
}
Expand All @@ -347,14 +349,14 @@ func (c *cloud) GetDiskByName(name string, capacityBytes int64) (*Disk, error) {
}, nil
}

func (c *cloud) GetDiskByID(volumeID string) (*Disk, error) {
func (c *cloud) GetDiskByID(ctx context.Context, volumeID string) (*Disk, error) {
request := &ec2.DescribeVolumesInput{
VolumeIds: []*string{
aws.String(volumeID),
},
}

volume, err := c.getVolume(request)
volume, err := c.getVolume(ctx, request)
if err != nil {
return nil, err
}
Expand All @@ -365,20 +367,20 @@ func (c *cloud) GetDiskByID(volumeID string) (*Disk, error) {
}, nil
}

func (c *cloud) IsExistInstance(nodeID string) bool {
instance, err := c.getInstance(nodeID)
func (c *cloud) IsExistInstance(ctx context.Context, nodeID string) bool {
instance, err := c.getInstance(ctx, nodeID)
if err != nil || instance == nil {
return false
}
return true
}

func (c *cloud) getVolume(request *ec2.DescribeVolumesInput) (*ec2.Volume, error) {
func (c *cloud) getVolume(ctx context.Context, request *ec2.DescribeVolumesInput) (*ec2.Volume, error) {
var volumes []*ec2.Volume
var nextToken *string

for {
response, err := c.ec2.DescribeVolumes(request)
response, err := c.ec2.DescribeVolumesWithContext(ctx, request)
if err != nil {
return nil, err
}
Expand All @@ -401,15 +403,15 @@ func (c *cloud) getVolume(request *ec2.DescribeVolumesInput) (*ec2.Volume, error
return volumes[0], nil
}

func (c *cloud) getInstance(nodeID string) (*ec2.Instance, error) {
func (c *cloud) getInstance(ctx context.Context, nodeID string) (*ec2.Instance, error) {
instances := []*ec2.Instance{}
request := &ec2.DescribeInstancesInput{
InstanceIds: []*string{&nodeID},
}

var nextToken *string
for {
response, err := c.ec2.DescribeInstances(request)
response, err := c.ec2.DescribeInstancesWithContext(ctx, request)
if err != nil {
return nil, fmt.Errorf("error listing AWS instances: %q", err)
}
Expand Down
36 changes: 22 additions & 14 deletions pkg/cloud/cloud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package cloud

import (
"context"
"fmt"
"strings"
"testing"
Expand Down Expand Up @@ -76,9 +77,10 @@ func TestCreateDisk(t *testing.T) {
}
}

mockEC2.EXPECT().CreateVolume(gomock.Any()).Return(vol, tc.expErr)
ctx := context.Background()
mockEC2.EXPECT().CreateVolumeWithContext(gomock.Eq(ctx), gomock.Any()).Return(vol, tc.expErr)

disk, err := c.CreateDisk(tc.volumeName, tc.diskOptions)
disk, err := c.CreateDisk(ctx, tc.volumeName, tc.diskOptions)
if err != nil {
if tc.expErr == nil {
t.Fatalf("CreateDisk() failed: expected no error, got: %v", err)
Expand Down Expand Up @@ -134,9 +136,10 @@ func TestDeleteDisk(t *testing.T) {
mockEC2 := mocks.NewMockEC2(mockCtrl)
c := newCloud(mockEC2)

mockEC2.EXPECT().DeleteVolume(gomock.Any()).Return(&ec2.DeleteVolumeOutput{}, tc.expErr)
ctx := context.Background()
mockEC2.EXPECT().DeleteVolumeWithContext(gomock.Eq(ctx), gomock.Any()).Return(&ec2.DeleteVolumeOutput{}, tc.expErr)

ok, err := c.DeleteDisk(tc.volumeID)
ok, err := c.DeleteDisk(ctx, tc.volumeID)
if err != nil && tc.expErr == nil {
t.Fatalf("DeleteDisk() failed: expected no error, got: %v", err)
}
Expand Down Expand Up @@ -180,10 +183,11 @@ func TestAttachDisk(t *testing.T) {
mockEC2 := mocks.NewMockEC2(mockCtrl)
c := newCloud(mockEC2)

mockEC2.EXPECT().DescribeInstances(gomock.Any()).Return(newDescribeInstancesOutput(tc.nodeID), nil)
mockEC2.EXPECT().AttachVolume(gomock.Any()).Return(&ec2.VolumeAttachment{}, tc.expErr)
ctx := context.Background()
mockEC2.EXPECT().DescribeInstancesWithContext(gomock.Eq(ctx), gomock.Any()).Return(newDescribeInstancesOutput(tc.nodeID), nil)
mockEC2.EXPECT().AttachVolumeWithContext(gomock.Eq(ctx), gomock.Any()).Return(&ec2.VolumeAttachment{}, tc.expErr)

devicePath, err := c.AttachDisk(tc.volumeID, tc.nodeID)
devicePath, err := c.AttachDisk(ctx, tc.volumeID, tc.nodeID)
if err != nil {
if tc.expErr == nil {
t.Fatalf("AttachDisk() failed: expected no error, got: %v", err)
Expand Down Expand Up @@ -228,10 +232,11 @@ func TestDetachDisk(t *testing.T) {
mockEC2 := mocks.NewMockEC2(mockCtrl)
c := newCloud(mockEC2)

mockEC2.EXPECT().DescribeInstances(gomock.Any()).Return(newDescribeInstancesOutput(tc.nodeID), nil)
mockEC2.EXPECT().DetachVolume(gomock.Any()).Return(&ec2.VolumeAttachment{}, tc.expErr)
ctx := context.Background()
mockEC2.EXPECT().DescribeInstancesWithContext(gomock.Eq(ctx), gomock.Any()).Return(newDescribeInstancesOutput(tc.nodeID), nil)
mockEC2.EXPECT().DetachVolumeWithContext(gomock.Eq(ctx), gomock.Any()).Return(&ec2.VolumeAttachment{}, tc.expErr)

err := c.DetachDisk(tc.volumeID, tc.nodeID)
err := c.DetachDisk(ctx, tc.volumeID, tc.nodeID)
if err != nil {
if tc.expErr == nil {
t.Fatalf("DetachDisk() failed: expected no error, got: %v", err)
Expand Down Expand Up @@ -277,9 +282,11 @@ func TestGetDiskByName(t *testing.T) {
VolumeId: aws.String(tc.volumeName),
Size: aws.Int64(util.BytesToGiB(tc.volumeCapacity)),
}
mockEC2.EXPECT().DescribeVolumes(gomock.Any()).Return(&ec2.DescribeVolumesOutput{Volumes: []*ec2.Volume{vol}}, tc.expErr)

disk, err := c.GetDiskByName(tc.volumeName, tc.volumeCapacity)
ctx := context.Background()
mockEC2.EXPECT().DescribeVolumesWithContext(gomock.Eq(ctx), gomock.Any()).Return(&ec2.DescribeVolumesOutput{Volumes: []*ec2.Volume{vol}}, tc.expErr)

disk, err := c.GetDiskByName(ctx, tc.volumeName, tc.volumeCapacity)
if err != nil {
if tc.expErr == nil {
t.Fatalf("GetDiskByName() failed: expected no error, got: %v", err)
Expand Down Expand Up @@ -321,7 +328,8 @@ func TestGetDiskByID(t *testing.T) {
mockEC2 := mocks.NewMockEC2(mockCtrl)
c := newCloud(mockEC2)

mockEC2.EXPECT().DescribeVolumes(gomock.Any()).Return(
ctx := context.Background()
mockEC2.EXPECT().DescribeVolumesWithContext(gomock.Eq(ctx), gomock.Any()).Return(
&ec2.DescribeVolumesOutput{
Volumes: []*ec2.Volume{
{VolumeId: aws.String(tc.volumeID)},
Expand All @@ -330,7 +338,7 @@ func TestGetDiskByID(t *testing.T) {
tc.expErr,
)

disk, err := c.GetDiskByID(tc.volumeID)
disk, err := c.GetDiskByID(ctx, tc.volumeID)
if err != nil {
if tc.expErr == nil {
t.Fatalf("GetDisk() failed: expected no error, got: %v", err)
Expand Down
15 changes: 8 additions & 7 deletions pkg/cloud/fakes.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package cloud

import (
"context"
"fmt"
"math/rand"
"time"
Expand Down Expand Up @@ -47,7 +48,7 @@ func (c *FakeCloudProvider) GetMetadata() MetadataService {
return c.m
}

func (c *FakeCloudProvider) CreateDisk(volumeName string, diskOptions *DiskOptions) (*Disk, error) {
func (c *FakeCloudProvider) CreateDisk(ctx context.Context, volumeName string, diskOptions *DiskOptions) (*Disk, error) {
r1 := rand.New(rand.NewSource(time.Now().UnixNano()))
d := &fakeDisk{
Disk: &Disk{
Expand All @@ -60,7 +61,7 @@ func (c *FakeCloudProvider) CreateDisk(volumeName string, diskOptions *DiskOptio
return d.Disk, nil
}

func (c *FakeCloudProvider) DeleteDisk(volumeID string) (bool, error) {
func (c *FakeCloudProvider) DeleteDisk(ctx context.Context, volumeID string) (bool, error) {
for volName, f := range c.disks {
if f.Disk.VolumeID == volumeID {
delete(c.disks, volName)
Expand All @@ -69,19 +70,19 @@ func (c *FakeCloudProvider) DeleteDisk(volumeID string) (bool, error) {
return true, nil
}

func (c *FakeCloudProvider) AttachDisk(volumeID, nodeID string) (string, error) {
func (c *FakeCloudProvider) AttachDisk(ctx context.Context, volumeID, nodeID string) (string, error) {
if _, ok := c.pub[volumeID]; ok {
return "", ErrAlreadyExists
}
c.pub[volumeID] = nodeID
return "/dev/xvdbc", nil
}

func (c *FakeCloudProvider) DetachDisk(volumeID, nodeID string) error {
func (c *FakeCloudProvider) DetachDisk(ctx context.Context, volumeID, nodeID string) error {
return nil
}

func (c *FakeCloudProvider) GetDiskByName(name string, capacityBytes int64) (*Disk, error) {
func (c *FakeCloudProvider) GetDiskByName(ctx context.Context, name string, capacityBytes int64) (*Disk, error) {
var disks []*fakeDisk
for _, d := range c.disks {
for key, value := range d.tags {
Expand All @@ -101,7 +102,7 @@ func (c *FakeCloudProvider) GetDiskByName(name string, capacityBytes int64) (*Di
return nil, nil
}

func (c *FakeCloudProvider) GetDiskByID(volumeID string) (*Disk, error) {
func (c *FakeCloudProvider) GetDiskByID(ctx context.Context, volumeID string) (*Disk, error) {
for _, f := range c.disks {
if f.Disk.VolumeID == volumeID {
return f.Disk, nil
Expand All @@ -110,7 +111,7 @@ func (c *FakeCloudProvider) GetDiskByID(volumeID string) (*Disk, error) {
return nil, ErrNotFound
}

func (c *FakeCloudProvider) IsExistInstance(nodeID string) bool {
func (c *FakeCloudProvider) IsExistInstance(ctx context.Context, nodeID string) bool {
if nodeID != c.m.GetInstanceID() {
return false
}
Expand Down
Loading