Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Session: Fixes NotFound/ReadSessionNotAvailable (404/1002) errors due to inconsistencies on internal caches on collection-recreate scenario for query-only workloads #3119

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Fixing 404/1002 due to Dictionaries used for CollectionName->RID look…
…up in SessionContainer and CollectionCache getting out-of-sync
  • Loading branch information
FabianMeiswinkel committed Mar 29, 2022
commit 57ea799f30513b47df553af858ec2e358c8fea94
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ public async Task<IReadOnlyList<FeedRange>> GetFeedRangesAsync(
}

partitionKeyRanges = await partitionKeyRangeCache.TryGetOverlappingRangesAsync(
containerRId,
refreshedContainerRId,
ContainerCore.allRanges,
trace,
forceRefresh: true);
Expand Down
104 changes: 91 additions & 13 deletions Microsoft.Azure.Cosmos/src/Routing/ClientCollectionCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ internal class ClientCollectionCache : CollectionCache
private readonly IRetryPolicyFactory retryPolicy;
private readonly ISessionContainer sessionContainer;

public ClientCollectionCache(ISessionContainer sessionContainer, IStoreModel storeModel, ICosmosAuthorizationTokenProvider tokenProvider, IRetryPolicyFactory retryPolicy)
public ClientCollectionCache(
ISessionContainer sessionContainer,
IStoreModel storeModel,
ICosmosAuthorizationTokenProvider tokenProvider,
IRetryPolicyFactory retryPolicy)
{
this.storeModel = storeModel ?? throw new ArgumentNullException("storeModel");
this.tokenProvider = tokenProvider;
Expand All @@ -38,9 +42,15 @@ protected override Task<ContainerProperties> GetByRidAsync(string apiVersion,
CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
IDocumentClientRetryPolicy retryPolicyInstance = new ClearingSessionContainerClientRetryPolicy(this.sessionContainer, this.retryPolicy.GetRequestPolicy());
IDocumentClientRetryPolicy retryPolicyInstance = new ClearingSessionContainerClientRetryPolicy(
this.sessionContainer, this.retryPolicy.GetRequestPolicy());
return TaskHelper.InlineIfPossible(
() => this.ReadCollectionAsync(PathsHelper.GeneratePath(ResourceType.Collection, collectionRid, false), retryPolicyInstance, trace, clientSideRequestStatistics, cancellationToken),
() => this.ReadCollectionAsync(
FabianMeiswinkel marked this conversation as resolved.
Show resolved Hide resolved
PathsHelper.GeneratePath(ResourceType.Collection, collectionRid, false),
retryPolicyInstance,
trace,
clientSideRequestStatistics,
cancellationToken),
retryPolicyInstance,
cancellationToken);
}
Expand All @@ -52,18 +62,82 @@ protected override Task<ContainerProperties> GetByNameAsync(string apiVersion,
CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
IDocumentClientRetryPolicy retryPolicyInstance = new ClearingSessionContainerClientRetryPolicy(this.sessionContainer, this.retryPolicy.GetRequestPolicy());
IDocumentClientRetryPolicy retryPolicyInstance = new ClearingSessionContainerClientRetryPolicy(
this.sessionContainer, this.retryPolicy.GetRequestPolicy());
return TaskHelper.InlineIfPossible(
() => this.ReadCollectionAsync(resourceAddress, retryPolicyInstance, trace, clientSideRequestStatistics, cancellationToken),
() => this.ReadCollectionAsync(
resourceAddress, retryPolicyInstance, trace, clientSideRequestStatistics, cancellationToken),
retryPolicyInstance,
cancellationToken);
}

private async Task<ContainerProperties> ReadCollectionAsync(string collectionLink,
IDocumentClientRetryPolicy retryPolicyInstance,
ITrace trace,
IClientSideRequestStatistics clientSideRequestStatistics,
CancellationToken cancellationToken)
internal override Task<ContainerProperties> ResolveByNameAsync(
string apiVersion,
string resourceAddress,
bool forceRefesh,
FabianMeiswinkel marked this conversation as resolved.
Show resolved Hide resolved
ITrace trace,
IClientSideRequestStatistics clientSideRequestStatistics,
CancellationToken cancellationToken)
{
if (forceRefesh && this.sessionContainer != null)
{
string resourceFullName = PathsHelper.GetCollectionPath(resourceAddress);
this.sessionContainer.ClearTokenByCollectionFullname(resourceFullName);
FabianMeiswinkel marked this conversation as resolved.
Show resolved Hide resolved
}

return TaskHelper.InlineIfPossible(
() => base.ResolveByNameAsync(
apiVersion, resourceAddress, forceRefesh, trace, clientSideRequestStatistics, cancellationToken),
retryPolicy: null,
cancellationToken);
}

public override Task<ContainerProperties> ResolveCollectionAsync(
DocumentServiceRequest request, CancellationToken cancellationToken, ITrace trace)
{
return TaskHelper.InlineIfPossible(
() => this.ResolveCollectionWithSessionContainerCleanupAsync(
request,
() => base.ResolveCollectionAsync(request, cancellationToken, trace)),
retryPolicy: null,
cancellationToken);
}

public override Task<ContainerProperties> ResolveCollectionAsync(
DocumentServiceRequest request, TimeSpan refreshAfter, CancellationToken cancellationToken, ITrace trace)
{
return TaskHelper.InlineIfPossible(
FabianMeiswinkel marked this conversation as resolved.
Show resolved Hide resolved
() => this.ResolveCollectionWithSessionContainerCleanupAsync(
request,
() => base.ResolveCollectionAsync(request, refreshAfter, cancellationToken, trace)),
retryPolicy: null,
cancellationToken);
}

private async Task<ContainerProperties> ResolveCollectionWithSessionContainerCleanupAsync(
FabianMeiswinkel marked this conversation as resolved.
Show resolved Hide resolved
DocumentServiceRequest request,
Func<Task<ContainerProperties>> resolveContainerProvider)
{
string previouslyResolvedCollectionRid = request?.RequestContext?.ResolvedCollectionRid;

ContainerProperties properties = await resolveContainerProvider();

if (this.sessionContainer != null &&
previouslyResolvedCollectionRid != null &&
previouslyResolvedCollectionRid != properties.ResourceId)
{
this.sessionContainer.ClearTokenByResourceId(previouslyResolvedCollectionRid);
}

return properties;
}

private async Task<ContainerProperties> ReadCollectionAsync(
string collectionLink,
IDocumentClientRetryPolicy retryPolicyInstance,
ITrace trace,
IClientSideRequestStatistics clientSideRequestStatistics,
CancellationToken cancellationToken)
{
using (ITrace childTrace = trace.StartChild("Read Collection", TraceComponent.Transport, TraceLevel.Info))
{
Expand All @@ -78,10 +152,13 @@ private async Task<ContainerProperties> ReadCollectionAsync(string collectionLin
{
request.Headers[HttpConstants.HttpHeaders.XDate] = DateTime.UtcNow.ToString("r");

request.RequestContext.ClientRequestStatistics = clientSideRequestStatistics ?? new ClientSideRequestStatisticsTraceDatum(DateTime.UtcNow);
request.RequestContext.ClientRequestStatistics =
clientSideRequestStatistics ?? new ClientSideRequestStatisticsTraceDatum(DateTime.UtcNow);
if (clientSideRequestStatistics == null)
{
childTrace.AddDatum("Client Side Request Stats", request.RequestContext.ClientRequestStatistics);
childTrace.AddDatum(
"Client Side Request Stats",
request.RequestContext.ClientRequestStatistics);
}

string authorizationToken = await this.tokenProvider.GetUserAuthorizationTokenAsync(
Expand All @@ -100,7 +177,8 @@ private async Task<ContainerProperties> ReadCollectionAsync(string collectionLin

try
{
using (DocumentServiceResponse response = await this.storeModel.ProcessMessageAsync(request))
using (DocumentServiceResponse response =
await this.storeModel.ProcessMessageAsync(request))
{
return CosmosResource.FromStream<ContainerProperties>(response);
}
Expand Down
Loading