Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add objstore.List() recursive support #3823

Merged
merged 9 commits into from
Feb 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions cmd/thanos/query_frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/NYTimes/gziphandler"
cortexfrontend "github.com/cortexproject/cortex/pkg/frontend"
"github.com/cortexproject/cortex/pkg/frontend/transport"
cortexfrontendv1 "github.com/cortexproject/cortex/pkg/frontend/v1"
"github.com/cortexproject/cortex/pkg/querier/queryrange"
cortexvalidation "github.com/cortexproject/cortex/pkg/util/validation"
"github.com/go-kit/kit/log"
Expand Down Expand Up @@ -175,12 +174,6 @@ func runQueryFrontend(
return errors.Wrap(err, "error validating the config")
}

fe, err := cortexfrontendv1.New(cortexfrontendv1.Config{}, nil, logger, reg)
kakkoyun marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return errors.Wrap(err, "setup query frontend")
}
defer fe.Close()

tripperWare, err := queryfrontend.NewTripperware(cfg.Config, reg, logger)
if err != nil {
return errors.Wrap(err, "setup tripperwares")
Expand Down
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ require (
github.com/Azure/azure-storage-blob-go v0.8.0
github.com/NYTimes/gziphandler v1.1.1
github.com/alecthomas/units v0.0.0-20210208195552-ff826a37aa15
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
bwplotka marked this conversation as resolved.
Show resolved Hide resolved
github.com/aliyun/aliyun-oss-go-sdk v2.0.4+incompatible
github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f // indirect
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
github.com/cespare/xxhash v1.1.0
github.com/chromedp/cdproto v0.0.0-20200424080200-0de008e41fa0
Expand All @@ -24,6 +26,7 @@ require (
github.com/gogo/status v1.0.3
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e
github.com/golang/snappy v0.0.3-0.20201103224600-674baa8c7fc3
github.com/gomodule/redigo v1.8.4 // indirect
github.com/googleapis/gax-go v2.0.2+incompatible
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
Expand Down Expand Up @@ -52,6 +55,7 @@ require (
github.com/uber/jaeger-client-go v2.25.0+incompatible
github.com/uber/jaeger-lib v2.4.0+incompatible
github.com/weaveworks/common v0.0.0-20210112142934-23c8d7fa6120
github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da // indirect
go.elastic.co/apm v1.5.0
go.elastic.co/apm/module/apmot v1.5.0
go.uber.org/atomic v1.7.0
Expand All @@ -74,6 +78,7 @@ replace (
// Using a 3rd-party branch for custom dialer - see https://github.com/bradfitz/gomemcache/pull/86.
// Required by Cortex https://github.com/cortexproject/cortex/pull/3051.
github.com/bradfitz/gomemcache => github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab
github.com/cortexproject/cortex => github.com/pracucci/cortex v0.0.0-20210223160359-b668aa375d55
// Update to v1.1.1 to make sure windows CI pass.
github.com/elastic/go-sysinfo => github.com/elastic/go-sysinfo v1.1.1
// Make sure Prometheus version is pinned as Prometheus semver does not include Go APIs.
Expand Down
278 changes: 21 additions & 257 deletions go.sum

Large diffs are not rendered by default.

38 changes: 27 additions & 11 deletions pkg/objstore/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,33 +107,49 @@ func NewBucket(logger log.Logger, azureConfig []byte, component string) (*Bucket

// Iter calls f for each entry in the given directory. The argument to f is the full
// object name including the prefix of the inspected directory.
func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error) error {

func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
prefix := dir
if prefix != "" && !strings.HasSuffix(prefix, DirDelim) {
prefix += DirDelim
}

marker := blob.Marker{}
params := objstore.ApplyIterOptions(options...)
listOptions := blob.ListBlobsSegmentOptions{Prefix: prefix}

for i := 1; ; i++ {
list, err := b.containerURL.ListBlobsHierarchySegment(ctx, marker, DirDelim, blob.ListBlobsSegmentOptions{
Prefix: prefix,
})
var (
blobPrefixes []blob.BlobPrefix
blobItems []blob.BlobItem
)

if err != nil {
return errors.Wrapf(err, "cannot list blobs in directory %s (iteration #%d)", dir, i)
}
if params.Recursive {
list, err := b.containerURL.ListBlobsFlatSegment(ctx, marker, listOptions)
if err != nil {
return errors.Wrapf(err, "cannot list flat blobs with prefix %s (iteration #%d)", dir, i)
}

marker = list.NextMarker
marker = list.NextMarker
blobItems = list.Segment.BlobItems
blobPrefixes = nil
} else {
list, err := b.containerURL.ListBlobsHierarchySegment(ctx, marker, DirDelim, listOptions)
if err != nil {
return errors.Wrapf(err, "cannot list hierarchy blobs with prefix %s (iteration #%d)", dir, i)
}

marker = list.NextMarker
blobItems = list.Segment.BlobItems
blobPrefixes = list.Segment.BlobPrefixes
}

var listNames []string

for _, blob := range list.Segment.BlobItems {
for _, blob := range blobItems {
listNames = append(listNames, blob.Name)
}

for _, blobPrefix := range list.Segment.BlobPrefixes {
for _, blobPrefix := range blobPrefixes {
listNames = append(listNames, blobPrefix.Name)
}

Expand Down
14 changes: 10 additions & 4 deletions pkg/objstore/cos/cos.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,12 @@ func (b *Bucket) Delete(ctx context.Context, name string) error {

// Iter calls f for each entry in the given directory (not recursive.). The argument to f is the full
// object name including the prefix of the inspected directory.
func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error) error {
func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
if dir != "" {
dir = strings.TrimSuffix(dir, dirDelim) + dirDelim
}

for object := range b.listObjects(ctx, dir) {
for object := range b.listObjects(ctx, dir, options...) {
if object.err != nil {
return object.err
}
Expand Down Expand Up @@ -224,9 +224,15 @@ type objectInfo struct {
err error
}

func (b *Bucket) listObjects(ctx context.Context, objectPrefix string) <-chan objectInfo {
func (b *Bucket) listObjects(ctx context.Context, objectPrefix string, options ...objstore.IterOption) <-chan objectInfo {
objectsCh := make(chan objectInfo, 1)

// If recursive iteration is enabled we should pass an empty delimiter.
delimiter := dirDelim
if !objstore.ApplyIterOptions(options...).Recursive {
delimiter = ""
}

go func(objectsCh chan<- objectInfo) {
defer close(objectsCh)
var marker string
Expand All @@ -235,7 +241,7 @@ func (b *Bucket) listObjects(ctx context.Context, objectPrefix string) <-chan ob
Prefix: objectPrefix,
MaxKeys: 1000,
Marker: marker,
Delimiter: dirDelim,
Delimiter: delimiter,
})
if err != nil {
select {
Expand Down
15 changes: 14 additions & 1 deletion pkg/objstore/filesystem/filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ func NewBucket(rootDir string) (*Bucket, error) {

// Iter calls f for each entry in the given directory. The argument to f is the full
// object name including the prefix of the inspected directory.
func (b *Bucket) Iter(_ context.Context, dir string, f func(string) error) error {
func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
params := objstore.ApplyIterOptions(options...)
absDir := filepath.Join(b.rootDir, dir)
info, err := os.Stat(absDir)
if err != nil {
Expand Down Expand Up @@ -84,7 +85,19 @@ func (b *Bucket) Iter(_ context.Context, dir string, f func(string) error) error
// Skip empty directories.
continue
}

name += objstore.DirDelim

if params.Recursive {
// Recursively list files in the subdirectory.
if err := b.Iter(ctx, name, f, options...); err != nil {
return err
}

// The callback f() has already been called for the subdirectory
// files so we should skip to next filesystem entry.
continue
}
}
if err := f(name); err != nil {
return err
Expand Down
11 changes: 9 additions & 2 deletions pkg/objstore/gcs/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,22 @@ func (b *Bucket) Name() string {

// Iter calls f for each entry in the given directory. The argument to f is the full
// object name including the prefix of the inspected directory.
func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error) error {
func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
// Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the
// object itself as one prefix item.
if dir != "" {
dir = strings.TrimSuffix(dir, DirDelim) + DirDelim
}

// If recursive iteration is enabled we should pass an empty delimiter.
delimiter := DirDelim
if !objstore.ApplyIterOptions(options...).Recursive {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tiny bug, fixed here: #3841

Let's enable acceptance tests for you @pracucci 🤗

delimiter = ""
}

it := b.bkt.Objects(ctx, &storage.Query{
Prefix: dir,
Delimiter: DirDelim,
Delimiter: delimiter,
})
for {
select {
Expand Down
12 changes: 9 additions & 3 deletions pkg/objstore/inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ func (b *InMemBucket) Objects() map[string][]byte {

// Iter calls f for each entry in the given directory. The argument to f is the full
// object name including the prefix of the inspected directory.
func (b *InMemBucket) Iter(_ context.Context, dir string, f func(string) error) error {
func (b *InMemBucket) Iter(_ context.Context, dir string, f func(string) error, options ...IterOption) error {
unique := map[string]struct{}{}
params := ApplyIterOptions(options...)

var dirPartsCount int
dirParts := strings.SplitAfter(dir, DirDelim)
Expand All @@ -61,8 +62,13 @@ func (b *InMemBucket) Iter(_ context.Context, dir string, f func(string) error)
continue
}

parts := strings.SplitAfter(filename, DirDelim)
unique[strings.Join(parts[:dirPartsCount+1], "")] = struct{}{}
if params.Recursive {
// Any object matching the prefix should be included.
unique[filename] = struct{}{}
} else {
parts := strings.SplitAfter(filename, DirDelim)
unique[strings.Join(parts[:dirPartsCount+1], "")] = struct{}{}
}
}
b.mtx.RUnlock()

Expand Down
28 changes: 25 additions & 3 deletions pkg/objstore/objstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type BucketReader interface {
// Iter calls f for each entry in the given directory (not recursive.). The argument to f is the full
// object name including the prefix of the inspected directory.
// Entries are passed to function in sorted order.
Iter(ctx context.Context, dir string, f func(string) error) error
Iter(ctx context.Context, dir string, f func(string) error, options ...IterOption) error

// Get returns a reader for the given object name.
Get(ctx context.Context, name string) (io.ReadCloser, error)
Expand All @@ -95,6 +95,28 @@ type InstrumentedBucketReader interface {
ReaderWithExpectedErrs(IsOpFailureExpectedFunc) BucketReader
}

// IterOption configures the provided params.
type IterOption func(params *IterParams)

// WithRecursiveIter is an option that can be applied to Iter() to recursively list objects
// in the bucket.
func WithRecursiveIter(params *IterParams) {
params.Recursive = true
}

// IterParams holds the Iter() parameters and is used by objstore clients implementations.
type IterParams struct {
Recursive bool
}

func ApplyIterOptions(options ...IterOption) IterParams {
out := IterParams{}
for _, opt := range options {
opt(&out)
}
return out
}

type ObjectAttributes struct {
// Size is the object size in bytes.
Size int64 `json:"size"`
Expand Down Expand Up @@ -308,11 +330,11 @@ func (b *metricBucket) ReaderWithExpectedErrs(fn IsOpFailureExpectedFunc) Bucket
return b.WithExpectedErrs(fn)
}

func (b *metricBucket) Iter(ctx context.Context, dir string, f func(name string) error) error {
func (b *metricBucket) Iter(ctx context.Context, dir string, f func(name string) error, options ...IterOption) error {
const op = OpIter
b.ops.WithLabelValues(op).Inc()

err := b.bkt.Iter(ctx, dir, f)
err := b.bkt.Iter(ctx, dir, f, options...)
if err != nil {
if !b.isOpFailureExpected(err) && ctx.Err() != context.Canceled {
b.opsFailures.WithLabelValues(op).Inc()
Expand Down
8 changes: 4 additions & 4 deletions pkg/objstore/objstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ func TestMetricBucket_Close(t *testing.T) {
testutil.Equals(t, 7, promtest.CollectAndCount(bkt.opsDuration))

AcceptanceTest(t, bkt.WithExpectedErrs(bkt.IsObjNotFoundErr))
testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.ops.WithLabelValues(OpIter)))
testutil.Equals(t, float64(9), promtest.ToFloat64(bkt.ops.WithLabelValues(OpIter)))
bwplotka marked this conversation as resolved.
Show resolved Hide resolved
testutil.Equals(t, float64(2), promtest.ToFloat64(bkt.ops.WithLabelValues(OpAttributes)))
testutil.Equals(t, float64(3), promtest.ToFloat64(bkt.ops.WithLabelValues(OpGet)))
testutil.Equals(t, float64(3), promtest.ToFloat64(bkt.ops.WithLabelValues(OpGetRange)))
testutil.Equals(t, float64(2), promtest.ToFloat64(bkt.ops.WithLabelValues(OpExists)))
testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.ops.WithLabelValues(OpUpload)))
testutil.Equals(t, float64(8), promtest.ToFloat64(bkt.ops.WithLabelValues(OpUpload)))
testutil.Equals(t, float64(2), promtest.ToFloat64(bkt.ops.WithLabelValues(OpDelete)))
testutil.Equals(t, 7, promtest.CollectAndCount(bkt.ops))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpIter)))
Expand All @@ -42,12 +42,12 @@ func TestMetricBucket_Close(t *testing.T) {
// Clear bucket, but don't clear metrics to ensure we use same.
bkt.bkt = NewInMemBucket()
AcceptanceTest(t, bkt)
testutil.Equals(t, float64(12), promtest.ToFloat64(bkt.ops.WithLabelValues(OpIter)))
testutil.Equals(t, float64(18), promtest.ToFloat64(bkt.ops.WithLabelValues(OpIter)))
testutil.Equals(t, float64(4), promtest.ToFloat64(bkt.ops.WithLabelValues(OpAttributes)))
testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.ops.WithLabelValues(OpGet)))
testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.ops.WithLabelValues(OpGetRange)))
testutil.Equals(t, float64(4), promtest.ToFloat64(bkt.ops.WithLabelValues(OpExists)))
testutil.Equals(t, float64(12), promtest.ToFloat64(bkt.ops.WithLabelValues(OpUpload)))
testutil.Equals(t, float64(16), promtest.ToFloat64(bkt.ops.WithLabelValues(OpUpload)))
testutil.Equals(t, float64(4), promtest.ToFloat64(bkt.ops.WithLabelValues(OpDelete)))
testutil.Equals(t, 7, promtest.CollectAndCount(bkt.ops))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpIter)))
Expand Down
9 changes: 7 additions & 2 deletions pkg/objstore/oss/oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,17 +190,22 @@ func NewBucket(logger log.Logger, conf []byte, component string) (*Bucket, error

// Iter calls f for each entry in the given directory (not recursive). The argument to f is the full
// object name including the prefix of the inspected directory.
func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error) error {
func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
if dir != "" {
dir = strings.TrimSuffix(dir, objstore.DirDelim) + objstore.DirDelim
}

var delimiter alioss.Option
if !objstore.ApplyIterOptions(options...).Recursive {
delimiter = alioss.Delimiter(objstore.DirDelim)
}

marker := alioss.Marker("")
for {
if err := ctx.Err(); err != nil {
return errors.Wrap(err, "context closed while iterating bucket")
}
objects, err := b.bucket.ListObjects(alioss.Prefix(dir), alioss.Delimiter(objstore.DirDelim), marker)
objects, err := b.bucket.ListObjects(alioss.Prefix(dir), delimiter, marker)
if err != nil {
return errors.Wrap(err, "listing aliyun oss bucket failed")
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/objstore/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func ValidateForTests(conf Config) error {

// Iter calls f for each entry in the given directory. The argument to f is the full
// object name including the prefix of the inspected directory.
func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error) error {
func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
// Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the
// object itself as one prefix item.
if dir != "" {
Expand All @@ -354,7 +354,7 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error) err

opts := minio.ListObjectsOptions{
Prefix: dir,
Recursive: false,
Recursive: objstore.ApplyIterOptions(options...).Recursive,
UseV1: b.listObjectsV1,
}

Expand Down
10 changes: 8 additions & 2 deletions pkg/objstore/swift/swift.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,17 @@ func (c *Container) Name() string {

// Iter calls f for each entry in the given directory. The argument to f is the full
// object name including the prefix of the inspected directory.
func (c *Container) Iter(_ context.Context, dir string, f func(string) error) error {
func (c *Container) Iter(_ context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
if dir != "" {
dir = strings.TrimSuffix(dir, string(DirDelim)) + string(DirDelim)
}
return c.connection.ObjectsWalk(c.name, &swift.ObjectsOpts{Prefix: dir, Delimiter: DirDelim}, func(opts *swift.ObjectsOpts) (interface{}, error) {

listOptions := &swift.ObjectsOpts{Prefix: dir}
if !objstore.ApplyIterOptions(options...).Recursive {
listOptions.Delimiter = DirDelim
}

return c.connection.ObjectsWalk(c.name, listOptions, func(opts *swift.ObjectsOpts) (interface{}, error) {
objects, err := c.connection.ObjectNames(c.name, opts)
if err != nil {
return objects, errors.Wrap(err, "list object names")
Expand Down
Loading