Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
* Metrics: Renamed `cortex_parquet_queryable_cache_*` to `cortex_parquet_cache_*`.
* Flags: Renamed `-querier.parquet-queryable-shard-cache-size` to `-querier.parquet-shard-cache-size` and `-querier.parquet-queryable-shard-cache-ttl` to `-querier.parquet-shard-cache-ttl`.
* Config: Renamed `parquet_queryable_shard_cache_size` to `parquet_shard_cache_size` and `parquet_queryable_shard_cache_ttl` to `parquet_shard_cache_ttl`.
* [FEATURE] HATracker: Add experimental support for memberlist as a KV store backend. #7284
* [FEATURE] StoreGateway: Introduces a new parquet mode. #7046
* [FEATURE] StoreGateway: Add a parquet shard cache to parquet mode. #7166
* [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` that enables adding `__unit__` and `__type__` labels for remote write v2 and OTLP requests. This is a breaking change; the `-distributor.otlp.enable-type-and-unit-labels` flag is now deprecated, operates as a no-op, and has been consolidated into this new flag. #7077
Expand Down
2 changes: 1 addition & 1 deletion docs/configuration/arguments.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ The next three options only apply when the querier is used together with the Que

### Ring/HA Tracker Store

The KVStore client is used by both the Ring and HA Tracker (HA Tracker doesn't support memberlist as KV store).
The KVStore client is used by both the Ring and HA Tracker (HA Tracker supports memberlist as a KV store as an experimental feature).
- `{ring,distributor.ha-tracker}.prefix`
The prefix for the keys in the store. Should end with a /. For example with a prefix of foo/, the key bar would be stored under foo/bar.
- `{ring,distributor.ha-tracker}.store`
Expand Down
5 changes: 2 additions & 3 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3103,9 +3103,8 @@ ha_tracker:
# CLI flag: -distributor.ha-tracker.enable-startup-sync
[enable_startup_sync: <boolean> | default = false]
# Backend storage to use for the ring. Please be aware that memberlist is not
# supported by the HA tracker since gossip propagation is too slow for HA
# purposes.
# Backend storage to use for the ring. Memberlist support in the HA tracker is
# experimental, as gossip propagation delays may impact HA performance.
kvstore:
# Backend storage to use for the ring. Supported values are: consul,
# dynamodb, etcd, inmemory, memberlist, multi.
Expand Down
3 changes: 3 additions & 0 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/cortexproject/cortex/pkg/flusher"
"github.com/cortexproject/cortex/pkg/frontend"
"github.com/cortexproject/cortex/pkg/frontend/transport"
"github.com/cortexproject/cortex/pkg/ha"
"github.com/cortexproject/cortex/pkg/ingester"
"github.com/cortexproject/cortex/pkg/overrides"
"github.com/cortexproject/cortex/pkg/parquetconverter"
Expand Down Expand Up @@ -821,6 +822,7 @@ func (t *Cortex) initMemberlistKV() (services.Service, error) {
t.Cfg.MemberlistKV.MetricsRegisterer = reg
t.Cfg.MemberlistKV.Codecs = []codec.Codec{
ring.GetCodec(),
ha.GetReplicaDescCodec(),
}
dnsProviderReg := prometheus.WrapRegistererWithPrefix(
"cortex_",
Expand All @@ -835,6 +837,7 @@ func (t *Cortex) initMemberlistKV() (services.Service, error) {

// Update the config.
t.Cfg.Distributor.DistributorRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.Distributor.HATrackerConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.StoreGateway.ShardingRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.Compactor.ShardingRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
Expand Down
93 changes: 87 additions & 6 deletions pkg/ha/ha_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/cortexproject/cortex/pkg/ring/kv"
"github.com/cortexproject/cortex/pkg/ring/kv/codec"
"github.com/cortexproject/cortex/pkg/ring/kv/memberlist"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/services"
)
Expand Down Expand Up @@ -72,14 +73,93 @@ type HATrackerConfig struct {
// of tracked keys is large.
EnableStartupSync bool `yaml:"enable_startup_sync"`

KVStore kv.Config `yaml:"kvstore" doc:"description=Backend storage to use for the ring. Please be aware that memberlist is not supported by the HA tracker since gossip propagation is too slow for HA purposes."`
KVStore kv.Config `yaml:"kvstore" doc:"description=Backend storage to use for the ring. Memberlist support in the HA tracker is experimental, as gossip propagation delays may impact HA performance."`
}

// RegisterFlags adds the flags required to config this to the given FlagSet with a specified prefix
func (cfg *HATrackerConfig) RegisterFlags(f *flag.FlagSet) {
cfg.RegisterFlagsWithPrefix("", "", f)
}

func (d *ReplicaDesc) Clone() any {
return proto.Clone(d)
}

// Merge merges other ReplicaDesc into this one and can be sent out to other clients.
// This merge function depends on the timestamp of the replica. It will choose more recent state
// from the two descriptors based on ReceivedAt timestamp.
func (d *ReplicaDesc) Merge(mergeable memberlist.Mergeable, _ bool) (memberlist.Mergeable, error) {
if mergeable == nil {
return nil, nil
}

other, ok := mergeable.(*ReplicaDesc)
if !ok {
return nil, fmt.Errorf("expected *ha.ReplicaDesc, got %T", mergeable)
}

if other == nil {
return nil, nil
}

// If other has been deleted, always take the deletion
if other.DeletedAt > 0 {
if d.DeletedAt == 0 || other.DeletedAt > d.DeletedAt {
d.Replica = other.Replica
d.ReceivedAt = other.ReceivedAt
d.DeletedAt = other.DeletedAt
return proto.Clone(d).(*ReplicaDesc), nil
}
return nil, nil
}

// If this descriptor is deleted but other isn't, and other is more recent, take the other
if d.DeletedAt > 0 && other.DeletedAt == 0 && other.ReceivedAt > d.ReceivedAt {
d.Replica = other.Replica
d.ReceivedAt = other.ReceivedAt
d.DeletedAt = other.DeletedAt
return proto.Clone(d).(*ReplicaDesc), nil
}

// Choose the descriptor with the more recent timestamp
if other.ReceivedAt > d.ReceivedAt {
d.Replica = other.Replica
d.ReceivedAt = other.ReceivedAt
d.DeletedAt = other.DeletedAt
return proto.Clone(d).(*ReplicaDesc), nil
}

// If timestamps are exactly equal but replicas differ, use lexicographic ordering
if other.ReceivedAt == d.ReceivedAt && other.Replica != d.Replica {
// Choose the lexicographically smaller replica
if other.Replica < d.Replica {
d.Replica = other.Replica
d.ReceivedAt = other.ReceivedAt
d.DeletedAt = other.DeletedAt
return proto.Clone(d).(*ReplicaDesc), nil
}
return nil, nil
}

// No change (same timestamp, same replica)
return nil, nil
}

// MergeContent describes content of this Mergeable.
// For ReplicaDesc, we return the replica name.
func (d *ReplicaDesc) MergeContent() []string {
if d.Replica == "" {
return nil
}
return []string{d.Replica}
}

// RemoveTombstones is a no-op for ReplicaDesc.
func (d *ReplicaDesc) RemoveTombstones(_ time.Time) (total, removed int) {
// No-op: HATracker manages tombstones via cleanupOldReplicas
return
}

// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *HATrackerConfig) RegisterFlagsWithPrefix(flagPrefix string, kvPrefix string, f *flag.FlagSet) {
finalFlagPrefix := ""
Expand Down Expand Up @@ -116,12 +196,13 @@ func (cfg *HATrackerConfig) Validate() error {
return fmt.Errorf(errInvalidFailoverTimeout, cfg.FailoverTimeout, minFailureTimeout)
}

// Tracker kv store only supports consul and etcd.
storeAllowedList := []string{"consul", "etcd"}
if slices.Contains(storeAllowedList, cfg.KVStore.Store) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR should only just extend allow list to memberlist instead of allowing all stores

return nil
// Tracker kv store only supports consul, etcd, memberlist, and multi.
storeAllowedList := []string{"consul", "etcd", "memberlist", "multi"}
if !slices.Contains(storeAllowedList, cfg.KVStore.Store) {
return fmt.Errorf("invalid HATracker KV store type: %s", cfg.KVStore.Store)
}
return fmt.Errorf("invalid HATracker KV store type: %s", cfg.KVStore.Store)

return nil
}

func GetReplicaDescCodec() codec.Proto {
Expand Down
Loading
Loading