Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enables Series API in loki #1419

Merged
merged 13 commits into from
Dec 20, 2019
76 changes: 76 additions & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,15 @@ The HTTP API includes the following endpoints:
- [`GET /loki/api/v1/labels`](#get-lokiapiv1labels)
- [`GET /loki/api/v1/label/<name>/values`](#get-lokiapiv1labelnamevalues)
- [`GET /loki/api/v1/tail`](#get-lokiapiv1tail)
- [`GET /loki/api/v1/series`](#series)
- [`POST /loki/api/v1/series`](#series)
- [`POST /loki/api/v1/push`](#post-lokiapiv1push)
- [`GET /api/prom/tail`](#get-apipromtail)
- [`GET /api/prom/query`](#get-apipromquery)
- [`GET /api/prom/label`](#get-apipromlabel)
- [`GET /api/prom/label/<name>/values`](#get-apipromlabelnamevalues)
- [`GET /api/prom/series`](#series)
- [`POST /api/prom/series`](#series)
- [`POST /api/prom/push`](#post-apiprompush)
- [`GET /ready`](#get-ready)
- [`POST /flush`](#post-flush)
Expand Down Expand Up @@ -743,3 +747,75 @@ In microservices mode, the `/flush` endpoint is exposed by the ingester.
for a list of exported metrics.

In microservices mode, the `/metrics` endpoint is exposed by all components.

## Series

The Series API is available under the following:
- `GET /loki/api/v1/series`
- `POST /loki/api/v1/series`
- `GET /api/prom/series`
- `POST /api/prom/series`

This endpoint returns the list of time series that match a certain label set.

URL query parameters:

- `match[]=<series_selector>`: Repeated log stream selector argument that selects the streams to return. At least one `match[]` argument must be provided.
- `start=<nanosecond Unix epoch>`: Start timestamp.
- `end=<nanosecond Unix epoch>`: End timestamp.

You can URL-encode these parameters directly in the request body by using the POST method and `Content-Type: application/x-www-form-urlencoded` header. This is useful when specifying a large or dynamic number of stream selectors that may breach server-side URL character limits.

In microservices mode, these endpoints are exposed by the querier.

### Examples

``` bash
$ curl -s "http://localhost:3100/loki/api/v1/series" --data-urlencode 'match={container_name=~"prometheus.*", component="server"}' --data-urlencode 'match={app="loki"}' | jq '.'
{
"status": "success",
"data": [
{
"container_name": "loki",
"app": "loki",
"stream": "stderr",
"filename": "/var/log/pods/default_loki-stack-0_50835643-1df0-11ea-ba79-025000000001/loki/0.log",
"name": "loki",
"job": "default/loki",
"controller_revision_hash": "loki-stack-757479754d",
"statefulset_kubernetes_io_pod_name": "loki-stack-0",
"release": "loki-stack",
"namespace": "default",
"instance": "loki-stack-0"
},
{
"chart": "prometheus-9.3.3",
"container_name": "prometheus-server-configmap-reload",
"filename": "/var/log/pods/default_loki-stack-prometheus-server-696cc9ddff-87lmq_507b1db4-1df0-11ea-ba79-025000000001/prometheus-server-configmap-reload/0.log",
"instance": "loki-stack-prometheus-server-696cc9ddff-87lmq",
"pod_template_hash": "696cc9ddff",
"app": "prometheus",
"component": "server",
"heritage": "Tiller",
"job": "default/prometheus",
"namespace": "default",
"release": "loki-stack",
"stream": "stderr"
},
{
"app": "prometheus",
"component": "server",
"filename": "/var/log/pods/default_loki-stack-prometheus-server-696cc9ddff-87lmq_507b1db4-1df0-11ea-ba79-025000000001/prometheus-server/0.log",
"release": "loki-stack",
"namespace": "default",
"pod_template_hash": "696cc9ddff",
"stream": "stderr",
"chart": "prometheus-9.3.3",
"container_name": "prometheus-server",
"heritage": "Tiller",
"instance": "loki-stack-prometheus-server-696cc9ddff-87lmq",
"job": "default/prometheus"
}
]
}
```
11 changes: 11 additions & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,17 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp
return instance.Label(ctx, req)
}

// Series queries the ingester for log stream identifiers (label sets) matching a set of matchers
func (i *Ingester) Series(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) {
instanceID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
}

instance := i.getOrCreateInstance(instanceID)
return instance.Series(ctx, req)
}

// Check implements grpc_health_v1.HealthCheck.
func (*Ingester) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil
Expand Down
85 changes: 85 additions & 0 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,91 @@ func TestIngester(t *testing.T) {
require.Len(t, result.resps, 1)
require.Len(t, result.resps[0].Streams, 1)
require.Equal(t, `{bar="baz2", foo="bar"}`, result.resps[0].Streams[0].Labels)

// Series

// empty matchers
_, err = i.Series(ctx, &logproto.SeriesRequest{
Start: time.Unix(0, 0),
End: time.Unix(1, 0),
})
require.Error(t, err)

// wrong matchers fmt
_, err = i.Series(ctx, &logproto.SeriesRequest{
Start: time.Unix(0, 0),
End: time.Unix(1, 0),
Groups: []string{`{a="b`},
})
require.Error(t, err)

// no selectors
_, err = i.Series(ctx, &logproto.SeriesRequest{
Start: time.Unix(0, 0),
End: time.Unix(1, 0),
Groups: []string{`{foo="bar"}`, `{}`},
})
require.Error(t, err)

// foo=bar
resp, err := i.Series(ctx, &logproto.SeriesRequest{
Start: time.Unix(0, 0),
End: time.Unix(1, 0),
Groups: []string{`{foo="bar"}`},
})
require.Nil(t, err)
require.ElementsMatch(t, []logproto.SeriesIdentifier{
{
Labels: map[string]string{
"foo": "bar",
"bar": "baz1",
},
},
{
Labels: map[string]string{
"foo": "bar",
"bar": "baz2",
},
},
}, resp.GetSeries())

// foo=bar, bar=~"baz[2-9]"
resp, err = i.Series(ctx, &logproto.SeriesRequest{
Start: time.Unix(0, 0),
End: time.Unix(1, 0),
Groups: []string{`{foo="bar", bar=~"baz[2-9]"}`},
})
require.Nil(t, err)
require.ElementsMatch(t, []logproto.SeriesIdentifier{
{
Labels: map[string]string{
"foo": "bar",
"bar": "baz2",
},
},
}, resp.GetSeries())

// foo=bar, bar=~"baz[2-9]" in different groups should OR the results
resp, err = i.Series(ctx, &logproto.SeriesRequest{
Start: time.Unix(0, 0),
End: time.Unix(1, 0),
Groups: []string{`{foo="bar"}`, `{bar=~"baz[2-9]"}`},
})
require.Nil(t, err)
require.ElementsMatch(t, []logproto.SeriesIdentifier{
{
Labels: map[string]string{
"foo": "bar",
"bar": "baz1",
},
},
{
Labels: map[string]string{
"foo": "bar",
"bar": "baz2",
},
},
}, resp.GetSeries())
}

func TestIngesterStreamLimitExceeded(t *testing.T) {
Expand Down
67 changes: 59 additions & 8 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/helpers"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/util"
Expand Down Expand Up @@ -190,7 +191,20 @@ func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querie
if err != nil {
return err
}
iters, err := i.lookupStreams(queryServer.Context(), req, expr.Matchers(), filter)

var iters []iter.EntryIterator

err = i.forMatchingStreams(
expr.Matchers(),
func(stream *stream) error {
iter, err := stream.Iterator(queryServer.Context(), req.Start, req.End, req.Direction, filter)
if err != nil {
return err
}
iters = append(iters, iter)
return nil
},
)
if err != nil {
return err
}
Expand Down Expand Up @@ -221,32 +235,69 @@ func (i *instance) Label(_ context.Context, req *logproto.LabelRequest) (*logpro
}, nil
}

func (i *instance) lookupStreams(ctx context.Context, req *logproto.QueryRequest, matchers []*labels.Matcher, filter logql.Filter) ([]iter.EntryIterator, error) {
func (i *instance) Series(_ context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) {
groups, err := loghttp.Match(req.GetGroups())
if err != nil {
return nil, err
}

dedupedSeries := make(map[uint64]logproto.SeriesIdentifier)
for _, matchers := range groups {
err = i.forMatchingStreams(matchers, func(stream *stream) error {
// exit early when this stream was added by an earlier group
key := stream.labels.Hash()
if _, found := dedupedSeries[key]; found {
return nil
}

dedupedSeries[key] = logproto.SeriesIdentifier{
Labels: stream.labels.Map(),
}
return nil
})

if err != nil {
return nil, err
}
}
series := make([]logproto.SeriesIdentifier, 0, len(dedupedSeries))
for _, v := range dedupedSeries {
series = append(series, v)

}
rfratto marked this conversation as resolved.
Show resolved Hide resolved
return &logproto.SeriesResponse{Series: series}, nil
}

// forMatchingStreams will execute a function for each stream that satisfies a set of requirements (time range, matchers, etc).
// It uses a function in order to enable generic stream acces without accidentally leaking streams under the mutex.
func (i *instance) forMatchingStreams(
matchers []*labels.Matcher,
fn func(*stream) error,
) error {
i.streamsMtx.RLock()
defer i.streamsMtx.RUnlock()

filters, matchers := cutil.SplitFiltersAndMatchers(matchers)
ids := i.index.Lookup(matchers)
iterators := make([]iter.EntryIterator, 0, len(ids))

outer:
for _, streamID := range ids {
stream, ok := i.streams[streamID]
if !ok {
return nil, ErrStreamMissing
return ErrStreamMissing
}
for _, filter := range filters {
if !filter.Matches(stream.labels.Get(filter.Name)) {
continue outer
}
}
iter, err := stream.Iterator(ctx, req.Start, req.End, req.Direction, filter)

err := fn(stream)
if err != nil {
return nil, err
return err
}
iterators = append(iterators, iter)
}
return iterators, nil
return nil
}

func (i *instance) addNewTailer(t *tailer) {
Expand Down
5 changes: 5 additions & 0 deletions pkg/loghttp/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ type LabelResponse struct {
// LabelSet is a key/value pair mapping of labels
type LabelSet map[string]string

// Map coerces LabelSet into a map[string]string. This is useful for working with adapter types.
func (l LabelSet) Map() map[string]string {
return l
}

// String implements the Stringer interface. It returns a formatted/sorted set of label key/value pairs.
func (l LabelSet) String() string {
var b bytes.Buffer
Expand Down
16 changes: 16 additions & 0 deletions pkg/loghttp/labels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,22 @@ func TestParseLabelQuery(t *testing.T) {
}
}

func TestLabelsMap(t *testing.T) {
ls := LabelSet{
"a": "1",
"b": "2",
}

require.Equal(
t,
map[string]string{
"a": "1",
"b": "2",
},
ls.Map(),
)
}

func timePtr(t time.Time) *time.Time {
return &t
}
Expand Down
23 changes: 23 additions & 0 deletions pkg/loghttp/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (

"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
)

const (
Expand Down Expand Up @@ -71,6 +73,27 @@ func step(r *http.Request, start, end time.Time) (time.Duration, error) {
return 0, errors.Errorf("cannot parse %q to a valid duration", value)
}

// Match extracts and parses multiple matcher groups from a slice of strings
func Match(xs []string) ([][]*labels.Matcher, error) {
if len(xs) == 0 {
return nil, errors.New("0 matcher groups supplied")
}

groups := make([][]*labels.Matcher, 0, len(xs))
for _, x := range xs {
ms, err := logql.ParseMatchers(x)
if err != nil {
return nil, err
}
if len(ms) == 0 {
return nil, errors.Errorf("0 matchers in group: %s", x)
}
groups = append(groups, ms)
}

return groups, nil
}

// defaultQueryRangeStep returns the default step used in the query range API,
// which is dinamically calculated based on the time range
func defaultQueryRangeStep(start time.Time, end time.Time) int {
Expand Down
Loading