Skip to content

Commit e9d6af0

Browse files
authored
Backport of #80693 (#81092)
- return bytes of streaming response as soon as available - fix unhandled error in reader.cancel() promise - return cancelable promise from http_wasm_get_streamed_response_bytes - unit test for slowly streamed chunks - unit test for streaming and default cancellation
1 parent b4f6d21 commit e9d6af0

File tree

2 files changed

+136
-36
lines changed

2 files changed

+136
-36
lines changed

src/libraries/Common/tests/System/Net/Http/HttpClientHandlerTest.cs

Lines changed: 111 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -984,13 +984,14 @@ await connection.WriteStringAsync(
984984
}
985985

986986
[Theory]
987-
[InlineData(true, true)]
988-
[InlineData(true, false)]
989-
[InlineData(false, true)]
990-
[InlineData(false, false)]
991-
[InlineData(null, false)]
987+
[InlineData(true, true, true)]
988+
[InlineData(true, true, false)]
989+
[InlineData(true, false, false)]
990+
[InlineData(false, true, false)]
991+
[InlineData(false, false, false)]
992+
[InlineData(null, false, false)]
992993
[ActiveIssue("https://github.com/dotnet/runtime/issues/65429", typeof(PlatformDetection), nameof(PlatformDetection.IsNodeJS))]
993-
public async Task ReadAsStreamAsync_HandlerProducesWellBehavedResponseStream(bool? chunked, bool enableWasmStreaming)
994+
public async Task ReadAsStreamAsync_HandlerProducesWellBehavedResponseStream(bool? chunked, bool enableWasmStreaming, bool slowChunks)
994995
{
995996
if (IsWinHttpHandler && UseVersion >= HttpVersion20.Value)
996997
{
@@ -1003,6 +1004,13 @@ public async Task ReadAsStreamAsync_HandlerProducesWellBehavedResponseStream(boo
10031004
return;
10041005
}
10051006

1007+
if (enableWasmStreaming && !PlatformDetection.IsBrowser)
1008+
{
1009+
// enableWasmStreaming makes only sense on Browser platform
1010+
return;
1011+
}
1012+
1013+
var tcs = new TaskCompletionSource<bool>();
10061014
await LoopbackServerFactory.CreateClientAndServerAsync(async uri =>
10071015
{
10081016
var request = new HttpRequestMessage(HttpMethod.Get, uri) { Version = UseVersion };
@@ -1079,11 +1087,21 @@ await LoopbackServerFactory.CreateClientAndServerAsync(async uri =>
10791087

10801088
// Various forms of reading
10811089
var buffer = new byte[1];
1090+
var buffer2 = new byte[2];
10821091

10831092
if (PlatformDetection.IsBrowser)
10841093
{
10851094
#if !NETFRAMEWORK
1086-
Assert.Equal('h', await responseStream.ReadByteAsync());
1095+
if(slowChunks)
1096+
{
1097+
Assert.Equal(1, await responseStream.ReadAsync(new Memory<byte>(buffer2)));
1098+
Assert.Equal((byte)'h', buffer2[0]);
1099+
tcs.SetResult(true);
1100+
}
1101+
else
1102+
{
1103+
Assert.Equal('h', await responseStream.ReadByteAsync());
1104+
}
10871105
Assert.Equal('e', await responseStream.ReadByteAsync());
10881106
Assert.Equal(1, await responseStream.ReadAsync(new Memory<byte>(buffer)));
10891107
Assert.Equal((byte)'l', buffer[0]);
@@ -1184,7 +1202,18 @@ await server.AcceptConnectionAsync(async connection =>
11841202
{
11851203
case true:
11861204
await connection.SendResponseAsync(HttpStatusCode.OK, headers: new HttpHeaderData[] { new HttpHeaderData("Transfer-Encoding", "chunked") }, isFinal: false);
1187-
await connection.SendResponseBodyAsync("3\r\nhel\r\n8\r\nlo world\r\n0\r\n\r\n");
1205+
if(PlatformDetection.IsBrowser && slowChunks)
1206+
{
1207+
await connection.SendResponseBodyAsync("1\r\nh\r\n", false);
1208+
await tcs.Task;
1209+
await connection.SendResponseBodyAsync("2\r\nel\r\n", false);
1210+
await connection.SendResponseBodyAsync("8\r\nlo world\r\n", false);
1211+
await connection.SendResponseBodyAsync("0\r\n\r\n", true);
1212+
}
1213+
else
1214+
{
1215+
await connection.SendResponseBodyAsync("3\r\nhel\r\n8\r\nlo world\r\n0\r\n\r\n");
1216+
}
11881217
break;
11891218

11901219
case false:
@@ -1295,6 +1324,80 @@ await LoopbackServerFactory.CreateClientAndServerAsync(async uri =>
12951324
server => server.AcceptConnectionSendResponseAndCloseAsync());
12961325
}
12971326

1327+
[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsBrowser))]
1328+
[ActiveIssue("https://github.com/dotnet/runtime/issues/65429", typeof(PlatformDetection), nameof(PlatformDetection.IsNodeJS))]
1329+
public async Task ReadAsStreamAsync_StreamingCancellation()
1330+
{
1331+
var tcs = new TaskCompletionSource<bool>();
1332+
var tcs2 = new TaskCompletionSource<bool>();
1333+
await LoopbackServerFactory.CreateClientAndServerAsync(async uri =>
1334+
{
1335+
var request = new HttpRequestMessage(HttpMethod.Get, uri) { Version = UseVersion };
1336+
#if !NETFRAMEWORK
1337+
request.Options.Set(new HttpRequestOptionsKey<bool>("WebAssemblyEnableStreamingResponse"), true);
1338+
#endif
1339+
1340+
var cts = new CancellationTokenSource();
1341+
using (var client = new HttpMessageInvoker(CreateHttpClientHandler()))
1342+
using (HttpResponseMessage response = await client.SendAsync(TestAsync, request, CancellationToken.None))
1343+
{
1344+
using (Stream responseStream = await response.Content.ReadAsStreamAsync(TestAsync))
1345+
{
1346+
var buffer = new byte[1];
1347+
#if !NETFRAMEWORK
1348+
Assert.Equal(1, await responseStream.ReadAsync(new Memory<byte>(buffer)));
1349+
Assert.Equal((byte)'h', buffer[0]);
1350+
var sizePromise = responseStream.ReadAsync(new Memory<byte>(buffer), cts.Token);
1351+
await tcs2.Task; // wait for the request and response header to be sent
1352+
cts.Cancel();
1353+
await Assert.ThrowsAsync<TaskCanceledException>(async () => await sizePromise);
1354+
tcs.SetResult(true);
1355+
#endif
1356+
}
1357+
}
1358+
}, async server =>
1359+
{
1360+
await server.AcceptConnectionAsync(async connection =>
1361+
{
1362+
await connection.ReadRequestDataAsync();
1363+
await connection.SendResponseAsync(HttpStatusCode.OK, headers: new HttpHeaderData[] { new HttpHeaderData("Transfer-Encoding", "chunked") }, isFinal: false);
1364+
await connection.SendResponseBodyAsync("1\r\nh\r\n", false);
1365+
tcs2.SetResult(true);
1366+
await tcs.Task;
1367+
});
1368+
});
1369+
}
1370+
1371+
[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsBrowser))]
1372+
public async Task ReadAsStreamAsync_Cancellation()
1373+
{
1374+
var tcs = new TaskCompletionSource<bool>();
1375+
var tcs2 = new TaskCompletionSource<bool>();
1376+
await LoopbackServerFactory.CreateClientAndServerAsync(async uri =>
1377+
{
1378+
var request = new HttpRequestMessage(HttpMethod.Get, uri) { Version = UseVersion };
1379+
var cts = new CancellationTokenSource();
1380+
using (var client = new HttpMessageInvoker(CreateHttpClientHandler()))
1381+
{
1382+
var responsePromise = client.SendAsync(TestAsync, request, cts.Token);
1383+
await tcs2.Task; // wait for the request to be sent
1384+
cts.Cancel();
1385+
await Assert.ThrowsAsync<TaskCanceledException>(async () => await responsePromise);
1386+
tcs.SetResult(true);
1387+
}
1388+
}, async server =>
1389+
{
1390+
await server.AcceptConnectionAsync(async connection =>
1391+
{
1392+
await connection.ReadRequestDataAsync();
1393+
tcs2.SetResult(true);
1394+
await connection.SendResponseAsync(HttpStatusCode.OK, headers: new HttpHeaderData[] { new HttpHeaderData("Transfer-Encoding", "chunked") }, isFinal: false);
1395+
await connection.SendResponseBodyAsync("1\r\nh\r\n", false);
1396+
await tcs.Task;
1397+
});
1398+
});
1399+
}
1400+
12981401
[Fact]
12991402
[ActiveIssue("https://github.com/dotnet/runtime/issues/58812", TestPlatforms.Browser)]
13001403
public async Task Dispose_DisposingHandlerCancelsActiveOperationsWithoutResponses()

src/mono/wasm/runtime/http.ts

Lines changed: 25 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// The .NET Foundation licenses this file to you under the MIT license.
33

44
import { wrap_as_cancelable_promise } from "./cancelable-promise";
5+
import { Module } from "./imports";
56
import { MemoryViewType, Span } from "./marshal";
67
import { mono_assert } from "./types";
78
import { VoidPtr } from "./types/emscripten";
@@ -21,7 +22,12 @@ export function http_wasm_abort_request(abort_controller: AbortController): void
2122
export function http_wasm_abort_response(res: ResponseExtension): void {
2223
res.__abort_controller.abort();
2324
if (res.__reader) {
24-
res.__reader.cancel();
25+
res.__reader.cancel().catch((err) => {
26+
if (err && err.name !== "AbortError") {
27+
Module.printErr("MONO_WASM: Error in http_wasm_abort_response: " + err);
28+
}
29+
// otherwise, it's expected
30+
});
2531
}
2632
}
2733

@@ -100,42 +106,33 @@ export function http_wasm_get_response_bytes(res: ResponseExtension, view: Span)
100106
return bytes_read;
101107
}
102108

103-
export async function http_wasm_get_streamed_response_bytes(res: ResponseExtension, bufferPtr: VoidPtr, bufferLength: number): Promise<number> {
109+
export function http_wasm_get_streamed_response_bytes(res: ResponseExtension, bufferPtr: VoidPtr, bufferLength: number): Promise<number> {
104110
// the bufferPtr is pinned by the caller
105111
const view = new Span(bufferPtr, bufferLength, MemoryViewType.Byte);
106112
return wrap_as_cancelable_promise(async () => {
107-
if (!res.__chunk && res.body) {
108-
res.__reader = res.body.getReader();
113+
if (!res.__reader) {
114+
res.__reader = res.body!.getReader();
115+
}
116+
if (!res.__chunk) {
109117
res.__chunk = await res.__reader.read();
110118
res.__source_offset = 0;
111119
}
120+
if (res.__chunk.done) {
121+
return 0;
122+
}
112123

113-
let target_offset = 0;
114-
let bytes_read = 0;
115-
// loop until end of browser stream or end of C# buffer
116-
while (res.__reader && res.__chunk && !res.__chunk.done) {
117-
const remaining_source = res.__chunk.value.byteLength - res.__source_offset;
118-
if (remaining_source === 0) {
119-
res.__chunk = await res.__reader.read();
120-
res.__source_offset = 0;
121-
continue;// are we done yet
122-
}
123-
124-
const remaining_target = view.byteLength - target_offset;
125-
const bytes_copied = Math.min(remaining_source, remaining_target);
126-
const source_view = res.__chunk.value.subarray(res.__source_offset, res.__source_offset + bytes_copied);
127-
128-
// copy available bytes
129-
view.set(source_view, target_offset);
130-
target_offset += bytes_copied;
131-
bytes_read += bytes_copied;
132-
res.__source_offset += bytes_copied;
124+
const remaining_source = res.__chunk.value.byteLength - res.__source_offset;
125+
mono_assert(remaining_source > 0, "expected remaining_source to be greater than 0");
133126

134-
if (target_offset == view.byteLength) {
135-
return bytes_read;
136-
}
127+
const bytes_copied = Math.min(remaining_source, view.byteLength);
128+
const source_view = res.__chunk.value.subarray(res.__source_offset, res.__source_offset + bytes_copied);
129+
view.set(source_view, 0);
130+
res.__source_offset += bytes_copied;
131+
if (remaining_source == bytes_copied) {
132+
res.__chunk = undefined;
137133
}
138-
return bytes_read;
134+
135+
return bytes_copied;
139136
});
140137
}
141138

0 commit comments

Comments
 (0)