Skip to content

Commit

Permalink
Fix panic when rejecting empty batch (influxdata#5524)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored and otherpirate committed Mar 15, 2019
1 parent 3a37f2a commit 034002e
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 1 deletion.
7 changes: 6 additions & 1 deletion internal/models/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ func (b *Buffer) Reject(batch []telegraf.Metric) {
b.Lock()
defer b.Unlock()

if len(batch) == 0 {
return
}

older := b.dist(b.first, b.batchFirst)
free := b.cap - b.size
restore := min(len(batch), free+older)
Expand All @@ -191,7 +195,8 @@ func (b *Buffer) Reject(batch []telegraf.Metric) {
rp := b.last
re := b.nextby(rp, restore)
b.last = re
for rb != rp {

for rb != rp && rp != re {
rp = b.prev(rp)
re = b.prev(re)

Expand Down
12 changes: 12 additions & 0 deletions internal/models/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,3 +714,15 @@ func TestBuffer_AddOverwriteAndRejectOffset(t *testing.T) {
require.Equal(t, 13, reject)
require.Equal(t, 5, accept)
}

func TestBuffer_RejectEmptyBatch(t *testing.T) {
b := setup(NewBuffer("test", 5))
batch := b.Batch(2)
b.Add(MetricTime(1))
b.Reject(batch)
b.Add(MetricTime(2))
batch = b.Batch(2)
for _, m := range batch {
require.NotNil(t, m)
}
}

0 comments on commit 034002e

Please sign in to comment.