Skip to content

Commit

Permalink
Merge pull request #104 from agin719/cos-v4-dev
Browse files Browse the repository at this point in the history
fix multicopy and multiupload send on closed channel panic
  • Loading branch information
agin719 authored Feb 25, 2021
2 parents 6df1acc + c0e4c78 commit b14f275
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 15 deletions.
30 changes: 19 additions & 11 deletions object.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ func (s *ObjectService) DeleteMulti(ctx context.Context, opt *ObjectDeleteMultiO
type Object struct {
Key string `xml:",omitempty"`
ETag string `xml:",omitempty"`
Size int `xml:",omitempty"`
Size int64 `xml:",omitempty"`
PartNumber int `xml:",omitempty"`
LastModified string `xml:",omitempty"`
StorageClass string `xml:",omitempty"`
Expand Down Expand Up @@ -836,35 +836,43 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string
}()

// 5.Recv the resp etag to complete
err = nil
for i := 0; i < partNum; i++ {
if chunks[i].Done {
optcom.Parts = append(optcom.Parts, Object{
PartNumber: chunks[i].Number, ETag: chunks[i].ETag},
)
consumedBytes += chunks[i].Size
event = newProgressEvent(ProgressDataEvent, chunks[i].Size, consumedBytes, totalBytes)
progressCallback(listener, event)
if err == nil {
consumedBytes += chunks[i].Size
event = newProgressEvent(ProgressDataEvent, chunks[i].Size, consumedBytes, totalBytes)
progressCallback(listener, event)
}
continue
}
res := <-chresults
// Notice one part fail can not get the etag according.
if res.Resp == nil || res.err != nil {
// Some part already fail, can not to get the header inside.
err := fmt.Errorf("UploadID %s, part %d failed to get resp content. error: %s", uploadID, res.PartNumber, res.err.Error())
event = newProgressEvent(ProgressFailedEvent, 0, consumedBytes, totalBytes, err)
progressCallback(listener, event)
return nil, nil, err
err = fmt.Errorf("UploadID %s, part %d failed to get resp content. error: %s", uploadID, res.PartNumber, res.err.Error())
continue
}
// Notice one part fail can not get the etag according.
etag := res.Resp.Header.Get("ETag")
optcom.Parts = append(optcom.Parts, Object{
PartNumber: res.PartNumber, ETag: etag},
)
consumedBytes += chunks[res.PartNumber-1].Size
event = newProgressEvent(ProgressDataEvent, chunks[res.PartNumber-1].Size, consumedBytes, totalBytes)
progressCallback(listener, event)
if err == nil {
consumedBytes += chunks[res.PartNumber-1].Size
event = newProgressEvent(ProgressDataEvent, chunks[res.PartNumber-1].Size, consumedBytes, totalBytes)
progressCallback(listener, event)
}
}
close(chresults)
if err != nil {
event = newProgressEvent(ProgressFailedEvent, 0, consumedBytes, totalBytes, err)
progressCallback(listener, event)
return nil, nil, err
}
sort.Sort(ObjectList(optcom.Parts))

event = newProgressEvent(ProgressCompletedEvent, 0, consumedBytes, totalBytes)
Expand Down
7 changes: 4 additions & 3 deletions object_part.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/url"
"sort"
"strings"
"time"
)

// InitiateMultipartUploadOptions is the option of InitateMultipartUpload
Expand Down Expand Up @@ -349,6 +350,7 @@ func copyworker(s *ObjectService, jobs <-chan *CopyJobs, results chan<- *CopyRes
results <- &copyres
break
}
time.Sleep(10 * time.Millisecond)
continue
}
results <- &copyres
Expand Down Expand Up @@ -389,7 +391,7 @@ func SplitCopyFileIntoChunks(totalBytes int64, partSize int64) ([]Chunk, int, er
return nil, 0, errors.New("Too many parts, out of 10000")
}
} else {
partNum, partSize = DividePart(totalBytes, 64)
partNum, partSize = DividePart(totalBytes, 128)
}

var chunks []Chunk
Expand Down Expand Up @@ -483,13 +485,12 @@ func (s *ObjectService) MultiCopy(ctx context.Context, name string, sourceURL st
}
close(chjobs)
}()

err = nil
for i := 0; i < partNum; i++ {
res := <-chresults
if res.res == nil || res.err != nil {
err = fmt.Errorf("UploadID %s, part %d failed to get resp content. error: %s", uploadID, res.PartNumber, res.err.Error())
break
continue
}
etag := res.res.ETag
optcom.Parts = append(optcom.Parts, Object{
Expand Down
2 changes: 1 addition & 1 deletion object_select.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

type JSONInputSerialization struct {
Type string `xml:"Type"`
Type string `xml:"Type,omitempty"`
}

type CSVInputSerialization struct {
Expand Down

0 comments on commit b14f275

Please sign in to comment.