Skip to content

Commit

Permalink
Alt: replace global pool with per-session pool.
Browse files Browse the repository at this point in the history
  • Loading branch information
buybackoff committed Jan 10, 2020
1 parent 248d2dd commit 3203df4
Show file tree
Hide file tree
Showing 10 changed files with 233 additions and 184 deletions.
1 change: 1 addition & 0 deletions src/Abc.Zerio.Tests/Abc.Zerio.Tests.csproj
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netcoreapp3.0</TargetFramework>
<TieredCompilation>False</TieredCompilation>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\Abc.Zerio\Abc.Zerio.csproj" />
Expand Down
9 changes: 4 additions & 5 deletions src/Abc.Zerio.Tests/BufferPoolTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public struct TestStruct
[Test, Explicit("Long running")]
public void ConcurrentBagTest()
{
int iterations = 100_000_000;
int iterations = 10_000_000;
int count = 0;
var capacity = 64 * 1024;
var cb1 = new ConcurrentBag<TestStruct>();
Expand Down Expand Up @@ -83,7 +83,7 @@ public void ConcurrentBagTest()
[Test, Explicit("Long running")]
public void ConcurrentQueueTest()
{
int iterations = 100_000_000;
int iterations = 10_000_000;
int count = 0;
var capacity = 64 * 1024;
var cb1 = new ConcurrentQueue<TestStruct>();
Expand Down Expand Up @@ -142,7 +142,7 @@ public void ConcurrentQueueTest()
[Test, Explicit("Long running")]
public void ConcurrentStackTest()
{
int iterations = 100_000_000;
int iterations = 10_000_000;
int count = 0;
var capacity = 64 * 1024;
var cb1 = new ConcurrentStack<TestStruct>();
Expand Down Expand Up @@ -198,11 +198,10 @@ public void ConcurrentStackTest()
Console.WriteLine($"MOPS: {2 * mops:N2}");
}


[Test, Explicit("Long running")]
public void BoundedLocalPoolTest()
{
int iterations = 100_000_000;
int iterations = 10_000_000;
int count = 0;
var capacity = 64 * 1024;
var cb1 = new BoundedLocalPool<TestStruct>(capacity);
Expand Down
48 changes: 8 additions & 40 deletions src/Abc.Zerio/Alt/Buffers/BoundedLocalPool.cs
Original file line number Diff line number Diff line change
@@ -1,35 +1,26 @@
using System;
using System.Collections.Concurrent;
using System.Runtime.CompilerServices;
using System.Threading;

namespace Abc.Zerio.Alt.Buffers
{
public class BoundedLocalPool<T>
{
private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
private readonly int _capacity;
private readonly T[] _items;
private int _mask;

// long is enough for 68 years at int.Max operations per second.
// [... head<- items -> tail ...]
private long _tail;
private long _head;

private SpinLock _spinLock = new SpinLock(false);


public BoundedLocalPool(int capacity)
{
if (!Utils.IsPowerOfTwo(capacity))
throw new ArgumentException("Capacity must be a power of 2");
_capacity = capacity;
_items = new T[_capacity];
_mask = _capacity - 1;
}

public int Count
{
[MethodImpl(MethodImplOptions.AggressiveInlining)]
get => (int)(Volatile.Read(ref _tail) - Volatile.Read(ref _head));
get => _queue.Count; // (Volatile.Read(ref _tail) - Volatile.Read(ref _head));
}

/// <summary>
Expand All @@ -38,11 +29,9 @@ public int Count
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Return(T item)
{
if (Count == _capacity)
if (Count >= _capacity)
ThrowExceededCapacity();
var idx = (int)(_tail & _mask);
_items[idx] = item;
Volatile.Write(ref _tail, Volatile.Read(ref _tail) + 1);
_queue.Enqueue(item);
}

[MethodImpl(MethodImplOptions.NoInlining)]
Expand All @@ -56,31 +45,10 @@ private static void ThrowExceededCapacity()
/// </summary>
/// <param name="item"></param>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool TryRent(out T item)
{
item = default;
if (Count == 0)
return false;

var taken = false;
try
{
_spinLock.Enter(ref taken);

if (Count == 0)
return false;

var idx = (int)(_head & _mask);
item = _items[idx];
Volatile.Write(ref _head, Volatile.Read(ref _head) + 1);
}
finally
{
if (taken)
_spinLock.Exit(useMemoryBarrier: false);
}

return false;
return _queue.TryDequeue(out item);
}
}
}
20 changes: 20 additions & 0 deletions src/Abc.Zerio/Alt/Buffers/BufferPoolOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
namespace Abc.Zerio.Alt.Buffers
{
public class BufferPoolOptions
{
public int SendSegmentCount { get; }
public int SendSegmentSize { get; }
public int ReceiveSegmentCount { get; }
public int ReceiveSegmentSize { get; }
public bool UseSharedSendPool { get; }

public BufferPoolOptions(int sendSegmentCount = 2048, int sendSegmentSize = 2048, int receiveSegmentCount = 128, int receiveSegmentSize = 16384, bool useSharedSendPool = true)
{
SendSegmentCount = sendSegmentCount;
SendSegmentSize = sendSegmentSize;
ReceiveSegmentCount = receiveSegmentCount;
ReceiveSegmentSize = receiveSegmentSize;
UseSharedSendPool = useSharedSendPool;
}
}
}
13 changes: 2 additions & 11 deletions src/Abc.Zerio/Alt/Buffers/ISegmentPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,7 @@

namespace Abc.Zerio.Alt.Buffers
{
internal interface ISegmentPool : IDisposable
{
bool TryRent(out RioSegment segment);
void Return(RioSegment segment);
void AddBuffer(RegisteredBuffer buffer);
RegisteredBuffer GetBuffer(int bufferId);
int SegmentLength { get; }
}

internal class DefaultSegmentPool : CriticalFinalizerObject, ISegmentPool
internal class SharedSegmentPool : CriticalFinalizerObject
{
private readonly List<RegisteredBuffer> _buffers = new List<RegisteredBuffer>(64);
private volatile int _capacity;
Expand All @@ -25,7 +16,7 @@ internal class DefaultSegmentPool : CriticalFinalizerObject, ISegmentPool
private readonly CancellationToken _ct;
private ConcurrentQueue<RioSegment> _queue;

public DefaultSegmentPool(byte poolId, int segmentLength, CancellationToken ct = default)
public SharedSegmentPool(byte poolId, int segmentLength, CancellationToken ct = default)
{
_poolId = poolId;
SegmentLength = segmentLength;
Expand Down
51 changes: 15 additions & 36 deletions src/Abc.Zerio/Alt/Buffers/RioBufferPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,17 @@

namespace Abc.Zerio.Alt.Buffers
{
public class BufferPoolOptions
{
public int SendSegmentCount { get; }
public int SendSegmentSize { get; }
public int ReceiveSegmentCount { get; }
public int ReceiveSegmentSize { get; }
public bool UseSharedSendPool { get; }

public BufferPoolOptions(int sendSegmentCount = 2048, int sendSegmentSize = 2048, int receiveSegmentCount = 128, int receiveSegmentSize = 16384, bool useSharedSendPool = true)
{
SendSegmentCount = sendSegmentCount;
SendSegmentSize = sendSegmentSize;
ReceiveSegmentCount = receiveSegmentCount;
ReceiveSegmentSize = receiveSegmentSize;
UseSharedSendPool = useSharedSendPool;
}
}

internal class RioBufferPool : IDisposable
{
private readonly ISegmentPool _sharedSendPool;
private readonly SharedSegmentPool _sharedSegmentPool;
private const byte _sharedSendPoolId = 1;

public RioBufferPool(BufferPoolOptions options = null, CancellationToken ct = default)
{
options = options ?? new BufferPoolOptions();
options ??= new BufferPoolOptions();
Options = options;
CancellationToken = ct;
_sharedSendPool = new DefaultSegmentPool(_sharedSendPoolId, options.SendSegmentSize, ct);
_sharedSegmentPool = new SharedSegmentPool(_sharedSendPoolId, options.SendSegmentSize, ct);

if (Options.UseSharedSendPool)
{
Expand All @@ -48,31 +30,28 @@ public RioBufferPool(BufferPoolOptions options = null, CancellationToken ct = de

private RegisteredBuffer AllocateBuffer()
{
var buffer = new RegisteredBuffer(Options.SendSegmentCount, Options.SendSegmentSize);
_sharedSendPool.AddBuffer(buffer);
var buffer = new RegisteredBuffer(Options.SendSegmentCount * 2, Options.SendSegmentSize);
buffer.PoolId = _sharedSendPoolId;
_sharedSegmentPool.AddBuffer(buffer);
Debug.Assert(buffer.IsPooled);
return buffer;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool TryRent(out RioSegment segment)
{
return _sharedSendPool.TryRent(out segment);
if (_sharedSegmentPool.TryRent(out segment))
{
Debug.Assert(segment.Id.PoolId == _sharedSendPoolId, "segment.Id.PoolId == _sharedSendPoolId");
return true;
}
return false;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Return(RioSegment segment)
{
_sharedSendPool.Return(segment);
}

public void Return(long correlationId)
{
var id = new BufferSegmentId(correlationId);
if (id.PoolId != _sharedSendPoolId)
throw new InvalidOperationException();
var segment = _sharedSendPool.GetBuffer(id.BufferId)[id.SegmentId];
_sharedSendPool.Return(segment);
_sharedSegmentPool.Return(segment);
}

public RioSegment this[long correlationId]
Expand All @@ -88,12 +67,12 @@ public RioSegment this[long correlationId]
public RioSegment this[BufferSegmentId id]
{
[MethodImpl(MethodImplOptions.AggressiveInlining)]
get => _sharedSendPool.GetBuffer(id.BufferId)[id.SegmentId];
get => _sharedSegmentPool.GetBuffer(id.BufferId)[id.SegmentId];
}

public void Dispose()
{
_sharedSendPool.Dispose();
_sharedSegmentPool.Dispose();
}
}
}
6 changes: 6 additions & 0 deletions src/Abc.Zerio/Alt/Poller.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
using System;
using System.Runtime.CompilerServices;
using System.Threading;

namespace Abc.Zerio.Alt
{
internal class Poller : IDisposable
{
public int ThreadId { get; private set; }
// TODO Support WSA events

private readonly CancellationToken _ct;
Expand Down Expand Up @@ -67,8 +69,12 @@ private int Wait()
return 0;
}

#if NETCOREAPP3_0
[MethodImpl(MethodImplOptions.AggressiveOptimization)]
#endif
private void Poll()
{
ThreadId = Thread.CurrentThread.ManagedThreadId;
while (!_ct.IsCancellationRequested)
{
try
Expand Down
Loading

0 comments on commit 3203df4

Please sign in to comment.