Skip to content

Commit 750d935

Browse files
author
Ganesh Vernekar
committed
Flusher target to flush WAL
Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
1 parent 4685736 commit 750d935

File tree

18 files changed

+498
-155
lines changed

18 files changed

+498
-155
lines changed

CHANGELOG.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@
88
* `--store.min-chunk-age` has been removed
99
* `--querier.query-store-after` has been added in it's place.
1010
* [FEATURE] Added user sub rings to distribute users to a subset of ingesters. #1947
11-
* `--experimental.distributor.user-subring-size`
11+
* [FEATURE] Flusher target to flush the WAL.
12+
* `-flusher.wal-dir` for the WAL directory to recover from.
13+
* `-flusher.concurrent-flushes` for number of concurrent flushes.
14+
* `-flusher.flush-op-timeout` is duration after which a flush should timeout.
1215
* [ENHANCEMENT] Experimental TSDB: Export TSDB Syncer metrics from Compactor component, they are prefixed with `cortex_compactor_`. #2023
1316
* [ENHANCEMENT] Experimental TSDB: Added dedicated flag `-experimental.tsdb.bucket-store.tenant-sync-concurrency` to configure the maximum number of concurrent tenants for which blocks are synched. #2026
1417
* [ENHANCEMENT] Experimental TSDB: Expose metrics for objstore operations (prefixed with `cortex_<component>_thanos_objstore_`, component being one of `ingester`, `querier` and `compactor`). #2027

cmd/cortex/main.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"math/rand"
88
"os"
99
"runtime"
10+
"sync"
1011
"time"
1112

1213
"github.com/go-kit/kit/log/level"
@@ -99,12 +100,26 @@ func main() {
99100

100101
level.Info(util.Logger).Log("msg", "Starting Cortex", "version", version.Info())
101102

102-
if err := t.Run(); err != nil {
103-
level.Error(util.Logger).Log("msg", "error running Cortex", "err", err)
103+
var wg sync.WaitGroup
104+
wg.Add(1)
105+
go func() {
106+
defer wg.Done()
107+
if err := t.Run(); err != nil {
108+
level.Error(util.Logger).Log("msg", "error running Cortex", "err", err)
109+
}
110+
}()
111+
112+
if cfg.Target.IsJob() {
113+
err = t.Stop()
114+
}
115+
116+
wg.Wait()
117+
118+
if !cfg.Target.IsJob() {
119+
err = t.Stop()
104120
}
105121

106122
runtime.KeepAlive(ballast)
107-
err = t.Stop()
108123
util.CheckFatal("initializing cortex", err)
109124
}
110125

docs/configuration/arguments.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,17 @@ It also talks to a KVStore and has it's own copies of the same flags used by the
323323
- `-ingester.recover-from-wal`
324324
Set this to `true` to recover data from an existing WAL. The data is recovered even if WAL is disabled and this is set to `true`. The WAL dir needs to be set for this.
325325

326+
#### Flusher
327+
328+
- `-flusher.wal-dir`
329+
Directory where the WAL data should be recovered from.
330+
331+
- `-flusher.concurrent-flushes`
332+
Number of concurrent flushes.
333+
334+
- `-flusher.flush-op-timeout`
335+
Duration after which a flush should timeout.
336+
326337
## Runtime Configuration file
327338

328339
Cortex has a concept of "runtime config" file, which is simply a file that is reloaded while Cortex is running. It is used by some Cortex components to allow operator to change some aspects of Cortex configuration without restarting it. File is specified by using `-runtime-config.file=<filename>` flag and reload period (which defaults to 10 seconds) can be changed by `-runtime-config.reload-period=<duration>` flag. Previously this mechanism was only used by limits overrides, and flags were called `-limits.per-user-override-config=<filename>` and `-limits.per-user-override-period=10s` respectively. These are still used, if `-runtime-config.file=<filename>` is not specified.

docs/configuration/config-file-reference.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,19 @@ Supported contents and default values of the config file:
5353
# The ingester_config configures the Cortex ingester.
5454
[ingester: <ingester_config>]
5555

56+
flusher:
57+
# Directory to read WAL from.
58+
# CLI flag: -flusher.wal-dir
59+
[wal_dir: <string> | default = "wal"]
60+
61+
# Number of concurrent goroutines flushing to dynamodb.
62+
# CLI flag: -flusher.concurrent-flushes
63+
[concurrent_flushes: <int> | default = 50]
64+
65+
# Timeout for individual flush operations.
66+
# CLI flag: -flusher.flush-op-timeout
67+
[flush_op_timeout: <duration> | default = 2m0s]
68+
5669
# The storage_config configures where Cortex stores the data (chunks storage
5770
# engine).
5871
[storage: <storage_config>]

pkg/cortex/cortex.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
config_client "github.com/cortexproject/cortex/pkg/configs/client"
2525
"github.com/cortexproject/cortex/pkg/configs/db"
2626
"github.com/cortexproject/cortex/pkg/distributor"
27+
"github.com/cortexproject/cortex/pkg/flusher"
2728
"github.com/cortexproject/cortex/pkg/ingester"
2829
"github.com/cortexproject/cortex/pkg/ingester/client"
2930
"github.com/cortexproject/cortex/pkg/querier"
@@ -66,6 +67,7 @@ type Config struct {
6667
Querier querier.Config `yaml:"querier,omitempty"`
6768
IngesterClient client.Config `yaml:"ingester_client,omitempty"`
6869
Ingester ingester.Config `yaml:"ingester,omitempty"`
70+
Flusher flusher.Config `yaml:"flusher,omitempty"`
6971
Storage storage.Config `yaml:"storage,omitempty"`
7072
ChunkStore chunk.StoreConfig `yaml:"chunk_store,omitempty"`
7173
Schema chunk.SchemaConfig `yaml:"schema,omitempty" doc:"hidden"` // Doc generation tool doesn't support it because part of the SchemaConfig doesn't support CLI flags (needs manual documentation)
@@ -101,6 +103,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
101103
c.Querier.RegisterFlags(f)
102104
c.IngesterClient.RegisterFlags(f)
103105
c.Ingester.RegisterFlags(f)
106+
c.Flusher.RegisterFlags(f)
104107
c.Storage.RegisterFlags(f)
105108
c.ChunkStore.RegisterFlags(f)
106109
c.Schema.RegisterFlags(f)
@@ -164,6 +167,7 @@ type Cortex struct {
164167
overrides *validation.Overrides
165168
distributor *distributor.Distributor
166169
ingester *ingester.Ingester
170+
flusher *flusher.Flusher
167171
store chunk.Store
168172
worker frontend.Worker
169173
frontend *frontend.Frontend

pkg/cortex/modules.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/cortexproject/cortex/pkg/configs/api"
2626
"github.com/cortexproject/cortex/pkg/configs/db"
2727
"github.com/cortexproject/cortex/pkg/distributor"
28+
"github.com/cortexproject/cortex/pkg/flusher"
2829
"github.com/cortexproject/cortex/pkg/ingester"
2930
"github.com/cortexproject/cortex/pkg/ingester/client"
3031
"github.com/cortexproject/cortex/pkg/querier"
@@ -47,6 +48,7 @@ const (
4748
Server
4849
Distributor
4950
Ingester
51+
Flusher
5052
Querier
5153
QueryFrontend
5254
Store
@@ -74,6 +76,8 @@ func (m moduleName) String() string {
7476
return "store"
7577
case Ingester:
7678
return "ingester"
79+
case Flusher:
80+
return "flusher"
7781
case Querier:
7882
return "querier"
7983
case QueryFrontend:
@@ -115,6 +119,9 @@ func (m *moduleName) Set(s string) error {
115119
case "ingester":
116120
*m = Ingester
117121
return nil
122+
case "flusher":
123+
*m = Flusher
124+
return nil
118125
case "querier":
119126
*m = Querier
120127
return nil
@@ -156,6 +163,15 @@ func (m *moduleName) UnmarshalYAML(unmarshal func(interface{}) error) error {
156163
return m.Set(s)
157164
}
158165

166+
func (m moduleName) IsJob() bool {
167+
switch m {
168+
case Flusher:
169+
return true
170+
}
171+
172+
return false
173+
}
174+
159175
func (t *Cortex) initServer(cfg *Config) (err error) {
160176
t.server, err = server.New(cfg.Server)
161177
return
@@ -311,6 +327,28 @@ func (t *Cortex) stopIngester() error {
311327
return nil
312328
}
313329

330+
func (t *Cortex) initFlusher(cfg *Config) (err error) {
331+
// By the end of this call, the chunks should be recovered
332+
// from the WAL and flushed.
333+
t.flusher, err = flusher.New(
334+
cfg.Flusher,
335+
cfg.Ingester,
336+
cfg.IngesterClient,
337+
t.store,
338+
prometheus.DefaultRegisterer,
339+
)
340+
if err != nil {
341+
return
342+
}
343+
344+
t.server.HTTP.Path("/ready").Handler(http.HandlerFunc(t.flusher.ReadinessHandler))
345+
return
346+
}
347+
348+
func (t *Cortex) stopFlusher() error {
349+
return t.flusher.Close()
350+
}
351+
314352
func (t *Cortex) initStore(cfg *Config) (err error) {
315353
if cfg.Storage.Engine == storage.StorageEngineTSDB {
316354
return nil
@@ -520,6 +558,12 @@ var modules = map[moduleName]module{
520558
stop: (*Cortex).stopIngester,
521559
},
522560

561+
Flusher: {
562+
deps: []moduleName{Store, Server},
563+
init: (*Cortex).initFlusher,
564+
stop: (*Cortex).stopFlusher,
565+
},
566+
523567
Querier: {
524568
deps: []moduleName{Distributor, Store, Ring, Server},
525569
init: (*Cortex).initQuerier,

0 commit comments

Comments
 (0)