
Commit 851b164

Implement blocksconvert scanner for DynamoDB v9 schema (#3828)
* Implement DynamoDB blocksconvert (v9 schema only)

  Remember which series we have processed, so we only emit entries to the plan once for each series.

  Signed-off-by: Bryan Boreham <bjboreham@gmail.com>

  Includes:

  * Move IndexEntryProcessor to chunk package, so it can be shared across other packages
  * Pre-check if user is allowed, and make use of map of ignored users
  * Move IndexReader type beside IndexEntryProcessor
  * Stop returning unexported type
1 parent d64bbc9 commit 851b164

File tree

7 files changed (+290, -20 lines)


CHANGELOG.md

Lines changed: 5 additions & 0 deletions
@@ -20,6 +20,11 @@
 * [ENHANCEMENT] Distributor: Added distributors ring status section in the admin page. #4151
 * [BUGFIX] Purger: fix `Invalid null value in condition for column range` caused by `nil` value in range for WriteBatch query. #4128

+## Blocksconvert
+
+* [ENHANCEMENT] Scanner: add support for DynamoDB (v9 schema only). #3828
+
+
 ## 1.9.0 in progress

 * [CHANGE] Fix for CVE-2021-31232: Local file disclosure vulnerability when `-experimental.alertmanager.enable-api` is used. The HTTP basic auth `password_file` can be used as an attack vector to send any file content via a webhook. The alertmanager templates can be used as an attack vector to send any file content because the alertmanager can load any text file specified in the templates list. #4129

docs/blocks-storage/convert-stored-chunks-to-blocks.md

Lines changed: 7 additions & 5 deletions
@@ -35,26 +35,28 @@ Scanner is started by running `blocksconvert -target=scanner`. Scanner requires

 - `-schema-config-file` – this is standard Cortex schema file.
 - `-bigtable.instance`, `-bigtable.project` – options for BigTable access.
+- `-dynamodb.url` - for DynamoDB access. Example `dynamodb://us-east-1/`
 - `-blocks-storage.backend` and corresponding `-blocks-storage.*` options for storing plan files.
 - `-scanner.output-dir` – specifies local directory for writing plan files to. Finished plan files are deleted after upload to the bucket. List of scanned tables is also kept in this directory, to avoid scanning the same tables multiple times when Scanner is restarted.
 - `-scanner.allowed-users` – comma-separated list of Cortex tenants that should have plans generated. If empty, plans for all found users are generated.
 - `-scanner.ignore-users-regex` - If plans for all users are generated (`-scanner.allowed-users` is not set), then users matching this non-empty regular expression will be skipped.
 - `-scanner.tables-limit` – How many tables should be scanned? By default all tables are scanned, but when testing scanner it may be useful to start with small number of tables first.
 - `-scanner.tables` – Comma-separated list of tables to be scanned. Can be used to scan specific tables only. Note that schema is still used to find all tables first, and then this list is consulted to select only specified tables.
+- `-scanner.scan-period-start` & `-scanner.scan-period-end` - limit the scan to a particular date range (format like `2020-12-31`)

 Scanner will read the Cortex schema file to discover Index tables, and then it will start scanning them from most-recent table first, going back.
 For each table, it will fully read the table and generate a plan for each user and day stored in the table.
 Plan files are then uploaded to the configured blocks-storage bucket (at the `-blocksconvert.bucket-prefix` location prefix), and local copies are deleted.
 After that, scanner continues with the next table until it scans them all or `-scanner.tables-limit` is reached.

-Note that even though `blocksconvert` has options for configuring different Index store backends, **it only supports BigTable at the moment.**
+Note that even though `blocksconvert` has options for configuring different Index store backends, **it only supports BigTable and DynamoDB at the moment.**

 It is expected that only single Scanner process is running.
 Scanner does the scanning of multiple table subranges concurrently.

-Scanner exposes metrics with `cortex_blocksconvert_scanner_` prefix, eg. total number of scanned index entries of different type, number of open files (scanner doesn't close currently plan files until entire table has been scanned), scanned BigTable rows and parsed index entries.
+Scanner exposes metrics with `cortex_blocksconvert_scanner_` prefix, eg. total number of scanned index entries of different type, number of open files (scanner doesn't close currently plan files until entire table has been scanned), scanned rows and parsed index entries.

-**Scanner only supports schema version v9, v10 and v11. Earlier schema versions are currently not supported.**
+**Scanner only supports schema version v9 on DynamoDB; v9, v10 and v11 on BigTable. Earlier schema versions are currently not supported.**

 ### Scheduler

@@ -109,5 +111,5 @@ Cleaner should only be deployed if no other Builder is running. Running multiple

 The `blocksconvert` toolset currently has the following limitations:

-- Scanner supports only BigTable for chunks index backend, and cannot currently scan other databases.
-- Supports only chunks schema versions v9, v10 and v11
+- Scanner supports only BigTable and DynamoDB for chunks index backend, and cannot currently scan other databases.
+- Supports only chunks schema versions v9 for DynamoDB; v9, v10 and v11 for Bigtable.
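For reference, a DynamoDB scan following the flags documented above could be launched roughly as follows. The bucket, directory and tenant values here are placeholders, the exact `-blocks-storage.*` flags depend on the chosen backend, and the schema file is expected to describe the relevant periods with the `aws-dynamo` index store and schema v9:

blocksconvert -target=scanner \
  -schema-config-file=schema.yaml \
  -dynamodb.url=dynamodb://us-east-1/ \
  -blocks-storage.backend=s3 \
  -scanner.output-dir=/data/blocksconvert-plans \
  -scanner.allowed-users=tenant-1,tenant-2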
pkg/chunk/aws/dynamodb_index_reader.go (new file)

Lines changed: 237 additions & 0 deletions

@@ -0,0 +1,237 @@
package aws

import (
    "context"
    "encoding/base64"
    "fmt"
    "strings"
    "sync"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/client"
    "github.com/aws/aws-sdk-go/aws/request"
    "github.com/aws/aws-sdk-go/service/dynamodb"
    gklog "github.com/go-kit/kit/log"
    "github.com/go-kit/kit/log/level"
    "github.com/pkg/errors"
    "github.com/prometheus/client_golang/prometheus"
    "golang.org/x/sync/errgroup"

    "github.com/cortexproject/cortex/pkg/chunk"
)

type dynamodbIndexReader struct {
    dynamoDBStorageClient

    log        gklog.Logger
    maxRetries int

    rowsRead prometheus.Counter
}

// NewDynamoDBIndexReader returns an object that can scan an entire index table
func NewDynamoDBIndexReader(cfg DynamoDBConfig, schemaCfg chunk.SchemaConfig, reg prometheus.Registerer, l gklog.Logger, rowsRead prometheus.Counter) (chunk.IndexReader, error) {
    client, err := newDynamoDBStorageClient(cfg, schemaCfg, reg)
    if err != nil {
        return nil, err
    }

    return &dynamodbIndexReader{
        dynamoDBStorageClient: *client,
        maxRetries:            cfg.BackoffConfig.MaxRetries,
        log:                   l,

        rowsRead: rowsRead,
    }, nil
}

func (r *dynamodbIndexReader) IndexTableNames(ctx context.Context) ([]string, error) {
    // fake up a table client - if we call NewDynamoDBTableClient() it will double-register metrics
    tableClient := dynamoTableClient{
        DynamoDB: r.DynamoDB,
        metrics:  r.metrics,
    }
    return tableClient.ListTables(ctx)
}

type seriesMap struct {
    mutex           sync.Mutex           // protect concurrent access to maps
    seriesProcessed map[string]sha256Set // map of userID/bucket to set showing which series have been processed
}

// Since all sha256 values are the same size, a fixed-size array
// is more space-efficient than string or byte slice
type sha256 [32]byte

// an entry in this set indicates we have processed a series with that sha already
type sha256Set struct {
    series map[sha256]struct{}
}

// ReadIndexEntries reads the whole of a table on multiple goroutines in parallel.
// Entries for the same HashValue and RangeValue should be passed to the same processor.
func (r *dynamodbIndexReader) ReadIndexEntries(ctx context.Context, tableName string, processors []chunk.IndexEntryProcessor) error {
    projection := hashKey + "," + rangeKey

    sm := &seriesMap{ // new map per table
        seriesProcessed: make(map[string]sha256Set),
    }

    var readerGroup errgroup.Group
    // Start a goroutine for each processor
    for i, processor := range processors {
        segment, processor := i, processor // https://golang.org/doc/faq#closures_and_goroutines
        readerGroup.Go(func() error {
            input := &dynamodb.ScanInput{
                TableName:              aws.String(tableName),
                ProjectionExpression:   aws.String(projection),
                Segment:                aws.Int64(int64(segment)),
                TotalSegments:          aws.Int64(int64(len(processors))),
                ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal),
            }
            withRetrys := func(req *request.Request) {
                req.Retryer = client.DefaultRetryer{NumMaxRetries: r.maxRetries}
            }
            err := r.DynamoDB.ScanPagesWithContext(ctx, input, func(page *dynamodb.ScanOutput, lastPage bool) bool {
                if cc := page.ConsumedCapacity; cc != nil {
                    r.metrics.dynamoConsumedCapacity.WithLabelValues("DynamoDB.ScanTable", *cc.TableName).
                        Add(float64(*cc.CapacityUnits))
                }
                r.processPage(ctx, sm, processor, tableName, page)
                return true
            }, withRetrys)
            if err != nil {
                return err
            }
            processor.Flush()
            level.Info(r.log).Log("msg", "Segment finished", "segment", segment)
            return nil
        })
    }
    // Wait until all reader segments have finished
    outerErr := readerGroup.Wait()
    if outerErr != nil {
        return outerErr
    }
    return nil
}

func (r *dynamodbIndexReader) processPage(ctx context.Context, sm *seriesMap, processor chunk.IndexEntryProcessor, tableName string, page *dynamodb.ScanOutput) {
    for _, item := range page.Items {
        r.rowsRead.Inc()
        rangeValue := item[rangeKey].B
        if !isSeriesIndexEntry(rangeValue) {
            continue
        }
        hashValue := aws.StringValue(item[hashKey].S)
        orgStr, day, seriesID, err := decodeHashValue(hashValue)
        if err != nil {
            level.Error(r.log).Log("msg", "Failed to decode hash value", "err", err)
            continue
        }
        if !processor.AcceptUser(orgStr) {
            continue
        }

        bucketHashKey := orgStr + ":" + day // from v9Entries.GetChunkWriteEntries()

        // Check whether we have already processed this series
        // via two-step lookup: first by tenant/day bucket, then by series
        var seriesSha256 sha256
        err = decodeBase64(seriesSha256[:], seriesID)
        if err != nil {
            level.Error(r.log).Log("msg", "Failed to decode series ID", "err", err)
            continue
        }
        sm.mutex.Lock()
        shaSet := sm.seriesProcessed[bucketHashKey]
        if shaSet.series == nil {
            shaSet.series = make(map[sha256]struct{})
            sm.seriesProcessed[bucketHashKey] = shaSet
        }
        if _, exists := shaSet.series[seriesSha256]; exists {
            sm.mutex.Unlock()
            continue
        }
        // mark it as 'seen already'
        shaSet.series[seriesSha256] = struct{}{}
        sm.mutex.Unlock()

        err = r.queryChunkEntriesForSeries(ctx, processor, tableName, bucketHashKey+":"+seriesID)
        if err != nil {
            level.Error(r.log).Log("msg", "error while reading series", "err", err)
            return
        }
    }
}

func decodeBase64(dst []byte, value string) error {
    n, err := base64.RawStdEncoding.Decode(dst, []byte(value))
    if err != nil {
        return errors.Wrap(err, "unable to decode sha256")
    }
    if n != len(dst) {
        return errors.Wrapf(err, "seriesID has unexpected length; raw value %q", value)
    }
    return nil
}

func (r *dynamodbIndexReader) queryChunkEntriesForSeries(ctx context.Context, processor chunk.IndexEntryProcessor, tableName, queryHashKey string) error {
    // DynamoDB query which just says "all rows with hashKey X"
    // This is hard-coded for schema v9
    input := &dynamodb.QueryInput{
        TableName: aws.String(tableName),
        KeyConditions: map[string]*dynamodb.Condition{
            hashKey: {
                AttributeValueList: []*dynamodb.AttributeValue{
                    {S: aws.String(queryHashKey)},
                },
                ComparisonOperator: aws.String(dynamodb.ComparisonOperatorEq),
            },
        },
        ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal),
    }
    withRetrys := func(req *request.Request) {
        req.Retryer = client.DefaultRetryer{NumMaxRetries: r.maxRetries}
    }
    var result error
    err := r.DynamoDB.QueryPagesWithContext(ctx, input, func(output *dynamodb.QueryOutput, _ bool) bool {
        if cc := output.ConsumedCapacity; cc != nil {
            r.metrics.dynamoConsumedCapacity.WithLabelValues("DynamoDB.QueryPages", *cc.TableName).
                Add(float64(*cc.CapacityUnits))
        }

        for _, item := range output.Items {
            err := processor.ProcessIndexEntry(chunk.IndexEntry{
                TableName:  tableName,
                HashValue:  aws.StringValue(item[hashKey].S),
                RangeValue: item[rangeKey].B})
            if err != nil {
                result = errors.Wrap(err, "processor error")
                return false
            }
        }
        return true
    }, withRetrys)
    if err != nil {
        return errors.Wrap(err, "DynamoDB error")
    }
    return result
}

func isSeriesIndexEntry(rangeValue []byte) bool {
    const chunkTimeRangeKeyV3 = '3' // copied from pkg/chunk/schema.go
    return len(rangeValue) > 2 && rangeValue[len(rangeValue)-2] == chunkTimeRangeKeyV3
}

func decodeHashValue(hashValue string) (orgStr, day, seriesID string, err error) {
    hashParts := strings.SplitN(hashValue, ":", 3)
    if len(hashParts) != 3 {
        err = fmt.Errorf("unrecognized hash value: %q", hashValue)
        return
    }
    orgStr = hashParts[0]
    day = hashParts[1]
    seriesID = hashParts[2]
    return
}
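For context on the key handling above: in the v9 schema the series index rows carry a hash value of the form `userID:day:seriesID` (the seriesID being a base64-encoded sha256 of the series), and `queryChunkEntriesForSeries` re-assembles exactly that key to fetch every entry stored under it. Below is a small standalone sketch of the split-and-reassemble step; the values are made up for illustration, not read from a real table.

package main

import (
    "fmt"
    "strings"
)

func main() {
    // Hypothetical v9 hash value: userID ":" day-bucket ":" base64(sha256 of series).
    hashValue := "tenant-1:d18628:5jJMC4oZlBXeCVHWLPIZjtCbl4dGyJhZXhhbXBsZXNoYQ"

    // Same split as decodeHashValue above.
    parts := strings.SplitN(hashValue, ":", 3)
    orgStr, day, seriesID := parts[0], parts[1], parts[2]
    fmt.Println("tenant:", orgStr, "day bucket:", day)

    // queryChunkEntriesForSeries is then called with bucketHashKey + ":" + seriesID,
    // which reconstructs the original hash key for the per-series Query.
    bucketHashKey := orgStr + ":" + day
    fmt.Println("query hash key:", bucketHashKey+":"+seriesID)
}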

tools/blocksconvert/scanner/index_reader.go renamed to pkg/chunk/index_reader.go

Lines changed: 6 additions & 5 deletions
@@ -1,14 +1,15 @@
-package scanner
+package chunk

 import (
 	"context"
-
-	"github.com/cortexproject/cortex/pkg/chunk"
 )

-// Processor that receives index entries from the table.
+// IndexEntryProcessor receives index entries from a table.
 type IndexEntryProcessor interface {
-	ProcessIndexEntry(indexEntry chunk.IndexEntry) error
+	ProcessIndexEntry(indexEntry IndexEntry) error
+
+	// Will this user be accepted by the processor?
+	AcceptUser(user string) bool

 	// Called at the end of reading of index entries.
 	Flush() error
tools/blocksconvert/scanner/bigtable_index_reader.go

Lines changed: 1 addition & 1 deletion
@@ -66,7 +66,7 @@ func (r *bigtableIndexReader) IndexTableNames(ctx context.Context) ([]string, er
 //
 // Index entries are returned in HashValue, RangeValue order.
 // Entries for the same HashValue and RangeValue are passed to the same processor.
-func (r *bigtableIndexReader) ReadIndexEntries(ctx context.Context, tableName string, processors []IndexEntryProcessor) error {
+func (r *bigtableIndexReader) ReadIndexEntries(ctx context.Context, tableName string, processors []chunk.IndexEntryProcessor) error {
 	client, err := bigtable.NewClient(ctx, r.project, r.instance)
 	if err != nil {
 		return errors.Wrap(err, "create bigtable client failed")

tools/blocksconvert/scanner/scanner.go

Lines changed: 20 additions & 6 deletions
@@ -24,6 +24,7 @@ import (
 	"golang.org/x/sync/errgroup"

 	"github.com/cortexproject/cortex/pkg/chunk"
+	"github.com/cortexproject/cortex/pkg/chunk/aws"
 	"github.com/cortexproject/cortex/pkg/chunk/storage"
 	"github.com/cortexproject/cortex/pkg/util/flagext"
 	"github.com/cortexproject/cortex/pkg/util/services"

@@ -202,7 +203,7 @@ func (s *Scanner) running(ctx context.Context) error {
 			continue
 		}

-		var reader IndexReader
+		var reader chunk.IndexReader
 		switch c.IndexType {
 		case "gcp", "gcp-columnkey", "bigtable", "bigtable-hashed":
 			bigTable := s.storageCfg.GCPStorageConfig

@@ -213,6 +214,19 @@ func (s *Scanner) running(ctx context.Context) error {
 			}

 			reader = newBigtableIndexReader(bigTable.Project, bigTable.Instance, s.logger, s.indexReaderRowsRead, s.indexReaderParsedIndexEntries, s.currentTableRanges, s.currentTableScannedRanges)
+		case "aws-dynamo":
+			cfg := s.storageCfg.AWSStorageConfig
+
+			if cfg.DynamoDB.URL == nil {
+				level.Error(s.logger).Log("msg", "cannot scan DynamoDB, missing configuration", "schemaFrom", c.From.String())
+				continue
+			}
+
+			var err error
+			reader, err = aws.NewDynamoDBIndexReader(cfg.DynamoDBConfig, s.schema, s.reg, s.logger, s.indexReaderRowsRead)
+			if err != nil {
+				level.Error(s.logger).Log("msg", "cannot scan DynamoDB", "err", err)
+			}
 		default:
 			level.Warn(s.logger).Log("msg", "unsupported index type", "type", c.IndexType, "schemaFrom", c.From.String())
 			continue

@@ -297,12 +311,12 @@ func (s *Scanner) running(ctx context.Context) error {

 type tableToProcess struct {
 	table  string
-	reader IndexReader
+	reader chunk.IndexReader
 	start  time.Time
 	end    time.Time // Will not be set for non-periodic tables. Exclusive.
 }

-func (s *Scanner) findTablesToProcess(ctx context.Context, indexReader IndexReader, fromUnixTimestamp, toUnixTimestamp int64, tablesConfig chunk.PeriodicTableConfig) ([]tableToProcess, error) {
+func (s *Scanner) findTablesToProcess(ctx context.Context, indexReader chunk.IndexReader, fromUnixTimestamp, toUnixTimestamp int64, tablesConfig chunk.PeriodicTableConfig) ([]tableToProcess, error) {
 	tables, err := indexReader.IndexTableNames(ctx)
 	if err != nil {
 		return nil, err

@@ -346,7 +360,7 @@ func (s *Scanner) findTablesToProcess(ctx context.Context, indexReader IndexRead
 	return result, nil
 }

-func (s *Scanner) processTable(ctx context.Context, table string, indexReader IndexReader) error {
+func (s *Scanner) processTable(ctx context.Context, table string, indexReader chunk.IndexReader) error {
 	tableLog := log.With(s.logger, "table", table)

 	tableProcessedFile := filepath.Join(s.cfg.OutputDirectory, table+".processed")

@@ -469,7 +483,7 @@ func shouldSkipOperationBecauseFileExists(file string) bool {

 func scanSingleTable(
 	ctx context.Context,
-	indexReader IndexReader,
+	indexReader chunk.IndexReader,
 	tableName string,
 	outDir string,
 	concurrency int,

@@ -497,7 +511,7 @@ func scanSingleTable(
 		})
 	}

-	var ps []IndexEntryProcessor
+	var ps []chunk.IndexEntryProcessor

 	for i := 0; i < concurrency; i++ {
 		ps = append(ps, newProcessor(outDir, result, allowed, ignored, series, indexEntries, ignoredEntries))
