Skip to content

Commit

Permalink
Transport: Fix unify HttpClient usage across Gateway classes (#1548)
Browse files Browse the repository at this point in the history
#1429 added support for HttpClientFactory inside the gateway connection mode, but the DocumentClient was maintaining two other HttpClient instances on GatewayAccountReader and GatewayAddressCache.

If the user was trying to use a custom HttpClientFactory to use a single HttpClient instance, it was not reaching these classes.

The effect also was that we were maintaining 3 different HttpClient instances normally.

This PR unifies this into a single HttpClient, that can either be created or obtained from the HttpClientFactory.
  • Loading branch information
ealsur authored Jul 20, 2020
1 parent 768c0cf commit daf7a8c
Show file tree
Hide file tree
Showing 9 changed files with 179 additions and 299 deletions.
93 changes: 61 additions & 32 deletions Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ namespace Microsoft.Azure.Cosmos
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Runtime.CompilerServices;
using System.Security;
using System.Text;
using System.Threading;
Expand Down Expand Up @@ -109,6 +110,8 @@ internal partial class DocumentClient : IDisposable, IAuthorizationTokenProvider
private const bool DefaultEnableCpuMonitor = true;
private const bool EnableAuthFailureTraces = false;

private static readonly TimeSpan GatewayRequestTimeout = TimeSpan.FromSeconds(65);
// Gateway has backoff/retry logic to hide transient errors.
private readonly IDictionary<string, List<PartitionKeyAndResourceTokenPair>> resourceTokens;
private RetryPolicy retryPolicy;
private bool allowOverrideStrongerConsistency = false;
Expand Down Expand Up @@ -150,6 +153,7 @@ internal partial class DocumentClient : IDisposable, IAuthorizationTokenProvider
// creator of TransportClient is responsible for disposing it.
private IStoreClientFactory storeClientFactory;
private HttpClient mediaClient;
private HttpClient httpClient;

// Flag that indicates whether store client factory must be disposed whenever client is disposed.
// Setting this flag to false will result in store client factory not being disposed when client is disposed.
Expand Down Expand Up @@ -564,6 +568,37 @@ private static List<ResourceToken> GetResourceTokens(IList<Documents.Permission>
}).ToList();
}

public static HttpClient BuildHttpClient(
ConnectionPolicy connectionPolicy,
ApiType apiType,
HttpMessageHandler messageHandler)
{
HttpClient httpClient;
if (connectionPolicy.HttpClientFactory != null)
{
httpClient = connectionPolicy.HttpClientFactory();
}
else
{
httpClient = messageHandler == null ? new HttpClient() : new HttpClient(messageHandler);
}

httpClient.Timeout = (connectionPolicy.RequestTimeout > DocumentClient.GatewayRequestTimeout) ? connectionPolicy.RequestTimeout : DocumentClient.GatewayRequestTimeout;
httpClient.DefaultRequestHeaders.CacheControl = new CacheControlHeaderValue { NoCache = true };

httpClient.AddUserAgentHeader(connectionPolicy.UserAgentContainer);
httpClient.AddApiTypeHeader(apiType);

// Set requested API version header that can be used for
// version enforcement.
httpClient.DefaultRequestHeaders.Add(HttpConstants.HttpHeaders.Version,
HttpConstants.Versions.CurrentVersion);

httpClient.DefaultRequestHeaders.Add(HttpConstants.HttpHeaders.Accept, RuntimeConstants.MediaTypes.Json);

return httpClient;
}

/// <summary>
/// Initializes a new instance of the <see cref="DocumentClient"/> class using the
/// specified Azure Cosmos DB service endpoint, a list of <see cref="ResourceToken"/> objects and a connection policy.
Expand Down Expand Up @@ -1053,6 +1088,8 @@ internal virtual void Initialize(Uri serviceEndpoint,
this.mediaClient.DefaultRequestHeaders.Add(HttpConstants.HttpHeaders.Accept,
RuntimeConstants.MediaTypes.Any);

this.httpClient = DocumentClient.BuildHttpClient(this.ConnectionPolicy, this.ApiType, this.httpMessageHandler);

if (sessionContainer != null)
{
this.sessionContainer = sessionContainer;
Expand Down Expand Up @@ -1117,33 +1154,13 @@ private async Task GetInitializationTaskAsync(IStoreClientFactory storeClientFac
this.EnsureValidOverwrite(this.desiredConsistencyLevel.Value);
}

GatewayStoreModel gatewayStoreModel;
if (this.ConnectionPolicy.HttpClientFactory != null)
{
gatewayStoreModel = new GatewayStoreModel(
GatewayStoreModel gatewayStoreModel = new GatewayStoreModel(
this.GlobalEndpointManager,
this.sessionContainer,
this.ConnectionPolicy.RequestTimeout,
(Cosmos.ConsistencyLevel)this.accountServiceConfiguration.DefaultConsistencyLevel,
this.eventSource,
this.serializerSettings,
this.ConnectionPolicy.UserAgentContainer,
this.ApiType,
this.ConnectionPolicy.HttpClientFactory);
}
else
{
gatewayStoreModel = new GatewayStoreModel(
this.GlobalEndpointManager,
this.sessionContainer,
this.ConnectionPolicy.RequestTimeout,
(Cosmos.ConsistencyLevel)this.accountServiceConfiguration.DefaultConsistencyLevel,
this.eventSource,
this.serializerSettings,
this.ConnectionPolicy.UserAgentContainer,
this.ApiType,
this.httpMessageHandler);
}
this.httpClient);

this.GatewayStoreModel = gatewayStoreModel;

Expand Down Expand Up @@ -1423,6 +1440,21 @@ public void Dispose()
this.mediaClient = null;
}

if (this.httpClient != null)
{
try
{
this.httpClient.Dispose();
}
catch (Exception exception)
{
DefaultTrace.TraceWarning("Exception {0} thrown during dispose of HttpClient, this could happen if there are inflight request during the dispose of client",
exception);
}

this.httpClient = null;
}

if (this.authKeyHashFunction != null)
{
this.authKeyHashFunction.Dispose();
Expand Down Expand Up @@ -6824,11 +6856,9 @@ private void InitializeDirectConnectivity(IStoreClientFactory storeClientFactory
this,
this.collectionCache,
this.partitionKeyRangeCache,
this.ConnectionPolicy.UserAgentContainer,
this.accountServiceConfiguration,
this.httpMessageHandler,
this.ConnectionPolicy,
this.ApiType);
this.httpClient);

// Check if we have a store client factory in input and if we do, do not initialize another store client
// The purpose is to reuse store client factory across all document clients inside compute gateway
Expand Down Expand Up @@ -6910,13 +6940,12 @@ private void DisableRntbdChannel()
private async Task InitializeGatewayConfigurationReaderAsync()
{
GatewayAccountReader accountReader = new GatewayAccountReader(
this.ServiceEndpoint,
this.authKeyHashFunction,
this.hasAuthKeyResourceToken,
this.authKeyResourceToken,
this.ConnectionPolicy,
this.ApiType,
this.httpMessageHandler);
serviceEndpoint: this.ServiceEndpoint,
stringHMACSHA256Helper: this.authKeyHashFunction,
hasResourceToken: this.hasAuthKeyResourceToken,
resourceToken: this.authKeyResourceToken,
connectionPolicy: this.ConnectionPolicy,
httpClient: this.httpClient);

this.accountServiceConfiguration = new CosmosAccountServiceConfiguration(accountReader.InitializeReaderAsync);

Expand Down
42 changes: 13 additions & 29 deletions Microsoft.Azure.Cosmos/src/GatewayAccountReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,38 +18,27 @@ internal sealed class GatewayAccountReader
private readonly IComputeHash authKeyHashFunction;
private readonly bool hasAuthKeyResourceToken = false;
private readonly string authKeyResourceToken = string.Empty;
private readonly HttpMessageHandler messageHandler;
private Uri serviceEndpoint;
private ApiType apiType;
private readonly HttpClient httpClient;
private readonly Uri serviceEndpoint;

public GatewayAccountReader(Uri serviceEndpoint,
IComputeHash stringHMACSHA256Helper,
bool hasResourceToken,
string resourceToken,
ConnectionPolicy connectionPolicy,
ApiType apiType,
HttpMessageHandler messageHandler = null)
IComputeHash stringHMACSHA256Helper,
bool hasResourceToken,
string resourceToken,
ConnectionPolicy connectionPolicy,
HttpClient httpClient)
{
this.httpClient = httpClient;
this.serviceEndpoint = serviceEndpoint;
this.authKeyHashFunction = stringHMACSHA256Helper;
this.hasAuthKeyResourceToken = hasResourceToken;
this.authKeyResourceToken = resourceToken;
this.connectionPolicy = connectionPolicy;
this.messageHandler = messageHandler;
this.apiType = apiType;
}

private async Task<AccountProperties> GetDatabaseAccountAsync(Uri serviceEndpoint)
{
HttpClient httpClient = this.messageHandler == null ? new HttpClient() : new HttpClient(this.messageHandler);

httpClient.DefaultRequestHeaders.Add(HttpConstants.HttpHeaders.Version,
HttpConstants.Versions.CurrentVersion);

// Send client version.
httpClient.AddUserAgentHeader(this.connectionPolicy.UserAgentContainer);
httpClient.AddApiTypeHeader(this.apiType);

INameValueCollection headers = new DictionaryNameValueCollection(StringComparer.Ordinal);
string authorizationToken = string.Empty;
if (this.hasAuthKeyResourceToken)
{
Expand All @@ -59,22 +48,17 @@ private async Task<AccountProperties> GetDatabaseAccountAsync(Uri serviceEndpoin
{
// Retrieve the document service properties.
string xDate = DateTime.UtcNow.ToString("r", CultureInfo.InvariantCulture);
httpClient.DefaultRequestHeaders.Add(HttpConstants.HttpHeaders.XDate, xDate);

INameValueCollection headersCollection = new DictionaryNameValueCollection();
headersCollection.Add(HttpConstants.HttpHeaders.XDate, xDate);
headers.Set(HttpConstants.HttpHeaders.XDate, xDate);

authorizationToken = AuthorizationHelper.GenerateKeyAuthorizationSignature(
HttpConstants.HttpMethods.Get,
serviceEndpoint,
headersCollection,
headers,
this.authKeyHashFunction);
}

httpClient.DefaultRequestHeaders.Add(HttpConstants.HttpHeaders.Authorization, authorizationToken);

using (HttpResponseMessage responseMessage = await httpClient.GetHttpAsync(
serviceEndpoint))
headers.Set(HttpConstants.HttpHeaders.Authorization, authorizationToken);
using (HttpResponseMessage responseMessage = await this.httpClient.GetAsync(serviceEndpoint, headers))
{
using (DocumentServiceResponse documentServiceResponse = await ClientExtensions.ParseResponseAsync(responseMessage))
{
Expand Down
89 changes: 4 additions & 85 deletions Microsoft.Azure.Cosmos/src/GatewayStoreModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ namespace Microsoft.Azure.Cosmos
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Core.Trace;
Expand All @@ -21,105 +20,25 @@ namespace Microsoft.Azure.Cosmos
// Marking it as non-sealed in order to unit test it using Moq framework
internal class GatewayStoreModel : IStoreModel, IDisposable
{
// Gateway has backoff/retry logic to hide transient errors.
private readonly TimeSpan requestTimeout = TimeSpan.FromSeconds(65);
private readonly GlobalEndpointManager endpointManager;
private readonly DocumentClientEventSource eventSource;
private readonly ISessionContainer sessionContainer;
private readonly ConsistencyLevel defaultConsistencyLevel;

private GatewayStoreClient gatewayStoreClient;
private CookieContainer cookieJar;

private GatewayStoreModel(
GlobalEndpointManager endpointManager,
ISessionContainer sessionContainer,
ConsistencyLevel defaultConsistencyLevel,
DocumentClientEventSource eventSource)
{
// CookieContainer is not really required, but is helpful in debugging.
this.cookieJar = new CookieContainer();
this.endpointManager = endpointManager;
this.sessionContainer = sessionContainer;
this.defaultConsistencyLevel = defaultConsistencyLevel;
this.eventSource = eventSource;
}

public GatewayStoreModel(
GlobalEndpointManager endpointManager,
ISessionContainer sessionContainer,
TimeSpan requestTimeout,
ConsistencyLevel defaultConsistencyLevel,
DocumentClientEventSource eventSource,
JsonSerializerSettings serializerSettings,
UserAgentContainer userAgent,
ApiType apiType,
HttpMessageHandler messageHandler)
: this(endpointManager,
sessionContainer,
defaultConsistencyLevel,
eventSource)
{
this.InitializeGatewayStoreClient(
requestTimeout,
serializerSettings,
userAgent,
apiType,
new HttpClient(messageHandler ?? new HttpClientHandler { CookieContainer = this.cookieJar }));
}

public GatewayStoreModel(
GlobalEndpointManager endpointManager,
ISessionContainer sessionContainer,
TimeSpan requestTimeout,
ConsistencyLevel defaultConsistencyLevel,
DocumentClientEventSource eventSource,
JsonSerializerSettings serializerSettings,
UserAgentContainer userAgent,
ApiType apiType,
Func<HttpClient> httpClientFactory)
: this(endpointManager,
sessionContainer,
defaultConsistencyLevel,
eventSource)
{
HttpClient httpClient = httpClientFactory();
if (httpClient == null)
{
throw new InvalidOperationException("HttpClientFactory did not produce an HttpClient");
}

this.InitializeGatewayStoreClient(
requestTimeout,
serializerSettings,
userAgent,
apiType,
httpClient);

}

private void InitializeGatewayStoreClient(
TimeSpan requestTimeout,
JsonSerializerSettings serializerSettings,
UserAgentContainer userAgent,
ApiType apiType,
HttpClient httpClient)
{
// Use max of client specified and our own request timeout value when sending
// requests to gateway. Otherwise, we will have gateway's transient
// error hiding retries are of no use.
httpClient.Timeout = (requestTimeout > this.requestTimeout) ? requestTimeout : this.requestTimeout;
httpClient.DefaultRequestHeaders.CacheControl = new CacheControlHeaderValue { NoCache = true };

httpClient.AddUserAgentHeader(userAgent);
httpClient.AddApiTypeHeader(apiType);

// Set requested API version header that can be used for
// version enforcement.
httpClient.DefaultRequestHeaders.Add(HttpConstants.HttpHeaders.Version,
HttpConstants.Versions.CurrentVersion);

httpClient.DefaultRequestHeaders.Add(HttpConstants.HttpHeaders.Accept, RuntimeConstants.MediaTypes.Json);
this.endpointManager = endpointManager;
this.sessionContainer = sessionContainer;
this.defaultConsistencyLevel = defaultConsistencyLevel;
this.eventSource = eventSource;

this.gatewayStoreClient = new GatewayStoreClient(
httpClient,
Expand Down
Loading

0 comments on commit daf7a8c

Please sign in to comment.