Skip to content

Commit

Permalink
fix: support archive.none for OSS directory artifacts (#6312)
Browse files Browse the repository at this point in the history
Signed-off-by: jibuji <sunnypengfei@gmail.com>
  • Loading branch information
jibuji authored Jul 27, 2021
1 parent 7ec5b3e commit 40b0824
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 21 deletions.
2 changes: 2 additions & 0 deletions USERS.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ Currently, the following organizations are **officially** using Argo Workflows:
1. [FOLIO](http://corp.folio-sec.com/)
1. [FreeWheel](https://freewheel.com/)
1. [Fynd Trak](https://trak.fynd.com/)
1. [Galixir](https://www.galixir.com/)
1. [Gardener](https://gardener.cloud/)
1. [GitHub](https://github.com/)
1. [Gladly](https://gladly.com/)
Expand Down Expand Up @@ -135,6 +136,7 @@ Currently, the following organizations are **officially** using Argo Workflows:
1. [Workiva](https://www.workiva.com/)
1. [Zhihu](https://www.zhihu.com/)


### Projects Using Argo

In addition, the following projects are **officially** using Argo Workflows:
Expand Down
183 changes: 162 additions & 21 deletions workflow/artifacts/oss/oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package oss

import (
"fmt"
"os"
"path/filepath"
"strings"
"time"

Expand All @@ -11,6 +13,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/utils/pointer"

"github.com/argoproj/argo-workflows/v3/errors"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
waitutil "github.com/argoproj/argo-workflows/v3/util/wait"
"github.com/argoproj/argo-workflows/v3/workflow/artifacts/common"
Expand Down Expand Up @@ -63,17 +66,21 @@ func (ossDriver *ArtifactDriver) Load(inputArtifact *wfv1.Artifact, path string)
if origErr == nil {
return true, nil
}
if ossErr, ok := origErr.(oss.ServiceError); ok {
if ossErr.Code != "NoSuchKey" {
return !isTransientOSSErr(err), fmt.Errorf("failed to get file: %w", origErr)
}
if !IsOssErrCode(origErr, "NoSuchKey") {
return !isTransientOSSErr(origErr), fmt.Errorf("failed to get file: %w", origErr)
}
// If we get here, the error was a NoSuchKey. The key might be a directory.
// There is only one method in OSS for downloading objects that does not differentiate between a file
// and a directory so we append the a trailing slash here to differentiate that prior to downloading.
err = bucket.GetObjectToFile(objectName+"/", path)
// If we get here, the error was a NoSuchKey. The key might be a oss "directory"
isDir, err := IsOssDirectory(bucket, objectName)
if err != nil {
return !isTransientOSSErr(err), err
return !isTransientOSSErr(err), fmt.Errorf("failed to test if %s/%s is a directory: %w", bucketName, objectName, err)
}
if !isDir {
// It's neither a file, nor a directory. Return the original NoSuchKey error
return false, origErr
}

if err = GetOssDirectory(bucket, objectName, path); err != nil {
return !isTransientOSSErr(err), fmt.Errorf("failed get directory: %v", err)
}
return true, nil
})
Expand All @@ -89,6 +96,11 @@ func (ossDriver *ArtifactDriver) Save(path string, outputArtifact *wfv1.Artifact
if err != nil {
return !isTransientOSSErr(err), err
}
isDir, err := file.IsDirectory(path)
if err != nil {
log.Warnf("Failed to test if %s is a directory: %v", path, err)
return false, nil
}
bucketName := outputArtifact.OSS.Bucket
if outputArtifact.OSS.CreateBucketIfNotPresent {
exists, err := osscli.IsBucketExist(bucketName)
Expand All @@ -111,19 +123,18 @@ func (ossDriver *ArtifactDriver) Save(path string, outputArtifact *wfv1.Artifact
err = setBucketLifecycleRule(osscli, outputArtifact.OSS)
return !isTransientOSSErr(err), err
}
isDir, err := file.IsDirectory(path)
if err != nil {
return false, fmt.Errorf("failed to test if %s is a directory: %w", path, err)
}
// There is only one method in OSS for uploading objects that does not differentiate between a file and a directory
// so we append the a trailing slash here to differentiate that prior to uploading.
if isDir && !strings.HasSuffix(objectName, "/") {
objectName += "/"
}
err = bucket.PutObjectFromFile(objectName, path)
if err != nil {
return !isTransientOSSErr(err), err
if isDir {
if err = putDirectory(bucket, objectName, path); err != nil {
log.Warnf("failed to put directory: %v", err)
return !isTransientOSSErr(err), err
}
} else {
if err = putFile(bucket, objectName, path); err != nil {
log.Warnf("failed to put file: %v", err)
return !isTransientOSSErr(err), err
}
}

return true, nil
})
return err
Expand Down Expand Up @@ -212,3 +223,133 @@ func isTransientOSSErr(err error) bool {
}
return false
}

func putFile(bucket *oss.Bucket, objectName, path string) error {
log.Debugf("putFile from %s to %s", path, objectName)
return bucket.PutObjectFromFile(objectName, path)
}

func putDirectory(bucket *oss.Bucket, objectName, dir string) error {
return filepath.Walk(dir, func(fpath string, info os.FileInfo, err error) error {
if err != nil {
return errors.InternalWrapError(err)
}
// build the name to be used in OSS
nameInDir, err := filepath.Rel(dir, fpath)
if err != nil {
return errors.InternalWrapError(err)
}
fObjectName := filepath.Join(objectName, nameInDir)
// create an OSS dir explicitly for every local dir, , including empty dirs.
if info.Mode().IsDir() {
// create OSS dir
if !strings.HasSuffix(fObjectName, "/") {
fObjectName += "/"
}
err = bucket.PutObject(fObjectName, nil)
if err != nil {
return err
}
}
if !info.Mode().IsRegular() {
return nil
}

err = putFile(bucket, fObjectName, fpath)
if err != nil {
return err
}
return nil
})
}

// IsOssErrCode tests if an err is an oss.ServiceError with the specified code
func IsOssErrCode(err error, code string) bool {
if serr, ok := err.(oss.ServiceError); ok {
if serr.Code == code {
return true
}
}
return false
}

// IsDirectory tests if the key is acting like a OSS directory. This just means it has at least one
// object which is prefixed with the given key
func IsOssDirectory(bucket *oss.Bucket, objectName string) (bool, error) {
if objectName == "" {
return true, nil
}
if !strings.HasSuffix(objectName, "/") {
objectName += "/"
}
rst, err := bucket.ListObjects(oss.Prefix(objectName), oss.MaxKeys(1))
if err != nil {
return false, err
}
if len(rst.CommonPrefixes)+len(rst.Objects) > 0 {
return true, nil
}
return false, nil
}

// GetOssDirectory download an OSS "directory" to local path
func GetOssDirectory(bucket *oss.Bucket, objectName, path string) error {
files, err := ListOssDirectory(bucket, objectName)
if err != nil {
return err
}
for _, f := range files {
innerName, err := filepath.Rel(objectName, f)
if err != nil {
return fmt.Errorf("get Rel path from %s to %s error: %w", f, objectName, err)
}
fpath := filepath.Join(path, innerName)
if strings.HasSuffix(f, "/") {
err = os.MkdirAll(fpath, 0o700)
if err != nil {
return fmt.Errorf("mkdir %s error: %w", fpath, err)
}
continue
}
dirPath := filepath.Dir(fpath)
err = os.MkdirAll(dirPath, 0o700)
if err != nil {
return fmt.Errorf("mkdir %s error: %w", dirPath, err)
}

err = bucket.GetObjectToFile(f, fpath)
if err != nil {
log.Warnf("failed to load object %s to %s error: %v", f, fpath, err)
return err
}
}
return nil
}

// ListOssDirectory lists all the files which are the descendants of the specified objectKey, if a file has suffix '/', then it is an OSS directory
func ListOssDirectory(bucket *oss.Bucket, objectKey string) (files []string, err error) {
if objectKey != "" {
if !strings.HasSuffix(objectKey, "/") {
objectKey += "/"
}
}

pre := oss.Prefix(objectKey)
marker := oss.Marker("")
for {
lor, err := bucket.ListObjects(marker, pre)
if err != nil {
log.Warnf("oss list object(%s) error: %v", objectKey, err)
return files, err
}
for _, obj := range lor.Objects {
files = append(files, obj.Key)
}

marker = oss.Marker(lor.NextMarker)
if !lor.IsTruncated {
break
}
}
return files, nil
}

0 comments on commit 40b0824

Please sign in to comment.