Skip to content

Commit

Permalink
Merge pull request #99 from agin719/cos-v4-dev
Browse files Browse the repository at this point in the history
add MultiCopy
  • Loading branch information
agin719 authored Dec 29, 2020
2 parents 91fc87b + efead0c commit 957a6e1
Show file tree
Hide file tree
Showing 5 changed files with 311 additions and 4 deletions.
2 changes: 1 addition & 1 deletion cos.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

const (
// Version current go sdk version
Version = "0.7.16"
Version = "0.7.17"
userAgent = "cos-go-sdk-v5/" + Version
contentTypeXML = "application/xml"
defaultServiceBaseURL = "http://service.cos.myqcloud.com"
Expand Down
66 changes: 66 additions & 0 deletions example/object/multicopy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package main

import (
"context"
"net/url"
"os"

"net/http"

"fmt"

"github.com/tencentyun/cos-go-sdk-v5"
"github.com/tencentyun/cos-go-sdk-v5/debug"
)

func log_status(err error) {
if err == nil {
return
}
if cos.IsNotFoundError(err) {
// WARN
fmt.Println("WARN: Resource is not existed")
} else if e, ok := cos.IsCOSError(err); ok {
fmt.Printf("ERROR: Code: %v\n", e.Code)
fmt.Printf("ERROR: Message: %v\n", e.Message)
fmt.Printf("ERROR: Resource: %v\n", e.Resource)
fmt.Printf("ERROR: RequestId: %v\n", e.RequestID)
// ERROR
} else {
fmt.Printf("ERROR: %v\n", err)
// ERROR
}
}

func main() {
u, _ := url.Parse("https://test-1259654469.cos.ap-guangzhou.myqcloud.com")
b := &cos.BaseURL{BucketURL: u}
c := cos.NewClient(b, &http.Client{
Transport: &cos.AuthorizationTransport{
SecretID: os.Getenv("COS_SECRETID"),
SecretKey: os.Getenv("COS_SECRETKEY"),
Transport: &debug.DebugRequestTransport{
RequestHeader: true,
RequestBody: true,
ResponseHeader: true,
ResponseBody: true,
},
},
})

opt := &cos.MultiCopyOptions{
OptCopy: &cos.ObjectCopyOptions{
&cos.ObjectCopyHeaderOptions{
XCosStorageClass: "Archive",
},
nil,
},
ThreadPoolSize: 10,
}
source := "exampleobject"
soruceURL := fmt.Sprintf("%s/%s", u.Host, source)
dest := fmt.Sprintf("destobject")
res, _, err := c.Object.MultiCopy(context.Background(), dest, soruceURL, opt)
log_status(err)
fmt.Printf("res:%+v\n", res)
}
30 changes: 30 additions & 0 deletions helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,33 @@ func CheckReaderLen(reader io.Reader) error {
}
return errors.New("The single object size you upload can not be larger than 5GB")
}

func CopyOptionsToMulti(opt *ObjectCopyOptions) *InitiateMultipartUploadOptions {
if opt == nil {
return nil
}
optini := &InitiateMultipartUploadOptions{
opt.ACLHeaderOptions,
&ObjectPutHeaderOptions{},
}
if opt.ObjectCopyHeaderOptions == nil {
return optini
}
optini.ObjectPutHeaderOptions = &ObjectPutHeaderOptions{
CacheControl: opt.ObjectCopyHeaderOptions.CacheControl,
ContentDisposition: opt.ObjectCopyHeaderOptions.ContentDisposition,
ContentEncoding: opt.ObjectCopyHeaderOptions.ContentEncoding,
ContentType: opt.ObjectCopyHeaderOptions.ContentType,
ContentLanguage: opt.ObjectCopyHeaderOptions.ContentLanguage,
Expect: opt.ObjectCopyHeaderOptions.Expect,
Expires: opt.ObjectCopyHeaderOptions.Expires,
XCosMetaXXX: opt.ObjectCopyHeaderOptions.XCosMetaXXX,
XCosStorageClass: opt.ObjectCopyHeaderOptions.XCosStorageClass,
XCosServerSideEncryption: opt.ObjectCopyHeaderOptions.XCosServerSideEncryption,
XCosSSECustomerAglo: opt.ObjectCopyHeaderOptions.XCosSSECustomerAglo,
XCosSSECustomerKey: opt.ObjectCopyHeaderOptions.XCosSSECustomerKey,
XCosSSECustomerKeyMD5: opt.ObjectCopyHeaderOptions.XCosSSECustomerKeyMD5,
XOptionHeader: opt.ObjectCopyHeaderOptions.XOptionHeader,
}
return optini
}
11 changes: 8 additions & 3 deletions object.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,8 @@ type ObjectCopyResult struct {
XMLName xml.Name `xml:"CopyObjectResult"`
ETag string `xml:"ETag,omitempty"`
LastModified string `xml:"LastModified,omitempty"`
CRC64 string `xml:"CRC64,omitempty"`
VersionId string `xml:"VersionId,omitempty"`
}

// Copy 调用 PutObjectCopy 请求实现将一个文件从源路径复制到目标路径。建议文件大小 1M 到 5G,
Expand Down Expand Up @@ -588,8 +590,8 @@ func worker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) {
}
}

func DividePart(fileSize int64) (int64, int64) {
partSize := int64(1 * 1024 * 1024)
func DividePart(fileSize int64, last int) (int64, int64) {
partSize := int64(last * 1024 * 1024)
partNum := fileSize / partSize
for partNum >= 10000 {
partSize = partSize * 2
Expand Down Expand Up @@ -621,7 +623,7 @@ func SplitFileIntoChunks(filePath string, partSize int64) (int64, []Chunk, int,
return 0, nil, 0, errors.New("Too many parts, out of 10000")
}
} else {
partNum, partSize = DividePart(stat.Size())
partNum, partSize = DividePart(stat.Size(), 1)
}

var chunks []Chunk
Expand Down Expand Up @@ -866,6 +868,9 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string
progressCallback(listener, event)

v, resp, err := s.CompleteMultipartUpload(context.Background(), name, uploadID, optcom)
if err != nil {
s.AbortMultipartUpload(ctx, name, uploadID)
}

return v, resp, err
}
Expand Down
206 changes: 206 additions & 0 deletions object_part.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"sort"
"strings"
)

// InitiateMultipartUploadOptions is the option of InitateMultipartUpload
Expand Down Expand Up @@ -307,3 +310,206 @@ func (s *ObjectService) ListUploads(ctx context.Context, opt *ObjectListUploadsO
resp, err := s.client.send(ctx, sendOpt)
return &res, resp, err
}

type MultiCopyOptions struct {
OptCopy *ObjectCopyOptions
PartSize int64
ThreadPoolSize int
}

type CopyJobs struct {
Name string
UploadId string
RetryTimes int
Chunk Chunk
Opt *ObjectCopyPartOptions
}

type CopyResults struct {
PartNumber int
Resp *Response
err error
res *CopyPartResult
}

func copyworker(s *ObjectService, jobs <-chan *CopyJobs, results chan<- *CopyResults) {
for j := range jobs {
var copyres CopyResults
j.Opt.XCosCopySourceRange = fmt.Sprintf("bytes=%d-%d", j.Chunk.OffSet, j.Chunk.OffSet+j.Chunk.Size-1)
rt := j.RetryTimes
for {
res, resp, err := s.CopyPart(context.Background(), j.Name, j.UploadId, j.Chunk.Number, j.Opt.XCosCopySource, j.Opt)
copyres.PartNumber = j.Chunk.Number
copyres.Resp = resp
copyres.err = err
copyres.res = res
if err != nil {
rt--
if rt == 0 {
results <- &copyres
break
}
continue
}
results <- &copyres
break
}
}
}

func (s *ObjectService) innerHead(ctx context.Context, sourceURL string, opt *ObjectHeadOptions, id []string) (resp *Response, err error) {
surl := strings.SplitN(sourceURL, "/", 2)
if len(surl) < 2 {
err = errors.New(fmt.Sprintf("sourceURL format error: %s", sourceURL))
return
}

u, err := url.Parse(fmt.Sprintf("https://%s", surl[0]))
if err != nil {
return
}
b := &BaseURL{BucketURL: u}
client := NewClient(b, &http.Client{
Transport: s.client.client.Transport,
})
if len(id) > 0 {
resp, err = client.Object.Head(ctx, surl[1], nil, id[0])
} else {
resp, err = client.Object.Head(ctx, surl[1], nil)
}
return
}

func SplitCopyFileIntoChunks(totalBytes int64, partSize int64) ([]Chunk, int, error) {
var partNum int64
if partSize > 0 {
partSize = partSize * 1024 * 1024
partNum = totalBytes / partSize
if partNum >= 10000 {
return nil, 0, errors.New("Too many parts, out of 10000")
}
} else {
partNum, partSize = DividePart(totalBytes, 64)
}

var chunks []Chunk
var chunk = Chunk{}
for i := int64(0); i < partNum; i++ {
chunk.Number = int(i + 1)
chunk.OffSet = i * partSize
chunk.Size = partSize
chunks = append(chunks, chunk)
}

if totalBytes%partSize > 0 {
chunk.Number = len(chunks) + 1
chunk.OffSet = int64(len(chunks)) * partSize
chunk.Size = totalBytes % partSize
chunks = append(chunks, chunk)
partNum++
}
return chunks, int(partNum), nil
}

func (s *ObjectService) MultiCopy(ctx context.Context, name string, sourceURL string, opt *MultiCopyOptions, id ...string) (*ObjectCopyResult, *Response, error) {
resp, err := s.innerHead(ctx, sourceURL, nil, id)
if err != nil {
return nil, nil, err
}
totalBytes := resp.ContentLength
surl := strings.SplitN(sourceURL, "/", 2)
if len(surl) < 2 {
return nil, nil, errors.New(fmt.Sprintf("x-cos-copy-source format error: %s", sourceURL))
}
var u string
if len(id) == 1 {
u = fmt.Sprintf("%s/%s?versionId=%s", surl[0], encodeURIComponent(surl[1]), id[0])
} else if len(id) == 0 {
u = fmt.Sprintf("%s/%s", surl[0], encodeURIComponent(surl[1]))
} else {
return nil, nil, errors.New("wrong params")
}

if opt == nil {
opt = &MultiCopyOptions{}
}
chunks, partNum, err := SplitCopyFileIntoChunks(totalBytes, opt.PartSize)
if err != nil {
return nil, nil, err
}
if partNum == 0 || totalBytes < singleUploadMaxLength {
if len(id) > 0 {
return s.Copy(ctx, name, sourceURL, opt.OptCopy, id[0])
} else {
return s.Copy(ctx, name, sourceURL, opt.OptCopy)
}
}
optini := CopyOptionsToMulti(opt.OptCopy)
var uploadID string
res, _, err := s.InitiateMultipartUpload(ctx, name, optini)
if err != nil {
return nil, nil, err
}
uploadID = res.UploadID

var poolSize int
if opt.ThreadPoolSize > 0 {
poolSize = opt.ThreadPoolSize
} else {
poolSize = 1
}

chjobs := make(chan *CopyJobs, 100)
chresults := make(chan *CopyResults, 10000)
optcom := &CompleteMultipartUploadOptions{}

for w := 1; w <= poolSize; w++ {
go copyworker(s, chjobs, chresults)
}

go func() {
for _, chunk := range chunks {
partOpt := &ObjectCopyPartOptions{
XCosCopySource: u,
}
job := &CopyJobs{
Name: name,
RetryTimes: 3,
UploadId: uploadID,
Chunk: chunk,
Opt: partOpt,
}
chjobs <- job
}
close(chjobs)
}()

err = nil
for i := 0; i < partNum; i++ {
res := <-chresults
if res.res == nil || res.err != nil {
err = fmt.Errorf("UploadID %s, part %d failed to get resp content. error: %s", uploadID, res.PartNumber, res.err.Error())
break
}
etag := res.res.ETag
optcom.Parts = append(optcom.Parts, Object{
PartNumber: res.PartNumber, ETag: etag},
)
}
close(chresults)
if err != nil {
return nil, nil, err
}
sort.Sort(ObjectList(optcom.Parts))

v, resp, err := s.CompleteMultipartUpload(ctx, name, uploadID, optcom)
if err != nil {
s.AbortMultipartUpload(ctx, name, uploadID)
}
cpres := &ObjectCopyResult{
ETag: v.ETag,
CRC64: resp.Header.Get("x-cos-hash-crc64ecma"),
VersionId: resp.Header.Get("x-cos-version-id"),
}
return cpres, resp, err
}

0 comments on commit 957a6e1

Please sign in to comment.