Skip to content

Commit

Permalink
Gateway: Fixes gateway mode ApplySessionToken bug causes cache to alw…
Browse files Browse the repository at this point in the history
…ays refresh (#2408)

The ApplySessionToken logic was using the partitionKeyRangeId.ToString() instead of the correct partitionKeyRangeId.PartitionKeyRangeId. This was causing the cache to always miss and force a refresh on every call.

Testing:

Added a new http handler which validates that partition key range cache only does a network call on the initial load.
Added unit tests validating the partition key range scenario
  • Loading branch information
j82w authored Apr 22, 2021
1 parent 8c23103 commit b9bb04a
Show file tree
Hide file tree
Showing 9 changed files with 284 additions and 143 deletions.
104 changes: 60 additions & 44 deletions Microsoft.Azure.Cosmos/src/GatewayStoreModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ internal static async Task ApplySessionTokenAsync(
ConsistencyLevel defaultConsistencyLevel,
ISessionContainer sessionContainer,
PartitionKeyRangeCache partitionKeyRangeCache,
ClientCollectionCache clientCollectionCache)
CollectionCache clientCollectionCache)
{
if (request.Headers == null)
{
Expand Down Expand Up @@ -250,10 +250,11 @@ internal static async Task ApplySessionTokenAsync(
return; // Only apply the session token in case of session consistency and the request is read only
}

(bool isSuccess, string sessionToken) = await GatewayStoreModel.TryResolveSessionTokenAsync(request,
sessionContainer,
partitionKeyRangeCache,
clientCollectionCache);
(bool isSuccess, string sessionToken) = await GatewayStoreModel.TryResolveSessionTokenAsync(
request,
sessionContainer,
partitionKeyRangeCache,
clientCollectionCache);

if (!isSuccess)
{
Expand All @@ -266,10 +267,11 @@ internal static async Task ApplySessionTokenAsync(
}
}

internal static async Task<Tuple<bool, string>> TryResolveSessionTokenAsync(DocumentServiceRequest request,
ISessionContainer sessionContainer,
PartitionKeyRangeCache partitionKeyRangeCache,
ClientCollectionCache clientCollectionCache)
internal static async Task<Tuple<bool, string>> TryResolveSessionTokenAsync(
DocumentServiceRequest request,
ISessionContainer sessionContainer,
PartitionKeyRangeCache partitionKeyRangeCache,
CollectionCache clientCollectionCache)
{
if (request == null)
{
Expand All @@ -293,11 +295,12 @@ internal static async Task<Tuple<bool, string>> TryResolveSessionTokenAsync(Docu

if (request.ResourceType.IsPartitioned())
{
(bool isSuccess, PartitionKeyRange partitionKeyRange) = await TryResolvePartitionKeyRangeAsync(request: request,
sessionContainer: sessionContainer,
partitionKeyRangeCache: partitionKeyRangeCache,
clientCollectionCache: clientCollectionCache,
refreshCache: false);
(bool isSuccess, PartitionKeyRange partitionKeyRange) = await TryResolvePartitionKeyRangeAsync(
request: request,
sessionContainer: sessionContainer,
partitionKeyRangeCache: partitionKeyRangeCache,
clientCollectionCache: clientCollectionCache,
refreshCache: false);

if (isSuccess && sessionContainer is SessionContainer gatewaySessionContainer)
{
Expand All @@ -313,11 +316,12 @@ internal static async Task<Tuple<bool, string>> TryResolveSessionTokenAsync(Docu
return new Tuple<bool, string>(false, null);
}

private static async Task<Tuple<bool, PartitionKeyRange>> TryResolvePartitionKeyRangeAsync(DocumentServiceRequest request,
ISessionContainer sessionContainer,
PartitionKeyRangeCache partitionKeyRangeCache,
ClientCollectionCache clientCollectionCache,
bool refreshCache)
private static async Task<Tuple<bool, PartitionKeyRange>> TryResolvePartitionKeyRangeAsync(
DocumentServiceRequest request,
ISessionContainer sessionContainer,
PartitionKeyRangeCache partitionKeyRangeCache,
CollectionCache clientCollectionCache,
bool refreshCache)
{
if (refreshCache)
{
Expand All @@ -326,39 +330,50 @@ private static async Task<Tuple<bool, PartitionKeyRange>> TryResolvePartitionKey
}

PartitionKeyRange partitonKeyRange = null;
ContainerProperties collection = await clientCollectionCache.ResolveCollectionAsync(request, CancellationToken.None, NoOpTrace.Singleton);
ContainerProperties collection = await clientCollectionCache.ResolveCollectionAsync(
request,
CancellationToken.None,
NoOpTrace.Singleton);

string partitionKeyString = request.Headers[HttpConstants.HttpHeaders.PartitionKey];
if (partitionKeyString != null)
{
CollectionRoutingMap collectionRoutingMap = await partitionKeyRangeCache.TryLookupAsync(collectionRid: collection.ResourceId,
previousValue: null,
request: request,
cancellationToken: CancellationToken.None,
NoOpTrace.Singleton);
CollectionRoutingMap collectionRoutingMap = await partitionKeyRangeCache.TryLookupAsync(
collectionRid: collection.ResourceId,
previousValue: null,
request: request,
cancellationToken: CancellationToken.None,
NoOpTrace.Singleton);

if (refreshCache && collectionRoutingMap != null)
{
collectionRoutingMap = await partitionKeyRangeCache.TryLookupAsync(collectionRid: collection.ResourceId,
previousValue: collectionRoutingMap,
request: request,
cancellationToken: CancellationToken.None,
NoOpTrace.Singleton);
collectionRoutingMap = await partitionKeyRangeCache.TryLookupAsync(
collectionRid: collection.ResourceId,
previousValue: collectionRoutingMap,
request: request,
cancellationToken: CancellationToken.None,
NoOpTrace.Singleton);
}

partitonKeyRange = AddressResolver.TryResolveServerPartitionByPartitionKey(request: request,
partitionKeyString: partitionKeyString,
collectionCacheUptoDate: false,
collection: collection,
routingMap: collectionRoutingMap);
partitonKeyRange = AddressResolver.TryResolveServerPartitionByPartitionKey(
request: request,
partitionKeyString: partitionKeyString,
collectionCacheUptoDate: false,
collection: collection,
routingMap: collectionRoutingMap);
}
else if (request.PartitionKeyRangeIdentity != null)
{
PartitionKeyRangeIdentity partitionKeyRangeId = request.PartitionKeyRangeIdentity;
partitonKeyRange = await partitionKeyRangeCache.TryGetPartitionKeyRangeByIdAsync(collection.ResourceId,
partitionKeyRangeId.ToString(),
NoOpTrace.Singleton,
refreshCache);
partitonKeyRange = await partitionKeyRangeCache.TryGetPartitionKeyRangeByIdAsync(
collection.ResourceId,
partitionKeyRangeId.PartitionKeyRangeId,
NoOpTrace.Singleton,
refreshCache);
}
else if (request.RequestContext.ResolvedPartitionKeyRange != null)
{
partitonKeyRange = request.RequestContext.ResolvedPartitionKeyRange;
}

if (partitonKeyRange == null)
Expand All @@ -369,11 +384,12 @@ private static async Task<Tuple<bool, PartitionKeyRange>> TryResolvePartitionKey
}

// need to refresh cache. Maybe split happened.
return await GatewayStoreModel.TryResolvePartitionKeyRangeAsync(request: request,
sessionContainer: sessionContainer,
partitionKeyRangeCache: partitionKeyRangeCache,
clientCollectionCache: clientCollectionCache,
refreshCache: true);
return await GatewayStoreModel.TryResolvePartitionKeyRangeAsync(
request: request,
sessionContainer: sessionContainer,
partitionKeyRangeCache: partitionKeyRangeCache,
clientCollectionCache: clientCollectionCache,
refreshCache: true);
}

return new Tuple<bool, PartitionKeyRange>(true, partitonKeyRange);
Expand Down
5 changes: 4 additions & 1 deletion Microsoft.Azure.Cosmos/src/Routing/PartitionKeyRangeCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ internal class PartitionKeyRangeCache : IRoutingMapProvider, ICollectionRoutingM
private readonly IStoreModel storeModel;
private readonly CollectionCache collectionCache;

public PartitionKeyRangeCache(IAuthorizationTokenProvider authorizationTokenProvider, IStoreModel storeModel, CollectionCache collectionCache)
public PartitionKeyRangeCache(
IAuthorizationTokenProvider authorizationTokenProvider,
IStoreModel storeModel,
CollectionCache collectionCache)
{
this.routingMapCache = new AsyncCache<string, CollectionRoutingMap>(
EqualityComparer<CollectionRoutingMap>.Default,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class CosmosItemTests : BaseCosmosClientHelper
[TestInitialize]
public async Task TestInitialize()
{
await base.TestInit();
await base.TestInit(validateSinglePartitionKeyRangeCacheCall: true);
string PartitionKey = "/pk";
this.containerSettings = new ContainerProperties(id: Guid.NewGuid().ToString(), partitionKeyPath: PartitionKey);
ContainerResponse response = await this.database.CreateContainerAsync(
Expand Down Expand Up @@ -436,7 +436,8 @@ public async Task NonPartitionKeyLookupCacheTest()
Debugger.Break();
}
});
});
},
validatePartitionKeyRangeCalls: false);

string dbName = Guid.NewGuid().ToString();
string containerName = Guid.NewGuid().ToString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,31 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
using Microsoft.VisualStudio.TestTools.UnitTesting;

[TestClass]
public class GatewaySessionTokenTests : BaseCosmosClientHelper
public class GatewaySessionTokenTests
{
private CosmosClient cosmosClient;
private Cosmos.Database database;
private ContainerInternal Container = null;
private const string PartitionKey = "/pk";

[TestInitialize]
public async Task TestInitialize()
{
this.cosmosClient = TestCommon.CreateCosmosClient(useGateway: true);
CosmosClientOptions cosmosClientOptions = new CosmosClientOptions()
{
ConnectionMode = ConnectionMode.Gateway,
ConnectionProtocol = Documents.Client.Protocol.Https,
ConsistencyLevel = Cosmos.ConsistencyLevel.Session,
HttpClientFactory = () => new HttpClient(new HttpHandlerMetaDataValidator()),
};

this.cosmosClient = TestCommon.CreateCosmosClient(cosmosClientOptions);
this.database = await this.cosmosClient.CreateDatabaseAsync(
id: Guid.NewGuid().ToString());
ContainerResponse response = await this.database.CreateContainerAsync(
new ContainerProperties(id: Guid.NewGuid().ToString(), partitionKeyPath: PartitionKey),
throughput: 20000,
cancellationToken: this.cancellationToken);
cancellationToken: default);
Assert.IsNotNull(response);
Assert.IsNotNull(response.Container);
Assert.IsNotNull(response.Resource);
Expand All @@ -49,13 +59,24 @@ public async Task TestInitialize()
[TestCleanup]
public async Task Cleanup()
{
await base.TestCleanup();
if (this.cosmosClient != null)
{
if(this.database != null)
{
await this.database.DeleteStreamAsync();
}

this.cosmosClient.Dispose();
}
}

[TestMethod]
public async Task TestGatewayModelSession()
{
ContainerProperties containerProperties = await this.Container.GetCachedContainerPropertiesAsync(false, NoOpTrace.Singleton, CancellationToken.None);
ContainerProperties containerProperties = await this.Container.GetCachedContainerPropertiesAsync(
false,
Trace.GetRootTrace("Test"),
CancellationToken.None);

ISessionContainer sessionContainer = this.cosmosClient.DocumentClient.sessionContainer;
string docLink = "dbs/" + this.database.Id + "/colls/" + containerProperties.Id + "/docs/3";
Expand All @@ -80,14 +101,13 @@ await this.cosmosClient.DocumentClient.GetPartitionKeyRangeCacheAsync(NoOpTrace.
public async Task GatewaySameSessionTokenTest()
{
string createSessionToken = null;
GatewaySessionTokenTests.HttpClientHandlerHelper httpClientHandler = new HttpClientHandlerHelper
HttpHandlerMetaDataValidator httpClientHandler = new HttpHandlerMetaDataValidator
{
ResponseCallBack = (result) =>
RequestCallBack = (request, response) =>
{
HttpResponseMessage response = result.Result;
if (response.StatusCode != HttpStatusCode.Created)
{
return response;
return;
}
response.Headers.TryGetValues("x-ms-session-token", out IEnumerable<string> sessionTokens);
Expand All @@ -96,7 +116,6 @@ public async Task GatewaySameSessionTokenTest()
createSessionToken = singleToken;
break;
}
return response;
}
};

Expand Down Expand Up @@ -126,19 +145,5 @@ await client.DocumentClient.GetPartitionKeyRangeCacheAsync(NoOpTrace.Singleton),
Assert.AreEqual(readSessionToken, createSessionToken);
}
}

private class HttpClientHandlerHelper : DelegatingHandler
{
public HttpClientHandlerHelper() : base(new HttpClientHandler())
{
}

public Func<Task<HttpResponseMessage>, HttpResponseMessage> ResponseCallBack { get; set; }

protected override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
{
return base.SendAsync(request, cancellationToken).ContinueWith(this.ResponseCallBack);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Linq;
using Microsoft.Azure.Cosmos.Utils;
Expand All @@ -19,9 +21,6 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
[TestCategory("Emulator")]
public class SmokeTests
{
private static readonly string Host;
private static readonly string MasterKey;

private const string DatabaseName = "netcore_test_db";
private const string CollectionName = "netcore_test_coll";
private const string PartitionedCollectionName = "netcore_test_pcoll";
Expand All @@ -32,8 +31,6 @@ public class SmokeTests

static SmokeTests()
{
SmokeTests.MasterKey = ConfigurationManager.AppSettings["MasterKey"];
SmokeTests.Host = ConfigurationManager.AppSettings["GatewayEndpoint"];
}

/// <summary>
Expand Down Expand Up @@ -89,8 +86,6 @@ private async Task DocumentInsertsTest()
Database database = await this.client.CreateDatabaseIfNotExistsAsync(DatabaseName);
Container container = await this.CreatePartitionedCollectionIfNotExists(database, PartitionedCollectionName);

Uri documentCollectionUri = UriFactory.CreateDocumentCollectionUri(DatabaseName, PartitionedCollectionName);

for (int i = 0; i < 2; i++)
{
string id = i.ToString();
Expand Down Expand Up @@ -317,9 +312,14 @@ private async Task CreateDocumentCollectionIfNotExists()

private CosmosClient GetDocumentClient(ConnectionMode connectionMode, Documents.Client.Protocol protocol)
{
CosmosClientOptions connectionPolicy = new CosmosClientOptions() { ConnectionMode = connectionMode, ConnectionProtocol = protocol };
CosmosClientOptions connectionPolicy = new CosmosClientOptions()
{
ConnectionMode = connectionMode,
ConnectionProtocol = protocol,
ConsistencyLevel = ConsistencyLevel.Session,
};

return new CosmosClient(Host, MasterKey, connectionPolicy);
return TestCommon.CreateCosmosClient(connectionPolicy);
}

private async Task<Container> CreatePartitionedCollectionIfNotExists(Database database, string collectionName)
Expand Down
Loading

0 comments on commit b9bb04a

Please sign in to comment.