Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tailing metrics #6059

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (t *Loki) initQuerier() (services.Service, error) {
return nil, err
}

q, err := querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.overrides, deleteStore)
q, err := querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.overrides, deleteStore, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -699,7 +699,7 @@ func (t *Loki) initRuler() (_ services.Service, err error) {
return nil, err
}

q, err := querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.overrides, deleteStore)
q, err := querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.overrides, deleteStore, nil)
if err != nil {
return nil, err
}
Expand Down
29 changes: 29 additions & 0 deletions pkg/querier/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package querier

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

// Metrics groups the Prometheus collectors used to observe log tailing
// in the querier.
type Metrics struct {
	// Gauges tracking the current number of tailers and of distinct
	// streams those tailers have seen.
	tailsActive, tailedStreamsActive prometheus.Gauge

	// Running total of the entry line sizes delivered to tail clients.
	tailedBytesTotal prometheus.Counter
}

// NewMetrics creates the tailing collectors and registers them with r.
//
// Collectors are created through promauto, so a nil r yields collectors
// that work but are not registered anywhere.
func NewMetrics(r prometheus.Registerer) *Metrics {
	factory := promauto.With(r)

	// Creation order is kept so registration happens in the same sequence.
	activeTails := factory.NewGauge(prometheus.GaugeOpts{
		Name: "loki_querier_tail_active",
		Help: "Number of active tailers",
	})
	activeStreams := factory.NewGauge(prometheus.GaugeOpts{
		Name: "loki_querier_tail_active_streams",
		Help: "Number of active streams being tailed",
	})
	bytesTotal := factory.NewCounter(prometheus.CounterOpts{
		Name: "loki_querier_tail_bytes_total",
		Help: "total bytes tailed",
	})

	return &Metrics{
		tailsActive:         activeTails,
		tailedStreamsActive: activeStreams,
		tailedBytesTotal:    bytesTotal,
	}
}
7 changes: 6 additions & 1 deletion pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"net/http"
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/deletion"

"github.com/go-kit/log/level"
Expand Down Expand Up @@ -90,20 +92,22 @@ type SingleTenantQuerier struct {
limits *validation.Overrides
ingesterQuerier *IngesterQuerier
deleteGetter deleteGetter
metrics *Metrics
}

// deleteGetter is the subset of the delete-request store that the querier
// depends on: looking up every delete request belonging to one user.
type deleteGetter interface {
// GetAllDeleteRequestsForUser returns the delete requests for userID,
// or an error if they could not be retrieved.
GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]deletion.DeleteRequest, error)
}

// New makes a new Querier. It stores the given config, store, ingester
// querier, limits and delete getter on a SingleTenantQuerier and attaches
// tailing metrics built with NewMetrics(r). The returned error is always nil.
func New(cfg Config, store storage.Store, ingesterQuerier *IngesterQuerier, limits *validation.Overrides, d deleteGetter) (*SingleTenantQuerier, error) {
func New(cfg Config, store storage.Store, ingesterQuerier *IngesterQuerier, limits *validation.Overrides, d deleteGetter, r prometheus.Registerer) (*SingleTenantQuerier, error) {
return &SingleTenantQuerier{
cfg: cfg,
store: store,
ingesterQuerier: ingesterQuerier,
limits: limits,
deleteGetter: d,
metrics: NewMetrics(r),
}, nil
}

Expand Down Expand Up @@ -454,6 +458,7 @@ func (q *SingleTenantQuerier) Tail(ctx context.Context, req *logproto.TailReques
},
q.cfg.TailMaxDuration,
tailerWaitEntryThrottle,
q.metrics,
), nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1175,7 +1175,7 @@ func newQuerier(cfg Config, clientCfg client.Config, clientFactory ring_client.P
return nil, err
}

return New(cfg, store, iq, limits, dg)
return New(cfg, store, iq, limits, dg, nil)
}

type mockDeleteGettter struct {
Expand Down
42 changes: 40 additions & 2 deletions pkg/querier/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ type Tailer struct {
currEntry logproto.Entry
currLabels string

// keep track of the streams for metrics about active streams
seenStreams map[uint64]struct{}
seenStreamsMtx sync.Mutex

tailDisconnectedIngesters func([]string) (map[string]logproto.Querier_TailClient, error)

querierTailClients map[string]logproto.Querier_TailClient // addr -> grpc clients for tailing logs from ingesters
Expand All @@ -55,6 +59,7 @@ type Tailer struct {
// if we are not seeing any response from ingester,
// how long do we want to wait by going into sleep
waitEntryThrottle time.Duration
metrics *Metrics
}

func (t *Tailer) readTailClients() {
Expand Down Expand Up @@ -95,8 +100,11 @@ func (t *Tailer) loop() {

// Read as many entries as we can (up to the max allowed) and populate the
// tail response we'll send over the response channel
tailResponse := new(loghttp.TailResponse)
entriesCount := 0
var (
tailResponse = new(loghttp.TailResponse)
entriesCount = 0
entriesSize = 0
)

for ; entriesCount < maxEntriesPerTailResponse && t.next(); entriesCount++ {
// If the response channel is blocked, we drop the current entry directly
Expand All @@ -106,6 +114,7 @@ func (t *Tailer) loop() {
continue
}

entriesSize += len(t.currEntry.Line)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't necessarily the number of bytes, but it is consistent with how we calculate similar metrics in other components.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why isn't this the number of bytes? wouldn't each character take up a byte? either way I agree keep it consistent, but maybe we want to consider an issue to fix this across the code base (as I think actually knowing bytes is helpful)

Copy link
Contributor Author

@MasslessParticle MasslessParticle Apr 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each character takes up one or more bytes. The string 世界 has 2 characters but 6 bytes. That said, it looks like Go's len returns bytes rather than characters. I can't believe I didn't know this.

I wonder how many bugs I've caused with it?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

haha, go was clearly designed for dummies like me who only use English characters and still think ASCII is the only encoding out there.

tailResponse.Streams = append(tailResponse.Streams, logproto.Stream{
Labels: t.currLabels,
Entries: []logproto.Entry{t.currEntry},
Expand Down Expand Up @@ -151,6 +160,7 @@ func (t *Tailer) loop() {

select {
case t.responseChan <- tailResponse:
t.metrics.tailedBytesTotal.Add(float64(entriesSize))
if len(droppedEntries) > 0 {
droppedEntries = make([]loghttp.DroppedEntry, 0)
}
Expand Down Expand Up @@ -239,13 +249,18 @@ func (t *Tailer) next() bool {

t.currEntry = t.openStreamIterator.Entry()
t.currLabels = t.openStreamIterator.Labels()
t.recordStream(t.openStreamIterator.StreamHash())

return true
}

// close stops the tailer and closes the underlying stream iterator,
// returning the iterator's close error.
//
// The active-tail gauges are adjusted only on the first call. Decrementing
// them on every call would drive tailsActive and tailedStreamsActive below
// their true values if close ran more than once for the same tailer. The
// iterator is still closed on every call, as before.
func (t *Tailer) close() error {
	t.streamMtx.Lock()
	defer t.streamMtx.Unlock()

	// Undo the increments made by newTailer and recordStream exactly once.
	// NOTE(review): assumes stopped is only ever set by close — confirm.
	if !t.stopped {
		t.stopped = true
		t.metrics.tailsActive.Dec()
		t.metrics.tailedStreamsActive.Sub(t.activeStreamCount())
	}

	return t.openStreamIterator.Close()
}
Expand All @@ -264,25 +279,48 @@ func (t *Tailer) getCloseErrorChan() <-chan error {
return t.closeErrChan
}

// recordStream marks the stream identified by id as seen by this tailer.
// The first time an id is seen, the tailedStreamsActive gauge is
// incremented; ids already present in seenStreams are ignored.
// Access to seenStreams is serialized with seenStreamsMtx.
func (t *Tailer) recordStream(id uint64) {
t.seenStreamsMtx.Lock()
defer t.seenStreamsMtx.Unlock()

// Already counted: leave the gauge untouched.
if _, ok := t.seenStreams[id]; ok {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you're only writing once there's a lot of chances that sync.Map will perform better. Not sure this is important in this case but at least I shared the options for later.

return
}

t.seenStreams[id] = struct{}{}
t.metrics.tailedStreamsActive.Inc()
}

// activeStreamCount reports how many distinct streams this tailer has
// recorded so far, as a float64 ready to be applied to a gauge.
func (t *Tailer) activeStreamCount() float64 {
	t.seenStreamsMtx.Lock()
	seen := len(t.seenStreams)
	t.seenStreamsMtx.Unlock()

	return float64(seen)
}

func newTailer(
delayFor time.Duration,
querierTailClients map[string]logproto.Querier_TailClient,
historicEntries iter.EntryIterator,
tailDisconnectedIngesters func([]string) (map[string]logproto.Querier_TailClient, error),
tailMaxDuration time.Duration,
waitEntryThrottle time.Duration,
m *Metrics,
) *Tailer {
t := Tailer{
openStreamIterator: iter.NewMergeEntryIterator(context.Background(), []iter.EntryIterator{historicEntries}, logproto.FORWARD),
querierTailClients: querierTailClients,
delayFor: delayFor,
responseChan: make(chan *loghttp.TailResponse, maxBufferedTailResponses),
closeErrChan: make(chan error),
seenStreams: make(map[uint64]struct{}),
tailDisconnectedIngesters: tailDisconnectedIngesters,
tailMaxDuration: tailMaxDuration,
waitEntryThrottle: waitEntryThrottle,
metrics: m,
}

t.metrics.tailsActive.Inc()
t.readTailClients()
go t.loop()
return &t
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/tail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func TestTailer(t *testing.T) {
tailClients["test"] = test.tailClient
}

tailer := newTailer(0, tailClients, test.historicEntries, tailDisconnectedIngesters, timeout, throttle)
tailer := newTailer(0, tailClients, test.historicEntries, tailDisconnectedIngesters, timeout, throttle, NewMetrics(nil))
defer tailer.close()

test.tester(t, tailer, test.tailClient)
Expand Down