Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gateway: Fixes gateway mode ApplySessionToken bug causes cache to always refresh #2408

Merged
merged 15 commits into from
Apr 22, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 60 additions & 44 deletions Microsoft.Azure.Cosmos/src/GatewayStoreModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ internal static async Task ApplySessionTokenAsync(
ConsistencyLevel defaultConsistencyLevel,
ISessionContainer sessionContainer,
PartitionKeyRangeCache partitionKeyRangeCache,
ClientCollectionCache clientCollectionCache)
CollectionCache clientCollectionCache)
{
if (request.Headers == null)
{
Expand Down Expand Up @@ -250,10 +250,11 @@ internal static async Task ApplySessionTokenAsync(
return; // Only apply the session token in case of session consistency and the request is read only
}

(bool isSuccess, string sessionToken) = await GatewayStoreModel.TryResolveSessionTokenAsync(request,
sessionContainer,
partitionKeyRangeCache,
clientCollectionCache);
(bool isSuccess, string sessionToken) = await GatewayStoreModel.TryResolveSessionTokenAsync(
request,
sessionContainer,
partitionKeyRangeCache,
clientCollectionCache);

if (!isSuccess)
{
Expand All @@ -266,10 +267,11 @@ internal static async Task ApplySessionTokenAsync(
}
}

internal static async Task<Tuple<bool, string>> TryResolveSessionTokenAsync(DocumentServiceRequest request,
ISessionContainer sessionContainer,
PartitionKeyRangeCache partitionKeyRangeCache,
ClientCollectionCache clientCollectionCache)
internal static async Task<Tuple<bool, string>> TryResolveSessionTokenAsync(
DocumentServiceRequest request,
ISessionContainer sessionContainer,
PartitionKeyRangeCache partitionKeyRangeCache,
CollectionCache clientCollectionCache)
{
if (request == null)
{
Expand All @@ -293,11 +295,12 @@ internal static async Task<Tuple<bool, string>> TryResolveSessionTokenAsync(Docu

if (request.ResourceType.IsPartitioned())
{
(bool isSuccess, PartitionKeyRange partitionKeyRange) = await TryResolvePartitionKeyRangeAsync(request: request,
sessionContainer: sessionContainer,
partitionKeyRangeCache: partitionKeyRangeCache,
clientCollectionCache: clientCollectionCache,
refreshCache: false);
(bool isSuccess, PartitionKeyRange partitionKeyRange) = await TryResolvePartitionKeyRangeAsync(
request: request,
sessionContainer: sessionContainer,
partitionKeyRangeCache: partitionKeyRangeCache,
clientCollectionCache: clientCollectionCache,
refreshCache: false);

if (isSuccess && sessionContainer is SessionContainer gatewaySessionContainer)
{
Expand All @@ -313,11 +316,12 @@ internal static async Task<Tuple<bool, string>> TryResolveSessionTokenAsync(Docu
return new Tuple<bool, string>(false, null);
}

private static async Task<Tuple<bool, PartitionKeyRange>> TryResolvePartitionKeyRangeAsync(DocumentServiceRequest request,
ISessionContainer sessionContainer,
PartitionKeyRangeCache partitionKeyRangeCache,
ClientCollectionCache clientCollectionCache,
bool refreshCache)
private static async Task<Tuple<bool, PartitionKeyRange>> TryResolvePartitionKeyRangeAsync(
DocumentServiceRequest request,
ISessionContainer sessionContainer,
PartitionKeyRangeCache partitionKeyRangeCache,
CollectionCache clientCollectionCache,
bool refreshCache)
{
if (refreshCache)
{
Expand All @@ -326,39 +330,50 @@ private static async Task<Tuple<bool, PartitionKeyRange>> TryResolvePartitionKey
}

PartitionKeyRange partitonKeyRange = null;
ContainerProperties collection = await clientCollectionCache.ResolveCollectionAsync(request, CancellationToken.None, NoOpTrace.Singleton);
ContainerProperties collection = await clientCollectionCache.ResolveCollectionAsync(
request,
CancellationToken.None,
NoOpTrace.Singleton);

string partitionKeyString = request.Headers[HttpConstants.HttpHeaders.PartitionKey];
if (partitionKeyString != null)
{
CollectionRoutingMap collectionRoutingMap = await partitionKeyRangeCache.TryLookupAsync(collectionRid: collection.ResourceId,
previousValue: null,
request: request,
cancellationToken: CancellationToken.None,
NoOpTrace.Singleton);
CollectionRoutingMap collectionRoutingMap = await partitionKeyRangeCache.TryLookupAsync(
collectionRid: collection.ResourceId,
previousValue: null,
request: request,
cancellationToken: CancellationToken.None,
NoOpTrace.Singleton);

if (refreshCache && collectionRoutingMap != null)
{
collectionRoutingMap = await partitionKeyRangeCache.TryLookupAsync(collectionRid: collection.ResourceId,
previousValue: collectionRoutingMap,
request: request,
cancellationToken: CancellationToken.None,
NoOpTrace.Singleton);
collectionRoutingMap = await partitionKeyRangeCache.TryLookupAsync(
collectionRid: collection.ResourceId,
previousValue: collectionRoutingMap,
request: request,
cancellationToken: CancellationToken.None,
NoOpTrace.Singleton);
}

partitonKeyRange = AddressResolver.TryResolveServerPartitionByPartitionKey(request: request,
partitionKeyString: partitionKeyString,
collectionCacheUptoDate: false,
collection: collection,
routingMap: collectionRoutingMap);
partitonKeyRange = AddressResolver.TryResolveServerPartitionByPartitionKey(
request: request,
partitionKeyString: partitionKeyString,
collectionCacheUptoDate: false,
collection: collection,
routingMap: collectionRoutingMap);
}
else if (request.PartitionKeyRangeIdentity != null)
{
PartitionKeyRangeIdentity partitionKeyRangeId = request.PartitionKeyRangeIdentity;
partitonKeyRange = await partitionKeyRangeCache.TryGetPartitionKeyRangeByIdAsync(collection.ResourceId,
partitionKeyRangeId.ToString(),
NoOpTrace.Singleton,
refreshCache);
partitonKeyRange = await partitionKeyRangeCache.TryGetPartitionKeyRangeByIdAsync(
collection.ResourceId,
partitionKeyRangeId.PartitionKeyRangeId,
NoOpTrace.Singleton,
refreshCache);
}
else if (request.RequestContext.ResolvedPartitionKeyRange != null)
kirankumarkolli marked this conversation as resolved.
Show resolved Hide resolved
{
partitonKeyRange = request.RequestContext.ResolvedPartitionKeyRange;
}

if (partitonKeyRange == null)
Expand All @@ -369,11 +384,12 @@ private static async Task<Tuple<bool, PartitionKeyRange>> TryResolvePartitionKey
}

// need to refresh cache. Maybe split happened.
return await GatewayStoreModel.TryResolvePartitionKeyRangeAsync(request: request,
sessionContainer: sessionContainer,
partitionKeyRangeCache: partitionKeyRangeCache,
clientCollectionCache: clientCollectionCache,
refreshCache: true);
return await GatewayStoreModel.TryResolvePartitionKeyRangeAsync(
request: request,
sessionContainer: sessionContainer,
partitionKeyRangeCache: partitionKeyRangeCache,
clientCollectionCache: clientCollectionCache,
refreshCache: true);
}

return new Tuple<bool, PartitionKeyRange>(true, partitonKeyRange);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ internal class PartitionKeyRangeCache : IRoutingMapProvider, ICollectionRoutingM
private readonly IStoreModel storeModel;
private readonly CollectionCache collectionCache;

public PartitionKeyRangeCache(IAuthorizationTokenProvider authorizationTokenProvider, IStoreModel storeModel, CollectionCache collectionCache)
public PartitionKeyRangeCache(
IAuthorizationTokenProvider authorizationTokenProvider,
IStoreModel storeModel,
CollectionCache collectionCache)
{
this.routingMapCache = new AsyncCache<string, CollectionRoutingMap>(
EqualityComparer<CollectionRoutingMap>.Default,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,31 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
using Microsoft.VisualStudio.TestTools.UnitTesting;

[TestClass]
public class GatewaySessionTokenTests : BaseCosmosClientHelper
public class GatewaySessionTokenTests
{
private CosmosClient cosmosClient;
private Cosmos.Database database;
private ContainerInternal Container = null;
private const string PartitionKey = "/pk";

[TestInitialize]
public async Task TestInitialize()
{
this.cosmosClient = TestCommon.CreateCosmosClient(useGateway: true);
CosmosClientOptions cosmosClientOptions = new CosmosClientOptions()
{
ConnectionMode = ConnectionMode.Gateway,
ConnectionProtocol = Documents.Client.Protocol.Https,
ConsistencyLevel = Cosmos.ConsistencyLevel.Session,
HttpClientFactory = () => new HttpClient(new HttpHandlerMetaDataValidator()),
};

this.cosmosClient = TestCommon.CreateCosmosClient(cosmosClientOptions);
this.database = await this.cosmosClient.CreateDatabaseAsync(
id: Guid.NewGuid().ToString());
ContainerResponse response = await this.database.CreateContainerAsync(
new ContainerProperties(id: Guid.NewGuid().ToString(), partitionKeyPath: PartitionKey),
throughput: 20000,
cancellationToken: this.cancellationToken);
cancellationToken: default);
Assert.IsNotNull(response);
Assert.IsNotNull(response.Container);
Assert.IsNotNull(response.Resource);
Expand All @@ -49,13 +59,24 @@ public async Task TestInitialize()
[TestCleanup]
public async Task Cleanup()
{
await base.TestCleanup();
if (this.cosmosClient != null)
{
if(this.database != null)
{
await this.database.DeleteStreamAsync();
}

this.cosmosClient.Dispose();
}
}

[TestMethod]
public async Task TestGatewayModelSession()
{
ContainerProperties containerProperties = await this.Container.GetCachedContainerPropertiesAsync(false, NoOpTrace.Singleton, CancellationToken.None);
ContainerProperties containerProperties = await this.Container.GetCachedContainerPropertiesAsync(
false,
Trace.GetRootTrace("Test"),
CancellationToken.None);

ISessionContainer sessionContainer = this.cosmosClient.DocumentClient.sessionContainer;
string docLink = "dbs/" + this.database.Id + "/colls/" + containerProperties.Id + "/docs/3";
Expand All @@ -80,14 +101,13 @@ await this.cosmosClient.DocumentClient.GetPartitionKeyRangeCacheAsync(NoOpTrace.
public async Task GatewaySameSessionTokenTest()
{
string createSessionToken = null;
GatewaySessionTokenTests.HttpClientHandlerHelper httpClientHandler = new HttpClientHandlerHelper
HttpHandlerMetaDataValidator httpClientHandler = new HttpHandlerMetaDataValidator
{
ResponseCallBack = (result) =>
RequestCallBack = (request, response) =>
{
HttpResponseMessage response = result.Result;
if (response.StatusCode != HttpStatusCode.Created)
{
return response;
return;
}
response.Headers.TryGetValues("x-ms-session-token", out IEnumerable<string> sessionTokens);
Expand All @@ -96,7 +116,6 @@ public async Task GatewaySameSessionTokenTest()
createSessionToken = singleToken;
break;
}
return response;
}
};

Expand Down Expand Up @@ -126,19 +145,5 @@ await client.DocumentClient.GetPartitionKeyRangeCacheAsync(NoOpTrace.Singleton),
Assert.AreEqual(readSessionToken, createSessionToken);
}
}

private class HttpClientHandlerHelper : DelegatingHandler
{
public HttpClientHandlerHelper() : base(new HttpClientHandler())
{
}

public Func<Task<HttpResponseMessage>, HttpResponseMessage> ResponseCallBack { get; set; }

protected override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
{
return base.SendAsync(request, cancellationToken).ContinueWith(this.ResponseCallBack);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Linq;
using Microsoft.Azure.Cosmos.Utils;
Expand All @@ -19,9 +21,6 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
[TestCategory("Emulator")]
public class SmokeTests
{
private static readonly string Host;
private static readonly string MasterKey;

private const string DatabaseName = "netcore_test_db";
private const string CollectionName = "netcore_test_coll";
private const string PartitionedCollectionName = "netcore_test_pcoll";
Expand All @@ -32,8 +31,6 @@ public class SmokeTests

static SmokeTests()
{
SmokeTests.MasterKey = ConfigurationManager.AppSettings["MasterKey"];
SmokeTests.Host = ConfigurationManager.AppSettings["GatewayEndpoint"];
}

/// <summary>
Expand Down Expand Up @@ -89,8 +86,6 @@ private async Task DocumentInsertsTest()
Database database = await this.client.CreateDatabaseIfNotExistsAsync(DatabaseName);
Container container = await this.CreatePartitionedCollectionIfNotExists(database, PartitionedCollectionName);

Uri documentCollectionUri = UriFactory.CreateDocumentCollectionUri(DatabaseName, PartitionedCollectionName);

for (int i = 0; i < 2; i++)
{
string id = i.ToString();
Expand Down Expand Up @@ -317,9 +312,14 @@ private async Task CreateDocumentCollectionIfNotExists()

private CosmosClient GetDocumentClient(ConnectionMode connectionMode, Documents.Client.Protocol protocol)
{
CosmosClientOptions connectionPolicy = new CosmosClientOptions() { ConnectionMode = connectionMode, ConnectionProtocol = protocol };
CosmosClientOptions connectionPolicy = new CosmosClientOptions()
{
ConnectionMode = connectionMode,
ConnectionProtocol = protocol,
ConsistencyLevel = ConsistencyLevel.Session,
};

return new CosmosClient(Host, MasterKey, connectionPolicy);
return TestCommon.CreateCosmosClient(connectionPolicy);
}

private async Task<Container> CreatePartitionedCollectionIfNotExists(Database database, string collectionName)
Expand Down
Loading