Skip to content

Commit 46f68c8

Browse files
committed
output/influxdb: use the async periodic flusher
1 parent f0329ef commit 46f68c8

File tree

2 files changed

+36
-20
lines changed

2 files changed

+36
-20
lines changed

output/influxdb/config.go

+14-5
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,20 @@ type Config struct {
5656
// NewConfig creates a new InfluxDB output config with some default values.
5757
func NewConfig() Config {
5858
c := Config{
59-
Addr: null.NewString("http://localhost:8086", false),
60-
DB: null.NewString("k6", false),
61-
TagsAsFields: []string{"vu", "iter", "url"},
62-
ConcurrentWrites: null.NewInt(10, false),
63-
PushInterval: types.NewNullDuration(time.Second, false),
59+
Addr: null.NewString("http://localhost:8086", false),
60+
DB: null.NewString("k6", false),
61+
TagsAsFields: []string{"vu", "iter", "url"},
62+
PushInterval: types.NewNullDuration(time.Second, false),
63+
64+
// The minimum value of pow(2, N) for handling a stressful situation
65+
// with the default push interval set to 1s.
66+
// Concurrency is not expected for the normal use-case,
67+
// the response time should be lower than the push interval set value.
68+
// In case of spikes, the response time could go around 2s,
69+
// higher values will highlight a not sustainable situation
70+
// and the user should adjust the executed script
71+
// or the configuration based on the environment and rate expected.
72+
ConcurrentWrites: null.NewInt(4, false),
6473
}
6574
return c
6675
}

output/influxdb/output.go

+22-15
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,8 @@ type Output struct {
5858
Config Config
5959
BatchConf client.BatchPointsConfig
6060

61-
logger logrus.FieldLogger
62-
semaphoreCh chan struct{}
63-
fieldKinds map[string]FieldKind
61+
logger logrus.FieldLogger
62+
fieldKinds map[string]FieldKind
6463
}
6564

6665
// New returns new influxdb output
@@ -87,11 +86,10 @@ func newOutput(params output.Params) (*Output, error) {
8786
logger: params.Logger.WithFields(logrus.Fields{
8887
"output": "InfluxDBv1",
8988
}),
90-
Client: cl,
91-
Config: conf,
92-
BatchConf: batchConf,
93-
semaphoreCh: make(chan struct{}, conf.ConcurrentWrites.Int64),
94-
fieldKinds: fldKinds,
89+
Client: cl,
90+
Config: conf,
91+
BatchConf: batchConf,
92+
fieldKinds: fldKinds,
9593
}, err
9694
}
9795

@@ -178,12 +176,15 @@ func (o *Output) Start() error {
178176
// usually means we're either a non-admin user to an existing DB or connecting over UDP.
179177
_, err := o.Client.Query(client.NewQuery("CREATE DATABASE "+o.BatchConf.Database, "", ""))
180178
if err != nil {
181-
o.logger.WithError(err).Debug("InfluxDB: Couldn't create database; most likely harmless")
179+
o.logger.WithError(err).Debug("Couldn't create database; most likely harmless")
182180
}
183181

184-
pf, err := output.NewPeriodicFlusher(time.Duration(o.Config.PushInterval.Duration), o.flushMetrics)
182+
pf, err := output.NewAsyncPeriodicFlusher(
183+
time.Duration(o.Config.PushInterval.Duration),
184+
int(o.Config.ConcurrentWrites.Int64),
185+
o.flushMetrics)
185186
if err != nil {
186-
return err //nolint:wrapcheck
187+
return err
187188
}
188189
o.logger.Debug("Started!")
189190
o.periodicFlusher = pf
@@ -201,11 +202,11 @@ func (o *Output) Stop() error {
201202

202203
func (o *Output) flushMetrics() {
203204
samples := o.GetBufferedSamples()
205+
if len(samples) < 1 {
206+
o.logger.Debug("Any buffered samples, skipping the flush operation")
207+
return
208+
}
204209

205-
o.semaphoreCh <- struct{}{}
206-
defer func() {
207-
<-o.semaphoreCh
208-
}()
209210
o.logger.Debug("Committing...")
210211
o.logger.WithField("samples", len(samples)).Debug("Writing...")
211212

@@ -219,7 +220,13 @@ func (o *Output) flushMetrics() {
219220
startTime := time.Now()
220221
if err := o.Client.Write(batch); err != nil {
221222
o.logger.WithError(err).Error("Couldn't write stats")
223+
return
222224
}
223225
t := time.Since(startTime)
224226
o.logger.WithField("t", t).Debug("Batch written!")
227+
228+
if t > time.Duration(o.Config.PushInterval.Duration) {
229+
o.logger.WithField("t", t).
230+
Warn("The flush operation took higher than the expected set push interval. If you see this message multiple times then the setup or configuration need to be adjusted to achieve a sustainable rate.") //nolint:lll
231+
}
225232
}

0 commit comments

Comments
 (0)