Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transport: Fix unify HttpClient usage across Gateway classes #1548

Merged
merged 19 commits into from
Jul 20, 2020
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 61 additions & 32 deletions Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ namespace Microsoft.Azure.Cosmos
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Runtime.CompilerServices;
using System.Security;
using System.Text;
using System.Threading;
Expand Down Expand Up @@ -109,6 +110,8 @@ internal partial class DocumentClient : IDisposable, IAuthorizationTokenProvider
private const bool DefaultEnableCpuMonitor = true;
private const bool EnableAuthFailureTraces = false;

private static readonly TimeSpan GatewayRequestTimeout = TimeSpan.FromSeconds(65);
// Gateway has backoff/retry logic to hide transient errors.
private readonly IDictionary<string, List<PartitionKeyAndResourceTokenPair>> resourceTokens;
private RetryPolicy retryPolicy;
private bool allowOverrideStrongerConsistency = false;
Expand Down Expand Up @@ -150,6 +153,7 @@ internal partial class DocumentClient : IDisposable, IAuthorizationTokenProvider
// creator of TransportClient is responsible for disposing it.
private IStoreClientFactory storeClientFactory;
private HttpClient mediaClient;
private HttpClient httpClient;

// Flag that indicates whether store client factory must be disposed whenever client is disposed.
// Setting this flag to false will result in store client factory not being disposed when client is disposed.
Expand Down Expand Up @@ -564,6 +568,37 @@ private static List<ResourceToken> GetResourceTokens(IList<Documents.Permission>
}).ToList();
}

public static HttpClient BuildHttpClient(
ConnectionPolicy connectionPolicy,
ApiType apiType,
HttpMessageHandler messageHandler)
{
HttpClient httpClient;
if (connectionPolicy.HttpClientFactory != null)
{
httpClient = connectionPolicy.HttpClientFactory();
}
else
{
httpClient = messageHandler == null ? new HttpClient() : new HttpClient(messageHandler);
}

httpClient.Timeout = (connectionPolicy.RequestTimeout > DocumentClient.GatewayRequestTimeout) ? connectionPolicy.RequestTimeout : DocumentClient.GatewayRequestTimeout;
httpClient.DefaultRequestHeaders.CacheControl = new CacheControlHeaderValue { NoCache = true };

httpClient.AddUserAgentHeader(connectionPolicy.UserAgentContainer);
httpClient.AddApiTypeHeader(apiType);

// Set requested API version header that can be used for
// version enforcement.
httpClient.DefaultRequestHeaders.Add(HttpConstants.HttpHeaders.Version,
HttpConstants.Versions.CurrentVersion);

httpClient.DefaultRequestHeaders.Add(HttpConstants.HttpHeaders.Accept, RuntimeConstants.MediaTypes.Json);

return httpClient;
}

/// <summary>
/// Initializes a new instance of the <see cref="DocumentClient"/> class using the
/// specified Azure Cosmos DB service endpoint, a list of <see cref="ResourceToken"/> objects and a connection policy.
Expand Down Expand Up @@ -1053,6 +1088,8 @@ internal virtual void Initialize(Uri serviceEndpoint,
this.mediaClient.DefaultRequestHeaders.Add(HttpConstants.HttpHeaders.Accept,
RuntimeConstants.MediaTypes.Any);

this.httpClient = DocumentClient.BuildHttpClient(this.ConnectionPolicy, this.ApiType, this.httpMessageHandler);

if (sessionContainer != null)
{
this.sessionContainer = sessionContainer;
Expand Down Expand Up @@ -1117,33 +1154,13 @@ private async Task GetInitializationTaskAsync(IStoreClientFactory storeClientFac
this.EnsureValidOverwrite(this.desiredConsistencyLevel.Value);
}

GatewayStoreModel gatewayStoreModel;
if (this.ConnectionPolicy.HttpClientFactory != null)
{
gatewayStoreModel = new GatewayStoreModel(
GatewayStoreModel gatewayStoreModel = new GatewayStoreModel(
this.GlobalEndpointManager,
this.sessionContainer,
this.ConnectionPolicy.RequestTimeout,
(Cosmos.ConsistencyLevel)this.accountServiceConfiguration.DefaultConsistencyLevel,
this.eventSource,
this.serializerSettings,
this.ConnectionPolicy.UserAgentContainer,
this.ApiType,
this.ConnectionPolicy.HttpClientFactory);
}
else
{
gatewayStoreModel = new GatewayStoreModel(
this.GlobalEndpointManager,
this.sessionContainer,
this.ConnectionPolicy.RequestTimeout,
(Cosmos.ConsistencyLevel)this.accountServiceConfiguration.DefaultConsistencyLevel,
this.eventSource,
this.serializerSettings,
this.ConnectionPolicy.UserAgentContainer,
this.ApiType,
this.httpMessageHandler);
}
this.httpClient);

this.GatewayStoreModel = gatewayStoreModel;

Expand Down Expand Up @@ -1423,6 +1440,21 @@ public void Dispose()
this.mediaClient = null;
}

if (this.httpClient != null)
{
try
{
this.httpClient.Dispose();
}
catch (Exception exception)
{
DefaultTrace.TraceWarning("Exception {0} thrown during dispose of HttpClient, this could happen if there are inflight request during the dispose of client",
exception);
}

this.httpClient = null;
}

if (this.authKeyHashFunction != null)
{
this.authKeyHashFunction.Dispose();
Expand Down Expand Up @@ -6824,11 +6856,9 @@ private void InitializeDirectConnectivity(IStoreClientFactory storeClientFactory
this,
this.collectionCache,
this.partitionKeyRangeCache,
this.ConnectionPolicy.UserAgentContainer,
this.accountServiceConfiguration,
this.httpMessageHandler,
this.ConnectionPolicy,
this.ApiType);
this.httpClient);

// Check if we have a store client factory in input and if we do, do not initialize another store client
// The purpose is to reuse store client factory across all document clients inside compute gateway
Expand Down Expand Up @@ -6910,13 +6940,12 @@ private void DisableRntbdChannel()
private async Task InitializeGatewayConfigurationReaderAsync()
ealsur marked this conversation as resolved.
Show resolved Hide resolved
{
GatewayAccountReader accountReader = new GatewayAccountReader(
this.ServiceEndpoint,
this.authKeyHashFunction,
this.hasAuthKeyResourceToken,
this.authKeyResourceToken,
this.ConnectionPolicy,
this.ApiType,
this.httpMessageHandler);
serviceEndpoint: this.ServiceEndpoint,
stringHMACSHA256Helper: this.authKeyHashFunction,
hasResourceToken: this.hasAuthKeyResourceToken,
resourceToken: this.authKeyResourceToken,
connectionPolicy: this.ConnectionPolicy,
httpClient: this.httpClient);

this.accountServiceConfiguration = new CosmosAccountServiceConfiguration(accountReader.InitializeReaderAsync);

Expand Down
42 changes: 13 additions & 29 deletions Microsoft.Azure.Cosmos/src/GatewayAccountReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,38 +18,27 @@ internal sealed class GatewayAccountReader
private readonly IComputeHash authKeyHashFunction;
private readonly bool hasAuthKeyResourceToken = false;
private readonly string authKeyResourceToken = string.Empty;
private readonly HttpMessageHandler messageHandler;
private Uri serviceEndpoint;
private ApiType apiType;
private readonly HttpClient httpClient;
private readonly Uri serviceEndpoint;

public GatewayAccountReader(Uri serviceEndpoint,
IComputeHash stringHMACSHA256Helper,
bool hasResourceToken,
string resourceToken,
ConnectionPolicy connectionPolicy,
ApiType apiType,
HttpMessageHandler messageHandler = null)
IComputeHash stringHMACSHA256Helper,
bool hasResourceToken,
string resourceToken,
ConnectionPolicy connectionPolicy,
HttpClient httpClient)
{
this.httpClient = httpClient;
this.serviceEndpoint = serviceEndpoint;
this.authKeyHashFunction = stringHMACSHA256Helper;
this.hasAuthKeyResourceToken = hasResourceToken;
this.authKeyResourceToken = resourceToken;
this.connectionPolicy = connectionPolicy;
this.messageHandler = messageHandler;
this.apiType = apiType;
}

private async Task<AccountProperties> GetDatabaseAccountAsync(Uri serviceEndpoint)
{
HttpClient httpClient = this.messageHandler == null ? new HttpClient() : new HttpClient(this.messageHandler);

httpClient.DefaultRequestHeaders.Add(HttpConstants.HttpHeaders.Version,
HttpConstants.Versions.CurrentVersion);

// Send client version.
httpClient.AddUserAgentHeader(this.connectionPolicy.UserAgentContainer);
httpClient.AddApiTypeHeader(this.apiType);

INameValueCollection headers = new DictionaryNameValueCollection(StringComparer.Ordinal);
string authorizationToken = string.Empty;
if (this.hasAuthKeyResourceToken)
{
Expand All @@ -59,22 +48,17 @@ private async Task<AccountProperties> GetDatabaseAccountAsync(Uri serviceEndpoin
{
// Retrieve the document service properties.
string xDate = DateTime.UtcNow.ToString("r", CultureInfo.InvariantCulture);
httpClient.DefaultRequestHeaders.Add(HttpConstants.HttpHeaders.XDate, xDate);

INameValueCollection headersCollection = new DictionaryNameValueCollection();
headersCollection.Add(HttpConstants.HttpHeaders.XDate, xDate);
headers.Set(HttpConstants.HttpHeaders.XDate, xDate);

authorizationToken = AuthorizationHelper.GenerateKeyAuthorizationSignature(
HttpConstants.HttpMethods.Get,
serviceEndpoint,
headersCollection,
headers,
this.authKeyHashFunction);
}

httpClient.DefaultRequestHeaders.Add(HttpConstants.HttpHeaders.Authorization, authorizationToken);

using (HttpResponseMessage responseMessage = await httpClient.GetHttpAsync(
serviceEndpoint))
headers.Set(HttpConstants.HttpHeaders.Authorization, authorizationToken);
using (HttpResponseMessage responseMessage = await this.httpClient.GetAsync(serviceEndpoint, headers))
{
using (DocumentServiceResponse documentServiceResponse = await ClientExtensions.ParseResponseAsync(responseMessage))
{
Expand Down
89 changes: 4 additions & 85 deletions Microsoft.Azure.Cosmos/src/GatewayStoreModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ namespace Microsoft.Azure.Cosmos
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Core.Trace;
Expand All @@ -21,105 +20,25 @@ namespace Microsoft.Azure.Cosmos
// Marking it as non-sealed in order to unit test it using Moq framework
internal class GatewayStoreModel : IStoreModel, IDisposable
{
// Gateway has backoff/retry logic to hide transient errors.
private readonly TimeSpan requestTimeout = TimeSpan.FromSeconds(65);
private readonly GlobalEndpointManager endpointManager;
private readonly DocumentClientEventSource eventSource;
private readonly ISessionContainer sessionContainer;
private readonly ConsistencyLevel defaultConsistencyLevel;

private GatewayStoreClient gatewayStoreClient;
private CookieContainer cookieJar;

private GatewayStoreModel(
GlobalEndpointManager endpointManager,
ISessionContainer sessionContainer,
ConsistencyLevel defaultConsistencyLevel,
DocumentClientEventSource eventSource)
{
// CookieContainer is not really required, but is helpful in debugging.
this.cookieJar = new CookieContainer();
this.endpointManager = endpointManager;
this.sessionContainer = sessionContainer;
this.defaultConsistencyLevel = defaultConsistencyLevel;
this.eventSource = eventSource;
}

public GatewayStoreModel(
GlobalEndpointManager endpointManager,
ISessionContainer sessionContainer,
TimeSpan requestTimeout,
ConsistencyLevel defaultConsistencyLevel,
DocumentClientEventSource eventSource,
JsonSerializerSettings serializerSettings,
UserAgentContainer userAgent,
ApiType apiType,
HttpMessageHandler messageHandler)
: this(endpointManager,
sessionContainer,
defaultConsistencyLevel,
eventSource)
{
this.InitializeGatewayStoreClient(
requestTimeout,
serializerSettings,
userAgent,
apiType,
new HttpClient(messageHandler ?? new HttpClientHandler { CookieContainer = this.cookieJar }));
}

public GatewayStoreModel(
GlobalEndpointManager endpointManager,
ISessionContainer sessionContainer,
TimeSpan requestTimeout,
ConsistencyLevel defaultConsistencyLevel,
DocumentClientEventSource eventSource,
JsonSerializerSettings serializerSettings,
UserAgentContainer userAgent,
ApiType apiType,
Func<HttpClient> httpClientFactory)
: this(endpointManager,
sessionContainer,
defaultConsistencyLevel,
eventSource)
{
HttpClient httpClient = httpClientFactory();
if (httpClient == null)
{
throw new InvalidOperationException("HttpClientFactory did not produce an HttpClient");
}

this.InitializeGatewayStoreClient(
requestTimeout,
serializerSettings,
userAgent,
apiType,
httpClient);

}

private void InitializeGatewayStoreClient(
TimeSpan requestTimeout,
JsonSerializerSettings serializerSettings,
UserAgentContainer userAgent,
ApiType apiType,
HttpClient httpClient)
{
// Use max of client specified and our own request timeout value when sending
// requests to gateway. Otherwise, we will have gateway's transient
// error hiding retries are of no use.
httpClient.Timeout = (requestTimeout > this.requestTimeout) ? requestTimeout : this.requestTimeout;
httpClient.DefaultRequestHeaders.CacheControl = new CacheControlHeaderValue { NoCache = true };

httpClient.AddUserAgentHeader(userAgent);
httpClient.AddApiTypeHeader(apiType);

// Set requested API version header that can be used for
// version enforcement.
httpClient.DefaultRequestHeaders.Add(HttpConstants.HttpHeaders.Version,
HttpConstants.Versions.CurrentVersion);

httpClient.DefaultRequestHeaders.Add(HttpConstants.HttpHeaders.Accept, RuntimeConstants.MediaTypes.Json);
this.endpointManager = endpointManager;
this.sessionContainer = sessionContainer;
this.defaultConsistencyLevel = defaultConsistencyLevel;
this.eventSource = eventSource;

this.gatewayStoreClient = new GatewayStoreClient(
httpClient,
Expand Down
Loading