Update prepare shutdown #9175

Merged
19 commits merged on Apr 19, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -6,6 +6,7 @@

##### Enhancements

* [9175](https://github.com/grafana/loki/pull/9175) **MichelHollands**: Ingester: update the `prepare_shutdown` endpoint so it supports GET and DELETE and stores the state on disk.
* [8953](https://github.com/grafana/loki/pull/8953) **dannykopping**: Querier: block queries by hash.
* [8851](https://github.com/grafana/loki/pull/8851) **jeschkies**: Introduce limit to require a set of labels for selecting streams.
* [9016](https://github.com/grafana/loki/pull/9016) **kavirajk**: Change response type of `format_query` handler to `application/json`
14 changes: 11 additions & 3 deletions docs/sources/api/_index.md
@@ -625,14 +625,22 @@ In microservices mode, the `/flush` endpoint is exposed by the ingester.
### Tell ingester to release all resources on next SIGTERM

```
POST /ingester/prepare_shutdown
GET, POST, DELETE /ingester/prepare_shutdown
```

`/ingester/prepare_shutdown` will prepare the ingester to release resources on the next SIGTERM signal,
where releasing resources means flushing data and unregistering from the ingester ring.
After a `POST` to the `prepare_shutdown` endpoint returns, when the ingester process is stopped with `SIGINT` / `SIGTERM`,
the ingester will be unregistered from the ring and in-memory time series data will be flushed to long-term storage.
This endpoint supersedes any YAML configurations and isn't necessary if the ingester is already
configured to unregister from the ring or to flush on shutdown.

A `GET` to the `prepare_shutdown` endpoint returns the status of this configuration, either `set` or `unset`.

A `DELETE` to the `prepare_shutdown` endpoint reverts the configuration of the ingester to its previous state
(with respect to unregistering on shutdown and flushing of in-memory time series data to long-term storage).

This API endpoint is usually used by Kubernetes-specific scale-down automations such as the
[rollout-operator](https://github.com/grafana/rollout-operator).
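
For illustration only (not part of this change), here is a minimal Go sketch of how an external scale-down automation might drive these methods; the ingester address below is hypothetical.

```go
// Hypothetical client sketch: marks an ingester for shutdown, checks the
// marker status, then reverts it. The base URL is an assumption.
package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	base := "http://ingester-0:3100/ingester/prepare_shutdown" // hypothetical address

	// POST: flush and unregister on the next SIGTERM, and write the marker file.
	resp, err := http.Post(base, "text/plain", nil)
	if err != nil {
		panic(err)
	}
	resp.Body.Close()
	fmt.Println("POST:", resp.StatusCode) // 204 on success

	// GET: report whether the marker is currently set.
	resp, err = http.Get(base)
	if err != nil {
		panic(err)
	}
	body, _ := io.ReadAll(resp.Body)
	resp.Body.Close()
	fmt.Println("GET:", string(body)) // "set" or "unset"

	// DELETE: remove the marker and restore the configured shutdown behaviour.
	req, _ := http.NewRequest(http.MethodDelete, base, nil)
	resp, err = http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	resp.Body.Close()
	fmt.Println("DELETE:", resp.StatusCode) // 204 on success
}
```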

## Flush in-memory chunks and shut down

```
5 changes: 5 additions & 0 deletions docs/sources/configuration/_index.md
@@ -1480,6 +1480,11 @@ wal:
# Maximum number of dropped streams to keep in memory during tailing.
# CLI flag: -ingester.tailer.max-dropped-streams
[max_dropped_streams: <int> | default = 10]

# Path where the shutdown marker file is stored. If not set and
# common.path_prefix is set then common.path_prefix will be used.
# CLI flag: -ingester.shutdown-marker-path
[shutdown_marker_path: <string> | default = ""]
```
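
As a rough illustration, the sketch below mirrors how this PR derives the marker file location: the configured directory is joined with the fixed file name `shutdown-requested.txt`. The example directory value is an assumption.

```go
package main

import (
	"fmt"
	"path"
)

func main() {
	// Example value for ingester.shutdown-marker-path (or the common.path_prefix fallback).
	shutdownMarkerPath := "/loki"
	const shutdownMarkerFilename = "shutdown-requested.txt"

	// The ingester joins the configured directory with the fixed file name.
	fmt.Println(path.Join(shutdownMarkerPath, shutdownMarkerFilename)) // /loki/shutdown-requested.txt
}
```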

### index_gateway
9 changes: 9 additions & 0 deletions docs/sources/upgrading/_index.md
@@ -33,6 +33,15 @@ The output is incredibly verbose as it shows the entire internal config struct u

## Main / Unreleased

### Loki

#### Shutdown marker file

A shutdown marker file can be written by the `/ingester/prepare_shutdown` endpoint.
If the new `ingester.shutdown_marker_path` config setting has a value, that value is used.
If not, the `common.path_prefix` config setting is used if it has a value. Otherwise, a warning is shown
in the logs on startup and the `/ingester/prepare_shutdown` endpoint will return a 500 status code.
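
The following is a hedged sketch of that fallback; the helper name and wiring are illustrative, not the actual Loki configuration code.

```go
package main

import (
	"fmt"
	"log"
)

// resolveShutdownMarkerDir is a hypothetical helper showing the precedence:
// ingester.shutdown_marker_path first, then common.path_prefix, else empty.
func resolveShutdownMarkerDir(shutdownMarkerPath, commonPathPrefix string) string {
	if shutdownMarkerPath != "" {
		return shutdownMarkerPath
	}
	if commonPathPrefix != "" {
		return commonPathPrefix
	}
	// With no usable directory the /ingester/prepare_shutdown endpoint returns 500.
	log.Println("warning: no shutdown marker path configured")
	return ""
}

func main() {
	fmt.Println(resolveShutdownMarkerDir("", "/loki")) // falls back to common.path_prefix
}
```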

## 2.8.0

### Loki
3 changes: 1 addition & 2 deletions pkg/ingester/flush.go
@@ -19,7 +19,6 @@ import (
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/util"
loki_util "github.com/grafana/loki/pkg/util"
util_log "github.com/grafana/loki/pkg/util/log"
)

@@ -292,7 +291,7 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelP
return fmt.Errorf("chunk close for flushing: %w", err)
}

firstTime, lastTime := loki_util.RoundToMilliseconds(c.chunk.Bounds())
firstTime, lastTime := util.RoundToMilliseconds(c.chunk.Bounds())
ch := chunk.NewChunk(
userID, fp, metric,
chunkenc.NewFacade(c.chunk, i.cfg.BlockSize, i.cfg.TargetChunkSize),
168 changes: 158 additions & 10 deletions pkg/ingester/ingester.go
@@ -6,13 +6,15 @@ import (
"fmt"
"net/http"
"os"
"path"
"path/filepath"
"sync"
"time"

"github.com/go-kit/log/level"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/modules"
"github.com/grafana/dskit/multierror"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/tenant"
@@ -40,14 +42,15 @@ import (
index_stats "github.com/grafana/loki/pkg/storage/stores/index/stats"
"github.com/grafana/loki/pkg/usagestats"
"github.com/grafana/loki/pkg/util"
errUtil "github.com/grafana/loki/pkg/util"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/wal"
)

const (
// RingKey is the key under which we store the ingesters ring in the KVStore.
RingKey = "ring"

shutdownMarkerFilename = "shutdown-requested.txt"
)

// ErrReadOnly is returned when the ingester is shutting down and a push was
@@ -105,6 +108,8 @@ type Config struct {
IndexShards int `yaml:"index_shards"`

MaxDroppedStreams int `yaml:"max_dropped_streams"`

ShutdownMarkerPath string `yaml:"shutdown_marker_path"`
}

// RegisterFlags registers the flags.
@@ -129,6 +134,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.AutoForgetUnhealthy, "ingester.autoforget-unhealthy", false, "Forget about ingesters having heartbeat timestamps older than `ring.kvstore.heartbeat_timeout`. This is equivalent to clicking on the `/ring` `forget` button in the UI: the ingester is removed from the ring. This is a useful setting when you are sure that an unhealthy node won't return. An example is when not using stateful sets or the equivalent. Use `memberlist.rejoin_interval` > 0 to handle network partition cases when using a memberlist.")
f.IntVar(&cfg.IndexShards, "ingester.index-shards", index.DefaultIndexShards, "Shard factor used in the ingesters for the in process reverse index. This MUST be evenly divisible by ALL schema shard factors or Loki will not start.")
f.IntVar(&cfg.MaxDroppedStreams, "ingester.tailer.max-dropped-streams", 10, "Maximum number of dropped streams to keep in memory during tailing.")
f.StringVar(&cfg.ShutdownMarkerPath, "ingester.shutdown-marker-path", "", "Path where the shutdown marker file is stored. If not set and common.path_prefix is set then common.path_prefix will be used.")
}

func (cfg *Config) Validate() error {
@@ -477,6 +483,17 @@ func (i *Ingester) starting(ctx context.Context) error {
return err
}

shutdownMarkerPath := path.Join(i.cfg.ShutdownMarkerPath, shutdownMarkerFilename)
shutdownMarker, err := shutdownMarkerExists(shutdownMarkerPath)
if err != nil {
return errors.Wrap(err, "failed to check ingester shutdown marker")
}

if shutdownMarker {
level.Info(util_log.Logger).Log("msg", "detected existing shutdown marker, setting unregister and flush on shutdown", "path", shutdownMarkerPath)
i.setPrepareShutdown()
}

// start our loop
i.loopDone.Add(1)
go i.loop()
@@ -509,7 +526,7 @@ func (i *Ingester) running(ctx context.Context) error {
// At this point, loop no longer runs, but flushers are still running.
func (i *Ingester) stopping(_ error) error {
i.stopIncomingRequests()
var errs errUtil.MultiError
var errs util.MultiError
errs.Add(i.wal.Stop())

if i.flushOnShutdownSwitch.Get() {
@@ -575,13 +592,144 @@ func (i *Ingester) LegacyShutdownHandler(w http.ResponseWriter, r *http.Request)
//
// Internally, when triggered, this handler will configure the ingester service to release its resources whenever a SIGTERM is received.
// Releasing resources means flushing data, deleting tokens, and removing itself from the ring.
//
// It also creates a file on disk which is used to re-apply the configuration if the
// ingester crashes and restarts before being permanently shutdown.
//
// * `GET` shows the status of this configuration
// * `POST` enables this configuration
// * `DELETE` disables this configuration
func (i *Ingester) PrepareShutdown(w http.ResponseWriter, r *http.Request) {
if i.cfg.ShutdownMarkerPath == "" {
w.WriteHeader(http.StatusInternalServerError)
return
}
shutdownMarkerPath := path.Join(i.cfg.ShutdownMarkerPath, shutdownMarkerFilename)

switch r.Method {
case http.MethodGet:
exists, err := shutdownMarkerExists(shutdownMarkerPath)
if err != nil {
level.Error(util_log.Logger).Log("msg", "unable to check for prepare-shutdown marker file", "path", shutdownMarkerPath, "err", err)
w.WriteHeader(http.StatusInternalServerError)
return
}

if exists {
util.WriteTextResponse(w, "set")
} else {
util.WriteTextResponse(w, "unset")
}
case http.MethodPost:
if err := createShutdownMarker(shutdownMarkerPath); err != nil {
level.Error(util_log.Logger).Log("msg", "unable to create prepare-shutdown marker file", "path", shutdownMarkerPath, "err", err)
w.WriteHeader(http.StatusInternalServerError)
return
}

i.setPrepareShutdown()
level.Info(util_log.Logger).Log("msg", "created prepare-shutdown marker file", "path", shutdownMarkerPath)

w.WriteHeader(http.StatusNoContent)
case http.MethodDelete:
if err := removeShutdownMarker(shutdownMarkerPath); err != nil {
level.Error(util_log.Logger).Log("msg", "unable to remove prepare-shutdown marker file", "path", shutdownMarkerPath, "err", err)
w.WriteHeader(http.StatusInternalServerError)
return
}

i.unsetPrepareShutdown()
level.Info(util_log.Logger).Log("msg", "removed prepare-shutdown marker file", "path", shutdownMarkerPath)

w.WriteHeader(http.StatusNoContent)
default:
w.WriteHeader(http.StatusMethodNotAllowed)
}
}

// setPrepareShutdown toggles ingester lifecycler config to prepare for shutdown
func (i *Ingester) setPrepareShutdown() {
level.Info(util_log.Logger).Log("msg", "preparing full ingester shutdown, resources will be released on SIGTERM")
i.lifecycler.SetFlushOnShutdown(true)
i.lifecycler.SetUnregisterOnShutdown(true)
i.terminateOnShutdown = true
i.metrics.shutdownMarker.Set(1)
}

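// unsetPrepareShutdown reverts the lifecycler changes made by setPrepareShutdown,
// restoring the flush and unregister behaviour from the configuration.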
func (i *Ingester) unsetPrepareShutdown() {
level.Info(util_log.Logger).Log("msg", "undoing preparation for full ingester shutdown")
i.lifecycler.SetFlushOnShutdown(!i.cfg.WAL.Enabled || i.cfg.WAL.FlushOnShutdown)
i.lifecycler.SetUnregisterOnShutdown(i.cfg.LifecyclerConfig.UnregisterOnShutdown)
i.terminateOnShutdown = false
i.metrics.shutdownMarker.Set(0)
}

// createShutdownMarker writes a marker file to disk to indicate that an ingester is
// going to be scaled down in the future. The presence of this file means that an ingester
// should flush and upload all data when stopping.
func createShutdownMarker(p string) error {
// Write the file, fsync it, then fsync the containing directory in order to guarantee
// it is persisted to disk. From https://man7.org/linux/man-pages/man2/fsync.2.html
//
// > Calling fsync() does not necessarily ensure that the entry in the
// > directory containing the file has also reached disk. For that an
// > explicit fsync() on a file descriptor for the directory is also
// > needed.
file, err := os.Create(p)
if err != nil {
return err
}

merr := multierror.New()
_, err = file.WriteString(time.Now().UTC().Format(time.RFC3339))
merr.Add(err)
merr.Add(file.Sync())
merr.Add(file.Close())

if err := merr.Err(); err != nil {
return err
}

dir, err := os.OpenFile(path.Dir(p), os.O_RDONLY, 0777)
if err != nil {
return err
}

merr.Add(dir.Sync())
merr.Add(dir.Close())
return merr.Err()
}

// removeShutdownMarker removes the shutdown marker file if it exists.
func removeShutdownMarker(p string) error {
err := os.Remove(p)
if err != nil && !os.IsNotExist(err) {
return err
}

dir, err := os.OpenFile(path.Dir(p), os.O_RDONLY, 0777)
if err != nil {
return err
}

merr := multierror.New()
merr.Add(dir.Sync())
merr.Add(dir.Close())
return merr.Err()
}

// shutdownMarkerExists returns true if the shutdown marker file exists, false otherwise
func shutdownMarkerExists(p string) (bool, error) {
s, err := os.Stat(p)
if err != nil && os.IsNotExist(err) {
return false, nil
}

if err != nil {
return false, err
}

return s.Mode().IsRegular(), nil
}

// ShutdownHandler handles a graceful shutdown of the ingester service and
@@ -708,13 +856,13 @@ func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querie
}}
storeItr, err := i.store.SelectLogs(ctx, storeReq)
if err != nil {
errUtil.LogErrorWithContext(ctx, "closing iterator", it.Close)
util.LogErrorWithContext(ctx, "closing iterator", it.Close)
return err
}
it = iter.NewMergeEntryIterator(ctx, []iter.EntryIterator{it, storeItr}, req.Direction)
}

defer errUtil.LogErrorWithContext(ctx, "closing iterator", it.Close)
defer util.LogErrorWithContext(ctx, "closing iterator", it.Close)

// sendBatches uses -1 to specify no limit.
batchLimit := int32(req.Limit)
@@ -754,14 +902,14 @@ func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer log
}}
storeItr, err := i.store.SelectSamples(ctx, storeReq)
if err != nil {
errUtil.LogErrorWithContext(ctx, "closing iterator", it.Close)
util.LogErrorWithContext(ctx, "closing iterator", it.Close)
return err
}

it = iter.NewMergeSampleIterator(ctx, []iter.SampleIterator{it, storeItr})
}

defer errUtil.LogErrorWithContext(ctx, "closing iterator", it.Close)
defer util.LogErrorWithContext(ctx, "closing iterator", it.Close)

return sendSampleBatches(ctx, it, queryServer)
}
@@ -803,7 +951,7 @@ func (i *Ingester) GetChunkIDs(ctx context.Context, req *logproto.GetChunkIDsReq
reqStart = adjustQueryStartTime(asyncStoreMaxLookBack, reqStart, time.Now())

// parse the request
start, end := errUtil.RoundToMilliseconds(reqStart, req.End)
start, end := util.RoundToMilliseconds(reqStart, req.End)
matchers, err := syntax.ParseMatchers(req.Matchers)
if err != nil {
return nil, err
@@ -897,7 +1045,7 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp
}

return &logproto.LabelResponse{
Values: errUtil.MergeStringLists(resp.Values, storeValues),
Values: util.MergeStringLists(resp.Values, storeValues),
}, nil
}
