Skip to content

Support for registering custom index clients; added new methods to object stores #2049

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Feb 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.12
require (
cloud.google.com/go/bigtable v1.1.0
cloud.google.com/go/storage v1.3.0
github.com/Azure/azure-pipeline-go v0.2.2
github.com/Azure/azure-storage-blob-go v0.8.0
github.com/Masterminds/squirrel v0.0.0-20161115235646-20f192218cf5
github.com/NYTimes/gziphandler v1.1.1
Expand Down
10 changes: 2 additions & 8 deletions pkg/chunk/aws/dynamodb_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,19 +125,13 @@ func (cfg *DynamoDBConfig) RegisterFlags(f *flag.FlagSet) {
// StorageConfig specifies config for storing data on AWS.
type StorageConfig struct {
DynamoDBConfig
S3 flagext.URLValue
BucketNames string
S3ForcePathStyle bool
S3Config `yaml:",inline"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *StorageConfig) RegisterFlags(f *flag.FlagSet) {
cfg.DynamoDBConfig.RegisterFlags(f)

f.Var(&cfg.S3, "s3.url", "S3 endpoint URL with escaped Key and Secret encoded. "+
"If only region is specified as a host, proper endpoint will be deduced. Use inmemory:///<bucket-name> to use a mock in-memory implementation.")
f.BoolVar(&cfg.S3ForcePathStyle, "s3.force-path-style", false, "Set this to `true` to force the request to use path-style addressing.")
f.StringVar(&cfg.BucketNames, "s3.buckets", "", "Comma separated list of bucket names to evenly distribute chunks over. Overrides any buckets specified in s3.url flag")
cfg.S3Config.RegisterFlags(f)
}

type dynamoDBStorageClient struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/chunk/aws/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ var Fixtures = []testutils.Fixture{
batchWriteItemRequestFn: dynamoDB.batchWriteItemRequest,
schemaCfg: schemaConfig,
}
object := &s3ObjectClient{
object := &S3ObjectClient{
S3: newMockS3(),
}
return index, object, table, schemaConfig, nil
Expand Down
154 changes: 117 additions & 37 deletions pkg/chunk/aws/s3_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package aws
import (
"bytes"
"context"
"flag"
"fmt"
"hash/fnv"
"io"
"io/ioutil"
"strings"

Expand All @@ -16,6 +18,7 @@ import (

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
awscommon "github.com/weaveworks/common/aws"
"github.com/weaveworks/common/instrument"
)
Expand All @@ -33,13 +36,33 @@ func init() {
s3RequestDuration.Register()
}

type s3ObjectClient struct {
// S3Config specifies config for storing chunks on AWS S3.
type S3Config struct {
	// S3 is the endpoint URL with escaped Key and Secret encoded in it;
	// "inmemory:///<bucket-name>" selects a mock in-memory implementation.
	S3 flagext.URLValue
	// BucketNames is a comma-separated list of bucket names over which
	// chunks are evenly distributed; overrides any bucket in the S3 URL.
	BucketNames string
	// S3ForcePathStyle forces requests to use path-style addressing.
	S3ForcePathStyle bool
}

// RegisterFlags registers every S3 flag option on f, using un-prefixed
// flag names (delegates to RegisterFlagsWithPrefix with an empty prefix).
func (cfg *S3Config) RegisterFlags(f *flag.FlagSet) {
	const noPrefix = ""
	cfg.RegisterFlagsWithPrefix(noPrefix, f)
}

// RegisterFlagsWithPrefix registers every S3 flag option on f, prepending
// prefix to each flag name (e.g. prefix "chunk." yields "chunk.s3.url").
func (cfg *S3Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	urlHelp := "S3 endpoint URL with escaped Key and Secret encoded. " +
		"If only region is specified as a host, proper endpoint will be deduced. Use inmemory:///<bucket-name> to use a mock in-memory implementation."

	// Registration order does not affect parsing; FlagSet sorts flags
	// lexically when printing usage.
	f.Var(&cfg.S3, prefix+"s3.url", urlHelp)
	f.StringVar(&cfg.BucketNames, prefix+"s3.buckets", "", "Comma separated list of bucket names to evenly distribute chunks over. Overrides any buckets specified in s3.url flag")
	f.BoolVar(&cfg.S3ForcePathStyle, prefix+"s3.force-path-style", false, "Set this to `true` to force the request to use path-style addressing.")
}

// S3ObjectClient stores and fetches objects on AWS S3, spreading keys
// across bucketNames by hashing each key (see bucketFromKey).
type S3ObjectClient struct {
	bucketNames []string // candidate buckets; empty yields an empty bucket name
	S3          s3iface.S3API
}

// NewS3ObjectClient makes a new S3-backed ObjectClient.
func NewS3ObjectClient(cfg StorageConfig, schemaCfg chunk.SchemaConfig) (chunk.ObjectClient, error) {
func NewS3ObjectClient(cfg S3Config) (*S3ObjectClient, error) {
if cfg.S3.URL == nil {
return nil, fmt.Errorf("no URL specified for S3")
}
Expand All @@ -60,50 +83,40 @@ func NewS3ObjectClient(cfg StorageConfig, schemaCfg chunk.SchemaConfig) (chunk.O
if cfg.BucketNames != "" {
bucketNames = strings.Split(cfg.BucketNames, ",") // comma separated list of bucket names
}
client := s3ObjectClient{
client := S3ObjectClient{
S3: s3Client,
bucketNames: bucketNames,
}
return client, nil
return &client, nil
}

func (a s3ObjectClient) Stop() {
func (a *S3ObjectClient) Stop() {
}

func (a s3ObjectClient) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error) {
func (a *S3ObjectClient) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error) {
return util.GetParallelChunks(ctx, chunks, a.getChunk)
}

func (a s3ObjectClient) getChunk(ctx context.Context, decodeContext *chunk.DecodeContext, c chunk.Chunk) (chunk.Chunk, error) {
var resp *s3.GetObjectOutput

// Map the key into a bucket
key := c.ExternalKey()
bucket := a.bucketFromKey(key)

err := instrument.CollectedRequest(ctx, "S3.GetObject", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
var err error
resp, err = a.S3.GetObjectWithContext(ctx, &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
})
return err
})
func (a *S3ObjectClient) getChunk(ctx context.Context, decodeContext *chunk.DecodeContext, c chunk.Chunk) (chunk.Chunk, error) {
readCloser, err := a.GetObject(ctx, c.ExternalKey())
if err != nil {
return chunk.Chunk{}, err
}
defer resp.Body.Close()
buf, err := ioutil.ReadAll(resp.Body)

defer readCloser.Close()

buf, err := ioutil.ReadAll(readCloser)
if err != nil {
return chunk.Chunk{}, err
}

if err := c.Decode(decodeContext, buf); err != nil {
return chunk.Chunk{}, err
}
return c, nil
}

func (a s3ObjectClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) error {
func (a *S3ObjectClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) error {
var (
s3ChunkKeys []string
s3ChunkBufs [][]byte
Expand All @@ -123,7 +136,7 @@ func (a s3ObjectClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) err
incomingErrors := make(chan error)
for i := range s3ChunkBufs {
go func(i int) {
incomingErrors <- a.putS3Chunk(ctx, s3ChunkKeys[i], s3ChunkBufs[i])
incomingErrors <- a.PutObject(ctx, s3ChunkKeys[i], bytes.NewReader(s3ChunkBufs[i]))
}(i)
}

Expand All @@ -137,19 +150,8 @@ func (a s3ObjectClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) err
return lastErr
}

// putS3Chunk uploads one chunk body to S3 under key, instrumented via
// s3RequestDuration; the target bucket is chosen by hashing the key.
// NOTE(review): in this diff this helper appears superseded by
// S3ObjectClient.PutObject — confirm it is no longer referenced.
func (a s3ObjectClient) putS3Chunk(ctx context.Context, key string, buf []byte) error {
	return instrument.CollectedRequest(ctx, "S3.PutObject", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
		_, err := a.S3.PutObjectWithContext(ctx, &s3.PutObjectInput{
			Body:   bytes.NewReader(buf),
			Bucket: aws.String(a.bucketFromKey(key)),
			Key:    aws.String(key),
		})
		return err
	})
}

// bucketFromKey maps a key to a bucket name
func (a s3ObjectClient) bucketFromKey(key string) string {
func (a *S3ObjectClient) bucketFromKey(key string) string {
if len(a.bucketNames) == 0 {
return ""
}
Expand All @@ -160,3 +162,81 @@ func (a s3ObjectClient) bucketFromKey(key string) string {

return a.bucketNames[hash%uint32(len(a.bucketNames))]
}

// GetObject fetches the object stored under objectKey and returns a reader
// over its contents. The caller is responsible for closing the reader.
func (a *S3ObjectClient) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, error) {
	var body io.ReadCloser

	// The target bucket is derived by hashing the object's key.
	input := s3.GetObjectInput{
		Bucket: aws.String(a.bucketFromKey(objectKey)),
		Key:    aws.String(objectKey),
	}

	err := instrument.CollectedRequest(ctx, "S3.GetObject", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
		resp, err := a.S3.GetObjectWithContext(ctx, &input)
		if err != nil {
			return err
		}
		body = resp.Body
		return nil
	})
	if err != nil {
		return nil, err
	}

	return body, nil
}

// PutObject stores object under objectKey, hashing the key to select the
// destination bucket. The request is instrumented via s3RequestDuration.
func (a *S3ObjectClient) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error {
	put := func(ctx context.Context) error {
		input := s3.PutObjectInput{
			Body:   object,
			Bucket: aws.String(a.bucketFromKey(objectKey)),
			Key:    aws.String(objectKey),
		}
		_, err := a.S3.PutObjectWithContext(ctx, &input)
		return err
	}
	return instrument.CollectedRequest(ctx, "S3.PutObject", s3RequestDuration, instrument.ErrorCode, put)
}

// List returns the objects found directly under prefix in every configured
// bucket, non-recursively (entries below a further chunk.DirDelim are
// collapsed by S3's delimiter handling). Buckets are visited in order and
// their results concatenated; with no configured buckets the result is empty.
func (a *S3ObjectClient) List(ctx context.Context, prefix string) ([]chunk.StorageObject, error) {
	var storageObjects []chunk.StorageObject

	for i := range a.bucketNames {
		err := instrument.CollectedRequest(ctx, "S3.List", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
			input := s3.ListObjectsV2Input{
				Bucket:    aws.String(a.bucketNames[i]),
				Prefix:    aws.String(prefix),
				Delimiter: aws.String(chunk.DirDelim),
			}

			// Page through results until S3 reports no further pages.
			for {
				output, err := a.S3.ListObjectsV2WithContext(ctx, &input)
				if err != nil {
					return err
				}

				for _, content := range output.Contents {
					// Use the aws value helpers rather than dereferencing
					// directly: SDK response fields are pointers and may be
					// nil (e.g. when served by a mock), which would panic.
					storageObjects = append(storageObjects, chunk.StorageObject{
						Key:        aws.StringValue(content.Key),
						ModifiedAt: aws.TimeValue(content.LastModified),
					})
				}

				if !aws.BoolValue(output.IsTruncated) {
					// No more results to fetch.
					break
				}

				input.SetContinuationToken(aws.StringValue(output.NextContinuationToken))
			}

			return nil
		})
		if err != nil {
			return nil, err
		}
	}

	return storageObjects, nil
}
Loading