Skip to content

Commit 16be249

Browse files
codebienolegbespalov
authored andcommitted
A better error handling and test the case
1 parent 739739a commit 16be249

File tree

4 files changed

+90
-36
lines changed

4 files changed

+90
-36
lines changed

output/cloud/expv2/flush.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,11 @@ func (f *metricsFlusher) flushBatches(batches []metricSetBuilder) error {
112112
default:
113113
}
114114
if err := f.push(chunk[i]); err != nil {
115-
errs <- err
115+
select {
116+
case errs <- err:
117+
case <-stop:
118+
return
119+
}
116120
}
117121
}
118122
}(batches[offset:end])

output/cloud/expv2/flush_test.go

+66-6
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package expv2
22

33
import (
4+
"errors"
45
"strconv"
56
"sync"
67
"sync/atomic"
@@ -39,6 +40,7 @@ func TestMetricSetBuilderAddTimeBucket(t *testing.T) {
3940
require.Len(t, msb.MetricSet.Metrics, 1)
4041
assert.Len(t, msb.MetricSet.Metrics[0].TimeSeries, 1)
4142
}
43+
4244
func TestMetricsFlusherFlushInBatchWithinBucket(t *testing.T) {
4345
t.Parallel()
4446

@@ -89,11 +91,11 @@ func TestMetricsFlusherFlushInBatchAcrossBuckets(t *testing.T) {
8991
t.Parallel()
9092

9193
testCases := []struct {
92-
series int
93-
expFlushCalls int
94+
series int
95+
expPushCalls int
9496
}{
95-
{series: 5, expFlushCalls: 2},
96-
{series: 2, expFlushCalls: 1},
97+
{series: 5, expPushCalls: 2},
98+
{series: 2, expPushCalls: 1},
9799
}
98100

99101
r := metrics.NewRegistry()
@@ -131,7 +133,7 @@ func TestMetricsFlusherFlushInBatchAcrossBuckets(t *testing.T) {
131133

132134
err := mf.flush()
133135
require.NoError(t, err)
134-
assert.Equal(t, tc.expFlushCalls, pm.timesCalled())
136+
assert.Equal(t, tc.expPushCalls, pm.timesCalled())
135137
}
136138
}
137139

@@ -214,7 +216,10 @@ func TestFlushWithReservedLabels(t *testing.T) {
214216
}
215217

216218
type pusherMock struct {
217-
hook func(*pbcloud.MetricSet)
219+
// hook is called when the push method is called.
220+
hook func(*pbcloud.MetricSet)
221+
// errFn if this defined, it is called instead
222+
errFn func() error
218223
pushCalled int64
219224
}
220225

@@ -228,5 +233,60 @@ func (pm *pusherMock) push(ms *pbcloud.MetricSet) error {
228233
}
229234

230235
atomic.AddInt64(&pm.pushCalled, 1)
236+
237+
if pm.errFn != nil {
238+
return pm.errFn()
239+
}
240+
231241
return nil
232242
}
243+
244+
func TestMetricsFlusherErrorCase(t *testing.T) {
245+
t.Parallel()
246+
247+
r := metrics.NewRegistry()
248+
m1 := r.MustNewMetric("metric1", metrics.Counter)
249+
250+
logger, _ := testutils.NewLoggerWithHook(t)
251+
252+
bq := &bucketQ{}
253+
pm := &pusherMock{
254+
errFn: func() error {
255+
return errors.New("some error")
256+
},
257+
}
258+
mf := metricsFlusher{
259+
bq: bq,
260+
client: pm,
261+
logger: logger,
262+
discardedLabels: make(map[string]struct{}),
263+
maxSeriesInBatch: 3,
264+
batchPushConcurrency: 5,
265+
}
266+
267+
series := 5
268+
expPushCalls := 2
269+
270+
bq.buckets = make([]timeBucket, 0, series)
271+
for i := 0; i < series; i++ {
272+
ts := metrics.TimeSeries{
273+
Metric: m1,
274+
Tags: r.RootTagSet().With("key1", "val"+strconv.Itoa(i)),
275+
}
276+
bq.Push([]timeBucket{
277+
{
278+
Time: int64(i) + 1,
279+
Sinks: map[metrics.TimeSeries]metricValue{
280+
ts: &counter{Sum: float64(1)},
281+
},
282+
},
283+
})
284+
}
285+
require.Len(t, bq.buckets, series)
286+
287+
err := mf.flush()
288+
require.Error(t, err)
289+
// since the push happens concurrently the number of the calls can be less
290+
// than expected
291+
assert.LessOrEqual(t, expPushCalls, pm.timesCalled())
292+
}

output/cloud/expv2/output.go

+17-25
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"errors"
88
"fmt"
99
"net/http"
10-
"runtime"
1110
"strconv"
1211
"sync"
1312
"time"
@@ -115,8 +114,7 @@ func (o *Output) Start() error {
115114
maxSeriesInBatch: int(o.config.MaxTimeSeriesInBatch.Int64),
116115
// TODO: when the migration from v1 is over
117116
// change the default of cloudapi.MetricPushConcurrency to use GOMAXPROCS(0)
118-
// batchPushConcurrency: int(o.config.MetricPushConcurrency.Int64),
119-
batchPushConcurrency: runtime.GOMAXPROCS(0),
117+
batchPushConcurrency: int(o.config.MetricPushConcurrency.Int64),
120118
}
121119

122120
o.runFlushWorkers()
@@ -185,31 +183,25 @@ func (o *Output) StopWithTestError(_ error) error {
185183
func (o *Output) runFlushWorkers() {
186184
t := time.NewTicker(o.config.MetricPushInterval.TimeDuration())
187185

188-
// TODO: drop it when we are sure of the new proposed architecture
189-
// workers := o.config.MetricPushConcurrency.Int64
190-
// Details: https://github.com/grafana/k6/issues/3192
191-
workers := 1
186+
o.wg.Add(1)
192187

193-
for i := 0; i < workers; i++ {
194-
o.wg.Add(1)
195-
go func() {
196-
defer func() {
197-
t.Stop()
198-
o.wg.Done()
199-
}()
188+
go func() {
189+
defer func() {
190+
t.Stop()
191+
o.wg.Done()
192+
}()
200193

201-
for {
202-
select {
203-
case <-t.C:
204-
o.flushMetrics()
205-
case <-o.stop:
206-
return
207-
case <-o.abort:
208-
return
209-
}
194+
for {
195+
select {
196+
case <-t.C:
197+
o.flushMetrics()
198+
case <-o.stop:
199+
return
200+
case <-o.abort:
201+
return
210202
}
211-
}()
212-
}
203+
}
204+
}()
213205
}
214206

215207
// AddMetricSamples receives the samples streaming.

output/cloud/expv2/output_test.go

+2-4
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ func TestOutputStopWithTestError(t *testing.T) {
305305
require.NoError(t, o.StopWithTestError(errors.New("an error")))
306306
}
307307

308-
func TestOutputFlushMetricsConcurrently(t *testing.T) {
308+
func TestOutputFlushTicks(t *testing.T) {
309309
t.Parallel()
310310

311311
done := make(chan struct{})
@@ -321,12 +321,10 @@ func TestOutputFlushMetricsConcurrently(t *testing.T) {
321321
close(done)
322322
return
323323
}
324-
<-done
325324
}
326325

327326
o := Output{logger: testutils.NewLogger(t)}
328-
o.config.MetricPushConcurrency = null.IntFrom(2)
329-
o.config.MetricPushInterval = types.NullDurationFrom(1) // loop
327+
o.config.MetricPushInterval = types.NullDurationFrom(100 * time.Millisecond) // loop
330328
o.flushing = flusherFunc(flusherMock)
331329
o.runFlushWorkers()
332330

0 commit comments

Comments
 (0)