diff --git a/src/Raven.Client/Http/RavenCommand.cs b/src/Raven.Client/Http/RavenCommand.cs index b11c84a981f1..c357e81fc9a7 100644 --- a/src/Raven.Client/Http/RavenCommand.cs +++ b/src/Raven.Client/Http/RavenCommand.cs @@ -8,6 +8,7 @@ using System.Threading.Tasks; using Raven.Client.Documents.Conventions; using Raven.Client.Extensions; +using Raven.Client.Json; using Raven.Client.Util; using Sparrow; using Sparrow.Json; @@ -93,11 +94,24 @@ public virtual void SetResponse(JsonOperationContext context, BlittableJsonReade throw new InvalidOperationException($"'{GetType()}' command must override the SetResponse method which expects response with the following type: {ResponseType}."); } - public virtual Task SendAsync(HttpClient client, HttpRequestMessage request, CancellationToken token) + public virtual async Task SendAsync(HttpClient client, HttpRequestMessage request, CancellationToken token) { // We must use HttpCompletionOption.ResponseHeadersRead otherwise the client will buffer the response // and we'll get OutOfMemoryException in huge responses (> 2GB). - return client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, token); + try + { + return await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, token).ConfigureAwait(false); + } + finally + { + if (request.Content is BlittableJsonContent bjc) + { + var requestBodyTask = bjc.EnsureCompletedAsync(); + + if (requestBodyTask.IsCompleted == false) + await requestBodyTask.ConfigureAwait(false); + } + } } public virtual void SetResponseRaw(HttpResponseMessage response, Stream stream, JsonOperationContext context) diff --git a/src/Raven.Client/Json/BlittableJsonContent.cs b/src/Raven.Client/Json/BlittableJsonContent.cs index 45c486545139..d66399463cd0 100644 --- a/src/Raven.Client/Json/BlittableJsonContent.cs +++ b/src/Raven.Client/Json/BlittableJsonContent.cs @@ -3,12 +3,17 @@ using System.IO.Compression; using System.Net; using System.Net.Http; +using System.Threading; using System.Threading.Tasks; namespace Raven.Client.Json { internal class BlittableJsonContent : HttpContent { + private static readonly TaskCompletionSource Sentinel = new(TaskCreationOptions.RunContinuationsAsynchronously); + + private TaskCompletionSource _tcs; + private readonly Func _asyncTaskWriter; public BlittableJsonContent(Func writer) @@ -17,20 +22,38 @@ public BlittableJsonContent(Func writer) Headers.ContentEncoding.Add("gzip"); } + // In some cases a task returned by HttpClient.SendAsync may be completed before the call to the request content's SerializeToStreamAsync completes + // This method is used to wait for the completion of SerializeToStreamAsync + // https://github.com/dotnet/runtime/issues/107082 + public Task EnsureCompletedAsync() => + Interlocked.CompareExchange(ref _tcs, Sentinel, null) is null + ? Task.CompletedTask // SerializeToStreamAsync was never called + : _tcs!.Task; + protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context) { - // Immediately flush request stream to send headers - // https://github.com/dotnet/corefx/issues/39586#issuecomment-516210081 - // https://github.com/dotnet/runtime/issues/96223#issuecomment-1865009861 - await stream.FlushAsync().ConfigureAwait(false); + if (Interlocked.CompareExchange(ref _tcs, new(TaskCreationOptions.RunContinuationsAsynchronously), null) is not null) + throw new InvalidOperationException($"Already called previously, or called after {nameof(EnsureCompletedAsync)}"); + + try + { + // Immediately flush request stream to send headers + // https://github.com/dotnet/corefx/issues/39586#issuecomment-516210081 + // https://github.com/dotnet/runtime/issues/96223#issuecomment-1865009861 + await stream.FlushAsync().ConfigureAwait(false); #if NETSTANDARD2_0 || NETCOREAPP2_1 - using (var gzipStream = new GZipStream(stream, CompressionMode.Compress, leaveOpen: true)) + using (var gzipStream = new GZipStream(stream, CompressionMode.Compress, leaveOpen: true)) #else - await using (var gzipStream = new GZipStream(stream, CompressionMode.Compress, leaveOpen: true)) + await using (var gzipStream = new GZipStream(stream, CompressionMode.Compress, leaveOpen: true)) #endif + { + await _asyncTaskWriter(gzipStream).ConfigureAwait(false); + } + } + finally { - await _asyncTaskWriter(gzipStream).ConfigureAwait(false); + _tcs!.TrySetResult(null); } }