Skip to content

Commit 04e1821

Browse files
committed
Port v1 remote-write tests to v2
Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
1 parent a417e1f commit 04e1821

File tree

10 files changed

+3846
-710
lines changed

10 files changed

+3846
-710
lines changed

integration/e2e/util.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,43 @@ func GenerateSeriesV2WithSamples(
311311
}
312312
}
313313

314+
func GenerateSeriesWithSamplesV2(
315+
st *writev2.SymbolsTable,
316+
name string,
317+
startTime time.Time,
318+
scrapeInterval time.Duration,
319+
startValue int,
320+
numSamples int,
321+
additionalLabels ...prompb.Label,
322+
) (series cortexpbv2.TimeSeries) {
323+
tsMillis := TimeToMilliseconds(startTime)
324+
durMillis := scrapeInterval.Milliseconds()
325+
326+
lbls := labels.Labels{{Name: labels.MetricName, Value: name}}
327+
st.Symbolize("__name__")
328+
st.Symbolize(name)
329+
for _, label := range additionalLabels {
330+
st.Symbolize(label.Name)
331+
st.Symbolize(label.Value)
332+
lbls = append(lbls, labels.Label{Name: label.Name, Value: label.Value})
333+
}
334+
335+
startTMillis := tsMillis
336+
samples := make([]cortexpbv2.Sample, numSamples)
337+
for i := 0; i < numSamples; i++ {
338+
samples[i] = cortexpbv2.Sample{
339+
Timestamp: startTMillis,
340+
Value: float64(i + startValue),
341+
}
342+
startTMillis += durMillis
343+
}
344+
345+
return cortexpbv2.TimeSeries{
346+
LabelsRefs: cortexpbv2.GetLabelsRefsFromLabels(st.Symbols(), lbls),
347+
Samples: samples,
348+
}
349+
}
350+
314351
func GenerateSeriesWithSamples(
315352
name string,
316353
startTime time.Time,

integration/query_fuzz_test.go

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@ import (
1616
"testing"
1717
"time"
1818

19+
"github.com/cortexproject/cortex/pkg/cortexpbv2"
1920
"github.com/cortexproject/promqlsmith"
2021
"github.com/google/go-cmp/cmp"
2122
"github.com/google/go-cmp/cmp/cmpopts"
2223
"github.com/prometheus/common/model"
2324
"github.com/prometheus/prometheus/model/labels"
2425
"github.com/prometheus/prometheus/prompb"
26+
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
2527
"github.com/prometheus/prometheus/promql/parser"
2628
"github.com/stretchr/testify/require"
2729
"github.com/thanos-io/thanos/pkg/block"
@@ -52,6 +54,166 @@ func init() {
5254
}
5355
}
5456

57+
func TestRemoteWriteV1AndV2QueryResultFuzz(t *testing.T) {
58+
s, err := e2e.NewScenario(networkName)
59+
require.NoError(t, err)
60+
defer s.Close()
61+
62+
// Start dependencies.
63+
consul1 := e2edb.NewConsulWithName("consul1")
64+
consul2 := e2edb.NewConsulWithName("consul2")
65+
require.NoError(t, s.StartAndWaitReady(consul1, consul2))
66+
67+
flags := mergeFlags(
68+
AlertmanagerLocalFlags(),
69+
map[string]string{
70+
"-store.engine": blocksStorageEngine,
71+
"-blocks-storage.backend": "filesystem",
72+
"-blocks-storage.tsdb.head-compaction-interval": "4m",
73+
"-blocks-storage.tsdb.block-ranges-period": "2h",
74+
"-blocks-storage.tsdb.ship-interval": "1h",
75+
"-blocks-storage.bucket-store.sync-interval": "15m",
76+
"-blocks-storage.tsdb.retention-period": "2h",
77+
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
78+
"-querier.query-store-for-labels-enabled": "true",
79+
// Ingester.
80+
"-ring.store": "consul",
81+
// Distributor.
82+
"-distributor.replication-factor": "1",
83+
// Store-gateway.
84+
"-store-gateway.sharding-enabled": "false",
85+
// alert manager
86+
"-alertmanager.web.external-url": "http://localhost/alertmanager",
87+
},
88+
)
89+
90+
// make alert manager config dir
91+
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))
92+
93+
path1 := path.Join(s.SharedDir(), "cortex-1")
94+
path2 := path.Join(s.SharedDir(), "cortex-2")
95+
96+
flags1 := mergeFlags(flags, map[string]string{
97+
"-blocks-storage.filesystem.dir": path1,
98+
"-consul.hostname": consul1.NetworkHTTPEndpoint(),
99+
})
100+
flags2 := mergeFlags(flags, map[string]string{
101+
"-blocks-storage.filesystem.dir": path2,
102+
"-consul.hostname": consul2.NetworkHTTPEndpoint(),
103+
})
104+
// Start Cortex replicas.
105+
cortex1 := e2ecortex.NewSingleBinary("cortex-1", flags1, "")
106+
cortex2 := e2ecortex.NewSingleBinary("cortex-2", flags2, "")
107+
require.NoError(t, s.StartAndWaitReady(cortex1, cortex2))
108+
109+
// Wait until Cortex replicas have updated the ring state.
110+
require.NoError(t, cortex1.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))
111+
require.NoError(t, cortex2.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))
112+
113+
c1, err := e2ecortex.NewClient(cortex1.HTTPEndpoint(), cortex1.HTTPEndpoint(), "", "", "user-1")
114+
require.NoError(t, err)
115+
c2, err := e2ecortex.NewClient(cortex2.HTTPEndpoint(), cortex2.HTTPEndpoint(), "", "", "user-1")
116+
require.NoError(t, err)
117+
118+
now := time.Now()
119+
// Push some series to Cortex.
120+
start := now.Add(-time.Minute * 60)
121+
scrapeInterval := 30 * time.Second
122+
123+
numSeries := 10
124+
numSamples := 120
125+
serieses := make([]prompb.TimeSeries, numSeries)
126+
seriesesV2 := make([]cortexpbv2.TimeSeries, numSeries)
127+
lbls := make([]labels.Labels, numSeries)
128+
129+
// make v1 series
130+
for i := 0; i < numSeries; i++ {
131+
series := e2e.GenerateSeriesWithSamples("test_series", start, scrapeInterval, i*numSamples, numSamples, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "series", Value: strconv.Itoa(i)})
132+
serieses[i] = series
133+
134+
builder := labels.NewBuilder(labels.EmptyLabels())
135+
for _, lbl := range series.Labels {
136+
builder.Set(lbl.Name, lbl.Value)
137+
}
138+
lbls[i] = builder.Labels()
139+
}
140+
// make v2 series
141+
st := writev2.NewSymbolTable()
142+
for i := 0; i < numSeries; i++ {
143+
series := e2e.GenerateSeriesWithSamplesV2(&st, "test_series", start, scrapeInterval, i*numSamples, numSamples, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "series", Value: strconv.Itoa(i)})
144+
seriesesV2[i] = series
145+
}
146+
147+
res, err := c1.Push(serieses)
148+
require.NoError(t, err)
149+
require.Equal(t, 200, res.StatusCode)
150+
151+
res, err = c2.PushV2(st.Symbols(), seriesesV2)
152+
require.NoError(t, err)
153+
require.Equal(t, 200, res.StatusCode)
154+
155+
waitUntilReady(t, context.Background(), c1, c2, `{job="test"}`, start, now)
156+
157+
rnd := rand.New(rand.NewSource(now.Unix()))
158+
opts := []promqlsmith.Option{
159+
promqlsmith.WithEnableOffset(true),
160+
promqlsmith.WithEnableAtModifier(true),
161+
promqlsmith.WithEnabledFunctions(enabledFunctions),
162+
}
163+
ps := promqlsmith.New(rnd, lbls, opts...)
164+
165+
type testCase struct {
166+
query string
167+
res1, res2 model.Value
168+
err1, err2 error
169+
}
170+
171+
queryStart := now.Add(-time.Minute * 50)
172+
queryEnd := now.Add(-time.Minute * 10)
173+
cases := make([]*testCase, 0, 500)
174+
testRun := 500
175+
var (
176+
expr parser.Expr
177+
query string
178+
)
179+
for i := 0; i < testRun; i++ {
180+
for {
181+
expr = ps.WalkRangeQuery()
182+
query = expr.Pretty(0)
183+
// timestamp is a known function that break with disable chunk trimming.
184+
if isValidQuery(expr, 5) && !strings.Contains(query, "timestamp") {
185+
break
186+
}
187+
}
188+
res1, err1 := c1.QueryRange(query, queryStart, queryEnd, scrapeInterval)
189+
res2, err2 := c2.QueryRange(query, queryStart, queryEnd, scrapeInterval)
190+
cases = append(cases, &testCase{
191+
query: query,
192+
res1: res1,
193+
res2: res2,
194+
err1: err1,
195+
err2: err2,
196+
})
197+
}
198+
199+
failures := 0
200+
for i, tc := range cases {
201+
qt := "range query"
202+
if tc.err1 != nil || tc.err2 != nil {
203+
if !cmp.Equal(tc.err1, tc.err2) {
204+
t.Logf("case %d error mismatch.\n%s: %s\nerr1: %v\nerr2: %v\n", i, qt, tc.query, tc.err1, tc.err2)
205+
failures++
206+
}
207+
} else if !cmp.Equal(tc.res1, tc.res2, comparer) {
208+
t.Logf("case %d results mismatch.\n%s: %s\nres1: %s\nres2: %s\n", i, qt, tc.query, tc.res1.String(), tc.res2.String())
209+
failures++
210+
}
211+
}
212+
if failures > 0 {
213+
require.Failf(t, "finished query fuzzing tests", "%d test cases failed", failures)
214+
}
215+
}
216+
55217
func TestDisableChunkTrimmingFuzz(t *testing.T) {
56218
s, err := e2e.NewScenario(networkName)
57219
require.NoError(t, err)

pkg/cortexpbv2/compatv2.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,27 @@ package cortexpbv2
22

33
import (
44
"github.com/prometheus/prometheus/model/labels"
5+
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
56

67
"github.com/cortexproject/cortex/pkg/cortexpb"
78
)
89

910
// ToWriteRequestV2 converts matched slices of Labels, Samples, and Histograms into a WriteRequest proto.
10-
func ToWriteRequestV2(lbls []labels.Labels, symbols []string, samples []Sample, histograms []Histogram, metadata []Metadata, source WriteRequest_SourceEnum) *WriteRequest {
11+
func ToWriteRequestV2(lbls []labels.Labels, samples []Sample, histograms []Histogram, metadata []Metadata, source WriteRequest_SourceEnum, additionalSymbols ...string) *WriteRequest {
12+
st := writev2.NewSymbolTable()
13+
for _, lbl := range lbls {
14+
lbl.Range(func(l labels.Label) {
15+
st.Symbolize(l.Name)
16+
st.Symbolize(l.Value)
17+
})
18+
}
19+
20+
for _, s := range additionalSymbols {
21+
st.Symbolize(s)
22+
}
23+
24+
symbols := st.Symbols()
25+
1126
req := &WriteRequest{
1227
Symbols: symbols,
1328
Source: source,

pkg/distributor/distributor.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -768,9 +768,18 @@ func (d *Distributor) sendV2(ctx context.Context, symbols []string, ingester rin
768768
d.ingesterAppendFailures.WithLabelValues(id, typeSamples, getErrorStatus(err)).Inc()
769769
}
770770

771-
d.ingesterAppends.WithLabelValues(id, typeMetadata).Inc()
772-
if err != nil {
773-
d.ingesterAppendFailures.WithLabelValues(id, typeMetadata, getErrorStatus(err)).Inc()
771+
metadataAppend := false
772+
for _, ts := range timeseries {
773+
if ts.Metadata.Type != cortexpbv2.METRIC_TYPE_UNSPECIFIED {
774+
metadataAppend = true
775+
break
776+
}
777+
}
778+
if metadataAppend {
779+
d.ingesterAppends.WithLabelValues(id, typeMetadata).Inc()
780+
if err != nil {
781+
d.ingesterAppendFailures.WithLabelValues(id, typeMetadata, getErrorStatus(err)).Inc()
782+
}
774783
}
775784
}
776785

@@ -926,7 +935,7 @@ func (d *Distributor) PushV2(ctx context.Context, req *cortexpbv2.WriteRequest)
926935
// Return a 429 here to tell the client it is going too fast.
927936
// Client may discard the data or slow down and re-send.
928937
// Prometheus v2.26 added a remote-write option 'retry_on_http_429'.
929-
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples", d.ingestionRateLimiter.Limit(now, userID), totalSamples)
938+
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.ingestionRateLimiter.Limit(now, userID), totalSamples, validatedMetadatas)
930939
}
931940

932941
// totalN included samples and metadata. Ingester follows this pattern when computing its ingestion rate.

0 commit comments

Comments
 (0)