Skip to content

Commit e2ecd79

Browse files
committed
output/influxdb: use the async periodic flusher
1 parent 1958594 commit e2ecd79

File tree

2 files changed

+18
-16
lines changed

2 files changed

+18
-16
lines changed

output/influxdb/config.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func NewConfig() Config {
5959
Addr: null.NewString("http://localhost:8086", false),
6060
DB: null.NewString("k6", false),
6161
TagsAsFields: []string{"vu", "iter", "url"},
62-
ConcurrentWrites: null.NewInt(10, false),
62+
ConcurrentWrites: null.NewInt(4, false),
6363
PushInterval: types.NewNullDuration(time.Second, false),
6464
}
6565
return c

output/influxdb/output.go

+17-15
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,8 @@ type Output struct {
5858
Config Config
5959
BatchConf client.BatchPointsConfig
6060

61-
logger logrus.FieldLogger
62-
semaphoreCh chan struct{}
63-
fieldKinds map[string]FieldKind
61+
logger logrus.FieldLogger
62+
fieldKinds map[string]FieldKind
6463
}
6564

6665
// New returns new influxdb output
@@ -87,11 +86,10 @@ func newOutput(params output.Params) (*Output, error) {
8786
logger: params.Logger.WithFields(logrus.Fields{
8887
"output": "InfluxDBv1",
8988
}),
90-
Client: cl,
91-
Config: conf,
92-
BatchConf: batchConf,
93-
semaphoreCh: make(chan struct{}, conf.ConcurrentWrites.Int64),
94-
fieldKinds: fldKinds,
89+
Client: cl,
90+
Config: conf,
91+
BatchConf: batchConf,
92+
fieldKinds: fldKinds,
9593
}, err
9694
}
9795

@@ -178,12 +176,15 @@ func (o *Output) Start() error {
178176
// usually means we're either a non-admin user to an existing DB or connecting over UDP.
179177
_, err := o.Client.Query(client.NewQuery("CREATE DATABASE "+o.BatchConf.Database, "", ""))
180178
if err != nil {
181-
o.logger.WithError(err).Debug("InfluxDB: Couldn't create database; most likely harmless")
179+
o.logger.WithError(err).Debug("Couldn't create database; most likely harmless")
182180
}
183181

184-
pf, err := output.NewPeriodicFlusher(time.Duration(o.Config.PushInterval.Duration), o.flushMetrics)
182+
pf, err := output.NewAsyncPeriodicFlusher(
183+
time.Duration(o.Config.PushInterval.Duration),
184+
int(o.Config.ConcurrentWrites.Int64),
185+
o.flushMetrics)
185186
if err != nil {
186-
return err //nolint:wrapcheck
187+
return err
187188
}
188189
o.logger.Debug("Started!")
189190
o.periodicFlusher = pf
@@ -201,11 +202,11 @@ func (o *Output) Stop() error {
201202

202203
func (o *Output) flushMetrics() {
203204
samples := o.GetBufferedSamples()
205+
if len(samples) < 1 {
206+
o.logger.Debug("Any buffered samples, skipping the flush operation")
207+
return
208+
}
204209

205-
o.semaphoreCh <- struct{}{}
206-
defer func() {
207-
<-o.semaphoreCh
208-
}()
209210
o.logger.Debug("Committing...")
210211
o.logger.WithField("samples", len(samples)).Debug("Writing...")
211212

@@ -219,6 +220,7 @@ func (o *Output) flushMetrics() {
219220
startTime := time.Now()
220221
if err := o.Client.Write(batch); err != nil {
221222
o.logger.WithError(err).Error("Couldn't write stats")
223+
return
222224
}
223225
t := time.Since(startTime)
224226
o.logger.WithField("t", t).Debug("Batch written!")

0 commit comments

Comments
 (0)