Skip to content

Commit

Permalink
Cleaning up metrics when files are deleted, adding tests to verify th…
Browse files Browse the repository at this point in the history
…e metrics.
  • Loading branch information
slim-bean committed Apr 17, 2019
1 parent c681c76 commit db31d96
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 2 deletions.
104 changes: 102 additions & 2 deletions pkg/promtail/promtail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package promtail
import (
"errors"
"fmt"
"io"
"io/ioutil"
"math/rand"
"net/http"
"os"
Expand All @@ -18,6 +20,9 @@ import (
"github.com/prometheus/common/model"
sd_config "github.com/prometheus/prometheus/discovery/config"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/textparse"
"github.com/stretchr/testify/assert"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/parser"
Expand Down Expand Up @@ -110,9 +115,21 @@ func TestPromtail(t *testing.T) {

// Wait for all lines to be received.
if err := waitForEntries(20, handler, expectedCounts); err != nil {
t.Fatal("Timed out waiting all log entries to be sent, still waiting for ", err)
t.Fatal("Timed out waiting for log entries: ", err)
}

// Delete one of the log files so we can verify metrics are clean up
err = os.Remove(logFile1)
if err != nil {
t.Fatal("Could not delete a log file to verify metrics are removed: ", err)
}

// Sync period is 100ms in tests, need to wait for at least one sync period for tailer to be cleaned up
<-time.After(150 * time.Millisecond)

//Pull out some prometheus metrics before shutting down
metricsBytes, contentType := getPromMetrics(t)

p.Shutdown()

// Verify.
Expand All @@ -126,6 +143,21 @@ func TestPromtail(t *testing.T) {
t.Error("Somehow we ended up tailing more files than we were supposed to, this is likely a bug")
}

readBytesMetrics := parsePromMetrics(t, metricsBytes, contentType, "promtail_read_bytes_total", "path")
fileBytesMetrics := parsePromMetrics(t, metricsBytes, contentType, "promtail_file_bytes_total", "path")

verifyMetricAbsent(t, readBytesMetrics, "promtail_read_bytes_total", logFile1)
verifyMetricAbsent(t, fileBytesMetrics, "promtail_file_bytes_total", logFile1)

verifyMetric(t, readBytesMetrics, "promtail_read_bytes_total", logFile2, 800)
verifyMetric(t, fileBytesMetrics, "promtail_file_bytes_total", logFile2, 800)

verifyMetric(t, readBytesMetrics, "promtail_read_bytes_total", logFile3, 700)
verifyMetric(t, fileBytesMetrics, "promtail_file_bytes_total", logFile3, 700)

verifyMetric(t, readBytesMetrics, "promtail_read_bytes_total", logFile4, 590)
verifyMetric(t, fileBytesMetrics, "promtail_file_bytes_total", logFile4, 590)

}

func createStartupFile(t *testing.T, filename string) int {
Expand All @@ -148,6 +180,22 @@ func verifyFile(t *testing.T, expected int, prefix string, entries []logproto.En
}
}

func verifyMetricAbsent(t *testing.T, metrics map[string]float64, metric string, label string) {
if _, ok := metrics[label]; ok {
t.Error("Found metric", metric, "with label", label, "which was not expected, "+
"this metric should not be present")
}
}

func verifyMetric(t *testing.T, metrics map[string]float64, metric string, label string, expected float64) {
if _, ok := metrics[label]; !ok {
t.Error("Expected to find metric ", metric, " with", label, "but it was not present")
} else {
actualBytes := metrics[label]
assert.Equal(t, expected, actualBytes, "found incorrect value for metric %s and label %s", metric, label)
}
}

func singleFile(t *testing.T, filename string, prefix string) int {
f, err := os.Create(filename)
if err != nil {
Expand Down Expand Up @@ -294,9 +342,12 @@ func waitForEntries(timeoutSec int, handler *testServerHandler, expectedCounts m
for file, expectedCount := range expectedCounts {
if rcvd, ok := handler.receivedMap[file]; !ok || len(rcvd) != expectedCount {
waiting = waiting + " " + file
for _, e := range rcvd {
level.Info(util.Logger).Log("file", file, "entry", e.Line)
}
}
}
return errors.New("Did not receive the correct number of logs within timeout, still waiting for logs from" + waiting)
return errors.New("still waiting for logs from" + waiting)
}
return nil
}
Expand Down Expand Up @@ -340,6 +391,55 @@ func (h *testServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.recMtx.Unlock()
}

func getPromMetrics(t *testing.T) ([]byte, string) {
resp, err := http.Get("http://localhost:80/metrics")
if err != nil {
t.Fatal("Could not query metrics endpoint", err)
}

if resp.StatusCode != http.StatusOK {
t.Fatal("Received a non 200 status code from /metrics endpoint", resp.StatusCode)
}

b, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatal("Error reading response body from /metrics endpoint", err)
}
ct := resp.Header.Get("Content-Type")
return b, ct
}

func parsePromMetrics(t *testing.T, bytes []byte, contentType string, metricName string, label string) map[string]float64 {
rb := map[string]float64{}

pr := textparse.New(bytes, contentType)
for {
et, err := pr.Next()
if err == io.EOF {
break
}
if err != nil {
t.Fatal("Failed to parse prometheus metrics", err)
}
switch et {
case textparse.EntrySeries:
var res labels.Labels
_, _, v := pr.Series()
pr.Metric(&res)
switch res.Get(labels.MetricName) {
case metricName:
rb[res.Get(label)] = v
continue
default:
continue
}
default:
continue
}
}
return rb
}

func buildTestConfig(t *testing.T, positionsFileName string, logDirName string) config.Config {
var clientURL flagext.URLValue
err := clientURL.Set("http://localhost:3100/api/prom/push")
Expand Down
3 changes: 3 additions & 0 deletions pkg/promtail/targets/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ func (t *tailer) stop() error {
close(t.quit)
<-t.done
filesActive.Add(-1.)
// When we stop tailing the file, also un-export metrics related to the file
readBytes.DeleteLabelValues(t.path)
totalBytes.DeleteLabelValues(t.path)
level.Info(t.logger).Log("msg", "stopped tailing file", "path", t.path)
return err
}
Expand Down

0 comments on commit db31d96

Please sign in to comment.