Skip to content

Commit

Permalink
Implement kando location push/pull (kanisterio#3183)
Browse files Browse the repository at this point in the history
* Implement kando location push/pull

* Swallow errors on close
  • Loading branch information
tdmanv authored Jun 22, 2018
1 parent 101b40b commit 31b7291
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 20 deletions.
10 changes: 10 additions & 0 deletions pkg/kando/kando_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package kando

import (
"testing"

. "gopkg.in/check.v1"
)

// Hook up gocheck into the "go test" runner.
func Test(t *testing.T) { TestingT(t) }
22 changes: 17 additions & 5 deletions pkg/kando/location_pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package kando

import (
"context"
"io"
"os"

"github.com/spf13/cobra"

"github.com/kanisterio/kanister/pkg/location"
"github.com/kanisterio/kanister/pkg/param"
)

Expand All @@ -23,17 +26,26 @@ func newLocationPullCommand() *cobra.Command {
}

func runLocationPull(cmd *cobra.Command, args []string) error {
source := args[0]
target, err := targetWriter(args[0])
if err != nil {
return err
}
p, err := unmarshalProfileFlag(cmd)
if err != nil {
return err
}
s := pathFlag(cmd)
ctx := context.Background()
return locationPull(ctx, p, s, source)
return locationPull(ctx, p, s, target)
}

func targetWriter(target string) (io.Writer, error) {
if target != usePipeParam {
return os.Open(target)
}
return os.Stdout, nil
}

// TODO: Implement this function
func locationPull(ctx context.Context, p *param.Profile, path string, source string) error {
return nil
func locationPull(ctx context.Context, p *param.Profile, path string, target io.Writer) error {
return location.Read(ctx, target, *p, path)
}
30 changes: 26 additions & 4 deletions pkg/kando/location_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ package kando

import (
"context"
"io"
"os"

"github.com/pkg/errors"
"github.com/spf13/cobra"

"github.com/kanisterio/kanister/pkg/location"
"github.com/kanisterio/kanister/pkg/param"
)

Expand All @@ -23,7 +27,10 @@ func newLocationPushCommand() *cobra.Command {
}

func runLocationPush(cmd *cobra.Command, args []string) error {
source := args[0]
source, err := sourceReader(args[0])
if err != nil {
return err
}
p, err := unmarshalProfileFlag(cmd)
if err != nil {
return err
Expand All @@ -33,7 +40,22 @@ func runLocationPush(cmd *cobra.Command, args []string) error {
return locationPush(ctx, p, s, source)
}

// TODO: Implement this function
func locationPush(ctx context.Context, p *param.Profile, path string, source string) error {
return nil
const usePipeParam = `-`

func sourceReader(source string) (io.Reader, error) {
if source != usePipeParam {
return os.Open(source)
}
fi, err := os.Stdin.Stat()
if err != nil {
errors.Wrap(err, "Failed to Stat stdin")
}
if fi.Mode()&os.ModeNamedPipe == 0 {
return nil, errors.New("Stdin must be piped when the source parameter is \"-\"")
}
return os.Stdin, nil
}

func locationPush(ctx context.Context, p *param.Profile, path string, source io.Reader) error {
return location.Write(ctx, source, *p, path)
}
69 changes: 69 additions & 0 deletions pkg/kando/location_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package kando

import (
"bytes"
"context"
"fmt"
"os"
"path/filepath"

. "gopkg.in/check.v1"

crv1alpha1 "github.com/kanisterio/kanister/pkg/apis/cr/v1alpha1"
"github.com/kanisterio/kanister/pkg/location"
"github.com/kanisterio/kanister/pkg/param"
)

type LocationSuite struct{}

var _ = Suite(&LocationSuite{})

const testBucketName = "S3_TEST_BUCKET"

var objectStoreTestEnvVars []string = []string{
location.AWSAccessKeyID,
location.AWSSecretAccessKey,
testBucketName,
}

const testContent = "test-content"

func (s *LocationSuite) TestLocationObjectStore(c *C) {
skipIfEnvNotSet(c, objectStoreTestEnvVars)
ctx := context.Background()
p := &param.Profile{
Location: crv1alpha1.Location{
Type: crv1alpha1.LocationTypeS3Compliant,
S3Compliant: &crv1alpha1.S3CompliantLocation{
Bucket: os.Getenv(testBucketName),
Prefix: c.TestName(),
},
},
Credential: param.Credential{
Type: param.CredentialTypeKeyPair,
KeyPair: &param.KeyPair{
ID: os.Getenv(location.AWSAccessKeyID),
Secret: os.Getenv(location.AWSSecretAccessKey),
},
},
}
path := filepath.Join(c.MkDir(), "test-object.txt")

source := bytes.NewBufferString(testContent)
err := locationPush(ctx, p, path, source)
c.Assert(err, IsNil)

target := bytes.NewBuffer(nil)
err = locationPull(ctx, p, path, target)
c.Assert(err, IsNil)
c.Assert(target.String(), Equals, testContent)
}

func skipIfEnvNotSet(c *C, envVars []string) {
for _, ev := range envVars {
if os.Getenv(ev) == "" {
reason := fmt.Sprintf("Test %s requires the environemnt variable '%s'", c.TestName(), ev)
c.Skip(reason)
}
}
}
27 changes: 16 additions & 11 deletions pkg/location/location.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"os/exec"
"path/filepath"
"strings"

"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -55,9 +56,8 @@ func readExec(ctx context.Context, output io.Writer, bin string, args []string,
log.WithError(err).Error("Failed to write data from pipe")
}
log.Infof("Read %d bytes", w)
if err := rc.Close(); err != nil {
log.WithError(err).Error("Failed to close pipe")
}
// rc may be closed allready. Swallow close errors.
_ = rc.Close()
}()
return errors.Wrap(cmd.Wait(), "Failed to read data from location in profile")
}
Expand All @@ -79,9 +79,8 @@ func writeExec(ctx context.Context, input io.Reader, bin string, args []string,
log.WithError(err).Error("Failed to write data from pipe")
}
log.Infof("Wrote %d bytes", w)
if err := wc.Close(); err != nil {
log.WithError(err).Error("Failed to close pipe")
}
// wc may be closed allready. Swallow close errors.
_ = wc.Close()
}()
return errors.Wrap(cmd.Wait(), "Failed to write data to location in profile")
}
Expand All @@ -102,12 +101,18 @@ func s3CompliantWriteArgs(profile param.Profile, suffix string) []string {
return awsS3CpArgs(profile, "-", dst)
}

const s3Prefix = "s3://"

func s3CompliantPath(profile param.Profile, suffix string) string {
return filepath.Join(
path := filepath.Join(
profile.Location.S3Compliant.Bucket,
profile.Location.S3Compliant.Prefix,
suffix,
)
if strings.HasPrefix(profile.Location.S3Compliant.Bucket, s3Prefix) {
return path
}
return s3Prefix + path
}

func s3CompliantEnv(profile param.Profile) []string {
Expand All @@ -126,16 +131,16 @@ func awsS3CpArgs(profile param.Profile, src string, dst string) (cmd []string) {
}

const (
awsAccessKeyID = "AWS_ACCESS_KEY_ID"
awsSecretAccessKey = "AWS_SECRET_ACCESS_KEY"
AWSAccessKeyID = "AWS_ACCESS_KEY_ID"
AWSSecretAccessKey = "AWS_SECRET_ACCESS_KEY"
)

func awsCredsEnv(cred param.Credential) []string {
if cred.Type != param.CredentialTypeKeyPair {
panic("Unsupported Credential type: " + cred.Type)
}
return []string{
fmt.Sprintf("%s=%s", awsAccessKeyID, cred.KeyPair.ID),
fmt.Sprintf("%s=%s", awsSecretAccessKey, cred.KeyPair.Secret),
fmt.Sprintf("%s=%s", AWSAccessKeyID, cred.KeyPair.ID),
fmt.Sprintf("%s=%s", AWSSecretAccessKey, cred.KeyPair.Secret),
}
}

0 comments on commit 31b7291

Please sign in to comment.