Skip to content

Commit

Permalink
Code changes to revert back the health check on LB Channel.
Browse files Browse the repository at this point in the history
  • Loading branch information
kundadebdatta committed Sep 2, 2023
1 parent c3154a0 commit 69f05ff
Show file tree
Hide file tree
Showing 5 changed files with 9 additions and 83 deletions.
8 changes: 0 additions & 8 deletions Microsoft.Azure.Cosmos/src/direct/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -529,14 +529,6 @@ private static void HandleTaskTimeout(Task runawayTask, Guid activityId, Guid co
TaskContinuationOptions.OnlyOnFaulted);
}

/// <inheritdoc/>
public void SetHealthState(
bool isHealthy)
{
// No implementation is required since the channel health state is managed
// using the State enumeration.
}

private enum State
{
New,
Expand Down
4 changes: 1 addition & 3 deletions Microsoft.Azure.Cosmos/src/direct/ChannelDictionary.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ public ChannelDictionary(ChannelProperties channelProperties)
/// <returns>An instance of <see cref="IChannel"/> containing the <see cref="LoadBalancingChannel"/>.</returns>
public IChannel GetChannel(
Uri requestUri,
bool localRegionRequest,
bool validationRequired = false)
bool localRegionRequest)
{
this.ThrowIfDisposed();
ServerKey key = new ServerKey(requestUri);
Expand All @@ -57,7 +56,6 @@ public IChannel GetChannel(
new Uri(requestUri.GetLeftPart(UriPartial.Authority)),
this.channelProperties,
localRegionRequest,
validationRequired,
this.singleLoadBalancedPartitionForTest);
if (this.channels.TryAdd(key, value))
{
Expand Down
7 changes: 0 additions & 7 deletions Microsoft.Azure.Cosmos/src/direct/IChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,6 @@ Task<StoreResponse> RequestAsync(
public Task OpenChannelAsync(
Guid activityId);

/// <summary>
/// Sets the health state.
/// </summary>
/// <param name="isHealthy">A boolean flag indicating the health state to be set.</param>
public void SetHealthState(
bool isHealthy);

bool Healthy { get; }

void Close();
Expand Down
58 changes: 6 additions & 52 deletions Microsoft.Azure.Cosmos/src/direct/LoadBalancingChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ namespace Microsoft.Azure.Documents.Rntbd
{
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// LoadBalancingChannel encapsulates the management of channels that connect to a single
Expand All @@ -23,19 +22,13 @@ internal sealed class LoadBalancingChannel : IChannel, IDisposable

private bool disposed = false;

private readonly ReaderWriterLockSlim healthStateLock = new(LockRecursionPolicy.NoRecursion);

private volatile bool healthy;

public LoadBalancingChannel(
Uri serverUri,
ChannelProperties channelProperties,
bool localRegionRequest,
bool validationRequired = false,
LoadBalancingPartition singleLoadBalancedPartitionForTest = null)
{
this.serverUri = serverUri;
this.SetHealthState(!validationRequired);

if ((channelProperties.PartitionCount < 1) ||
(channelProperties.PartitionCount > 8))
Expand Down Expand Up @@ -93,30 +86,7 @@ public bool Healthy
get
{
this.ThrowIfDisposed();
this.healthStateLock.EnterReadLock();
try
{
return this.healthy;
}
finally
{
this.healthStateLock.ExitReadLock();
}
}
}

/// <inheritdoc/>
public void SetHealthState(
bool isHealthy)
{
this.healthStateLock.EnterWriteLock();
try
{
this.healthy = isHealthy;
}
finally
{
this.healthStateLock.ExitWriteLock();
return true;
}
}

Expand Down Expand Up @@ -152,24 +122,21 @@ public Task<StoreResponse> RequestAsync(
/// </summary>
/// <param name="activityId">An unique identifier indicating the current activity id.</param>
/// <returns>A completed task once the channel is opened.</returns>
public async Task OpenChannelAsync(
public Task OpenChannelAsync(
Guid activityId)
{
this.ThrowIfDisposed();
if (this.singlePartition != null)
{
Debug.Assert(this.partitions == null);
await this.OpenChannelToPartitionAsync(
partition: this.singlePartition,
activityId: activityId);
return this.singlePartition.OpenChannelAsync(activityId);
}
else
{
Debug.Assert(this.partitions != null);
LoadBalancingPartition partition = this.GetLoadBalancedPartition(activityId);
await this.OpenChannelToPartitionAsync(
partition: partition,
activityId: activityId);
return partition.OpenChannelAsync(
activityId);
}
}

Expand All @@ -183,18 +150,7 @@ private async Task OpenChannelToPartitionAsync(
LoadBalancingPartition partition,
Guid activityId)
{
try
{
await partition.OpenChannelAsync(activityId);
this.SetHealthState(
isHealthy: true);
}
catch (Exception)
{
this.SetHealthState(
isHealthy: false);
throw;
}
await partition.OpenChannelAsync(activityId);
}

/// <summary>
Expand Down Expand Up @@ -234,8 +190,6 @@ void IDisposable.Dispose()
this.partitions[i].Dispose();
}
}

this.healthStateLock.Dispose();
}

private void ThrowIfDisposed()
Expand Down
15 changes: 2 additions & 13 deletions Microsoft.Azure.Cosmos/src/direct/rntbd2/TransportClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,15 +130,14 @@ internal override async Task<StoreResponse> InvokeStoreAsync(
options: this.DistributedTracingOptions,
request: request);
#endif
IChannel channel = null;
try
{
TransportClient.IncrementCounters();

operation = "GetChannel";
// Treat all retries as out of region request for open timeout. This is to prevent too many retries because of the shorter time duration.
bool localRegionRequest = request.RequestContext.IsRetry ? false : request.RequestContext.LocalRegionRequest;
channel = this.channelDictionary.GetChannel(physicalAddress.Uri, localRegionRequest);
IChannel channel = this.channelDictionary.GetChannel(physicalAddress.Uri, localRegionRequest);

TransportClient.GetTransportPerformanceCounters().IncrementRntbdRequestCount(resourceOperation.resourceType, resourceOperation.operationType);

Expand Down Expand Up @@ -190,9 +189,6 @@ internal override async Task<StoreResponse> InvokeStoreAsync(
operation, request.ResourceAddress, request.ResourceType,
resourceOperation, physicalAddress, ex);

channel?.SetHealthState(
isHealthy: false);

if (request.IsReadOnlyRequest)
{
DefaultTrace.TraceInformation("Converting to Gone (read-only request)");
Expand Down Expand Up @@ -234,9 +230,6 @@ internal override async Task<StoreResponse> InvokeStoreAsync(
}
catch (DocumentClientException ex)
{
channel?.SetHealthState(
isHealthy: false);

transportResponseStatusCode = (int)TransportResponseStatusCode.DocumentClientException;
DefaultTrace.TraceInformation("{0} failed: RID: {1}, Resource Type: {2}, Op: {3}, Address: {4}, " +
"Exception: {5}", operation, request.ResourceAddress, request.ResourceType, resourceOperation,
Expand All @@ -250,9 +243,6 @@ internal override async Task<StoreResponse> InvokeStoreAsync(
}
catch (Exception ex)
{
channel?.SetHealthState(
isHealthy: false);

transportResponseStatusCode = (int)TransportResponseStatusCode.UnknownException;
DefaultTrace.TraceInformation("{0} failed: RID: {1}, Resource Type: {2}, Op: {3}, Address: {4}, " +
"Exception: {5}", operation, request.ResourceAddress, request.ResourceType, resourceOperation,
Expand Down Expand Up @@ -353,8 +343,7 @@ internal override Task OpenConnectionAsync(
{
IChannel channel = this.channelDictionary.GetChannel(
requestUri: physicalAddress,
localRegionRequest: false,
validationRequired: true);
localRegionRequest: false);

return channel.Healthy
? Task.FromResult(0)
Expand Down

0 comments on commit 69f05ff

Please sign in to comment.