Skip to content

Commit

Permalink
Adding a configurable timeout for calls to the loki server. (#368)
Browse files Browse the repository at this point in the history
Adding a configurable timeout for calls to the loki server.
Updating promtail test to set sane defaults for all configurations and then override for testing.
Removing .idea folder from .gitignore, we should encourage people (and myself) to use global ignore for IDE configs
  • Loading branch information
slim-bean authored Mar 5, 2019
1 parent a789aca commit 19260d4
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 29 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,3 @@ cmd/loki/loki
cmd/promtail/promtail
/loki
/promtail
.idea/
7 changes: 6 additions & 1 deletion pkg/promtail/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type Config struct {

BackoffConfig util.BackoffConfig `yaml:"backoff_config"`
ExternalLabels model.LabelSet `yaml:"external_labels,omitempty"`
Timeout time.Duration `yaml:"timeout"`
}

// RegisterFlags registers flags.
Expand All @@ -65,6 +66,7 @@ func (c *Config) RegisterFlags(flags *flag.FlagSet) {
flag.IntVar(&c.BackoffConfig.MaxRetries, "client.max-retries", 5, "Maximum number of retires when sending batches.")
flag.DurationVar(&c.BackoffConfig.MinBackoff, "client.min-backoff", 100*time.Millisecond, "Initial backoff time between retries.")
flag.DurationVar(&c.BackoffConfig.MaxBackoff, "client.max-backoff", 5*time.Second, "Maximum backoff time between retries.")
flag.DurationVar(&c.Timeout, "client.timeout", 10*time.Second, "Maximum time to wait for server to respond to a request")
}

// Client for pushing logs in snappy-compressed protos over HTTP.
Expand Down Expand Up @@ -157,6 +159,7 @@ func (c *Client) sendBatch(batch map[model.Fingerprint]*logproto.Stream) {
start := time.Now()
status, err = c.send(ctx, buf)
requestDuration.WithLabelValues(strconv.Itoa(status)).Observe(time.Since(start).Seconds())

if err == nil {
return
}
Expand All @@ -166,7 +169,7 @@ func (c *Client) sendBatch(batch map[model.Fingerprint]*logproto.Stream) {
break
}

level.Warn(c.logger).Log("msg", "error sending batch", "status", status, "error", err)
level.Warn(c.logger).Log("msg", "error sending batch, will retry", "status", status, "error", err)
backoff.Wait()
}

Expand All @@ -191,6 +194,8 @@ func encodeBatch(batch map[model.Fingerprint]*logproto.Stream) ([]byte, error) {
}

func (c *Client) send(ctx context.Context, buf []byte) (int, error) {
ctx, cancel := context.WithTimeout(ctx, c.cfg.Timeout)
defer cancel()
req, err := http.NewRequest("POST", c.cfg.URL.String(), bytes.NewReader(buf))
if err != nil {
return -1, err
Expand Down
40 changes: 13 additions & 27 deletions pkg/promtail/promtail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,12 @@ import (
"github.com/prometheus/common/model"
sd_config "github.com/prometheus/prometheus/discovery/config"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/weaveworks/common/server"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/parser"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/grafana/loki/pkg/promtail/client"
"github.com/grafana/loki/pkg/promtail/config"
"github.com/grafana/loki/pkg/promtail/positions"
"github.com/grafana/loki/pkg/promtail/scrape"
"github.com/grafana/loki/pkg/promtail/targets"
)

func TestPromtail(t *testing.T) {
Expand Down Expand Up @@ -350,17 +346,17 @@ func buildTestConfig(t *testing.T, positionsFileName string, logDirName string)
t.Fatal("Failed to parse client URL")
}

clientConfig := client.Config{
URL: clientURL,
BatchWait: 10 * time.Millisecond,
BatchSize: 10 * 1024,
ExternalLabels: nil,
}
cfg := config.Config{}
// Init everything with default values.
flagext.RegisterFlags(&cfg)

positionsConfig := positions.Config{
SyncPeriod: 100 * time.Millisecond,
PositionsFile: positionsFileName,
}
// Override some of those defaults
cfg.ClientConfig.URL = clientURL
cfg.ClientConfig.BatchWait = 10 * time.Millisecond
cfg.ClientConfig.BatchSize = 10 * 1024

cfg.PositionsConfig.SyncPeriod = 100 * time.Millisecond
cfg.PositionsConfig.PositionsFile = positionsFileName

targetGroup := targetgroup.Group{
Targets: []model.LabelSet{{
Expand All @@ -385,21 +381,11 @@ func buildTestConfig(t *testing.T, positionsFileName string, logDirName string)
RelabelConfigs: nil,
ServiceDiscoveryConfig: serviceConfig,
}
cfg.ScrapeConfig = append(cfg.ScrapeConfig, scrapeConfig)

targetConfig := targets.Config{
SyncPeriod: 10 * time.Millisecond,
}

return config.Config{
ServerConfig: server.Config{},
ClientConfig: clientConfig,
PositionsConfig: positionsConfig,
ScrapeConfig: []scrape.Config{
scrapeConfig,
},
TargetConfig: targetConfig,
}
cfg.TargetConfig.SyncPeriod = 10 * time.Millisecond

return cfg
}

func initRandom() {
Expand Down

0 comments on commit 19260d4

Please sign in to comment.