Move WAL replays (chunks / tsdb) into ingester's Starting state. #2222

Merged (4 commits, Mar 9, 2020)
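
For context: since #2166 the ingester is built on Cortex's services abstraction, and the diff below constructs it with `services.NewBasicService(i.starting, i.loop, i.stopping)`, moving WAL replay (and, for blocks storage, opening existing TSDBs) out of `New()` into the starting function. Because the HTTP server runs as a separate module, `/metrics` and `/ready` stay reachable while the replay is still in progress. The sketch below is a minimal, self-contained illustration of that lifecycle, not the Cortex implementation; the `miniIngester` type, the `replayWAL` helper, and the port are invented for the example.

```go
package main

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"sync/atomic"
	"time"
)

// Service states, mirroring Starting -> Running -> Stopping from the PR.
type state int32

const (
	stateStarting state = iota
	stateRunning
	stateStopping
)

// miniIngester is a hypothetical stand-in for the Cortex ingester: WAL replay
// happens in its starting phase, and handlers refuse work until it is Running.
type miniIngester struct {
	st atomic.Int32
}

func (i *miniIngester) setState(s state) { i.st.Store(int32(s)) }
func (i *miniIngester) state() state     { return state(i.st.Load()) }

// replayWAL stands in for recoverFromWAL / openExistingTSDB; it just sleeps.
func (i *miniIngester) replayWAL(ctx context.Context) error {
	select {
	case <-time.After(2 * time.Second):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// checkRunningOrStopping mirrors the guard added to the gRPC handlers below.
func (i *miniIngester) checkRunningOrStopping() error {
	s := i.state()
	if s == stateRunning || s == stateStopping {
		return nil
	}
	return fmt.Errorf("ingester unavailable: state=%d", s)
}

func main() {
	ing := &miniIngester{}

	// The HTTP server is independent of the ingester service, so /metrics
	// (a stub here) can be scraped while the WAL is still being replayed.
	http.HandleFunc("/metrics", func(w http.ResponseWriter, _ *http.Request) {
		fmt.Fprintf(w, "ingester_state %d\n", ing.state())
	})
	// /ready answers 204 only once the ingester has left its Starting state,
	// 500 otherwise, matching the behaviour described in the CHANGELOG below.
	http.HandleFunc("/ready", func(w http.ResponseWriter, _ *http.Request) {
		if err := ing.checkRunningOrStopping(); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		w.WriteHeader(http.StatusNoContent)
	})
	go func() { log.Fatal(http.ListenAndServe(":8080", nil)) }()

	// "Starting" phase: the (simulated) WAL replay runs before Running.
	ing.setState(stateStarting)
	if err := ing.replayWAL(context.Background()); err != nil {
		log.Fatal(err)
	}
	ing.setState(stateRunning)

	select {} // keep serving
}
```

During the simulated replay, `/metrics` still answers and `/ready` returns 500, which is exactly the behaviour the CHANGELOG entry below describes.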
CHANGELOG.md (1 change: 1 addition, 0 deletions)
@@ -38,6 +38,7 @@
* [CHANGE] If you are vendoring Cortex and use its components in your project, be aware that many Cortex components no longer start automatically when they are created. You may want to review PR and attached document. #2166
* [CHANGE] Cortex now has /ready probe for all services, not just ingester and querier as before. In single-binary mode, /ready reports 204 only if all components are running properly. #2166
* [CHANGE] Experimental TSDB: switched the blocks storage index header to the binary format. This change is expected to have no visible impact, except lower startup times and memory usage in the queriers. It's possible to switch back to the old JSON format via the flag `-experimental.tsdb.bucket-store.binary-index-header-enabled=false`. #2223
* [CHANGE] WAL replays now happen while the rest of Cortex is starting up, and more specifically while the HTTP server is already running, which makes it possible to scrape metrics during a WAL replay. This applies to both the chunks storage and the experimental blocks storage. #2222
* [FEATURE] Added a read-only local alertmanager config store using files named corresponding to their tenant id. #2125
* [FEATURE] Added user sub rings to distribute users to a subset of ingesters. #1947
* `--experimental.distributor.user-subring-size`
pkg/ingester/ingester.go (73 changes: 60 additions, 13 deletions)
@@ -171,12 +171,17 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
}
i.limiter = NewSeriesLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor, cfg.ShardByAllLabels)

if cfg.WALConfig.Recover {
i.Service = services.NewBasicService(i.starting, i.loop, i.stopping)
return i, nil
}

func (i *Ingester) starting(ctx context.Context) error {
if i.cfg.WALConfig.Recover {
level.Info(util.Logger).Log("msg", "recovering from WAL")
start := time.Now()
if err := recoverFromWAL(i); err != nil {
level.Error(util.Logger).Log("msg", "failed to recover from WAL", "time", time.Since(start).String())
return nil, err
return errors.Wrap(err, "failed to recover from WAL")
}
elapsed := time.Since(start)
level.Info(util.Logger).Log("msg", "recovery from WAL completed", "time", elapsed.String())
@@ -185,20 +190,15 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c

// If the WAL recover happened, then the userStates would already be set.
if i.userStates == nil {
i.userStates = newUserStates(i.limiter, cfg, i.metrics)
i.userStates = newUserStates(i.limiter, i.cfg, i.metrics)
}

i.wal, err = newWAL(cfg.WALConfig, i.userStates.cp)
var err error
i.wal, err = newWAL(i.cfg.WALConfig, i.userStates.cp)
if err != nil {
return nil, err
return errors.Wrap(err, "starting WAL")
}

// TODO: lot more stuff can be put into startingFn (esp. WAL replay), but for now keep it in New
i.Service = services.NewBasicService(i.starting, i.loop, i.stopping)
return i, nil
}

func (i *Ingester) starting(ctx context.Context) error {
// Now that user states have been created, we can start the lifecycler.
// Important: we want to keep lifecycler running until we ask it to stop, so we need to give it independent context
if err := i.lifecycler.StartAsync(context.Background()); err != nil {
@@ -268,8 +268,23 @@ func (i *Ingester) stopIncomingRequests() {
i.stopped = true
}

// check that ingester has finished starting, i.e. it is in Running or Stopping state.
// Why Stopping? Because ingester still runs, even when it is transferring data out in Stopping state.
// Ingester handles this state on its own (via `stopped` flag).
func (i *Ingester) checkRunningOrStopping() error {
s := i.State()
if s == services.Running || s == services.Stopping {
return nil
}
return status.Error(codes.Unavailable, s.String())
}

// Push implements client.IngesterServer
func (i *Ingester) Push(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error) {
if err := i.checkRunningOrStopping(); err != nil {
return nil, err
}

if i.cfg.TSDBEnabled {
return i.v2Push(ctx, req)
}
@@ -414,6 +429,10 @@ func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs,

// Query implements service.IngesterServer
func (i *Ingester) Query(ctx context.Context, req *client.QueryRequest) (*client.QueryResponse, error) {
if err := i.checkRunningOrStopping(); err != nil {
return nil, err
}

if i.cfg.TSDBEnabled {
return i.v2Query(ctx, req)
}
@@ -477,6 +496,10 @@ func (i *Ingester) Query(ctx context.Context, req *client.QueryRequest) (*client

// QueryStream implements service.IngesterServer
func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_QueryStreamServer) error {
if err := i.checkRunningOrStopping(); err != nil {
return err
}

if i.cfg.TSDBEnabled {
return i.v2QueryStream(req, stream)
}
@@ -553,6 +576,10 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_

// LabelValues returns all label values that are associated with a given label name.
func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesRequest) (*client.LabelValuesResponse, error) {
if err := i.checkRunningOrStopping(); err != nil {
return nil, err
}

if i.cfg.TSDBEnabled {
return i.v2LabelValues(ctx, req)
}
@@ -574,6 +601,10 @@ func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesReque

// LabelNames returns all the label names.
func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest) (*client.LabelNamesResponse, error) {
if err := i.checkRunningOrStopping(); err != nil {
return nil, err
}

if i.cfg.TSDBEnabled {
return i.v2LabelNames(ctx, req)
}
@@ -595,6 +626,10 @@ func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest

// MetricsForLabelMatchers returns all the metrics which match a set of matchers.
func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *client.MetricsForLabelMatchersRequest) (*client.MetricsForLabelMatchersResponse, error) {
if err := i.checkRunningOrStopping(); err != nil {
return nil, err
}

if i.cfg.TSDBEnabled {
return i.v2MetricsForLabelMatchers(ctx, req)
}
@@ -638,6 +673,10 @@ func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *client.Metr

// UserStats returns ingestion statistics for the current user.
func (i *Ingester) UserStats(ctx context.Context, req *client.UserStatsRequest) (*client.UserStatsResponse, error) {
if err := i.checkRunningOrStopping(); err != nil {
return nil, err
}

if i.cfg.TSDBEnabled {
return i.v2UserStats(ctx, req)
}
@@ -663,6 +702,10 @@ func (i *Ingester) UserStats(ctx context.Context, req *client.UserStatsRequest)

// AllUserStats returns ingestion statistics for all users known to this ingester.
func (i *Ingester) AllUserStats(ctx context.Context, req *client.UserStatsRequest) (*client.UsersStatsResponse, error) {
if err := i.checkRunningOrStopping(); err != nil {
return nil, err
}

if i.cfg.TSDBEnabled {
return i.v2AllUserStats(ctx, req)
}
@@ -692,6 +735,10 @@ func (i *Ingester) AllUserStats(ctx context.Context, req *client.UserStatsReques

// Check implements the grpc healthcheck
func (i *Ingester) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
if err := i.checkRunningOrStopping(); err != nil {
return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING}, nil
}

return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil
}

@@ -704,8 +751,8 @@ func (i *Ingester) Watch(in *grpc_health_v1.HealthCheckRequest, stream grpc_heal
// the addition removal of another ingester. Returns 204 when the ingester is
// ready, 500 otherwise.
func (i *Ingester) CheckReady(ctx context.Context) error {
if s := i.State(); s != services.Running {
return fmt.Errorf("service not Running: %v", s)
if err := i.checkRunningOrStopping(); err != nil {
return fmt.Errorf("ingester not ready: %v", err)
}
return i.lifecycler.CheckReady(ctx)
}
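
One practical consequence of the new `checkRunningOrStopping` guard is that gRPC callers can now receive an `Unavailable` status while the ingester is still replaying its WAL, since the gRPC server may already be serving before the ingester leaves its Starting state. The helper below is a hypothetical caller-side sketch of treating that status as retryable; `pushFn` stands in for an actual `client.IngesterClient.Push` call and is not part of this PR.

```go
package example

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// pushWithRetry treats the Unavailable status returned while the ingester is
// still Starting (e.g. replaying its WAL) as retryable. pushFn is a
// hypothetical stand-in for a client.IngesterClient.Push call.
func pushWithRetry(ctx context.Context, pushFn func(context.Context) error) error {
	for {
		err := pushFn(ctx)
		if status.Code(err) != codes.Unavailable {
			return err // success, or a non-retryable error
		}
		log.Println("ingester not running yet, retrying:", err)
		select {
		case <-time.After(time.Second):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
```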
pkg/ingester/ingester_v2.go (10 changes: 5 additions, 5 deletions)
@@ -131,16 +131,16 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
i.limiter = NewSeriesLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor, cfg.ShardByAllLabels)
i.userStates = newUserStates(i.limiter, cfg, i.metrics)

// Scan and open TSDB's that already exist on disk // TODO: move to starting
if err := i.openExistingTSDB(context.Background()); err != nil {
return nil, err
}

i.Service = services.NewBasicService(i.startingV2, i.updateLoop, i.stoppingV2)
return i, nil
}

func (i *Ingester) startingV2(ctx context.Context) error {
// Scan and open TSDB's that already exist on disk
if err := i.openExistingTSDB(context.Background()); err != nil {
return errors.Wrap(err, "opening existing TSDBs")
}

// Important: we want to keep lifecycler running until we ask it to stop, so we need to give it independent context
if err := i.lifecycler.StartAsync(context.Background()); err != nil {
return errors.Wrap(err, "failed to start lifecycler")