Skip to content

Commit 91a1261

Browse files
authored
Don't update lastUpdateTime if TSDB isn't updated. (#3727)
Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
1 parent eb02c0d commit 91a1261

File tree

3 files changed

+91
-1
lines changed

3 files changed

+91
-1
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
## master / unreleased
44

5+
* [CHANGE] Ingester: don't update internal "last updated" timestamp of TSDB if tenant only sends invalid samples. This affects how "idle" time is computed. #3727
56

67
## 1.7.0 in progress
78

pkg/ingester/ingester_v2.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -767,7 +767,10 @@ func (i *Ingester) v2Push(ctx context.Context, req *client.WriteRequest) (*clien
767767
}
768768
i.TSDBState.appenderCommitDuration.Observe(time.Since(startCommit).Seconds())
769769

770-
db.setLastUpdate(time.Now())
770+
// If only invalid samples are pushed, don't change "last update", as TSDB was not modified.
771+
if succeededSamplesCount > 0 {
772+
db.setLastUpdate(time.Now())
773+
}
771774

772775
// Increment metrics only if the samples have been successfully committed.
773776
// If the code didn't reach this point, it means that we returned an error

pkg/ingester/ingester_v2_test.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1884,6 +1884,47 @@ func TestIngester_closeAndDeleteUserTSDBIfIdle_shouldNotCloseTSDBIfShippingIsInP
18841884
assert.Equal(t, tsdbNotActive, i.closeAndDeleteUserTSDBIfIdle(userID))
18851885
}
18861886

1887+
func TestIngester_idleCloseEmptyTSDB(t *testing.T) {
1888+
ctx := context.Background()
1889+
cfg := defaultIngesterTestConfig()
1890+
cfg.BlocksStorageConfig.TSDB.ShipInterval = 1 * time.Minute
1891+
cfg.BlocksStorageConfig.TSDB.HeadCompactionInterval = 1 * time.Minute
1892+
cfg.BlocksStorageConfig.TSDB.CloseIdleTSDBTimeout = 0 // Will not run the loop, but will allow us to close any TSDB fast.
1893+
1894+
// Create ingester
1895+
i, cleanup, err := newIngesterMockWithTSDBStorage(cfg, nil)
1896+
require.NoError(t, err)
1897+
defer cleanup()
1898+
1899+
require.NoError(t, services.StartAndAwaitRunning(ctx, i))
1900+
defer services.StopAndAwaitTerminated(ctx, i) //nolint:errcheck
1901+
1902+
// Wait until it's ACTIVE
1903+
test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} {
1904+
return i.lifecycler.GetState()
1905+
})
1906+
1907+
db, err := i.getOrCreateTSDB(userID, true)
1908+
require.NoError(t, err)
1909+
require.NotNil(t, db)
1910+
1911+
// Run compaction and shipping.
1912+
i.compactBlocks(context.Background(), true)
1913+
i.shipBlocks(context.Background())
1914+
1915+
// Make sure we can close completely empty TSDB without problems.
1916+
require.Equal(t, tsdbIdleClosed, i.closeAndDeleteUserTSDBIfIdle(userID))
1917+
1918+
// Verify that it was closed.
1919+
db = i.getTSDB(userID)
1920+
require.Nil(t, db)
1921+
1922+
// And we can recreate it again, if needed.
1923+
db, err = i.getOrCreateTSDB(userID, true)
1924+
require.NoError(t, err)
1925+
require.NotNil(t, db)
1926+
}
1927+
18871928
type shipperMock struct {
18881929
mock.Mock
18891930
}
@@ -1894,6 +1935,51 @@ func (m *shipperMock) Sync(ctx context.Context) (uploaded int, err error) {
18941935
return args.Int(0), args.Error(1)
18951936
}
18961937

1938+
func TestIngester_invalidSamplesDontChangeLastUpdateTime(t *testing.T) {
1939+
cfg := defaultIngesterTestConfig()
1940+
cfg.LifecyclerConfig.JoinAfter = 0
1941+
1942+
// Create ingester
1943+
i, cleanup, err := newIngesterMockWithTSDBStorage(cfg, nil)
1944+
require.NoError(t, err)
1945+
defer cleanup()
1946+
1947+
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
1948+
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
1949+
1950+
// Wait until it's ACTIVE
1951+
test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} {
1952+
return i.lifecycler.GetState()
1953+
})
1954+
1955+
ctx := user.InjectOrgID(context.Background(), userID)
1956+
sampleTimestamp := int64(model.Now())
1957+
1958+
{
1959+
req, _, _ := mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "test"}}, 0, sampleTimestamp)
1960+
_, err = i.v2Push(ctx, req)
1961+
require.NoError(t, err)
1962+
}
1963+
1964+
db := i.getTSDB(userID)
1965+
lastUpdate := db.lastUpdate.Load()
1966+
1967+
// Wait until 1 second passes.
1968+
test.Poll(t, 1*time.Second, time.Now().Unix()+1, func() interface{} {
1969+
return time.Now().Unix()
1970+
})
1971+
1972+
// Push another sample to the same metric and timestamp, with different value. We expect to get error.
1973+
{
1974+
req, _, _ := mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "test"}}, 1, sampleTimestamp)
1975+
_, err = i.v2Push(ctx, req)
1976+
require.Error(t, err)
1977+
}
1978+
1979+
// Make sure last update hasn't changed.
1980+
require.Equal(t, lastUpdate, db.lastUpdate.Load())
1981+
}
1982+
18971983
func TestIngester_flushing(t *testing.T) {
18981984
for name, tc := range map[string]struct {
18991985
setupIngester func(cfg *Config)

0 commit comments

Comments
 (0)