
Commit a32c3e1

pracucci authored and gouthamve committed
Do not close and re-open TSDBs during ingester transfer (#1854)
* Do not close and reopen TSDBs while transferring during the ingester shutdown
* Close all TSDBs once the transfer has successfully completed on ingester shutdown
* Fixed race condition when counting in-flight TSDB Push() requests
* Refactored util.WaitGroupWithTimeout() to accept a context instead of a timeout
* Fixed race in util.WaitGroup() tests
* Switched util.WaitGroup() from returning bool to error to give the caller more info about the reason
* Do not use the parent context for the timeout while waiting for in-flight requests to complete in the ingester's TSDB transfer
* Re-introduced erroneously removed assertions in ingester transfer tests
* Improved comment
* Added comment about race condition protection
* Fixed lifecycle test after rebasing master

Signed-off-by: Marco Pracucci <marco@pracucci.com>
1 parent 3329a99 commit a32c3e1
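The commit message refers to a util.WaitGroupWithTimeout() helper refactored into a context-based util.WaitGroup() that returns an error. The pkg/util diff is not shown in this section, so the following is only a minimal sketch of what such a helper could look like, inferred from the message; it is not the actual Cortex implementation.

package util

import (
	"context"
	"sync"
)

// WaitGroup waits on wg until all tracked operations complete or the input
// context is canceled or times out. It returns an error describing the
// reason instead of a plain bool.
func WaitGroup(ctx context.Context, wg *sync.WaitGroup) error {
	done := make(chan struct{})

	go func() {
		wg.Wait()
		close(done)
	}()

	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

Returning ctx.Err() rather than a bool lets the caller distinguish a timeout (context.DeadlineExceeded) from a cancellation (context.Canceled).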


5 files changed: +312 −114 lines


pkg/ingester/ingester_v2.go

Lines changed: 60 additions & 5 deletions
@@ -3,6 +3,7 @@ package ingester
 import (
 	"fmt"
 	"net/http"
+	"sync"
 	"time"
 
 	"github.com/cortexproject/cortex/pkg/ingester/client"
@@ -32,6 +33,13 @@ const (
 type TSDBState struct {
 	dbs    map[string]*tsdb.DB // tsdb sharded by userID
 	bucket objstore.Bucket
+
+	// Keeps count of in-flight requests
+	inflightWriteReqs sync.WaitGroup
+
+	// Used to run operations only once at shutdown, while transferring
+	// the blocks/wal to a joining ingester
+	transferOnce sync.Once
 }
 
 // NewV2 returns a new Ingester that uses Prometheus block storage instead of chunk storage
@@ -84,6 +92,24 @@ func (i *Ingester) v2Push(ctx old_ctx.Context, req *client.WriteRequest) (*clien
 		return nil, err
 	}
 
+	// Ensure the ingester shutdown procedure hasn't started
+	i.userStatesMtx.RLock()
+
+	if i.stopped {
+		i.userStatesMtx.RUnlock()
+		return nil, fmt.Errorf("ingester stopping")
+	}
+
+	// Keep track of in-flight requests, in order to safely start blocks transfer
+	// (at shutdown) only once all in-flight write requests have completed.
+	// It's important to increase the number of in-flight requests within the lock
+	// (even if sync.WaitGroup is thread-safe), otherwise there's a race condition
+	// with the TSDB transfer, which - after the stopped flag is set to true - waits
+	// until all in-flight requests have reached zero.
+	i.TSDBState.inflightWriteReqs.Add(1)
+	i.userStatesMtx.RUnlock()
+	defer i.TSDBState.inflightWriteReqs.Done()
+
 	// Keep track of some stats which are tracked only if the samples will be
 	// successfully committed
 	succeededSamplesCount := 0
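The comment above covers only the Push() half of the handshake. Below is a hedged sketch of the waiting half, assuming the context-based util.WaitGroup() sketched earlier; stopIncomingWrites is a hypothetical name, not a function from this diff.

// Hypothetical sketch of the transfer-side wait described in the comment
// above; the actual transfer code lives elsewhere in this PR.
func (i *Ingester) stopIncomingWrites(ctx context.Context) error {
	// Setting the stopped flag under the same mutex taken by v2Push()
	// guarantees that every request which observed stopped == false has
	// already called inflightWriteReqs.Add(1), so the wait below cannot
	// miss an in-flight request.
	i.userStatesMtx.Lock()
	i.stopped = true
	i.userStatesMtx.Unlock()

	// Wait until in-flight write requests reach zero, or the context
	// (carrying its own timeout, not the parent's) expires.
	return util.WaitGroup(ctx, &i.TSDBState.inflightWriteReqs)
}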
@@ -96,10 +122,6 @@ func (i *Ingester) v2Push(ctx old_ctx.Context, req *client.WriteRequest) (*clien
 		lset := cortex_tsdb.FromLabelAdaptersToLabels(ts.Labels)
 
 		for _, s := range ts.Samples {
-			if i.stopped {
-				return nil, fmt.Errorf("ingester stopping")
-			}
-
 			_, err := app.Add(lset, s.TimestampMs, s.Value)
 			if err == nil {
 				succeededSamplesCount++
@@ -119,7 +141,7 @@ func (i *Ingester) v2Push(ctx old_ctx.Context, req *client.WriteRequest) (*clien
 
 	// The error looks like an issue on our side, so we should rollback
 	if rollbackErr := app.Rollback(); rollbackErr != nil {
-		level.Warn(util.Logger).Log("failed to rollback on error", "userID", userID, "err", rollbackErr)
+		level.Warn(util.Logger).Log("msg", "failed to rollback on error", "userID", userID, "err", rollbackErr)
 	}
 
 	return nil, err
@@ -397,3 +419,36 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*tsdb.DB, error)
 
 	return db, nil
 }
+
+func (i *Ingester) closeAllTSDB() {
+	i.userStatesMtx.Lock()
+
+	wg := &sync.WaitGroup{}
+	wg.Add(len(i.TSDBState.dbs))
+
+	// Concurrently close all users' TSDBs
+	for userID, db := range i.TSDBState.dbs {
+		userID := userID
+
+		go func(db *tsdb.DB) {
+			defer wg.Done()
+
+			if err := db.Close(); err != nil {
+				level.Warn(util.Logger).Log("msg", "unable to close TSDB", "err", err, "user", userID)
+				return
+			}
+
+			// Now that the TSDB has been closed, we should remove it from the
+			// set of open ones. This lock acquisition doesn't deadlock with the
+			// outer one, because the outer one is released as soon as all
+			// goroutines are started.
+			i.userStatesMtx.Lock()
+			delete(i.TSDBState.dbs, userID)
+			i.userStatesMtx.Unlock()
+		}(db)
+	}
+
+	// Wait until all Close() calls have completed
+	i.userStatesMtx.Unlock()
+	wg.Wait()
+}
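The transferOnce field added to TSDBState ties these steps together at shutdown. A hedged sketch of the overall flow follows; transferBlocksAndWAL, the stopIncomingWrites sketch above, and the exact wiring are illustrative assumptions, not code from this PR.

// Illustrative sketch only: how transferOnce could guard the shutdown-only
// steps so that the lifecycler's transfer retries don't repeat them.
func (i *Ingester) v2TransferOut(ctx context.Context) error {
	// Stop incoming writes and wait for in-flight requests only once,
	// even if a failed transfer is retried.
	var stopErr error
	i.TSDBState.transferOnce.Do(func() {
		stopErr = i.stopIncomingWrites(ctx)
	})
	if stopErr != nil {
		return stopErr
	}

	if err := i.transferBlocksAndWAL(ctx); err != nil {
		return err
	}

	// Close all TSDBs only after the transfer has successfully completed,
	// instead of closing and re-opening them around each attempt.
	i.closeAllTSDB()
	return nil
}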

pkg/ingester/lifecycle_test.go

Lines changed: 110 additions & 87 deletions
@@ -1,6 +1,7 @@
 package ingester
 
 import (
+	"errors"
 	"io"
 	"io/ioutil"
 	"math"
@@ -395,94 +396,116 @@ func TestIngesterFlush(t *testing.T) {
 }
 
 func TestV2IngesterTransfer(t *testing.T) {
-	limits, err := validation.NewOverrides(defaultLimitsTestConfig())
-	require.NoError(t, err)
-
-	dir1, err := ioutil.TempDir("", "tsdb")
-	require.NoError(t, err)
-	dir2, err := ioutil.TempDir("", "tsdb")
-	require.NoError(t, err)
-	require.NoError(t, os.Remove(dir2)) // remove the destination dir so there isn't a move conflict
-
-	// Start the first ingester, and get it into ACTIVE state.
-	cfg1 := defaultIngesterTestConfig()
-	cfg1.TSDBEnabled = true
-	cfg1.TSDBConfig.Dir = dir1
-	cfg1.TSDBConfig.S3 = s3.Config{
-		Endpoint:        "dummy",
-		BucketName:      "dummy",
-		SecretAccessKey: "dummy",
-		AccessKeyID:     "dummy",
+	scenarios := map[string]struct {
+		failedTransfers int
+	}{
+		"transfer succeeded at first attempt": {
+			failedTransfers: 0,
+		},
+		"transfer failed at first attempt, then succeeded": {
+			failedTransfers: 1,
+		},
 	}
-	cfg1.LifecyclerConfig.ID = "ingester1"
-	cfg1.LifecyclerConfig.Addr = "ingester1"
-	cfg1.LifecyclerConfig.JoinAfter = 0 * time.Second
-	cfg1.MaxTransferRetries = 10
-	ing1, err := New(cfg1, defaultClientTestConfig(), limits, nil, nil)
-	require.NoError(t, err)
-
-	test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() interface{} {
-		return ing1.lifecycler.GetState()
-	})
-
-	// Now write a sample to this ingester
-	req, expectedResponse := mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "foo"}}, 456, 123000)
-	ctx := user.InjectOrgID(context.Background(), userID)
-	_, err = ing1.Push(ctx, req)
-	require.NoError(t, err)
 
-	// Start a second ingester, but let it go into PENDING
-	cfg2 := defaultIngesterTestConfig()
-	cfg2.TSDBEnabled = true
-	cfg2.TSDBConfig.Dir = dir2
-	cfg2.TSDBConfig.S3 = s3.Config{
-		Endpoint:        "dummy",
-		BucketName:      "dummy",
-		SecretAccessKey: "dummy",
-		AccessKeyID:     "dummy",
+	// We run the same test under different scenarios
+	for name, scenario := range scenarios {
+		t.Run(name, func(t *testing.T) {
+			limits, err := validation.NewOverrides(defaultLimitsTestConfig())
+			require.NoError(t, err)
+
+			dir1, err := ioutil.TempDir("", "tsdb")
+			require.NoError(t, err)
+			dir2, err := ioutil.TempDir("", "tsdb")
+			require.NoError(t, err)
+			require.NoError(t, os.Remove(dir2)) // remove the destination dir so there isn't a move conflict
+
+			// Start the first ingester, and get it into ACTIVE state.
+			cfg1 := defaultIngesterTestConfig()
+			cfg1.TSDBEnabled = true
+			cfg1.TSDBConfig.Dir = dir1
+			cfg1.TSDBConfig.S3 = s3.Config{
+				Endpoint:        "dummy",
+				BucketName:      "dummy",
+				SecretAccessKey: "dummy",
+				AccessKeyID:     "dummy",
+			}
+			cfg1.LifecyclerConfig.ID = "ingester1"
+			cfg1.LifecyclerConfig.Addr = "ingester1"
+			cfg1.LifecyclerConfig.JoinAfter = 0 * time.Second
+			cfg1.MaxTransferRetries = 10
+			ing1, err := New(cfg1, defaultClientTestConfig(), limits, nil, nil)
+			require.NoError(t, err)
+
+			test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() interface{} {
+				return ing1.lifecycler.GetState()
+			})
+
+			// Now write a sample to this ingester
+			req, expectedResponse := mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "foo"}}, 456, 123000)
+			ctx := user.InjectOrgID(context.Background(), userID)
+			_, err = ing1.Push(ctx, req)
+			require.NoError(t, err)
+
+			// Start a second ingester, but let it go into PENDING
+			cfg2 := defaultIngesterTestConfig()
+			cfg2.TSDBEnabled = true
+			cfg2.TSDBConfig.Dir = dir2
+			cfg2.TSDBConfig.S3 = s3.Config{
+				Endpoint:        "dummy",
+				BucketName:      "dummy",
+				SecretAccessKey: "dummy",
+				AccessKeyID:     "dummy",
+			}
+			cfg2.LifecyclerConfig.RingConfig.KVStore.Mock = cfg1.LifecyclerConfig.RingConfig.KVStore.Mock
+			cfg2.LifecyclerConfig.ID = "ingester2"
+			cfg2.LifecyclerConfig.Addr = "ingester2"
+			cfg2.LifecyclerConfig.JoinAfter = 100 * time.Second
+			ing2, err := New(cfg2, defaultClientTestConfig(), limits, nil, nil)
+			require.NoError(t, err)
+
+			// Let ing1 send blocks/wal to ing2
+			ingesterClientFactoryCount := 0
+
+			ing1.cfg.ingesterClientFactory = func(addr string, _ client.Config) (client.HealthAndIngesterClient, error) {
+				if ingesterClientFactoryCount++; ingesterClientFactoryCount <= scenario.failedTransfers {
+					return nil, errors.New("mocked error simulating the case the leaving ingester is unable to connect to the joining ingester")
+				}
+
+				return ingesterClientAdapater{
+					ingester: ing2,
+				}, nil
+			}
+
+			// Now stop the first ingester, and wait for the second ingester to become ACTIVE.
+			ing1.Shutdown()
+			test.Poll(t, 10*time.Second, ring.ACTIVE, func() interface{} {
+				return ing2.lifecycler.GetState()
+			})
+
+			// And check the second ingester has the sample
+			matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "foo")
+			require.NoError(t, err)
+
+			request, err := client.ToQueryRequest(model.TimeFromUnix(0), model.TimeFromUnix(200), []*labels.Matcher{matcher})
+			require.NoError(t, err)
+
+			response, err := ing2.Query(ctx, request)
+			require.NoError(t, err)
+			assert.Equal(t, expectedResponse, response)
+
+			// Check we can send the same sample again to the new ingester and get the same result
+			req, _ = mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "foo"}}, 456, 123000)
+			_, err = ing2.Push(ctx, req)
+			require.NoError(t, err)
+			response, err = ing2.Query(ctx, request)
+			require.NoError(t, err)
+			assert.Equal(t, expectedResponse, response)
+
+			// Assert the data is in the expected location of dir2
+			files, err := ioutil.ReadDir(dir2)
+			require.NoError(t, err)
+			require.Equal(t, 1, len(files))
+			require.Equal(t, "1", files[0].Name())
+		})
 	}
-	cfg2.LifecyclerConfig.RingConfig.KVStore.Mock = cfg1.LifecyclerConfig.RingConfig.KVStore.Mock
-	cfg2.LifecyclerConfig.ID = "ingester2"
-	cfg2.LifecyclerConfig.Addr = "ingester2"
-	cfg2.LifecyclerConfig.JoinAfter = 100 * time.Second
-	ing2, err := New(cfg2, defaultClientTestConfig(), limits, nil, nil)
-	require.NoError(t, err)
-
-	// Let ing2 send blocks/wal to ing1
-	ing1.cfg.ingesterClientFactory = func(addr string, _ client.Config) (client.HealthAndIngesterClient, error) {
-		return ingesterClientAdapater{
-			ingester: ing2,
-		}, nil
-	}
-
-	// Now stop the first ingester, and wait for the second ingester to become ACTIVE.
-	ing1.Shutdown()
-	test.Poll(t, 10*time.Second, ring.ACTIVE, func() interface{} {
-		return ing2.lifecycler.GetState()
-	})
-
-	// And check the second ingester has the sample
-	matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "foo")
-	require.NoError(t, err)
-
-	request, err := client.ToQueryRequest(model.TimeFromUnix(0), model.TimeFromUnix(200), []*labels.Matcher{matcher})
-	require.NoError(t, err)
-
-	response, err := ing2.Query(ctx, request)
-	require.NoError(t, err)
-	assert.Equal(t, expectedResponse, response)
-
-	// Check we can send the same sample again to the new ingester and get the same result
-	req, _ = mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "foo"}}, 456, 123000)
-	_, err = ing2.Push(ctx, req)
-	require.NoError(t, err)
-	response, err = ing2.Query(ctx, request)
-	require.NoError(t, err)
-	assert.Equal(t, expectedResponse, response)
-
-	// Assert the data is in the expected location of dir2
-	files, err := ioutil.ReadDir(dir2)
-	require.NoError(t, err)
-	require.Equal(t, 1, len(files))
-	require.Equal(t, "1", files[0].Name())
 }
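The retry scenario relies on the counting ingesterClientFactory above, which fails the first scenario.failedTransfers connection attempts and then succeeds, exercising the retry budget set by cfg1.MaxTransferRetries = 10. The same fail-the-first-N-calls mock pattern in isolation, with hypothetical names:

package main

import (
	"errors"
	"fmt"
)

// flakyFactory is an illustrative stand-in for the counting mock in the test.
type flakyFactory struct {
	calls    int
	failures int // number of leading calls that should fail
}

func (f *flakyFactory) connect() (string, error) {
	// Fail the first f.failures calls, succeed afterwards.
	if f.calls++; f.calls <= f.failures {
		return "", errors.New("simulated connection failure")
	}
	return "connected", nil
}

func main() {
	f := &flakyFactory{failures: 1}
	for i := 0; i < 3; i++ {
		res, err := f.connect()
		fmt.Println(res, err) // first call fails, later calls succeed
	}
}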
