api: Optimize multipart upload memory usage for unknown sized stream #776

Merged: 1 commit, Aug 4, 2017
api-put-object-common.go (0 additions, 23 deletions)
@@ -17,7 +17,6 @@
package minio

import (
"hash"
"io"
"math"
"os"
@@ -76,28 +75,6 @@ func optimalPartInfo(objectSize int64) (totalPartsCount int, partSize int64, lastPartSize int64, err error) {
return totalPartsCount, partSize, lastPartSize, nil
}

// hashCopyN - Calculates chosen hashes up to partSize amount of bytes.
func hashCopyN(hashAlgorithms map[string]hash.Hash, hashSums map[string][]byte, writer io.Writer, reader io.Reader, partSize int64) (size int64, err error) {
hashWriter := writer
for _, v := range hashAlgorithms {
hashWriter = io.MultiWriter(hashWriter, v)
}

// Copies to input at writer.
size, err = io.CopyN(hashWriter, reader, partSize)
if err != nil {
// If not EOF return error right here.
if err != io.EOF {
return 0, err
}
}

for k, v := range hashAlgorithms {
hashSums[k] = v.Sum(nil)
}
return size, err
}

// getUploadID - fetch upload id if already present for an object name
// or initiate a new request to fetch a new upload id.
func (c Client) newUploadID(bucketName, objectName string, metaData map[string][]string) (uploadID string, err error) {
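With hashCopyN removed, each part is hashed directly from the in-memory buffer (see api-put-object-multipart.go below). A minimal standalone sketch of that direct approach, using only the standard library; the helper name sumAll is ours, not minio-go's:

```go
package main

import (
	"crypto/md5"
	"crypto/sha256"
	"fmt"
	"hash"
)

// sumAll writes buf into every hash once and collects the digests.
// No io.MultiWriter or io.CopyN is needed when the part is already
// buffered in memory.
func sumAll(hashAlgorithms map[string]hash.Hash, buf []byte) map[string][]byte {
	sums := make(map[string][]byte, len(hashAlgorithms))
	for name, h := range hashAlgorithms {
		h.Reset()
		h.Write(buf) // hash.Hash.Write never returns an error
		sums[name] = h.Sum(nil)
	}
	return sums
}

func main() {
	algos := map[string]hash.Hash{"md5": md5.New(), "sha256": sha256.New()}
	sums := sumAll(algos, []byte("example part data"))
	fmt.Printf("md5=%x\nsha256=%x\n", sums["md5"], sums["sha256"])
}
```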
api-put-object-encrypted.go (1 addition, 1 deletion)
@@ -42,5 +42,5 @@ func (c Client) PutEncryptedObject(bucketName, objectName string, reader io.Read
metadata[amzHeaderKey] = []string{encryptMaterials.GetKey()}
metadata[amzHeaderMatDesc] = []string{encryptMaterials.GetDesc()}

return c.putObjectMultipart(bucketName, objectName, encryptMaterials, -1, metadata, progress)
return c.putObjectMultipartStreamNoLength(bucketName, objectName, encryptMaterials, metadata, progress)
}
api-put-object-multipart.go (36 additions, 28 deletions)
@@ -27,13 +27,14 @@ import (
"sort"
"strconv"
"strings"
"sync"

"github.com/minio/minio-go/pkg/s3utils"
)

func (c Client) putObjectMultipart(bucketName, objectName string, reader io.Reader, size int64,
metadata map[string][]string, progress io.Reader) (n int64, err error) {
n, err = c.putObjectMultipartNoStream(bucketName, objectName, reader, size, metadata, progress)
n, err = c.putObjectMultipartNoStream(bucketName, objectName, reader, metadata, progress)
if err != nil {
errResp := ToErrorResponse(err)
// Verify if multipart functionality is not available, if not
@@ -50,8 +51,17 @@ func (c Client) putObjectMultipart(bucketName, objectName string, reader io.Read
return n, err
}

func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader io.Reader, size int64,
metadata map[string][]string, progress io.Reader) (n int64, err error) {
// bufPool manages reusable buffers for uploading objects
// from streams of unknown size.
var bufPool = sync.Pool{
New: func() interface{} {
_, partSize, _, _ := optimalPartInfo(-1)
b := make([]byte, partSize)
return &b
},
}
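A note on the *[]byte: putting a plain []byte into a sync.Pool boxes the slice header into an interface{} value, which itself allocates on every Put; pooling a pointer to the slice avoids that. A minimal sketch of the idiom in isolation, with an illustrative fixed size in place of optimalPartInfo(-1):

```go
package main

import (
	"fmt"
	"sync"
)

const partSize = 64 << 10 // illustrative; the PR sizes this via optimalPartInfo(-1)

var pool = sync.Pool{
	New: func() interface{} {
		b := make([]byte, partSize)
		return &b // a pointer, so Get/Put do not re-box the slice header
	},
}

func main() {
	bufp := pool.Get().(*[]byte)
	copy(*bufp, "reuse me")
	fmt.Println(len(*bufp)) // 65536
	pool.Put(bufp)          // buffer is now available to the next Get
}
```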

func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader io.Reader, metadata map[string][]string, progress io.Reader) (n int64, err error) {
// Input validation.
if err = s3utils.CheckValidBucketName(bucketName); err != nil {
return 0, err
@@ -68,7 +78,7 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader
var complMultipartUpload completeMultipartUpload

// Calculate the optimal parts info for a given size.
totalPartsCount, partSize, _, err := optimalPartInfo(size)
totalPartsCount, _, _, err := optimalPartInfo(-1)
if err != nil {
return 0, err
}
@@ -88,9 +98,6 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader
// Part number always starts with '1'.
partNumber := 1

// Initialize a temporary buffer.
tmpBuffer := new(bytes.Buffer)

// Initialize parts uploaded map.
partsInfo := make(map[int]ObjectPart)

@@ -100,53 +107,54 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader
// HTTPS connection.
hashAlgos, hashSums := c.hashMaterials()

// Calculates hash sums while copying partSize bytes into tmpBuffer.
prtSize, rErr := hashCopyN(hashAlgos, hashSums, tmpBuffer, reader, partSize)
if rErr != nil && rErr != io.EOF {
bufp := bufPool.Get().(*[]byte)
length, rErr := io.ReadFull(reader, *bufp)
if rErr == io.EOF {
break
}
if rErr != nil && rErr != io.ErrUnexpectedEOF {
bufPool.Put(bufp)
return 0, rErr
}

var reader io.Reader
// Calculate the hash sums of the part data in the buffer.
for k, v := range hashAlgos {
v.Write((*bufp)[:length])
hashSums[k] = v.Sum(nil)
}

// Update progress reader appropriately to the latest offset
// as we read from the source.
reader = newHook(tmpBuffer, progress)
rd := newHook(bytes.NewReader((*bufp)[:length]), progress)

// Proceed to upload the part.
var objPart ObjectPart
objPart, err = c.uploadPart(bucketName, objectName, uploadID, reader, partNumber,
hashSums["md5"], hashSums["sha256"], prtSize, metadata)
objPart, err = c.uploadPart(bucketName, objectName, uploadID, rd, partNumber,
hashSums["md5"], hashSums["sha256"], int64(length), metadata)
if err != nil {
// Reset the temporary buffer upon any error.
tmpBuffer.Reset()
bufPool.Put(bufp)
return totalUploadedSize, err
}

// Save successfully uploaded part metadata.
partsInfo[partNumber] = objPart

// Reset the temporary buffer.
tmpBuffer.Reset()

// Save successfully uploaded size.
totalUploadedSize += prtSize
totalUploadedSize += int64(length)

// Increment part number.
partNumber++

// Put back data into bufpool.
bufPool.Put(bufp)
Review comment from balamurugana (Member): this statement can be in defer

Reply from the author: We cannot defer, @balamurugana; we need to get this buffer back in the next loop iteration.
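The distinction is easy to miss: defer fires at function return, not at the end of a loop iteration, so a deferred bufPool.Put would keep every buffer checked out until the whole upload finished. A toy illustration (function names ours):

```go
package main

import "sync"

// f defers every Put: all of them run only when f returns, so no
// buffer is reusable while the loop is still iterating.
func f(pool *sync.Pool) {
	for i := 0; i < 3; i++ {
		bufp := pool.Get().(*[]byte)
		defer pool.Put(bufp) // runs at function return, not per iteration
		_ = bufp
	}
}

// g puts the buffer back at the end of each iteration, so the next
// Get can reuse it; this is the pattern the PR uses.
func g(pool *sync.Pool) {
	for i := 0; i < 3; i++ {
		bufp := pool.Get().(*[]byte)
		_ = bufp
		pool.Put(bufp)
	}
}

func main() {
	pool := &sync.Pool{New: func() interface{} { b := make([]byte, 4); return &b }}
	f(pool)
	g(pool)
}
```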


// On EOF from an unknown-size stream, break out of the loop;
// we do not have to upload all totalPartsCount parts.
if size < 0 && rErr == io.EOF {
if rErr == io.EOF {
break
}
}

// Verify if we uploaded all the data.
if size > 0 {
if totalUploadedSize != size {
return totalUploadedSize, ErrUnexpectedEOF(totalUploadedSize, size, bucketName, objectName)
}
}

// Loop over total uploaded parts to save them in
// Parts array before completing the multipart request.
for i := 1; i < partNumber; i++ {
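Both upload loops lean on io.ReadFull's error contract: nil means a full part was read, io.ErrUnexpectedEOF means a short final part that should still be uploaded, and io.EOF means zero bytes were read and the loop can break. A small standalone demonstration:

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

func main() {
	src := strings.NewReader(strings.Repeat("x", 10)) // 10 bytes of data
	buf := make([]byte, 8)                            // pretend part size

	n, err := io.ReadFull(src, buf)
	fmt.Println(n, err) // 8 <nil>: a full part

	n, err = io.ReadFull(src, buf)
	fmt.Println(n, err) // 2 unexpected EOF: short final part, still uploadable

	n, err = io.ReadFull(src, buf)
	fmt.Println(n, err) // 0 EOF: stream exhausted, break out of the loop
}
```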
api-put-object.go (21 additions, 28 deletions)
@@ -180,6 +180,7 @@ func (c Client) PutObjectWithProgress(bucketName, objectName string, reader io.R
if err != nil {
return 0, err
}

return c.putObjectCommon(bucketName, objectName, reader, size, metadata, progress)
}

@@ -203,7 +204,7 @@ func (c Client) putObjectCommon(bucketName, objectName string, reader io.Reader,
}

if size < 0 {
return c.putObjectMultipartStreamNoLength(bucketName, objectName, reader, size, metadata, progress)
return c.putObjectMultipartStreamNoLength(bucketName, objectName, reader, metadata, progress)
}

if size < minPartSize {
@@ -214,8 +215,8 @@ func (c Client) putObjectCommon(bucketName, objectName string, reader io.Reader,
return c.putObjectMultipartStream(bucketName, objectName, reader, size, metadata, progress)
}

func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string, reader io.Reader, size int64,
metadata map[string][]string, progress io.Reader) (n int64, err error) {
func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string, reader io.Reader, metadata map[string][]string,
progress io.Reader) (n int64, err error) {
// Input validation.
if err = s3utils.CheckValidBucketName(bucketName); err != nil {
return 0, err
@@ -232,7 +233,7 @@ func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string,
var complMultipartUpload completeMultipartUpload

// Calculate the optimal parts info for a given size.
totalPartsCount, partSize, _, err := optimalPartInfo(size)
totalPartsCount, _, _, err := optimalPartInfo(-1)
if err != nil {
return 0, err
}
@@ -252,60 +253,52 @@
// Part number always starts with '1'.
partNumber := 1

// Initialize a temporary buffer.
tmpBuffer := new(bytes.Buffer)

// Initialize parts uploaded map.
partsInfo := make(map[int]ObjectPart)

for partNumber <= totalPartsCount {
// Calculates hash sums while copying partSize bytes into tmpBuffer.
prtSize, rErr := io.CopyN(tmpBuffer, reader, partSize)
if rErr != nil && rErr != io.EOF {
bufp := bufPool.Get().(*[]byte)
length, rErr := io.ReadFull(reader, *bufp)
if rErr == io.EOF {
break
}
if rErr != nil && rErr != io.ErrUnexpectedEOF {
bufPool.Put(bufp)
return 0, rErr
}

var reader io.Reader
// Update progress reader appropriately to the latest offset
// as we read from the source.
reader = newHook(tmpBuffer, progress)
rd := newHook(bytes.NewReader((*bufp)[:length]), progress)

// Proceed to upload the part.
var objPart ObjectPart
objPart, err = c.uploadPart(bucketName, objectName, uploadID, reader, partNumber,
nil, nil, prtSize, metadata)
objPart, err = c.uploadPart(bucketName, objectName, uploadID, rd, partNumber,
nil, nil, int64(length), metadata)
if err != nil {
// Reset the temporary buffer upon any error.
tmpBuffer.Reset()
bufPool.Put(bufp)
return totalUploadedSize, err
}

// Save successfully uploaded part metadata.
partsInfo[partNumber] = objPart

// Reset the temporary buffer.
tmpBuffer.Reset()

// Save successfully uploaded size.
totalUploadedSize += prtSize
totalUploadedSize += int64(length)

// Increment part number.
partNumber++

// Put back data into bufpool.
bufPool.Put(bufp)

// On EOF from an unknown-size stream, break out of the loop;
// we do not have to upload all totalPartsCount parts.
if size < 0 && rErr == io.EOF {
if rErr == io.EOF {
break
}
}

// Verify if we uploaded all the data.
if size > 0 {
if totalUploadedSize != size {
return totalUploadedSize, ErrUnexpectedEOF(totalUploadedSize, size, bucketName, objectName)
}
}

// Loop over total uploaded parts to save them in
// Parts array before completing the multipart request.
for i := 1; i < partNumber; i++ {
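newHook wraps the part reader so progress advances as the part is consumed. The hook reader itself is outside this diff; the sketch below is one plausible shape of such a wrapper, assuming the progress object is an io.Reader that is fed the number of bytes just read (the names hooked and countingProgress are ours):

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"io/ioutil"
)

// hooked mirrors the idea behind newHook: every Read on the source
// also notifies the progress reader with the bytes just consumed.
type hooked struct {
	source   io.Reader
	progress io.Reader // may be nil
}

func (h *hooked) Read(p []byte) (int, error) {
	n, err := h.source.Read(p)
	if n > 0 && h.progress != nil {
		h.progress.Read(p[:n]) // feed the progress tracker
	}
	return n, err
}

// countingProgress is a stand-in progress reader that just tallies bytes.
type countingProgress struct{ total int64 }

func (c *countingProgress) Read(p []byte) (int, error) {
	c.total += int64(len(p))
	return len(p), nil
}

func main() {
	prog := &countingProgress{}
	rd := &hooked{source: bytes.NewReader(make([]byte, 1<<20)), progress: prog}
	io.Copy(ioutil.Discard, rd)
	fmt.Println(prog.total) // 1048576
}
```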
functional_tests.go (63 additions, 0 deletions)
@@ -3609,6 +3609,68 @@ func testUserMetadataCopyingV2() {
testUserMetadataCopyingWrapper(c)
}

// Test put object with unknown size (passed as -1).
func testPutObjectNoLengthV2() {
logger().Info()

// Seed random based on current time.
rand.Seed(time.Now().Unix())

// Instantiate new minio client object.
c, err := minio.NewV2(
os.Getenv(serverEndpoint),
os.Getenv(accessKey),
os.Getenv(secretKey),
mustParseBool(os.Getenv(enableHTTPS)),
)
if err != nil {
log.Fatal("Error:", err)
}

// Enable tracing, write to stderr.
// c.TraceOn(os.Stderr)

// Set user agent.
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")

// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()),
"minio-go-test")

// Make a new bucket.
err = c.MakeBucket(bucketName, "us-east-1")
if err != nil {
log.Fatal("Error:", err, bucketName)
}

objectName := bucketName + "unique"

// Generate enough data (the 65 MiB datafile) that the upload spans multiple parts.
// Use different data for each part for multipart tests to ensure part order at the end.
var buf = getDataBuffer("datafile-65-MB", MinPartSize)

// Upload an object.
n, err := c.PutObjectWithSize(bucketName, objectName, bytes.NewReader(buf), -1, nil, nil)
if err != nil {
log.Fatalf("Error: %v %s %s", err, bucketName, objectName)
}
if n != int64(len(buf)) {
log.Error(fmt.Errorf("Expected upload object size %d but got %d", len(buf), n))
}

// Remove the object.
err = c.RemoveObject(bucketName, objectName)
if err != nil {
log.Fatal("Error:", err)
}

// Remove the bucket.
err = c.RemoveBucket(bucketName)
if err != nil {
log.Fatal("Error:", err)
}
}

// Test put object with 0 byte object.
func testPutObject0ByteV2() {
logger().Info()
@@ -4023,6 +4085,7 @@ func main() {
testEncryptedCopyObjectV2()
testUserMetadataCopyingV2()
testPutObject0ByteV2()
testPutObjectNoLengthV2()
testMakeBucketError()
testMakeBucketRegions()
testPutObjectWithMetadata()
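For reference, a hedged usage sketch of the new unknown-length path: gzip output streamed through an io.Pipe into PutObjectWithSize with size -1, following the call shape used in testPutObjectNoLengthV2 above. The endpoint, credentials, bucket, and file names are placeholders:

```go
package main

import (
	"compress/gzip"
	"io"
	"log"
	"os"

	minio "github.com/minio/minio-go"
)

func main() {
	c, err := minio.New("play.minio.io:9000", "ACCESS-KEY", "SECRET-KEY", true)
	if err != nil {
		log.Fatal(err)
	}

	pr, pw := io.Pipe()
	go func() {
		// Compress on the fly; the final size is unknown up front.
		f, err := os.Open("data.log")
		if err != nil {
			pw.CloseWithError(err)
			return
		}
		defer f.Close()
		gz := gzip.NewWriter(pw)
		if _, err := io.Copy(gz, f); err != nil {
			pw.CloseWithError(err)
			return
		}
		pw.CloseWithError(gz.Close())
	}()

	// size -1 routes the upload through putObjectMultipartStreamNoLength.
	n, err := c.PutObjectWithSize("my-bucket", "data.log.gz", pr, -1, nil, nil)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("uploaded %d bytes", n)
}
```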