Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade Resiliency: Adds Implementation for Validating the Unhealthy Backend Replicas in Direct mode #3631

Merged
93 changes: 69 additions & 24 deletions Microsoft.Azure.Cosmos/src/Routing/AsyncCacheNonBlocking.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,30 +120,11 @@ public async Task<TValue> GetAsync(
throw;
}

try
{
return await initialLazyValue.CreateAndWaitForBackgroundRefreshTaskAsync(
createRefreshTask: singleValueInitFunc);
}
catch (Exception e)
{
if (initialLazyValue.ShouldRemoveFromCacheThreadSafe())
{
DefaultTrace.TraceError(
"AsyncCacheNonBlocking.GetAsync with ForceRefresh Failed. key: {0}, Exception: {1}",
key,
e);

// In some scenarios when a background failure occurs like a 404
// the initial cache value should be removed.
if (this.removeFromCacheOnBackgroundRefreshException(e))
{
this.TryRemove(key);
}
}

throw;
}
return await this.UpdateCacheAndGetValueFromBackgroundTaskAsync(
key: key,
initialValue: initialLazyValue,
callbackDelegate: singleValueInitFunc,
operationName: nameof(GetAsync));
}

// The AsyncLazyWithRefreshTask is lazy and won't create the task until GetValue is called.
Expand Down Expand Up @@ -196,6 +177,70 @@ public bool TryRemove(TKey key)
return this.values.TryRemove(key, out _);
}

/// <summary>
/// Triggers an on-demand refresh of the entry stored under <paramref name="key"/>. When the key is
/// not currently present in the cache, the method completes without invoking the delegate.
/// </summary>
/// <param name="key">The key whose cached entry should be refreshed.</param>
/// <param name="singleValueInitFunc">The delegate handed to the background refresh task to produce the new value.</param>
/// <returns>A <see cref="Task"/> that completes once the refresh has finished; the refreshed value itself is discarded.</returns>
public async Task RefreshAsync(
    TKey key,
    Func<TValue, Task<TValue>> singleValueInitFunc)
{
    bool isCached = this.values.TryGetValue(key, out AsyncLazyWithRefreshTask<TValue> cachedLazyValue);
    if (!isCached)
    {
        // Nothing to refresh: only keys that already have an entry are updated here.
        return;
    }

    await this.UpdateCacheAndGetValueFromBackgroundTaskAsync(
        key: key,
        initialValue: cachedLazyValue,
        callbackDelegate: singleValueInitFunc,
        operationName: nameof(RefreshAsync));
}

/// <summary>
/// Awaits the background refresh task of <paramref name="initialValue"/>, created from
/// <paramref name="callbackDelegate"/>, and returns its result. On failure the exception is always
/// rethrown; before that, the cache entry may be evicted and the failure traced.
/// </summary>
/// <param name="key">The cache key the refresh is running for.</param>
/// <param name="initialValue">The <see cref="AsyncLazyWithRefreshTask{T}"/> currently cached for <paramref name="key"/>.</param>
/// <param name="callbackDelegate">The delegate passed to the refresh task to compute the new value.</param>
/// <param name="operationName">Name of the calling cache operation; only used in the error trace.</param>
/// <returns>A <see cref="Task{TValue}"/> yielding the value produced by the refresh task.</returns>
private async Task<TValue> UpdateCacheAndGetValueFromBackgroundTaskAsync(
    TKey key,
    AsyncLazyWithRefreshTask<TValue> initialValue,
    Func<TValue, Task<TValue>> callbackDelegate,
    string operationName)
{
    try
    {
        return await initialValue.CreateAndWaitForBackgroundRefreshTaskAsync(
            createRefreshTask: callbackDelegate);
    }
    catch (Exception ex)
    {
        if (!initialValue.ShouldRemoveFromCacheThreadSafe())
        {
            throw;
        }

        // Some background failures (for example a 404) mean the cached value is no longer valid,
        // so the entry is evicted when the configured predicate says so. TryRemove is only
        // attempted when the predicate returns true.
        bool wasRemoved = this.removeFromCacheOnBackgroundRefreshException(ex)
            && this.TryRemove(key);

        DefaultTrace.TraceError(
            "AsyncCacheNonBlocking Failed. key: {0}, operation: {1}, tryRemoved: {2}, Exception: {3}",
            key,
            operationName,
            wasRemoved,
            ex);

        throw;
    }
}

/// <summary>
/// This is AsyncLazy that has an additional Task that can
/// be used to update the value. This allows concurrent requests
Expand Down
154 changes: 154 additions & 0 deletions Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ internal class GatewayAddressCache : IAddressCache, IDisposable
private readonly bool enableTcpConnectionEndpointRediscovery;

private readonly CosmosHttpClient httpClient;
private readonly bool isReplicaAddressValidationEnabled;

private Tuple<PartitionKeyRangeIdentity, PartitionAddressInformation> masterPartitionAddressCache;
private DateTime suboptimalMasterPartitionTimestamp;
Expand Down Expand Up @@ -84,10 +85,27 @@ public GatewayAddressCache(
GatewayAddressCache.ProtocolString(this.protocol));

this.openConnectionsHandler = openConnectionsHandler;
this.isReplicaAddressValidationEnabled = Helpers.GetEnvironmentVariableAsBool(
name: Constants.EnvironmentVariables.ReplicaConnectivityValidationEnabled,
defaultValue: false);
}

public Uri ServiceEndpoint => this.serviceEndpoint;

/// <summary>
/// Gets the address information from the gateway and sets them into the async non blocking cache for later lookup.
/// Additionally attempts to establish Rntbd connections to the backend replicas based on `shouldOpenRntbdChannels`
/// boolean flag.
/// </summary>
/// <param name="databaseName">A string containing the database name.</param>
/// <param name="collection">An instance of <see cref="ContainerProperties"/> containing the collection properties.</param>
/// <param name="partitionKeyRangeIdentities">A read only list containing the partition key range identities.</param>
/// <param name="shouldOpenRntbdChannels">A boolean flag indicating whether Rntbd connections are required to be established
/// to the backend replica nodes. For cosmos client initialization and cache warmups, the Rntbd connections need to be
/// opened deterministically to the backend replicas to reduce latency, thus the <paramref name="shouldOpenRntbdChannels"/>
/// should be set to `true` during cosmos client initialization and cache warmups. The OpenAsync flow from DocumentClient
/// doesn't require the connections to be opened deterministically and thus should set the parameter to `false`.</param>
/// <param name="cancellationToken">An instance of <see cref="CancellationToken"/>.</param>
public async Task OpenConnectionsAsync(
string databaseName,
ContainerProperties collection,
Expand Down Expand Up @@ -161,6 +179,10 @@ public async Task OpenConnectionsAsync(
new PartitionKeyRangeIdentity(collection.ResourceId, addressInfo.Item1.PartitionKeyRangeId),
addressInfo.Item2);

// The `shouldOpenRntbdChannels` boolean flag indicates whether the SDK should establish Rntbd connections to the
// backend replica nodes. For the `CosmosClient.CreateAndInitializeAsync()` flow, the flag should be passed as
// `true` so that the Rntbd connections to the backend replicas could be established deterministically. For any
// other flow, the flag should be passed as `false`.
if (this.openConnectionsHandler != null && shouldOpenRntbdChannels)
{
await this.openConnectionsHandler
Expand All @@ -178,6 +200,7 @@ public void SetOpenConnectionsHandler(IOpenConnectionsHandler openConnectionsHan
this.openConnectionsHandler = openConnectionsHandler;
}

/// <inheritdoc/>
public async Task<PartitionAddressInformation> TryGetAddressesAsync(
DocumentServiceRequest request,
PartitionKeyRangeIdentity partitionKeyRangeIdentity,
Expand Down Expand Up @@ -229,6 +252,7 @@ public async Task<PartitionAddressInformation> TryGetAddressesAsync(

return this.GetAddressesForRangeIdAsync(
request,
cachedAddresses: currentCachedValue,
kundadebdatta marked this conversation as resolved.
Show resolved Hide resolved
partitionKeyRangeIdentity.CollectionRid,
partitionKeyRangeIdentity.PartitionKeyRangeId,
forceRefresh: forceRefreshPartitionAddresses);
Expand Down Expand Up @@ -259,6 +283,7 @@ public async Task<PartitionAddressInformation> TryGetAddressesAsync(
key: partitionKeyRangeIdentity,
singleValueInitFunc: (_) => this.GetAddressesForRangeIdAsync(
request,
cachedAddresses: null,
partitionKeyRangeIdentity.CollectionRid,
partitionKeyRangeIdentity.PartitionKeyRangeId,
forceRefresh: false),
Expand All @@ -278,6 +303,27 @@ public async Task<PartitionAddressInformation> TryGetAddressesAsync(
this.suboptimalServerPartitionTimestamps.TryAdd(partitionKeyRangeIdentity, DateTime.UtcNow);
}

// Refresh the cache on-demand if any address has remained unhealthy long enough (more than 1 minute) and its
// status needs to be revalidated. This does not rely on a 410 to force refresh the addresses, for this reason:
// when an address is marked as unhealthy, the address enumerator deprioritizes it and moves it to the end of
// the transport uris list. It could therefore happen that no request lands on the unhealthy address for an
// extended period of time, so a 410 (Gone Exception) may never trigger the forceRefresh workflow for that
// particular replica.
if (addresses
kundadebdatta marked this conversation as resolved.
Show resolved Hide resolved
.Get(Protocol.Tcp)
.ReplicaTransportAddressUris
.Any(x => x.ShouldRefreshHealthStatus()))
{
Task refreshAddressesInBackgroundTask = Task.Run(async () => await this.serverPartitionAddressCache.RefreshAsync(
key: partitionKeyRangeIdentity,
singleValueInitFunc: (currentCachedValue) => this.GetAddressesForRangeIdAsync(
request,
cachedAddresses: currentCachedValue,
partitionKeyRangeIdentity.CollectionRid,
partitionKeyRangeIdentity.PartitionKeyRangeId,
forceRefresh: true)));
}

return addresses;
}
catch (DocumentClientException ex)
Expand Down Expand Up @@ -384,6 +430,7 @@ public async Task<PartitionAddressInformation> UpdateAsync(
key: partitionKeyRangeIdentity,
singleValueInitFunc: (_) => this.GetAddressesForRangeIdAsync(
null,
cachedAddresses: null,
partitionKeyRangeIdentity.CollectionRid,
partitionKeyRangeIdentity.PartitionKeyRangeId,
forceRefresh: true),
Expand Down Expand Up @@ -444,6 +491,7 @@ private async Task<Tuple<PartitionKeyRangeIdentity, PartitionAddressInformation>

private async Task<PartitionAddressInformation> GetAddressesForRangeIdAsync(
DocumentServiceRequest request,
PartitionAddressInformation cachedAddresses,
string collectionRid,
string partitionKeyRangeId,
bool forceRefresh)
Expand Down Expand Up @@ -475,6 +523,32 @@ await this.GetServerAddressesViaGatewayAsync(request, collectionRid, new[] { par
throw new PartitionKeyRangeGoneException(errorMessage) { ResourceAddress = collectionRid };
}

if (this.isReplicaAddressValidationEnabled)
kundadebdatta marked this conversation as resolved.
Show resolved Hide resolved
{
// The purpose of this step is to merge the new transport addresses with the old one. What this means is -
// 1. If a newly returned address from gateway is already a part of the cache, then restore the health state
// of the new address with that of the cached one.
// 2. If a newly returned address from gateway doesn't exist in the cache, then keep using the new address
// with `Unknown` (initial) status.
PartitionAddressInformation mergedAddresses = GatewayAddressCache.MergeAddresses(result.Item2, cachedAddresses);
IReadOnlyList<TransportAddressUri> transportAddressUris = mergedAddresses.Get(Protocol.Tcp)?.ReplicaTransportAddressUris;

// If cachedAddresses are null, that would mean that the returned address from gateway would remain in Unknown
// status and there is no cached state that could transition them into Unhealthy.
if (cachedAddresses != null)
{
foreach (TransportAddressUri address in transportAddressUris)
{
// The main purpose for this step is to move address health status from Unhealthy to UnhealthyPending.
address.SetRefreshedIfUnhealthy();
}
}

this.ValidateUnhealthyPendingReplicas(transportAddressUris);

return mergedAddresses;
}

return result.Item2;
}
}
Expand Down Expand Up @@ -760,6 +834,86 @@ await this.GetServerAddressesViaGatewayAsync(
}
}

/// <summary>
/// Starts a background attempt to open Rntbd connections to every replica in <paramref name="addresses"/>
/// whose current health status is <c>UnhealthyPending</c>. The attempt is fire-and-forget: this method
/// does not wait for it and does not observe its outcome.
/// </summary>
/// <param name="addresses">A read-only list of <see cref="TransportAddressUri"/> to be inspected.</param>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="addresses"/> is null.</exception>
private void ValidateUnhealthyPendingReplicas(
    IReadOnlyList<TransportAddressUri> addresses)
{
    if (addresses == null)
    {
        throw new ArgumentNullException(nameof(addresses));
    }

    // The handler field is reassignable and may be null, so read it once and bail out early
    // instead of dereferencing null inside a background task whose failure nobody observes.
    IOpenConnectionsHandler handler = this.openConnectionsHandler;
    if (handler == null)
    {
        return;
    }

    // Materialize the filter exactly once, on the calling thread. A deferred query would be
    // re-evaluated later on the background thread, where health states may already have changed.
    List<TransportAddressUri> addressesNeedToValidation = addresses
        .Where(address => address
            .GetCurrentHealthState()
            .GetHealthStatus() == TransportAddressHealthState.HealthStatus.UnhealthyPending)
        .ToList();

    if (addressesNeedToValidation.Count == 0)
    {
        return;
    }

    // Intentionally not awaited: validation runs in the background.
    _ = Task.Run(async () => await handler.TryOpenRntbdChannelsAsync(
        addresses: addressesNeedToValidation));
}

/// <summary>
/// Merges the addresses newly returned from the gateway service with the cached addresses. For every new
/// Tcp address that is also present in the cache (matched by the address' string form), the health status
/// and its timestamps are copied over from the cached address. New addresses that are not in the cache are
/// left untouched.
/// </summary>
/// <param name="newAddresses">An instance of <see cref="PartitionAddressInformation"/> containing the latest
/// addresses returned from the gateway.</param>
/// <param name="cachedAddresses">An instance of <see cref="PartitionAddressInformation"/> containing the cached
/// addresses from the async non blocking cache. May be null, in which case nothing is merged.</param>
/// <returns>The <paramref name="newAddresses"/> instance, with health state restored on the matching addresses.</returns>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="newAddresses"/> is null.</exception>
private static PartitionAddressInformation MergeAddresses(
    PartitionAddressInformation newAddresses,
    PartitionAddressInformation cachedAddresses)
{
    if (newAddresses == null)
    {
        throw new ArgumentNullException(nameof(newAddresses));
    }

    if (cachedAddresses == null)
    {
        return newAddresses;
    }

    IReadOnlyList<TransportAddressUri> currentUris = newAddresses.Get(Protocol.Tcp)?.ReplicaTransportAddressUris;
    IReadOnlyList<TransportAddressUri> cachedUris = cachedAddresses.Get(Protocol.Tcp)?.ReplicaTransportAddressUris;

    // Without Tcp information on either side there is nothing to merge.
    if (currentUris == null || cachedUris == null)
    {
        return newAddresses;
    }

    Dictionary<string, TransportAddressUri> cachedAddressDict = new (cachedUris.Count);
    foreach (TransportAddressUri cachedUri in cachedUris)
    {
        cachedAddressDict[cachedUri.ToString()] = cachedUri;
    }

    foreach (TransportAddressUri transportAddressUri in currentUris)
    {
        if (!cachedAddressDict.TryGetValue(transportAddressUri.ToString(), out TransportAddressUri cachedTransportAddressUri))
        {
            continue;
        }

        // Read the cached health state once so the status and all timestamps come from the same
        // snapshot, even if the cached address changes state concurrently.
        TransportAddressHealthState cachedHealthState = cachedTransportAddressUri.GetCurrentHealthState();

        transportAddressUri.ResetHealthStatus(
            status: cachedHealthState.GetHealthStatus(),
            lastUnknownTimestamp: cachedHealthState.GetLastKnownTimestampByHealthStatus(
                healthStatus: TransportAddressHealthState.HealthStatus.Unknown),
            lastUnhealthyPendingTimestamp: cachedHealthState.GetLastKnownTimestampByHealthStatus(
                healthStatus: TransportAddressHealthState.HealthStatus.UnhealthyPending),
            lastUnhealthyTimestamp: cachedHealthState.GetLastKnownTimestampByHealthStatus(
                healthStatus: TransportAddressHealthState.HealthStatus.Unhealthy));
    }

    return newAddresses;
}

protected virtual void Dispose(bool disposing)
{
if (this.disposedValue)
Expand Down
Loading