Skip to content

Commit 15ad4de

Browse files
authored
New ring state READONLY to be used by ingesters (#6163)
* New READONLY state on ring Signed-off-by: Daniel Deluiggi <ddeluigg@amazon.com> * Changelog Signed-off-by: Daniel Deluiggi <ddeluigg@amazon.com> * remove white space Signed-off-by: Daniel Deluiggi <ddeluigg@amazon.com> * Update changelog Signed-off-by: Daniel Deluiggi <ddeluigg@amazon.com> * Add new tests. Change api endpoints Signed-off-by: Daniel Deluiggi <ddeluigg@amazon.com> * Remove cross-site scripting. Add tests Signed-off-by: Daniel Deluiggi <ddeluigg@amazon.com> * Remove unRegister on shutdown Signed-off-by: Daniel Deluiggi <ddeluigg@amazon.com> * Add api endpoint Signed-off-by: Daniel Deluiggi <ddeluigg@amazon.com> --------- Signed-off-by: Daniel Deluiggi <ddeluigg@amazon.com> Signed-off-by: Daniel Blando <daniel@blando.com.br>
1 parent 39a168d commit 15ad4de

File tree

10 files changed

+333
-45
lines changed

10 files changed

+333
-45
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
## master / unreleased
44

55
* [ENHANCEMENT] Ruler: Add new ruler metric `cortex_ruler_rule_groups_in_store` that is the total rule groups per tenant in store, which can be used to compare with `cortex_prometheus_rule_group_rules` to count the number of rule groups that are not loaded by a ruler. #5869
6+
* [ENHANCEMENT] Ingester/Ring: New `READONLY` status on the ring to be used by ingesters. New ingester API to change the mode of an ingester. #6163
67
* [ENHANCEMENT] Ruler: Add query statistics metrics when --ruler.query-stats-enabled=true. #6173
78
* [ENHANCEMENT] Distributor: Add new `cortex_reduced_resolution_histogram_samples_total` metric to track the number of histogram samples whose resolution was reduced. #6182
89

docs/api/_index.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ For the sake of clarity, in this document we have grouped API endpoints by servi
3232
| [Flush blocks](#flush-blocks) | Ingester || `GET,POST /ingester/flush` |
3333
| [Shutdown](#shutdown) | Ingester || `GET,POST /ingester/shutdown` |
3434
| [Ingesters ring status](#ingesters-ring-status) | Ingester || `GET /ingester/ring` |
35+
| [Ingester mode](#ingester-mode) | Ingester || `GET,POST /ingester/mode` |
3536
| [Instant query](#instant-query) | Querier, Query-frontend || `GET,POST <prometheus-http-prefix>/api/v1/query` |
3637
| [Range query](#range-query) | Querier, Query-frontend || `GET,POST <prometheus-http-prefix>/api/v1/query_range` |
3738
| [Exemplar query](#exemplar-query) | Querier, Query-frontend || `GET,POST <prometheus-http-prefix>/api/v1/query_exemplars` |
@@ -296,6 +297,15 @@ GET /ring
296297

297298
Displays a web page with the ingesters hash ring status, including the state, healthy and last heartbeat time of each ingester.
298299

300+
### Ingester mode
301+
302+
```
303+
GET,POST /ingester/mode
304+
```
305+
Changes the ingester mode between ACTIVE and READONLY. A READONLY ingester does not receive push requests and is only called for query operations.
306+
307+
The endpoint accepts the target mode either as the `mode` query parameter or, for POST requests, as a `mode` field in an `application/x-www-form-urlencoded` body.
308+
299309

300310
## Querier / Query-frontend
301311

pkg/api/api.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,7 @@ type Ingester interface {
284284
FlushHandler(http.ResponseWriter, *http.Request)
285285
ShutdownHandler(http.ResponseWriter, *http.Request)
286286
RenewTokenHandler(http.ResponseWriter, *http.Request)
287+
ModeHandler(http.ResponseWriter, *http.Request)
287288
Push(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error)
288289
}
289290

@@ -294,9 +295,12 @@ func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config) {
294295
a.indexPage.AddLink(SectionDangerous, "/ingester/flush", "Trigger a Flush of data from Ingester to storage")
295296
a.indexPage.AddLink(SectionDangerous, "/ingester/shutdown", "Trigger Ingester Shutdown (Dangerous)")
296297
a.indexPage.AddLink(SectionDangerous, "/ingester/renewTokens", "Renew Ingester Tokens (10%)")
298+
a.indexPage.AddLink(SectionDangerous, "/ingester/mode?mode=READONLY", "Set Ingester to READONLY mode")
299+
a.indexPage.AddLink(SectionDangerous, "/ingester/mode?mode=ACTIVE", "Set Ingester to ACTIVE mode")
297300
a.RegisterRoute("/ingester/flush", http.HandlerFunc(i.FlushHandler), false, "GET", "POST")
298301
a.RegisterRoute("/ingester/shutdown", http.HandlerFunc(i.ShutdownHandler), false, "GET", "POST")
299302
a.RegisterRoute("/ingester/renewTokens", http.HandlerFunc(i.RenewTokenHandler), false, "GET", "POST")
303+
a.RegisterRoute("/ingester/mode", http.HandlerFunc(i.ModeHandler), false, "GET", "POST")
300304
a.RegisterRoute("/ingester/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging.
301305

302306
// Legacy Routes

pkg/ingester/ingester.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"flag"
66
"fmt"
7+
"html"
78
"io"
89
"math"
910
"net/http"
@@ -2881,6 +2882,61 @@ func (i *Ingester) flushHandler(w http.ResponseWriter, r *http.Request) {
28812882
w.WriteHeader(http.StatusNoContent)
28822883
}
28832884

2885+
// ModeHandler Change mode of ingester. It will also update set unregisterOnShutdown to true if READONLY mode
2886+
func (i *Ingester) ModeHandler(w http.ResponseWriter, r *http.Request) {
2887+
err := r.ParseForm()
2888+
if err != nil {
2889+
respMsg := "failed to parse HTTP request in mode handler"
2890+
level.Warn(logutil.WithContext(r.Context(), i.logger)).Log("msg", respMsg, "err", err)
2891+
w.WriteHeader(http.StatusBadRequest)
2892+
// We ignore errors here, because we cannot do anything about them.
2893+
_, _ = w.Write([]byte(respMsg))
2894+
return
2895+
}
2896+
2897+
currentState := i.lifecycler.GetState()
2898+
reqMode := strings.ToUpper(r.Form.Get("mode"))
2899+
switch reqMode {
2900+
case "READONLY":
2901+
if currentState != ring.READONLY {
2902+
err = i.lifecycler.ChangeState(r.Context(), ring.READONLY)
2903+
if err != nil {
2904+
respMsg := fmt.Sprintf("failed to change state: %s", err)
2905+
level.Warn(logutil.WithContext(r.Context(), i.logger)).Log("msg", respMsg)
2906+
w.WriteHeader(http.StatusBadRequest)
2907+
// We ignore errors here, because we cannot do anything about them.
2908+
_, _ = w.Write([]byte(respMsg))
2909+
return
2910+
}
2911+
}
2912+
case "ACTIVE":
2913+
if currentState != ring.ACTIVE {
2914+
err = i.lifecycler.ChangeState(r.Context(), ring.ACTIVE)
2915+
if err != nil {
2916+
respMsg := fmt.Sprintf("failed to change state: %s", err)
2917+
level.Warn(logutil.WithContext(r.Context(), i.logger)).Log("msg", respMsg)
2918+
w.WriteHeader(http.StatusBadRequest)
2919+
// We ignore errors here, because we cannot do anything about them.
2920+
_, _ = w.Write([]byte(respMsg))
2921+
return
2922+
}
2923+
}
2924+
default:
2925+
respMsg := fmt.Sprintf("invalid mode input: %s", html.EscapeString(reqMode))
2926+
level.Warn(logutil.WithContext(r.Context(), i.logger)).Log("msg", respMsg)
2927+
w.WriteHeader(http.StatusBadRequest)
2928+
// We ignore errors here, because we cannot do anything about them.
2929+
_, _ = w.Write([]byte(respMsg))
2930+
return
2931+
}
2932+
2933+
respMsg := fmt.Sprintf("Ingester mode %s", i.lifecycler.GetState())
2934+
level.Info(logutil.WithContext(r.Context(), i.logger)).Log("msg", respMsg)
2935+
w.WriteHeader(http.StatusOK)
2936+
// We ignore errors here, because we cannot do anything about them.
2937+
_, _ = w.Write([]byte(respMsg))
2938+
}
2939+
28842940
// metadataQueryRange returns the best range to query for metadata queries based on the timerange in the ingester.
28852941
func metadataQueryRange(queryStart, queryEnd int64, db *userTSDB, queryIngestersWithin time.Duration) (mint, maxt int64, err error) {
28862942
if queryIngestersWithin > 0 {

pkg/ingester/ingester_test.go

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5107,6 +5107,117 @@ func generateSamplesForLabel(l labels.Labels, count int) *cortexpb.WriteRequest
51075107
return cortexpb.ToWriteRequest(lbls, samples, nil, nil, cortexpb.API)
51085108
}
51095109

5110+
func Test_Ingester_ModeHandler(t *testing.T) {
5111+
tests := map[string]struct {
5112+
method string
5113+
requestBody io.Reader
5114+
requestUrl string
5115+
initialState ring.InstanceState
5116+
mode string
5117+
expectedState ring.InstanceState
5118+
expectedResponse int
5119+
}{
5120+
"should change to READONLY mode": {
5121+
method: "POST",
5122+
initialState: ring.ACTIVE,
5123+
requestUrl: "/mode?mode=reAdOnLy",
5124+
expectedState: ring.READONLY,
5125+
expectedResponse: http.StatusOK,
5126+
},
5127+
"should change mode on GET method": {
5128+
method: "GET",
5129+
initialState: ring.ACTIVE,
5130+
requestUrl: "/mode?mode=READONLY",
5131+
expectedState: ring.READONLY,
5132+
expectedResponse: http.StatusOK,
5133+
},
5134+
"should change mode on POST method via body": {
5135+
method: "POST",
5136+
initialState: ring.ACTIVE,
5137+
requestUrl: "/mode",
5138+
requestBody: strings.NewReader("mode=readonly"),
5139+
expectedState: ring.READONLY,
5140+
expectedResponse: http.StatusOK,
5141+
},
5142+
"should change to ACTIVE mode": {
5143+
method: "POST",
5144+
initialState: ring.READONLY,
5145+
requestUrl: "/mode?mode=active",
5146+
expectedState: ring.ACTIVE,
5147+
expectedResponse: http.StatusOK,
5148+
},
5149+
"should fail to unknown mode": {
5150+
method: "POST",
5151+
initialState: ring.ACTIVE,
5152+
requestUrl: "/mode?mode=NotSupported",
5153+
expectedState: ring.ACTIVE,
5154+
expectedResponse: http.StatusBadRequest,
5155+
},
5156+
"should maintain in readonly": {
5157+
method: "POST",
5158+
initialState: ring.READONLY,
5159+
requestUrl: "/mode?mode=READONLY",
5160+
expectedState: ring.READONLY,
5161+
expectedResponse: http.StatusOK,
5162+
},
5163+
"should maintain in active": {
5164+
method: "POST",
5165+
initialState: ring.ACTIVE,
5166+
requestUrl: "/mode?mode=ACTIVE",
5167+
expectedState: ring.ACTIVE,
5168+
expectedResponse: http.StatusOK,
5169+
},
5170+
"should fail mode READONLY if LEAVING state": {
5171+
method: "POST",
5172+
initialState: ring.LEAVING,
5173+
requestUrl: "/mode?mode=READONLY",
5174+
expectedState: ring.LEAVING,
5175+
expectedResponse: http.StatusBadRequest,
5176+
},
5177+
"should fail with malformatted request": {
5178+
method: "GET",
5179+
initialState: ring.ACTIVE,
5180+
requestUrl: "/mode?mod;e=READONLY",
5181+
expectedResponse: http.StatusBadRequest,
5182+
},
5183+
}
5184+
5185+
for testName, testData := range tests {
5186+
t.Run(testName, func(t *testing.T) {
5187+
cfg := defaultIngesterTestConfig(t)
5188+
i, err := prepareIngesterWithBlocksStorage(t, cfg, prometheus.NewRegistry())
5189+
require.NoError(t, err)
5190+
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
5191+
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
5192+
5193+
// Wait until it's ACTIVE
5194+
test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} {
5195+
return i.lifecycler.GetState()
5196+
})
5197+
5198+
if testData.initialState != ring.ACTIVE {
5199+
err = i.lifecycler.ChangeState(context.Background(), testData.initialState)
5200+
require.NoError(t, err)
5201+
5202+
// Wait until initial state
5203+
test.Poll(t, 1*time.Second, testData.initialState, func() interface{} {
5204+
return i.lifecycler.GetState()
5205+
})
5206+
}
5207+
5208+
response := httptest.NewRecorder()
5209+
request := httptest.NewRequest(testData.method, testData.requestUrl, testData.requestBody)
5210+
if testData.requestBody != nil {
5211+
request.Header.Set("Content-Type", "application/x-www-form-urlencoded; param=value")
5212+
}
5213+
i.ModeHandler(response, request)
5214+
5215+
require.Equal(t, testData.expectedResponse, response.Code)
5216+
require.Equal(t, testData.expectedState, i.lifecycler.GetState())
5217+
})
5218+
}
5219+
}
5220+
51105221
// mockTenantLimits exposes per-tenant limits based on a provided map
51115222
type mockTenantLimits struct {
51125223
limits map[string]*validation.Limits

pkg/ring/lifecycler.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -910,7 +910,10 @@ func (i *Lifecycler) changeState(ctx context.Context, state InstanceState) error
910910
(currState == JOINING && state == PENDING) || // triggered by TransferChunks on failure
911911
(currState == JOINING && state == ACTIVE) || // triggered by TransferChunks on success
912912
(currState == PENDING && state == ACTIVE) || // triggered by autoJoin
913-
(currState == ACTIVE && state == LEAVING)) { // triggered by shutdown
913+
(currState == ACTIVE && state == LEAVING) || // triggered by shutdown
914+
(currState == ACTIVE && state == READONLY) || // triggered by ingester mode
915+
(currState == READONLY && state == ACTIVE) || // triggered by ingester mode
916+
(currState == READONLY && state == LEAVING)) { // triggered by shutdown
914917
return fmt.Errorf("Changing instance state from %v -> %v is disallowed", currState, state)
915918
}
916919

pkg/ring/ring.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -106,13 +106,17 @@ var (
106106
})
107107

108108
// WriteNoExtend is like Write, but with no replicaset extension.
109-
WriteNoExtend = NewOp([]InstanceState{ACTIVE}, nil)
109+
WriteNoExtend = NewOp([]InstanceState{ACTIVE}, func(s InstanceState) bool {
110+
// We want to skip instances that are READONLY. So we will increase the size of replication
111+
// for the key
112+
return s == READONLY
113+
})
110114

111-
// Read operation that extends the replica set if an instance is not ACTIVE, LEAVING OR JOINING
112-
Read = NewOp([]InstanceState{ACTIVE, PENDING, LEAVING, JOINING}, func(s InstanceState) bool {
115+
// Read operation that extends the replica set if an instance is not ACTIVE, LEAVING, JOINING OR READONLY
116+
Read = NewOp([]InstanceState{ACTIVE, PENDING, LEAVING, JOINING, READONLY}, func(s InstanceState) bool {
113117
// To match Write with extended replica set we have to also increase the
114118
// size of the replica set for Read, but we can read from LEAVING ingesters.
115-
return s != ACTIVE && s != LEAVING && s != JOINING
119+
return s != ACTIVE && s != LEAVING && s != JOINING && s != READONLY
116120
})
117121

118122
// Reporting is a special value for inquiring about health.
@@ -661,7 +665,7 @@ func (r *Ring) updateRingMetrics(compareResult CompareResult) {
661665
oldestTimestampByState := map[string]int64{}
662666

663667
// Initialized to zero so we emit zero-metrics (instead of not emitting anything)
664-
for _, s := range []string{unhealthy, ACTIVE.String(), LEAVING.String(), PENDING.String(), JOINING.String()} {
668+
for _, s := range []string{unhealthy, ACTIVE.String(), LEAVING.String(), PENDING.String(), JOINING.String(), READONLY.String()} {
665669
numByState[s] = 0
666670
oldestTimestampByState[s] = 0
667671
}
@@ -995,7 +999,7 @@ func NewOp(healthyStates []InstanceState, shouldExtendReplicaSet func(s Instance
995999
}
9961000

9971001
if shouldExtendReplicaSet != nil {
998-
for _, s := range []InstanceState{ACTIVE, LEAVING, PENDING, JOINING, LEFT} {
1002+
for _, s := range []InstanceState{ACTIVE, LEAVING, PENDING, JOINING, LEFT, READONLY} {
9991003
if shouldExtendReplicaSet(s) {
10001004
op |= (0x10000 << s)
10011005
}

pkg/ring/ring.pb.go

Lines changed: 37 additions & 33 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/ring/ring.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,4 +51,6 @@ enum InstanceState {
5151
// This state is only used by gossiping code to distribute information about
5252
// instances that have been removed from the ring. Ring users should not use it directly.
5353
LEFT = 4;
54+
55+
READONLY= 5;
5456
}

0 commit comments

Comments
 (0)