Skip to content

Commit 2fa28ad

Browse files
committed
Change to not break exist behavior
Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
1 parent 68c41a9 commit 2fa28ad

File tree

3 files changed

+136
-77
lines changed

3 files changed

+136
-77
lines changed

integration/remote_write_v2_test.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,69 @@ func TestIngesterRollingUpdate(t *testing.T) {
137137
}
138138
}
139139

140+
func TestIngest_SenderSendPRW2_DistributorNotAllowPRW2(t *testing.T) {
141+
const blockRangePeriod = 5 * time.Second
142+
143+
s, err := e2e.NewScenario(networkName)
144+
require.NoError(t, err)
145+
defer s.Close()
146+
147+
// Start dependencies.
148+
consul := e2edb.NewConsulWithName("consul")
149+
require.NoError(t, s.StartAndWaitReady(consul))
150+
151+
flags := mergeFlags(
152+
AlertmanagerLocalFlags(),
153+
map[string]string{
154+
"-store.engine": blocksStorageEngine,
155+
"-blocks-storage.backend": "filesystem",
156+
"-blocks-storage.tsdb.head-compaction-interval": "4m",
157+
"-blocks-storage.bucket-store.sync-interval": "15m",
158+
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
159+
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
160+
"-querier.query-store-for-labels-enabled": "true",
161+
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
162+
"-blocks-storage.tsdb.ship-interval": "1s",
163+
"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
164+
"-blocks-storage.tsdb.enable-native-histograms": "true",
165+
// Ingester.
166+
"-ring.store": "consul",
167+
"-consul.hostname": consul.NetworkHTTPEndpoint(),
168+
// Distributor.
169+
"-distributor.replication-factor": "1",
170+
"-distributor.remote-write2-enabled": "false",
171+
// Store-gateway.
172+
"-store-gateway.sharding-enabled": "false",
173+
// alert manager
174+
"-alertmanager.web.external-url": "http://localhost/alertmanager",
175+
},
176+
)
177+
178+
// make alert manager config dir
179+
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))
180+
181+
path := path.Join(s.SharedDir(), "cortex-1")
182+
183+
flags = mergeFlags(flags, map[string]string{"-blocks-storage.filesystem.dir": path})
184+
// Start Cortex replicas.
185+
cortex := e2ecortex.NewSingleBinary("cortex", flags, "")
186+
require.NoError(t, s.StartAndWaitReady(cortex))
187+
188+
// Wait until Cortex replicas have updated the ring state.
189+
require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))
190+
191+
c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1")
192+
require.NoError(t, err)
193+
194+
now := time.Now()
195+
196+
// series push
197+
symbols1, series, _ := e2e.GenerateSeriesV2("test_series", now, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "foo", Value: "bar"})
198+
res, err := c.PushV2(symbols1, series)
199+
require.NoError(t, err)
200+
require.Equal(t, 200, res.StatusCode)
201+
}
202+
140203
func TestIngest(t *testing.T) {
141204
const blockRangePeriod = 5 * time.Second
142205

pkg/util/push/push.go

Lines changed: 73 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@ const (
3232
rw20WrittenSamplesHeader = "X-Prometheus-Remote-Write-Samples-Written"
3333
rw20WrittenHistogramsHeader = "X-Prometheus-Remote-Write-Histograms-Written"
3434
rw20WrittenExemplarsHeader = "X-Prometheus-Remote-Write-Exemplars-Written"
35-
36-
errMsgNotEnabledPRW2 = "Not enabled prometheus remote write v2 push request"
3735
)
3836

3937
// Func defines the type of the push. It is similar to http.HandlerFunc.
@@ -52,36 +50,7 @@ func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, sourceIPs *middleware
5250
}
5351
}
5452

55-
// follow Prometheus https://github.com/prometheus/prometheus/blob/main/storage/remote/write_handler.go
56-
contentType := r.Header.Get("Content-Type")
57-
if contentType == "" {
58-
contentType = appProtoContentType
59-
}
60-
61-
msgType, err := parseProtoMsg(contentType)
62-
if err != nil {
63-
level.Error(logger).Log("Error decoding remote write request", "err", err)
64-
http.Error(w, err.Error(), http.StatusUnsupportedMediaType)
65-
return
66-
}
67-
68-
if msgType != config.RemoteWriteProtoMsgV1 && msgType != config.RemoteWriteProtoMsgV2 {
69-
level.Error(logger).Log("Not accepted msg type", "msgType", msgType, "err", err)
70-
http.Error(w, err.Error(), http.StatusUnsupportedMediaType)
71-
return
72-
}
73-
74-
enc := r.Header.Get("Content-Encoding")
75-
if enc == "" {
76-
} else if enc != string(remote.SnappyBlockCompression) {
77-
err := fmt.Errorf("%v encoding (compression) is not accepted by this server; only %v is acceptable", enc, remote.SnappyBlockCompression)
78-
level.Error(logger).Log("Error decoding remote write request", "err", err)
79-
http.Error(w, err.Error(), http.StatusUnsupportedMediaType)
80-
return
81-
}
82-
83-
switch msgType {
84-
case config.RemoteWriteProtoMsgV1:
53+
handlePRW1 := func() {
8554
var req cortexpb.PreallocWriteRequest
8655
err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, &req, util.RawSnappy)
8756
if err != nil {
@@ -108,55 +77,89 @@ func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, sourceIPs *middleware
10877
}
10978
http.Error(w, string(resp.Body), int(resp.Code))
11079
}
111-
case config.RemoteWriteProtoMsgV2:
112-
if remoteWrite2Enabled {
113-
var req writev2.Request
114-
err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, &req, util.RawSnappy)
115-
if err != nil {
116-
level.Error(logger).Log("err", err.Error())
117-
http.Error(w, err.Error(), http.StatusBadRequest)
118-
return
119-
}
80+
}
12081

121-
v1Req, err := convertV2RequestToV1(&req)
122-
if err != nil {
123-
level.Error(logger).Log("err", err.Error())
124-
http.Error(w, err.Error(), http.StatusBadRequest)
125-
return
126-
}
82+
handlePRW2 := func() {
83+
var req writev2.Request
84+
err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, &req, util.RawSnappy)
85+
if err != nil {
86+
level.Error(logger).Log("err", err.Error())
87+
http.Error(w, err.Error(), http.StatusBadRequest)
88+
return
89+
}
12790

128-
v1Req.SkipLabelNameValidation = false
129-
// Current source is only API
130-
if v1Req.Source == 0 {
131-
v1Req.Source = cortexpb.API
132-
}
91+
v1Req, err := convertV2RequestToV1(&req)
92+
if err != nil {
93+
level.Error(logger).Log("err", err.Error())
94+
http.Error(w, err.Error(), http.StatusBadRequest)
95+
return
96+
}
13397

134-
if resp, err := push(ctx, &v1Req.WriteRequest); err != nil {
135-
resp, ok := httpgrpc.HTTPResponseFromError(err)
136-
setHeader(w, 0, 0, 0)
137-
if !ok {
138-
http.Error(w, err.Error(), http.StatusInternalServerError)
139-
return
140-
}
141-
if resp.GetCode()/100 == 5 {
142-
level.Error(logger).Log("msg", "push error", "err", err)
143-
} else if resp.GetCode() != http.StatusAccepted && resp.GetCode() != http.StatusTooManyRequests {
144-
level.Warn(logger).Log("msg", "push refused", "err", err)
145-
}
146-
http.Error(w, string(resp.Body), int(resp.Code))
147-
} else {
148-
setHeader(w, resp.Samples, resp.Histograms, resp.Exemplars)
98+
v1Req.SkipLabelNameValidation = false
99+
if v1Req.Source == 0 {
100+
v1Req.Source = cortexpb.API
101+
}
102+
103+
if resp, err := push(ctx, &v1Req.WriteRequest); err != nil {
104+
resp, ok := httpgrpc.HTTPResponseFromError(err)
105+
setPRW2RespHeader(w, 0, 0, 0)
106+
if !ok {
107+
http.Error(w, err.Error(), http.StatusInternalServerError)
108+
return
149109
}
110+
if resp.GetCode()/100 == 5 {
111+
level.Error(logger).Log("msg", "push error", "err", err)
112+
} else if resp.GetCode() != http.StatusAccepted && resp.GetCode() != http.StatusTooManyRequests {
113+
level.Warn(logger).Log("msg", "push refused", "err", err)
114+
}
115+
http.Error(w, string(resp.Body), int(resp.Code))
150116
} else {
151-
level.Error(logger).Log(errMsgNotEnabledPRW2)
152-
http.Error(w, errMsgNotEnabledPRW2, http.StatusUnsupportedMediaType)
117+
setPRW2RespHeader(w, resp.Samples, resp.Histograms, resp.Exemplars)
118+
}
119+
}
120+
121+
if remoteWrite2Enabled {
122+
// follow Prometheus https://github.com/prometheus/prometheus/blob/main/storage/remote/write_handler.go
123+
contentType := r.Header.Get("Content-Type")
124+
if contentType == "" {
125+
contentType = appProtoContentType
126+
}
127+
128+
msgType, err := parseProtoMsg(contentType)
129+
if err != nil {
130+
level.Error(logger).Log("Error decoding remote write request", "err", err)
131+
http.Error(w, err.Error(), http.StatusUnsupportedMediaType)
153132
return
154133
}
134+
135+
if msgType != config.RemoteWriteProtoMsgV1 && msgType != config.RemoteWriteProtoMsgV2 {
136+
level.Error(logger).Log("Not accepted msg type", "msgType", msgType, "err", err)
137+
http.Error(w, err.Error(), http.StatusUnsupportedMediaType)
138+
return
139+
}
140+
141+
enc := r.Header.Get("Content-Encoding")
142+
if enc == "" {
143+
} else if enc != string(remote.SnappyBlockCompression) {
144+
err := fmt.Errorf("%v encoding (compression) is not accepted by this server; only %v is acceptable", enc, remote.SnappyBlockCompression)
145+
level.Error(logger).Log("Error decoding remote write request", "err", err)
146+
http.Error(w, err.Error(), http.StatusUnsupportedMediaType)
147+
return
148+
}
149+
150+
switch msgType {
151+
case config.RemoteWriteProtoMsgV1:
152+
handlePRW1()
153+
case config.RemoteWriteProtoMsgV2:
154+
handlePRW2()
155+
}
156+
} else {
157+
handlePRW1()
155158
}
156159
})
157160
}
158161

159-
func setHeader(w http.ResponseWriter, samples, histograms, exemplars int64) {
162+
func setPRW2RespHeader(w http.ResponseWriter, samples, histograms, exemplars int64) {
160163
w.Header().Set(rw20WrittenSamplesHeader, strconv.FormatInt(samples, 10))
161164
w.Header().Set(rw20WrittenHistogramsHeader, strconv.FormatInt(histograms, 10))
162165
w.Header().Set(rw20WrittenExemplarsHeader, strconv.FormatInt(exemplars, 10))

pkg/util/push/push_test.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -98,13 +98,6 @@ func TestHandler_remoteWrite(t *testing.T) {
9898
assert.Equal(t, "1", respHeader[rw20WrittenHistogramsHeader][0])
9999
assert.Equal(t, "1", respHeader[rw20WrittenExemplarsHeader][0])
100100
})
101-
t.Run("remote write v2 with not support remote write 2.0", func(t *testing.T) {
102-
handler := Handler(false, 100000, nil, verifyWriteRequestHandler(t, cortexpb.API))
103-
req := createRequest(t, createPrometheusRemoteWriteV2Protobuf(t), true)
104-
resp := httptest.NewRecorder()
105-
handler.ServeHTTP(resp, req)
106-
assert.Equal(t, http.StatusUnsupportedMediaType, resp.Code)
107-
})
108101
}
109102

110103
func TestHandler_ContentTypeAndEncoding(t *testing.T) {

0 commit comments

Comments
 (0)