Skip to content

Commit 1bd327c

Browse files
committed
non-allocating Http1Connection.ReadContentAsync
1 parent 31ff112 commit 1bd327c

File tree

10 files changed

+402
-126
lines changed

10 files changed

+402
-126
lines changed

NetworkToolkit.Tests/Http/Http1Tests.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,12 @@ await RunMultiStreamTest(
8080
[Fact]
8181
public async Task Pipelining_PausedReads_Success()
8282
{
83+
if (TrickleForceAsync)
84+
{
85+
// This test depends on synchronous completion of reads, so will not work when async completion is forced.
86+
return;
87+
}
88+
8389
const int PipelineLength = 10;
8490

8591
using var semaphore = new SemaphoreSlim(0);
@@ -199,4 +205,14 @@ public class Http1SslTests : Http1Tests
199205
{
200206
internal override bool UseSsl => true;
201207
}
208+
209+
public class Http1TrickleTests : Http1Tests
210+
{
211+
internal override bool Trickle => true;
212+
}
213+
214+
public class Http1TrickleAsyncTests : Http1TrickleTests
215+
{
216+
internal override bool TrickleForceAsync => true;
217+
}
202218
}

NetworkToolkit.Tests/Http/HttpGenericTests.cs

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -262,23 +262,16 @@ private static IEnumerable<List<string>> ContentData() => new[]
262262
new List<string> { "foo", "barbar", "bazbazbaz" }
263263
};
264264

265-
/// <summary>
266-
/// If true, more than one stream can be opened on a single connection.
267-
/// </summary>
268-
internal bool SupportsMultiStreamTests => Version != HttpPrimitiveVersion.Version10;
269-
270-
/// <summary>
271-
/// If true, the current <see cref="Version"/> supports concurrent duplex streams.
272-
/// </summary>
273-
internal bool SupportsMultiStreamConcurrentTests => Version.Major >= 2;
274-
275265
internal virtual bool UseSsl => false;
266+
internal virtual bool Trickle => false;
267+
internal virtual bool TrickleForceAsync => false;
276268
internal abstract HttpPrimitiveVersion Version { get; }
277269

278270
internal virtual ConnectionFactory CreateConnectionFactory()
279271
{
280272
ConnectionFactory factory = new MemoryConnectionFactory();
281273
if (UseSsl) factory = new SslConnectionFactory(factory);
274+
if (Trickle) factory = new TricklingConnectionFactory(factory) { ForceAsync = TrickleForceAsync };
282275
return factory;
283276
}
284277

NetworkToolkit.Tests/Http/Servers/Http1TestConnection.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ internal async Task<string> ReadLineAsync()
106106
{
107107
if (!await FillReadBufferAsync().ConfigureAwait(false))
108108
{
109+
this.ToString();
109110
throw new Exception("Unexpected end of stream. Expected CRLF.");
110111
}
111112
}

NetworkToolkit.Tests/NetworkToolkit.Tests.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
</ItemGroup>
2020

2121
<ItemGroup>
22+
<Compile Include="..\NetworkToolKit\StreamExtensions.cs" />
2223
<Compile Include="..\NetworkToolKit\TaskToApm.cs" />
2324
<Compile Include="..\NetworkToolKit\Tools.cs" />
2425
</ItemGroup>

NetworkToolkit.Tests/TestStreamBase.cs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
using System;
2+
using System.Collections.Generic;
23
using System.IO;
34
using System.Runtime.ExceptionServices;
45
using System.Threading;
56
using System.Threading.Tasks;
67

78
namespace NetworkToolkit.Tests
89
{
9-
internal abstract class TestStreamBase : Stream
10+
internal abstract class TestStreamBase : Stream, IGatheringStream, ICancellableAsyncDisposable
1011
{
1112

1213
public override bool CanRead => false;
@@ -17,6 +18,14 @@ internal abstract class TestStreamBase : Stream
1718
public override long Length => throw new InvalidOperationException();
1819
public override long Position { get => throw new InvalidOperationException(); set => throw new InvalidOperationException(); }
1920

21+
public override ValueTask DisposeAsync() =>
22+
DisposeAsync(CancellationToken.None);
23+
24+
public virtual ValueTask DisposeAsync(CancellationToken cancellationToken)
25+
{
26+
return default;
27+
}
28+
2029
public override void Flush() => throw new NotImplementedException();
2130

2231
public override long Seek(long offset, SeekOrigin origin) => throw new NotImplementedException();
@@ -45,6 +54,14 @@ public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationTo
4554
return ValueTask.FromException(ExceptionDispatchInfo.SetCurrentStackTrace(new InvalidOperationException()));
4655
}
4756

57+
public virtual async ValueTask WriteAsync(IReadOnlyList<ReadOnlyMemory<byte>> buffers, CancellationToken cancellationToken = default)
58+
{
59+
for (int i = 0, count = buffers.Count; i != count; ++i)
60+
{
61+
await WriteAsync(buffers[i], cancellationToken).ConfigureAwait(false);
62+
}
63+
}
64+
4865
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) =>
4966
WriteAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask();
5067

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
using NetworkToolkit.Connections;
2+
using System.Collections.Generic;
3+
using System.Diagnostics;
4+
using System.Linq;
5+
using System.Net;
6+
using System.Threading;
7+
using System.Threading.Tasks;
8+
9+
namespace NetworkToolkit.Tests
10+
{
11+
internal sealed class TricklingConnectionFactory : FilteringConnectionFactory
12+
{
13+
private static int[] s_defaultTrickleSequence = new[] { 1 };
14+
private readonly IEnumerable<int> _trickleSequence = s_defaultTrickleSequence;
15+
16+
public IEnumerable<int> TrickleSequence
17+
{
18+
get => _trickleSequence;
19+
init
20+
{
21+
Debug.Assert(!value.Any(x => x <= 0));
22+
_trickleSequence = value.ToArray();
23+
}
24+
}
25+
26+
public bool ForceAsync { get; init; }
27+
28+
public TricklingConnectionFactory(ConnectionFactory baseFactory)
29+
: base(baseFactory)
30+
{
31+
}
32+
33+
public override async ValueTask<Connection> ConnectAsync(EndPoint endPoint, IConnectionProperties? options = null, CancellationToken cancellationToken = default)
34+
{
35+
Connection c = await BaseFactory.ConnectAsync(endPoint, options, cancellationToken).ConfigureAwait(false);
36+
return new FilteringConnection(c, new TricklingStream(c.Stream, _trickleSequence, ForceAsync));
37+
}
38+
39+
public override async ValueTask<ConnectionListener> ListenAsync(EndPoint? endPoint = null, IConnectionProperties? options = null, CancellationToken cancellationToken = default)
40+
{
41+
ConnectionListener listener = await BaseFactory.ListenAsync(endPoint, options, cancellationToken).ConfigureAwait(false);
42+
return new TricklingListener(listener, _trickleSequence, ForceAsync);
43+
}
44+
45+
private sealed class TricklingListener : FilteringConnectionListener
46+
{
47+
private readonly IEnumerable<int> _trickleSequence;
48+
private readonly bool _forceAsync;
49+
50+
public TricklingListener(ConnectionListener baseListener, IEnumerable<int> trickleSequence, bool forceAsync) : base(baseListener)
51+
{
52+
_trickleSequence = trickleSequence;
53+
_forceAsync = forceAsync;
54+
}
55+
56+
public override async ValueTask<Connection?> AcceptConnectionAsync(IConnectionProperties? options = null, CancellationToken cancellationToken = default)
57+
{
58+
Connection? c = await BaseListener.AcceptConnectionAsync(options, cancellationToken).ConfigureAwait(false);
59+
return c != null ? new FilteringConnection(c, new TricklingStream(c.Stream, _trickleSequence, _forceAsync)) : null;
60+
}
61+
}
62+
}
63+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Diagnostics;
4+
using System.IO;
5+
using System.Linq;
6+
using System.Threading;
7+
using System.Threading.Tasks;
8+
9+
namespace NetworkToolkit.Tests
10+
{
11+
internal sealed class TricklingStream : TestStreamBase
12+
{
13+
private readonly Stream _baseStream;
14+
private readonly int[] _trickleSequence;
15+
private readonly bool _forceAsync;
16+
private int _readIdx;
17+
18+
public override bool CanRead => _baseStream.CanRead;
19+
public override bool CanWrite => _baseStream.CanWrite;
20+
21+
public TricklingStream(Stream baseStream, IEnumerable<int> trickleSequence, bool forceAsync)
22+
{
23+
_baseStream = baseStream;
24+
_trickleSequence = trickleSequence.ToArray();
25+
_forceAsync = forceAsync;
26+
Debug.Assert(_trickleSequence.Length > 0);
27+
}
28+
29+
protected override void Dispose(bool disposing)
30+
{
31+
if(disposing) _baseStream.Dispose();
32+
}
33+
34+
public override ValueTask DisposeAsync(CancellationToken cancellationToken) =>
35+
_baseStream.DisposeAsync(cancellationToken);
36+
37+
public override void Flush() =>
38+
_baseStream.Flush();
39+
40+
public override Task FlushAsync(CancellationToken cancellationToken) =>
41+
_baseStream.FlushAsync(cancellationToken);
42+
43+
private int NextReadSize()
44+
{
45+
int size = _trickleSequence[_readIdx];
46+
_readIdx = (_readIdx + 1) % _trickleSequence.Length;
47+
return size;
48+
}
49+
50+
public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
51+
{
52+
int readLength = Math.Min(buffer.Length, NextReadSize());
53+
54+
ValueTask<int> readTask = _baseStream.ReadAsync(buffer.Slice(0, readLength), cancellationToken);
55+
56+
if (readTask.IsCompleted && _forceAsync)
57+
{
58+
await Task.Yield();
59+
}
60+
61+
return await readTask.ConfigureAwait(false);
62+
}
63+
64+
public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default) =>
65+
_baseStream.WriteAsync(buffer, cancellationToken);
66+
}
67+
}

NetworkToolkit/Http/Primitives/Http1Connection.Parsers.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
using NetworkToolkit.Http.Primitives;
22
using System;
3-
using System.Diagnostics;
43
using System.Numerics;
54
using System.Runtime.CompilerServices;
65
using System.Runtime.Intrinsics;
@@ -24,8 +23,6 @@ public partial class Http1Connection
2423
[MethodImpl(MethodImplOptions.AggressiveInlining)]
2524
private bool ReadHeadersImpl(Span<byte> buffer, IHttpHeadersSink headersSink, object? state, out int bytesConsumed)
2625
{
27-
Debug.Assert(buffer.Length != 0);
28-
2926
if (Avx2.IsSupported) return ReadHeadersAvx2(buffer, headersSink, state, out bytesConsumed);
3027
return ReadHeadersPortable(buffer, headersSink, state, out bytesConsumed);
3128
}
@@ -108,6 +105,12 @@ internal unsafe bool ReadHeadersPortable(Span<byte> buffer, IHttpHeadersSink hea
108105
/// </remarks>
109106
internal unsafe bool ReadHeadersAvx2(Span<byte> buffer, IHttpHeadersSink headersSink, object? state, out int bytesConsumed)
110107
{
108+
if (buffer.Length == 0)
109+
{
110+
bytesConsumed = 0;
111+
return false;
112+
}
113+
111114
Vector256<byte> maskCol = Vector256.Create((byte)':');
112115
Vector256<byte> maskLF = Vector256.Create((byte)'\n');
113116

0 commit comments

Comments
 (0)