Skip to content

Commit 43f503b

Browse files
committed
Join memberlist on starting with no retry
Signed-off-by: Daniel Blando <ddeluigg@amazon.com>
1 parent 49ce5ab commit 43f503b

File tree

3 files changed

+59
-4
lines changed

3 files changed

+59
-4
lines changed

CHANGELOG.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
# Changelog
22

33
## master / unreleased
4-
* [FEATURE] Compactor: Added `-compactor.block-files-concurrency` allowing to configure number of go routines for download/upload block files during compaction. #4784
54
* [ENHANCEMENT] Querier/Ruler: Retry store-gateway in case of unexpected failure, instead of failing the query. #4532
65
* [ENHANCEMENT] Ring: DoBatch prioritize 4xx errors when failing. #4783
6+
* [FEATURE] Compactor: Added `-compactor.block-files-concurrency` allowing to configure number of go routines for download/upload block files during compaction. #4784
77
* [FEATURE] Compactor: Added -compactor.blocks-fetch-concurrency` allowing to configure number of go routines for blocks during compaction. #4787
8+
* [BUGFIX] Memberlist: Add join with no retrying when starting service. #4804
9+
10+
811

912
## 1.13.0 2022-07-14
1013

pkg/ring/kv/memberlist/memberlist_client.go

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,7 @@ func (m *KV) buildMemberlistConfig() (*memberlist.Config, error) {
411411
return mlCfg, nil
412412
}
413413

414-
func (m *KV) starting(_ context.Context) error {
414+
func (m *KV) starting(ctx context.Context) error {
415415
mlCfg, err := m.buildMemberlistConfig()
416416
if err != nil {
417417
return err
@@ -438,6 +438,15 @@ func (m *KV) starting(_ context.Context) error {
438438
}
439439
m.initWG.Done()
440440

441+
if len(m.cfg.JoinMembers) > 0 {
442+
// Lookup SRV records for given addresses to discover members.
443+
members := m.discoverMembers(ctx, m.cfg.JoinMembers)
444+
445+
err := m.joinMembersOnStarting(members)
446+
if err != nil {
447+
level.Warn(m.logger).Log("msg", "failed to join memberlist cluster on startup", "err", err)
448+
}
449+
}
441450
return nil
442451
}
443452

@@ -450,7 +459,7 @@ func (m *KV) running(ctx context.Context) error {
450459
// Lookup SRV records for given addresses to discover members.
451460
members := m.discoverMembers(ctx, m.cfg.JoinMembers)
452461

453-
err := m.joinMembersOnStartup(ctx, members)
462+
err := m.joinMembersOnRunning(ctx, members)
454463
if err != nil {
455464
level.Error(m.logger).Log("msg", "failed to join memberlist cluster", "err", err)
456465

@@ -517,7 +526,7 @@ func (m *KV) JoinMembers(members []string) (int, error) {
517526
return m.memberlist.Join(members)
518527
}
519528

520-
func (m *KV) joinMembersOnStartup(ctx context.Context, members []string) error {
529+
func (m *KV) joinMembersOnRunning(ctx context.Context, members []string) error {
521530
reached, err := m.memberlist.Join(members)
522531
if err == nil {
523532
level.Info(m.logger).Log("msg", "joined memberlist cluster", "reached_nodes", reached)
@@ -556,6 +565,16 @@ func (m *KV) joinMembersOnStartup(ctx context.Context, members []string) error {
556565
return lastErr
557566
}
558567

568+
func (m *KV) joinMembersOnStarting(members []string) error {
569+
reached, err := m.memberlist.Join(members)
570+
if err == nil {
571+
level.Info(m.logger).Log("msg", "joined memberlist cluster", "reached_nodes", reached)
572+
return nil
573+
}
574+
575+
return err
576+
}
577+
559578
// Provides a dns-based member disovery to join a memberlist cluster w/o knowning members' addresses upfront.
560579
func (m *KV) discoverMembers(ctx context.Context, members []string) []string {
561580
if len(members) == 0 {

pkg/ring/kv/memberlist/memberlist_client_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -789,6 +789,39 @@ func TestMemberlistFailsToJoin(t *testing.T) {
789789
require.Equal(t, mkv.FailureCase(), errFailedToJoinCluster)
790790
}
791791

792+
func TestMemberlistJoinOnStarting(t *testing.T) {
793+
ports, err := getFreePorts(2)
794+
require.NoError(t, err)
795+
796+
var cfg1 KVConfig
797+
flagext.DefaultValues(&cfg1)
798+
cfg1.TCPTransport = TCPTransportConfig{
799+
BindAddrs: []string{"localhost"},
800+
BindPort: ports[0],
801+
}
802+
803+
cfg1.RandomizeNodeName = true
804+
cfg1.Codecs = []codec.Codec{dataCodec{}}
805+
cfg1.AbortIfJoinFails = false
806+
807+
cfg2 := cfg1
808+
cfg2.TCPTransport.BindPort = ports[1]
809+
cfg2.JoinMembers = []string{fmt.Sprintf("localhost:%d", ports[0])}
810+
cfg2.RejoinInterval = 1 * time.Second
811+
812+
mkv1 := NewKV(cfg1, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry())
813+
require.NoError(t, mkv1.starting(context.Background()))
814+
815+
mkv2 := NewKV(cfg2, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry())
816+
require.NoError(t, mkv2.starting(context.Background()))
817+
818+
membersFunc := func() interface{} {
819+
return mkv2.memberlist.NumMembers()
820+
}
821+
822+
poll(t, 5*time.Second, 2, membersFunc)
823+
}
824+
792825
func getFreePorts(count int) ([]int, error) {
793826
var ports []int
794827
for i := 0; i < count; i++ {

0 commit comments

Comments
 (0)