Skip to content

Commit 73567bc

Browse files
support for registering custom index clients, added new methods to object stores (#2049)
* support for registering custom index clients, added new methods to object store NewIndexClient accepts factory methods for creating custom index clients added new methods to object stores to work on objects(io.Reader) instead of just chunks Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com> * splitted s3 config from dynamodb config and updated docs Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com> * removed unwanted code Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com> * added List method to azure and addressed other feedback in PR review Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com> * addressed some of the feedback from PR review Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com> * changes suggested from PR review Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com> * fixed an issue with reporting errors in PutObject for GCS object store Signed-off-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>
1 parent ec2d25b commit 73567bc

15 files changed

+686
-220
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ go 1.12
55
require (
66
cloud.google.com/go/bigtable v1.1.0
77
cloud.google.com/go/storage v1.3.0
8+
github.com/Azure/azure-pipeline-go v0.2.2
89
github.com/Azure/azure-storage-blob-go v0.8.0
910
github.com/Masterminds/squirrel v0.0.0-20161115235646-20f192218cf5
1011
github.com/NYTimes/gziphandler v1.1.1

pkg/chunk/aws/dynamodb_storage_client.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -125,19 +125,13 @@ func (cfg *DynamoDBConfig) RegisterFlags(f *flag.FlagSet) {
125125
// StorageConfig specifies config for storing data on AWS.
126126
type StorageConfig struct {
127127
DynamoDBConfig
128-
S3 flagext.URLValue
129-
BucketNames string
130-
S3ForcePathStyle bool
128+
S3Config `yaml:",inline"`
131129
}
132130

133131
// RegisterFlags adds the flags required to config this to the given FlagSet
134132
func (cfg *StorageConfig) RegisterFlags(f *flag.FlagSet) {
135133
cfg.DynamoDBConfig.RegisterFlags(f)
136-
137-
f.Var(&cfg.S3, "s3.url", "S3 endpoint URL with escaped Key and Secret encoded. "+
138-
"If only region is specified as a host, proper endpoint will be deduced. Use inmemory:///<bucket-name> to use a mock in-memory implementation.")
139-
f.BoolVar(&cfg.S3ForcePathStyle, "s3.force-path-style", false, "Set this to `true` to force the request to use path-style addressing.")
140-
f.StringVar(&cfg.BucketNames, "s3.buckets", "", "Comma separated list of bucket names to evenly distribute chunks over. Overrides any buckets specified in s3.url flag")
134+
cfg.S3Config.RegisterFlags(f)
141135
}
142136

143137
type dynamoDBStorageClient struct {

pkg/chunk/aws/fixtures.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ var Fixtures = []testutils.Fixture{
4545
batchWriteItemRequestFn: dynamoDB.batchWriteItemRequest,
4646
schemaCfg: schemaConfig,
4747
}
48-
object := &s3ObjectClient{
48+
object := &S3ObjectClient{
4949
S3: newMockS3(),
5050
}
5151
return index, object, table, schemaConfig, nil

pkg/chunk/aws/s3_storage_client.go

Lines changed: 117 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@ package aws
33
import (
44
"bytes"
55
"context"
6+
"flag"
67
"fmt"
78
"hash/fnv"
9+
"io"
810
"io/ioutil"
911
"strings"
1012

@@ -16,6 +18,7 @@ import (
1618

1719
"github.com/cortexproject/cortex/pkg/chunk"
1820
"github.com/cortexproject/cortex/pkg/chunk/util"
21+
"github.com/cortexproject/cortex/pkg/util/flagext"
1922
awscommon "github.com/weaveworks/common/aws"
2023
"github.com/weaveworks/common/instrument"
2124
)
@@ -33,13 +36,33 @@ func init() {
3336
s3RequestDuration.Register()
3437
}
3538

36-
type s3ObjectClient struct {
39+
// S3Config specifies config for storing chunks on AWS S3.
40+
type S3Config struct {
41+
S3 flagext.URLValue
42+
BucketNames string
43+
S3ForcePathStyle bool
44+
}
45+
46+
// RegisterFlags adds the flags required to config this to the given FlagSet
47+
func (cfg *S3Config) RegisterFlags(f *flag.FlagSet) {
48+
cfg.RegisterFlagsWithPrefix("", f)
49+
}
50+
51+
// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet with a specified prefix
52+
func (cfg *S3Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
53+
f.Var(&cfg.S3, prefix+"s3.url", "S3 endpoint URL with escaped Key and Secret encoded. "+
54+
"If only region is specified as a host, proper endpoint will be deduced. Use inmemory:///<bucket-name> to use a mock in-memory implementation.")
55+
f.BoolVar(&cfg.S3ForcePathStyle, prefix+"s3.force-path-style", false, "Set this to `true` to force the request to use path-style addressing.")
56+
f.StringVar(&cfg.BucketNames, prefix+"s3.buckets", "", "Comma separated list of bucket names to evenly distribute chunks over. Overrides any buckets specified in s3.url flag")
57+
}
58+
59+
type S3ObjectClient struct {
3760
bucketNames []string
3861
S3 s3iface.S3API
3962
}
4063

4164
// NewS3ObjectClient makes a new S3-backed ObjectClient.
42-
func NewS3ObjectClient(cfg StorageConfig, schemaCfg chunk.SchemaConfig) (chunk.ObjectClient, error) {
65+
func NewS3ObjectClient(cfg S3Config) (*S3ObjectClient, error) {
4366
if cfg.S3.URL == nil {
4467
return nil, fmt.Errorf("no URL specified for S3")
4568
}
@@ -60,50 +83,40 @@ func NewS3ObjectClient(cfg StorageConfig, schemaCfg chunk.SchemaConfig) (chunk.O
6083
if cfg.BucketNames != "" {
6184
bucketNames = strings.Split(cfg.BucketNames, ",") // comma separated list of bucket names
6285
}
63-
client := s3ObjectClient{
86+
client := S3ObjectClient{
6487
S3: s3Client,
6588
bucketNames: bucketNames,
6689
}
67-
return client, nil
90+
return &client, nil
6891
}
6992

70-
func (a s3ObjectClient) Stop() {
93+
func (a *S3ObjectClient) Stop() {
7194
}
7295

73-
func (a s3ObjectClient) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error) {
96+
func (a *S3ObjectClient) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error) {
7497
return util.GetParallelChunks(ctx, chunks, a.getChunk)
7598
}
7699

77-
func (a s3ObjectClient) getChunk(ctx context.Context, decodeContext *chunk.DecodeContext, c chunk.Chunk) (chunk.Chunk, error) {
78-
var resp *s3.GetObjectOutput
79-
80-
// Map the key into a bucket
81-
key := c.ExternalKey()
82-
bucket := a.bucketFromKey(key)
83-
84-
err := instrument.CollectedRequest(ctx, "S3.GetObject", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
85-
var err error
86-
resp, err = a.S3.GetObjectWithContext(ctx, &s3.GetObjectInput{
87-
Bucket: aws.String(bucket),
88-
Key: aws.String(key),
89-
})
90-
return err
91-
})
100+
func (a *S3ObjectClient) getChunk(ctx context.Context, decodeContext *chunk.DecodeContext, c chunk.Chunk) (chunk.Chunk, error) {
101+
readCloser, err := a.GetObject(ctx, c.ExternalKey())
92102
if err != nil {
93103
return chunk.Chunk{}, err
94104
}
95-
defer resp.Body.Close()
96-
buf, err := ioutil.ReadAll(resp.Body)
105+
106+
defer readCloser.Close()
107+
108+
buf, err := ioutil.ReadAll(readCloser)
97109
if err != nil {
98110
return chunk.Chunk{}, err
99111
}
112+
100113
if err := c.Decode(decodeContext, buf); err != nil {
101114
return chunk.Chunk{}, err
102115
}
103116
return c, nil
104117
}
105118

106-
func (a s3ObjectClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) error {
119+
func (a *S3ObjectClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) error {
107120
var (
108121
s3ChunkKeys []string
109122
s3ChunkBufs [][]byte
@@ -123,7 +136,7 @@ func (a s3ObjectClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) err
123136
incomingErrors := make(chan error)
124137
for i := range s3ChunkBufs {
125138
go func(i int) {
126-
incomingErrors <- a.putS3Chunk(ctx, s3ChunkKeys[i], s3ChunkBufs[i])
139+
incomingErrors <- a.PutObject(ctx, s3ChunkKeys[i], bytes.NewReader(s3ChunkBufs[i]))
127140
}(i)
128141
}
129142

@@ -137,19 +150,8 @@ func (a s3ObjectClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) err
137150
return lastErr
138151
}
139152

140-
func (a s3ObjectClient) putS3Chunk(ctx context.Context, key string, buf []byte) error {
141-
return instrument.CollectedRequest(ctx, "S3.PutObject", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
142-
_, err := a.S3.PutObjectWithContext(ctx, &s3.PutObjectInput{
143-
Body: bytes.NewReader(buf),
144-
Bucket: aws.String(a.bucketFromKey(key)),
145-
Key: aws.String(key),
146-
})
147-
return err
148-
})
149-
}
150-
151153
// bucketFromKey maps a key to a bucket name
152-
func (a s3ObjectClient) bucketFromKey(key string) string {
154+
func (a *S3ObjectClient) bucketFromKey(key string) string {
153155
if len(a.bucketNames) == 0 {
154156
return ""
155157
}
@@ -160,3 +162,81 @@ func (a s3ObjectClient) bucketFromKey(key string) string {
160162

161163
return a.bucketNames[hash%uint32(len(a.bucketNames))]
162164
}
165+
166+
// Get object from the store
167+
func (a *S3ObjectClient) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, error) {
168+
var resp *s3.GetObjectOutput
169+
170+
// Map the key into a bucket
171+
bucket := a.bucketFromKey(objectKey)
172+
173+
err := instrument.CollectedRequest(ctx, "S3.GetObject", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
174+
var err error
175+
resp, err = a.S3.GetObjectWithContext(ctx, &s3.GetObjectInput{
176+
Bucket: aws.String(bucket),
177+
Key: aws.String(objectKey),
178+
})
179+
return err
180+
})
181+
if err != nil {
182+
return nil, err
183+
}
184+
185+
return resp.Body, nil
186+
}
187+
188+
// Put object into the store
189+
func (a *S3ObjectClient) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error {
190+
return instrument.CollectedRequest(ctx, "S3.PutObject", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
191+
_, err := a.S3.PutObjectWithContext(ctx, &s3.PutObjectInput{
192+
Body: object,
193+
Bucket: aws.String(a.bucketFromKey(objectKey)),
194+
Key: aws.String(objectKey),
195+
})
196+
return err
197+
})
198+
}
199+
200+
// List only objects from the store non-recursively
201+
func (a *S3ObjectClient) List(ctx context.Context, prefix string) ([]chunk.StorageObject, error) {
202+
var storageObjects []chunk.StorageObject
203+
204+
for i := range a.bucketNames {
205+
err := instrument.CollectedRequest(ctx, "S3.List", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
206+
input := s3.ListObjectsV2Input{
207+
Bucket: aws.String(a.bucketNames[i]),
208+
Prefix: aws.String(prefix),
209+
Delimiter: aws.String(chunk.DirDelim),
210+
}
211+
212+
for {
213+
output, err := a.S3.ListObjectsV2WithContext(ctx, &input)
214+
if err != nil {
215+
return err
216+
}
217+
218+
for _, content := range output.Contents {
219+
storageObjects = append(storageObjects, chunk.StorageObject{
220+
Key: *content.Key,
221+
ModifiedAt: *content.LastModified,
222+
})
223+
}
224+
225+
if !*output.IsTruncated {
226+
// No more results to fetch
227+
break
228+
}
229+
230+
input.SetContinuationToken(*output.NextContinuationToken)
231+
}
232+
233+
return nil
234+
})
235+
236+
if err != nil {
237+
return nil, err
238+
}
239+
}
240+
241+
return storageObjects, nil
242+
}

0 commit comments

Comments
 (0)