Ingester can now flush only specified users. #4073

Merged · 7 commits · Apr 15, 2021

1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -39,6 +39,7 @@
* [ENHANCEMENT] Ingester: added following per-ingester (instance) limits: max number of series in memory (`-ingester.instance-limits.max-series`), max number of users in memory (`-ingester.instance-limits.max-tenants`), max ingestion rate (`-ingester.instance-limits.max-ingestion-rate`), and max inflight requests (`-ingester.instance-limits.max-inflight-push-requests`). These limits are only used when using blocks storage. Limits can also be configured using runtime-config feature, and current values are exported as `cortex_ingester_instance_limits` metric. #3992.
* [ENHANCEMENT] Cortex is now built with Go 1.16. #4062
* [ENHANCEMENT] Ruler: Added `-ruler.enabled-tenants` and `-ruler.disabled-tenants` to explicitly enable or disable rules processing for specific tenants. #4074
* [ENHANCEMENT] Block Storage Ingester: `/flush` now accepts two new parameters: `tenant` to specify the tenant to flush, and `wait=true` to make the call synchronous. Multiple tenants can be selected by repeating the `tenant` parameter. If no `tenant` is specified, all tenants are flushed, as before. #4073
* [BUGFIX] Ruler-API: fix bug where the `/api/v1/rules/<namespace>/<group_name>` endpoint returned `400` instead of `404`. #4013
* [BUGFIX] Distributor: reverted changes done to rate limiting in #3825. #3948
* [BUGFIX] Ingester: Fix race condition when opening and closing tsdb concurrently. #3959
4 changes: 4 additions & 0 deletions docs/api/_index.md
@@ -247,6 +247,10 @@ GET,POST /flush

Triggers a flush of the in-memory time series data (chunks or blocks) to the long-term storage. This endpoint triggers the flush also when `-ingester.flush-on-shutdown-with-wal-enabled` or `-blocks-storage.tsdb.flush-blocks-on-shutdown` are disabled.

When using blocks storage, this endpoint accepts a `tenant` parameter to specify the tenant whose blocks should be compacted and shipped. The parameter may be repeated to select multiple tenants. If no tenant is specified, all tenants are flushed.

The flush endpoint also accepts a `wait=true` parameter, which makes the call synchronous: it returns only after flushing has finished. Note that the returned status code does not reflect the result of the flush operation. This parameter is only available when using blocks storage.
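
For example, a synchronous flush of two tenants could be triggered like this. This is a minimal sketch using Go's standard library; the ingester address and tenant IDs are placeholders, not part of this PR:

```go
package main

import (
	"fmt"
	"net/http"
	"net/url"
)

func main() {
	// Hypothetical ingester address; flush only "user-1" and "user-2"
	// and wait for the flush to finish before returning.
	params := url.Values{}
	params.Add("tenant", "user-1")
	params.Add("tenant", "user-2")
	params.Set("wait", "true")

	resp, err := http.Get("http://ingester:8080/flush?" + params.Encode())
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// A 204 status only confirms the request was accepted (and, with
	// wait=true, that flushing has finished); it does not report whether
	// the flush itself succeeded.
	fmt.Println("status:", resp.StatusCode)
}
```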

### Shutdown

96 changes: 62 additions & 34 deletions pkg/ingester/ingester_v2.go
@@ -387,8 +387,8 @@ type TSDBState struct {

tsdbMetrics *tsdbMetrics

forceCompactTrigger chan chan<- struct{}
shipTrigger chan chan<- struct{}
forceCompactTrigger chan requestWithUsersAndCallback
shipTrigger chan requestWithUsersAndCallback

// Timeout chosen for idle compactions.
compactionIdleTimeout time.Duration
@@ -405,6 +405,11 @@ type TSDBState struct {
idleTsdbChecks *prometheus.CounterVec
}

type requestWithUsersAndCallback struct {
users *util.AllowedTenants // if nil, all tenants are allowed.
callback chan<- struct{} // when compaction/shipping is finished, this channel is closed
}
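
For readers unfamiliar with this pattern, the struct above implements a simple trigger/callback handshake: the flush handler sends a request carrying an optional tenant filter, and the compaction or shipping loop closes the callback channel once it has processed the request. Below is a self-contained sketch of just that handshake, with illustrative names rather than the PR's actual types:

```go
// Sketch of the trigger/callback handshake: the caller sends a request with a
// tenant filter and a callback channel, then blocks until the worker closes it.
package main

import "fmt"

type flushRequest struct {
	tenants  map[string]bool // nil means "all tenants", mirroring AllowedTenants
	callback chan<- struct{} // closed by the worker once the work is done
}

func worker(trigger <-chan flushRequest) {
	for req := range trigger {
		for _, tenant := range []string{"user-1", "user-2", "user-3"} {
			// A nil filter allows everything.
			if req.tenants != nil && !req.tenants[tenant] {
				continue
			}
			fmt.Println("flushing", tenant)
		}
		close(req.callback) // notify the caller exactly once
	}
}

func main() {
	trigger := make(chan flushRequest)
	go worker(trigger)

	done := make(chan struct{})
	trigger <- flushRequest{tenants: map[string]bool{"user-2": true}, callback: done}
	<-done // returns once the worker has closed the callback channel
	fmt.Println("flush finished")
}
```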

func newTSDBState(bucketClient objstore.Bucket, registerer prometheus.Registerer) TSDBState {
idleTsdbChecks := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ingester_idle_tsdb_checks_total",
@@ -426,8 +431,8 @@ func newTSDBState(bucketClient objstore.Bucket, registerer prometheus.Registerer
dbs: make(map[string]*userTSDB),
bucket: bucketClient,
tsdbMetrics: newTSDBMetrics(registerer),
forceCompactTrigger: make(chan chan<- struct{}),
shipTrigger: make(chan chan<- struct{}),
forceCompactTrigger: make(chan requestWithUsersAndCallback),
shipTrigger: make(chan requestWithUsersAndCallback),

compactionsTriggered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Name: "cortex_ingester_tsdb_compactions_triggered_total",
@@ -1707,24 +1712,20 @@ func (i *Ingester) shipBlocksLoop(ctx context.Context) error {
for {
select {
case <-shipTicker.C:
i.shipBlocks(ctx)
i.shipBlocks(ctx, nil)

case ch := <-i.TSDBState.shipTrigger:
i.shipBlocks(ctx)

// Notify back.
select {
case ch <- struct{}{}:
default: // Nobody is waiting for notification, don't block this loop.
}
case req := <-i.TSDBState.shipTrigger:
i.shipBlocks(ctx, req.users)
close(req.callback) // Notify back.

case <-ctx.Done():
return nil
}
}
}

func (i *Ingester) shipBlocks(ctx context.Context) {
// shipBlocks runs shipping for all users.
func (i *Ingester) shipBlocks(ctx context.Context, allowed *util.AllowedTenants) {
// Do not ship blocks if the ingester is PENDING or JOINING. It's
// particularly important for the JOINING state because there could
// be a blocks transfer in progress (from another ingester) and if we
@@ -1739,6 +1740,10 @@ func (i *Ingester) shipBlocks(ctx context.Context) {
// Number of concurrent workers is limited in order to avoid to concurrently sync a lot
// of tenants in a large cluster.
_ = concurrency.ForEachUser(ctx, i.getTSDBUsers(), i.cfg.BlocksStorageConfig.TSDB.ShipConcurrency, func(ctx context.Context, userID string) error {
if !allowed.IsAllowed(userID) {
return nil
}

// Get the user's DB. If the user doesn't exist, we skip it.
userDB := i.getTSDB(userID)
if userDB == nil || userDB.shipper == nil {
@@ -1803,16 +1808,11 @@ func (i *Ingester) compactionLoop(ctx context.Context) error {
for ctx.Err() == nil {
select {
case <-ticker.C:
i.compactBlocks(ctx, false)

case ch := <-i.TSDBState.forceCompactTrigger:
i.compactBlocks(ctx, true)
i.compactBlocks(ctx, false, nil)

// Notify back.
select {
case ch <- struct{}{}:
default: // Nobody is waiting for notification, don't block this loop.
}
case req := <-i.TSDBState.forceCompactTrigger:
i.compactBlocks(ctx, true, req.users)
close(req.callback) // Notify back.

case <-ctx.Done():
return nil
@@ -1822,7 +1822,7 @@
}

// Compacts all compactable blocks. Force flag will force compaction even if head is not compactable yet.
func (i *Ingester) compactBlocks(ctx context.Context, force bool) {
func (i *Ingester) compactBlocks(ctx context.Context, force bool, allowed *util.AllowedTenants) {
// Don't compact TSDB blocks while JOINING as there may be ongoing blocks transfers.
// Compaction loop is not running in LEAVING state, so if we get here in LEAVING state, we're flushing blocks.
if i.lifecycler != nil {
@@ -1833,6 +1833,10 @@ func (i *Ingester) compactBlocks(ctx context.Context, force bool) {
}

_ = concurrency.ForEachUser(ctx, i.getTSDBUsers(), i.cfg.BlocksStorageConfig.TSDB.HeadCompactionConcurrency, func(ctx context.Context, userID string) error {
if !allowed.IsAllowed(userID) {
return nil
}

userDB := i.getTSDB(userID)
if userDB == nil {
return nil
@@ -1982,28 +1986,43 @@ func (i *Ingester) v2LifecyclerFlush() {

ctx := context.Background()

i.compactBlocks(ctx, true)
i.compactBlocks(ctx, true, nil)
if i.cfg.BlocksStorageConfig.TSDB.IsBlocksShippingEnabled() {
i.shipBlocks(ctx)
i.shipBlocks(ctx, nil)
}

level.Info(i.logger).Log("msg", "finished flushing and shipping TSDB blocks")
}

const (
tenantParam = "tenant"
waitParam = "wait"
)

// Blocks version of Flush handler. It force-compacts blocks, and triggers shipping.
func (i *Ingester) v2FlushHandler(w http.ResponseWriter, _ *http.Request) {
go func() {
func (i *Ingester) v2FlushHandler(w http.ResponseWriter, r *http.Request) {
err := r.ParseForm()
if err != nil {
level.Warn(i.logger).Log("msg", "failed to parse HTTP request in flush handler", "err", err)
w.WriteHeader(http.StatusBadRequest)
return
}

tenants := r.Form[tenantParam]

allowedUsers := util.NewAllowedTenants(tenants, nil)
run := func() {
ingCtx := i.BasicService.ServiceContext()
if ingCtx == nil || ingCtx.Err() != nil {
level.Info(i.logger).Log("msg", "flushing TSDB blocks: ingester not running, ignoring flush request")
return
}

ch := make(chan struct{}, 1)
compactionCallbackCh := make(chan struct{})

level.Info(i.logger).Log("msg", "flushing TSDB blocks: triggering compaction")
select {
case i.TSDBState.forceCompactTrigger <- ch:
case i.TSDBState.forceCompactTrigger <- requestWithUsersAndCallback{users: allowedUsers, callback: compactionCallbackCh}:
// Compacting now.
case <-ingCtx.Done():
level.Warn(i.logger).Log("msg", "failed to compact TSDB blocks, ingester not running anymore")
@@ -2012,18 +2031,20 @@ func (i *Ingester) v2FlushHandler(w http.ResponseWriter, _ *http.Request) {

// Wait until notified about compaction being finished.
select {
case <-ch:
case <-compactionCallbackCh:
level.Info(i.logger).Log("msg", "finished compacting TSDB blocks")
case <-ingCtx.Done():
level.Warn(i.logger).Log("msg", "failed to compact TSDB blocks, ingester not running anymore")
return
}

if i.cfg.BlocksStorageConfig.TSDB.IsBlocksShippingEnabled() {
shippingCallbackCh := make(chan struct{}) // must be new channel, as compactionCallbackCh is closed now.

level.Info(i.logger).Log("msg", "flushing TSDB blocks: triggering shipping")

select {
case i.TSDBState.shipTrigger <- ch:
case i.TSDBState.shipTrigger <- requestWithUsersAndCallback{users: allowedUsers, callback: shippingCallbackCh}:
// shipping now
case <-ingCtx.Done():
level.Warn(i.logger).Log("msg", "failed to ship TSDB blocks, ingester not running anymore")
@@ -2032,7 +2053,7 @@ func (i *Ingester) v2FlushHandler(w http.ResponseWriter, _ *http.Request) {

// Wait until shipping finished.
select {
case <-ch:
case <-shippingCallbackCh:
level.Info(i.logger).Log("msg", "shipping of TSDB blocks finished")
case <-ingCtx.Done():
level.Warn(i.logger).Log("msg", "failed to ship TSDB blocks, ingester not running anymore")
Expand All @@ -2041,7 +2062,14 @@ func (i *Ingester) v2FlushHandler(w http.ResponseWriter, _ *http.Request) {
}

level.Info(i.logger).Log("msg", "flushing TSDB blocks: finished")
}()
}

if len(r.Form[waitParam]) > 0 && r.Form[waitParam][0] == "true" {
// Run synchronously. This simplifies and speeds up tests.
run()
} else {
go run()
}

w.WriteHeader(http.StatusNoContent)
}
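
The repeated-`tenant` behaviour comes straight from standard `net/http` form handling: after `ParseForm`, every occurrence of a query or form parameter is collected into `r.Form`. A small standalone sketch of that parsing follows; the handler and tenant names are illustrative, not the ingester's code:

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

func main() {
	h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if err := r.ParseForm(); err != nil {
			w.WriteHeader(http.StatusBadRequest)
			return
		}
		// Repeated parameters are collected into a slice, so
		// ?tenant=user-1&tenant=user-2 selects both tenants.
		tenants := r.Form["tenant"]
		wait := len(r.Form["wait"]) > 0 && r.Form["wait"][0] == "true"
		fmt.Fprintf(w, "tenants=%v wait=%v\n", tenants, wait)
	})

	req := httptest.NewRequest(http.MethodGet, "/flush?tenant=user-1&tenant=user-2&wait=true", nil)
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	fmt.Print(rec.Body.String()) // tenants=[user-1 user-2] wait=true
}
```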