diff --git a/src/Abc.Zerio.Tests/Abc.Zerio.Tests.csproj b/src/Abc.Zerio.Tests/Abc.Zerio.Tests.csproj
index da61a39..1a9facc 100644
--- a/src/Abc.Zerio.Tests/Abc.Zerio.Tests.csproj
+++ b/src/Abc.Zerio.Tests/Abc.Zerio.Tests.csproj
@@ -1,6 +1,7 @@
     <TargetFramework>netcoreapp3.0</TargetFramework>
+    False
diff --git a/src/Abc.Zerio.Tests/BufferPoolTest.cs b/src/Abc.Zerio.Tests/BufferPoolTest.cs
index 93e089f..af3b698 100644
--- a/src/Abc.Zerio.Tests/BufferPoolTest.cs
+++ b/src/Abc.Zerio.Tests/BufferPoolTest.cs
@@ -23,7 +23,7 @@ public struct TestStruct
         [Test, Explicit("Long running")]
         public void ConcurrentBagTest()
         {
-            int iterations = 100_000_000;
+            int iterations = 10_000_000;
             int count = 0;
             var capacity = 64 * 1024;
             var cb1 = new ConcurrentBag<TestStruct>();
@@ -83,7 +83,7 @@ public void ConcurrentBagTest()
         [Test, Explicit("Long running")]
         public void ConcurrentQueueTest()
         {
-            int iterations = 100_000_000;
+            int iterations = 10_000_000;
             int count = 0;
             var capacity = 64 * 1024;
             var cb1 = new ConcurrentQueue<TestStruct>();
@@ -142,7 +142,7 @@ public void ConcurrentQueueTest()
         [Test, Explicit("Long running")]
        public void ConcurrentStackTest()
        {
-            int iterations = 100_000_000;
+            int iterations = 10_000_000;
             int count = 0;
             var capacity = 64 * 1024;
             var cb1 = new ConcurrentStack<TestStruct>();
@@ -198,11 +198,10 @@ public void ConcurrentStackTest()
             Console.WriteLine($"MOPS: {2 * mops:N2}");
         }
 
-
         [Test, Explicit("Long running")]
         public void BoundedLocalPoolTest()
         {
-            int iterations = 100_000_000;
+            int iterations = 10_000_000;
             int count = 0;
             var capacity = 64 * 1024;
             var cb1 = new BoundedLocalPool<TestStruct>(capacity);
diff --git a/src/Abc.Zerio/Alt/Buffers/BoundedLocalPool.cs b/src/Abc.Zerio/Alt/Buffers/BoundedLocalPool.cs
index 12024a9..1373c85 100644
--- a/src/Abc.Zerio/Alt/Buffers/BoundedLocalPool.cs
+++ b/src/Abc.Zerio/Alt/Buffers/BoundedLocalPool.cs
@@ -1,4 +1,5 @@
 using System;
+using System.Collections.Concurrent;
 using System.Runtime.CompilerServices;
 using System.Threading;
 
@@ -6,30 +7,20 @@ namespace Abc.Zerio.Alt.Buffers
 {
     public class BoundedLocalPool<T>
     {
+        private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
         private readonly int _capacity;
-        private readonly T[] _items;
-        private int _mask;
-
-        // long is enough for 68 years at int.Max operations per second.
-        // [... head<- items -> tail ...]
-        private long _tail;
-        private long _head;
-
-        private SpinLock _spinLock = new SpinLock(false);
-
+
         public BoundedLocalPool(int capacity)
         {
             if (!Utils.IsPowerOfTwo(capacity))
                 throw new ArgumentException("Capacity must be a power of 2");
             _capacity = capacity;
-            _items = new T[_capacity];
-            _mask = _capacity - 1;
         }
 
         public int Count
         {
             [MethodImpl(MethodImplOptions.AggressiveInlining)]
-            get => (int)(Volatile.Read(ref _tail) - Volatile.Read(ref _head));
+            get => _queue.Count; // (Volatile.Read(ref _tail) - Volatile.Read(ref _head));
         }
 
         /// <summary>
@@ -38,11 +29,9 @@ public int Count
         [MethodImpl(MethodImplOptions.AggressiveInlining)]
         public void Return(T item)
         {
-            if (Count == _capacity)
+            if (Count >= _capacity)
                 ThrowExceededCapacity();
-            var idx = (int)(_tail & _mask);
-            _items[idx] = item;
-            Volatile.Write(ref _tail, Volatile.Read(ref _tail) + 1);
+            _queue.Enqueue(item);
         }
 
         [MethodImpl(MethodImplOptions.NoInlining)]
@@ -56,31 +45,10 @@ private static void ThrowExceededCapacity()
         /// <summary>
         /// </summary>
         /// <returns></returns>
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
         public bool TryRent(out T item)
         {
-            item = default;
-            if (Count == 0)
-                return false;
-
-            var taken = false;
-            try
-            {
-                _spinLock.Enter(ref taken);
-
-                if (Count == 0)
-                    return false;
-
-                var idx = (int)(_head & _mask);
-                item = _items[idx];
-                Volatile.Write(ref _head, Volatile.Read(ref _head) + 1);
-            }
-            finally
-            {
-                if (taken)
-                    _spinLock.Exit(useMemoryBarrier: false);
-            }
-
-            return false;
+            return _queue.TryDequeue(out item);
         }
     }
 }
diff --git a/src/Abc.Zerio/Alt/Buffers/BufferPoolOptions.cs b/src/Abc.Zerio/Alt/Buffers/BufferPoolOptions.cs
new file mode 100644
index 0000000..a0ccb3c
--- /dev/null
+++ b/src/Abc.Zerio/Alt/Buffers/BufferPoolOptions.cs
@@ -0,0 +1,20 @@
+namespace Abc.Zerio.Alt.Buffers
+{
+    public class BufferPoolOptions
+    {
+        public int SendSegmentCount { get; }
+        public int SendSegmentSize { get; }
+        public int ReceiveSegmentCount { get; }
+        public int ReceiveSegmentSize { get; }
+        public bool UseSharedSendPool { get; }
+
+        public BufferPoolOptions(int sendSegmentCount = 2048, int sendSegmentSize = 2048, int receiveSegmentCount = 128, int receiveSegmentSize = 16384, bool useSharedSendPool = true)
+        {
+            SendSegmentCount = sendSegmentCount;
+            SendSegmentSize = sendSegmentSize;
+            ReceiveSegmentCount = receiveSegmentCount;
+            ReceiveSegmentSize = receiveSegmentSize;
+            UseSharedSendPool = useSharedSendPool;
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/Abc.Zerio/Alt/Buffers/ISegmentPool.cs b/src/Abc.Zerio/Alt/Buffers/ISegmentPool.cs
index ded6f22..7116b24 100644
--- a/src/Abc.Zerio/Alt/Buffers/ISegmentPool.cs
+++ b/src/Abc.Zerio/Alt/Buffers/ISegmentPool.cs
@@ -7,16 +7,7 @@
 namespace Abc.Zerio.Alt.Buffers
 {
-    internal interface ISegmentPool : IDisposable
-    {
-        bool TryRent(out RioSegment segment);
-        void Return(RioSegment segment);
-        void AddBuffer(RegisteredBuffer buffer);
-        RegisteredBuffer GetBuffer(int bufferId);
-        int SegmentLength { get; }
-    }
-
-    internal class DefaultSegmentPool : CriticalFinalizerObject, ISegmentPool
+    internal class SharedSegmentPool : CriticalFinalizerObject
     {
         private readonly List<RegisteredBuffer> _buffers = new List<RegisteredBuffer>(64);
         private volatile int _capacity;
         private readonly byte _poolId;
@@ -25,7 +16,7 @@ internal class DefaultSegmentPool : CriticalFinalizerObject, ISegmentPool
         private readonly CancellationToken _ct;
         private ConcurrentQueue<RioSegment> _queue;
 
-        public DefaultSegmentPool(byte poolId, int segmentLength, CancellationToken ct = default)
+        public SharedSegmentPool(byte poolId, int segmentLength, CancellationToken ct = default)
         {
             _poolId = poolId;
             SegmentLength = segmentLength;
diff --git a/src/Abc.Zerio/Alt/Buffers/RioBufferPool.cs b/src/Abc.Zerio/Alt/Buffers/RioBufferPool.cs
index a54268a..45af0be 100644
--- a/src/Abc.Zerio/Alt/Buffers/RioBufferPool.cs
+++ b/src/Abc.Zerio/Alt/Buffers/RioBufferPool.cs
@@ -5,35 +5,17 @@
 namespace Abc.Zerio.Alt.Buffers
 {
-    public class BufferPoolOptions
-    {
-        public int SendSegmentCount { get; }
-        public int SendSegmentSize { get; }
-        public int ReceiveSegmentCount { get; }
-        public int ReceiveSegmentSize { get; }
-        public bool UseSharedSendPool { get; }
-
-        public BufferPoolOptions(int sendSegmentCount = 2048, int sendSegmentSize = 2048, int receiveSegmentCount = 128, int receiveSegmentSize = 16384, bool useSharedSendPool = true)
-        {
-            SendSegmentCount = sendSegmentCount;
-            SendSegmentSize = sendSegmentSize;
-            ReceiveSegmentCount = receiveSegmentCount;
-            ReceiveSegmentSize = receiveSegmentSize;
-            UseSharedSendPool = useSharedSendPool;
-        }
-    }
-
     internal class RioBufferPool : IDisposable
     {
-        private readonly ISegmentPool _sharedSendPool;
+        private readonly SharedSegmentPool _sharedSegmentPool;
         private const byte _sharedSendPoolId = 1;
 
         public RioBufferPool(BufferPoolOptions options = null, CancellationToken ct = default)
         {
-            options = options ?? new BufferPoolOptions();
+            options ??= new BufferPoolOptions();
             Options = options;
             CancellationToken = ct;
 
-            _sharedSendPool = new DefaultSegmentPool(_sharedSendPoolId, options.SendSegmentSize, ct);
+            _sharedSegmentPool = new SharedSegmentPool(_sharedSendPoolId, options.SendSegmentSize, ct);
 
             if (Options.UseSharedSendPool)
             {
@@ -48,8 +30,9 @@ public RioBufferPool(BufferPoolOptions options = null, CancellationToken ct = de
 
         private RegisteredBuffer AllocateBuffer()
         {
-            var buffer = new RegisteredBuffer(Options.SendSegmentCount, Options.SendSegmentSize);
-            _sharedSendPool.AddBuffer(buffer);
+            var buffer = new RegisteredBuffer(Options.SendSegmentCount * 2, Options.SendSegmentSize);
+            buffer.PoolId = _sharedSendPoolId;
+            _sharedSegmentPool.AddBuffer(buffer);
             Debug.Assert(buffer.IsPooled);
             return buffer;
         }
@@ -57,22 +40,18 @@ private RegisteredBuffer AllocateBuffer()
         [MethodImpl(MethodImplOptions.AggressiveInlining)]
         public bool TryRent(out RioSegment segment)
         {
-            return _sharedSendPool.TryRent(out segment);
+            if (_sharedSegmentPool.TryRent(out segment))
+            {
+                Debug.Assert(segment.Id.PoolId == _sharedSendPoolId, "segment.Id.PoolId == _sharedSendPoolId");
+                return true;
+            }
+            return false;
         }
 
         [MethodImpl(MethodImplOptions.AggressiveInlining)]
         public void Return(RioSegment segment)
         {
-            _sharedSendPool.Return(segment);
-        }
-
-        public void Return(long correlationId)
-        {
-            var id = new BufferSegmentId(correlationId);
-            if (id.PoolId != _sharedSendPoolId)
-                throw new InvalidOperationException();
-            var segment = _sharedSendPool.GetBuffer(id.BufferId)[id.SegmentId];
-            _sharedSendPool.Return(segment);
+            _sharedSegmentPool.Return(segment);
         }
 
         public RioSegment this[long correlationId]
@@ -88,12 +67,12 @@ public RioSegment this[long correlationId]
         public RioSegment this[BufferSegmentId id]
         {
             [MethodImpl(MethodImplOptions.AggressiveInlining)]
-            get => _sharedSendPool.GetBuffer(id.BufferId)[id.SegmentId];
+            get => _sharedSegmentPool.GetBuffer(id.BufferId)[id.SegmentId];
         }
 
         public void Dispose()
         {
-            _sharedSendPool.Dispose();
+            _sharedSegmentPool.Dispose();
         }
     }
 }
diff --git a/src/Abc.Zerio/Alt/Poller.cs b/src/Abc.Zerio/Alt/Poller.cs
index 1fa267f..fc246ed 100644
--- a/src/Abc.Zerio/Alt/Poller.cs
+++ b/src/Abc.Zerio/Alt/Poller.cs
@@ -1,10 +1,12 @@
 using System;
+using System.Runtime.CompilerServices;
 using System.Threading;
 
 namespace Abc.Zerio.Alt
 {
     internal class Poller : IDisposable
     {
+        public int ThreadId { get; private set; }
         // TODO Support WSA events
 
         private readonly CancellationToken _ct;
@@ -67,8 +69,12 @@ private int Wait()
             return 0;
         }
 
+        #if NETCOREAPP3_0
+        [MethodImpl(MethodImplOptions.AggressiveOptimization)]
+        #endif
         private void Poll()
         {
+            ThreadId = Thread.CurrentThread.ManagedThreadId;
             while (!_ct.IsCancellationRequested)
             {
                 try
diff --git a/src/Abc.Zerio/Alt/Session.cs b/src/Abc.Zerio/Alt/Session.cs
index 7f4ea65..b2328f7 100644
--- a/src/Abc.Zerio/Alt/Session.cs
+++ b/src/Abc.Zerio/Alt/Session.cs
@@ -1,7 +1,6 @@
 using Abc.Zerio.Alt.Buffers;
 using Abc.Zerio.Interop;
 using System;
-using System.Diagnostics;
 using System.Runtime.CompilerServices;
 using System.Runtime.ConstrainedExecution;
 using System.Runtime.InteropServices;
@@ -12,25 +11,27 @@ namespace Abc.Zerio.Alt
 {
     internal delegate void SessionMessageReceivedDelegate(string peerId, ReadOnlySpan<byte> message);
 
-    internal unsafe class Session : CriticalFinalizerObject, IDisposable
+    internal unsafe class Session : SessionSCQLockRhsPad, IDisposable
     {
-        private readonly BoundedLocalPool<RioSegment> _localPool;
+        private readonly BoundedLocalPool<RioSegment> _localSendPool;
         private readonly RegisteredBuffer _localSendBuffer;
         private readonly RegisteredBuffer _localReceiveBuffer;
-        private readonly RioBufferPool _poolX;
+        // private readonly RioBufferPool _globalPool;
+
+        private readonly Poller _poller;
+        private CancellationToken _ct;
+
         private readonly IntPtr _scq;
         private readonly IntPtr _rcq;
 
-        private const int _sendPollCount = 128;
-        private const int _receivePollCount = 128;
+        private const int _sendPollCount = 64;
+        private const int _receivePollCount = 64;
 
         private readonly IntPtr _rq; // TODO there is no Close method on RQ, review if need to do something else with it.
 
         private readonly RioSendReceive _sendReceive;
 
-        private readonly SemaphoreSlim _sendSemaphore;
-        private int _isPollingSend;
+        private readonly SemaphoreSlimRhsPad _sendSemaphore;
 
         private readonly MessageFramer _messageFramer;
 
@@ -46,40 +47,36 @@ internal unsafe class Session : CriticalFinalizerObject, IDisposable
 
         public Session(IntPtr socket, RioBufferPool pool, Poller poller, SessionMessageReceivedDelegate messageReceived, Action closed)
         {
-            _pool = pool;
-            _poller = poller;
+            var options = pool.Options;
+            _ct = pool.CancellationToken;
+            // _globalPool = pool;
+
+            _localSendPool = new BoundedLocalPool<RioSegment>(options.SendSegmentCount);
+            _localSendBuffer = new RegisteredBuffer(options.SendSegmentCount, options.SendSegmentSize);
+            _localReceiveBuffer = new RegisteredBuffer(options.ReceiveSegmentCount, options.ReceiveSegmentSize);
 
-            _localPool = new BoundedLocalPool<RioSegment>(pool.Options.SendSegmentCount);
-            _localSendBuffer = new RegisteredBuffer(pool.Options.SendSegmentCount, pool.Options.SendSegmentSize);
-            for (int i = 0; i < pool.Options.SendSegmentCount; i++)
+            for (int i = 0; i < options.SendSegmentCount; i++)
             {
-                _localPool.Return(_localSendBuffer[i]);
+                _localSendPool.Return(_localSendBuffer[i]);
             }
 
-            _localReceiveBuffer = new RegisteredBuffer(pool.Options.ReceiveSegmentCount, pool.Options.ReceiveSegmentSize);
-
             _messageReceived = messageReceived;
             _closed = closed;
 
-            // This is a straightforward implementation that adds send requests
-            // directly to RQ and polls a single CQ using the Poller.
-
-            _sendSemaphore = new SemaphoreSlim(pool.Options.SendSegmentCount * 2, pool.Options.SendSegmentCount * 2);
+            _sendSemaphore = new SemaphoreSlimRhsPad(_localSendPool.Count, _localSendPool.Count);
 
-            const int safeCacheLine = 128;
-            _resultsPtr = (byte*)Marshal.AllocHGlobal(Unsafe.SizeOf<RIO_RESULT>() * (_sendPollCount + _receivePollCount) + safeCacheLine * 3);
-            Debug.Assert(Utils.IsAligned((long)_resultsPtr, 8));
+            _resultsPtr = (byte*)Marshal.AllocHGlobal(Unsafe.SizeOf<RIO_RESULT>() * (_sendPollCount + _receivePollCount) + Padding.SafeCacheLine * 3);
             // 128 ... send_results ... 128 ... receive_results ... 128
-            _sendResults = (RIO_RESULT*)(_resultsPtr + safeCacheLine);
-            _receiveResults = (RIO_RESULT*)((byte*)_sendResults + +(uint)Unsafe.SizeOf<RIO_RESULT>() * _sendPollCount + safeCacheLine);
+            _sendResults = (RIO_RESULT*)(_resultsPtr + Padding.SafeCacheLine);
+            _receiveResults = (RIO_RESULT*)((byte*)_sendResults + +(uint)Unsafe.SizeOf<RIO_RESULT>() * _sendPollCount + Padding.SafeCacheLine);
 
-            _scq = WinSock.Extensions.CreateCompletionQueue((uint)pool.Options.SendSegmentCount * 2);
-            _rcq = WinSock.Extensions.CreateCompletionQueue((uint)pool.Options.ReceiveSegmentCount);
+            _scq = WinSock.Extensions.CreateCompletionQueue((uint)options.SendSegmentCount);
+            _rcq = WinSock.Extensions.CreateCompletionQueue((uint)options.ReceiveSegmentCount);
 
             _rq = WinSock.Extensions.CreateRequestQueue(socket,
-                                                        (uint)pool.Options.ReceiveSegmentCount,
+                                                        (uint)options.ReceiveSegmentCount,
                                                         1,
-                                                        (uint)pool.Options.SendSegmentCount * 2,
+                                                        (uint)options.SendSegmentCount,
                                                         1,
                                                         _rcq,
                                                         _scq,
@@ -87,7 +84,7 @@ public Session(IntPtr socket, RioBufferPool pool, Poller poller, SessionMessageR
 
             _sendReceive = new RioSendReceive(_rq);
 
-            for (int i = 0; i < pool.Options.ReceiveSegmentCount; i++)
+            for (int i = 0; i < options.ReceiveSegmentCount; i++)
             {
                 var segment = _localReceiveBuffer[i];
                 _sendReceive.Receive(segment);
@@ -96,6 +93,7 @@ public Session(IntPtr socket, RioBufferPool pool, Poller poller, SessionMessageR
             _messageFramer = new MessageFramer(64 * 1024);
             _messageFramer.MessageFramed += OnMessageFramed;
 
+            _poller = poller; // this line at the very end after session is ready
             _poller.AddSession(this);
         }
 
@@ -114,8 +112,6 @@ private void OnMessageFramed(ReadOnlySpan<byte> message)
             _messageReceived?.Invoke(PeerId, message);
         }
 
-
-
         public void Send(ReadOnlySpan<byte> message)
         {
             var claim = Claim();
@@ -126,22 +122,35 @@ public void Send(ReadOnlySpan<byte> message)
         [MethodImpl(MethodImplOptions.AggressiveInlining)]
         public Claim Claim()
         {
-            RioSegment segment;
-
-            bool entered;
+            bool isPollerThread = Thread.CurrentThread.ManagedThreadId == _poller.ThreadId;
             var count = 0;
-            // total wait is (1 + spinLimit) * spinLimit / 2
-            // 25 -> 325
-            // 50 -> 1275
-            // 100 -> 5050
-            const int spinLimit = 50;
+            const int spinLimit = 25;
             while (true)
             {
-                entered = _sendSemaphore.Wait(0);
+                bool entered = false;
+                if (count >= spinLimit && !isPollerThread)
+                {
+                    _sendSemaphore.Wait(_ct);
+                    entered = true;
+                }
+                else if (count == 0 || !isPollerThread)
+                {
+                    entered = _sendSemaphore.Wait(0);
+                }
+
+                RioSegment segment;
                 if (entered)
-                    break;
+                {
+                    if (!_localSendPool.TryRent(out segment))
+                    {
+                        ThrowCannotGetSegmentAfterSemaphoreEnter();
+                    }
 
-                if (1 == Interlocked.Increment(ref _isPollingSend)) // 0 -> 1
+                    return new Claim(segment, this);
+                }
+
+                if (isPollerThread
+                    || 1 == Interlocked.Increment(ref _isPollingSend)) // 0 -> 1
                 {
                     RIO_RESULT result = default;
                     var resultCount = WinSock.Extensions.DequeueCompletion(_scq, &result, 1);
@@ -153,25 +162,20 @@ public Claim Claim()
                     if (resultCount == 1)
                     {
                         var id = new BufferSegmentId(result.RequestCorrelation);
-                        segment = _pool[id];
+                        segment = _localSendBuffer[id.SegmentId]; //id.PoolId == 0 ? _localSendBuffer[id.SegmentId] : _globalPool[id];
                         return new Claim(segment, this);
                     }
                 }
 
                 count++;
-                if (count > spinLimit)
-                    break;
-                Thread.SpinWait(count);
-            }
-
-            if (!entered)
-            {
-                // this semaphore is release when an item is returned to the local pool
-                _sendSemaphore.Wait(_pool.CancellationToken);
+                Thread.SpinWait(Math.Min(count, spinLimit));
             }
+        }
 
-            segment = _pool.RentSendSegment();
-            return new Claim(segment, this);
+        [MethodImpl(MethodImplOptions.NoInlining)]
+        private static void ThrowCannotGetSegmentAfterSemaphoreEnter()
+        {
+            throw new InvalidOperationException("_localSendPool.TryRent(out segment)");
         }
 
         [MethodImpl(MethodImplOptions.AggressiveInlining)]
@@ -184,7 +188,11 @@ public void Commit(RioSegment segment)
         /// This method is called by <see cref="Poller"/> on its thread.
         /// </summary>
         /// <returns></returns>
-        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        [MethodImpl(MethodImplOptions.AggressiveInlining
+#if NETCOREAPP3_0
+                    | MethodImplOptions.AggressiveOptimization
+#endif
+        )]
         public int Poll()
         {
             int count = 0;
@@ -210,7 +218,7 @@ private int PollReceive()
             {
                 var result = _receiveResults[i];
                 var id = new BufferSegmentId(result.RequestCorrelation);
-                var segment = _pool[id];
+                var segment = _localReceiveBuffer[id.SegmentId];
                 var span = segment.Span.Slice(0, (int)result.BytesTransferred);
                 OnSegmentReceived(span);
                 _sendReceive.Receive(segment);
@@ -235,8 +243,21 @@ private int PollSend()
             for (int i = 0; i < resultCount; i++)
             {
                 var result = _sendResults[i];
+                var id = new BufferSegmentId(result.RequestCorrelation);
+                if (id.PoolId == 0)
+                {
+                    var segment = _localSendBuffer[id.SegmentId];
+                    _localSendPool.Return(segment);
+                }
+                else
+                {
+                    throw new InvalidOperationException();
+                    //var segment = _globalPool[id];
+                    //_globalPool.Return(segment);
+                }
+
+                // only after adding to the pool
                 _sendSemaphore.Release();
-                _pool.Return(result.ConnectionCorrelation);
             }
 
             return (int)resultCount;
@@ -247,36 +268,6 @@ private void OnSegmentReceived(Span<byte> bytes)
             _messageFramer.SubmitBytes(bytes);
         }
 
-        private class RioSendReceive
-        {
-            private readonly IntPtr _rq;
-
-            public RioSendReceive(IntPtr rq)
-            {
-                _rq = rq;
-            }
-
-            public void Send(RioSegment segment)
-            {
-                // SpinLock is not better, already tried
-                lock (this)
-                {
-                    if (!WinSock.Extensions.Send(_rq, &segment.RioBuf, 1, RIO_SEND_FLAGS.DONT_NOTIFY, segment.Id.Value))
-                        WinSock.ThrowLastWsaError();
-                }
-            }
-
-            public void Receive(RioSegment segment)
-            {
-                lock (this)
-                {
-                    if (!WinSock.Extensions.Receive(_rq, &segment.RioBuf, 1, RIO_RECEIVE_FLAGS.DONT_NOTIFY, segment.Id.Value))
-                        WinSock.ThrowLastWsaError();
-                }
-            }
-        }
-
-
         ~Session()
         {
             Dispose(false);
@@ -313,5 +304,81 @@ public void Dispose()
             Dispose(true);
             GC.SuppressFinalize(this);
         }
+
+        private class RioSendReceive
+        {
+            // TODO SpinLock with padding
+            private readonly IntPtr _rq;
+
+            public RioSendReceive(IntPtr rq)
+            {
+                _rq = rq;
+            }
+
+            public void Send(RioSegment segment)
+            {
+                // SpinLock is not better, already tried
+                lock (this)
+                {
+                    if (!WinSock.Extensions.Send(_rq, &segment.RioBuf, 1, RIO_SEND_FLAGS.DONT_NOTIFY, segment.Id.Value))
+                        WinSock.ThrowLastWsaError();
+                }
+            }
+
+            public void Receive(RioSegment segment)
+            {
+                lock (this)
+                {
+                    if (!WinSock.Extensions.Receive(_rq, &segment.RioBuf, 1, RIO_RECEIVE_FLAGS.DONT_NOTIFY, segment.Id.Value))
+                        WinSock.ThrowLastWsaError();
+                }
+            }
+        }
+
+        /// <summary>
+        /// m_currentCount is at offset 32 with 12 bytes after in <see cref="SemaphoreSlim"/>. We cannot pad before, but at least
+        /// </summary>
+        private class SemaphoreSlimRhsPad : SemaphoreSlim
+        {
+#pragma warning disable 169
+            private Padding _padding;
+#pragma warning restore 169
+
+            public SemaphoreSlimRhsPad(int initialCount, int maxCount) : base(initialCount, maxCount)
+            {
+            }
+        }
     }
+
+    // ReSharper disable InconsistentNaming
+
+    [StructLayout(LayoutKind.Sequential, Size = SafeCacheLine - 8)]
+    internal readonly struct Padding
+    {
+        public const int SafeCacheLine = 128;
+    }
+
+    internal class SessionLhsPad : CriticalFinalizerObject
+    {
+#pragma warning disable 169
+        private Padding _padding;
+#pragma warning restore 169
+    }
+
+    internal class SessionSCQLock : SessionLhsPad
+    {
+        protected int _isPollingSend;
+#pragma warning disable 169
+        private readonly int _padding;
+#pragma warning restore 169
+    }
+
+    internal class SessionSCQLockRhsPad : SessionSCQLock
+    {
+#pragma warning disable 169
+        private Padding _padding;
+#pragma warning restore 169
+    }
+
+    // ReSharper restore InconsistentNaming
 }
diff --git a/src/Abc.Zerio/Alt/ZerioClient.cs b/src/Abc.Zerio/Alt/ZerioClient.cs
index efe303b..91c6287 100644
--- a/src/Abc.Zerio/Alt/ZerioClient.cs
+++ b/src/Abc.Zerio/Alt/ZerioClient.cs
@@ -100,8 +100,17 @@ private static unsafe IntPtr CreateSocket()
             var tcpNoDelay = -1;
             WinSock.setsockopt(connectionSocket, WinSock.Consts.IPPROTO_TCP, WinSock.Consts.TCP_NODELAY, (char*)&tcpNoDelay, sizeof(int));
 
-            var reuseAddr = 1;
-            WinSock.setsockopt(connectionSocket, WinSock.Consts.SOL_SOCKET, WinSock.Consts.SO_REUSEADDR, (char*)&reuseAddr, sizeof(int));
+            var fastLoopback = 1;
+            const int SIO_LOOPBACK_FAST_PATH = -1744830448;
+            WinSock.WSAIoctlGeneral(connectionSocket,
+                                    (IntPtr) SIO_LOOPBACK_FAST_PATH,
+                                    &fastLoopback,
+                                    4,
+                                    null,
+                                    0,
+                                    out _,
+                                    IntPtr.Zero,
+                                    IntPtr.Zero);
 
             return connectionSocket;
         }
diff --git a/src/Abc.Zerio/Alt/ZerioServer.cs b/src/Abc.Zerio/Alt/ZerioServer.cs
index 38ccfe8..9e22e34 100644
--- a/src/Abc.Zerio/Alt/ZerioServer.cs
+++ b/src/Abc.Zerio/Alt/ZerioServer.cs
@@ -49,8 +49,17 @@ private static unsafe IntPtr CreateListeningSocket()
             var tcpNoDelay = -1;
             WinSock.setsockopt(listeningSocket, WinSock.Consts.IPPROTO_TCP, WinSock.Consts.TCP_NODELAY, (char*)&tcpNoDelay, sizeof(int));
 
-            var reuseAddr = 1;
-            WinSock.setsockopt(listeningSocket, WinSock.Consts.SOL_SOCKET, WinSock.Consts.SO_REUSEADDR, (char*)&reuseAddr, sizeof(int));
+            var fastLoopback = 1;
+            const int SIO_LOOPBACK_FAST_PATH = -1744830448;
+            WinSock.WSAIoctlGeneral(listeningSocket,
+                                    (IntPtr)SIO_LOOPBACK_FAST_PATH,
+                                    &fastLoopback,
+                                    4,
+                                    null,
+                                    0,
+                                    out _,
+                                    IntPtr.Zero,
+                                    IntPtr.Zero);
 
             return listeningSocket;
        }
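
Reviewer note (not part of the patch): the Session changes pair BoundedLocalPool<RioSegment> with a SemaphoreSlim seeded from the local pool's count, and PollSend returns a segment to the pool before calling Release() ("only after adding to the pool"), so a successful Wait() implies a segment is available and Claim() treats a failed TryRent as a bug. Below is a minimal, self-contained sketch of that contract under those assumptions; the type and member names (ThrottledSendPool, Rent, Return) are hypothetical and not taken from the patch.

    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Threading;

    // Sketch only: the semaphore count mirrors the pool count, and Release happens
    // strictly after Enqueue, so a woken renter always finds an item.
    internal sealed class ThrottledSendPool<T>
    {
        private readonly ConcurrentQueue<T> _pool = new ConcurrentQueue<T>();
        private readonly SemaphoreSlim _available;

        public ThrottledSendPool(IEnumerable<T> items)
        {
            foreach (var item in items)
                _pool.Enqueue(item);
            _available = new SemaphoreSlim(_pool.Count, _pool.Count);
        }

        // Sender side: block until an item is guaranteed to be in the pool.
        public T Rent(CancellationToken ct)
        {
            _available.Wait(ct);
            if (!_pool.TryDequeue(out var item))
                throw new InvalidOperationException("pool empty after the semaphore was acquired");
            return item;
        }

        // Completion side: put the item back first, then wake a waiting sender.
        public void Return(T item)
        {
            _pool.Enqueue(item);
            _available.Release();
        }
    }

The ordering is the point: if Release() ran before Enqueue(), a woken sender could observe an empty pool, which is exactly the race the patch's "only after adding to the pool" comment guards against.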
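Reviewer note (not part of the patch): the SessionLhsPad / SessionSCQLock / SessionSCQLockRhsPad chain appears to rely on the CLR laying base-class fields out before derived-class fields, so the contended _isPollingSend flag ends up sandwiched between two roughly cache-line-sized dummy fields. Class field layout is a runtime implementation detail, so treat the following as a sketch of the intent only; the names (CacheLinePad, ContendedFlag*) are hypothetical.

    using System.Runtime.InteropServices;

    // A dummy blob roughly one cache line wide (the patch uses Padding with Size = 128 - 8).
    [StructLayout(LayoutKind.Sequential, Size = 120)]
    internal readonly struct CacheLinePad { }

    internal class ContendedFlagLhsPad
    {
    #pragma warning disable 169 // the field exists only to occupy space
        private CacheLinePad _lhs;
    #pragma warning restore 169
    }

    internal class ContendedFlag : ContendedFlagLhsPad
    {
        // Hot field: base-class padding tends to sit before it and derived-class padding
        // after it, reducing false sharing with neighbouring hot state.
        protected int _flag;
    }

    internal class ContendedFlagRhsPad : ContendedFlag
    {
    #pragma warning disable 169
        private CacheLinePad _rhs;
    #pragma warning restore 169
    }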
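Reviewer note (not part of the patch): the magic number used for the loopback fast path in ZerioClient/ZerioServer is the signed 32-bit form of the documented Winsock control code SIO_LOOPBACK_FAST_PATH (0x98000010). The snippet below only demonstrates that equivalence and is not a suggested change to the patch.

    using System;

    internal static class LoopbackFastPathConstant
    {
        // Documented Winsock control code for SIO_LOOPBACK_FAST_PATH.
        public const int SIO_LOOPBACK_FAST_PATH = unchecked((int)0x98000010);

        public static void Main()
        {
            Console.WriteLine(SIO_LOOPBACK_FAST_PATH == -1744830448); // True
        }
    }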