7
7
"sort"
8
8
9
9
"github.com/aws/aws-sdk-go/aws"
10
- "github.com/aws/aws-sdk-go/service/dynamodb"
11
10
"github.com/aws/aws-sdk-go/service/s3"
12
11
"github.com/prometheus/client_golang/prometheus"
13
12
"github.com/prometheus/common/log"
@@ -20,17 +19,11 @@ import (
20
19
"github.com/weaveworks/cortex/util"
21
20
)
22
21
23
- const (
24
- hashKey = "h"
25
- rangeKey = "r"
26
- chunkKey = "c"
27
- )
28
-
29
22
var (
30
23
indexEntriesPerChunk = prometheus .NewHistogram (prometheus.HistogramOpts {
31
24
Namespace : "cortex" ,
32
25
Name : "chunk_store_index_entries_per_chunk" ,
33
- Help : "Number of entries written to dynamodb per chunk." ,
26
+ Help : "Number of entries written to storage per chunk." ,
34
27
Buckets : prometheus .ExponentialBuckets (1 , 2 , 5 ),
35
28
})
36
29
s3RequestDuration = prometheus .NewHistogramVec (prometheus.HistogramOpts {
43
36
HistogramOpts : prometheus.HistogramOpts {
44
37
Namespace : "cortex" ,
45
38
Name : "chunk_store_row_writes_distribution" ,
46
- Help : "Distribution of writes to individual DynamoDB rows" ,
39
+ Help : "Distribution of writes to individual storage rows" ,
47
40
Buckets : prometheus .DefBuckets ,
48
41
},
49
42
HashBuckets : 1024 ,
@@ -56,18 +49,17 @@ func init() {
56
49
prometheus .MustRegister (rowWrites )
57
50
}
58
51
59
- // Store type stores and indexes chunks
60
- type Store interface {
61
- Put (ctx context.Context , chunks []Chunk ) error
62
- Get (ctx context.Context , from , through model.Time , matchers ... * metric.LabelMatcher ) ([]Chunk , error )
63
- }
64
-
65
52
// StoreConfig specifies config for a ChunkStore
66
53
type StoreConfig struct {
67
54
SchemaConfig
68
55
CacheConfig
69
- S3 S3ClientValue
70
- DynamoDB DynamoDBClientValue
56
+ S3 util.URLValue
57
+ DynamoDB util.URLValue
58
+
59
+ mockS3 S3Client
60
+ mockBucketName string
61
+ mockDynamoDB StorageClient
62
+ mockTableName string
71
63
72
64
// For injecting different schemas in tests.
73
65
schemaFactory func (cfg SchemaConfig ) Schema
@@ -82,17 +74,39 @@ func (cfg *StoreConfig) RegisterFlags(f *flag.FlagSet) {
82
74
f .Var (& cfg .DynamoDB , "dynamodb.url" , "DynamoDB endpoint URL." )
83
75
}
84
76
85
- // AWSStore implements ChunkStore for AWS
86
- type AWSStore struct {
87
- cfg StoreConfig
88
- cache * Cache
89
- dynamo * dynamoDBBackoffClient
90
- schema Schema
77
+ // Store implements ChunkStore on top of a StorageClient and an S3Client.
78
+ type Store struct {
79
+ cfg StoreConfig
80
+
81
+ storage StorageClient
82
+ tableName string
83
+ s3 S3Client
84
+ bucketName string
85
+ cache * Cache
86
+ schema Schema
91
87
}
92
88
93
- // NewAWSStore makes a new ChunkStore
94
- func NewAWSStore (cfg StoreConfig ) (* AWSStore , error ) {
95
- cfg .SchemaConfig .OriginalTableName = cfg .DynamoDB .TableName
89
+ // NewStore makes a new ChunkStore
90
+ func NewStore (cfg StoreConfig ) (* Store , error ) {
91
+ dynamoDBClient , tableName := cfg .mockDynamoDB , cfg .mockTableName
92
+ if dynamoDBClient == nil {
93
+ var err error
94
+ dynamoDBClient , tableName , err = NewDynamoDBClient (cfg .DynamoDB .String ())
95
+ if err != nil {
96
+ return nil , err
97
+ }
98
+ }
99
+
100
+ s3Client , bucketName := cfg .mockS3 , cfg .mockBucketName
101
+ if s3Client == nil {
102
+ var err error
103
+ s3Client , bucketName , err = NewS3Client (cfg .S3 .String ())
104
+ if err != nil {
105
+ return nil , err
106
+ }
107
+ }
108
+
109
+ cfg .SchemaConfig .OriginalTableName = tableName
96
110
var schema Schema
97
111
var err error
98
112
if cfg .schemaFactory == nil {
@@ -104,11 +118,14 @@ func NewAWSStore(cfg StoreConfig) (*AWSStore, error) {
104
118
return nil , err
105
119
}
106
120
107
- return & AWSStore {
108
- cfg : cfg ,
109
- schema : schema ,
110
- cache : NewCache (cfg .CacheConfig ),
111
- dynamo : newDynamoDBBackoffClient (cfg .DynamoDB ),
121
+ return & Store {
122
+ cfg : cfg ,
123
+ storage : dynamoDBClient ,
124
+ tableName : tableName ,
125
+ s3 : s3Client ,
126
+ bucketName : bucketName ,
127
+ schema : schema ,
128
+ cache : NewCache (cfg .CacheConfig ),
112
129
}, nil
113
130
}
114
131
@@ -117,7 +134,7 @@ func chunkName(userID, chunkID string) string {
117
134
}
118
135
119
136
// Put implements ChunkStore
120
- func (c * AWSStore ) Put (ctx context.Context , chunks []Chunk ) error {
137
+ func (c * Store ) Put (ctx context.Context , chunks []Chunk ) error {
121
138
userID , err := user .GetID (ctx )
122
139
if err != nil {
123
140
return err
@@ -132,7 +149,7 @@ func (c *AWSStore) Put(ctx context.Context, chunks []Chunk) error {
132
149
}
133
150
134
151
// putChunks writes a collection of chunks to S3 in parallel.
135
- func (c * AWSStore ) putChunks (ctx context.Context , userID string , chunks []Chunk ) error {
152
+ func (c * Store ) putChunks (ctx context.Context , userID string , chunks []Chunk ) error {
136
153
incomingErrors := make (chan error )
137
154
for _ , chunk := range chunks {
138
155
go func (chunk Chunk ) {
@@ -151,17 +168,17 @@ func (c *AWSStore) putChunks(ctx context.Context, userID string, chunks []Chunk)
151
168
}
152
169
153
170
// putChunk puts a chunk into S3.
154
- func (c * AWSStore ) putChunk (ctx context.Context , userID string , chunk * Chunk ) error {
171
+ func (c * Store ) putChunk (ctx context.Context , userID string , chunk * Chunk ) error {
155
172
body , err := chunk .reader ()
156
173
if err != nil {
157
174
return err
158
175
}
159
176
160
177
err = instrument .TimeRequestHistogram (ctx , "S3.PutObject" , s3RequestDuration , func (_ context.Context ) error {
161
178
var err error
162
- _ , err = c .cfg . S3 .PutObject (& s3.PutObjectInput {
179
+ _ , err = c .s3 .PutObject (& s3.PutObjectInput {
163
180
Body : body ,
164
- Bucket : aws .String (c .cfg . S3 . BucketName ),
181
+ Bucket : aws .String (c .bucketName ),
165
182
Key : aws .String (chunkName (userID , chunk .ID )),
166
183
})
167
184
return err
@@ -176,19 +193,19 @@ func (c *AWSStore) putChunk(ctx context.Context, userID string, chunk *Chunk) er
176
193
return nil
177
194
}
178
195
179
- func (c * AWSStore ) updateIndex (ctx context.Context , userID string , chunks []Chunk ) error {
196
+ func (c * Store ) updateIndex (ctx context.Context , userID string , chunks []Chunk ) error {
180
197
writeReqs , err := c .calculateDynamoWrites (userID , chunks )
181
198
if err != nil {
182
199
return err
183
200
}
184
201
185
- return c .dynamo . batchWriteDynamo (ctx , writeReqs )
202
+ return c .storage . BatchWrite (ctx , writeReqs )
186
203
}
187
204
188
- // calculateDynamoWrites creates a set of WriteRequests to dynamo for all `the
189
- // chunks it is given.
190
- func (c * AWSStore ) calculateDynamoWrites (userID string , chunks []Chunk ) (map [ string ][] * dynamodb. WriteRequest , error ) {
191
- writeReqs := map [ string ][] * dynamodb. WriteRequest {}
205
+ // calculateDynamoWrites creates a set of batched WriteRequests to dynamo for all
206
+ // the chunks it is given.
207
+ func (c * Store ) calculateDynamoWrites (userID string , chunks []Chunk ) (WriteBatch , error ) {
208
+ writeReqs := c . storage . NewWriteBatch ()
192
209
for _ , chunk := range chunks {
193
210
metricName , err := util .ExtractMetricNameFromMetric (chunk .Metric )
194
211
if err != nil {
@@ -203,31 +220,23 @@ func (c *AWSStore) calculateDynamoWrites(userID string, chunks []Chunk) (map[str
203
220
204
221
for _ , entry := range entries {
205
222
rowWrites .Observe (entry .HashKey , 1 )
206
-
207
- writeReqs [entry .TableName ] = append (writeReqs [entry .TableName ], & dynamodb.WriteRequest {
208
- PutRequest : & dynamodb.PutRequest {
209
- Item : map [string ]* dynamodb.AttributeValue {
210
- hashKey : {S : aws .String (entry .HashKey )},
211
- rangeKey : {B : entry .RangeKey },
212
- },
213
- },
214
- })
223
+ writeReqs .Add (entry .TableName , entry .HashKey , entry .RangeKey )
215
224
}
216
225
}
217
226
return writeReqs , nil
218
227
}
219
228
220
229
// Get implements ChunkStore
221
- func (c * AWSStore ) Get (ctx context.Context , from , through model.Time , allMatchers ... * metric.LabelMatcher ) ([]Chunk , error ) {
230
+ func (c * Store ) Get (ctx context.Context , from , through model.Time , allMatchers ... * metric.LabelMatcher ) ([]Chunk , error ) {
222
231
userID , err := user .GetID (ctx )
223
232
if err != nil {
224
233
return nil , err
225
234
}
226
235
227
236
filters , matchers := util .SplitFiltersAndMatchers (allMatchers )
228
237
229
- // Fetch chunk descriptors (just ID really) from DynamoDB
230
- chunks , err := c .lookupChunks (ctx , userID , from , through , matchers )
238
+ // Fetch chunk descriptors (just ID really) from storage
239
+ chunks , err := c .lookupMatchers (ctx , userID , from , through , matchers )
231
240
if err != nil {
232
241
return nil , err
233
242
}
@@ -281,7 +290,7 @@ outer:
281
290
return filteredChunks , nil
282
291
}
283
292
284
- func (c * AWSStore ) lookupChunks (ctx context.Context , userID string , from , through model.Time , matchers []* metric.LabelMatcher ) ([]Chunk , error ) {
293
+ func (c * Store ) lookupMatchers (ctx context.Context , userID string , from , through model.Time , matchers []* metric.LabelMatcher ) ([]Chunk , error ) {
285
294
metricName , matchers , err := util .ExtractMetricNameFromMatchers (matchers )
286
295
if err != nil {
287
296
return nil , err
@@ -333,7 +342,7 @@ func (c *AWSStore) lookupChunks(ctx context.Context, userID string, from, throug
333
342
return nWayIntersect (chunkSets ), lastErr
334
343
}
335
344
336
- func (c * AWSStore ) lookupEntries (ctx context.Context , entries []IndexEntry , matcher * metric.LabelMatcher ) (ByID , error ) {
345
+ func (c * Store ) lookupEntries (ctx context.Context , entries []IndexEntry , matcher * metric.LabelMatcher ) (ByID , error ) {
337
346
incomingChunkSets := make (chan ByID )
338
347
incomingErrors := make (chan error )
339
348
for _ , entry := range entries {
@@ -361,50 +370,29 @@ func (c *AWSStore) lookupEntries(ctx context.Context, entries []IndexEntry, matc
361
370
return chunks , lastErr
362
371
}
363
372
364
- func (c * AWSStore ) lookupEntry (ctx context.Context , entry IndexEntry , matcher * metric.LabelMatcher ) (ByID , error ) {
365
- input := & dynamodb.QueryInput {
366
- TableName : aws .String (entry .TableName ),
367
- KeyConditions : map [string ]* dynamodb.Condition {
368
- hashKey : {
369
- AttributeValueList : []* dynamodb.AttributeValue {
370
- {S : aws .String (entry .HashKey )},
371
- },
372
- ComparisonOperator : aws .String ("EQ" ),
373
- },
374
- },
375
- ReturnConsumedCapacity : aws .String (dynamodb .ReturnConsumedCapacityTotal ),
376
- }
377
- if len (entry .RangeKey ) > 0 {
378
- input .KeyConditions [rangeKey ] = & dynamodb.Condition {
379
- AttributeValueList : []* dynamodb.AttributeValue {
380
- {B : entry .RangeKey },
381
- },
382
- ComparisonOperator : aws .String (dynamodb .ComparisonOperatorBeginsWith ),
383
- }
384
- }
385
-
373
+ func (c * Store ) lookupEntry (ctx context.Context , entry IndexEntry , matcher * metric.LabelMatcher ) (ByID , error ) {
386
374
var chunkSet ByID
387
375
var processingError error
388
- if err := c .dynamo . queryPages (ctx , input , func (resp interface {} , lastPage bool ) (shouldContinue bool ) {
389
- processingError = processResponse (resp .( * dynamodb. QueryOutput ) , & chunkSet , matcher )
376
+ if err := c .storage . QueryPages (ctx , entry . TableName , entry . HashKey , entry . RangeKey , func (resp ReadBatch , lastPage bool ) (shouldContinue bool ) {
377
+ processingError = processResponse (resp , & chunkSet , matcher )
390
378
return processingError != nil && ! lastPage
391
379
}); err != nil {
392
- log .Errorf ("Error querying DynamoDB : %v" , err )
380
+ log .Errorf ("Error querying storage : %v" , err )
393
381
return nil , err
394
382
} else if processingError != nil {
395
- log .Errorf ("Error processing DynamoDB response: %v" , processingError )
383
+ log .Errorf ("Error processing storage response: %v" , processingError )
396
384
return nil , processingError
397
385
}
398
386
sort .Sort (ByID (chunkSet ))
399
387
chunkSet = unique (chunkSet )
400
388
return chunkSet , nil
401
389
}
402
390
403
- func processResponse (resp * dynamodb. QueryOutput , chunkSet * ByID , matcher * metric.LabelMatcher ) error {
404
- for _ , item := range resp .Items {
405
- rangeValue := item [ rangeKey ]. B
391
+ func processResponse (resp ReadBatch , chunkSet * ByID , matcher * metric.LabelMatcher ) error {
392
+ for i := 0 ; i < resp .Len (); i ++ {
393
+ rangeValue := resp . RangeValue ( i )
406
394
if rangeValue == nil {
407
- return fmt .Errorf ("invalid item: %v " , item )
395
+ return fmt .Errorf ("invalid item: %d " , i )
408
396
}
409
397
_ , value , chunkID , err := parseRangeValue (rangeValue )
410
398
if err != nil {
@@ -415,8 +403,8 @@ func processResponse(resp *dynamodb.QueryOutput, chunkSet *ByID, matcher *metric
415
403
ID : chunkID ,
416
404
}
417
405
418
- if chunkValue , ok := item [ chunkKey ]; ok && chunkValue . B != nil {
419
- if err := json .Unmarshal (chunkValue . B , & chunk ); err != nil {
406
+ if value := resp . Value ( i ); value != nil {
407
+ if err := json .Unmarshal (value , & chunk ); err != nil {
420
408
return err
421
409
}
422
410
chunk .metadataInIndex = true
@@ -431,16 +419,16 @@ func processResponse(resp *dynamodb.QueryOutput, chunkSet *ByID, matcher *metric
431
419
return nil
432
420
}
433
421
434
- func (c * AWSStore ) fetchChunkData (ctx context.Context , userID string , chunkSet []Chunk ) ([]Chunk , error ) {
422
+ func (c * Store ) fetchChunkData (ctx context.Context , userID string , chunkSet []Chunk ) ([]Chunk , error ) {
435
423
incomingChunks := make (chan Chunk )
436
424
incomingErrors := make (chan error )
437
425
for _ , chunk := range chunkSet {
438
426
go func (chunk Chunk ) {
439
427
var resp * s3.GetObjectOutput
440
428
err := instrument .TimeRequestHistogram (ctx , "S3.GetObject" , s3RequestDuration , func (_ context.Context ) error {
441
429
var err error
442
- resp , err = c .cfg . S3 .GetObject (& s3.GetObjectInput {
443
- Bucket : aws .String (c .cfg . S3 . BucketName ),
430
+ resp , err = c .s3 .GetObject (& s3.GetObjectInput {
431
+ Bucket : aws .String (c .bucketName ),
444
432
Key : aws .String (chunkName (userID , chunk .ID )),
445
433
})
446
434
return err
0 commit comments