Skip to content

Commit

Permalink
RavenDB-22577 Making sure that we'll wait for the completion of Blitt…
Browse files Browse the repository at this point in the history
…ableJsonContent.SerializeToStreamAsync after HttpClient.SendAsync has already completed. There are cases when this is possible e.g. in HTTP2 is done to allow for duplex communication.
  • Loading branch information
arekpalinski committed Aug 30, 2024
1 parent f8d3926 commit c13f18f
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 9 deletions.
18 changes: 16 additions & 2 deletions src/Raven.Client/Http/RavenCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Threading.Tasks;
using Raven.Client.Documents.Conventions;
using Raven.Client.Extensions;
using Raven.Client.Json;
using Raven.Client.Util;
using Sparrow;
using Sparrow.Json;
Expand Down Expand Up @@ -93,11 +94,24 @@ public virtual void SetResponse(JsonOperationContext context, BlittableJsonReade
throw new InvalidOperationException($"'{GetType()}' command must override the SetResponse method which expects response with the following type: {ResponseType}.");
}

public virtual Task<HttpResponseMessage> SendAsync(HttpClient client, HttpRequestMessage request, CancellationToken token)
public virtual async Task<HttpResponseMessage> SendAsync(HttpClient client, HttpRequestMessage request, CancellationToken token)
{
// We must use HttpCompletionOption.ResponseHeadersRead otherwise the client will buffer the response
// and we'll get OutOfMemoryException in huge responses (> 2GB).
return client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, token);
try
{
return await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, token).ConfigureAwait(false);
}
finally
{
if (request.Content is BlittableJsonContent bjc)
{
var requestBodyTask = bjc.EnsureCompletedAsync();

if (requestBodyTask.IsCompleted == false)
await requestBodyTask.ConfigureAwait(false);
}
}
}

public virtual void SetResponseRaw(HttpResponseMessage response, Stream stream, JsonOperationContext context)
Expand Down
37 changes: 30 additions & 7 deletions src/Raven.Client/Json/BlittableJsonContent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@
using System.IO.Compression;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

namespace Raven.Client.Json
{
internal class BlittableJsonContent : HttpContent
{
private static readonly TaskCompletionSource<object> Sentinel = new(TaskCreationOptions.RunContinuationsAsynchronously);

private TaskCompletionSource<object> _tcs;

private readonly Func<Stream, Task> _asyncTaskWriter;

public BlittableJsonContent(Func<Stream, Task> writer)
Expand All @@ -17,20 +22,38 @@ public BlittableJsonContent(Func<Stream, Task> writer)
Headers.ContentEncoding.Add("gzip");
}

// In some cases a task returned by HttpClient.SendAsync may be completed before the call to the request content's SerializeToStreamAsync completes
// This method is used to wait for the completion of SerializeToStreamAsync
// https://github.com/dotnet/runtime/issues/107082
public Task EnsureCompletedAsync() =>
Interlocked.CompareExchange(ref _tcs, Sentinel, null) is null
? Task.CompletedTask // SerializeToStreamAsync was never called
: _tcs!.Task;

protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context)
{
// Immediately flush request stream to send headers
// https://github.com/dotnet/corefx/issues/39586#issuecomment-516210081
// https://github.com/dotnet/runtime/issues/96223#issuecomment-1865009861
await stream.FlushAsync().ConfigureAwait(false);
if (Interlocked.CompareExchange(ref _tcs, new(TaskCreationOptions.RunContinuationsAsynchronously), null) is not null)
throw new InvalidOperationException($"Already called previously, or called after {nameof(EnsureCompletedAsync)}");

try
{
// Immediately flush request stream to send headers
// https://github.com/dotnet/corefx/issues/39586#issuecomment-516210081
// https://github.com/dotnet/runtime/issues/96223#issuecomment-1865009861
await stream.FlushAsync().ConfigureAwait(false);

#if NETSTANDARD2_0 || NETCOREAPP2_1
using (var gzipStream = new GZipStream(stream, CompressionMode.Compress, leaveOpen: true))
using (var gzipStream = new GZipStream(stream, CompressionMode.Compress, leaveOpen: true))
#else
await using (var gzipStream = new GZipStream(stream, CompressionMode.Compress, leaveOpen: true))
await using (var gzipStream = new GZipStream(stream, CompressionMode.Compress, leaveOpen: true))
#endif
{
await _asyncTaskWriter(gzipStream).ConfigureAwait(false);
}
}
finally
{
await _asyncTaskWriter(gzipStream).ConfigureAwait(false);
_tcs!.TrySetResult(null);
}
}

Expand Down

0 comments on commit c13f18f

Please sign in to comment.