
Commit 03c092f

Ingester can now flush only specified users. (#4073)
* Ingester can now flush only specified users.
* Rename tenant to t, to shorten the query string.
* CHANGELOG.md and api updates.
* Remove nil/empty distinction.
* Review feedback.
* Fix documentation.
* Use allowed tenants for passing which tenants to flush.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
1 parent 8fe0bb1 commit 03c092f

File tree

6 files changed: +146 −70 lines


CHANGELOG.md

Lines changed: 1 addition & 0 deletions
```diff
@@ -39,6 +39,7 @@
 * [ENHANCEMENT] Ingester: added following per-ingester (instance) limits: max number of series in memory (`-ingester.instance-limits.max-series`), max number of users in memory (`-ingester.instance-limits.max-tenants`), max ingestion rate (`-ingester.instance-limits.max-ingestion-rate`), and max inflight requests (`-ingester.instance-limits.max-inflight-push-requests`). These limits are only used when using blocks storage. Limits can also be configured using runtime-config feature, and current values are exported as `cortex_ingester_instance_limits` metric. #3992.
 * [ENHANCEMENT] Cortex is now built with Go 1.16. #4062
 * [ENHANCEMENT] Ruler: Added `-ruler.enabled-tenants` and `-ruler.disabled-tenants` to explicitly enable or disable rules processing for specific tenants. #4074
+* [ENHANCEMENT] Block Storage Ingester: `/flush` now accepts two new parameters: `tenant` to specify tenant to flush and `wait=true` to make call synchronous. Multiple tenants can be specified by repeating `tenant` parameter. If no `tenant` is specified, all tenants are flushed, as before. #4073
 * [BUGFIX] Ruler-API: fix bug where `/api/v1/rules/<namespace>/<group_name>` endpoint return `400` instead of `404`. #4013
 * [BUGFIX] Distributor: reverted changes done to rate limiting in #3825. #3948
 * [BUGFIX] Ingester: Fix race condition when opening and closing tsdb concurrently. #3959
```

docs/api/_index.md

Lines changed: 4 additions & 0 deletions
````diff
@@ -247,6 +247,10 @@ GET,POST /flush
 
 Triggers a flush of the in-memory time series data (chunks or blocks) to the long-term storage. This endpoint triggers the flush also when `-ingester.flush-on-shutdown-with-wal-enabled` or `-blocks-storage.tsdb.flush-blocks-on-shutdown` are disabled.
 
+When using blocks storage, this endpoint accepts `tenant` parameter to specify tenant whose blocks are compacted and shipped. This parameter may be specified multiple times to select more tenants. If no tenant is specified, all tenants are flushed.
+
+Flush endpoint now also accepts `wait=true` parameter, which makes the call synchronous – it will only return after flushing has finished. Note that returned status code does not reflect the result of flush operation. This parameter is only available when using blocks storage.
+
 ### Shutdown
 
 ```
````
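
For illustration, here is a minimal Go client sketch exercising the new parameters. The ingester address and tenant names (`http://ingester:8080`, `team-a`, `team-b`) are made up; only the `tenant` and `wait` query parameters come from this commit.

```go
package main

import (
	"fmt"
	"net/http"
	"net/url"
)

func main() {
	// Flush two specific tenants and wait for compaction and shipping to finish.
	params := url.Values{}
	params.Add("tenant", "team-a") // repeat `tenant` to select multiple tenants
	params.Add("tenant", "team-b")
	params.Set("wait", "true") // synchronous: returns only after flushing has finished

	// "http://ingester:8080" is a placeholder for a real ingester address.
	resp, err := http.Post("http://ingester:8080/flush?"+params.Encode(), "", nil)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// The handler replies 204 No Content; per the docs above, the status code
	// does not reflect the outcome of the flush operation itself.
	fmt.Println(resp.Status)
}
```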

pkg/ingester/ingester_v2.go

Lines changed: 62 additions & 34 deletions
```diff
@@ -387,8 +387,8 @@ type TSDBState struct {
 
 	tsdbMetrics *tsdbMetrics
 
-	forceCompactTrigger chan chan<- struct{}
-	shipTrigger         chan chan<- struct{}
+	forceCompactTrigger chan requestWithUsersAndCallback
+	shipTrigger         chan requestWithUsersAndCallback
 
 	// Timeout chosen for idle compactions.
 	compactionIdleTimeout time.Duration
@@ -405,6 +405,11 @@ type TSDBState struct {
 	idleTsdbChecks *prometheus.CounterVec
 }
 
+type requestWithUsersAndCallback struct {
+	users    *util.AllowedTenants // if nil, all tenants are allowed.
+	callback chan<- struct{}      // when compaction/shipping is finished, this channel is closed
+}
+
 func newTSDBState(bucketClient objstore.Bucket, registerer prometheus.Registerer) TSDBState {
 	idleTsdbChecks := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
 		Name: "cortex_ingester_idle_tsdb_checks_total",
@@ -426,8 +431,8 @@ func newTSDBState(bucketClient objstore.Bucket, registerer prometheus.Registerer
 		dbs:         make(map[string]*userTSDB),
 		bucket:      bucketClient,
 		tsdbMetrics: newTSDBMetrics(registerer),
-		forceCompactTrigger: make(chan chan<- struct{}),
-		shipTrigger:         make(chan chan<- struct{}),
+		forceCompactTrigger: make(chan requestWithUsersAndCallback),
+		shipTrigger:         make(chan requestWithUsersAndCallback),
 
 		compactionsTriggered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
 			Name: "cortex_ingester_tsdb_compactions_triggered_total",
@@ -1707,24 +1712,20 @@ func (i *Ingester) shipBlocksLoop(ctx context.Context) error {
 	for {
 		select {
 		case <-shipTicker.C:
-			i.shipBlocks(ctx)
+			i.shipBlocks(ctx, nil)
 
-		case ch := <-i.TSDBState.shipTrigger:
-			i.shipBlocks(ctx)
-
-			// Notify back.
-			select {
-			case ch <- struct{}{}:
-			default: // Nobody is waiting for notification, don't block this loop.
-			}
+		case req := <-i.TSDBState.shipTrigger:
+			i.shipBlocks(ctx, req.users)
+			close(req.callback) // Notify back.
 
 		case <-ctx.Done():
 			return nil
 		}
 	}
 }
 
-func (i *Ingester) shipBlocks(ctx context.Context) {
+// shipBlocks runs shipping for all users.
+func (i *Ingester) shipBlocks(ctx context.Context, allowed *util.AllowedTenants) {
 	// Do not ship blocks if the ingester is PENDING or JOINING. It's
 	// particularly important for the JOINING state because there could
 	// be a blocks transfer in progress (from another ingester) and if we
@@ -1739,6 +1740,10 @@ func (i *Ingester) shipBlocks(ctx context.Context) {
 	// Number of concurrent workers is limited in order to avoid to concurrently sync a lot
 	// of tenants in a large cluster.
 	_ = concurrency.ForEachUser(ctx, i.getTSDBUsers(), i.cfg.BlocksStorageConfig.TSDB.ShipConcurrency, func(ctx context.Context, userID string) error {
+		if !allowed.IsAllowed(userID) {
+			return nil
+		}
+
 		// Get the user's DB. If the user doesn't exist, we skip it.
 		userDB := i.getTSDB(userID)
 		if userDB == nil || userDB.shipper == nil {
@@ -1803,16 +1808,11 @@ func (i *Ingester) compactionLoop(ctx context.Context) error {
 	for ctx.Err() == nil {
 		select {
 		case <-ticker.C:
-			i.compactBlocks(ctx, false)
-
-		case ch := <-i.TSDBState.forceCompactTrigger:
-			i.compactBlocks(ctx, true)
+			i.compactBlocks(ctx, false, nil)
 
-			// Notify back.
-			select {
-			case ch <- struct{}{}:
-			default: // Nobody is waiting for notification, don't block this loop.
-			}
+		case req := <-i.TSDBState.forceCompactTrigger:
+			i.compactBlocks(ctx, true, req.users)
+			close(req.callback) // Notify back.
 
 		case <-ctx.Done():
 			return nil
@@ -1822,7 +1822,7 @@ func (i *Ingester) compactionLoop(ctx context.Context) error {
 }
 
 // Compacts all compactable blocks. Force flag will force compaction even if head is not compactable yet.
-func (i *Ingester) compactBlocks(ctx context.Context, force bool) {
+func (i *Ingester) compactBlocks(ctx context.Context, force bool, allowed *util.AllowedTenants) {
 	// Don't compact TSDB blocks while JOINING as there may be ongoing blocks transfers.
 	// Compaction loop is not running in LEAVING state, so if we get here in LEAVING state, we're flushing blocks.
 	if i.lifecycler != nil {
@@ -1833,6 +1833,10 @@ func (i *Ingester) compactBlocks(ctx context.Context, force bool) {
 	}
 
 	_ = concurrency.ForEachUser(ctx, i.getTSDBUsers(), i.cfg.BlocksStorageConfig.TSDB.HeadCompactionConcurrency, func(ctx context.Context, userID string) error {
+		if !allowed.IsAllowed(userID) {
+			return nil
+		}
+
 		userDB := i.getTSDB(userID)
 		if userDB == nil {
 			return nil
@@ -1982,28 +1986,43 @@ func (i *Ingester) v2LifecyclerFlush() {
 
 	ctx := context.Background()
 
-	i.compactBlocks(ctx, true)
+	i.compactBlocks(ctx, true, nil)
 	if i.cfg.BlocksStorageConfig.TSDB.IsBlocksShippingEnabled() {
-		i.shipBlocks(ctx)
+		i.shipBlocks(ctx, nil)
 	}
 
 	level.Info(i.logger).Log("msg", "finished flushing and shipping TSDB blocks")
 }
 
+const (
+	tenantParam = "tenant"
+	waitParam   = "wait"
+)
+
 // Blocks version of Flush handler. It force-compacts blocks, and triggers shipping.
-func (i *Ingester) v2FlushHandler(w http.ResponseWriter, _ *http.Request) {
-	go func() {
+func (i *Ingester) v2FlushHandler(w http.ResponseWriter, r *http.Request) {
+	err := r.ParseForm()
+	if err != nil {
+		level.Warn(i.logger).Log("msg", "failed to parse HTTP request in flush handler", "err", err)
+		w.WriteHeader(http.StatusBadRequest)
+		return
+	}
+
+	tenants := r.Form[tenantParam]
+
+	allowedUsers := util.NewAllowedTenants(tenants, nil)
+	run := func() {
 		ingCtx := i.BasicService.ServiceContext()
 		if ingCtx == nil || ingCtx.Err() != nil {
 			level.Info(i.logger).Log("msg", "flushing TSDB blocks: ingester not running, ignoring flush request")
 			return
 		}
 
-		ch := make(chan struct{}, 1)
+		compactionCallbackCh := make(chan struct{})
 
 		level.Info(i.logger).Log("msg", "flushing TSDB blocks: triggering compaction")
 		select {
-		case i.TSDBState.forceCompactTrigger <- ch:
+		case i.TSDBState.forceCompactTrigger <- requestWithUsersAndCallback{users: allowedUsers, callback: compactionCallbackCh}:
 			// Compacting now.
 		case <-ingCtx.Done():
 			level.Warn(i.logger).Log("msg", "failed to compact TSDB blocks, ingester not running anymore")
@@ -2012,18 +2031,20 @@ func (i *Ingester) v2FlushHandler(w http.ResponseWriter, _ *http.Request) {
 
 		// Wait until notified about compaction being finished.
 		select {
-		case <-ch:
+		case <-compactionCallbackCh:
 			level.Info(i.logger).Log("msg", "finished compacting TSDB blocks")
 		case <-ingCtx.Done():
 			level.Warn(i.logger).Log("msg", "failed to compact TSDB blocks, ingester not running anymore")
 			return
 		}
 
 		if i.cfg.BlocksStorageConfig.TSDB.IsBlocksShippingEnabled() {
+			shippingCallbackCh := make(chan struct{}) // must be new channel, as compactionCallbackCh is closed now.
+
 			level.Info(i.logger).Log("msg", "flushing TSDB blocks: triggering shipping")
 
 			select {
-			case i.TSDBState.shipTrigger <- ch:
+			case i.TSDBState.shipTrigger <- requestWithUsersAndCallback{users: allowedUsers, callback: shippingCallbackCh}:
 				// shipping now
 			case <-ingCtx.Done():
 				level.Warn(i.logger).Log("msg", "failed to ship TSDB blocks, ingester not running anymore")
@@ -2032,7 +2053,7 @@ func (i *Ingester) v2FlushHandler(w http.ResponseWriter, _ *http.Request) {
 
 			// Wait until shipping finished.
 			select {
-			case <-ch:
+			case <-shippingCallbackCh:
 				level.Info(i.logger).Log("msg", "shipping of TSDB blocks finished")
 			case <-ingCtx.Done():
 				level.Warn(i.logger).Log("msg", "failed to ship TSDB blocks, ingester not running anymore")
@@ -2041,7 +2062,14 @@ func (i *Ingester) v2FlushHandler(w http.ResponseWriter, _ *http.Request) {
 		}
 
 		level.Info(i.logger).Log("msg", "flushing TSDB blocks: finished")
-	}()
+	}
+
+	if len(r.Form[waitParam]) > 0 && r.Form[waitParam][0] == "true" {
+		// Run synchronously. This simplifies and speeds up tests.
+		run()
+	} else {
+		go run()
+	}
 
 	w.WriteHeader(http.StatusNoContent)
 }
```
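
The handshake between the flush handler and the compaction/shipping loops is the core change above: the loop closes the callback channel instead of attempting a best-effort non-blocking send, so the notification is never silently dropped, and each round trip needs a fresh channel because a closed channel stays closed. A standalone sketch of that pattern (all names here are illustrative, not from the commit):

```go
package main

import "fmt"

type request struct {
	payload  string
	callback chan<- struct{} // closed by the worker once the job is done
}

// worker mimics compactionLoop/shipBlocksLoop: receive a request, do the
// work, then close the callback. Unlike a non-blocking send, close never
// loses the notification, even if the requester already gave up waiting.
func worker(trigger <-chan request) {
	for req := range trigger {
		fmt.Println("processing", req.payload)
		close(req.callback)
	}
}

func main() {
	trigger := make(chan request)
	go worker(trigger)

	// One fresh channel per round trip: a closed channel cannot be reused,
	// which is why v2FlushHandler creates shippingCallbackCh separately from
	// compactionCallbackCh.
	for _, job := range []string{"compact", "ship"} {
		done := make(chan struct{})
		trigger <- request{payload: job, callback: done}
		<-done // unblocks as soon as the worker closes the channel
	}
}
```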

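The diff also leans on two properties of `util.AllowedTenants`: `NewAllowedTenants(tenants, nil)` with an empty list allows everyone (the "flush all tenants" default), and `IsAllowed` is safe to call on a nil pointer (the periodic `shipBlocks(ctx, nil)` and `compactBlocks(ctx, ..., nil)` calls). A minimal sketch of those assumed semantics; the real implementation lives in Cortex's `pkg/util`, so treat this as a reconstruction rather than the verbatim source:

```go
package util

// AllowedTenants filters tenants by an allow list and a deny list.
type AllowedTenants struct {
	enabled  map[string]struct{} // if empty, all tenants pass the allow list
	disabled map[string]struct{} // if empty, no tenant is denied
}

func NewAllowedTenants(enabled []string, disabled []string) *AllowedTenants {
	a := &AllowedTenants{}
	if len(enabled) > 0 {
		a.enabled = make(map[string]struct{}, len(enabled))
		for _, t := range enabled {
			a.enabled[t] = struct{}{}
		}
	}
	if len(disabled) > 0 {
		a.disabled = make(map[string]struct{}, len(disabled))
		for _, t := range disabled {
			a.disabled[t] = struct{}{}
		}
	}
	return a
}

// IsAllowed treats a nil receiver as "everything allowed", so callers like
// shipBlocks can pass nil for the unfiltered, periodic case.
func (a *AllowedTenants) IsAllowed(tenantID string) bool {
	if a == nil {
		return true
	}
	if len(a.enabled) > 0 {
		if _, ok := a.enabled[tenantID]; !ok {
			return false
		}
	}
	if len(a.disabled) > 0 {
		if _, ok := a.disabled[tenantID]; ok {
			return false
		}
	}
	return true
}
```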