Skip to content

Commit

Permalink
chore(deps): update Loki and Promtail code to work with the new Prome…
Browse files Browse the repository at this point in the history
…theus
  • Loading branch information
ptodev committed Mar 20, 2024
1 parent d3db16f commit 3008a47
Show file tree
Hide file tree
Showing 14 changed files with 194 additions and 70 deletions.
71 changes: 33 additions & 38 deletions clients/pkg/promtail/discovery/consulagent/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package consulagent
import (
"context"
"encoding/json"
"fmt"
"net"
"net/http"
"strconv"
Expand Down Expand Up @@ -62,26 +63,6 @@ const (
)

var (
rpcFailuresCount = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: namespace,
Name: "sd_consulagent_rpc_failures_total",
Help: "The number of Consul Agent RPC call failures.",
})
rpcDuration = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: namespace,
Name: "sd_consulagent_rpc_duration_seconds",
Help: "The duration of a Consul Agent RPC call in seconds.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
},
[]string{"endpoint", "call"},
)

// Initialize metric vectors.
servicesRPCDuration = rpcDuration.WithLabelValues("agent", "services")
serviceRPCDuration = rpcDuration.WithLabelValues("agent", "service")

// DefaultSDConfig is the default Consul SD configuration.
DefaultSDConfig = SDConfig{
TagSeparator: ",",
Expand All @@ -94,8 +75,6 @@ var (

func init() {
discovery.RegisterConfig(&SDConfig{})
prometheus.MustRegister(rpcFailuresCount)
prometheus.MustRegister(rpcDuration)
}

// SDConfig is the configuration for Consul service discovery.
Expand Down Expand Up @@ -129,12 +108,17 @@ type SDConfig struct {
TLSConfig config.TLSConfig `yaml:"tls_config,omitempty"`
}

// NewDiscovererMetrics implements discovery.Config.
func (c *SDConfig) NewDiscovererMetrics(reg prometheus.Registerer, rmi discovery.RefreshMetricsInstantiator) discovery.DiscovererMetrics {
return newDiscovererMetrics(reg, rmi)
}

// Name returns the name of the Config.
func (*SDConfig) Name() string { return "consulagent" }

// NewDiscoverer returns a Discoverer for the Config.
func (c *SDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) {
return NewDiscovery(c, opts.Logger)
return NewDiscovery(c, opts.Logger, opts.Metrics)
}

// SetDirectory joins any relative file paths with dir.
Expand Down Expand Up @@ -169,10 +153,16 @@ type Discovery struct {
refreshInterval time.Duration
finalizer func()
logger log.Logger
metrics *consulMetrics
}

// NewDiscovery returns a new Discovery for the given config.
func NewDiscovery(conf *SDConfig, logger log.Logger) (*Discovery, error) {
func NewDiscovery(conf *SDConfig, logger log.Logger, metrics discovery.DiscovererMetrics) (*Discovery, error) {
m, ok := metrics.(*consulMetrics)
if !ok {
return nil, fmt.Errorf("invalid discovery metrics type")
}

if logger == nil {
logger = log.NewNopLogger()
}
Expand Down Expand Up @@ -220,6 +210,7 @@ func NewDiscovery(conf *SDConfig, logger log.Logger) (*Discovery, error) {
clientDatacenter: conf.Datacenter,
finalizer: transport.CloseIdleConnections,
logger: logger,
metrics: m,
}
return cd, nil
}
Expand Down Expand Up @@ -275,7 +266,7 @@ func (d *Discovery) getDatacenter() error {
info, err := d.client.Agent().Self()
if err != nil {
level.Error(d.logger).Log("msg", "Error retrieving datacenter name", "err", err)
rpcFailuresCount.Inc()
d.metrics.rpcFailuresCount.Inc()
return err
}

Expand Down Expand Up @@ -356,7 +347,7 @@ func (d *Discovery) watchServices(ctx context.Context, ch chan<- []*targetgroup.
t0 := time.Now()
srvs, err := agent.Services()
elapsed := time.Since(t0)
servicesRPCDuration.Observe(elapsed.Seconds())
d.metrics.servicesRPCDuration.Observe(elapsed.Seconds())

// Check the context before in order to exit early.
select {
Expand All @@ -367,7 +358,7 @@ func (d *Discovery) watchServices(ctx context.Context, ch chan<- []*targetgroup.

if err != nil {
level.Error(d.logger).Log("msg", "Error refreshing service list", "err", err)
rpcFailuresCount.Inc()
d.metrics.rpcFailuresCount.Inc()
time.Sleep(retryInterval)
return
}
Expand Down Expand Up @@ -423,13 +414,15 @@ func (d *Discovery) watchServices(ctx context.Context, ch chan<- []*targetgroup.

// consulService contains data belonging to the same service.
type consulService struct {
name string
tags []string
labels model.LabelSet
discovery *Discovery
client *consul.Client
tagSeparator string
logger log.Logger
name string
tags []string
labels model.LabelSet
discovery *Discovery
client *consul.Client
tagSeparator string
logger log.Logger
rpcFailuresCount prometheus.Counter
serviceRPCDuration prometheus.Observer
}

// Start watching a service.
Expand All @@ -443,8 +436,10 @@ func (d *Discovery) watchService(ctx context.Context, ch chan<- []*targetgroup.G
serviceLabel: model.LabelValue(name),
datacenterLabel: model.LabelValue(d.clientDatacenter),
},
tagSeparator: d.tagSeparator,
logger: d.logger,
tagSeparator: d.tagSeparator,
logger: d.logger,
rpcFailuresCount: d.metrics.rpcFailuresCount,
serviceRPCDuration: d.metrics.serviceRPCDuration,
}

go func() {
Expand Down Expand Up @@ -474,7 +469,7 @@ func (srv *consulService) watch(ctx context.Context, ch chan<- []*targetgroup.Gr
t0 := time.Now()
aggregatedStatus, serviceChecks, err := agent.AgentHealthServiceByName(srv.name)
elapsed := time.Since(t0)
serviceRPCDuration.Observe(elapsed.Seconds())
srv.serviceRPCDuration.Observe(elapsed.Seconds())

// Check the context before in order to exit early.
select {
Expand All @@ -486,7 +481,7 @@ func (srv *consulService) watch(ctx context.Context, ch chan<- []*targetgroup.Gr

if err != nil {
level.Error(srv.logger).Log("msg", "Error refreshing service", "service", srv.name, "tags", strings.Join(srv.tags, ","), "err", err)
rpcFailuresCount.Inc()
srv.rpcFailuresCount.Inc()
time.Sleep(retryInterval)
return
}
Expand Down
65 changes: 65 additions & 0 deletions clients/pkg/promtail/discovery/consulagent/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// This code was adapted from the consul service discovery
// package in prometheus: https://github.com/prometheus/prometheus/blob/main/discovery/consul/metrics.go
// which is copyrighted: 2015 The Prometheus Authors
// and licensed under the Apache License, Version 2.0 (the "License");

package consulagent

import (
"github.com/prometheus/client_golang/prometheus"

"github.com/prometheus/prometheus/discovery"
)

var _ discovery.DiscovererMetrics = (*consulMetrics)(nil)

type consulMetrics struct {
rpcFailuresCount prometheus.Counter
rpcDuration *prometheus.SummaryVec

servicesRPCDuration prometheus.Observer
serviceRPCDuration prometheus.Observer

metricRegisterer discovery.MetricRegisterer
}

func newDiscovererMetrics(reg prometheus.Registerer, rmi discovery.RefreshMetricsInstantiator) discovery.DiscovererMetrics {
m := &consulMetrics{
rpcFailuresCount: prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: namespace,
Name: "sd_consulagent_rpc_failures_total",
Help: "The number of Consul Agent RPC call failures.",
}),
rpcDuration: prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: namespace,
Name: "sd_consulagent_rpc_duration_seconds",
Help: "The duration of a Consul Agent RPC call in seconds.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
},
[]string{"endpoint", "call"},
),
}

m.metricRegisterer = discovery.NewMetricRegisterer(reg, []prometheus.Collector{
m.rpcFailuresCount,
m.rpcDuration,
})

// Initialize metric vectors.
m.servicesRPCDuration = m.rpcDuration.WithLabelValues("agent", "services")
m.serviceRPCDuration = m.rpcDuration.WithLabelValues("agent", "service")

return m
}

// Register implements discovery.DiscovererMetrics.
func (m *consulMetrics) Register() error {
return m.metricRegisterer.RegisterMetrics()
}

// Unregister implements discovery.DiscovererMetrics.
func (m *consulMetrics) Unregister() {
m.metricRegisterer.UnregisterMetrics()
}
4 changes: 2 additions & 2 deletions clients/pkg/promtail/targets/docker/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"sync"
"time"

docker_types "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/client"
"github.com/docker/docker/pkg/stdcopy"
"github.com/go-kit/log"
Expand Down Expand Up @@ -88,7 +88,7 @@ func (t *Target) processLoop(ctx context.Context) {
t.wg.Add(1)
defer t.wg.Done()

opts := docker_types.ContainerLogsOptions{
opts := container.LogsOptions{
ShowStdout: true,
ShowStderr: true,
Follow: true,
Expand Down
23 changes: 17 additions & 6 deletions clients/pkg/promtail/targets/docker/targetmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,25 @@ func NewTargetManager(
pushClient api.EntryHandler,
scrapeConfigs []scrapeconfig.Config,
) (*TargetManager, error) {
noopRegistry := util.NoopRegistry{}
noopSdMetrics, err := discovery.CreateAndRegisterSDMetrics(noopRegistry)
if err != nil {
return nil, err
}

ctx, cancel := context.WithCancel(context.Background())
tm := &TargetManager{
metrics: metrics,
logger: logger,
cancel: cancel,
done: make(chan struct{}),
positions: positions,
manager: discovery.NewManager(ctx, log.With(logger, "component", "docker_discovery")),
metrics: metrics,
logger: logger,
cancel: cancel,
done: make(chan struct{}),
positions: positions,
manager: discovery.NewManager(
ctx,
log.With(logger, "component", "docker_discovery"),
noopRegistry,
noopSdMetrics,
),
pushClient: pushClient,
groups: make(map[string]*targetGroup),
}
Expand Down
13 changes: 12 additions & 1 deletion clients/pkg/promtail/targets/file/filetargetmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ func NewFileTargetManager(
reg = prometheus.DefaultRegisterer
}

noopRegistry := util.NoopRegistry{}
noopSdMetrics, err := discovery.CreateAndRegisterSDMetrics(noopRegistry)
if err != nil {
return nil, err
}

watcher, err := fsnotify.NewWatcher()
if err != nil {
return nil, err
Expand All @@ -76,7 +82,12 @@ func NewFileTargetManager(
watcher: watcher,
targetEventHandler: make(chan fileTargetEvent),
syncers: map[string]*targetSyncer{},
manager: discovery.NewManager(ctx, log.With(logger, "component", "discovery")),
manager: discovery.NewManager(
ctx,
log.With(logger, "component", "discovery"),
noopRegistry,
noopSdMetrics,
),
}

hostname, err := hostname()
Expand Down
13 changes: 1 addition & 12 deletions clients/pkg/promtail/targets/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ const (
KafkaConfigs = "kafkaConfigs"
GelfConfigs = "gelfConfigs"
CloudflareConfigs = "cloudflareConfigs"
DockerConfigs = "dockerConfigs"
DockerSDConfigs = "dockerSDConfigs"
HerokuDrainConfigs = "herokuDrainConfigs"
AzureEventHubsScrapeConfigs = "azureeventhubsScrapeConfigs"
Expand Down Expand Up @@ -150,7 +149,7 @@ func NewTargetManagers(
if len(targetScrapeConfigs[CloudflareConfigs]) > 0 && cloudflareMetrics == nil {
cloudflareMetrics = cloudflare.NewMetrics(reg)
}
if (len(targetScrapeConfigs[DockerConfigs]) > 0 || len(targetScrapeConfigs[DockerSDConfigs]) > 0) && dockerMetrics == nil {
if (len(targetScrapeConfigs[DockerSDConfigs]) > 0) && dockerMetrics == nil {
dockerMetrics = docker.NewMetrics(reg)
}
if len(targetScrapeConfigs[JournalScrapeConfigs]) > 0 && journalMetrics == nil {
Expand Down Expand Up @@ -269,16 +268,6 @@ func NewTargetManagers(
return nil, errors.Wrap(err, "failed to make cloudflare target manager")
}
targetManagers = append(targetManagers, cfTargetManager)
case DockerConfigs:
pos, err := getPositionFile()
if err != nil {
return nil, err
}
cfTargetManager, err := docker.NewTargetManager(dockerMetrics, logger, pos, client, scrapeConfigs)
if err != nil {
return nil, errors.Wrap(err, "failed to make Docker target manager")
}
targetManagers = append(targetManagers, cfTargetManager)
case DockerSDConfigs:
pos, err := getPositionFile()
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/loghttp/push/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import (
"sort"
"time"

prometheustranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/plog/plogotlp"
Expand Down Expand Up @@ -343,7 +343,7 @@ func attributeToLabels(k string, v pcommon.Value, prefix string) push.LabelsAdap
if prefix != "" {
keyWithPrefix = prefix + "_" + k
}
keyWithPrefix = prometheustranslator.NormalizeLabel(keyWithPrefix)
keyWithPrefix = prometheus.NormalizeLabel(keyWithPrefix)

typ := v.Type()
if typ == pcommon.ValueTypeMap {
Expand Down
Loading

0 comments on commit 3008a47

Please sign in to comment.