diff --git a/internal/models/buffer.go b/internal/models/buffer.go index 7b27e6686b222..5d036c7280716 100644 --- a/internal/models/buffer.go +++ b/internal/models/buffer.go @@ -182,6 +182,10 @@ func (b *Buffer) Reject(batch []telegraf.Metric) { b.Lock() defer b.Unlock() + if len(batch) == 0 { + return + } + older := b.dist(b.first, b.batchFirst) free := b.cap - b.size restore := min(len(batch), free+older) @@ -191,7 +195,8 @@ func (b *Buffer) Reject(batch []telegraf.Metric) { rp := b.last re := b.nextby(rp, restore) b.last = re - for rb != rp { + + for rb != rp && rp != re { rp = b.prev(rp) re = b.prev(re) diff --git a/internal/models/buffer_test.go b/internal/models/buffer_test.go index 892af8bd4039f..bc19680d18b6b 100644 --- a/internal/models/buffer_test.go +++ b/internal/models/buffer_test.go @@ -714,3 +714,15 @@ func TestBuffer_AddOverwriteAndRejectOffset(t *testing.T) { require.Equal(t, 13, reject) require.Equal(t, 5, accept) } + +func TestBuffer_RejectEmptyBatch(t *testing.T) { + b := setup(NewBuffer("test", 5)) + batch := b.Batch(2) + b.Add(MetricTime(1)) + b.Reject(batch) + b.Add(MetricTime(2)) + batch = b.Batch(2) + for _, m := range batch { + require.NotNil(t, m) + } +}