
Lifecycler errors don't make the process exit immediately, but let other modules shut down properly #2251


Merged 11 commits on Mar 11, 2020
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -4,6 +4,7 @@

* [CHANGE] Utilize separate protos for rule state and storage. Experimental ruler API will not be functional until the rollout is complete. #2226
* [CHANGE] Frontend worker in querier now starts after all Querier module dependencies are started. This fixes an issue where the frontend worker started sending queries to the querier before it was ready to serve them (mostly visible when using the experimental blocks storage). #2246
* [CHANGE] Lifecycler component now enters Failed state on errors, and doesn't exit the process. (Important if you vendor Cortex and use the Lifecycler.) #2251
* [FEATURE] Flusher target to flush the WAL.
* `-flusher.wal-dir` for the WAL directory to recover from.
* `-flusher.concurrent-flushes` for number of concurrent flushes.
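For projects that vendor Cortex, the entry above means a failed Lifecycler no longer takes the whole process down; the embedding code has to notice the Failed state itself. Below is a minimal sketch of that responsibility using the FailureWatcher introduced later in this diff and helpers from `pkg/util/services`. It assumes an already-constructed `*ring.Lifecycler`; `runLifecycler` and the shutdown policy are illustrative, not part of this PR.

```go
package main

import (
	"context"

	"github.com/cortexproject/cortex/pkg/ring"
	"github.com/cortexproject/cortex/pkg/util/services"
)

// runLifecycler (hypothetical) blocks until the context is cancelled or the
// lifecycler fails. With this PR, a failure surfaces here as an error instead
// of an os.Exit(1) inside the lifecycler's loop.
func runLifecycler(ctx context.Context, lifecycler *ring.Lifecycler) error {
	watcher := services.NewFailureWatcher()
	watcher.WatchService(lifecycler)

	if err := services.StartAndAwaitRunning(ctx, lifecycler); err != nil {
		return err
	}

	select {
	case <-ctx.Done():
		return nil
	case err := <-watcher.Chan():
		// The lifecycler entered the Failed state. The process is still up,
		// so the caller can shut down its other modules cleanly.
		return err
	}
}
```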
8 changes: 7 additions & 1 deletion pkg/compactor/compactor.go
@@ -87,7 +87,8 @@ type Compactor struct {
ring *ring.Ring

// Subservices manager (ring, lifecycler)
-subservices *services.Manager
+subservices        *services.Manager
+subservicesWatcher *services.FailureWatcher

// Metrics.
compactionRunsStarted prometheus.Counter
@@ -181,6 +182,9 @@ func (c *Compactor) starting(ctx context.Context) error {

c.subservices, err = services.NewManager(c.ringLifecycler, c.ring)
if err == nil {
c.subservicesWatcher = services.NewFailureWatcher()
c.subservicesWatcher.WatchManager(c.subservices)

err = services.StartManagerAndAwaitHealthy(ctx, c.subservices)
}

@@ -228,6 +232,8 @@ func (c *Compactor) running(ctx context.Context) error {
c.compactUsersWithRetries(ctx)
case <-ctx.Done():
return nil
case err := <-c.subservicesWatcher.Chan():
return errors.Wrap(err, "compactor subservice failed")
}
}
}
17 changes: 15 additions & 2 deletions pkg/distributor/distributor.go
@@ -9,6 +9,7 @@ import (
"time"

opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
@@ -112,7 +113,8 @@ type Distributor struct {
ingestionRateLimiter *limiter.RateLimiter

// Manager for subservices (HA Tracker, distributor ring and client pool)
-subservices *services.Manager
+subservices        *services.Manager
+subservicesWatcher *services.FailureWatcher
}

// Config contains the configuration required to
@@ -208,8 +210,10 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
if err != nil {
return nil, err
}
d.subservicesWatcher = services.NewFailureWatcher()
d.subservicesWatcher.WatchManager(d.subservices)

-d.Service = services.NewIdleService(d.starting, d.stopping)
+d.Service = services.NewBasicService(d.starting, d.running, d.stopping)
return d, nil
}

@@ -218,6 +222,15 @@ func (d *Distributor) starting(ctx context.Context) error {
return services.StartManagerAndAwaitHealthy(ctx, d.subservices)
}

func (d *Distributor) running(ctx context.Context) error {
select {
case <-ctx.Done():
return nil
case err := <-d.subservicesWatcher.Chan():
return errors.Wrap(err, "distributor subservice failed")
}
}

// Called after distributor is asked to stop via StopAsync.
func (d *Distributor) stopping(_ error) error {
return services.StopManagerAndAwaitStopped(context.Background(), d.subservices)
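A note on why the distributor needed a running phase at all: FailureWatcher's channel is unbuffered (see `pkg/util/services/failure_watcher.go` below), so something must actively receive from `Chan()` for a failure notification to get through. An IdleService has only starting and stopping phases, hence the switch to `NewBasicService`. The comments in this sketch are my reading of the change, not text from the PR:

```go
// Before: no running phase, so a subservice failure had nowhere to surface.
d.Service = services.NewIdleService(d.starting, d.stopping)

// After: d.running blocks on ctx.Done() and d.subservicesWatcher.Chan(),
// turning a subservice failure into a distributor failure.
d.Service = services.NewBasicService(d.starting, d.running, d.stopping)
```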
14 changes: 10 additions & 4 deletions pkg/ingester/ingester.go
@@ -100,10 +100,11 @@ type Ingester struct {

metrics *ingesterMetrics

-chunkStore ChunkStore
-lifecycler *ring.Lifecycler
-limits *validation.Overrides
-limiter *SeriesLimiter
+chunkStore         ChunkStore
+lifecycler         *ring.Lifecycler
+limits             *validation.Overrides
+limiter            *SeriesLimiter
+subservicesWatcher *services.FailureWatcher

userStatesMtx sync.RWMutex // protects userStates and stopped
userStates *userStates
@@ -170,6 +171,8 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
return nil, err
}
i.limiter = NewSeriesLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor, cfg.ShardByAllLabels)
i.subservicesWatcher = services.NewFailureWatcher()
i.subservicesWatcher.WatchService(i.lifecycler)

i.Service = services.NewBasicService(i.starting, i.loop, i.stopping)
return i, nil
@@ -279,6 +282,9 @@ func (i *Ingester) loop(ctx context.Context) error {

case <-ctx.Done():
return nil

case err := <-i.subservicesWatcher.Chan():
return errors.Wrap(err, "ingester subservice failed")
}
}
}
4 changes: 4 additions & 0 deletions pkg/ingester/ingester_v2.go
@@ -126,6 +126,8 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
if err != nil {
return nil, err
}
i.subservicesWatcher = services.NewFailureWatcher()
i.subservicesWatcher.WatchService(i.lifecycler)

// Init the limiter and instantiate the user states which depend on it
i.limiter = NewSeriesLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor, cfg.ShardByAllLabels)
@@ -209,6 +211,8 @@ func (i *Ingester) updateLoop(ctx context.Context) error {
}
case <-ctx.Done():
return nil
case err := <-i.subservicesWatcher.Chan():
return errors.Wrap(err, "ingester subservice failed")
}
}
}
26 changes: 13 additions & 13 deletions pkg/ring/lifecycler.go
@@ -11,6 +11,7 @@ import (
"time"

"github.com/go-kit/kit/log/level"
perrors "github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

@@ -339,15 +340,10 @@ func (i *Lifecycler) HealthyInstancesCount() int {
}

func (i *Lifecycler) loop(ctx context.Context) error {
-defer func() {
-level.Info(util.Logger).Log("msg", "member.loop() exited gracefully", "ring", i.RingName)
-}()
-
// First, see if we exist in the cluster, update our state to match if we do,
// and add ourselves (without tokens) if we don't.
if err := i.initRing(context.Background()); err != nil {
-level.Error(util.Logger).Log("msg", "failed to join the ring", "ring", i.RingName, "err", err)
-os.Exit(1)
+return perrors.Wrapf(err, "failed to join the ring %s", i.RingName)
}

// We do various periodic tasks
@@ -370,16 +366,14 @@
// let's observe the ring. By using JOINING state, this ingester will be ignored by LEAVING
// ingesters, but we also signal that it is not fully functional yet.
if err := i.autoJoin(context.Background(), JOINING); err != nil {
-level.Error(util.Logger).Log("msg", "failed to pick tokens in the KV store", "ring", i.RingName, "err", err)
-os.Exit(1)
+return perrors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s", i.RingName)
}

level.Info(util.Logger).Log("msg", "observing tokens before going ACTIVE", "ring", i.RingName)
observeChan = time.After(i.cfg.ObservePeriod)
} else {
if err := i.autoJoin(context.Background(), ACTIVE); err != nil {
-level.Error(util.Logger).Log("msg", "failed to pick tokens in the KV store", "ring", i.RingName, "err", err)
-os.Exit(1)
+return perrors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s", i.RingName)
}
}
}
@@ -416,6 +410,7 @@ func (i *Lifecycler) loop(ctx context.Context) error {
f()

case <-ctx.Done():
level.Info(util.Logger).Log("msg", "lifecycler loop() exited gracefully", "ring", i.RingName)
return nil
}
}
@@ -425,7 +420,13 @@ func (i *Lifecycler) loop(ctx context.Context) error {
// - send chunks to another ingester, if it can.
// - otherwise, flush chunks to the chunk store.
// - remove config from Consul.
-func (i *Lifecycler) stopping(_ error) error {
+func (i *Lifecycler) stopping(runningError error) error {
if runningError != nil {
// Previously the lifecycler just called os.Exit (from the loop method)...
// now it stops more gracefully, but also without doing any cleanup.
return nil
}

heartbeatTicker := time.NewTicker(i.cfg.HeartbeatPeriod)
defer heartbeatTicker.Stop()

@@ -459,8 +460,7 @@

if !i.cfg.SkipUnregister {
if err := i.unregister(context.Background()); err != nil {
-level.Error(util.Logger).Log("msg", "Failed to unregister from the KV store", "ring", i.RingName, "err", err)
-os.Exit(1)
+return perrors.Wrapf(err, "failed to unregister from the KV store, ring: %s", i.RingName)
}
level.Info(util.Logger).Log("msg", "instance removed from the KV store", "ring", i.RingName)
}
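The new `stopping(runningError)` signature leans on the `services.BasicService` contract: when the running function returns an error, the stopping function is invoked with that error and the service ends in the Failed state rather than killing the process. A self-contained sketch of that flow, assuming only constructors and methods that appear elsewhere in this diff (`NewBasicService`, `StartAsync`, `FailureCase`) plus `AwaitTerminated` and `State` from the same services package; the error text is illustrative:

```go
package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/cortexproject/cortex/pkg/util/services"
)

func main() {
	running := func(ctx context.Context) error {
		// Stand-in for Lifecycler.loop returning an error instead of os.Exit(1).
		return errors.New("failed to join the ring")
	}
	stopping := func(failure error) error {
		// Mirrors the new Lifecycler.stopping: skip cleanup if running failed.
		if failure != nil {
			return nil
		}
		fmt.Println("normal cleanup")
		return nil
	}

	svc := services.NewBasicService(nil, running, stopping)
	_ = svc.StartAsync(context.Background())
	_ = svc.AwaitTerminated(context.Background())

	// The process is still alive; the failure is inspectable instead.
	fmt.Println(svc.State(), svc.FailureCase())
}
```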
35 changes: 35 additions & 0 deletions pkg/util/services/failure_watch_test.go
@@ -0,0 +1,35 @@
package services

import (
"context"
"errors"
"testing"

e2 "github.com/pkg/errors"
"github.com/stretchr/testify/require"
)

func TestNilServiceFailureWatcher(t *testing.T) {
var w *FailureWatcher = nil

// Prove it doesn't fail, but returns a nil channel.
require.Nil(t, w.Chan())
}

func TestServiceFailureWatcher(t *testing.T) {
w := NewFailureWatcher()

err := errors.New("this error doesn't end with dot")

failing := NewBasicService(nil, nil, func(_ error) error {
return err
})

w.WatchService(failing)

require.NoError(t, failing.StartAsync(context.Background()))

e := <-w.Chan()
require.NotNil(t, e)
require.Equal(t, err, e2.Cause(e))
}
35 changes: 35 additions & 0 deletions pkg/util/services/failure_watcher.go
@@ -0,0 +1,35 @@
package services

import (
"github.com/pkg/errors"
)

// FailureWatcher waits for service failures and passes them to the channel.
type FailureWatcher struct {
ch chan error
}

func NewFailureWatcher() *FailureWatcher {
return &FailureWatcher{ch: make(chan error)}
}

// Chan returns the channel for this watcher. If the watcher is nil, it returns a nil channel.
// Errors received on the channel include the failure case and the service description.
func (w *FailureWatcher) Chan() <-chan error {
if w == nil {
return nil
}
return w.ch
}

func (w *FailureWatcher) WatchService(service Service) {
service.AddListener(NewListener(nil, nil, nil, nil, func(from State, failure error) {
w.ch <- errors.Wrapf(failure, "service %v failed", service)
}))
}

func (w *FailureWatcher) WatchManager(manager *Manager) {
manager.AddListener(NewManagerListener(nil, nil, func(service Service) {
w.ch <- errors.Wrapf(service.FailureCase(), "service %v failed", service)
}))
}