Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Prometheus remote read and write API. #8784

Merged
merged 8 commits into from
Sep 7, 2017
Prev Previous commit
Move prometheus remote proto to remote package
  • Loading branch information
pauldix committed Sep 7, 2017
commit b1dcdaa099e1cbcc8364c3109268879dc353357a
25 changes: 12 additions & 13 deletions prometheus/converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ import (

"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/prometheus/remote"
)

//go:generate protoc -I$GOPATH/src -I. --gogofaster_out=. remote.proto

const (
// measurementName is where all prometheus time series go to
measurementName = "_"
Expand All @@ -24,7 +23,7 @@ var ErrNaNDropped = errors.New("dropped NaN from Prometheus since they are not s

// WriteRequestToPoints converts a Prometheus remote write request of time series and their
// samples into Points that can be written into Influx
func WriteRequestToPoints(req *WriteRequest) ([]models.Point, error) {
func WriteRequestToPoints(req *remote.WriteRequest) ([]models.Point, error) {
var maxPoints int
for _, ts := range req.Timeseries {
maxPoints += len(ts.Samples)
Expand Down Expand Up @@ -62,7 +61,7 @@ func WriteRequestToPoints(req *WriteRequest) ([]models.Point, error) {

// ReadRequestToInfluxQLQuery converts a Prometheus remote read request to an equivalent InfluxQL
// query that will return the requested data when executed
func ReadRequestToInfluxQLQuery(req *ReadRequest, db, rp string) (*influxql.Query, error) {
func ReadRequestToInfluxQLQuery(req *remote.ReadRequest, db, rp string) (*influxql.Query, error) {
if len(req.Queries) != 1 {
return nil, errors.New("Prometheus read endpoint currently only supports one query at a time")
}
Expand Down Expand Up @@ -92,16 +91,16 @@ func ReadRequestToInfluxQLQuery(req *ReadRequest, db, rp string) (*influxql.Quer
}

// condFromMatcher converts a Prometheus LabelMatcher into an equivalent InfluxQL BinaryExpr
func condFromMatcher(m *LabelMatcher) (*influxql.BinaryExpr, error) {
func condFromMatcher(m *remote.LabelMatcher) (*influxql.BinaryExpr, error) {
var op influxql.Token
switch m.Type {
case MatchType_EQUAL:
case remote.MatchType_EQUAL:
op = influxql.EQ
case MatchType_NOT_EQUAL:
case remote.MatchType_NOT_EQUAL:
op = influxql.NEQ
case MatchType_REGEX_MATCH:
case remote.MatchType_REGEX_MATCH:
op = influxql.EQREGEX
case MatchType_REGEX_NO_MATCH:
case remote.MatchType_REGEX_NO_MATCH:
op = influxql.NEQREGEX
default:
return nil, fmt.Errorf("unknown match type %v", m.Type)
Expand All @@ -118,7 +117,7 @@ func condFromMatcher(m *LabelMatcher) (*influxql.BinaryExpr, error) {
// into an equivalent influxql.BinaryExpr. This assume a schema that is written via the Prometheus
// remote write endpoint, which uses a measurement name of _ and a field name of f64. Tags and labels
// are kept equivalent.
func condFromMatchers(q *Query, matchers []*LabelMatcher) (*influxql.BinaryExpr, error) {
func condFromMatchers(q *remote.Query, matchers []*remote.LabelMatcher) (*influxql.BinaryExpr, error) {
if len(matchers) > 0 {
lhs, err := condFromMatcher(matchers[0])
if err != nil {
Expand Down Expand Up @@ -152,8 +151,8 @@ func condFromMatchers(q *Query, matchers []*LabelMatcher) (*influxql.BinaryExpr,
}

// TagsToLabelPairs converts a map of Influx tags into a slice of Prometheus label pairs
func TagsToLabelPairs(tags map[string]string) []*LabelPair {
pairs := make([]*LabelPair, 0, len(tags))
func TagsToLabelPairs(tags map[string]string) []*remote.LabelPair {
pairs := make([]*remote.LabelPair, 0, len(tags))
for k, v := range tags {
if v == "" {
// If we select metrics with different sets of labels names,
Expand All @@ -164,7 +163,7 @@ func TagsToLabelPairs(tags map[string]string) []*LabelPair {
// to make the result correct.
continue
}
pairs = append(pairs, &LabelPair{
pairs = append(pairs, &remote.LabelPair{
Name: k,
Value: v,
})
Expand Down
3 changes: 3 additions & 0 deletions prometheus/remote/generate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package remote

//go:generate protoc -I$GOPATH/src -I. --gogofaster_out=. remote.proto
86 changes: 43 additions & 43 deletions prometheus/remote.pb.go → prometheus/remote/remote.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion prometheus/remote.proto → prometheus/remote/remote.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

syntax = "proto3";

package prometheus; // change package from remote to prometheus
package remote;

message Sample {
double value = 1;
Expand Down
13 changes: 7 additions & 6 deletions services/httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/influxdata/influxdb/monitor"
"github.com/influxdata/influxdb/monitor/diagnostics"
"github.com/influxdata/influxdb/prometheus"
"github.com/influxdata/influxdb/prometheus/remote"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/tsdb"
Expand Down Expand Up @@ -855,7 +856,7 @@ func (h *Handler) servePromWrite(w http.ResponseWriter, r *http.Request, user me
}

// Convert the Prometheus remote write request to Influx Points
var req prometheus.WriteRequest
var req remote.WriteRequest
if err := proto.Unmarshal(reqBuf, &req); err != nil {
h.httpError(w, err.Error(), http.StatusBadRequest)
return
Expand Down Expand Up @@ -923,7 +924,7 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met
return
}

var req prometheus.ReadRequest
var req remote.ReadRequest
if err := proto.Unmarshal(reqBuf, &req); err != nil {
h.httpError(w, err.Error(), http.StatusBadRequest)
return
Expand Down Expand Up @@ -990,8 +991,8 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met
// Execute query.
results := h.QueryExecutor.ExecuteQuery(q, opts, closing)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This entire section here should probably be refactored because it is about 95% in common with serveQuery, but that doesn't have to be done now. I think #8725 makes it a lot easier.

So no action on this, but I just wanted to get it on the record.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the same is true of the servePromWrite method, but I wanted to get this in without modifying any of the existing code paths to make it a very low risk merge.


resp := &prometheus.ReadResponse{
Results: []*prometheus.QueryResult{{}},
resp := &remote.ReadResponse{
Results: []*remote.QueryResult{{}},
}

// pull all results from the channel
Expand All @@ -1003,7 +1004,7 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met

// read the series data and convert into Prometheus samples
for _, s := range r.Series {
ts := &prometheus.TimeSeries{
ts := &remote.TimeSeries{
Labels: prometheus.TagsToLabelPairs(s.Tags),
}

Expand All @@ -1018,7 +1019,7 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met
h.httpError(w, fmt.Sprintf("value %v wasn't a float64", v[1]), http.StatusBadRequest)
}
timestamp := t.UnixNano() / int64(time.Millisecond) / int64(time.Nanosecond)
ts.Samples = append(ts.Samples, &prometheus.Sample{
ts.Samples = append(ts.Samples, &remote.Sample{
TimestampMs: timestamp,
Value: val,
})
Expand Down
30 changes: 15 additions & 15 deletions services/httpd/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/internal"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/prometheus"
"github.com/influxdata/influxdb/prometheus/remote"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxdb/services/httpd"
"github.com/influxdata/influxdb/services/meta"
Expand Down Expand Up @@ -531,14 +531,14 @@ func TestHandler_Query_CloseNotify(t *testing.T) {

// Ensure the prometheus remote write works
func TestHandler_PromWrite(t *testing.T) {
req := &prometheus.WriteRequest{
Timeseries: []*prometheus.TimeSeries{
req := &remote.WriteRequest{
Timeseries: []*remote.TimeSeries{
{
Labels: []*prometheus.LabelPair{
Labels: []*remote.LabelPair{
{Name: "host", Value: "a"},
{Name: "region", Value: "west"},
},
Samples: []*prometheus.Sample{
Samples: []*remote.Sample{
{TimestampMs: 1, Value: 1.2},
{TimestampMs: 2, Value: math.NaN()},
},
Expand Down Expand Up @@ -594,13 +594,13 @@ func TestHandler_PromWrite(t *testing.T) {
// Ensure Prometheus remote read requests are converted to the correct InfluxQL query and
// data is returned
func TestHandler_PromRead(t *testing.T) {
req := &prometheus.ReadRequest{
Queries: []*prometheus.Query{{
Matchers: []*prometheus.LabelMatcher{
{Type: prometheus.MatchType_EQUAL, Name: "eq", Value: "a"},
{Type: prometheus.MatchType_NOT_EQUAL, Name: "neq", Value: "b"},
{Type: prometheus.MatchType_REGEX_MATCH, Name: "regex", Value: "c"},
{Type: prometheus.MatchType_REGEX_NO_MATCH, Name: "neqregex", Value: "d"},
req := &remote.ReadRequest{
Queries: []*remote.Query{{
Matchers: []*remote.LabelMatcher{
{Type: remote.MatchType_EQUAL, Name: "eq", Value: "a"},
{Type: remote.MatchType_NOT_EQUAL, Name: "neq", Value: "b"},
{Type: remote.MatchType_REGEX_MATCH, Name: "regex", Value: "c"},
{Type: remote.MatchType_REGEX_NO_MATCH, Name: "neqregex", Value: "d"},
},
StartTimestampMs: 1,
EndTimestampMs: 2,
Expand Down Expand Up @@ -642,13 +642,13 @@ func TestHandler_PromRead(t *testing.T) {
t.Fatal(err.Error())
}

var resp prometheus.ReadResponse
var resp remote.ReadResponse
if err := proto.Unmarshal(reqBuf, &resp); err != nil {
t.Fatal(err.Error())
}

expLabels := []*prometheus.LabelPair{{Name: "foo", Value: "bar"}}
expSamples := []*prometheus.Sample{{TimestampMs: 23000, Value: 1.2}}
expLabels := []*remote.LabelPair{{Name: "foo", Value: "bar"}}
expSamples := []*remote.Sample{{TimestampMs: 23000, Value: 1.2}}

ts := resp.Results[0].Timeseries[0]

Expand Down