Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Event Hubs Client] Storage Implementation Feedback #18439

Merged
merged 1 commit into from
Feb 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ internal partial class BlobsCheckpointStore : StorageManager
private static readonly string BlobsResourceDoesNotExist = "The Azure Storage Blobs container or blob used by the Event Processor Client does not exist.";
#pragma warning restore CA1810

/// <summary>A regular expression used to capture strings enclosed in double quotes.</summary>
private static readonly Regex DoubleQuotesExpression = new Regex("\"(.*)\"", RegexOptions.Compiled);

/// <summary>An ETag value to be used for permissive matching when querying Storage.</summary>
private static readonly ETag IfNoneMatchAllTag = new ETag("*");

Expand Down Expand Up @@ -126,7 +123,7 @@ public override async Task<IEnumerable<EventProcessorPartitionOwnership>> ListOw
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
ListOwnershipStart(fullyQualifiedNamespace, eventHubName, consumerGroup);

List<EventProcessorPartitionOwnership> result = new List<EventProcessorPartitionOwnership>();
var result = new List<EventProcessorPartitionOwnership>();

try
{
Expand Down Expand Up @@ -156,7 +153,7 @@ public override async Task<IEnumerable<EventProcessorPartitionOwnership>> ListOw
catch (RequestFailedException ex) when (ex.ErrorCode == BlobErrorCode.ContainerNotFound)
{
ListOwnershipError(fullyQualifiedNamespace, eventHubName, consumerGroup, ex);
throw new RequestFailedException(BlobsResourceDoesNotExist);
throw new RequestFailedException(BlobsResourceDoesNotExist, ex);
}
finally
{
Expand Down Expand Up @@ -231,14 +228,9 @@ public override async Task<IEnumerable<EventProcessorPartitionOwnership>> ClaimO
}

// Small workaround to retrieve the eTag. The current storage SDK returns it enclosed in
// double quotes ('"ETAG_VALUE"' instead of 'ETAG_VALUE').
// double quotes ("ETAG_VALUE" instead of ETAG_VALUE).

var match = DoubleQuotesExpression.Match(ownership.Version);

if (match.Success)
{
ownership.Version = match.Groups[1].ToString();
}
ownership.Version = ownership.Version?.Trim('"');

claimedOwnership.Add(ownership);
OwnershipClaimed(ownership.PartitionId, ownership.FullyQualifiedNamespace, ownership.EventHubName, ownership.ConsumerGroup, ownership.OwnerIdentifier);
Expand All @@ -250,7 +242,7 @@ public override async Task<IEnumerable<EventProcessorPartitionOwnership>> ClaimO
catch (RequestFailedException ex) when (ex.ErrorCode == BlobErrorCode.ContainerNotFound || ex.ErrorCode == BlobErrorCode.BlobNotFound)
{
ClaimOwnershipError(ownership.PartitionId, ownership.FullyQualifiedNamespace, ownership.EventHubName, ownership.ConsumerGroup, ownership.OwnerIdentifier, ex);
throw new RequestFailedException(BlobsResourceDoesNotExist);
throw new RequestFailedException(BlobsResourceDoesNotExist, ex);
}
catch (Exception ex)
{
Expand Down Expand Up @@ -303,20 +295,23 @@ public override async Task<IEnumerable<EventProcessorCheckpoint>> ListCheckpoint

if (InitializeWithLegacyCheckpoints)
{
// Legacy checkpoints are not normalized to lowercase
// Legacy checkpoints are not normalized to lowercase.

var legacyPrefix = string.Format(CultureInfo.InvariantCulture, FunctionsLegacyCheckpointPrefix, fullyQualifiedNamespace, eventHubName, consumerGroup);

await foreach (BlobItem blob in ContainerClient.GetBlobsAsync(prefix: legacyPrefix, cancellationToken: cancellationToken).ConfigureAwait(false))
{
// Skip new checkpoints and empty blobs
// Skip new checkpoints and empty blobs.

if (blob.Properties.ContentLength == 0)
{
continue;
}

var partitionId = blob.Name.Substring(legacyPrefix.Length);

// Check whether there is already a checkpoint for this partition id
// Check whether there is already a checkpoint for this partition id.

if (checkpoints.Any(existingCheckpoint => string.Equals(existingCheckpoint.PartitionId, partitionId, StringComparison.Ordinal)))
{
continue;
Expand All @@ -335,7 +330,7 @@ public override async Task<IEnumerable<EventProcessorCheckpoint>> ListCheckpoint
catch (RequestFailedException ex) when (ex.ErrorCode == BlobErrorCode.ContainerNotFound)
{
ListCheckpointsError(fullyQualifiedNamespace, eventHubName, consumerGroup, ex);
throw new RequestFailedException(BlobsResourceDoesNotExist);
throw new RequestFailedException(BlobsResourceDoesNotExist, ex);
}
catch (Exception ex)
{
Expand Down Expand Up @@ -502,7 +497,8 @@ private async Task<EventProcessorCheckpoint> CreateLegacyCheckpoint(string fully
}
else
{
// Skip checkpoints without an offset without logging an error
// Skip checkpoints without an offset without logging an error.

return null;
}
}
Expand Down Expand Up @@ -563,14 +559,15 @@ public override async Task UpdateCheckpointAsync(EventProcessorCheckpoint checkp
catch (RequestFailedException ex) when ((ex.ErrorCode == BlobErrorCode.BlobNotFound) || (ex.ErrorCode == BlobErrorCode.ContainerNotFound))
{
// If the blob wasn't present, fall-back to trying to create a new one.

using var blobContent = new MemoryStream(Array.Empty<byte>());
await blobClient.UploadAsync(blobContent, metadata: metadata, cancellationToken: cancellationToken).ConfigureAwait(false);
}
}
catch (RequestFailedException ex) when (ex.ErrorCode == BlobErrorCode.ContainerNotFound)
{
UpdateCheckpointError(checkpoint.PartitionId, checkpoint.FullyQualifiedNamespace, checkpoint.EventHubName, checkpoint.ConsumerGroup, ex);
throw new RequestFailedException(BlobsResourceDoesNotExist);
throw new RequestFailedException(BlobsResourceDoesNotExist, ex);
}
catch (Exception ex)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,16 @@ public virtual async ValueTask<EventProcessorPartitionOwnership> RunLoadBalancin
.ConfigureAwait(false))
.ToList();
}
catch (TaskCanceledException)
{
throw;
}
catch (OperationCanceledException)
{
throw new TaskCanceledException();
}
catch (Exception ex)
{
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

// If ownership list retrieval fails, give up on the current cycle. There's nothing more we can do
// without an updated ownership list. Set the EventHubName to null so it doesn't modify the exception
// message. This exception message is used so the processor can retrieve the raw Operation string, and
Expand All @@ -198,14 +204,12 @@ public virtual async ValueTask<EventProcessorPartitionOwnership> RunLoadBalancin
return default;
}

var unclaimedPartitions = new HashSet<string>(partitionIds);

// Create a partition distribution dictionary from the complete ownership list we have, mapping an owner's identifier to the list of
// partitions it owns. When an event processor goes down and it has only expired ownership, it will not be taken into consideration
// by others. The expiration time defaults to 30 seconds, but it may be overridden by a derived class.

var unclaimedPartitions = new HashSet<string>(partitionIds);
var utcNow = GetDateTimeOffsetNow();

var activeOwnership = default(EventProcessorPartitionOwnership);

ActiveOwnershipWithDistribution.Clear();
Expand Down Expand Up @@ -290,7 +294,6 @@ public virtual async Task RelinquishOwnershipAsync(CancellationToken cancellatio
});

await StorageManager.ClaimOwnershipAsync(ownershipToRelinquish, cancellationToken).ConfigureAwait(false);

InstanceOwnership.Clear();
}

Expand Down Expand Up @@ -422,7 +425,7 @@ public virtual async Task RelinquishOwnershipAsync(CancellationToken cancellatio

// No ownership has been claimed.

return new ValueTask<(bool, EventProcessorPartitionOwnership)>((false, default(EventProcessorPartitionOwnership)));
return new ValueTask<(bool, EventProcessorPartitionOwnership)>((false, default));
}

/// <summary>
Expand Down