Skip to content

Commit 9bbfdb0

Browse files
committed
Fix concurrent problem with tracking head
1 parent 7f82020 commit 9bbfdb0

File tree

2 files changed

+21
-6
lines changed

2 files changed

+21
-6
lines changed

src/kafka-net/Common/ConcurrentCircularBuffer.cs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ namespace KafkaNet.Common
88
public class ConcurrentCircularBuffer<T> : IEnumerable<T>
99
{
1010
private readonly int _maxSize;
11-
private long _count = 0;
11+
private long _count;
1212
private int _head = -1;
1313
readonly T[] _values;
1414

@@ -30,13 +30,18 @@ public long Count
3030

3131
public ConcurrentCircularBuffer<T> Enqueue(T obj)
3232
{
33-
if (Interlocked.Increment(ref _head) > (_maxSize - 1))
34-
Interlocked.Exchange(ref _head, 0);
33+
var head = Interlocked.Increment(ref _head);
3534

36-
_values[_head] = obj;
35+
if (head > _maxSize - 1)
36+
{
37+
Interlocked.Exchange(ref _head, head - _maxSize);
38+
head = head - _maxSize;
39+
}
40+
41+
_values[head] = obj;
3742

38-
Interlocked.Exchange(ref _count,
39-
Math.Min(Interlocked.Increment(ref _count), _maxSize));
43+
if (_count != _maxSize) //once we hit max size we dont need to track count.
44+
Interlocked.Exchange(ref _count, Math.Min(Interlocked.Increment(ref _count), _maxSize));
4045

4146
return this;
4247
}

src/kafka-tests/Unit/CircularBufferTests.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System.Linq;
2+
using System.Threading.Tasks;
23
using KafkaNet.Common;
34
using NUnit.Framework;
45

@@ -66,5 +67,14 @@ public void EnqueueShouldAddToFirstSlot()
6667
buffer.Enqueue(1);
6768
Assert.That(buffer.First(), Is.EqualTo(1));
6869
}
70+
71+
[Test]
72+
public void ConcurrentEnqueueShouldNotLeakIndexPosition()
73+
{
74+
var buffer = new ConcurrentCircularBuffer<int>(10);
75+
Parallel.For(0, 100000, i => buffer.Enqueue(i));
76+
Assert.That(buffer.Count, Is.EqualTo(10));
77+
78+
}
6979
}
7080
}

0 commit comments

Comments
 (0)