1
1
package chunk
2
2
3
3
import (
4
+ "fmt"
4
5
"math/rand"
5
6
"net/url"
6
7
"strings"
@@ -18,16 +19,15 @@ import (
18
19
19
20
const (
20
21
// For dynamodb errors
22
+ tableNameLabel = "table"
21
23
errorReasonLabel = "error"
22
24
otherError = "other"
23
25
24
26
// Backoff for dynamoDB requests, to match AWS lib - see:
25
27
// https://github.com/aws/aws-sdk-go/blob/master/service/dynamodb/customizations.go
26
28
minBackoff = 50 * time .Millisecond
27
29
maxBackoff = 50 * time .Second
28
-
29
- // Number of synchronous dynamodb requests
30
- numDynamoRequests = 25
30
+ maxRetries = 20
31
31
32
32
// See http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Limits.html.
33
33
dynamoMaxBatchSize = 25
54
54
Namespace : "cortex" ,
55
55
Name : "dynamo_failures_total" ,
56
56
Help : "The total number of errors while storing chunks to the chunk store." ,
57
- }, []string {errorReasonLabel })
57
+ }, []string {tableNameLabel , errorReasonLabel })
58
58
dynamoUnprocessedItems = prometheus .NewCounter (prometheus.CounterOpts {
59
59
Namespace : "cortex" ,
60
60
Name : "dynamo_unprocessed_items_total" ,
@@ -69,11 +69,11 @@ func init() {
69
69
prometheus .MustRegister (dynamoUnprocessedItems )
70
70
}
71
71
72
- func recordDynamoError (err error ) {
72
+ func recordDynamoError (tableName string , err error ) {
73
73
if awsErr , ok := err .(awserr.Error ); ok {
74
- dynamoFailures .WithLabelValues (awsErr .Code ()).Add (float64 (1 ))
74
+ dynamoFailures .WithLabelValues (tableName , awsErr .Code ()).Add (float64 (1 ))
75
75
} else {
76
- dynamoFailures .WithLabelValues (otherError ).Add (float64 (1 ))
76
+ dynamoFailures .WithLabelValues (tableName , otherError ).Add (float64 (1 ))
77
77
}
78
78
}
79
79
@@ -215,9 +215,17 @@ func (c *dynamoDBBackoffClient) batchWriteDynamo(ctx context.Context, reqs map[s
215
215
}
216
216
}
217
217
218
+ tableNames := func (reqs map [string ][]* dynamodb.WriteRequest ) []string {
219
+ result := []string {}
220
+ for tableName := range reqs {
221
+ result = append (result , tableName )
222
+ }
223
+ return result
224
+ }
225
+
218
226
outstanding , unprocessed := reqs , map [string ][]* dynamodb.WriteRequest {}
219
- backoff := minBackoff
220
- for dictLen (outstanding )+ dictLen (unprocessed ) > 0 {
227
+ backoff , numRetries := minBackoff , 0
228
+ for dictLen (outstanding )+ dictLen (unprocessed ) > 0 && numRetries < maxRetries {
221
229
reqs := map [string ][]* dynamodb.WriteRequest {}
222
230
fillReq (unprocessed , reqs )
223
231
fillReq (outstanding , reqs )
@@ -237,7 +245,9 @@ func (c *dynamoDBBackoffClient) batchWriteDynamo(ctx context.Context, reqs map[s
237
245
}
238
246
239
247
if err != nil {
240
- recordDynamoError (err )
248
+ for _ , tableName := range tableNames (reqs ) {
249
+ recordDynamoError (tableName , err )
250
+ }
241
251
}
242
252
243
253
// If there are unprocessed items, backoff and retry those items.
@@ -254,6 +264,7 @@ func (c *dynamoDBBackoffClient) batchWriteDynamo(ctx context.Context, reqs map[s
254
264
copyUnprocessed (reqs , unprocessed )
255
265
time .Sleep (backoff )
256
266
backoff = nextBackoff (backoff )
267
+ numRetries ++
257
268
continue
258
269
}
259
270
@@ -263,8 +274,12 @@ func (c *dynamoDBBackoffClient) batchWriteDynamo(ctx context.Context, reqs map[s
263
274
}
264
275
265
276
backoff = minBackoff
277
+ numRetries = 0
266
278
}
267
279
280
+ if valuesLeft := dictLen (outstanding ) + dictLen (unprocessed ); valuesLeft > 0 {
281
+ return fmt .Errorf ("failed to write chunk after %d retries, %d values remaining" , numRetries , valuesLeft )
282
+ }
268
283
return nil
269
284
}
270
285
@@ -283,7 +298,7 @@ func (c *dynamoDBBackoffClient) queryPages(ctx context.Context, input *dynamodb.
283
298
}
284
299
285
300
if err != nil {
286
- recordDynamoError (err )
301
+ recordDynamoError (* input . TableName , err )
287
302
288
303
if awsErr , ok := err .(awserr.Error ); ok && awsErr .Code () == provisionedThroughputExceededException {
289
304
time .Sleep (backoff )
0 commit comments