Skip to content

Commit 3952986

Browse files
committed
Implement remote write v2
Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
1 parent 3475726 commit 3952986

24 files changed

+8990
-98
lines changed

.github/workflows/test-build-deploy.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ jobs:
144144
- integration_querier
145145
- integration_ruler
146146
- integration_query_fuzz
147+
- integration_remote_write_v2
147148
steps:
148149
- name: Upgrade golang
149150
uses: actions/setup-go@41dfa10bad2bb2ae585af6ee5bb4d7d973ad74ed # v5.1.0

.golangci.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,3 +49,4 @@ run:
4949
- integration_querier
5050
- integration_ruler
5151
- integration_query_fuzz
52+
- integration_remote_write_v2

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
* [FEATURE] Ruler: Minimize chances of missed rule group evaluations that can occur due to OOM kills, bad underlying nodes, or due to an unhealthy ruler that appears in the ring as healthy. This feature is enabled via `-ruler.enable-ha-evaluation` flag. #6129
1616
* [FEATURE] Store Gateway: Add an in-memory chunk cache. #6245
1717
* [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249
18+
* [FEATURE] Distributor/Ingester: Support remote write 2.0. It includes proto, samples, and (native) histograms ingestion. #6292
1819
* [ENHANCEMENT] S3 Bucket Client: Add a list objects version configs to configure list api object version. #6280
1920
* [ENHANCEMENT] OpenStack Swift: Add application credential configs for Openstack swift object storage backend. #6255
2021
* [ENHANCEMENT] Query Frontend: Add new query stats metrics `cortex_query_samples_scanned_total` and `cortex_query_peak_samples` to track scannedSamples and peakSample per user. #6228

integration/e2e/util.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/prometheus/prometheus/model/histogram"
1919
"github.com/prometheus/prometheus/model/labels"
2020
"github.com/prometheus/prometheus/prompb"
21+
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
2122
"github.com/prometheus/prometheus/storage"
2223
"github.com/prometheus/prometheus/tsdb"
2324
"github.com/prometheus/prometheus/tsdb/tsdbutil"
@@ -149,6 +150,40 @@ func GenerateSeries(name string, ts time.Time, additionalLabels ...prompb.Label)
149150
return
150151
}
151152

153+
func GenerateHistogramSeriesV2(name string, ts time.Time, i uint32, floatHistogram bool, additionalLabels ...prompb.Label) (symbols []string, series []writev2.TimeSeries) {
154+
tsMillis := TimeToMilliseconds(ts)
155+
156+
st := writev2.NewSymbolTable()
157+
158+
lbs := labels.Labels{labels.Label{Name: "__name__", Value: name}}
159+
for _, lbl := range additionalLabels {
160+
lbs = append(lbs, labels.Label{Name: lbl.Name, Value: lbl.Value})
161+
}
162+
163+
var (
164+
h *histogram.Histogram
165+
fh *histogram.FloatHistogram
166+
ph writev2.Histogram
167+
)
168+
if floatHistogram {
169+
fh = tsdbutil.GenerateTestFloatHistogram(int(i))
170+
ph = writev2.FromFloatHistogram(tsMillis, fh)
171+
} else {
172+
h = tsdbutil.GenerateTestHistogram(int(i))
173+
ph = writev2.FromIntHistogram(tsMillis, h)
174+
}
175+
176+
// Generate the series
177+
series = append(series, writev2.TimeSeries{
178+
LabelsRefs: st.SymbolizeLabels(lbs, nil),
179+
Histograms: []writev2.Histogram{ph},
180+
})
181+
182+
symbols = st.Symbols()
183+
184+
return
185+
}
186+
152187
func GenerateHistogramSeries(name string, ts time.Time, i uint32, floatHistogram bool, additionalLabels ...prompb.Label) (series []prompb.TimeSeries) {
153188
tsMillis := TimeToMilliseconds(ts)
154189

@@ -188,6 +223,47 @@ func GenerateHistogramSeries(name string, ts time.Time, i uint32, floatHistogram
188223
return
189224
}
190225

226+
func GenerateSeriesV2(name string, ts time.Time, additionalLabels ...prompb.Label) (symbols []string, series []writev2.TimeSeries, vector model.Vector) {
227+
tsMillis := TimeToMilliseconds(ts)
228+
value := rand.Float64()
229+
230+
st := writev2.NewSymbolTable()
231+
lbs := labels.Labels{{Name: labels.MetricName, Value: name}}
232+
233+
for _, label := range additionalLabels {
234+
lbs = append(lbs, labels.Label{
235+
Name: label.Name,
236+
Value: label.Value,
237+
})
238+
}
239+
series = append(series, writev2.TimeSeries{
240+
// Generate the series
241+
LabelsRefs: st.SymbolizeLabels(lbs, nil),
242+
Samples: []writev2.Sample{
243+
{Value: value, Timestamp: tsMillis},
244+
},
245+
Metadata: writev2.Metadata{
246+
Type: writev2.Metadata_METRIC_TYPE_GAUGE,
247+
},
248+
})
249+
symbols = st.Symbols()
250+
251+
// Generate the expected vector when querying it
252+
metric := model.Metric{}
253+
metric[labels.MetricName] = model.LabelValue(name)
254+
for _, lbl := range additionalLabels {
255+
metric[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
256+
}
257+
258+
vector = append(vector, &model.Sample{
259+
Metric: metric,
260+
Value: model.SampleValue(value),
261+
Timestamp: model.Time(tsMillis),
262+
})
263+
264+
return
265+
}
266+
191267
func GenerateSeriesWithSamples(
192268
name string,
193269
startTime time.Time,

integration/e2ecortex/client.go

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,13 @@ import (
2323
"github.com/prometheus/prometheus/model/labels"
2424
"github.com/prometheus/prometheus/model/rulefmt"
2525
"github.com/prometheus/prometheus/prompb"
26+
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
2627
"github.com/prometheus/prometheus/storage"
2728
"github.com/prometheus/prometheus/storage/remote"
28-
yaml "gopkg.in/yaml.v3"
29-
3029
"go.opentelemetry.io/collector/pdata/pcommon"
3130
"go.opentelemetry.io/collector/pdata/pmetric"
3231
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
32+
yaml "gopkg.in/yaml.v3"
3333

3434
"github.com/cortexproject/cortex/pkg/ruler"
3535
"github.com/cortexproject/cortex/pkg/util/backoff"
@@ -113,6 +113,39 @@ func NewPromQueryClient(address string) (*Client, error) {
113113
return c, nil
114114
}
115115

116+
// PushV2 the input timeseries to the remote endpoint
117+
func (c *Client) PushV2(symbols []string, timeseries []writev2.TimeSeries) (*http.Response, error) {
118+
// Create write request
119+
data, err := proto.Marshal(&writev2.Request{Symbols: symbols, Timeseries: timeseries})
120+
if err != nil {
121+
return nil, err
122+
}
123+
124+
// Create HTTP request
125+
compressed := snappy.Encode(nil, data)
126+
req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/prom/push", c.distributorAddress), bytes.NewReader(compressed))
127+
if err != nil {
128+
return nil, err
129+
}
130+
131+
req.Header.Add("Content-Encoding", "snappy")
132+
req.Header.Set("Content-Type", "application/x-protobuf;proto=io.prometheus.write.v2.Request")
133+
req.Header.Set("X-Prometheus-Remote-Write-Version", "2.0.0")
134+
req.Header.Set("X-Scope-OrgID", c.orgID)
135+
136+
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
137+
defer cancel()
138+
139+
// Execute HTTP request
140+
res, err := c.httpClient.Do(req.WithContext(ctx))
141+
if err != nil {
142+
return nil, err
143+
}
144+
145+
defer res.Body.Close()
146+
return res, nil
147+
}
148+
116149
// Push the input timeseries to the remote endpoint
117150
func (c *Client) Push(timeseries []prompb.TimeSeries) (*http.Response, error) {
118151
// Create write request
@@ -336,6 +369,11 @@ func (c *Client) Query(query string, ts time.Time) (model.Value, error) {
336369
return value, err
337370
}
338371

372+
func (c *Client) Metadata(name, limit string) (map[string][]promv1.Metadata, error) {
373+
metadata, err := c.querierClient.Metadata(context.Background(), name, limit)
374+
return metadata, err
375+
}
376+
339377
// QueryExemplars runs an exemplars query
340378
func (c *Client) QueryExemplars(query string, start, end time.Time) ([]promv1.ExemplarQueryResult, error) {
341379
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)

integration/remote_write_v2_test.go

Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
//go:build integration_remote_write_v2
2+
// +build integration_remote_write_v2
3+
4+
package integration
5+
6+
import (
7+
"math/rand"
8+
"net/http"
9+
"path"
10+
"testing"
11+
"time"
12+
13+
"github.com/prometheus/common/model"
14+
"github.com/prometheus/prometheus/prompb"
15+
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
16+
"github.com/prometheus/prometheus/tsdb/tsdbutil"
17+
"github.com/stretchr/testify/assert"
18+
"github.com/stretchr/testify/require"
19+
20+
"github.com/cortexproject/cortex/integration/e2e"
21+
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
22+
"github.com/cortexproject/cortex/integration/e2ecortex"
23+
"github.com/cortexproject/cortex/pkg/storage/tsdb"
24+
)
25+
26+
func TestIngest(t *testing.T) {
27+
const blockRangePeriod = 5 * time.Second
28+
29+
s, err := e2e.NewScenario(networkName)
30+
require.NoError(t, err)
31+
defer s.Close()
32+
33+
// Start dependencies.
34+
consul := e2edb.NewConsulWithName("consul")
35+
require.NoError(t, s.StartAndWaitReady(consul))
36+
37+
flags := mergeFlags(
38+
AlertmanagerLocalFlags(),
39+
map[string]string{
40+
"-store.engine": blocksStorageEngine,
41+
"-blocks-storage.backend": "filesystem",
42+
"-blocks-storage.tsdb.head-compaction-interval": "4m",
43+
"-blocks-storage.bucket-store.sync-interval": "15m",
44+
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
45+
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
46+
"-querier.query-store-for-labels-enabled": "true",
47+
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
48+
"-blocks-storage.tsdb.ship-interval": "1s",
49+
"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
50+
"-blocks-storage.tsdb.enable-native-histograms": "true",
51+
// Ingester.
52+
"-ring.store": "consul",
53+
"-consul.hostname": consul.NetworkHTTPEndpoint(),
54+
// Distributor.
55+
"-distributor.replication-factor": "1",
56+
// Store-gateway.
57+
"-store-gateway.sharding-enabled": "false",
58+
// alert manager
59+
"-alertmanager.web.external-url": "http://localhost/alertmanager",
60+
},
61+
)
62+
63+
// make alert manager config dir
64+
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))
65+
66+
path := path.Join(s.SharedDir(), "cortex-1")
67+
68+
flags = mergeFlags(flags, map[string]string{"-blocks-storage.filesystem.dir": path})
69+
// Start Cortex replicas.
70+
cortex := e2ecortex.NewSingleBinary("cortex", flags, "")
71+
require.NoError(t, s.StartAndWaitReady(cortex))
72+
73+
// Wait until Cortex replicas have updated the ring state.
74+
require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))
75+
76+
c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1")
77+
require.NoError(t, err)
78+
79+
now := time.Now()
80+
81+
// series push
82+
symbols1, series, expectedVector := e2e.GenerateSeriesV2("test_series", now, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "foo", Value: "bar"})
83+
res, err := c.PushV2(symbols1, series)
84+
require.NoError(t, err)
85+
require.Equal(t, 200, res.StatusCode)
86+
testPushHeader(t, res.Header, "1", "0", "0")
87+
88+
// sample
89+
result, err := c.Query("test_series", now)
90+
require.NoError(t, err)
91+
assert.Equal(t, expectedVector, result.(model.Vector))
92+
93+
// metadata
94+
metadata, err := c.Metadata("test_series", "")
95+
require.NoError(t, err)
96+
require.Equal(t, 1, len(metadata["test_series"]))
97+
98+
// histogram
99+
histogramIdx := rand.Uint32()
100+
symbols2, histogramSeries := e2e.GenerateHistogramSeriesV2("test_histogram", now, histogramIdx, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "false"})
101+
res, err = c.PushV2(symbols2, histogramSeries)
102+
require.NoError(t, err)
103+
require.Equal(t, 200, res.StatusCode)
104+
testPushHeader(t, res.Header, "1", "1", "0")
105+
106+
symbols3, histogramFloatSeries := e2e.GenerateHistogramSeriesV2("test_histogram", now, histogramIdx, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "true"})
107+
res, err = c.PushV2(symbols3, histogramFloatSeries)
108+
require.NoError(t, err)
109+
require.Equal(t, 200, res.StatusCode)
110+
testPushHeader(t, res.Header, "1", "1", "0")
111+
112+
testHistogramTimestamp := now.Add(blockRangePeriod * 2)
113+
expectedHistogram := tsdbutil.GenerateTestHistogram(int(histogramIdx))
114+
result, err = c.Query(`test_histogram`, testHistogramTimestamp)
115+
require.NoError(t, err)
116+
require.Equal(t, model.ValVector, result.Type())
117+
v := result.(model.Vector)
118+
require.Equal(t, 2, v.Len())
119+
for _, s := range v {
120+
require.NotNil(t, s.Histogram)
121+
require.Equal(t, float64(expectedHistogram.Count), float64(s.Histogram.Count))
122+
require.Equal(t, float64(expectedHistogram.Sum), float64(s.Histogram.Sum))
123+
}
124+
}
125+
126+
func TestExemplar(t *testing.T) {
127+
s, err := e2e.NewScenario(networkName)
128+
require.NoError(t, err)
129+
defer s.Close()
130+
131+
// Start dependencies.
132+
consul := e2edb.NewConsulWithName("consul")
133+
require.NoError(t, s.StartAndWaitReady(consul))
134+
135+
flags := mergeFlags(
136+
AlertmanagerLocalFlags(),
137+
map[string]string{
138+
"-store.engine": blocksStorageEngine,
139+
"-blocks-storage.backend": "filesystem",
140+
"-blocks-storage.tsdb.head-compaction-interval": "4m",
141+
"-blocks-storage.bucket-store.sync-interval": "15m",
142+
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
143+
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
144+
"-querier.query-store-for-labels-enabled": "true",
145+
"-blocks-storage.tsdb.ship-interval": "1s",
146+
"-blocks-storage.tsdb.enable-native-histograms": "true",
147+
// Ingester.
148+
"-ring.store": "consul",
149+
"-consul.hostname": consul.NetworkHTTPEndpoint(),
150+
"-ingester.max-exemplars": "100",
151+
// Distributor.
152+
"-distributor.replication-factor": "1",
153+
// Store-gateway.
154+
"-store-gateway.sharding-enabled": "false",
155+
// alert manager
156+
"-alertmanager.web.external-url": "http://localhost/alertmanager",
157+
},
158+
)
159+
160+
// make alert manager config dir
161+
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))
162+
163+
path := path.Join(s.SharedDir(), "cortex-1")
164+
165+
flags = mergeFlags(flags, map[string]string{"-blocks-storage.filesystem.dir": path})
166+
// Start Cortex replicas.
167+
cortex := e2ecortex.NewSingleBinary("cortex", flags, "")
168+
require.NoError(t, s.StartAndWaitReady(cortex))
169+
170+
// Wait until Cortex replicas have updated the ring state.
171+
require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))
172+
173+
c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1")
174+
require.NoError(t, err)
175+
176+
now := time.Now()
177+
tsMillis := e2e.TimeToMilliseconds(now)
178+
179+
symbols := []string{"", "__name__", "test_metric", "b", "c", "baz", "qux", "d", "e", "foo", "bar", "f", "g", "h", "i", "Test gauge for test purposes", "Maybe op/sec who knows (:", "Test counter for test purposes"}
180+
timeseries := []writev2.TimeSeries{
181+
{
182+
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Symbolized writeRequestFixture.Timeseries[0].Labels
183+
Metadata: writev2.Metadata{
184+
Type: writev2.Metadata_METRIC_TYPE_COUNTER, // writeV2RequestSeries1Metadata.Type.
185+
186+
HelpRef: 15, // Symbolized writeV2RequestSeries1Metadata.Help.
187+
UnitRef: 16, // Symbolized writeV2RequestSeries1Metadata.Unit.
188+
},
189+
Samples: []writev2.Sample{{Value: 1, Timestamp: tsMillis}},
190+
Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{11, 12}, Value: 1, Timestamp: tsMillis}},
191+
},
192+
}
193+
194+
res, err := c.PushV2(symbols, timeseries)
195+
require.NoError(t, err)
196+
require.Equal(t, 200, res.StatusCode)
197+
testPushHeader(t, res.Header, "1", "0", "1")
198+
199+
start := time.Now().Add(-time.Minute)
200+
end := now.Add(time.Minute)
201+
202+
exemplars, err := c.QueryExemplars("test_metric", start, end)
203+
require.NoError(t, err)
204+
require.Equal(t, 1, len(exemplars))
205+
}
206+
207+
func testPushHeader(t *testing.T, header http.Header, expectedSamples, expectedHistogram, expectedExemplars string) {
208+
require.Equal(t, expectedSamples, header.Get("X-Prometheus-Remote-Write-Samples-Written"))
209+
require.Equal(t, expectedHistogram, header.Get("X-Prometheus-Remote-Write-Histograms-Written"))
210+
require.Equal(t, expectedExemplars, header.Get("X-Prometheus-Remote-Write-Exemplars-Written"))
211+
}

0 commit comments

Comments
 (0)