Commit

Merge branch 'master' into debug-helm

daixiang0 authored Apr 28, 2019
2 parents b95805d + e506f16 commit 2ce5af1
Showing 54 changed files with 1,972 additions and 924 deletions.
11 changes: 6 additions & 5 deletions Gopkg.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion Gopkg.toml
@@ -26,7 +26,8 @@

[[constraint]]
name = "github.com/cortexproject/cortex"
branch = "master"
source = "https://github.com/grafana/cortex"
branch = "lazy-load-chunks"

[[constraint]]
name = "github.com/weaveworks/common"
10 changes: 10 additions & 0 deletions cmd/loki/main.go
@@ -14,6 +14,7 @@ import (

"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/validation"
)

func init() {
@@ -29,6 +30,15 @@ func main() {
flagext.RegisterFlags(&cfg)
flag.Parse()

// LimitsConfig has a custom UnmarshalYAML that sets its defaults from a global.
// That global is set to the config passed into the last call to `NewOverrides`. If we don't
// call it at least once, the defaults are set to an empty struct.
// We call it with the flag values so that unmarshalling the config file only overrides the values explicitly set there.
if _, err := validation.NewOverrides(cfg.LimitsConfig); err != nil {
level.Error(util.Logger).Log("msg", "error loading limits", "err", err)
os.Exit(1)
}

util.InitLogger(&cfg.Server)

if configFile != "" {
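The comment above depends on how YAML unmarshalling treats a pre-populated struct. A minimal standalone sketch of that behaviour (not Loki code; the `limits` struct, field names, and values are hypothetical) showing why the defaults must already be in place before the config file is parsed:

```go
package main

import (
	"fmt"

	yaml "gopkg.in/yaml.v2"
)

// limits is a stand-in for a config section whose defaults come from flags.
type limits struct {
	MaxStreams int `yaml:"max_streams"`
	MaxLineLen int `yaml:"max_line_len"`
}

func main() {
	// Defaults (here: pretend flag values) are set before unmarshalling.
	l := limits{MaxStreams: 1000, MaxLineLen: 4096}

	// The config file only mentions max_streams, so only that field changes;
	// fields absent from the YAML keep their pre-existing (default) values.
	if err := yaml.Unmarshal([]byte("max_streams: 50\n"), &l); err != nil {
		panic(err)
	}

	fmt.Printf("%+v\n", l) // {MaxStreams:50 MaxLineLen:4096}
}
```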
7 changes: 4 additions & 3 deletions docs/api.md
@@ -46,7 +46,7 @@ The Loki server has the following API endpoints (_Note:_ Authentication is out o
"labels": "{instance=\"...\", job=\"...\", namespace=\"...\"}",
"entries": [
{
"timestamp": "2018-06-27T05:20:28.699492635Z",
"ts": "2018-06-27T05:20:28.699492635Z",
"line": "..."
},
...
@@ -88,6 +88,7 @@ The Loki server has the following API endpoints (_Note:_ Authentication is out o
}
```

## Example of using the API in a third-party client library
## Examples of using the API in a third-party client library

Take a look at this [client](https://github.com/afiskon/promtail-client), but be aware that the API is not stable yet.
1) Take a look at this Go [client](https://github.com/afiskon/promtail-client), but be aware that the API is not stable yet.
2) An example in [Python 3](https://github.com/sleleko/devops-kb/blob/master/python/push-to-loki.py)
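For orientation, here is a minimal sketch in Go of what such a client does. This is not part of this commit: the `/api/prom/push` path and the push payload layout (streams with `labels`, entries with `ts`/`line`, mirroring the query response shown above) are assumptions for illustration, since the API is not stable yet.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

// entry and stream mirror the JSON shapes shown in the query response above.
type entry struct {
	Ts   string `json:"ts"`
	Line string `json:"line"`
}

type stream struct {
	Labels  string  `json:"labels"`
	Entries []entry `json:"entries"`
}

func main() {
	payload := map[string][]stream{
		"streams": {{
			Labels: `{job="example"}`,
			Entries: []entry{{
				Ts:   time.Now().Format(time.RFC3339Nano),
				Line: "hello from a hypothetical client",
			}},
		}},
	}

	body, err := json.Marshal(payload)
	if err != nil {
		panic(err)
	}

	// Assumed endpoint; adjust to wherever your Loki instance listens.
	resp, err := http.Post("http://localhost:3100/api/prom/push", "application/json", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println("push status:", resp.Status)
}
```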
2 changes: 1 addition & 1 deletion docs/operations.md
@@ -141,7 +141,7 @@ The S3 configuration is setup with url format: `s3://access_key:secret_access_ke
#### DynamoDB

Loki uses DynamoDB for the index storage. It is used for querying logs, make
sure you adjuest your throughput to your usage.
sure you adjust your throughput to your usage.

DynamoDB access is very similar to S3, however you do not need to specify a
table name in the storage section, as Loki will calculate that for you.
12 changes: 10 additions & 2 deletions docs/troubleshooting.md
@@ -45,8 +45,6 @@ to

Both binaries support a log level parameter on the command-line, e.g.: `loki -log.level=debug ...`

## No labels:

## Failed to create target, "ioutil.ReadDir: readdirent: not a directory"

The promtail configuration contains a `__path__` entry to a directory that promtail cannot find.
@@ -82,3 +80,13 @@ Once connected, verify the config in `/etc/promtail/promtail.yml` is what you ex
Also check `/var/log/positions.yaml` and make sure promtail is tailing the logs you would expect

You can check the promtail log by looking in `/var/log/containers` at the promtail container log

## Enable tracing for Loki

Loki supports [Jaeger](https://www.jaegertracing.io/) for tracing. Set the `JAEGER_AGENT_HOST` environment variable in the environment where Loki runs, and Loki will send traces to that Jaeger agent.

If you deploy with Helm, you can set it with a command like the following:

```bash
$ helm upgrade --install loki loki/loki --set "loki.jaegerAgentHost=YOUR_JAEGER_AGENT_HOST"
```
87 changes: 87 additions & 0 deletions pkg/chunkenc/lazy_chunk.go
@@ -0,0 +1,87 @@
package chunkenc

import (
"context"
"time"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
)

// LazyChunk loads the chunk when it is accessed.
type LazyChunk struct {
Chunk chunk.Chunk
Fetcher *chunk.Fetcher
}

func (c *LazyChunk) getChunk(ctx context.Context) (Chunk, error) {
chunks, err := c.Fetcher.FetchChunks(ctx, []chunk.Chunk{c.Chunk}, []string{c.Chunk.ExternalKey()})
if err != nil {
return nil, err
}

c.Chunk = chunks[0]
return chunks[0].Data.(*Facade).LokiChunk(), nil
}

// Iterator returns an entry iterator.
func (c LazyChunk) Iterator(ctx context.Context, from, through time.Time, direction logproto.Direction) (iter.EntryIterator, error) {
// If the chunk is already loaded, then use that.
if c.Chunk.Data != nil {
lokiChunk := c.Chunk.Data.(*Facade).LokiChunk()
return lokiChunk.Iterator(from, through, direction)
}

return &lazyIterator{
chunk: c,

from: from,
through: through,
direction: direction,
context: ctx,
}, nil
}

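// lazyIterator defers fetching the chunk until Next is first called; once
// loaded, it delegates to the embedded EntryIterator.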
type lazyIterator struct {
iter.EntryIterator

chunk LazyChunk
err error

from, through time.Time
direction logproto.Direction
context context.Context
}

func (it *lazyIterator) Next() bool {
if it.err != nil {
return false
}

if it.EntryIterator != nil {
return it.EntryIterator.Next()
}

chk, err := it.chunk.getChunk(it.context)
if err != nil {
it.err = err
return false
}

it.EntryIterator, it.err = chk.Iterator(it.from, it.through, it.direction)

return it.Next()
}

func (it *lazyIterator) Labels() string {
return it.chunk.Chunk.Metric.String()
}

func (it *lazyIterator) Error() error {
if it.err != nil {
return it.err
}

return it.EntryIterator.Error()
}
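The lazyIterator above is an instance of a common deferred-loading pattern: the expensive fetch happens on the first Next call, and a load failure is remembered and surfaced through Error. A self-contained sketch of the same idea with illustrative types (not the actual Loki or Cortex API):

```go
package main

import "fmt"

// lazyLines defers an expensive load (e.g. fetching a chunk from object
// storage) until the first call to Next, mirroring lazyIterator above.
type lazyLines struct {
	load   func() ([]string, error) // the deferred, expensive fetch
	loaded bool
	lines  []string
	pos    int
	err    error
}

func (l *lazyLines) Next() bool {
	if l.err != nil {
		return false
	}
	if !l.loaded {
		l.lines, l.err = l.load()
		l.loaded = true
		if l.err != nil {
			return false
		}
	}
	if l.pos >= len(l.lines) {
		return false
	}
	l.pos++
	return true
}

func (l *lazyLines) Entry() string { return l.lines[l.pos-1] }
func (l *lazyLines) Error() error  { return l.err }

func main() {
	it := &lazyLines{load: func() ([]string, error) {
		fmt.Println("loading chunk...") // runs only once, on the first Next
		return []string{"line 1", "line 2"}, nil
	}}
	for it.Next() {
		fmt.Println(it.Entry())
	}
	if err := it.Error(); err != nil {
		fmt.Println("iteration failed:", err)
	}
}
```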
66 changes: 55 additions & 11 deletions pkg/ingester/flush.go
@@ -8,6 +8,8 @@ import (
"golang.org/x/net/context"

"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/weaveworks/common/user"

@@ -17,6 +19,34 @@ import (
"github.com/grafana/loki/pkg/chunkenc"
)

var (
chunkEntries = promauto.NewHistogram(prometheus.HistogramOpts{
Name: "loki_ingester_chunk_entries",
Help: "Distribution of stored chunk entries (when stored).",
Buckets: prometheus.ExponentialBuckets(20, 2, 11), // biggest bucket is 20*2^(11-1) = 20480
})
chunkSize = promauto.NewHistogram(prometheus.HistogramOpts{
Name: "loki_ingester_chunk_size_bytes",
Help: "Distribution of stored chunk sizes (when stored).",
Buckets: prometheus.ExponentialBuckets(500, 2, 5), // biggest bucket is 500*2^(5-1) = 8000
})
chunksPerTenant = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "loki_ingester_chunks_stored_total",
Help: "Total stored chunks per tenant.",
}, []string{"tenant"})
chunkSizePerTenant = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "loki_ingester_chunk_stored_bytes_total",
Help: "Total bytes stored in chunks per tenant.",
}, []string{"tenant"})
chunkAge = promauto.NewHistogram(prometheus.HistogramOpts{
Name: "loki_ingester_chunk_age_seconds",
Help: "Distribution of chunk ages (when stored).",
// with default settings chunks should flush between 5 min and 12 hours
// so buckets at 1min, 5min, 10min, 30min, 1hr, 2hr, 4hr, 10hr, 12hr, 16hr
Buckets: []float64{60, 300, 600, 1800, 3600, 7200, 14400, 36000, 43200, 57600},
})
)

const (
// Backoff for retrying 'immediate' flushes. Only counts for queue
// position, not wallclock time.
@@ -154,7 +184,7 @@ func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediat
return nil
}

func (i *Ingester) collectChunksToFlush(instance *instance, fp model.Fingerprint, immediate bool) ([]*chunkDesc, []client.LabelPair) {
func (i *Ingester) collectChunksToFlush(instance *instance, fp model.Fingerprint, immediate bool) ([]*chunkDesc, []client.LabelAdapter) {
instance.streamsMtx.Lock()
defer instance.streamsMtx.Unlock()

@@ -204,18 +234,18 @@ func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream) {

if len(stream.chunks) == 0 {
delete(instance.streams, stream.fp)
instance.index.Delete(client.FromLabelPairsToLabels(stream.labels), stream.fp)
instance.index.Delete(client.FromLabelAdaptersToLabels(stream.labels), stream.fp)
instance.streamsRemovedTotal.Inc()
}
}

func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelPairs []client.LabelPair, cs []*chunkDesc) error {
func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelPairs []client.LabelAdapter, cs []*chunkDesc) error {
userID, err := user.ExtractOrgID(ctx)
if err != nil {
return err
}

metric := fromLabelPairs(labelPairs)
metric := client.FromLabelAdaptersToMetric(labelPairs)
metric[nameLabel] = logsValue

wireChunks := make([]chunk.Chunk, 0, len(cs))
@@ -234,13 +264,27 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelP
wireChunks = append(wireChunks, c)
}

return i.store.Put(ctx, wireChunks)
}
if err := i.store.Put(ctx, wireChunks); err != nil {
return err
}

// Record statistics only when the actual put request did not return an error.
sizePerTenant := chunkSizePerTenant.WithLabelValues(userID)
countPerTenant := chunksPerTenant.WithLabelValues(userID)
for i, wc := range wireChunks {
numEntries := cs[i].chunk.Size()
byt, err := wc.Encoded()
if err != nil {
continue
}

func fromLabelPairs(ls []client.LabelPair) model.Metric {
m := make(model.Metric, len(ls))
for _, l := range ls {
m[model.LabelName(l.Name)] = model.LabelValue(l.Value)
chunkEntries.Observe(float64(numEntries))
chunkSize.Observe(float64(len(byt)))
sizePerTenant.Add(float64(len(byt)))
countPerTenant.Inc()
firstTime, _ := cs[i].chunk.Bounds()
chunkAge.Observe(time.Since(firstTime).Seconds())
}
return m

return nil
}
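As a sanity check on the bucket comments in the histogram definitions at the top of this file, here is a small standalone snippet (not part of this diff) that prints the boundaries `prometheus.ExponentialBuckets` actually generates:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// loki_ingester_chunk_entries: 20, 40, 80, ..., 20*2^10 = 20480 (11 buckets)
	fmt.Println(prometheus.ExponentialBuckets(20, 2, 11))

	// loki_ingester_chunk_size_bytes: 500, 1000, 2000, 4000, 8000 (5 buckets)
	fmt.Println(prometheus.ExponentialBuckets(500, 2, 5))
}
```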
6 changes: 3 additions & 3 deletions pkg/ingester/instance.go
@@ -121,16 +121,16 @@ func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querie
func (i *instance) Label(ctx context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error) {
var labels []string
if req.Values {
values := i.index.LabelValues(model.LabelName(req.Name))
values := i.index.LabelValues(req.Name)
labels = make([]string, len(values))
for i := 0; i < len(values); i++ {
labels[i] = string(values[i])
labels[i] = values[i]
}
} else {
names := i.index.LabelNames()
labels = make([]string, len(names))
for i := 0; i < len(names); i++ {
labels[i] = string(names[i])
labels[i] = names[i]
}
}
return &logproto.LabelResponse{
8 changes: 4 additions & 4 deletions pkg/ingester/stream.go
@@ -47,7 +47,7 @@ type stream struct {
// Not thread-safe; assume accesses to this are locked by caller.
chunks []chunkDesc
fp model.Fingerprint
labels []client.LabelPair
labels []client.LabelAdapter
}

type chunkDesc struct {
@@ -58,7 +58,7 @@ type chunkDesc struct {
lastUpdated time.Time
}

func newStream(fp model.Fingerprint, labels []client.LabelPair) *stream {
func newStream(fp model.Fingerprint, labels []client.LabelAdapter) *stream {
return &stream{
fp: fp,
labels: labels,
@@ -96,7 +96,7 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry) error {
}

if appendErr == chunkenc.ErrOutOfOrder {
return httpgrpc.Errorf(http.StatusBadRequest, "entry out of order for stream: %s", client.FromLabelPairsToLabels(s.labels).String())
return httpgrpc.Errorf(http.StatusBadRequest, "entry out of order for stream: %s", client.FromLabelAdaptersToLabels(s.labels).String())
}

return appendErr
@@ -121,5 +121,5 @@ func (s *stream) Iterator(from, through time.Time, direction logproto.Direction)
}
}

return iter.NewNonOverlappingIterator(iterators, client.FromLabelPairsToLabels(s.labels).String()), nil
return iter.NewNonOverlappingIterator(iterators, client.FromLabelAdaptersToLabels(s.labels).String()), nil
}
8 changes: 6 additions & 2 deletions pkg/iter/iterator.go
@@ -352,11 +352,15 @@ func (i *nonOverlappingIterator) Entry() logproto.Entry {
}

func (i *nonOverlappingIterator) Labels() string {
return i.labels
if i.labels != "" {
return i.labels
}

return i.curr.Labels()
}

func (i *nonOverlappingIterator) Error() error {
return nil
return i.curr.Error()
}

func (i *nonOverlappingIterator) Close() error {
(The remaining changed files in this commit are not rendered here.)
