Skip to content

Commit

Permalink
[memberlist] Add support to join memberlist via SRV records (#2788)
Browse files Browse the repository at this point in the history
This PR enables dns based discovery for members to join via SRV
records. Member lookup is only supported on memberlist initial
creation and not part of any periodic re-joins.

Signed-off-by: Periklis Tsirakidis <periklis@redhat.com>

Co-authored-by: Periklis Tsirakidis <periklis@redhat.com>
periklis and periklis authored Jul 2, 2020
1 parent d5c319b commit 0ea5a8b
Showing 4 changed files with 58 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -33,6 +33,7 @@
* [ENHANCEMENT] Experimental TSDB: Added `-experimental.tsdb.head-compaction-idle-timeout` option to force compaction of data in memory into a block. #2803
* [ENHANCEMENT] Experimental TSDB: Added support for flushing blocks via `/flush`, `/shutdown` (previously these only worked for chunks storage) and by using `-experimental.tsdb.flush-blocks-on-shutdown` option. #2794
* [ENHANCEMENT] Ingester: Added new metric `cortex_ingester_flush_series_in_progress` that reports number of ongoing flush-series operations. Useful when calling `/flush` handler: if `cortex_ingester_flush_queue_length + cortex_ingester_flush_series_in_progress` is 0, all flushes are finished. #2778
* [ENHANCEMENT] Memberlist members can join cluster via SRV records. #2788
* [BUGFIX] Fixed a bug in the index intersect code causing storage to return more chunks/series than required. #2796
* [BUGFIX] Fixed the number of reported keys in the background cache queue. #2764
* [BUGFIX] Fix race in processing of headers in sharded queries. #2762
1 change: 1 addition & 0 deletions docs/configuration/arguments.md
Original file line number Diff line number Diff line change
@@ -475,6 +475,7 @@ Some clients in Cortex support service discovery via DNS to find addresses of ba

- [Blocks storage's memcached index cache](../operations/blocks-storage.md#memcached-index-cache)
- [All caching memcached servers](./config-file-reference.md#memcached-client-config)
- [Memberlist KV store](./config-file-reference.md#memberlist-config)

### Supported discovery modes

5 changes: 4 additions & 1 deletion docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
@@ -2323,7 +2323,10 @@ The `memberlist_config` configures the Gossip memberlist.
# CLI flag: -memberlist.dead-node-reclaim-time
[dead_node_reclaim_time: <duration> | default = 0s]
# Other cluster members to join. Can be specified multiple times.
# Other cluster members to join. Can be specified multiple times. It can be an
# IP, hostname or an entry specified in the DNS Service Discovery format (see
# https://cortexmetrics.io/docs/configuration/arguments/#dns-service-discovery
# for more details).
# CLI flag: -memberlist.join
[join_members: <list of string> | default = ]
59 changes: 52 additions & 7 deletions pkg/ring/kv/memberlist/memberlist_client.go
Original file line number Diff line number Diff line change
@@ -16,6 +16,8 @@ import (
"github.com/go-kit/kit/log/level"
"github.com/hashicorp/memberlist"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/discovery/dns"
"github.com/thanos-io/thanos/pkg/extprom"

"github.com/cortexproject/cortex/pkg/ring/kv/codec"
"github.com/cortexproject/cortex/pkg/util"
@@ -164,7 +166,7 @@ func (cfg *KVConfig) RegisterFlags(f *flag.FlagSet, prefix string) {
f.BoolVar(&cfg.RandomizeNodeName, prefix+"memberlist.randomize-node-name", true, "Add random suffix to the node name.")
f.DurationVar(&cfg.StreamTimeout, prefix+"memberlist.stream-timeout", 0, "The timeout for establishing a connection with a remote node, and for read/write operations. Uses memberlist LAN defaults if 0.")
f.IntVar(&cfg.RetransmitMult, prefix+"memberlist.retransmit-factor", 0, "Multiplication factor used when sending out messages (factor * log(N+1)).")
f.Var(&cfg.JoinMembers, prefix+"memberlist.join", "Other cluster members to join. Can be specified multiple times.")
f.Var(&cfg.JoinMembers, prefix+"memberlist.join", "Other cluster members to join. Can be specified multiple times. It can be an IP, hostname or an entry specified in the DNS Service Discovery format (see https://cortexmetrics.io/docs/configuration/arguments/#dns-service-discovery for more details).")
f.DurationVar(&cfg.MinJoinBackoff, prefix+"memberlist.min-join-backoff", 1*time.Second, "Min backoff duration to join other cluster members.")
f.DurationVar(&cfg.MaxJoinBackoff, prefix+"memberlist.max-join-backoff", 1*time.Minute, "Max backoff duration to join other cluster members.")
f.IntVar(&cfg.MaxJoinRetries, prefix+"memberlist.max-join-retries", 10, "Max number of retries to join other cluster members.")
@@ -201,6 +203,9 @@ type KV struct {

cfg KVConfig

// dns discovery provider
provider *dns.Provider

// Protects access to memberlist and broadcasts fields.
initWG sync.WaitGroup
memberlist *memberlist.Memberlist
@@ -268,15 +273,23 @@ var (
)

// NewKV creates new gossip-based KV service. Note that service needs to be started, until then it doesn't initialize
// gossiping part. Only after service is in Running state, it is really gossiping.
// Starting the service will also trigger connecting to the existing memberlist cluster, if cfg.JoinMembers is set.
// If that fails and AbortIfJoinFails is true, error is returned and service enters Failed state.
// gossiping part. Only after service is in Running state, it is really gossiping. Starting the service will also
// trigger connecting to the existing memberlist cluster. If that fails and AbortIfJoinFails is true, error is returned
// and service enters Failed state.
func NewKV(cfg KVConfig) *KV {
cfg.TCPTransport.MetricsRegisterer = cfg.MetricsRegisterer
cfg.TCPTransport.MetricsNamespace = cfg.MetricsNamespace

mr := extprom.WrapRegistererWithPrefix("cortex_",
extprom.WrapRegistererWith(
prometheus.Labels{"name": "memberlist"},
cfg.MetricsRegisterer,
),
)

mlkv := &KV{
cfg: cfg,
provider: dns.NewProvider(util.Logger, mr, dns.GolangResolverType),
store: make(map[string]valueDesc),
codecs: make(map[string]codec.Codec),
watchers: make(map[string][]chan string),
@@ -380,7 +393,10 @@ func (m *KV) running(ctx context.Context) error {
// Join the cluster, if configured. We want this to happen in Running state, because started memberlist
// is good enough for usage from Client (which checks for Running state), even before it connects to the cluster.
if len(m.cfg.JoinMembers) > 0 {
err := m.joinMembersOnStartup(ctx, m.cfg.JoinMembers)
// Lookup SRV records for given addresses to discover members.
members := m.discoverMembers(ctx, m.cfg.JoinMembers)

err := m.joinMembersOnStartup(ctx, members)
if err != nil {
level.Error(util.Logger).Log("msg", "failed to join memberlist cluster", "err", err)

@@ -391,7 +407,7 @@ func (m *KV) running(ctx context.Context) error {
}

var tickerChan <-chan time.Time = nil
if m.cfg.RejoinInterval > 0 {
if m.cfg.RejoinInterval > 0 && len(m.cfg.JoinMembers) > 0 {
t := time.NewTicker(m.cfg.RejoinInterval)
defer t.Stop()

@@ -401,7 +417,9 @@ func (m *KV) running(ctx context.Context) error {
for {
select {
case <-tickerChan:
reached, err := m.memberlist.Join(m.cfg.JoinMembers)
members := m.discoverMembers(ctx, m.cfg.JoinMembers)

reached, err := m.memberlist.Join(members)
if err == nil {
level.Info(util.Logger).Log("msg", "re-joined memberlist cluster", "reached_nodes", reached)
} else {
@@ -475,6 +493,33 @@ func (m *KV) joinMembersOnStartup(ctx context.Context, members []string) error {
return lastErr
}

// Provides a dns-based member disovery to join a memberlist cluster w/o knowning members' addresses upfront.
func (m *KV) discoverMembers(ctx context.Context, members []string) []string {
if len(members) == 0 {
return nil
}

var ms, resolve []string

for _, member := range members {
if strings.Contains(member, "+") {
resolve = append(resolve, member)
} else {
// No DNS SRV record to lookup, just append member
ms = append(ms, member)
}
}

err := m.provider.Resolve(ctx, resolve)
if err != nil {
level.Error(util.Logger).Log("msg", "failed to resolve members", "addrs", strings.Join(resolve, ","))
}

ms = append(ms, m.provider.Addresses()...)

return ms
}

// While Stopping, we try to leave memberlist cluster and then shutdown memberlist client.
// We do this in order to send out last messages, typically that ingester has LEFT the ring.
func (m *KV) stopping(_ error) error {

0 comments on commit 0ea5a8b

Please sign in to comment.