Commit 08a5e87

[Experimental] Memberlist singleton and support for multiple codecs (cortexproject#2016)
* Added codec ID, to be used between memberlist clients. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
* Separated memberlist.Client from memberlist.KV. This allows multiple clients to use the same memberlist.KV instance. This is another step towards supporting multiple codecs per KV. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
* Replaced custom serialization with protobuf serialization. To support multiple codecs, we need to add a new field. As we're going to break the format anyway, we may as well switch to protobuf. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
* Codec ID is now part of the message exchanged between memberlist nodes. Codec is now stored in the client, while KV has a registry of codecs that are used when receiving values. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
* Added tests for multi-codecs KV. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
* Fixed client creation. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
* Removed Stop method from kv.Client interface. It was introduced for memberlist Client, but it will not be needed anymore as memberlist.KV will be a top-level component with its own lifecycle, separated from the client. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
* Memberlist KV is now a top-level component. It is initialized if any component actually uses it, but not sooner. If initialized, its Stop method is called when the server shuts down. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
* All codecs are registered to memberlist KV at the beginning. Client now only verifies that KV knows about the codec that the client wants to use, but doesn't register it anymore. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
* Report unknown codec errors as invalid messages. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
* Updated CHANGELOG.md Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
* Typos Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
* Comments. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
* Fixes Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
* Added yaml tag to ignore function. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
* Fixed unintended import reorder. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
* Comments. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
* Comments. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
* Return error Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
* Updated docs Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
* Improved comments in LocalState and MergeRemoteState Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
* Allow ruler and distributor to use memberlist.KV for their internal rings. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
* Added test to verify that Memberlist works in single-binary mode, and all distributors can see all ingesters and their tokens. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
* Updated CHANGELOG.md, moved to CHANGE because of protocol break. Added note on how to upgrade. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
* Use "memberlist" in YAML config. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
* Updated comment. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
* Updated comments and messages. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
* Fixed config. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
* Compactor can now also use memberlist KV store. Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
1 parent 3327661 commit 08a5e87

30 files changed: +1372 -260 lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -7,6 +7,7 @@
 * [CHANGE] Moved `--store.min-chunk-age` to the Querier config as `--querier.query-store-after`, allowing the store to be skipped during query time if the metrics wouldn't be found. The YAML config option `ingestermaxquerylookback` has been renamed to `query_ingesters_within` to match its CLI flag. #1893
   * `--store.min-chunk-age` has been removed
   * `--querier.query-store-after` has been added in its place.
+* [CHANGE] The experimental Memberlist KV store can now be used in single-binary Cortex. Previously, attempts to use it failed with a panic. This change also breaks the existing binary protocol used to exchange gossip messages, so this version cannot understand a gossiped Ring when combined with the previous version of Cortex. The easiest way to upgrade is to shut down the old Cortex installation and restart it with the new version. An incremental rollout works too, but with reduced functionality until all components run the same version. #2016
 * [FEATURE] Added user sub rings to distribute users to a subset of ingesters. #1947
   * `--experimental.distributor.user-subring-size`
 * [FEATURE] Added flag `-experimental.ruler.enable-api` to enable the ruler api which implements the Prometheus API `/api/v1/rules` and `/api/v1/alerts` endpoints under the configured `-http.prefix`. #1999

Makefile

Lines changed: 1 addition & 0 deletions
@@ -56,6 +56,7 @@ pkg/querier/queryrange/queryrange.pb.go: pkg/querier/queryrange/queryrange.proto
 pkg/chunk/storage/caching_index_client.pb.go: pkg/chunk/storage/caching_index_client.proto
 pkg/distributor/ha_tracker.pb.go: pkg/distributor/ha_tracker.proto
 pkg/ruler/rules/rules.pb.go: pkg/ruler/rules/rules.proto
+pkg/ring/kv/memberlist/kv.pb.go: pkg/ring/kv/memberlist/kv.proto
 
 all: $(UPTODATE_FILES)
 test: protos

docs/configuration/config-file-reference.md

Lines changed: 17 additions & 29 deletions
@@ -102,6 +102,9 @@ runtime_config:
 # File with the configuration that can be updated in runtime.
 # CLI flag: -runtime-config.file
 [file: <string> | default = ""]
+
+# The memberlist_config configures the Gossip memberlist.
+[memberlist: <memberlist_config>]
 ```
 
 ## `server_config`
@@ -271,10 +274,6 @@ ha_tracker:
 # The CLI flags prefix for this block config is: distributor.ha-tracker
 [etcd: <etcd_config>]
 
-# The memberlist_config configures the Gossip memberlist.
-# The CLI flags prefix for this block config is: distributor.ha-tracker
-[memberlist: <memberlist_config>]
-
 multi:
 # Primary backend storage used by multi-client.
 # CLI flag: -distributor.ha-tracker.multi.primary
@@ -328,10 +327,6 @@ ring:
 # The CLI flags prefix for this block config is: distributor.ring
 [etcd: <etcd_config>]
 
-# The memberlist_config configures the Gossip memberlist.
-# The CLI flags prefix for this block config is: distributor.ring
-[memberlist: <memberlist_config>]
-
 multi:
 # Primary backend storage used by multi-client.
 # CLI flag: -distributor.ring.multi.primary
@@ -403,9 +398,6 @@ lifecycler:
 # The etcd_config configures the etcd client.
 [etcd: <etcd_config>]
 
-# The memberlist_config configures the Gossip memberlist.
-[memberlist: <memberlist_config>]
-
 multi:
 # Primary backend storage used by multi-client.
 # CLI flag: -multi.primary
@@ -740,10 +732,6 @@ ring:
 # The CLI flags prefix for this block config is: ruler.ring
 [etcd: <etcd_config>]
 
-# The memberlist_config configures the Gossip memberlist.
-# The CLI flags prefix for this block config is: ruler.ring
-[memberlist: <memberlist_config>]
-
 multi:
 # Primary backend storage used by multi-client.
 # CLI flag: -ruler.ring.multi.primary
@@ -1739,62 +1727,62 @@ The `memberlist_config` configures the Gossip memberlist.
 
 ```yaml
 # Name of the node in memberlist cluster. Defaults to hostname.
-# CLI flag: -<prefix>.memberlist.nodename
+# CLI flag: -memberlist.nodename
 [node_name: <string> | default = ""]
 
 # The timeout for establishing a connection with a remote node, and for
 # read/write operations. Uses memberlist LAN defaults if 0.
-# CLI flag: -<prefix>.memberlist.stream-timeout
+# CLI flag: -memberlist.stream-timeout
 [stream_timeout: <duration> | default = 0s]
 
 # Multiplication factor used when sending out messages (factor * log(N+1)).
-# CLI flag: -<prefix>.memberlist.retransmit-factor
+# CLI flag: -memberlist.retransmit-factor
 [retransmit_factor: <int> | default = 0]
 
 # How often to use pull/push sync. Uses memberlist LAN defaults if 0.
-# CLI flag: -<prefix>.memberlist.pullpush-interval
+# CLI flag: -memberlist.pullpush-interval
 [pull_push_interval: <duration> | default = 0s]
 
 # How often to gossip. Uses memberlist LAN defaults if 0.
-# CLI flag: -<prefix>.memberlist.gossip-interval
+# CLI flag: -memberlist.gossip-interval
 [gossip_interval: <duration> | default = 0s]
 
 # How many nodes to gossip to. Uses memberlist LAN defaults if 0.
-# CLI flag: -<prefix>.memberlist.gossip-nodes
+# CLI flag: -memberlist.gossip-nodes
 [gossip_nodes: <int> | default = 0]
 
 # Other cluster members to join. Can be specified multiple times. Memberlist
 # store is EXPERIMENTAL.
-# CLI flag: -<prefix>.memberlist.join
+# CLI flag: -memberlist.join
 [join_members: <list of string> | default = ]
 
 # If this node fails to join memberlist cluster, abort.
-# CLI flag: -<prefix>.memberlist.abort-if-join-fails
+# CLI flag: -memberlist.abort-if-join-fails
 [abort_if_cluster_join_fails: <boolean> | default = true]
 
 # How long to keep LEFT ingesters in the ring.
-# CLI flag: -<prefix>.memberlist.left-ingesters-timeout
+# CLI flag: -memberlist.left-ingesters-timeout
 [left_ingesters_timeout: <duration> | default = 5m0s]
 
 # Timeout for leaving memberlist cluster.
-# CLI flag: -<prefix>.memberlist.leave-timeout
+# CLI flag: -memberlist.leave-timeout
 [leave_timeout: <duration> | default = 5s]
 
 # IP address to listen on for gossip messages. Multiple addresses may be
 # specified. Defaults to 0.0.0.0
-# CLI flag: -<prefix>.memberlist.bind-addr
+# CLI flag: -memberlist.bind-addr
 [bind_addr: <list of string> | default = ]
 
 # Port to listen on for gossip messages.
-# CLI flag: -<prefix>.memberlist.bind-port
+# CLI flag: -memberlist.bind-port
 [bind_port: <int> | default = 7946]
 
 # Timeout used when connecting to other nodes to send packet.
-# CLI flag: -<prefix>.memberlist.packet-dial-timeout
+# CLI flag: -memberlist.packet-dial-timeout
 [packet_dial_timeout: <duration> | default = 5s]
 
 # Timeout for writing 'packet' data.
-# CLI flag: -<prefix>.memberlist.packet-write-timeout
+# CLI flag: -memberlist.packet-write-timeout
 [packet_write_timeout: <duration> | default = 5s]
 ```
 

integration/framework/scenario.go

Lines changed: 6 additions & 6 deletions
@@ -126,7 +126,7 @@ func (s *Scenario) StartDynamoDB() error {
 
 func (s *Scenario) StartDistributor(name string, flags map[string]string, image string) error {
 	if image == "" {
-		image = getDefaultCortexImage()
+		image = GetDefaultCortexImage()
 	}
 
 	return s.StartService(NewService(
@@ -150,7 +150,7 @@ func (s *Scenario) StartDistributor(name string, flags map[string]string, image
 
 func (s *Scenario) StartQuerier(name string, flags map[string]string, image string) error {
 	if image == "" {
-		image = getDefaultCortexImage()
+		image = GetDefaultCortexImage()
 	}
 
 	return s.StartService(NewService(
@@ -173,7 +173,7 @@ func (s *Scenario) StartQuerier(name string, flags map[string]string, image stri
 
 func (s *Scenario) StartIngester(name string, flags map[string]string, image string) error {
 	if image == "" {
-		image = getDefaultCortexImage()
+		image = GetDefaultCortexImage()
 	}
 
 	return s.StartService(NewService(
@@ -201,7 +201,7 @@ func (s *Scenario) StartIngester(name string, flags map[string]string, image str
 
 func (s *Scenario) StartTableManager(name string, flags map[string]string, image string) error {
 	if image == "" {
-		image = getDefaultCortexImage()
+		image = GetDefaultCortexImage()
 	}
 
 	return s.StartService(NewService(
@@ -305,8 +305,8 @@ func existDockerNetwork() (bool, error) {
 	return strings.TrimSpace(string(out)) != "", nil
 }
 
-// getDefaultCortexImage returns the Docker image to use to run Cortex.
-func getDefaultCortexImage() string {
+// GetDefaultCortexImage returns the Docker image to use to run Cortex.
+func GetDefaultCortexImage() string {
 	// Get the cortex image from the CORTEX_IMAGE env variable,
 	// falling back to "quay.io/cortexproject/cortex:latest"
 	if os.Getenv("CORTEX_IMAGE") != "" {

integration/framework/service.go

Lines changed: 4 additions & 0 deletions
@@ -64,6 +64,10 @@ func NewService(
 	}
 }
 
+func (s *Service) SetBackoff(cfg util.BackoffConfig) {
+	s.retryBackoff = util.NewBackoff(context.Background(), cfg)
+}
+
 func (s *Service) Start() (err error) {
 	// In case of any error, if the container was already created, we
 	// have to cleanup removing it. We ignore the error of the "docker rm"
Lines changed: 89 additions & 0 deletions
@@ -0,0 +1,89 @@
+package main
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/cortexproject/cortex/integration/framework"
+	"github.com/cortexproject/cortex/pkg/util"
+)
+
+func TestSingleBinaryWithMemberlist(t *testing.T) {
+	s, err := framework.NewScenario()
+	require.NoError(t, err)
+	defer s.Shutdown()
+
+	// Start dependencies
+	require.NoError(t, s.StartDynamoDB())
+	// Look ma, no Consul!
+	require.NoError(t, s.WaitReady("dynamodb"))
+
+	require.NoError(t, startSingleBinary(s, "cortex-1", ""))
+	require.NoError(t, startSingleBinary(s, "cortex-2", "cortex-1:8000"))
+	require.NoError(t, startSingleBinary(s, "cortex-3", "cortex-2:8000"))
+
+	require.NoError(t, s.WaitReady("cortex-1", "cortex-2", "cortex-3"))
+
+	// All three Cortex servers should see each other.
+	require.NoError(t, s.Service("cortex-1").WaitMetric(80, "memberlist_client_cluster_members_count", 3))
+	require.NoError(t, s.Service("cortex-2").WaitMetric(80, "memberlist_client_cluster_members_count", 3))
+	require.NoError(t, s.Service("cortex-3").WaitMetric(80, "memberlist_client_cluster_members_count", 3))
+
+	// All Cortex servers should have 512 tokens, altogether 3 * 512
+	require.NoError(t, s.Service("cortex-1").WaitMetric(80, "cortex_ring_tokens_total", 3*512))
+	require.NoError(t, s.Service("cortex-2").WaitMetric(80, "cortex_ring_tokens_total", 3*512))
+	require.NoError(t, s.Service("cortex-3").WaitMetric(80, "cortex_ring_tokens_total", 3*512))
+
+	require.NoError(t, s.StopService("cortex-1"))
+	require.NoError(t, s.Service("cortex-2").WaitMetric(80, "cortex_ring_tokens_total", 2*512))
+	require.NoError(t, s.Service("cortex-2").WaitMetric(80, "memberlist_client_cluster_members_count", 2))
+	require.NoError(t, s.Service("cortex-3").WaitMetric(80, "cortex_ring_tokens_total", 2*512))
+	require.NoError(t, s.Service("cortex-3").WaitMetric(80, "memberlist_client_cluster_members_count", 2))
+
+	require.NoError(t, s.StopService("cortex-2"))
+	require.NoError(t, s.Service("cortex-3").WaitMetric(80, "cortex_ring_tokens_total", 1*512))
+	require.NoError(t, s.Service("cortex-3").WaitMetric(80, "memberlist_client_cluster_members_count", 1))
+
+	require.NoError(t, s.StopService("cortex-3"))
+}
+
+func startSingleBinary(s *framework.Scenario, name string, join string) error {
+	flags := map[string]string{
+		"-target":                        "all", // single-binary mode
+		"-log.level":                     "warn",
+		"-ingester.final-sleep":          "0s",
+		"-ingester.join-after":           "0s", // join quickly
+		"-ingester.min-ready-duration":   "0s",
+		"-ingester.concurrent-flushes":   "10",
+		"-ingester.max-transfer-retries": "0", // disable
+		"-ingester.num-tokens":           "512",
+		"-ingester.observe-period":       "5s", // to avoid conflicts in tokens
+		"-ring.store":                    "memberlist",
+		"-memberlist.bind-port":          "8000",
+	}
+
+	if join != "" {
+		flags["-memberlist.join"] = join
+	}
+
+	serv := framework.NewService(
+		name,
+		framework.GetDefaultCortexImage(),
+		framework.NetworkName,
+		[]int{80, 8000},
+		nil,
+		framework.NewCommandWithoutEntrypoint("cortex", framework.BuildArgs(framework.MergeFlags(ChunksStorage, flags))...),
+		framework.NewReadinessProbe(80, "/ready", 204),
+	)
+
+	backOff := util.BackoffConfig{
+		MinBackoff: 100 * time.Millisecond,
+		MaxBackoff: 500 * time.Millisecond, // bump max backoff... things take a little longer with memberlist
+		MaxRetries: 100,
+	}
+
+	serv.SetBackoff(backOff)
+	return s.StartService(serv)
+}

pkg/cortex/cortex.go

Lines changed: 9 additions & 5 deletions
@@ -30,6 +30,7 @@ import (
 	"github.com/cortexproject/cortex/pkg/querier/frontend"
 	"github.com/cortexproject/cortex/pkg/querier/queryrange"
 	"github.com/cortexproject/cortex/pkg/ring"
+	"github.com/cortexproject/cortex/pkg/ring/kv/memberlist"
 	"github.com/cortexproject/cortex/pkg/ruler"
 	"github.com/cortexproject/cortex/pkg/storage/tsdb"
 	"github.com/cortexproject/cortex/pkg/util"
@@ -84,6 +85,7 @@ type Config struct {
 	ConfigStore config_client.Config `yaml:"config_store,omitempty"`
 	Alertmanager alertmanager.MultitenantAlertmanagerConfig `yaml:"alertmanager,omitempty"`
 	RuntimeConfig runtimeconfig.ManagerConfig `yaml:"runtime_config,omitempty"`
+	MemberlistKV memberlist.KVConfig `yaml:"memberlist"`
 }
 
 // RegisterFlags registers flag.
@@ -119,6 +121,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
 	c.ConfigStore.RegisterFlagsWithPrefix("alertmanager.", f)
 	c.Alertmanager.RegisterFlags(f)
 	c.RuntimeConfig.RegisterFlags(f)
+	c.MemberlistKV.RegisterFlags(f, "")
 
 	// These don't seem to have a home.
 	flag.IntVar(&chunk_util.QueryParallelism, "querier.query-parallelism", 100, "Max subqueries run in parallel per higher-level query.")
@@ -171,11 +174,12 @@ type Cortex struct {
 	cache cache.Cache
 	runtimeConfig *runtimeconfig.Manager
 
-	ruler        *ruler.Ruler
-	configAPI    *api.API
-	configDB     db.DB
-	alertmanager *alertmanager.MultitenantAlertmanager
-	compactor    *compactor.Compactor
+	ruler             *ruler.Ruler
+	configAPI         *api.API
+	configDB          db.DB
+	alertmanager      *alertmanager.MultitenantAlertmanager
+	compactor         *compactor.Compactor
+	memberlistKVState *memberlistKVState
 
 	// The chunk store that the querier should use to query the long
 	// term storage. It depends on the storage engine used.

pkg/cortex/memberlist_kv.go

Lines changed: 34 additions & 0 deletions
@@ -0,0 +1,34 @@
+package cortex
+
+import (
+	"sync"
+
+	"github.com/cortexproject/cortex/pkg/ring/kv/memberlist"
+)
+
+// This struct holds state of initialization of memberlist.KV instance.
+type memberlistKVState struct {
+	// config used for initialization
+	cfg *memberlist.KVConfig
+
+	// init function, to avoid multiple initializations.
+	init sync.Once
+
+	// state
+	kv  *memberlist.KV
+	err error
+}
+
+func newMemberlistKVState(cfg *memberlist.KVConfig) *memberlistKVState {
+	return &memberlistKVState{
+		cfg: cfg,
+	}
+}
+
+func (kvs *memberlistKVState) getMemberlistKV() (*memberlist.KV, error) {
+	kvs.init.Do(func() {
+		kvs.kv, kvs.err = memberlist.NewKV(*kvs.cfg)
+	})
+
+	return kvs.kv, kvs.err
+}
