
Commit 58ef607

Add support to horizontally scale blocks compactor (cortexproject#2113)
* Added sharding support to compactor

Signed-off-by: Marco Pracucci <marco@pracucci.com>
1 parent 5292538 commit 58ef607

12 files changed, +648 -44 lines changed


CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -10,6 +10,7 @@
 * [FEATURE] Added user sub rings to distribute users to a subset of ingesters. #1947
   * `--experimental.distributor.user-subring-size`
 * [FEATURE] Added flag `-experimental.ruler.enable-api` to enable the ruler api which implements the Prometheus API `/api/v1/rules` and `/api/v1/alerts` endpoints under the configured `-http.prefix`. #1999
+* [FEATURE] Added sharding support to compactor when using the experimental TSDB blocks storage. #2113
 * [ENHANCEMENT] Experimental TSDB: Export TSDB Syncer metrics from Compactor component, they are prefixed with `cortex_compactor_`. #2023
 * [ENHANCEMENT] Experimental TSDB: Added dedicated flag `-experimental.tsdb.bucket-store.tenant-sync-concurrency` to configure the maximum number of concurrent tenants for which blocks are synched. #2026
 * [ENHANCEMENT] Experimental TSDB: Expose metrics for objstore operations (prefixed with `cortex_<component>_thanos_objstore_`, component being one of `ingester`, `querier` and `compactor`). #2027

development/tsdb-blocks-storage-s3/config/cortex.yaml

Lines changed: 11 additions & 0 deletions
@@ -55,3 +55,14 @@ ruler:
 
 storage:
   engine: tsdb
+
+compactor:
+  compaction_interval: 30s
+  data_dir: /tmp/cortex-compactor
+  consistency_delay: 1m
+  sharding_enabled: true
+  sharding_ring:
+    kvstore:
+      store: consul
+      consul:
+        host: consul:8500
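
For reference, the same compactor settings can also be supplied as CLI flags instead of YAML. Below is a minimal sketch, assuming the `compactor` Go package from this commit is importable; it only uses flag names that appear in this diff (`-compactor.sharding-enabled`, `-compactor.ring.store`, `-compactor.compaction-interval`, `-compactor.data-dir`) and omits the Consul host, whose flag name is not shown here.

```go
package main

import (
    "flag"
    "fmt"

    "github.com/cortexproject/cortex/pkg/compactor"
)

func main() {
    var cfg compactor.Config

    // RegisterFlags registers the compactor flags and, via ShardingRing,
    // the -compactor.ring.* flags added by this commit.
    fs := flag.NewFlagSet("compactor", flag.ExitOnError)
    cfg.RegisterFlags(fs)

    // Rough CLI equivalent of the YAML block above (Consul host omitted,
    // since its flag name is not shown in this diff).
    _ = fs.Parse([]string{
        "-compactor.compaction-interval=30s",
        "-compactor.data-dir=/tmp/cortex-compactor",
        "-compactor.sharding-enabled=true",
        "-compactor.ring.store=consul",
    })

    fmt.Println("sharding enabled:", cfg.ShardingEnabled)
}
```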

development/tsdb-blocks-storage-s3/docker-compose.yml

Lines changed: 14 additions & 0 deletions
@@ -94,3 +94,17 @@ services:
       - 8005:8005
     volumes:
       - ./config:/cortex/config
+
+  compactor:
+    build:
+      context: .
+      dockerfile: dev.dockerfile
+    image: cortex
+    command: ["sh", "-c", "sleep 3 && exec ./cortex -config.file=./config/cortex.yaml -target=compactor -server.http-listen-port=8006 -server.grpc-listen-port=9006"]
+    depends_on:
+      - consul
+      - minio
+    ports:
+      - 8006:8006
+    volumes:
+      - ./config:/cortex/config

docs/operations/blocks-storage.md

Lines changed: 69 additions & 5 deletions
@@ -41,7 +41,7 @@ The blocks chunks and the entire index is never fully downloaded by the queriers
 
 The index header is also stored to the local disk, in order to avoid to re-download it on subsequent restarts of a querier. For this reason, it's recommended - but not required - to run the querier with a persistent local disk. For example, if you're running the Cortex cluster in Kubernetes, you may use a StatefulSet with a persistent volume claim for the queriers.
 
-### Sharding and Replication
+### Series sharding and replication
 
 The series sharding and replication doesn't change based on the storage engine, so the general overview provided by the "[Cortex architecture](../architecture.md)" documentation applies to the blocks storage as well.
 
@@ -60,6 +60,17 @@ The **horizontal compaction** triggers after the vertical compaction and compact
 
 The compactor is **stateless**.
 
+#### Compactor sharding
+
+The compactor optionally supports sharding. When sharding is enabled, multiple compactor instances can coordinate to split the workload and shard blocks by tenant. All the blocks of a tenant are processed by a single compactor instance at any given time, but compaction for different tenants may run simultaneously on different compactor instances.
+
+Whenever the pool of compactors increases or decreases (i.e. following a scale up/down), tenants are resharded across the available compactor instances without any manual intervention. Compactors coordinate via the Cortex [hash ring](../architecture.md#the-hash-ring).
+
+#### Compactor HTTP endpoints
+
+- `GET /compactor_ring`<br />
+  Displays the status of the compactors ring, including the tokens owned by each compactor and an option to remove (forget) instances from the ring.
+
 ## Configuration
 
 The general [configuration documentation](../configuration/_index.md) also applied to a Cortex cluster running the blocks storage, with few differences:
@@ -251,14 +262,67 @@ compactor:
   # interval.
   # CLI flag: -compactor.compaction-retries
   [compaction_retries: <int> | default = 3]
+
+  # Shard tenants across multiple compactor instances. Sharding is required if
+  # you run multiple compactor instances, in order to coordinate compactions
+  # and avoid race conditions leading to the same tenant blocks simultaneously
+  # compacted by different instances.
+  # CLI flag: -compactor.sharding-enabled
+  [sharding_enabled: <bool> | default = false]
+
+  # Configures the ring used when sharding is enabled.
+  sharding_ring:
+    kvstore:
+      # Backend storage to use for the ring. Supported values are: consul, etcd,
+      # inmemory, multi, memberlist (experimental).
+      # CLI flag: -compactor.ring.store
+      [store: <string> | default = "consul"]
+
+      # The prefix for the keys in the store. Should end with a /.
+      # CLI flag: -compactor.ring.prefix
+      [prefix: <string> | default = "collectors/"]
+
+      # The consul_config configures the consul client.
+      # The CLI flags prefix for this block config is: compactor.ring
+      [consul: <consul_config>]
+
+      # The etcd_config configures the etcd client.
+      # The CLI flags prefix for this block config is: compactor.ring
+      [etcd: <etcd_config>]
+
+      # The memberlist_config configures the Gossip memberlist.
+      # The CLI flags prefix for this block config is: compactor.ring
+      [memberlist: <memberlist_config>]
+
+      multi:
+        # Primary backend storage used by multi-client.
+        # CLI flag: -compactor.ring.multi.primary
+        [primary: <string> | default = ""]
+
+        # Secondary backend storage used by multi-client.
+        # CLI flag: -compactor.ring.multi.secondary
+        [secondary: <string> | default = ""]
+
+        # Mirror writes to secondary store.
+        # CLI flag: -compactor.ring.multi.mirror-enabled
+        [mirror_enabled: <boolean> | default = false]
+
+        # Timeout for storing value to secondary store.
+        # CLI flag: -compactor.ring.multi.mirror-timeout
+        [mirror_timeout: <duration> | default = 2s]
+
+    # Period at which to heartbeat to the ring.
+    # CLI flag: -compactor.ring.heartbeat-period
+    [heartbeat_period: <duration> | default = 5m]
+
+    # The heartbeat timeout after which compactors are considered unhealthy
+    # within the ring.
+    # CLI flag: -compactor.ring.heartbeat-timeout
+    [heartbeat_timeout: <duration> | default = 1m]
 ```
 
 ## Known issues
 
-### Horizontal scalability
-
-The compactor currently doesn't support horizontal scalability and only 1 replica of the compactor should run at any given time within a Cortex cluster.
-
 ### Migrating from the chunks to the blocks storage
 
 Currently, no smooth migration path is provided to migrate from chunks to blocks storage. For this reason, the blocks storage can only be enabled in new Cortex clusters.
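
The compactor sharding described in the docs above boils down to consistent hashing of tenant IDs onto a ring of compactor instances. The following is a deliberately simplified, self-contained sketch of that idea: it uses the same FNV-32a hash as the compactor's `ownUser`, but the one-token-per-instance ring and the `ownerOf` lookup are illustrative stand-ins, not the actual Cortex ring implementation (which handles multiple tokens per instance, instance states and heartbeats).

```go
package main

import (
    "fmt"
    "hash/fnv"
    "sort"
)

// instanceToken is a simplified stand-in for a ring entry: one token per
// compactor instance (the real ring registers many tokens per instance).
type instanceToken struct {
    addr  string
    token uint32
}

// ownerOf returns the instance owning the first token at or after the hash,
// wrapping around the ring. It mimics consistent hashing only; it does not
// model instance states, heartbeats or the Cortex ring API.
func ownerOf(ring []instanceToken, hash uint32) string {
    i := sort.Search(len(ring), func(i int) bool { return ring[i].token >= hash })
    if i == len(ring) {
        i = 0
    }
    return ring[i].addr
}

// hashTenant hashes a tenant ID with FNV-32a, as the compactor does in ownUser.
func hashTenant(userID string) uint32 {
    h := fnv.New32a()
    _, _ = h.Write([]byte(userID))
    return h.Sum32()
}

func main() {
    ring := []instanceToken{
        {addr: "compactor-1:9006", token: 1 << 30},
        {addr: "compactor-2:9006", token: 3 << 30},
    }
    sort.Slice(ring, func(i, j int) bool { return ring[i].token < ring[j].token })

    // Each tenant maps to exactly one instance; adding or removing an
    // instance only moves the tenants whose hashes fall in the affected range.
    for _, tenant := range []string{"tenant-a", "tenant-b", "tenant-c"} {
        fmt.Printf("%s -> %s\n", tenant, ownerOf(ring, hashTenant(tenant)))
    }
}
```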

pkg/compactor/compactor.go

Lines changed: 116 additions & 5 deletions
@@ -4,6 +4,7 @@ import (
     "context"
     "flag"
     "fmt"
+    "hash/fnv"
     "path"
     "strings"
     "sync"
@@ -19,6 +20,7 @@ import (
     "github.com/thanos-io/thanos/pkg/compact/downsample"
     "github.com/thanos-io/thanos/pkg/objstore"
 
+    "github.com/cortexproject/cortex/pkg/ring"
     cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
     "github.com/cortexproject/cortex/pkg/util"
 )
@@ -33,6 +35,10 @@ type Config struct {
     CompactionInterval time.Duration `yaml:"compaction_interval"`
     CompactionRetries  int           `yaml:"compaction_retries"`
 
+    // Compactors sharding.
+    ShardingEnabled bool       `yaml:"sharding_enabled"`
+    ShardingRing    RingConfig `yaml:"sharding_ring"`
+
     // No need to add options to customize the retry backoff,
     // given the defaults should be fine, but allow to override
     // it in tests.
@@ -42,6 +48,8 @@ type Config struct {
 
 // RegisterFlags registers the Compactor flags.
 func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
+    cfg.ShardingRing.RegisterFlags(f)
+
     cfg.BlockRanges = cortex_tsdb.DurationList{2 * time.Hour, 12 * time.Hour, 24 * time.Hour}
     cfg.retryMinBackoff = 10 * time.Second
     cfg.retryMaxBackoff = time.Minute
@@ -53,6 +61,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
     f.StringVar(&cfg.DataDir, "compactor.data-dir", "./data", "Data directory in which to cache blocks and process compactions")
     f.DurationVar(&cfg.CompactionInterval, "compactor.compaction-interval", time.Hour, "The frequency at which the compaction runs")
     f.IntVar(&cfg.CompactionRetries, "compactor.compaction-retries", 3, "How many times to retry a failed compaction during a single compaction interval")
+    f.BoolVar(&cfg.ShardingEnabled, "compactor.sharding-enabled", false, "Shard tenants across multiple compactor instances. Sharding is required if you run multiple compactor instances, in order to coordinate compactions and avoid race conditions leading to the same tenant blocks simultaneously compacted by different instances.")
 }
 
 // Compactor is a multi-tenant TSDB blocks compactor based on Thanos.
@@ -75,6 +84,10 @@ type Compactor struct {
     ctx       context.Context
     cancelCtx context.CancelFunc
 
+    // Ring used for sharding compactions.
+    ringLifecycler *ring.Lifecycler
+    ring           *ring.Ring
+
     // Metrics.
     compactionRunsStarted   prometheus.Counter
     compactionRunsCompleted prometheus.Counter
@@ -104,7 +117,13 @@ func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.Config, logger log
         return nil, errors.Wrap(err, "failed to create TSDB compactor")
     }
 
-    return newCompactor(ctx, cancelCtx, compactorCfg, storageCfg, bucketClient, tsdbCompactor, logger, registerer)
+    cortexCompactor, err := newCompactor(ctx, cancelCtx, compactorCfg, storageCfg, bucketClient, tsdbCompactor, logger, registerer)
+    if err != nil {
+        cancelCtx()
+        return nil, errors.Wrap(err, "failed to create Cortex blocks compactor")
+    }
+
+    return cortexCompactor, nil
 }
 
 func newCompactor(
@@ -139,29 +158,70 @@ func newCompactor(
         }),
     }
 
+    // Initialize the compactors ring if sharding is enabled.
+    if compactorCfg.ShardingEnabled {
+        lifecyclerCfg := compactorCfg.ShardingRing.ToLifecyclerConfig()
+        lifecycler, err := ring.NewLifecycler(lifecyclerCfg, ring.NewNoopFlushTransferer(), "compactor", ring.CompactorRingKey, false)
+        if err != nil {
+            return nil, errors.Wrap(err, "unable to initialize compactor ring lifecycler")
+        }
+
+        lifecycler.Start()
+        c.ringLifecycler = lifecycler
+
+        ring, err := ring.New(lifecyclerCfg.RingConfig, "compactor", ring.CompactorRingKey)
+        if err != nil {
+            return nil, errors.Wrap(err, "unable to initialize compactor ring")
+        }
+
+        c.ring = ring
+    }
+
     // Register metrics.
     if registerer != nil {
         registerer.MustRegister(c.compactionRunsStarted, c.compactionRunsCompleted, c.compactionRunsFailed)
         c.syncerMetrics = newSyncerMetrics(registerer)
     }
 
+    return c, nil
+}
+
+// Start the compactor.
+func (c *Compactor) Start() {
     // Start the compactor loop.
     c.runner.Add(1)
     go c.run()
-
-    return c, nil
 }
 
-// Shutdown the compactor and waits until done. This may take some time
+// Stop the compactor and waits until done. This may take some time
 // if there's a on-going compaction.
-func (c *Compactor) Shutdown() {
+func (c *Compactor) Stop() {
     c.cancelCtx()
     c.runner.Wait()
+
+    // Shutdown the ring lifecycler (if any)
+    if c.ringLifecycler != nil {
+        c.ringLifecycler.Shutdown()
+    }
+
+    if c.ring != nil {
+        c.ring.Stop()
+    }
 }
 
 func (c *Compactor) run() {
     defer c.runner.Done()
 
+    // If sharding is enabled we should wait until this instance is
+    // ACTIVE within the ring.
+    if c.compactorCfg.ShardingEnabled {
+        level.Info(c.logger).Log("msg", "waiting until compactor is ACTIVE in the ring")
+        if err := c.waitRingActive(); err != nil {
+            return
+        }
+        level.Info(c.logger).Log("msg", "compactor is ACTIVE in the ring")
+    }
+
     // Run an initial compaction before starting the interval.
     c.compactUsersWithRetries(c.ctx)
 
@@ -215,6 +275,17 @@ func (c *Compactor) compactUsers(ctx context.Context) bool {
             return false
         }
 
+        // If sharding is enabled, ensure the user ID belongs to our shard.
+        if c.compactorCfg.ShardingEnabled {
+            if owned, err := c.ownUser(userID); err != nil {
+                level.Warn(c.logger).Log("msg", "unable to check if user is owned by this shard", "user", userID, "err", err)
+                continue
+            } else if !owned {
+                level.Debug(c.logger).Log("msg", "skipping user because not owned by this shard", "user", userID)
+                continue
+            }
+        }
+
         level.Info(c.logger).Log("msg", "starting compaction of user blocks", "user", userID)
 
         if err = c.compactUser(ctx, userID); err != nil {
@@ -290,3 +361,43 @@ func (c *Compactor) discoverUsers(ctx context.Context) ([]string, error) {
 
     return users, err
 }
+
+func (c *Compactor) ownUser(userID string) (bool, error) {
+    // Hash the user ID.
+    hasher := fnv.New32a()
+    _, _ = hasher.Write([]byte(userID))
+    userHash := hasher.Sum32()
+
+    // Check whether this compactor instance owns the user.
+    rs, err := c.ring.Get(userHash, ring.Read, []ring.IngesterDesc{})
+    if err != nil {
+        return false, err
+    }
+
+    if len(rs.Ingesters) != 1 {
+        return false, fmt.Errorf("unexpected number of compactors in the shard (expected 1, got %d)", len(rs.Ingesters))
+    }
+
+    return rs.Ingesters[0].Addr == c.ringLifecycler.Addr, nil
+}
+
+func (c *Compactor) waitRingActive() error {
+    for {
+        // Check if the ingester is ACTIVE in the ring and our ring client
+        // has detected it.
+        if rs, err := c.ring.GetAll(); err == nil {
+            for _, i := range rs.Ingesters {
+                if i.GetAddr() == c.ringLifecycler.Addr && i.GetState() == ring.ACTIVE {
+                    return nil
+                }
+            }
+        }
+
+        select {
+        case <-time.After(time.Second):
+            // Nothing to do
+        case <-c.ctx.Done():
+            return c.ctx.Err()
+        }
+    }
+}
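
With this commit the compactor gains an explicit `Start`/`Stop` lifecycle (previously the loop started in the constructor and was stopped via `Shutdown`). Below is a minimal usage sketch; note that in Cortex itself the compactor is normally wired up by running `-target=compactor`, and the trailing `prometheus.Registerer` argument of `NewCompactor` is an assumption here, since the hunk header above truncates the signature. The zero-value configs are placeholders for values loaded from YAML/flags.

```go
package main

import (
    "github.com/go-kit/kit/log"
    "github.com/prometheus/client_golang/prometheus"

    "github.com/cortexproject/cortex/pkg/compactor"
    cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
)

func main() {
    var (
        compactorCfg compactor.Config   // placeholder: normally loaded from YAML/flags
        storageCfg   cortex_tsdb.Config // placeholder: bucket/backend settings
    )

    // NewCompactor now wraps construction errors and cancels its internal
    // context if newCompactor fails.
    c, err := compactor.NewCompactor(compactorCfg, storageCfg, log.NewNopLogger(), prometheus.DefaultRegisterer)
    if err != nil {
        panic(err)
    }

    // Start launches the compaction loop; when sharding is enabled the loop
    // first waits until this instance is ACTIVE in the ring.
    c.Start()

    // Stop cancels the loop, waits for it to finish and, when sharding is
    // enabled, shuts down the ring lifecycler and the ring client.
    defer c.Stop()
}
```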

pkg/compactor/compactor_http.go

Lines changed: 35 additions & 0 deletions
@@ -0,0 +1,35 @@
+package compactor
+
+import (
+    "net/http"
+
+    "github.com/go-kit/kit/log/level"
+)
+
+const (
+    shardingDisabledPage = `
+        <!DOCTYPE html>
+        <html>
+            <head>
+                <meta charset="UTF-8">
+                <title>Cortex Compactor Ring</title>
+            </head>
+            <body>
+                <h1>Cortex Compactor Ring</h1>
+                <p>Compactor has no ring because sharding is disabled.</p>
+            </body>
+        </html>
+`
+)
+
+func (c *Compactor) RingHandler(w http.ResponseWriter, req *http.Request) {
+    if c.compactorCfg.ShardingEnabled {
+        c.ring.ServeHTTP(w, req)
+        return
+    }
+
+    w.WriteHeader(http.StatusOK)
+    if _, err := w.Write([]byte(shardingDisabledPage)); err != nil {
+        level.Error(c.logger).Log("msg", "unable to serve compactor ring page", "err", err)
+    }
+}
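
The `GET /compactor_ring` endpoint documented earlier is backed by `RingHandler`. Here is a sketch of how the handler could be mounted on a plain `net/http` mux (port 8006 matches the docker-compose example above); the Cortex module wiring that actually registers this route is not part of this diff, and `newConfiguredCompactor` is a hypothetical helper standing in for the `NewCompactor` call from the previous sketch.

```go
package main

import (
    "net/http"

    "github.com/cortexproject/cortex/pkg/compactor"
)

// newConfiguredCompactor is a hypothetical stand-in for building the compactor
// via compactor.NewCompactor, as in the previous sketch.
func newConfiguredCompactor() *compactor.Compactor {
    panic("elided: call compactor.NewCompactor with real configs")
}

func main() {
    c := newConfiguredCompactor()

    mux := http.NewServeMux()
    // RingHandler serves the ring status page when sharding is enabled, or the
    // static "sharding is disabled" page from compactor_http.go otherwise.
    mux.HandleFunc("/compactor_ring", c.RingHandler)

    _ = http.ListenAndServe(":8006", mux)
}
```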
