diff --git a/pkg/logentry/metric/metricvec.go b/pkg/logentry/metric/metricvec.go
index c7a2feadb56ee..335982681dce4 100644
--- a/pkg/logentry/metric/metricvec.go
+++ b/pkg/logentry/metric/metricvec.go
@@ -58,6 +58,17 @@ func (c *metricVec) With(labels model.LabelSet) prometheus.Metric {
 	return metric
 }
 
+func (c *metricVec) Delete(labels model.LabelSet) bool {
+	c.mtx.Lock()
+	defer c.mtx.Unlock()
+	fp := labels.Fingerprint()
+	_, ok := c.metrics[fp]
+	if ok {
+		delete(c.metrics, fp)
+	}
+	return ok
+}
+
 // prune will remove all metrics which implement the Expirable interface and have expired
 // it does not take out a lock on the metrics map so whoever calls this function should do so.
 func (c *metricVec) prune() {
diff --git a/pkg/promtail/api/types.go b/pkg/promtail/api/types.go
index 48fea8f01d775..69efd0eb317d0 100644
--- a/pkg/promtail/api/types.go
+++ b/pkg/promtail/api/types.go
@@ -6,6 +6,11 @@ import (
 	"github.com/prometheus/common/model"
 )
 
+type InstrumentedEntryHandler interface {
+	EntryHandler
+	UnregisterLatencyMetric(labels model.LabelSet)
+}
+
 // EntryHandler is something that can "handle" entries.
 type EntryHandler interface {
 	Handle(labels model.LabelSet, time time.Time, entry string) error
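
Aside for reviewers, not part of the patch: metricVec keys its map by labels.Fingerprint(), so Delete above can locate a metric given only the label set it was created with. The sketch below shows the property this relies on — fingerprints are computed over sorted label pairs, so map iteration order is irrelevant. It assumes nothing beyond the github.com/prometheus/common/model dependency already used here; the program is illustrative only.

package main

import (
	"fmt"

	"github.com/prometheus/common/model"
)

func main() {
	// Two LabelSets with the same pairs written in different order.
	a := model.LabelSet{"host": "loki:3100", "filename": "/var/log/syslog"}
	b := model.LabelSet{"filename": "/var/log/syslog", "host": "loki:3100"}

	// Fingerprint hashes the sorted label pairs, so both sets map to the
	// same key — which is why Delete(labels) finds the entry that
	// With(labels) created.
	fmt.Println(a.Fingerprint() == b.Fingerprint()) // true
}
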
metric.NewGauges("promtail_stream_lag_seconds", + "Difference between current time and last batch timestamp for successful sends", + metric.GaugeConfig{Action: "set"}, + int64(1*time.Minute.Seconds()), // This strips out files which update slowly and reduces noise in this metric. + ) + if err != nil { + panic(err) + } + prometheus.MustRegister(streamLag) } // Client pushes entries to Loki and can be stopped @@ -234,6 +251,26 @@ func (c *client) sendBatch(tenantID string, batch *batch) { if err == nil { sentBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes) sentEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount)) + for _, s := range batch.streams { + lbls, err := parser.ParseMetric(s.Labels) + if err != nil { + // is this possible? + level.Warn(c.logger).Log("msg", "error converting stream label string to label.Labels, cannot update lagging metric", "error", err) + return + } + var lblSet model.LabelSet + for i := range lbls { + if lbls[i].Name == LatencyLabel { + lblSet = model.LabelSet{ + model.LabelName(HostLabel): model.LabelValue(c.cfg.URL.Host), + model.LabelName(LatencyLabel): model.LabelValue(lbls[i].Value), + } + } + } + if lblSet != nil { + streamLag.With(lblSet).Set(time.Now().Sub(s.Entries[len(s.Entries)-1].Timestamp).Seconds()) + } + } return } @@ -330,3 +367,8 @@ func (c *client) Handle(ls model.LabelSet, t time.Time, s string) error { }} return nil } + +func (c *client) UnregisterLatencyMetric(labels model.LabelSet) { + labels[HostLabel] = model.LabelValue(c.cfg.URL.Host) + streamLag.Delete(labels) +} diff --git a/pkg/promtail/client/config.go b/pkg/promtail/client/config.go index 444490acd1cbf..590753e4c0c05 100644 --- a/pkg/promtail/client/config.go +++ b/pkg/promtail/client/config.go @@ -34,7 +34,7 @@ type Config struct { func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.Var(&c.URL, prefix+"client.url", "URL of log server") f.DurationVar(&c.BatchWait, prefix+"client.batch-wait", 1*time.Second, "Maximum wait period before sending batch.") - f.IntVar(&c.BatchSize, prefix+"client.batch-size-bytes", 100*1024, "Maximum batch size to accrue before sending. ") + f.IntVar(&c.BatchSize, prefix+"client.batch-size-bytes", 1024*1024, "Maximum batch size to accrue before sending. ") // Default backoff schedule: 0.5s, 1s, 2s, 4s, 8s, 16s, 32s, 64s, 128s, 256s(4.267m) For a total time of 511.5s(8.5m) before logs are lost f.IntVar(&c.BackoffConfig.MaxRetries, prefix+"client.max-retries", 10, "Maximum number of retires when sending batches.") f.DurationVar(&c.BackoffConfig.MinBackoff, prefix+"client.min-backoff", 500*time.Millisecond, "Initial backoff time between retries.") diff --git a/pkg/promtail/targets/file/filetarget.go b/pkg/promtail/targets/file/filetarget.go index 93783ba0a4759..620704931f78b 100644 --- a/pkg/promtail/targets/file/filetarget.go +++ b/pkg/promtail/targets/file/filetarget.go @@ -17,6 +17,7 @@ import ( "github.com/grafana/loki/pkg/helpers" "github.com/grafana/loki/pkg/promtail/api" + "github.com/grafana/loki/pkg/promtail/client" "github.com/grafana/loki/pkg/promtail/positions" "github.com/grafana/loki/pkg/promtail/targets/target" ) @@ -316,6 +317,9 @@ func (t *FileTarget) stopTailingAndRemovePosition(ps []string) { t.positions.Remove(tailer.path) delete(t.tails, p) } + if h, ok := t.handler.(api.InstrumentedEntryHandler); ok { + h.UnregisterLatencyMetric(model.LabelSet{model.LabelName(client.LatencyLabel): model.LabelValue(p)}) + } } }