Skip to content

Commit e21c310

Browse files
committed
Add dynamodb multikey kv
Signed-off-by: Daniel Deluiggi <ddeluigg@amazon.com>
1 parent e887a2b commit e21c310

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+30068
-57
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
* [FEATURE] Ingester: Enable snapshotting of In-memory TSDB on disk during shutdown via `-blocks-storage.tsdb.memory-snapshot-on-shutdown`. #5011
1717
* [FEATURE] Query Frontend/Scheduler: Add a new counter metric `cortex_request_queue_requests_total` for total requests going to queue. #5030
1818
* [FEATURE] Build ARM docker images. #5041
19+
* [FEATURE] Ring: Add new kv store option `dynamodb`. #5026
1920
* [BUGFIX] Updated `golang.org/x/net` dependency to fix CVE-2022-27664. #5008
2021
* [BUGFIX] Fix panic when otel and xray tracing is enabled. #5044
2122
* [BUGFIX] Fixed no compact block got grouped in shuffle sharding grouper. #5055

docs/blocks-storage/compactor.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,19 @@ compactor:
204204
# CLI flag: -compactor.ring.prefix
205205
[prefix: <string> | default = "collectors/"]
206206

207+
dynamodb:
208+
# Region to access dynamodb.
209+
# CLI flag: -compactor.ring.dynamodb.region
210+
[region: <string> | default = ""]
211+
212+
# Table name to use on dynamodb.
213+
# CLI flag: -compactor.ring.dynamodb.table-name
214+
[table_name: <string> | default = ""]
215+
216+
# Time to expire items on dynamodb.
217+
# CLI flag: -compactor.ring.dynamodb.ttl-time
218+
[ttl: <duration> | default = 0s]
219+
207220
# The consul_config configures the consul client.
208221
# The CLI flags prefix for this block config is: compactor.ring
209222
[consul: <consul_config>]

docs/blocks-storage/store-gateway.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,19 @@ store_gateway:
205205
# CLI flag: -store-gateway.sharding-ring.prefix
206206
[prefix: <string> | default = "collectors/"]
207207

208+
dynamodb:
209+
# Region to access dynamodb.
210+
# CLI flag: -store-gateway.sharding-ring.dynamodb.region
211+
[region: <string> | default = ""]
212+
213+
# Table name to use on dynamodb.
214+
# CLI flag: -store-gateway.sharding-ring.dynamodb.table-name
215+
[table_name: <string> | default = ""]
216+
217+
# Time to expire items on dynamodb.
218+
# CLI flag: -store-gateway.sharding-ring.dynamodb.ttl-time
219+
[ttl: <duration> | default = 0s]
220+
208221
# The consul_config configures the consul client.
209222
# The CLI flags prefix for this block config is:
210223
# store-gateway.sharding-ring

docs/configuration/config-file-reference.md

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -483,6 +483,19 @@ ha_tracker:
483483
# CLI flag: -distributor.ha-tracker.prefix
484484
[prefix: <string> | default = "ha-tracker/"]
485485
486+
dynamodb:
487+
# Region to access dynamodb.
488+
# CLI flag: -distributor.ha-tracker.dynamodb.region
489+
[region: <string> | default = ""]
490+
491+
# Table name to use on dynamodb.
492+
# CLI flag: -distributor.ha-tracker.dynamodb.table-name
493+
[table_name: <string> | default = ""]
494+
495+
# Time to expire items on dynamodb.
496+
# CLI flag: -distributor.ha-tracker.dynamodb.ttl-time
497+
[ttl: <duration> | default = 0s]
498+
486499
# The consul_config configures the consul client.
487500
# The CLI flags prefix for this block config is: distributor.ha-tracker
488501
[consul: <consul_config>]
@@ -547,6 +560,19 @@ ring:
547560
# CLI flag: -distributor.ring.prefix
548561
[prefix: <string> | default = "collectors/"]
549562
563+
dynamodb:
564+
# Region to access dynamodb.
565+
# CLI flag: -distributor.ring.dynamodb.region
566+
[region: <string> | default = ""]
567+
568+
# Table name to use on dynamodb.
569+
# CLI flag: -distributor.ring.dynamodb.table-name
570+
[table_name: <string> | default = ""]
571+
572+
# Time to expire items on dynamodb.
573+
# CLI flag: -distributor.ring.dynamodb.ttl-time
574+
[ttl: <duration> | default = 0s]
575+
550576
# The consul_config configures the consul client.
551577
# The CLI flags prefix for this block config is: distributor.ring
552578
[consul: <consul_config>]
@@ -617,6 +643,19 @@ lifecycler:
617643
# CLI flag: -ring.prefix
618644
[prefix: <string> | default = "collectors/"]
619645
646+
dynamodb:
647+
# Region to access dynamodb.
648+
# CLI flag: -dynamodb.region
649+
[region: <string> | default = ""]
650+
651+
# Table name to use on dynamodb.
652+
# CLI flag: -dynamodb.table-name
653+
[table_name: <string> | default = ""]
654+
655+
# Time to expire items on dynamodb.
656+
# CLI flag: -dynamodb.ttl-time
657+
[ttl: <duration> | default = 0s]
658+
620659
# The consul_config configures the consul client.
621660
[consul: <consul_config>]
622661
@@ -1283,6 +1322,19 @@ ring:
12831322
# CLI flag: -ruler.ring.prefix
12841323
[prefix: <string> | default = "rulers/"]
12851324
1325+
dynamodb:
1326+
# Region to access dynamodb.
1327+
# CLI flag: -ruler.ring.dynamodb.region
1328+
[region: <string> | default = ""]
1329+
1330+
# Table name to use on dynamodb.
1331+
# CLI flag: -ruler.ring.dynamodb.table-name
1332+
[table_name: <string> | default = ""]
1333+
1334+
# Time to expire items on dynamodb.
1335+
# CLI flag: -ruler.ring.dynamodb.ttl-time
1336+
[ttl: <duration> | default = 0s]
1337+
12861338
# The consul_config configures the consul client.
12871339
# The CLI flags prefix for this block config is: ruler.ring
12881340
[consul: <consul_config>]
@@ -1665,6 +1717,19 @@ sharding_ring:
16651717
# CLI flag: -alertmanager.sharding-ring.prefix
16661718
[prefix: <string> | default = "alertmanagers/"]
16671719
1720+
dynamodb:
1721+
# Region to access dynamodb.
1722+
# CLI flag: -alertmanager.sharding-ring.dynamodb.region
1723+
[region: <string> | default = ""]
1724+
1725+
# Table name to use on dynamodb.
1726+
# CLI flag: -alertmanager.sharding-ring.dynamodb.table-name
1727+
[table_name: <string> | default = ""]
1728+
1729+
# Time to expire items on dynamodb.
1730+
# CLI flag: -alertmanager.sharding-ring.dynamodb.ttl-time
1731+
[ttl: <duration> | default = 0s]
1732+
16681733
# The consul_config configures the consul client.
16691734
# The CLI flags prefix for this block config is: alertmanager.sharding-ring
16701735
[consul: <consul_config>]
@@ -3852,6 +3917,19 @@ sharding_ring:
38523917
# CLI flag: -compactor.ring.prefix
38533918
[prefix: <string> | default = "collectors/"]
38543919
3920+
dynamodb:
3921+
# Region to access dynamodb.
3922+
# CLI flag: -compactor.ring.dynamodb.region
3923+
[region: <string> | default = ""]
3924+
3925+
# Table name to use on dynamodb.
3926+
# CLI flag: -compactor.ring.dynamodb.table-name
3927+
[table_name: <string> | default = ""]
3928+
3929+
# Time to expire items on dynamodb.
3930+
# CLI flag: -compactor.ring.dynamodb.ttl-time
3931+
[ttl: <duration> | default = 0s]
3932+
38553933
# The consul_config configures the consul client.
38563934
# The CLI flags prefix for this block config is: compactor.ring
38573935
[consul: <consul_config>]
@@ -3939,6 +4017,19 @@ sharding_ring:
39394017
# CLI flag: -store-gateway.sharding-ring.prefix
39404018
[prefix: <string> | default = "collectors/"]
39414019
4020+
dynamodb:
4021+
# Region to access dynamodb.
4022+
# CLI flag: -store-gateway.sharding-ring.dynamodb.region
4023+
[region: <string> | default = ""]
4024+
4025+
# Table name to use on dynamodb.
4026+
# CLI flag: -store-gateway.sharding-ring.dynamodb.table-name
4027+
[table_name: <string> | default = ""]
4028+
4029+
# Time to expire items on dynamodb.
4030+
# CLI flag: -store-gateway.sharding-ring.dynamodb.ttl-time
4031+
[ttl: <duration> | default = 0s]
4032+
39424033
# The consul_config configures the consul client.
39434034
# The CLI flags prefix for this block config is: store-gateway.sharding-ring
39444035
[consul: <consul_config>]

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ require (
88
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137
99
github.com/alicebob/miniredis/v2 v2.23.1
1010
github.com/armon/go-metrics v0.4.1
11+
github.com/aws/aws-sdk-go v1.44.109
1112
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
1213
github.com/cespare/xxhash v1.1.0
1314
github.com/dustin/go-humanize v1.0.0
@@ -85,7 +86,6 @@ require (
8586
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
8687
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
8788
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d // indirect
88-
github.com/aws/aws-sdk-go v1.44.109 // indirect
8989
github.com/aws/aws-sdk-go-v2 v1.16.0 // indirect
9090
github.com/aws/aws-sdk-go-v2/config v1.15.1 // indirect
9191
github.com/aws/aws-sdk-go-v2/credentials v1.11.0 // indirect

integration/kv_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,14 @@ func (c stringCodec) Decode(bb []byte) (interface{}, error) {
229229
func (c stringCodec) Encode(v interface{}) ([]byte, error) { return []byte(v.(string)), nil }
230230
func (c stringCodec) CodecID() string { return "stringCodec" }
231231

232+
func (stringCodec) EncodeMultiKey(msg interface{}) (map[string][]byte, error) {
233+
return nil, errors.New("String codec does not support EncodeMultiKey")
234+
}
235+
236+
func (stringCodec) DecodeMultiKey(map[string][]byte) (interface{}, error) {
237+
return nil, errors.New("String codec does not support DecodeMultiKey")
238+
}
239+
232240
type watcher struct {
233241
values map[string][]interface{}
234242
}

pkg/ring/http.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ const pageContent = `
2424
<body>
2525
<h1>Ring Status</h1>
2626
<p>Current time: {{ .Now }}</p>
27+
<p>Storage updated: {{ .StorageLastUpdated }}</p>
2728
<form action="" method="POST">
2829
<input type="hidden" name="csrf_token" value="$__CSRF_TOKEN_PLACEHOLDER__">
2930
<table width="100%" border="1">
@@ -116,9 +117,10 @@ type ingesterDesc struct {
116117
}
117118

118119
type httpResponse struct {
119-
Ingesters []ingesterDesc `json:"shards"`
120-
Now time.Time `json:"now"`
121-
ShowTokens bool `json:"-"`
120+
Ingesters []ingesterDesc `json:"shards"`
121+
Now time.Time `json:"now"`
122+
StorageLastUpdated time.Time `json:"storageLastUpdated"`
123+
ShowTokens bool `json:"-"`
122124
}
123125

124126
func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
@@ -149,14 +151,14 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
149151
}
150152
sort.Strings(ingesterIDs)
151153

152-
now := time.Now()
154+
storageLastUpdate := r.KVClient.LastUpdateTime(r.key)
153155
var ingesters []ingesterDesc
154156
_, owned := r.countTokens()
155157
for _, id := range ingesterIDs {
156158
ing := r.ringDesc.Ingesters[id]
157159
heartbeatTimestamp := time.Unix(ing.Timestamp, 0)
158160
state := ing.State.String()
159-
if !r.IsHealthy(&ing, Reporting, now) {
161+
if !r.IsHealthy(&ing, Reporting, storageLastUpdate) {
160162
state = unhealthy
161163
}
162164

@@ -182,9 +184,10 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
182184
tokensParam := req.URL.Query().Get("tokens")
183185

184186
renderHTTPResponse(w, httpResponse{
185-
Ingesters: ingesters,
186-
Now: now,
187-
ShowTokens: tokensParam == "true",
187+
Ingesters: ingesters,
188+
Now: time.Now(),
189+
StorageLastUpdated: storageLastUpdate,
190+
ShowTokens: tokensParam == "true",
188191
}, pageTemplate, req)
189192
}
190193

pkg/ring/kv/client.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ import (
55
"flag"
66
"fmt"
77
"sync"
8+
"time"
9+
10+
"github.com/cortexproject/cortex/pkg/ring/kv/dynamodb"
811

912
"github.com/go-kit/log"
1013
"github.com/prometheus/client_golang/prometheus"
@@ -40,9 +43,10 @@ var inmemoryStore Client
4043
// Consul, Etcd, Memberlist or MultiClient. It was extracted from Config to keep
4144
// single-client config separate from final client-config (with all the wrappers)
4245
type StoreConfig struct {
43-
Consul consul.Config `yaml:"consul"`
44-
Etcd etcd.Config `yaml:"etcd"`
45-
Multi MultiConfig `yaml:"multi"`
46+
DynamoDB dynamodb.Config `yaml:"dynamodb"`
47+
Consul consul.Config `yaml:"consul"`
48+
Etcd etcd.Config `yaml:"etcd"`
49+
Multi MultiConfig `yaml:"multi"`
4650

4751
// Function that returns memberlist.KV store to use. By using a function, we can delay
4852
// initialization of memberlist.KV until it is actually required.
@@ -69,6 +73,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(flagsPrefix, defaultPrefix string, f
6973
// This needs to be fixed in the future (1.0 release maybe?) when we normalize flags.
7074
// At the moment we have consul.<flag-name>, and ring.store, going forward it would
7175
// be easier to have everything under ring, so ring.consul.<flag-name>
76+
cfg.DynamoDB.RegisterFlags(f, flagsPrefix)
7277
cfg.Consul.RegisterFlags(f, flagsPrefix)
7378
cfg.Etcd.RegisterFlagsWithPrefix(f, flagsPrefix)
7479
cfg.Multi.RegisterFlagsWithPrefix(f, flagsPrefix)
@@ -111,6 +116,9 @@ type Client interface {
111116

112117
// WatchPrefix calls f whenever any value stored under prefix changes.
113118
WatchPrefix(ctx context.Context, prefix string, f func(string, interface{}) bool)
119+
120+
// LastUpdateTime returns the time a key was last sync by the kv store
121+
LastUpdateTime(key string) time.Time
114122
}
115123

116124
// NewClient creates a new Client (consul, etcd or inmemory) based on the config,
@@ -128,6 +136,9 @@ func createClient(backend string, prefix string, cfg StoreConfig, codec codec.Co
128136
var err error
129137

130138
switch backend {
139+
case "dynamodb":
140+
client, err = dynamodb.NewClient(cfg.DynamoDB, codec, logger, reg)
141+
131142
case "consul":
132143
client, err = consul.NewClient(cfg.Consul, codec, logger, reg)
133144

pkg/ring/kv/codec/clonable.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package codec
2+
3+
type Clonable interface {
4+
// Clone should return a deep copy of the state.
5+
Clone() interface{}
6+
}

0 commit comments

Comments
 (0)