Support prefix on S3 object names (#213)
* Add ObjectPrefix field to S3Store

* Integrate S3ObjectPrefix with Flags

* Account for S3ObjectPrefix flag in CreateComposer

* Account for ObjectPrefix in S3Store operations

* Add test for configuring an S3Store with ObjectPrefix
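With a prefix such as "path/to/my/uploads", the store writes its objects to keys like "path/to/my/uploads/uploadId" and "path/to/my/uploads/uploadId.info" instead of placing them at the bucket root.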
acj authored and Acconut committed Nov 13, 2018
1 parent a4a6ef5 · commit 27c9c4a
Showing 4 changed files with 76 additions and 12 deletions.
1 change: 1 addition & 0 deletions cmd/tusd/cli/composer.go
@@ -35,6 +35,7 @@ func CreateComposer() {
 		// Derive credentials from default credential chain (env, shared, ec2 instance role)
 		// as per https://github.com/aws/aws-sdk-go#configuring-credentials
 		store := s3store.New(Flags.S3Bucket, s3.New(session.Must(session.NewSession()), s3Config))
+		store.ObjectPrefix = Flags.S3ObjectPrefix
 		store.UseIn(Composer)
 
 		locker := memorylocker.New()
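For contrast with the CLI wiring above, here is a minimal programmatic sketch, assuming the tusd and aws-sdk-go APIs of this era; the bucket, region, prefix, and base path values are illustrative, not part of the commit:

package main

import (
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/tus/tusd"
	"github.com/tus/tusd/s3store"
)

func main() {
	// Illustrative region; credentials come from the default chain.
	s3Config := aws.NewConfig().WithRegion("eu-west-1")
	store := s3store.New("my-bucket", s3.New(session.Must(session.NewSession()), s3Config))
	// The field added by this commit: every object key gains this prefix.
	store.ObjectPrefix = "path/to/my/uploads"

	composer := tusd.NewStoreComposer()
	store.UseIn(composer)

	handler, err := tusd.NewHandler(tusd.Config{
		BasePath:      "/files/",
		StoreComposer: composer,
	})
	if err != nil {
		panic(err)
	}
	_ = handler // serve with http.Handle("/files/", http.StripPrefix("/files/", handler))
}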
2 changes: 2 additions & 0 deletions cmd/tusd/cli/flags.go
@@ -14,6 +14,7 @@ var Flags struct {
 	Basepath       string
 	Timeout        int64
 	S3Bucket       string
+	S3ObjectPrefix string
 	S3Endpoint     string
 	GCSBucket      string
 	FileHooksDir   string
@@ -38,6 +39,7 @@ func ParseFlags() {
 	flag.StringVar(&Flags.Basepath, "base-path", "/files/", "Basepath of the HTTP server")
 	flag.Int64Var(&Flags.Timeout, "timeout", 30*1000, "Read timeout for connections in milliseconds. A zero value means that reads will not timeout")
 	flag.StringVar(&Flags.S3Bucket, "s3-bucket", "", "Use AWS S3 with this bucket as storage backend (requires the AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_REGION environment variables to be set)")
+	flag.StringVar(&Flags.S3ObjectPrefix, "s3-object-prefix", "", "Prefix for S3 object names")
 	flag.StringVar(&Flags.S3Endpoint, "s3-endpoint", "", "Endpoint to use S3 compatible implementations like minio (requires s3-bucket to be set)")
 	flag.StringVar(&Flags.GCSBucket, "gcs-bucket", "", "Use Google Cloud Storage with this bucket as storage backend (requires the GCS_SERVICE_ACCOUNT_FILE environment variable to be set)")
 	flag.StringVar(&Flags.FileHooksDir, "hooks-dir", "", "Directory to search for available hooks scripts")
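With the new flag in place, the prefix can be supplied at startup; a hypothetical invocation (bucket and prefix values are illustrative):

tusd -s3-bucket=my-bucket -s3-object-prefix=path/to/my/uploads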
37 changes: 25 additions & 12 deletions s3store/s3store.go
@@ -108,6 +108,10 @@ var nonASCIIRegexp = regexp.MustCompile(`([^\x00-\x7F])`)
 type S3Store struct {
 	// Bucket used to store the data in, e.g. "tusdstore.example.com"
 	Bucket string
+	// ObjectPrefix is prepended to the name of each S3 object that is created.
+	// It can be used to create a pseudo-directory structure in the bucket,
+	// e.g. "path/to/my/uploads".
+	ObjectPrefix string
 	// Service specifies an interface used to communicate with the S3 backend.
 	// Usually, this is an instance of github.com/aws/aws-sdk-go/service/s3.S3
 	// (http://docs.aws.amazon.com/sdk-for-go/api/service/s3/S3.html).
@@ -197,7 +201,7 @@ func (store S3Store) NewUpload(info tusd.FileInfo) (id string, err error) {
 	// Create the actual multipart upload
 	res, err := store.Service.CreateMultipartUpload(&s3.CreateMultipartUploadInput{
 		Bucket:   aws.String(store.Bucket),
-		Key:      aws.String(uploadId),
+		Key:      store.keyWithPrefix(uploadId),
 		Metadata: metadata,
 	})
 	if err != nil {
@@ -215,7 +219,7 @@ func (store S3Store) NewUpload(info tusd.FileInfo) (id string, err error) {
 	// Create object on S3 containing information about the file
 	_, err = store.Service.PutObject(&s3.PutObjectInput{
 		Bucket:        aws.String(store.Bucket),
-		Key:           aws.String(uploadId + ".info"),
+		Key:           store.keyWithPrefix(uploadId + ".info"),
 		Body:          bytes.NewReader(infoJson),
 		ContentLength: aws.Int64(int64(len(infoJson))),
 	})
@@ -286,7 +290,7 @@ func (store S3Store) WriteChunk(id string, offset int64, src io.Reader) (int64,
 
 		_, err = store.Service.UploadPart(&s3.UploadPartInput{
 			Bucket:     aws.String(store.Bucket),
-			Key:        aws.String(uploadId),
+			Key:        store.keyWithPrefix(uploadId),
 			UploadId:   aws.String(multipartId),
 			PartNumber: aws.Int64(nextPartNum),
 			Body:       file,
@@ -307,7 +311,7 @@ func (store S3Store) GetInfo(id string) (info tusd.FileInfo, err error) {
 	// Get file info stored in separate object
 	res, err := store.Service.GetObject(&s3.GetObjectInput{
 		Bucket: aws.String(store.Bucket),
-		Key:    aws.String(uploadId + ".info"),
+		Key:    store.keyWithPrefix(uploadId + ".info"),
 	})
 	if err != nil {
 		if isAwsError(err, "NoSuchKey") {
@@ -353,7 +357,7 @@ func (store S3Store) GetReader(id string) (io.Reader, error) {
 	// Attempt to get upload content
 	res, err := store.Service.GetObject(&s3.GetObjectInput{
 		Bucket: aws.String(store.Bucket),
-		Key:    aws.String(uploadId),
+		Key:    store.keyWithPrefix(uploadId),
 	})
 	if err == nil {
 		// No error occurred, and we are able to stream the object
@@ -371,7 +375,7 @@ func (store S3Store) GetReader(id string) (io.Reader, error) {
 	// never existed or just has not been finished yet
 	_, err = store.Service.ListParts(&s3.ListPartsInput{
 		Bucket:   aws.String(store.Bucket),
-		Key:      aws.String(uploadId),
+		Key:      store.keyWithPrefix(uploadId),
 		UploadId: aws.String(multipartId),
 		MaxParts: aws.Int64(0),
 	})
@@ -400,7 +404,7 @@ func (store S3Store) Terminate(id string) error {
 		// Abort the multipart upload
 		_, err := store.Service.AbortMultipartUpload(&s3.AbortMultipartUploadInput{
 			Bucket:   aws.String(store.Bucket),
-			Key:      aws.String(uploadId),
+			Key:      store.keyWithPrefix(uploadId),
 			UploadId: aws.String(multipartId),
 		})
 		if err != nil && !isAwsError(err, "NoSuchUpload") {
@@ -417,10 +421,10 @@ func (store S3Store) Terminate(id string) error {
 			Delete: &s3.Delete{
 				Objects: []*s3.ObjectIdentifier{
 					{
-						Key: aws.String(uploadId),
+						Key: store.keyWithPrefix(uploadId),
 					},
 					{
-						Key: aws.String(uploadId + ".info"),
+						Key: store.keyWithPrefix(uploadId + ".info"),
 					},
 				},
 				Quiet: aws.Bool(true),
@@ -470,7 +474,7 @@ func (store S3Store) FinishUpload(id string) error {
 
 	_, err = store.Service.CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{
 		Bucket:   aws.String(store.Bucket),
-		Key:      aws.String(uploadId),
+		Key:      store.keyWithPrefix(uploadId),
 		UploadId: aws.String(multipartId),
 		MultipartUpload: &s3.CompletedMultipartUpload{
 			Parts: completedParts,
@@ -497,7 +501,7 @@ func (store S3Store) ConcatUploads(dest string, partialUploads []string) error {
 
 		_, err := store.Service.UploadPartCopy(&s3.UploadPartCopyInput{
 			Bucket:   aws.String(store.Bucket),
-			Key:      aws.String(uploadId),
+			Key:      store.keyWithPrefix(uploadId),
 			UploadId: aws.String(multipartId),
 			// Part numbers must be in the range of 1 to 10000, inclusive. Since
 			// slice indexes start at 0, we add 1 to ensure that i >= 1.
@@ -528,7 +532,7 @@ func (store S3Store) listAllParts(id string) (parts []*s3.Part, err error) {
 		// Get uploaded parts
 		listPtr, err := store.Service.ListParts(&s3.ListPartsInput{
 			Bucket:           aws.String(store.Bucket),
-			Key:              aws.String(uploadId),
+			Key:              store.keyWithPrefix(uploadId),
 			UploadId:         aws.String(multipartId),
 			PartNumberMarker: aws.Int64(partMarker),
 		})
@@ -610,3 +614,12 @@ func (store S3Store) calcOptimalPartSize(size int64) (optimalPartSize int64, err
 	}
 	return optimalPartSize, nil
 }
+
+func (store S3Store) keyWithPrefix(key string) *string {
+	prefix := store.ObjectPrefix
+	if prefix != "" && !strings.HasSuffix(prefix, "/") {
+		prefix += "/"
+	}
+
+	return aws.String(prefix + key)
+}
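A quick sketch of keyWithPrefix's behavior; since the method is unexported, this would only run inside the s3store package (for example, from a test), it assumes fmt is imported, and the prefix values are illustrative:

store := S3Store{ObjectPrefix: "path/to/my/uploads"}
fmt.Println(*store.keyWithPrefix("uploadId")) // path/to/my/uploads/uploadId

// A prefix that already ends in "/" is not doubled.
store.ObjectPrefix = "path/to/my/uploads/"
fmt.Println(*store.keyWithPrefix("uploadId.info")) // path/to/my/uploads/uploadId.info

// An empty prefix leaves keys unchanged, preserving the previous behavior.
store.ObjectPrefix = ""
fmt.Println(*store.keyWithPrefix("uploadId")) // uploadId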
48 changes: 48 additions & 0 deletions s3store/s3store_test.go
@@ -71,6 +71,54 @@ func TestNewUpload(t *testing.T) {
 	assert.Equal("uploadId+multipartId", id)
 }
 
+func TestNewUploadWithObjectPrefix(t *testing.T) {
+	mockCtrl := gomock.NewController(t)
+	defer mockCtrl.Finish()
+	assert := assert.New(t)
+
+	s3obj := NewMockS3API(mockCtrl)
+	store := New("bucket", s3obj)
+	store.ObjectPrefix = "my/uploaded/files"
+
+	assert.Equal("bucket", store.Bucket)
+	assert.Equal(s3obj, store.Service)
+
+	s1 := "hello"
+	s2 := "menü"
+
+	gomock.InOrder(
+		s3obj.EXPECT().CreateMultipartUpload(&s3.CreateMultipartUploadInput{
+			Bucket: aws.String("bucket"),
+			Key:    aws.String("my/uploaded/files/uploadId"),
+			Metadata: map[string]*string{
+				"foo": &s1,
+				"bar": &s2,
+			},
+		}).Return(&s3.CreateMultipartUploadOutput{
+			UploadId: aws.String("multipartId"),
+		}, nil),
+		s3obj.EXPECT().PutObject(&s3.PutObjectInput{
+			Bucket:        aws.String("bucket"),
+			Key:           aws.String("my/uploaded/files/uploadId.info"),
+			Body:          bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":500,"SizeIsDeferred":false,"Offset":0,"MetaData":{"bar":"menü","foo":"hello"},"IsPartial":false,"IsFinal":false,"PartialUploads":null}`)),
+			ContentLength: aws.Int64(int64(171)),
+		}),
+	)
+
+	info := tusd.FileInfo{
+		ID:   "uploadId",
+		Size: 500,
+		MetaData: map[string]string{
+			"foo": "hello",
+			"bar": "menü",
+		},
+	}
+
+	id, err := store.NewUpload(info)
+	assert.Nil(err)
+	assert.Equal("uploadId+multipartId", id)
+}
+
 func TestNewUploadLargerMaxObjectSize(t *testing.T) {
 	mockCtrl := gomock.NewController(t)
 	defer mockCtrl.Finish()
