Skip to content

Add timeout for dynamodb ring kv #6544

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/blocks-storage/compactor.md
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@ compactor:
# CLI flag: -compactor.ring.dynamodb.max-cas-retries
[max_cas_retries: <int> | default = 10]

# Timeout of dynamoDbClient requests. Default is 2m.
# CLI flag: -compactor.ring.dynamodb.timeout
[timeout: <duration> | default = 2m]

# The consul_config configures the consul client.
# The CLI flags prefix for this block config is: compactor.ring
[consul: <consul_config>]
Expand Down
4 changes: 4 additions & 0 deletions docs/blocks-storage/store-gateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,10 @@ store_gateway:
# CLI flag: -store-gateway.sharding-ring.dynamodb.max-cas-retries
[max_cas_retries: <int> | default = 10]

# Timeout of dynamoDbClient requests. Default is 2m.
# CLI flag: -store-gateway.sharding-ring.dynamodb.timeout
[timeout: <duration> | default = 2m]

# The consul_config configures the consul client.
# The CLI flags prefix for this block config is:
# store-gateway.sharding-ring
Expand Down
28 changes: 28 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,10 @@ sharding_ring:
# CLI flag: -alertmanager.sharding-ring.dynamodb.max-cas-retries
[max_cas_retries: <int> | default = 10]

# Timeout of dynamoDbClient requests. Default is 2m.
# CLI flag: -alertmanager.sharding-ring.dynamodb.timeout
[timeout: <duration> | default = 2m]

# The consul_config configures the consul client.
# The CLI flags prefix for this block config is: alertmanager.sharding-ring
[consul: <consul_config>]
Expand Down Expand Up @@ -2286,6 +2290,10 @@ sharding_ring:
# CLI flag: -compactor.ring.dynamodb.max-cas-retries
[max_cas_retries: <int> | default = 10]

# Timeout of dynamoDbClient requests. Default is 2m.
# CLI flag: -compactor.ring.dynamodb.timeout
[timeout: <duration> | default = 2m]

# The consul_config configures the consul client.
# The CLI flags prefix for this block config is: compactor.ring
[consul: <consul_config>]
Expand Down Expand Up @@ -2595,6 +2603,10 @@ ha_tracker:
# CLI flag: -distributor.ha-tracker.dynamodb.max-cas-retries
[max_cas_retries: <int> | default = 10]

# Timeout of dynamoDbClient requests. Default is 2m.
# CLI flag: -distributor.ha-tracker.dynamodb.timeout
[timeout: <duration> | default = 2m]

# The consul_config configures the consul client.
# The CLI flags prefix for this block config is: distributor.ha-tracker
[consul: <consul_config>]
Expand Down Expand Up @@ -2689,6 +2701,10 @@ ring:
# CLI flag: -distributor.ring.dynamodb.max-cas-retries
[max_cas_retries: <int> | default = 10]

# Timeout of dynamoDbClient requests. Default is 2m.
# CLI flag: -distributor.ring.dynamodb.timeout
[timeout: <duration> | default = 2m]

# The consul_config configures the consul client.
# The CLI flags prefix for this block config is: distributor.ring
[consul: <consul_config>]
Expand Down Expand Up @@ -3017,6 +3033,10 @@ lifecycler:
# CLI flag: -dynamodb.max-cas-retries
[max_cas_retries: <int> | default = 10]

# Timeout of dynamoDbClient requests. Default is 2m.
# CLI flag: -dynamodb.timeout
[timeout: <duration> | default = 2m]

# The consul_config configures the consul client.
[consul: <consul_config>]

Expand Down Expand Up @@ -4666,6 +4686,10 @@ ring:
# CLI flag: -ruler.ring.dynamodb.max-cas-retries
[max_cas_retries: <int> | default = 10]

# Timeout of dynamoDbClient requests. Default is 2m.
# CLI flag: -ruler.ring.dynamodb.timeout
[timeout: <duration> | default = 2m]

# The consul_config configures the consul client.
# The CLI flags prefix for this block config is: ruler.ring
[consul: <consul_config>]
Expand Down Expand Up @@ -5657,6 +5681,10 @@ sharding_ring:
# CLI flag: -store-gateway.sharding-ring.dynamodb.max-cas-retries
[max_cas_retries: <int> | default = 10]

# Timeout of dynamoDbClient requests. Default is 2m.
# CLI flag: -store-gateway.sharding-ring.dynamodb.timeout
[timeout: <duration> | default = 2m]

# The consul_config configures the consul client.
# The CLI flags prefix for this block config is: store-gateway.sharding-ring
[consul: <consul_config>]
Expand Down
9 changes: 8 additions & 1 deletion pkg/ring/kv/dynamodb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Config struct {
TTL time.Duration `yaml:"ttl"`
PullerSyncTime time.Duration `yaml:"puller_sync_time"`
MaxCasRetries int `yaml:"max_cas_retries"`
Timeout time.Duration `yaml:"timeout"`
}

type Client struct {
Expand Down Expand Up @@ -53,6 +54,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, prefix string) {
f.DurationVar(&cfg.TTL, prefix+"dynamodb.ttl-time", 0, "Time to expire items on dynamodb.")
f.DurationVar(&cfg.PullerSyncTime, prefix+"dynamodb.puller-sync-time", 60*time.Second, "Time to refresh local ring with information on dynamodb.")
f.IntVar(&cfg.MaxCasRetries, prefix+"dynamodb.max-cas-retries", maxCasRetries, "Maximum number of retries for DDB KV CAS.")
f.DurationVar(&cfg.Timeout, prefix+"dynamodb.timeout", 2*time.Minute, "Timeout of dynamoDbClient requests. Default is 2m.")
}

func NewClient(cfg Config, cc codec.Codec, logger log.Logger, registerer prometheus.Registerer) (*Client, error) {
Expand All @@ -69,8 +71,13 @@ func NewClient(cfg Config, cc codec.Codec, logger log.Logger, registerer prometh
MaxRetries: cfg.MaxCasRetries,
}

var kv dynamoDbClient
kv = dynamodbInstrumentation{kv: dynamoDB, ddbMetrics: ddbMetrics}
if cfg.Timeout > 0 {
kv = newDynamodbKVWithTimeout(kv, cfg.Timeout)
}
c := &Client{
kv: dynamodbInstrumentation{kv: dynamoDB, ddbMetrics: ddbMetrics},
kv: kv,
codec: cc,
logger: ddbLog(logger),
ddbMetrics: ddbMetrics,
Expand Down
77 changes: 77 additions & 0 deletions pkg/ring/kv/dynamodb/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,29 @@ func Test_UpdateStaleData(t *testing.T) {

}

// Test_DynamodbKVWithTimeout verifies that every operation exposed by the
// timeout wrapper fails with context.DeadlineExceeded when the wrapped client
// takes longer than the configured timeout.
func Test_DynamodbKVWithTimeout(t *testing.T) {
	// The backend delay is far larger than the client timeout, so the deadline
	// always fires first. The timeout itself is kept short because each of the
	// five calls below blocks for one full timeout; a 1s timeout would add
	// about 5s of wall-clock time to the test run.
	const (
		backendDelay  = 5 * time.Second
		clientTimeout = 50 * time.Millisecond
	)

	ddbMock := NewDynamodbClientMock()
	ddbWithDelay := newDynamodbKVWithDelay(ddbMock, backendDelay)
	dbWithTimeout := newDynamodbKVWithTimeout(ddbWithDelay, clientTimeout)

	ctx := context.Background()
	_, _, err := dbWithTimeout.List(ctx, dynamodbKey{primaryKey: key})
	require.True(t, errors.Is(err, context.DeadlineExceeded))

	err = dbWithTimeout.Delete(ctx, dynamodbKey{primaryKey: key})
	require.True(t, errors.Is(err, context.DeadlineExceeded))

	_, _, err = dbWithTimeout.Query(ctx, dynamodbKey{primaryKey: key}, true)
	require.True(t, errors.Is(err, context.DeadlineExceeded))

	err = dbWithTimeout.Put(ctx, dynamodbKey{primaryKey: key}, []byte{})
	require.True(t, errors.Is(err, context.DeadlineExceeded))

	err = dbWithTimeout.Batch(ctx, nil, nil)
	require.True(t, errors.Is(err, context.DeadlineExceeded))
}

// NewClientMock makes a new local dynamodb client.
func NewClientMock(ddbClient dynamoDbClient, cc codec.Codec, logger log.Logger, registerer prometheus.Registerer, time time.Duration, config backoff.Config) *Client {
return &Client{
Expand Down Expand Up @@ -429,3 +452,57 @@ func (m *DescMock) FindDifference(that codec.MultiKey) (interface{}, []string, e
}
return args.Get(0), args.Get(1).([]string), err
}

// dynamodbKVWithDelayAndContextCheck is a test helper that wraps a
// dynamoDbClient and holds a delay to apply before forwarding calls to it.
type dynamodbKVWithDelayAndContextCheck struct {
	// ddbClient is the wrapped client that calls are forwarded to.
	ddbClient dynamoDbClient
	// delay is how long each operation waits before forwarding.
	delay time.Duration
}

// newDynamodbKVWithDelay returns a wrapper around client that stores the
// given delay alongside it.
func newDynamodbKVWithDelay(client dynamoDbClient, delay time.Duration) *dynamodbKVWithDelayAndContextCheck {
	delayed := &dynamodbKVWithDelayAndContextCheck{
		ddbClient: client,
		delay:     delay,
	}
	return delayed
}

// List waits for the configured delay and then forwards to the wrapped
// client's List. If ctx is done before the delay elapses, it returns
// ctx.Err() without calling the wrapped client.
func (d *dynamodbKVWithDelayAndContextCheck) List(ctx context.Context, key dynamodbKey) ([]string, float64, error) {
	elapsed := time.After(d.delay)
	select {
	case <-elapsed:
		// Delay is over; fall through to the wrapped client.
	case <-ctx.Done():
		return nil, 0, ctx.Err()
	}
	return d.ddbClient.List(ctx, key)
}

// Query waits for the configured delay and then forwards to the wrapped
// client's Query. If ctx is done before the delay elapses, it returns
// ctx.Err() without calling the wrapped client.
func (d *dynamodbKVWithDelayAndContextCheck) Query(ctx context.Context, key dynamodbKey, isPrefix bool) (map[string][]byte, float64, error) {
	elapsed := time.After(d.delay)
	select {
	case <-elapsed:
		// Delay is over; fall through to the wrapped client.
	case <-ctx.Done():
		return nil, 0, ctx.Err()
	}
	return d.ddbClient.Query(ctx, key, isPrefix)
}

// Delete waits for the configured delay and then forwards to the wrapped
// client's Delete. If ctx is done before the delay elapses, it returns
// ctx.Err() without calling the wrapped client.
func (d *dynamodbKVWithDelayAndContextCheck) Delete(ctx context.Context, key dynamodbKey) error {
	elapsed := time.After(d.delay)
	select {
	case <-elapsed:
		// Delay is over; fall through to the wrapped client.
	case <-ctx.Done():
		return ctx.Err()
	}
	return d.ddbClient.Delete(ctx, key)
}

// Put waits for the configured delay and then forwards to the wrapped
// client's Put. If ctx is done before the delay elapses, it returns
// ctx.Err() without calling the wrapped client.
func (d *dynamodbKVWithDelayAndContextCheck) Put(ctx context.Context, key dynamodbKey, data []byte) error {
	elapsed := time.After(d.delay)
	select {
	case <-elapsed:
		// Delay is over; fall through to the wrapped client.
	case <-ctx.Done():
		return ctx.Err()
	}
	return d.ddbClient.Put(ctx, key, data)
}

// Batch waits for the configured delay and then forwards to the wrapped
// client's Batch. If ctx is done before the delay elapses, it returns
// ctx.Err() without calling the wrapped client.
func (d *dynamodbKVWithDelayAndContextCheck) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error {
	elapsed := time.After(d.delay)
	select {
	case <-elapsed:
		// Delay is over; fall through to the wrapped client.
	case <-ctx.Done():
		return ctx.Err()
	}
	return d.ddbClient.Batch(ctx, put, delete)
}
39 changes: 39 additions & 0 deletions pkg/ring/kv/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,45 @@ func (kv dynamodbKV) generatePutItemRequest(key dynamodbKey, data []byte) map[st
return item
}

// dynamodbKVWithTimeout wraps a dynamoDbClient together with a timeout
// duration that its methods apply to the context of each forwarded call.
type dynamodbKVWithTimeout struct {
	// ddbClient is the wrapped client that calls are forwarded to.
	ddbClient dynamoDbClient
	// timeout is the duration passed to context.WithTimeout for each call.
	timeout time.Duration
}

// newDynamodbKVWithTimeout returns a wrapper around client that stores the
// given timeout alongside it.
func newDynamodbKVWithTimeout(client dynamoDbClient, timeout time.Duration) *dynamodbKVWithTimeout {
	wrapped := &dynamodbKVWithTimeout{
		ddbClient: client,
		timeout:   timeout,
	}
	return wrapped
}

// List forwards to the wrapped client's List using a context derived from ctx
// that is cancelled after the configured timeout. The derived context is
// released when the call returns.
func (d *dynamodbKVWithTimeout) List(ctx context.Context, key dynamodbKey) ([]string, float64, error) {
	timeoutCtx, release := context.WithTimeout(ctx, d.timeout)
	defer release()

	return d.ddbClient.List(timeoutCtx, key)
}

// Query forwards to the wrapped client's Query using a context derived from
// ctx that is cancelled after the configured timeout. The derived context is
// released when the call returns.
func (d *dynamodbKVWithTimeout) Query(ctx context.Context, key dynamodbKey, isPrefix bool) (map[string][]byte, float64, error) {
	timeoutCtx, release := context.WithTimeout(ctx, d.timeout)
	defer release()

	return d.ddbClient.Query(timeoutCtx, key, isPrefix)
}

// Delete forwards to the wrapped client's Delete using a context derived from
// ctx that is cancelled after the configured timeout. The derived context is
// released when the call returns.
func (d *dynamodbKVWithTimeout) Delete(ctx context.Context, key dynamodbKey) error {
	timeoutCtx, release := context.WithTimeout(ctx, d.timeout)
	defer release()

	return d.ddbClient.Delete(timeoutCtx, key)
}

// Put forwards to the wrapped client's Put using a context derived from ctx
// that is cancelled after the configured timeout. The derived context is
// released when the call returns.
func (d *dynamodbKVWithTimeout) Put(ctx context.Context, key dynamodbKey, data []byte) error {
	timeoutCtx, release := context.WithTimeout(ctx, d.timeout)
	defer release()

	return d.ddbClient.Put(timeoutCtx, key, data)
}

// Batch forwards to the wrapped client's Batch using a context derived from
// ctx that is cancelled after the configured timeout. The derived context is
// released when the call returns.
func (d *dynamodbKVWithTimeout) Batch(ctx context.Context, put map[dynamodbKey][]byte, delete []dynamodbKey) error {
	timeoutCtx, release := context.WithTimeout(ctx, d.timeout)
	defer release()

	return d.ddbClient.Batch(timeoutCtx, put, delete)
}

func generateItemKey(key dynamodbKey) map[string]*dynamodb.AttributeValue {
resp := map[string]*dynamodb.AttributeValue{
primaryKey: {
Expand Down
Loading