Skip to content

Commit 5d55a28

Browse files
authored
Distributors rings status (cortexproject#4151)
* Distributors rings status Now we're able to see the ring status for the distributor when the ingestion-rate-limit-strategy setting is set to global Signed-off-by: Mario de Frutos <mario@defrutos.org> * Fix lint error Signed-off-by: Mario de Frutos <mario@defrutos.org> * Use the proper prometheus registerer Signed-off-by: Mario de Frutos <mario@defrutos.org> * Improve error message Signed-off-by: Mario de Frutos <mario@defrutos.org> * Create util.WriteHTMLResponse and use it Signed-off-by: Mario de Frutos <mario@defrutos.org>
1 parent c4b8bd1 commit 5d55a28

File tree

8 files changed

+73
-20
lines changed

8 files changed

+73
-20
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
* [CHANGE] Alertmanager: allowed to configure the experimental receivers firewall on a per-tenant basis. The following CLI flags (and their respective YAML config options) have been changed and moved to the limits config section: #4143
77
- `-alertmanager.receivers-firewall.block.cidr-networks` renamed to `-alertmanager.receivers-firewall-block-cidr-networks`
88
- `-alertmanager.receivers-firewall.block.private-addresses` renamed to `-alertmanager.receivers-firewall-block-private-addresses`
9+
* [CHANGE] Distributor: Added ring status section in the admin page #4151
910
* [ENHANCEMENT] Alertmanager: introduced new metrics to monitor operation when using `-alertmanager.sharding-enabled`: #4149
1011
* `cortex_alertmanager_state_fetch_replica_state_total`
1112
* `cortex_alertmanager_state_fetch_replica_state_failed_total`

docs/api/_index.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,14 @@ _For more information, please check out Prometheus [Remote storage integrations]
213213

214214
_Requires [authentication](#authentication)._
215215

216+
### Distributor ring status
217+
218+
```
219+
GET /distributor/ring
220+
```
221+
222+
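Displays a web page with the distributor hash ring status, including the state, health and last heartbeat time of each distributor. The distributors ring is only available when the ingestion rate limit strategy is set to `global`; otherwise the page reports that the distributor is not running with global limits enabled.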
223+
216224
### Tenants stats
217225

218226
```

pkg/api/api.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,9 +217,11 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib
217217

218218
a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
219219

220+
a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/ring", "Distributor Ring Status")
220221
a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/all_user_stats", "Usage Statistics")
221222
a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/ha_tracker", "HA Tracking Status")
222223

224+
a.RegisterRoute("/distributor/ring", d, false, "GET", "POST")
223225
a.RegisterRoute("/distributor/all_user_stats", http.HandlerFunc(d.AllUserStatsHandler), false, "GET")
224226
a.RegisterRoute("/distributor/ha_tracker", d.HATracker, false, "GET")
225227

pkg/distributor/distributor.go

Lines changed: 41 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ type Distributor struct {
7373

7474
// The global rate limiter requires a distributors ring to count
7575
// the number of healthy instances
76-
distributorsRing *ring.Lifecycler
76+
distributorsLifeCycler *ring.Lifecycler
77+
distributorsRing *ring.Ring
7778

7879
// For handling HA replicas.
7980
HATracker *haTracker
@@ -202,33 +203,39 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
202203
// it's an internal dependency and can't join the distributors ring, we skip rate
203204
// limiting.
204205
var ingestionRateStrategy limiter.RateLimiterStrategy
205-
var distributorsRing *ring.Lifecycler
206+
var distributorsLifeCycler *ring.Lifecycler
207+
var distributorsRing *ring.Ring
206208

207209
if !canJoinDistributorsRing {
208210
ingestionRateStrategy = newInfiniteIngestionRateStrategy()
209211
} else if limits.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy {
210-
distributorsRing, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", ring.DistributorRingKey, true, reg)
212+
distributorsLifeCycler, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", ring.DistributorRingKey, true, reg)
211213
if err != nil {
212214
return nil, err
213215
}
214216

215-
subservices = append(subservices, distributorsRing)
217+
distributorsRing, err = ring.New(cfg.DistributorRing.ToRingConfig(), "distributor", ring.DistributorRingKey, reg)
218+
if err != nil {
219+
return nil, errors.Wrap(err, "failed to initialize distributors' ring client")
220+
}
221+
subservices = append(subservices, distributorsLifeCycler, distributorsRing)
216222

217-
ingestionRateStrategy = newGlobalIngestionRateStrategy(limits, distributorsRing)
223+
ingestionRateStrategy = newGlobalIngestionRateStrategy(limits, distributorsLifeCycler)
218224
} else {
219225
ingestionRateStrategy = newLocalIngestionRateStrategy(limits)
220226
}
221227

222228
d := &Distributor{
223-
cfg: cfg,
224-
log: log,
225-
ingestersRing: ingestersRing,
226-
ingesterPool: NewPool(cfg.PoolConfig, ingestersRing, cfg.IngesterClientFactory, log),
227-
distributorsRing: distributorsRing,
228-
limits: limits,
229-
ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
230-
HATracker: haTracker,
231-
ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),
229+
cfg: cfg,
230+
log: log,
231+
ingestersRing: ingestersRing,
232+
ingesterPool: NewPool(cfg.PoolConfig, ingestersRing, cfg.IngesterClientFactory, log),
233+
distributorsLifeCycler: distributorsLifeCycler,
234+
distributorsRing: distributorsRing,
235+
limits: limits,
236+
ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
237+
HATracker: haTracker,
238+
ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),
232239

233240
queryDuration: instrument.NewHistogramCollector(promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
234241
Namespace: "cortex",
@@ -1029,3 +1036,23 @@ func (d *Distributor) AllUserStats(ctx context.Context) ([]UserIDStats, error) {
10291036

10301037
return response, nil
10311038
}
1039+
1040+
func (d *Distributor) ServeHTTP(w http.ResponseWriter, req *http.Request) {
1041+
if d.distributorsRing != nil {
1042+
d.distributorsRing.ServeHTTP(w, req)
1043+
} else {
1044+
var ringNotEnabledPage = `
1045+
<!DOCTYPE html>
1046+
<html>
1047+
<head>
1048+
<meta charset="UTF-8">
1049+
<title>Cortex Distributor Status</title>
1050+
</head>
1051+
<body>
1052+
<h1>Cortex Distributor Status</h1>
1053+
<p>Distributor is not running with global limits enabled</p>
1054+
</body>
1055+
</html>`
1056+
util.WriteHTMLResponse(w, ringNotEnabledPage)
1057+
}
1058+
}

pkg/distributor/distributor_ring.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,3 +86,14 @@ func (cfg *RingConfig) ToLifecyclerConfig() ring.LifecyclerConfig {
8686

8787
return lc
8888
}
89+
90+
func (cfg *RingConfig) ToRingConfig() ring.Config {
91+
rc := ring.Config{}
92+
flagext.DefaultValues(&rc)
93+
94+
rc.KVStore = cfg.KVStore
95+
rc.HeartbeatTimeout = cfg.HeartbeatTimeout
96+
rc.ReplicationFactor = 1
97+
98+
return rc
99+
}

pkg/distributor/distributor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1789,7 +1789,7 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, *rin
17891789
// updates to the expected size
17901790
if distributors[0].distributorsRing != nil {
17911791
test.Poll(t, time.Second, cfg.numDistributors, func() interface{} {
1792-
return distributors[0].distributorsRing.HealthyInstancesCount()
1792+
return distributors[0].distributorsLifeCycler.HealthyInstancesCount()
17931793
})
17941794
}
17951795

pkg/ruler/ruler.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -426,11 +426,7 @@ func (r *Ruler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
426426
<p>Ruler running with shards disabled</p>
427427
</body>
428428
</html>`
429-
w.WriteHeader(http.StatusOK)
430-
_, err := w.Write([]byte(unshardedPage))
431-
if err != nil {
432-
level.Error(r.logger).Log("msg", "unable to serve status page", "err", err)
433-
}
429+
util.WriteHTMLResponse(w, unshardedPage)
434430
}
435431
}
436432

pkg/util/http.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,14 @@ func WriteTextResponse(w http.ResponseWriter, message string) {
8585
_, _ = w.Write([]byte(message))
8686
}
8787

88+
// WriteHTMLResponse sends message as a text/html response with a 200 status code.
89+
func WriteHTMLResponse(w http.ResponseWriter, message string) {
90+
w.Header().Set("Content-Type", "text/html")
91+
92+
// Ignore inactionable errors.
93+
_, _ = w.Write([]byte(message))
94+
}
95+
8896
// RenderHTTPResponse either responds with json or a rendered html page using the passed in template
8997
// by checking the Accepts header
9098
func RenderHTTPResponse(w http.ResponseWriter, v interface{}, t *template.Template, r *http.Request) {

0 commit comments

Comments
 (0)