Convert Cortex components to services #2166

Merged · 89 commits · Mar 4, 2020
Commits (89)
52f564d
First step towards Cortex using services model. Only Server module co…
pstibrany Feb 19, 2020
353cb26
Converted runtimeconfig.Manager to service.
pstibrany Feb 19, 2020
dab13d9
Added /services page.
pstibrany Feb 19, 2020
3a60a3e
Converted memberlistKV to service.
pstibrany Feb 19, 2020
a06feb8
Fixed runtimeconfig tests.
pstibrany Feb 19, 2020
a70c6ad
Converted Ring to service.
pstibrany Feb 19, 2020
2b142f0
Log waiting for other module to initialize.
pstibrany Feb 19, 2020
024ce38
Converted overrides to service.
pstibrany Feb 19, 2020
fda988a
Converted client pool to a service.
pstibrany Feb 19, 2020
afdd06f
Convert ring lifecycler into a service.
pstibrany Feb 19, 2020
aedece1
Converted HA Tracker to a service.
pstibrany Feb 19, 2020
daaa30a
Converted Distributor to a service.
pstibrany Feb 19, 2020
63a879c
Handle nil from wrappedService call.
pstibrany Feb 19, 2020
60f2b77
Explain why Server uses service and not a wrappedService.
pstibrany Feb 19, 2020
557ca5f
Make service from Store.
pstibrany Feb 19, 2020
4f071d3
Convert ingester to a service.
pstibrany Feb 19, 2020
142697c
Convert querier initialization into a service. This isn't full
pstibrany Feb 19, 2020
7fc55f8
Listen for module failures, and log them.
pstibrany Feb 19, 2020
be92f74
Convert blockQueryable and UserStore into a services.
pstibrany Feb 19, 2020
d152f4d
Wait a little before shutdown.
pstibrany Feb 19, 2020
0d43e4e
Converted queryFrontend to a service.
pstibrany Feb 20, 2020
dbabbee
Logging
pstibrany Feb 20, 2020
c804f78
Convert TableManager to service
pstibrany Feb 20, 2020
c3e1ff8
Convert Ruler to service
pstibrany Feb 20, 2020
e7cfc39
Convert Configs to service
pstibrany Feb 20, 2020
7044eef
Convert AlertManager to service.
pstibrany Feb 20, 2020
33a7e2c
Renamed init methods back.
pstibrany Feb 20, 2020
d35e81a
Fixed tests.
pstibrany Feb 20, 2020
733c2ab
Converted Compactor to a service.
pstibrany Feb 20, 2020
63c699d
Polishing, comments.
pstibrany Feb 20, 2020
71f3fc2
Comments.
pstibrany Feb 20, 2020
afaf6dd
Lint comments.
pstibrany Feb 20, 2020
2d9fb6f
Stop server only after all other modules have finished.
pstibrany Feb 20, 2020
9f4233e
Don't send more jobs to lifecycler loop, if it's not running anymore.
pstibrany Feb 20, 2020
9e5c371
Don't stop Server until other modules have stopped.
pstibrany Feb 20, 2020
b232303
Removed Compactor from All target. It was meant to be for testing only.
pstibrany Feb 20, 2020
da0b49d
Comment.
pstibrany Feb 20, 2020
118fa3b
More comments around startup logic.
pstibrany Feb 20, 2020
4716d9b
moduleServiceWrapper doesn't need full Cortex, only serviceMap
pstibrany Feb 20, 2020
c4b2749
Messages
pstibrany Feb 20, 2020
61e014a
Fix outdated comment.
pstibrany Feb 20, 2020
e202384
Start lifecycler in starting functions.
pstibrany Feb 20, 2020
bee16e0
Fixed comment. Return lifecycler's failure case, if any, as error.
pstibrany Feb 20, 2020
0690631
Fix lifecycler usage.
pstibrany Feb 20, 2020
602848f
Fix test.
pstibrany Feb 20, 2020
8a088e1
Removed obsolete waiting code. Only log error if it is real error.
pstibrany Feb 26, 2020
292ea46
Renamed servManager to subservices.
pstibrany Feb 26, 2020
19e73b9
Addressing review feedback.
pstibrany Feb 26, 2020
7282f5f
Fix test.
pstibrany Feb 26, 2020
4806bc2
Fix compilation errors after rebase.
pstibrany Feb 26, 2020
ce9ba71
Extracted code that creates server service into separate file.
pstibrany Feb 27, 2020
f608199
Added some helper methods.
pstibrany Feb 27, 2020
43dbd86
Use helper methods to simplify service creation.
pstibrany Feb 27, 2020
94b7e5e
Comment.
pstibrany Feb 27, 2020
3bdf54d
Helper functions for manager.
pstibrany Feb 27, 2020
7f1aa5d
Use helper functions.
pstibrany Feb 27, 2020
216e8a9
Fixes, use helper functions.
pstibrany Feb 27, 2020
8d929f5
Fixes, use helper functions.
pstibrany Feb 27, 2020
1ed2ca3
comment
pstibrany Feb 27, 2020
d9b94aa
Helper function
pstibrany Feb 27, 2020
8fa3e33
Use helper functions to reduce amount of code.
pstibrany Feb 27, 2020
916af3f
Added tests for helper functions.
pstibrany Feb 27, 2020
a0ae3d3
Fixed imports
pstibrany Feb 27, 2020
ac2c3c6
Comment
pstibrany Feb 27, 2020
452dc64
Simplify code.
pstibrany Feb 27, 2020
255f79a
Stop and wait until stopped.
pstibrany Feb 27, 2020
6e6f325
Imports
pstibrany Feb 27, 2020
1ed35ca
Manage compaction and shipper via subservices manager.
pstibrany Feb 28, 2020
d00f8a6
Improve error message.
pstibrany Feb 28, 2020
ce77c0d
Comment.
pstibrany Feb 28, 2020
22a4637
Comment.
pstibrany Feb 28, 2020
874c56b
Added unit test for Cortex initialization and module dependencies.
pstibrany Feb 28, 2020
702b575
Comments, return errors.
pstibrany Feb 28, 2020
ca27213
Unified /ready handlers into one, that reflects state of all services.
pstibrany Feb 28, 2020
1f597b4
Added //nolint:errcheck to `defer services.StopAndAwaitTerminated(...…
pstibrany Feb 28, 2020
5a22c3d
Fix http handler logic. Also renamed it.
pstibrany Feb 28, 2020
458fa76
Address review feedback.
pstibrany Feb 28, 2020
451eba7
One more test... since Shutdown already stops Run, no need to call St…
pstibrany Feb 28, 2020
abc8d2b
CHANGELOG.md
pstibrany Feb 28, 2020
6e5540d
Fixed integration test, old versions of Cortex didn't have /ready probe.
pstibrany Feb 28, 2020
ecfa362
Make lint happy.
pstibrany Feb 28, 2020
3d412a8
Mention /ready for all services.
pstibrany Feb 28, 2020
991bb9d
Wrap "not running" error into promql.ErrStorage.
pstibrany Feb 28, 2020
16d5a3a
Expose http port via method on HTTPService.
pstibrany Mar 2, 2020
085b6ac
Print number of services in each state.
pstibrany Mar 2, 2020
4329075
Fix comment and remove obsolete line.
pstibrany Mar 2, 2020
528451d
Fix compile errors after rebase.
pstibrany Mar 2, 2020
dda59cc
Rebased and converted data purger to a service.
pstibrany Mar 3, 2020
26fadae
Pass context to the bucket client.
pstibrany Mar 4, 2020
Files changed
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -27,6 +27,8 @@
* [CHANGE] Experimental TSDB: TSDB head compaction interval and concurrency is now configurable (defaults to 1 min interval and 5 concurrent head compactions). New options: `-experimental.tsdb.head-compaction-interval` and `-experimental.tsdb.head-compaction-concurrency`. #2172
* [CHANGE] Remove fluentd-based billing infrastructure and flags such as `-distributor.enable-billing`. #1491
* [CHANGE] Experimental TSDB: the querier in-memory index cache used by the experimental blocks storage shifted from per-tenant to per-querier. The `-experimental.tsdb.bucket-store.index-cache-size-bytes` now configures the per-querier index cache max size instead of a per-tenant cache and its default has been increased to 1GB. #2189
* [CHANGE] If you are vendoring Cortex and use its components in your project, be aware that many Cortex components no longer start automatically when they are created. You may want to review PR and attached document. #2166
* [CHANGE] Cortex now has /ready probe for all services, not just ingester and querier as before. In single-binary mode, /ready reports 204 only if all components are running properly. #2166
* [FEATURE] Added a read-only local alertmanager config store using files named corresponding to their tenant id. #2125
* [FEATURE] Added user sub rings to distribute users to a subset of ingesters. #1947
* `--experimental.distributor.user-subring-size`
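For readers vendoring Cortex, the CHANGELOG entry above is the key behavioural change in this PR: constructors only build a component, and the caller now owns its lifecycle through the services API. A minimal sketch of the new pattern, assuming a component value that embeds services.Service from pkg/util/services (StartAndAwaitRunning and StopAndAwaitTerminated are the helpers referenced elsewhere in this PR; this snippet is illustrative and not lifted from the changeset):

package example

import (
	"context"

	"github.com/cortexproject/cortex/pkg/util/services"
)

// runWithExplicitLifecycle shows the lifecycle that callers of vendored Cortex
// components now own: the constructor only builds the component, the caller
// starts it, and the caller stops it when done.
func runWithExplicitLifecycle(ctx context.Context, svc services.Service) error {
	// Block until the service reaches its Running state, or return the error
	// that prevented it from getting there.
	if err := services.StartAndAwaitRunning(ctx, svc); err != nil {
		return err
	}

	// ... use the component while it is Running ...

	// Shutdown is explicit too: request termination and wait for Terminated.
	return services.StopAndAwaitTerminated(context.Background(), svc)
}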
2 changes: 0 additions & 2 deletions cmd/cortex/main.go
@@ -110,8 +110,6 @@ func main() {
}

runtime.KeepAlive(ballast)
err = t.Stop()
util.CheckFatal("initializing cortex", err)
}

// Parse -config.file and -config.expand-env option via separate flag set, to avoid polluting default one and calling flag.Parse on it twice.
4 changes: 4 additions & 0 deletions integration/backward_compatibility_test.go
@@ -34,8 +34,12 @@ func TestBackwardCompatibilityWithChunksStorage(t *testing.T) {
// Start Cortex components (ingester running on previous version).
require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(cortexSchemaConfigYaml)))
tableManager := e2ecortex.NewTableManager("table-manager", ChunksStorageFlags, previousVersionImage)
// Old table-manager doesn't expose a readiness probe, so we just check if the / returns 404
tableManager.SetReadinessProbe(e2e.NewReadinessProbe(tableManager.HTTPPort(), "/", 404))
ingester1 := e2ecortex.NewIngester("ingester-1", consul.NetworkHTTPEndpoint(), ChunksStorageFlags, "")
distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), ChunksStorageFlags, "")
// Old ring didn't have /ready probe, use /ring instead.
distributor.SetReadinessProbe(e2e.NewReadinessProbe(distributor.HTTPPort(), "/ring", 200))
require.NoError(t, s.StartAndWaitReady(distributor, ingester1, tableManager))

// Wait until the first table-manager sync has completed, so that we're
8 changes: 8 additions & 0 deletions integration/e2e/service.go
@@ -218,6 +218,10 @@ func (s *ConcreteService) NetworkEndpointFor(networkName string, port int) strin
return fmt.Sprintf("%s:%d", containerName(networkName, s.name), port)
}

func (s *ConcreteService) SetReadinessProbe(probe *ReadinessProbe) {
s.readiness = probe
}

func (s *ConcreteService) Ready() error {
if !s.isExpectedRunning() {
return fmt.Errorf("service %s is stopped", s.Name())
@@ -425,6 +429,10 @@ func (s *HTTPService) metrics() (_ string, err error) {
return string(body), err
}

func (s *HTTPService) HTTPPort() int {
return s.httpPort
}

func (s *HTTPService) HTTPEndpoint() string {
return s.Endpoint(s.httpPort)
}
11 changes: 4 additions & 7 deletions integration/e2ecortex/services.go
@@ -48,7 +48,7 @@ func NewDistributorWithConfigFile(name, consulAddress, configFile string, flags
"-ring.store": "consul",
"-consul.hostname": consulAddress,
}, flags))...),
e2e.NewReadinessProbe(httpPort, "/ring", 200),
e2e.NewReadinessProbe(httpPort, "/ready", 204),
httpPort,
grpcPort,
)
@@ -143,8 +143,7 @@ func NewTableManagerWithConfigFile(name, configFile string, flags map[string]str
"-target": "table-manager",
"-log.level": "warn",
}, flags))...),
// The table-manager doesn't expose a readiness probe, so we just check if the / returns 404
e2e.NewReadinessProbe(httpPort, "/", 404),
e2e.NewReadinessProbe(httpPort, "/ready", 204),
httpPort,
grpcPort,
)
@@ -170,8 +169,7 @@ func NewQueryFrontendWithConfigFile(name, configFile string, flags map[string]st
"-target": "query-frontend",
"-log.level": "warn",
}, flags))...),
// The query-frontend doesn't expose a readiness probe, so we just check if the / returns 404
e2e.NewReadinessProbe(httpPort, "/", 404),
e2e.NewReadinessProbe(httpPort, "/ready", 204),
httpPort,
grpcPort,
)
@@ -207,8 +205,7 @@ func NewAlertmanager(name string, flags map[string]string, image string) *Cortex
"-target": "alertmanager",
"-log.level": "warn",
}, flags))...),
// The alertmanager doesn't expose a readiness probe, so we just check if the / returns 404
e2e.NewReadinessProbe(httpPort, "/", 404),
e2e.NewReadinessProbe(httpPort, "/ready", 204),
httpPort,
grpcPort,
)
49 changes: 23 additions & 26 deletions pkg/alertmanager/multitenant.go
@@ -24,6 +24,7 @@ import (
"github.com/cortexproject/cortex/pkg/alertmanager/alerts"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/services"
)

var backoffConfig = util.BackoffConfig{
@@ -128,6 +129,8 @@ func (cfg *MultitenantAlertmanagerConfig) RegisterFlags(f *flag.FlagSet) {
// A MultitenantAlertmanager manages Alertmanager instances for multiple
// organizations.
type MultitenantAlertmanager struct {
services.Service

cfg *MultitenantAlertmanagerConfig

store AlertStore
@@ -147,9 +150,6 @@ type MultitenantAlertmanager struct {
metrics *alertmanagerMetrics

peer *cluster.Peer

stop chan struct{}
done chan struct{}
}

// NewMultitenantAlertmanager creates a new MultitenantAlertmanager.
@@ -201,6 +201,10 @@ func NewMultitenantAlertmanager(cfg *MultitenantAlertmanagerConfig, logger log.L
return nil, err
}

return createMultitenantAlertmanager(cfg, fallbackConfig, peer, store, logger, registerer), nil
}

func createMultitenantAlertmanager(cfg *MultitenantAlertmanagerConfig, fallbackConfig []byte, peer *cluster.Peer, store AlertStore, logger log.Logger, registerer prometheus.Registerer) *MultitenantAlertmanager {
am := &MultitenantAlertmanager{
cfg: cfg,
fallbackConfig: string(fallbackConfig),
@@ -210,42 +214,34 @@ func NewMultitenantAlertmanager(cfg *MultitenantAlertmanagerConfig, logger log.L
peer: peer,
store: store,
logger: log.With(logger, "component", "MultiTenantAlertmanager"),
stop: make(chan struct{}),
done: make(chan struct{}),
}

if registerer != nil {
registerer.MustRegister(am.metrics)
}

return am, nil
am.Service = services.NewTimerService(am.cfg.PollInterval, am.starting, am.iteration, am.stopping)
return am
}

// Run the MultitenantAlertmanager.
func (am *MultitenantAlertmanager) Run() {
defer close(am.done)

func (am *MultitenantAlertmanager) starting(ctx context.Context) error {
// Load initial set of all configurations before polling for new ones.
am.syncConfigs(am.loadAllConfigs())
ticker := time.NewTicker(am.cfg.PollInterval)
for {
select {
case <-ticker.C:
err := am.updateConfigs()
if err != nil {
level.Warn(am.logger).Log("msg", "error updating configs", "err", err)
}
case <-am.stop:
ticker.Stop()
return
}
return nil
}

func (am *MultitenantAlertmanager) iteration(ctx context.Context) error {
err := am.updateConfigs()
if err != nil {
level.Warn(am.logger).Log("msg", "error updating configs", "err", err)
}
// Returning error here would stop "MultitenantAlertmanager" service completely,
// so we return nil to keep service running.
return nil
}

// Stop stops the MultitenantAlertmanager.
func (am *MultitenantAlertmanager) Stop() {
close(am.stop)
<-am.done
// stopping runs when MultitenantAlertmanager transitions to Stopping state.
func (am *MultitenantAlertmanager) stopping() error {
am.alertmanagersMtx.Lock()
for _, am := range am.alertmanagers {
am.Stop()
@@ -256,6 +252,7 @@ func (am *MultitenantAlertmanager) Stop() {
level.Warn(am.logger).Log("msg", "failed to leave the cluster", "err", err)
}
level.Debug(am.logger).Log("msg", "stopping")
return nil
}

// Load the full set of configurations from the alert store, retrying with backoff
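The MultitenantAlertmanager conversion above shows the pattern repeated throughout this PR: a hand-rolled ticker-plus-quit-channel loop becomes services.NewTimerService(interval, starting, iteration, stopping), with recoverable per-iteration errors logged rather than returned, since a returned error stops the whole service. A condensed sketch of that shape, mirroring the callback signatures used in this diff (illustrative only; see pkg/util/services for the authoritative API):

package example

import (
	"context"
	"time"

	"github.com/cortexproject/cortex/pkg/util/services"
)

// poller stands in for any component converted in this PR that does periodic work.
type poller struct {
	services.Service
}

func newPoller(interval time.Duration) *poller {
	p := &poller{}
	// starting runs once before the loop, iteration runs every interval,
	// stopping runs when the service is asked to shut down.
	p.Service = services.NewTimerService(interval, p.starting, p.iteration, p.stopping)
	return p
}

func (p *poller) starting(ctx context.Context) error {
	// One-time initialization, e.g. loading the initial set of configurations.
	return nil
}

func (p *poller) iteration(ctx context.Context) error {
	// Periodic work. Returning a non-nil error terminates the whole service,
	// so recoverable failures should be logged and swallowed here instead.
	return nil
}

func (p *poller) stopping() error {
	// Cleanup once the service transitions to Stopping.
	return nil
}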
23 changes: 6 additions & 17 deletions pkg/alertmanager/multitenant_test.go
@@ -6,7 +6,6 @@ import (
"context"
"io/ioutil"
"os"
"sync"
"testing"

"github.com/go-kit/kit/log"
@@ -65,20 +64,10 @@ func TestLoadAllConfigs(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(tempDir)

am := &MultitenantAlertmanager{
cfg: &MultitenantAlertmanagerConfig{
ExternalURL: externalURL,
DataDir: tempDir,
},
store: mockStore,
cfgs: map[string]alerts.AlertConfigDesc{},
alertmanagersMtx: sync.Mutex{},
alertmanagers: map[string]*Alertmanager{},
logger: log.NewNopLogger(),
stop: make(chan struct{}),
done: make(chan struct{}),
metrics: newAlertmanagerMetrics(),
}
am := createMultitenantAlertmanager(&MultitenantAlertmanagerConfig{
ExternalURL: externalURL,
DataDir: tempDir,
}, nil, nil, mockStore, log.NewNopLogger(), nil)

// Ensure the configs are synced correctly
require.NoError(t, am.updateConfigs())
@@ -123,7 +112,7 @@

userAM, exists := am.alertmanagers["user3"]
require.True(t, exists)
require.False(t, userAM.isActive())
require.False(t, userAM.IsActive())

// Ensure when a 3rd config is re-added, it is synced correctly
mockStore.configs["user3"] = alerts.AlertConfigDesc{
@@ -140,5 +129,5 @@

userAM, exists = am.alertmanagers["user3"]
require.True(t, exists)
require.True(t, userAM.isActive())
require.True(t, userAM.IsActive())
}
62 changes: 25 additions & 37 deletions pkg/chunk/purger/purger.go
@@ -19,6 +19,7 @@ import (
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/services"
)

const millisecondPerDay = int64(24 * time.Hour / time.Millisecond)
@@ -51,6 +52,8 @@ type workerJob struct {

// DataPurger does the purging of data which is requested to be deleted
type DataPurger struct {
services.Service

cfg Config
deleteStore *chunk.DeleteStore
chunkStore chunk.Store
@@ -67,8 +70,7 @@ type DataPurger struct {
pendingPlansCount map[string]int // per request pending plan count
pendingPlansCountMtx sync.Mutex

quit chan struct{}
wg sync.WaitGroup
wg sync.WaitGroup
}

// NewDataPurger creates a new DataPurger
@@ -82,45 +84,39 @@ func NewDataPurger(cfg Config, deleteStore *chunk.DeleteStore, chunkStore chunk.
workerJobChan: make(chan workerJob, 50),
inProcessRequestIDs: map[string]string{},
pendingPlansCount: map[string]int{},
quit: make(chan struct{}),
}

dataPurger.Service = services.NewTimerService(time.Hour, dataPurger.init, dataPurger.runOneIteration, dataPurger.stop)
return &dataPurger, nil
}

// Run keeps pulling delete requests for planning after initializing necessary things
func (dp *DataPurger) Run() {
dp.wg.Add(1)
defer dp.wg.Done()

pullDeleteRequestsToPlanDeletesTicker := time.NewTicker(time.Hour)
defer pullDeleteRequestsToPlanDeletesTicker.Stop()

for {
select {
case <-pullDeleteRequestsToPlanDeletesTicker.C:
err := dp.pullDeleteRequestsToPlanDeletes()
if err != nil {
level.Error(util.Logger).Log("msg", "error pulling delete requests for building plans", "err", err)
}
case <-dp.quit:
return
}
func (dp *DataPurger) runOneIteration(ctx context.Context) error {
err := dp.pullDeleteRequestsToPlanDeletes()
if err != nil {
level.Error(util.Logger).Log("msg", "error pulling delete requests for building plans", "err", err)
}
// Don't return error here, or Timer service will stop.
return nil
}

// Init starts workers, scheduler and then loads in process delete requests
func (dp *DataPurger) Init() error {
dp.runWorkers()
go dp.jobScheduler()
// init starts workers, scheduler and then loads in process delete requests
func (dp *DataPurger) init(ctx context.Context) error {
for i := 0; i < dp.cfg.NumWorkers; i++ {
dp.wg.Add(1)
go dp.worker()
}

dp.wg.Add(1)
go dp.jobScheduler(ctx)

return dp.loadInprocessDeleteRequests()
}

// Stop stops all background workers/loops
func (dp *DataPurger) Stop() {
close(dp.quit)
// Stop waits until all background tasks stop.
func (dp *DataPurger) stop() error {
dp.wg.Wait()
return nil
}

func (dp *DataPurger) workerJobCleanup(job workerJob) {
@@ -154,8 +150,7 @@ func (dp *DataPurger) workerJobCleanup(job workerJob) {
}

// we send all the delete plans to workerJobChan
func (dp *DataPurger) jobScheduler() {
dp.wg.Add(1)
func (dp *DataPurger) jobScheduler(ctx context.Context) {
defer dp.wg.Done()

for {
@@ -172,20 +167,13 @@ func (dp *DataPurger) jobScheduler() {
dp.workerJobChan <- workerJob{planNo: i, userID: req.UserID,
deleteRequestID: req.RequestID, logger: req.logger}
}
case <-dp.quit:
case <-ctx.Done():
close(dp.workerJobChan)
return
}
}
}

func (dp *DataPurger) runWorkers() {
for i := 0; i < dp.cfg.NumWorkers; i++ {
dp.wg.Add(1)
go dp.worker()
}
}

func (dp *DataPurger) worker() {
defer dp.wg.Done()
