Skip to content

Commit

Permalink
[Event Hubs Client] Minor Improvements (Azure#15654)
Browse files Browse the repository at this point in the history
The focus of these changes is to make several minor improvements, which
are largely unrelated to one another.  These include:

- Documentation for the expected delay when stopping an Event Processor
- Better documentation for client lifetimes and cachability
- Additional test scenarios for the Idempotent Producer
- Fixing a CODEOWNERS typo
  • Loading branch information
jsquire authored and annelo-msft committed Feb 17, 2021
1 parent 2581b50 commit 23e4a3b
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 15 deletions.
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
/sdk/keyvault/ @schaabs @heaths

# PRLabel: %Cognitive - Metrics Advisor
sdk/metricsadvisor/ @kinelski
/sdk/metricsadvisor/ @kinelski

# PRLabel: %Search
/sdk/search/ @brjohnstmsft @arv100kri @bleroy @tg-msft @heaths
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ namespace Azure.Messaging.EventHubs
    ///   group pairing to share work by using a common storage platform to communicate.  Fault tolerance is also built-in,
    ///   allowing the processor to be resilient in the face of errors.
    /// </summary>
///
/// <remarks>
/// The <see cref="EventProcessorClient" /> is safe to cache and use for the lifetime of an application, and that is best practice when the application
/// processes events regularly or semi-regularly. The processor holds responsibility for efficient resource management, working to keep resource usage low during
/// periods of inactivity and manage health during periods of higher use. Calling either the <see cref="StopProcessingAsync" /> or <see cref="StopProcessing" />
/// method when processing is complete or as the application is shutting down will ensure that network resources and other unmanaged objects are properly cleaned up.
/// </remarks>
    ///
[SuppressMessage("Usage", "CA1001:Types that own disposable fields should be disposable.", Justification = "Disposal is managed internally as part of the Stop operation.")]
public class EventProcessorClient : EventProcessor<EventProcessorPartition>
Expand Down Expand Up @@ -554,6 +561,14 @@ public override void StartProcessing(CancellationToken cancellationToken = defau
///
/// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel the stop operation. If the operation is successfully canceled, the <see cref="EventProcessorClient" /> will keep running.</param>
///
/// <remarks>
/// When stopping, the processor will update the ownership of partitions that it was responsible for processing and clean up network resources used for communication with
/// the Event Hubs service. As a result, this method will perform network I/O and may need to wait for partition reads that were active to complete.
///
/// <para>Due to service calls and network latency, an invocation of this method may take slightly longer than the specified <see cref="EventProcessorClientOptions.MaximumWaitTime" /> or
/// if the wait time was not configured, the duration of the <see cref="EventHubsRetryOptions.TryTimeout" /> of the configured retry policy.</para>
/// </remarks>
///
public override Task StopProcessingAsync(CancellationToken cancellationToken = default) => base.StopProcessingAsync(cancellationToken);

/// <summary>
Expand All @@ -563,6 +578,14 @@ public override void StartProcessing(CancellationToken cancellationToken = defau
///
/// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel the stop operation. If the operation is successfully canceled, the <see cref="EventProcessorClient" /> will keep running.</param>
///
/// <remarks>
/// When stopping, the processor will update the ownership of partitions that it was responsible for processing and clean up network resources used for communication with
/// the Event Hubs service. As a result, this method will perform network I/O and may need to wait for partition reads that were active to complete.
///
/// <para>Due to service calls and network latency, an invocation of this method may take slightly longer than the specified <see cref="EventProcessorClientOptions.MaximumWaitTime" /> or
/// if the wait time was not configured, the duration of the <see cref="EventHubsRetryOptions.TryTimeout" /> of the configured retry policy.</para>
/// </remarks>
///
public override void StopProcessing(CancellationToken cancellationToken = default) => base.StopProcessing(cancellationToken);

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,25 @@
namespace Azure.Messaging.EventHubs.Consumer
{
/// <summary>
/// A client responsible for reading <see cref="EventData" /> from a specific Event Hub
/// as a member of a specific consumer group.
/// <para>A client responsible for reading <see cref="EventData" /> from a specific Event Hub
/// as a member of a specific consumer group.</para>
///
/// A consumer may be exclusive, which asserts ownership over associated partitions for the consumer
/// <para>A consumer may be exclusive, which asserts ownership over associated partitions for the consumer
/// group to ensure that only one consumer from that group is reading the from the partition.
/// These exclusive consumers are sometimes referred to as "Epoch Consumers."
/// These exclusive consumers are sometimes referred to as "Epoch Consumers."</para>
///
/// A consumer may also be non-exclusive, allowing multiple consumers from the same consumer
/// <para>A consumer may also be non-exclusive, allowing multiple consumers from the same consumer
/// group to be actively reading events from a given partition. These non-exclusive consumers are
/// sometimes referred to as "Non-Epoch Consumers."
/// sometimes referred to as "Non-Epoch Consumers."</para>
/// </summary>
///
/// <remarks>
/// The <see cref="EventHubConsumerClient" /> is safe to cache and use for the lifetime of an application, and that is best practice when the application
/// reads events regularly or semi-regularly. The consumer holds responsibility for efficient resource management, working to keep resource usage low during
/// periods of inactivity and manage health during periods of higher use. Calling either the <see cref="CloseAsync" /> or <see cref="DisposeAsync" />
/// method as the application is shutting down will ensure that network resources and other unmanaged objects are properly cleaned up.
/// </remarks>
///
public class EventHubConsumerClient : IAsyncDisposable
{
/// <summary>The name of the default consumer group in the Event Hubs service.</summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ namespace Azure.Messaging.EventHubs.Primitives
///
/// <typeparam name="TPartition">The context of the partition for which an operation is being performed.</typeparam>
///
/// <remarks>
/// The <see cref="EventProcessor{TPartition}" /> is safe to cache and use for the lifetime of an application, and that is best practice when the application
/// processes events regularly or semi-regularly. The processor holds responsibility for efficient resource management, working to keep resource usage low during
/// periods of inactivity and manage health during periods of higher use. Calling either the <see cref="StopProcessingAsync" /> or <see cref="StopProcessing" />
/// method when processing is complete or as the application is shutting down will ensure that network resources and other unmanaged objects are properly cleaned up.
/// </remarks>
///
[SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")]
public abstract class EventProcessor<TPartition> where TPartition : EventProcessorPartition, new()
{
Expand Down Expand Up @@ -371,6 +378,14 @@ public virtual void StartProcessing(CancellationToken cancellationToken = defaul
///
/// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel the stop operation. If the operation is successfully canceled, the <see cref="EventProcessor{TPartition}" /> will keep running.</param>
///
/// <remarks>
/// When stopping, the processor will update the ownership of partitions that it was responsible for processing and clean up network resources used for communication with
/// the Event Hubs service. As a result, this method will perform network I/O and may need to wait for partition reads that were active to complete.
///
/// <para>Due to service calls and network latency, an invocation of this method may take slightly longer than the specified <see cref="EventProcessorOptions.MaximumWaitTime" /> or
/// if the wait time was not configured, the duration of the <see cref="EventHubsRetryOptions.TryTimeout" /> of the configured retry policy.</para>
/// </remarks>
///
public virtual async Task StopProcessingAsync(CancellationToken cancellationToken = default) =>
await StopProcessingInternalAsync(true, cancellationToken).ConfigureAwait(false);

Expand All @@ -381,6 +396,14 @@ public virtual async Task StopProcessingAsync(CancellationToken cancellationToke
///
/// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel the stop operation. If the operation is successfully canceled, the <see cref="EventProcessor{TPartition}" /> will keep running.</param>
///
/// <remarks>
/// When stopping, the processor will update the ownership of partitions that it was responsible for processing and clean up network resources used for communication with
/// the Event Hubs service. As a result, this method will perform network I/O and may need to wait for partition reads that were active to complete.
///
/// <para>Due to service calls and network latency, an invocation of this method may take slightly longer than the specified <see cref="EventProcessorOptions.MaximumWaitTime" /> or
/// if the wait time was not configured, the duration of the <see cref="EventHubsRetryOptions.TryTimeout" /> of the configured retry policy.</para>
/// </remarks>
///
public virtual void StopProcessing(CancellationToken cancellationToken = default) =>
StopProcessingInternalAsync(false, cancellationToken).EnsureCompleted();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,14 @@ namespace Azure.Messaging.EventHubs.Primitives
/// </summary>
///
/// <remarks>
/// It is recommended that the <c>EventProcessorClient</c> or <see cref="EventHubConsumerClient" />
/// <para>It is recommended that the <c>EventProcessorClient</c> or <see cref="EventHubConsumerClient" />
/// be used for reading and processing events for the majority of scenarios. The partition receiver is
/// intended to enable scenarios with special needs which require more direct control.
/// intended to enable scenarios with special needs which require more direct control.</para>
///
/// <para>The <see cref="PartitionReceiver" /> is safe to cache and use for the lifetime of an application, and that is best practice when the application
/// reads events regularly or semi-regularly. The receiver holds responsibility for efficient resource management, working to keep resource usage low during
/// periods of inactivity and manage health during periods of higher use. Calling either the <see cref="CloseAsync" /> or <see cref="DisposeAsync" />
/// method as the application is shutting down will ensure that network resources and other unmanaged objects are properly cleaned up.</para>
/// </remarks>
///
/// <seealso href="https://www.nuget.org/packages/Azure.Messaging.EventHubs.Processor" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,24 @@ namespace Azure.Messaging.EventHubs.Producer
/// </summary>
///
/// <remarks>
/// Allowing automatic routing of partitions is recommended when:
/// <para>- The sending of events needs to be highly available.</para>
/// <para>- The event data should be evenly distributed among all available partitions.</para>
/// <para>
/// Allowing automatic routing of partitions is recommended when:
/// <para>- The sending of events needs to be highly available.</para>
/// <para>- The event data should be evenly distributed among all available partitions.</para>
/// </para>
///
/// If no partition is specified, the following rules are used for automatically selecting one:
/// <para>1) Distribute the events equally amongst all available partitions using a round-robin approach.</para>
/// <para>2) If a partition becomes unavailable, the Event Hubs service will automatically detect it and forward the message to another available partition.</para>
/// <para>
/// If no partition is specified, the following rules are used for automatically selecting one:
/// <para>1) Distribute the events equally amongst all available partitions using a round-robin approach.</para>
/// <para>2) If a partition becomes unavailable, the Event Hubs service will automatically detect it and forward the message to another available partition.</para>
/// </para>
///
/// <para>
/// The <see cref="EventHubProducerClient" /> is safe to cache and use for the lifetime of an application, and that is best practice when the application
/// publishes events regularly or semi-regularly. The producer holds responsibility for efficient resource management, working to keep resource usage low during
/// periods of inactivity and manage health during periods of higher use. Calling either the <see cref="CloseAsync" /> or <see cref="DisposeAsync" />
/// method as the application is shutting down will ensure that network resources and other unmanaged objects are properly cleaned up.
/// </para>
/// </remarks>
///
public class EventHubProducerClient : IAsyncDisposable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -542,5 +542,50 @@ public async Task ProducerCanInitializeWithPartialPartitionOptions()
Assert.That(async () => await producer.SendAsync(EventGenerator.CreateEvents(10), new SendEventOptions { PartitionId = partition }, cancellationSource.Token), Throws.Nothing);
}
}

/// <summary>
/// Verifies that the <see cref="EventHubProducerClient" /> is able to
/// perform operations when the idempotent publishing feature is enabled.
/// </summary>
///
[Test]
public async Task ProducerIsRejectedWithPartitionOptionsForInvalidState()
{
await using (EventHubScope scope = await EventHubScope.CreateAsync(2))
{
var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName);
var options = new EventHubProducerClientOptions { EnableIdempotentPartitions = true };

var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);

var partition = default(string);
var partitionProperties = default(PartitionPublishingProperties);

// Create a producer for a small scope that will Send some events and read the properties.

await using (var initialProducer = new EventHubProducerClient(connectionString, options))
{
partition = (await initialProducer.GetPartitionIdsAsync(cancellationSource.Token)).Last();

await initialProducer.SendAsync(EventGenerator.CreateEvents(10), new SendEventOptions { PartitionId = partition }, cancellationSource.Token);
partitionProperties = await initialProducer.GetPartitionPublishingPropertiesAsync(partition);
}

// Create a new producer using the previously read properties to set options for the partition.

options.PartitionOptions.Add(partition, new PartitionPublishingOptions
{
ProducerGroupId = partitionProperties.ProducerGroupId,
OwnerLevel = partitionProperties.OwnerLevel,
StartingSequenceNumber = (partitionProperties.LastPublishedSequenceNumber - 5)
});

await using var producer = new EventHubProducerClient(connectionString, options);

Assert.That(async () => await producer.SendAsync(EventGenerator.CreateEvents(10), new SendEventOptions { PartitionId = partition }, cancellationSource.Token),
Throws.InstanceOf<EventHubsException>().And.Property("Reason").EqualTo(EventHubsException.FailureReason.InvalidClientState));
}
}
}
}

0 comments on commit 23e4a3b

Please sign in to comment.