From e2f8663345a1a962a6c7c0e69d963bcf92821144 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> Date: Tue, 4 Aug 2020 18:40:29 +0530 Subject: [PATCH] Avoid indefinite checkpointing (#2955) Signed-off-by: Ganesh Vernekar Co-authored-by: Goutham Veeramachaneni --- CHANGELOG.md | 1 + pkg/ingester/wal.go | 10 ++++++++++ 2 files changed, 11 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index b6fa18354f..ee426c9d3a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -110,6 +110,7 @@ * [BUGFIX] Fixed `Missing chunks and index config causing silent failure` Absence of chunks and index from schema config is not validated. #2732 * [BUGFIX] Fix panic caused by KVs from boltdb being used beyond their life. #2971 * [BUGFIX] Experimental TSDB: `/api/v1/series`, `/api/v1/labels` and `/api/v1/label/{name}/values` only query the TSDB head regardless of the configured `-experimental.blocks-storage.tsdb.retention-period`. #2974 +* [BUGFIX] Ingester: Avoid indefinite checkpointing in case of surge in number of series. #2955 ## 1.2.0 / 2020-07-01 diff --git a/pkg/ingester/wal.go b/pkg/ingester/wal.go index 203587d273..68ba158f4f 100644 --- a/pkg/ingester/wal.go +++ b/pkg/ingester/wal.go @@ -337,6 +337,7 @@ func (w *walWrapper) performCheckpoint(immediate bool) (err error) { totalSize := 0 ticker := time.NewTicker(perSeriesDuration) defer ticker.Stop() + start := time.Now() for userID, state := range us { for pair := range state.fpToSeries.iter() { state.fpLocker.Lock(pair.fp) @@ -361,6 +362,15 @@ func (w *walWrapper) performCheckpoint(immediate bool) (err error) { } if !immediate { + if time.Since(start) > 2*w.cfg.CheckpointDuration { + // This could indicate a surge in the number of series, and continuing with + // the old ticker estimate can make checkpointing run indefinitely in the worst case + // and the disk run out of space. Re-adjusting the ticker might not solve the problem, + // as there can be another surge. 
Hence let's checkpoint this one immediately. + immediate = true + continue + } + select { case <-ticker.C: case <-w.quit: // When we're trying to shutdown, finish the checkpoint as fast as possible.