Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for processing requests with StreamContent to AddStandardHedgingHandler() #5112

Closed
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.

using System;
using System.IO;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Shared.Diagnostics;
Expand All @@ -25,10 +26,15 @@ protected override async ValueTask<Outcome<TResult>> ExecuteCore<TResult, TState
Throw.InvalidOperationException("The HTTP request message was not found in the resilience context.");
}

using var snapshot = RequestMessageSnapshot.Create(request);

context.Properties.Set(ResilienceKeys.RequestSnapshot, snapshot);

return await callback(context, state).ConfigureAwait(context.ContinueOnCapturedContext);
try
{
using var snapshot = await RequestMessageSnapshot.CreateAsync(request).ConfigureAwait(context.ContinueOnCapturedContext);
context.Properties.Set(ResilienceKeys.RequestSnapshot, snapshot);
return await callback(context, state).ConfigureAwait(context.ContinueOnCapturedContext);
}
catch (IOException e)
{
return Outcome.FromException<TResult>(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

using System;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Http.Resilience;
Expand Down Expand Up @@ -88,26 +90,49 @@ public static IStandardHedgingHandlerBuilder AddStandardHedgingHandler(this IHtt
Throw.InvalidOperationException("Request message snapshot is not attached to the resilience context.");
}

var requestMessage = snapshot.CreateRequestMessage();

// The secondary request message should use the action resilience context
requestMessage.SetResilienceContext(args.ActionContext);

// replace the request message
args.ActionContext.Properties.Set(ResilienceKeys.RequestMessage, requestMessage);

// if a routing strategy has been configured, get the next route from the routing strategy
Uri? route;
if (args.PrimaryContext.Properties.TryGetValue(ResilienceKeys.RoutingStrategy, out var routingPipeline))
{
if (!routingPipeline.TryGetNextRoute(out var route))
// if a routing strategy has been configured but it does not return the next route, then no more routes
// are available, stop hedging
if (!routingPipeline.TryGetNextRoute(out route))
{
// no routes left, stop hedging
return null;
}

requestMessage.RequestUri = requestMessage.RequestUri!.ReplaceHost(route);
}
else
{
route = null;
}

return async () =>
{
Outcome<HttpResponseMessage>? actionResult = null;

try
{
var requestMessage = await snapshot.CreateRequestMessageAsync().ConfigureAwait(args.ActionContext.ContinueOnCapturedContext);

// The secondary request message should use the action resilience context
requestMessage.SetResilienceContext(args.ActionContext);

// replace the request message
args.ActionContext.Properties.Set(ResilienceKeys.RequestMessage, requestMessage);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
args.ActionContext.Properties.Set(ResilienceKeys.RequestMessage, requestMessage);
args.ActionContext.Properties.Set(ResilienceKeys.RequestMessage, requestMessage);

There is a breaking change here :/

The OnHedging callback does not have access to the request message anymore. This is because OnHedging is called after the action is created and before it is invoked.

https://github.com/App-vNext/Polly/blob/f85029c6d14ad20fd36e4fcdde7a32f33409137a/src/Polly.Core/Hedging/Controller/TaskExecution.cs#L127


if (route is not null)
{
// replace the Host on the RequestUri of the request per the routing strategy
requestMessage.RequestUri = requestMessage.RequestUri!.ReplaceHost(route);
}
}
catch (IOException e)
{
actionResult = Outcome.FromException<HttpResponseMessage>(e);
}

return () => args.Callback(args.ActionContext);
return actionResult ?? await args.Callback(args.ActionContext).ConfigureAwait(args.ActionContext.ContinueOnCapturedContext);
};
};
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

using System;
using System.Collections.Generic;
using System.IO;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Extensions.ObjectPool;
using Microsoft.Shared.Diagnostics;
using Microsoft.Shared.Pools;
Expand All @@ -22,21 +24,40 @@ internal sealed class RequestMessageSnapshot : IResettable, IDisposable
private Version? _version;
private HttpContent? _content;

public static RequestMessageSnapshot Create(HttpRequestMessage request)
[System.Diagnostics.CodeAnalysis.SuppressMessage("Resilience", "EA0014:The async method doesn't support cancellation", Justification = "Past the point of no cancellation.")]
public static async Task<RequestMessageSnapshot> CreateAsync(HttpRequestMessage request)
{
_ = Throw.IfNull(request);

var snapshot = _snapshots.Get();
snapshot.Initialize(request);
await snapshot.InitializeAsync(request).ConfigureAwait(false);
return snapshot;
}

public HttpRequestMessage CreateRequestMessage()
[System.Diagnostics.CodeAnalysis.SuppressMessage("Resilience", "EA0014:The async method doesn't support cancellation", Justification = "Past the point of no cancellation.")]
public async Task<HttpRequestMessage> CreateRequestMessageAsync()
adamhammond marked this conversation as resolved.
Show resolved Hide resolved
{
var clone = new HttpRequestMessage(_method!, _requestUri)
if (!IsInitialized())
{
throw new InvalidOperationException($"{nameof(CreateRequestMessageAsync)}() cannot be called on a snapshot object that has been reset and/or has not been initialized");
}

var clone = new HttpRequestMessage(_method!, _requestUri?.OriginalString)
{
Content = _content,
Version = _version!
};

if (_content is StreamContent)
adamhammond marked this conversation as resolved.
Show resolved Hide resolved
{
(HttpContent? content, HttpContent? clonedContent) = await CloneContentAsync(_content).ConfigureAwait(false);
_content = content;
clone.Content = clonedContent;
}
else
{
clone.Content = _content;
}

#if NET5_0_OR_GREATER
foreach (var prop in _properties)
{
Expand All @@ -56,6 +77,7 @@ public HttpRequestMessage CreateRequestMessage()
return clone;
}

[System.Diagnostics.CodeAnalysis.SuppressMessage("Critical Bug", "S2952:Classes should \"Dispose\" of members from the classes' own \"Dispose\" methods", Justification = "Handled by ObjectPool")]
bool IResettable.TryReset()
{
_properties.Clear();
Expand All @@ -64,24 +86,76 @@ bool IResettable.TryReset()
_method = null;
_version = null;
_requestUri = null;
if (_content is StreamContent)
{
// a snapshot's StreamContent is always a unique copy (deep clone)
// therefore, it is safe to dispose when snapshot is no longer needed
_content.Dispose();
}

_content = null;

return true;
}

void IDisposable.Dispose() => _snapshots.Return(this);

private void Initialize(HttpRequestMessage request)
[System.Diagnostics.CodeAnalysis.SuppressMessage("Resilience", "EA0014:The async method doesn't support cancellation", Justification = "Past the point of no cancellation.")]
private static async Task<(HttpContent? content, HttpContent? clonedContent)> CloneContentAsync(HttpContent? content)
{
if (request.Content is StreamContent)
HttpContent? clonedContent = null;
if (content is not null)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This introduces performance regression for non-streamed content. Create an extra branch for StreamContent where this logic belongs.

Copy link
Author

@adamhammond adamhammond Apr 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@martintmk can you expand on why you believe this will cause a performance regression?

The cloning of the HttpContent (i.e. this method) only executes if the request.Content is StreamContent. See the other places in this class where this CloneContentAsync method is called for more context. You will see that each call is wrapped in an if (request.Content is StreamContent) check. Therefore, I am performing the branching that you are suggesting (assuming I am correctly understanding your use of the term "branch" to mean a logic branch). Note: I did this specifically to prevent any performance regression for existing non-Streamed content users of the AddStandardHedgingHandler API.

Also, if you are referring to the if (request.Content is StreamContent) check as being the culprit of a new performance regression, please observe that this check already existed before my change. The only difference is that, prior to this change, if request.Content is StreamContent is true, then the RequestMessageSnapshot methods throw an InvalidOperationException. Therefore, perf should remain the same for existing, non-Streamed content users.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, thanks for the detailed explanation. Can you still try to avoid the extra branches by condensing the code into a single PrepareContentAsync method?
It would also reduce the number of mutants you have to kill.

Copy link
Author

@adamhammond adamhammond Apr 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh I see what you mean. I understand where you are coming from; however, I don't think encapsulating the content operations behind a single method will achieve the additional clarity or eliminate the required mutation tests as you are suggesting. Below are my thoughts:

  1. The mutation tests still need to be accounted for regardless.
  2. I don't believe it would add any additional clarity. In fact, I believe it might do the opposite and here's why: the method would need to operate in both directions (i.e. cloning a passed-in request object into the given RequestMessageSnapshot instance fields and cloning the given RequestMessageSnapshot instance into a new request object). I don't see any clear way to represent that behind a single method without it creating more confusion. It even seems like it would be a code smell.

{
Throw.InvalidOperationException($"{nameof(StreamContent)} content cannot by cloned.");
HttpContent originalContent = content;
Stream originalRequestBody = await content.ReadAsStreamAsync().ConfigureAwait(false);
MemoryStream clonedRequestBody = new MemoryStream();
await originalRequestBody.CopyToAsync(clonedRequestBody).ConfigureAwait(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The streamed content can point to a large file which can be > GBs in size. Since we are copying it to memory, we should constrain it. Let's say < 10MB

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if it's a good idea to impose a size limit. For example, I don't think the framework imposes any such limit when cloning request content for redirected requests. IMO, we should allow this to be limitless, and require users to enforce their own request content size limits, if they have them, via server configurations and/or their own custom handlers, filters, middleware, etc. Thoughts?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change can buffer a potentially endless stream into memory and crash the process. I think it's something we should guard against.

This thing should be used for relatively small streamed payloads.

Btw, you can try to call LoadIntoBufferAsync on the content to see how it behaves.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do understand the concern; however, the max size that a given stream can grow to before it crashes the process is bounded only by the memory resources available on the given server that it's executing on. Therefore, by choosing a static max limit, we are creating a cap that might unnecessarily hinder users of the API that have both requirements and the necessary server resources available to process requests with exceptionally large stream content. Further, RequestMessageSnapshot is only cloning content from existing request objects that contain existing StreamContent. Therefore, an HttpRequestMessage object has already been created with StreamContent of the given size before any of the RequestMessageSnapshot operations are executed. If the size of the StreamContent was in fact too large, it would have already crashed the process.

I feel strongly that we should not impose a size limit when cloning the StreamContent.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the size of the StreamContent was in fact too large, it would have already crashed the process.

This is not true: the StreamContent can point to a file stream, and it consumes minimal memory even as you read the whole file. What you are doing here is materializing the whole stream into memory. This is what concerns me.

Copy link
Author

@adamhammond adamhammond May 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only way what you are saying is true is if the content is a FileStream and is read into a buffer, correct? This is somewhat of a remarkable edge case since we are talking about streams in the context of HTTP request content. It would help me understand this better if you could provide an example scenario where a developer would need to support this. Also, it is universally known that a given request must be deep cloned in order to support hedging. I believe the general assumption is that deep cloning will be handled via cloning to a MemoryStream as opposed to any alternative stream, which would require the entire stream to be copied to memory. Therefore, I return to my previous stance that users of the AddStandardHedgingHandler extension method should add their own handler, middleware, etc. to impose a size limit on request content if they have one. My biggest concern is that we don't have enough information to know what the optimal max limit value should be that would support the most use cases effectively.

clonedRequestBody.Position = 0;
if (originalRequestBody.CanSeek)
{
originalRequestBody.Position = 0;
}
else
{
originalRequestBody = new MemoryStream();
await clonedRequestBody.CopyToAsync(originalRequestBody).ConfigureAwait(false);
originalRequestBody.Position = 0;
clonedRequestBody.Position = 0;
}

clonedContent = new StreamContent(clonedRequestBody);
content = new StreamContent(originalRequestBody);
foreach (KeyValuePair<string, IEnumerable<string>> header in originalContent.Headers)
{
_ = clonedContent.Headers.TryAddWithoutValidation(header.Key, header.Value);
_ = content.Headers.TryAddWithoutValidation(header.Key, header.Value);
}
}

return (content, clonedContent);
}

/// <summary>
/// Reports whether this snapshot instance has a request method recorded.
/// </summary>
/// <returns>
/// <see langword="true"/> when the <c>_method</c> field is non-null; otherwise <see langword="false"/>.
/// </returns>
private bool IsInitialized() => _method is not null;

[System.Diagnostics.CodeAnalysis.SuppressMessage("Resilience", "EA0014:The async method doesn't support cancellation", Justification = "Past the point of no cancellation.")]
private async Task InitializeAsync(HttpRequestMessage request)
{
_method = request.Method;
_version = request.Version;
_requestUri = request.RequestUri;
_content = request.Content;
if (request.Content is StreamContent)
{
(HttpContent? requestContent, HttpContent? clonedRequestContent) = await CloneContentAsync(request.Content).ConfigureAwait(false);
_content = clonedRequestContent;
request.Content = requestContent;
}
else
{
_content = request.Content;
}

// headers
_headers.AddRange(request.Headers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public void Configure_ValidConfigurationSection_ShouldInitialize()
}

[Fact]
public void ActionGenerator_Ok()
public async Task ActionGenerator_Ok()
{
var options = Builder.Services.BuildServiceProvider().GetRequiredService<IOptionsMonitor<HttpStandardHedgingResilienceOptions>>().Get(Builder.Name);
var generator = options.Hedging.ActionGenerator;
Expand All @@ -115,7 +115,7 @@ public void ActionGenerator_Ok()
generator.Invoking(g => g(args)).Should().Throw<InvalidOperationException>().WithMessage("Request message snapshot is not attached to the resilience context.");

using var request = new HttpRequestMessage();
using var snapshot = RequestMessageSnapshot.Create(request);
using var snapshot = await RequestMessageSnapshot.CreateAsync(request).ConfigureAwait(false);
primary.Properties.Set(ResilienceKeys.RequestSnapshot, snapshot);
generator.Invoking(g => g(args)).Should().NotThrow();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public async Task SendAsync_EnsureSnapshotAttached()
}

[Fact]
public void ExecuteAsync_requestMessageNotFound_Throws()
public void ExecuteAsync_RequestMessageNotFound_Throws()
{
var strategy = Create();

Expand Down
Loading
Loading