Skip to content

Commit

Permalink
feat: Support oras.PushBytes and oras.TagBytes (#293)
Browse files Browse the repository at this point in the history
Signed-off-by: Sylvia Lei <lixlei@microsoft.com>
  • Loading branch information
Wwwsylvia authored Sep 2, 2022
1 parent 880b84b commit 6d3ce22
Show file tree
Hide file tree
Showing 3 changed files with 579 additions and 20 deletions.
91 changes: 88 additions & 3 deletions content.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@ limitations under the License.
package oras

import (
"bytes"
"context"
"errors"
"fmt"
"io"

ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
"oras.land/oras-go/v2/content"
"oras.land/oras-go/v2/errdef"
"oras.land/oras-go/v2/internal/cas"
Expand Down Expand Up @@ -149,9 +153,7 @@ func Fetch(ctx context.Context, target ReadOnlyTarget, reference string, opts Fe
}

// DefaultFetchBytesOptions provides the default FetchBytesOptions.
var DefaultFetchBytesOptions = FetchBytesOptions{
MaxBytes: defaultMaxBytes,
}
var DefaultFetchBytesOptions FetchBytesOptions

// defaultMaxBytes is the default value of MaxBytes.
const defaultMaxBytes int64 = 4 * 1024 * 1024 // 4 MiB
Expand Down Expand Up @@ -191,3 +193,86 @@ func FetchBytes(ctx context.Context, target ReadOnlyTarget, reference string, op

return desc, bytes, nil
}

// PushBytes describes the contentBytes using the given mediaType and pushes it.
// If mediaType is not specified, "application/octet-stream" is used.
func PushBytes(ctx context.Context, pusher content.Pusher, mediaType string, contentBytes []byte) (ocispec.Descriptor, error) {
desc := content.NewDescriptorFromBytes(mediaType, contentBytes)
r := bytes.NewReader(contentBytes)
if err := pusher.Push(ctx, desc, r); err != nil {
return ocispec.Descriptor{}, err
}

return desc, nil
}

// defaultTagConcurrency is the default concurrency of tagging.
const defaultTagConcurrency = 5 // This value is consistent with dockerd

// DefaultTagBytesOptions provides the default TagBytesOptions.
var DefaultTagBytesOptions TagBytesOptions

// TagBytesOptions contains parameters for oras.TagBytes.
type TagBytesOptions struct {
// Concurrency limits the maximum number of concurrent tag tasks.
// If less than or equal to 0, a default (currently 5) is used.
Concurrency int64
}

// TagBytes describes the contentBytes using the given mediaType, pushes it,
// and tag it with the given reference.
// If mediaType is not specified, "application/octet-stream" is used.
func TagBytes(ctx context.Context, target Target, mediaType string, contentBytes []byte, reference string) (ocispec.Descriptor, error) {
return TagBytesN(ctx, target, mediaType, contentBytes, []string{reference}, DefaultTagBytesOptions)
}

// TagBytesN describes the contentBytes using the given mediaType, pushes it,
// and tag it with the given references.
// If mediaType is not specified, "application/octet-stream" is used.
func TagBytesN(ctx context.Context, target Target, mediaType string, contentBytes []byte, references []string, opts TagBytesOptions) (ocispec.Descriptor, error) {
if len(references) == 0 {
return PushBytes(ctx, target, mediaType, contentBytes)
}

if opts.Concurrency <= 0 {
opts.Concurrency = defaultTagConcurrency
}
desc := content.NewDescriptorFromBytes(mediaType, contentBytes)
limiter := semaphore.NewWeighted(defaultTagConcurrency)
eg, egCtx := errgroup.WithContext(ctx)
if refPusher, ok := target.(registry.ReferencePusher); ok {
for _, reference := range references {
limiter.Acquire(ctx, 1)
eg.Go(func(ref string) func() error {
return func() error {
defer limiter.Release(1)
r := bytes.NewReader(contentBytes)
if err := refPusher.PushReference(egCtx, desc, r, ref); err != nil && !errors.Is(err, errdef.ErrAlreadyExists) {
return err
}
return nil
}
}(reference))
}
} else {
r := bytes.NewReader(contentBytes)
if err := target.Push(ctx, desc, r); err != nil && !errors.Is(err, errdef.ErrAlreadyExists) {
return ocispec.Descriptor{}, err
}

for _, reference := range references {
limiter.Acquire(ctx, 1)
eg.Go(func(ref string) func() error {
return func() error {
defer limiter.Release(1)
return target.Tag(egCtx, desc, ref)
}
}(reference))
}
}

if err := eg.Wait(); err != nil {
return ocispec.Descriptor{}, err
}
return desc, nil
}
Loading

0 comments on commit 6d3ce22

Please sign in to comment.