Skip to content

Commit 739739a

Browse files
codebien and olegbespalov
authored and committed
Concurrent flush for batches
1 parent 77d3f92 commit 739739a

File tree

3 files changed

+107
-38
lines changed

3 files changed

+107
-38
lines changed

output/cloud/expv2/flush.go

+67-16
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package expv2
22

33
import (
4+
"math"
5+
"sync"
46
"time"
57

68
"github.com/sirupsen/logrus"
@@ -21,6 +23,7 @@ type metricsFlusher struct {
2123
discardedLabels map[string]struct{}
2224
aggregationPeriodInSeconds uint32
2325
maxSeriesInBatch int
26+
batchPushConcurrency int
2427
}
2528

2629
// flush flushes the queued buckets sending them to the remote Cloud service.
@@ -43,45 +46,93 @@ func (f *metricsFlusher) flush() error {
4346
// the metricSetBuilder is used for doing it during the traverse of the buckets.
4447

4548
var (
46-
seriesCount int
47-
batchesCount int
48-
start = time.Now()
49+
start = time.Now()
50+
batches []metricSetBuilder
51+
seriesCount int
4952
)
5053

5154
defer func() {
5255
f.logger.
5356
WithField("t", time.Since(start)).
5457
WithField("series", seriesCount).
5558
WithField("buckets", len(buckets)).
56-
WithField("batches", batchesCount).Debug("Flush the queued buckets")
59+
WithField("batches", len(batches)).Debug("Flush the queued buckets")
5760
}()
5861

5962
msb := newMetricSetBuilder(f.testRunID, f.aggregationPeriodInSeconds)
6063
for i := 0; i < len(buckets); i++ {
6164
for timeSeries, sink := range buckets[i].Sinks {
6265
msb.addTimeSeries(buckets[i].Time, timeSeries, sink)
63-
if len(msb.seriesIndex) < f.maxSeriesInBatch {
66+
if len(msb.seriesIndex) <= f.maxSeriesInBatch {
6467
continue
6568
}
6669

67-
// we hit the batch size, let's flush
68-
batchesCount++
70+
// We hit the batch size, let's flush
6971
seriesCount += len(msb.seriesIndex)
70-
if err := f.push(msb); err != nil {
71-
return err
72-
}
72+
batches = append(batches, msb)
73+
74+
// Reset the builder
7375
msb = newMetricSetBuilder(f.testRunID, f.aggregationPeriodInSeconds)
7476
}
7577
}
7678

77-
if len(msb.seriesIndex) < 1 {
78-
return nil
79-
}
80-
8179
// send the last (or the unique) MetricSet chunk to the remote service
82-
batchesCount++
8380
seriesCount += len(msb.seriesIndex)
84-
return f.push(msb)
81+
batches = append(batches, msb)
82+
83+
return f.flushBatches(batches)
84+
}
85+
86+
func (f *metricsFlusher) flushBatches(batches []metricSetBuilder) error {
87+
var (
88+
wg = sync.WaitGroup{}
89+
errs = make(chan error)
90+
done = make(chan struct{})
91+
stop = make(chan struct{})
92+
93+
workers = int(math.Min(float64(len(batches)), float64(f.batchPushConcurrency)))
94+
chunkSize = len(batches) / workers
95+
)
96+
97+
wg.Add(workers)
98+
for workersIndex := 0; workersIndex < workers; workersIndex++ {
99+
offset := (workersIndex * chunkSize)
100+
end := offset + chunkSize
101+
if workersIndex == workers-1 {
102+
end = len(batches)
103+
}
104+
105+
go func(chunk []metricSetBuilder) {
106+
defer wg.Done()
107+
108+
for i := 0; i < len(chunk); i++ {
109+
select {
110+
case <-stop:
111+
return
112+
default:
113+
}
114+
if err := f.push(chunk[i]); err != nil {
115+
errs <- err
116+
}
117+
}
118+
}(batches[offset:end])
119+
}
120+
121+
go func() {
122+
defer close(done)
123+
wg.Wait()
124+
}()
125+
126+
for {
127+
select {
128+
case err := <-errs:
129+
close(stop)
130+
<-done
131+
return err
132+
case <-done:
133+
return nil
134+
}
135+
}
85136
}
86137

87138
// push sends the metric set to the remote service.

output/cloud/expv2/flush_test.go

+32-20
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package expv2
22

33
import (
44
"strconv"
5+
"sync"
6+
"sync/atomic"
57
"testing"
68

79
"github.com/sirupsen/logrus"
@@ -56,11 +58,12 @@ func TestMetricsFlusherFlushInBatchWithinBucket(t *testing.T) {
5658
bq := &bucketQ{}
5759
pm := &pusherMock{}
5860
mf := metricsFlusher{
59-
bq: bq,
60-
client: pm,
61-
logger: logger,
62-
discardedLabels: make(map[string]struct{}),
63-
maxSeriesInBatch: 3,
61+
bq: bq,
62+
client: pm,
63+
logger: logger,
64+
discardedLabels: make(map[string]struct{}),
65+
maxSeriesInBatch: 3,
66+
batchPushConcurrency: 5,
6467
}
6568

6669
bq.buckets = make([]timeBucket, 0, tc.series)
@@ -78,7 +81,7 @@ func TestMetricsFlusherFlushInBatchWithinBucket(t *testing.T) {
7881
bq.Push([]timeBucket{{Time: 1, Sinks: sinks}})
7982
err := mf.flush()
8083
require.NoError(t, err)
81-
assert.Equal(t, tc.expFlushCalls, pm.pushCalled)
84+
assert.Equal(t, tc.expFlushCalls, pm.timesCalled())
8285
}
8386
}
8487

@@ -101,11 +104,12 @@ func TestMetricsFlusherFlushInBatchAcrossBuckets(t *testing.T) {
101104
bq := &bucketQ{}
102105
pm := &pusherMock{}
103106
mf := metricsFlusher{
104-
bq: bq,
105-
client: pm,
106-
logger: logger,
107-
discardedLabels: make(map[string]struct{}),
108-
maxSeriesInBatch: 3,
107+
bq: bq,
108+
client: pm,
109+
logger: logger,
110+
discardedLabels: make(map[string]struct{}),
111+
maxSeriesInBatch: 3,
112+
batchPushConcurrency: 5,
109113
}
110114

111115
bq.buckets = make([]timeBucket, 0, tc.series)
@@ -127,7 +131,7 @@ func TestMetricsFlusherFlushInBatchAcrossBuckets(t *testing.T) {
127131

128132
err := mf.flush()
129133
require.NoError(t, err)
130-
assert.Equal(t, tc.expFlushCalls, pm.pushCalled)
134+
assert.Equal(t, tc.expFlushCalls, pm.timesCalled())
131135
}
132136
}
133137

@@ -136,21 +140,25 @@ func TestFlushWithReservedLabels(t *testing.T) {
136140

137141
logger, hook := testutils.NewLoggerWithHook(t)
138142

143+
mutex := sync.Mutex{}
139144
collected := make([]*pbcloud.MetricSet, 0)
140145

141146
bq := &bucketQ{}
142147
pm := &pusherMock{
143148
hook: func(ms *pbcloud.MetricSet) {
149+
mutex.Lock()
144150
collected = append(collected, ms)
151+
mutex.Unlock()
145152
},
146153
}
147154

148155
mf := metricsFlusher{
149-
bq: bq,
150-
client: pm,
151-
maxSeriesInBatch: 2,
152-
logger: logger,
153-
discardedLabels: make(map[string]struct{}),
156+
bq: bq,
157+
client: pm,
158+
maxSeriesInBatch: 2,
159+
logger: logger,
160+
discardedLabels: make(map[string]struct{}),
161+
batchPushConcurrency: 5,
154162
}
155163

156164
r := metrics.NewRegistry()
@@ -186,7 +194,7 @@ func TestFlushWithReservedLabels(t *testing.T) {
186194
require.NoError(t, err)
187195

188196
loglines := hook.Drain()
189-
assert.Equal(t, 1, len(collected))
197+
require.Len(t, collected, 1)
190198

191199
// check that warnings sown only once per label
192200
assert.Len(t, testutils.FilterEntries(loglines, logrus.WarnLevel, "Tag __name__ has been discarded since it is reserved for Cloud operations."), 1)
@@ -207,14 +215,18 @@ func TestFlushWithReservedLabels(t *testing.T) {
207215

208216
type pusherMock struct {
209217
hook func(*pbcloud.MetricSet)
210-
pushCalled int
218+
pushCalled int64
219+
}
220+
221+
func (pm *pusherMock) timesCalled() int {
222+
return int(atomic.LoadInt64(&pm.pushCalled))
211223
}
212224

213225
func (pm *pusherMock) push(ms *pbcloud.MetricSet) error {
214226
if pm.hook != nil {
215227
pm.hook(ms)
216228
}
217229

218-
pm.pushCalled++
230+
atomic.AddInt64(&pm.pushCalled, 1)
219231
return nil
220232
}

output/cloud/expv2/output.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"errors"
88
"fmt"
99
"net/http"
10+
"runtime"
1011
"strconv"
1112
"sync"
1213
"time"
@@ -112,6 +113,10 @@ func (o *Output) Start() error {
112113
discardedLabels: make(map[string]struct{}),
113114
aggregationPeriodInSeconds: uint32(o.config.AggregationPeriod.TimeDuration().Seconds()),
114115
maxSeriesInBatch: int(o.config.MaxTimeSeriesInBatch.Int64),
116+
// TODO: when the migration from v1 is over
117+
// change the default of cloudapi.MetricPushConcurrency to use GOMAXPROCS(0)
118+
// batchPushConcurrency: int(o.config.MetricPushConcurrency.Int64),
119+
batchPushConcurrency: runtime.GOMAXPROCS(0),
115120
}
116121

117122
o.runFlushWorkers()
@@ -178,12 +183,13 @@ func (o *Output) StopWithTestError(_ error) error {
178183
}
179184

180185
func (o *Output) runFlushWorkers() {
186+
t := time.NewTicker(o.config.MetricPushInterval.TimeDuration())
187+
188+
// TODO: drop it when we are sure of the new proposed architecture
181189
// workers := o.config.MetricPushConcurrency.Int64
182190
// Details: https://github.com/grafana/k6/issues/3192
183191
workers := 1
184192

185-
t := time.NewTicker(o.config.MetricPushInterval.TimeDuration())
186-
187193
for i := 0; i < workers; i++ {
188194
o.wg.Add(1)
189195
go func() {

0 commit comments

Comments
 (0)