Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Event Hubs Client] Minor Improvements #15654

Merged
merged 1 commit into from
Oct 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
/sdk/keyvault/ @schaabs @heaths

# PRLabel: %Cognitive - Metrics Advisor
sdk/metricsadvisor/ @kinelski
/sdk/metricsadvisor/ @kinelski

# PRLabel: %Search
/sdk/search/ @brjohnstmsft @arv100kri @bleroy @tg-msft @heaths
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ namespace Azure.Messaging.EventHubs
    ///   group pairing to share work by using a common storage platform to communicate.  Fault tolerance is also built-in,
    ///   allowing the processor to be resilient in the face of errors.
    /// </summary>
///
/// <remarks>
/// The <see cref="EventProcessorClient" /> is safe to cache and use for the lifetime of an application, and that is best practice when the application
/// processes events regularly or semi-regularly. The processor holds responsibility for efficient resource management, working to keep resource usage low during
/// periods of inactivity and manage health during periods of higher use. Calling either the <see cref="StopProcessingAsync" /> or <see cref="StopProcessing" />
/// method when processing is complete or as the application is shutting down will ensure that network resources and other unmanaged objects are properly cleaned up.
/// </remarks>
    ///
[SuppressMessage("Usage", "CA1001:Types that own disposable fields should be disposable.", Justification = "Disposal is managed internally as part of the Stop operation.")]
public class EventProcessorClient : EventProcessor<EventProcessorPartition>
Expand Down Expand Up @@ -554,6 +561,14 @@ public override void StartProcessing(CancellationToken cancellationToken = defau
///
/// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel the stop operation. If the operation is successfully canceled, the <see cref="EventProcessorClient" /> will keep running.</param>
///
/// <remarks>
/// When stopping, the processor will update the ownership of partitions that it was responsible for processing and clean up network resources used for communication with
/// the Event Hubs service. As a result, this method will perform network I/O and may need to wait for partition reads that were active to complete.
///
/// <para>Due to service calls and network latency, an invocation of this method may take slightly longer than the specified <see cref="EventProcessorClientOptions.MaximumWaitTime" /> or
/// if the wait time was not configured, the duration of the <see cref="EventHubsRetryOptions.TryTimeout" /> of the configured retry policy.</para>
/// </remarks>
///
public override Task StopProcessingAsync(CancellationToken cancellationToken = default) => base.StopProcessingAsync(cancellationToken);

/// <summary>
Expand All @@ -563,6 +578,14 @@ public override void StartProcessing(CancellationToken cancellationToken = defau
///
/// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel the stop operation. If the operation is successfully canceled, the <see cref="EventProcessorClient" /> will keep running.</param>
///
/// <remarks>
/// When stopping, the processor will update the ownership of partitions that it was responsible for processing and clean up network resources used for communication with
/// the Event Hubs service. As a result, this method will perform network I/O and may need to wait for partition reads that were active to complete.
///
/// <para>Due to service calls and network latency, an invocation of this method may take slightly longer than the specified <see cref="EventProcessorClientOptions.MaximumWaitTime" /> or
/// if the wait time was not configured, the duration of the <see cref="EventHubsRetryOptions.TryTimeout" /> of the configured retry policy.</para>
/// </remarks>
///
public override void StopProcessing(CancellationToken cancellationToken = default) => base.StopProcessing(cancellationToken);

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,25 @@
namespace Azure.Messaging.EventHubs.Consumer
{
/// <summary>
/// A client responsible for reading <see cref="EventData" /> from a specific Event Hub
/// as a member of a specific consumer group.
/// <para>A client responsible for reading <see cref="EventData" /> from a specific Event Hub
/// as a member of a specific consumer group.</para>
///
/// A consumer may be exclusive, which asserts ownership over associated partitions for the consumer
/// <para>A consumer may be exclusive, which asserts ownership over associated partitions for the consumer
/// group to ensure that only one consumer from that group is reading the from the partition.
/// These exclusive consumers are sometimes referred to as "Epoch Consumers."
/// These exclusive consumers are sometimes referred to as "Epoch Consumers."</para>
///
/// A consumer may also be non-exclusive, allowing multiple consumers from the same consumer
/// <para>A consumer may also be non-exclusive, allowing multiple consumers from the same consumer
/// group to be actively reading events from a given partition. These non-exclusive consumers are
/// sometimes referred to as "Non-Epoch Consumers."
/// sometimes referred to as "Non-Epoch Consumers."</para>
/// </summary>
///
/// <remarks>
/// The <see cref="EventHubConsumerClient" /> is safe to cache and use for the lifetime of an application, and that is best practice when the application
/// reads events regularly or semi-regularly. The consumer holds responsibility for efficient resource management, working to keep resource usage low during
/// periods of inactivity and manage health during periods of higher use. Calling either the <see cref="CloseAsync" /> or <see cref="DisposeAsync" />
/// method as the application is shutting down will ensure that network resources and other unmanaged objects are properly cleaned up.
/// </remarks>
///
public class EventHubConsumerClient : IAsyncDisposable
{
/// <summary>The name of the default consumer group in the Event Hubs service.</summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ namespace Azure.Messaging.EventHubs.Primitives
///
/// <typeparam name="TPartition">The context of the partition for which an operation is being performed.</typeparam>
///
/// <remarks>
/// The <see cref="EventProcessor{TPartition}" /> is safe to cache and use for the lifetime of an application, and that is best practice when the application
/// processes events regularly or semi-regularly. The processor holds responsibility for efficient resource management, working to keep resource usage low during
/// periods of inactivity and manage health during periods of higher use. Calling either the <see cref="StopProcessingAsync" /> or <see cref="StopProcessing" />
/// method when processing is complete or as the application is shutting down will ensure that network resources and other unmanaged objects are properly cleaned up.
/// </remarks>
///
[SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")]
public abstract class EventProcessor<TPartition> where TPartition : EventProcessorPartition, new()
{
Expand Down Expand Up @@ -371,6 +378,14 @@ public virtual void StartProcessing(CancellationToken cancellationToken = defaul
///
/// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel the stop operation. If the operation is successfully canceled, the <see cref="EventProcessor{TPartition}" /> will keep running.</param>
///
/// <remarks>
/// When stopping, the processor will update the ownership of partitions that it was responsible for processing and clean up network resources used for communication with
/// the Event Hubs service. As a result, this method will perform network I/O and may need to wait for partition reads that were active to complete.
///
/// <para>Due to service calls and network latency, an invocation of this method may take slightly longer than the specified <see cref="EventProcessorOptions.MaximumWaitTime" /> or
/// if the wait time was not configured, the duration of the <see cref="EventHubsRetryOptions.TryTimeout" /> of the configured retry policy.</para>
/// </remarks>
///
public virtual async Task StopProcessingAsync(CancellationToken cancellationToken = default) =>
await StopProcessingInternalAsync(true, cancellationToken).ConfigureAwait(false);

Expand All @@ -381,6 +396,14 @@ public virtual async Task StopProcessingAsync(CancellationToken cancellationToke
///
/// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel the stop operation. If the operation is successfully canceled, the <see cref="EventProcessor{TPartition}" /> will keep running.</param>
///
/// <remarks>
/// When stopping, the processor will update the ownership of partitions that it was responsible for processing and clean up network resources used for communication with
/// the Event Hubs service. As a result, this method will perform network I/O and may need to wait for partition reads that were active to complete.
///
/// <para>Due to service calls and network latency, an invocation of this method may take slightly longer than the specified <see cref="EventProcessorOptions.MaximumWaitTime" /> or
/// if the wait time was not configured, the duration of the <see cref="EventHubsRetryOptions.TryTimeout" /> of the configured retry policy.</para>
/// </remarks>
///
public virtual void StopProcessing(CancellationToken cancellationToken = default) =>
StopProcessingInternalAsync(false, cancellationToken).EnsureCompleted();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,14 @@ namespace Azure.Messaging.EventHubs.Primitives
/// </summary>
///
/// <remarks>
/// It is recommended that the <c>EventProcessorClient</c> or <see cref="EventHubConsumerClient" />
/// <para>It is recommended that the <c>EventProcessorClient</c> or <see cref="EventHubConsumerClient" />
/// be used for reading and processing events for the majority of scenarios. The partition receiver is
/// intended to enable scenarios with special needs which require more direct control.
/// intended to enable scenarios with special needs which require more direct control.</para>
///
/// <para>The <see cref="PartitionReceiver" /> is safe to cache and use for the lifetime of an application, and that is best practice when the application
/// reads events regularly or semi-regularly. The receiver holds responsibility for efficient resource management, working to keep resource usage low during
/// periods of inactivity and manage health during periods of higher use. Calling either the <see cref="CloseAsync" /> or <see cref="DisposeAsync" />
/// method as the application is shutting down will ensure that network resources and other unmanaged objects are properly cleaned up.</para>
/// </remarks>
///
/// <seealso href="https://www.nuget.org/packages/Azure.Messaging.EventHubs.Processor" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,24 @@ namespace Azure.Messaging.EventHubs.Producer
/// </summary>
///
/// <remarks>
/// Allowing automatic routing of partitions is recommended when:
/// <para>- The sending of events needs to be highly available.</para>
/// <para>- The event data should be evenly distributed among all available partitions.</para>
/// <para>
/// Allowing automatic routing of partitions is recommended when:
/// <para>- The sending of events needs to be highly available.</para>
/// <para>- The event data should be evenly distributed among all available partitions.</para>
/// </para>
///
/// If no partition is specified, the following rules are used for automatically selecting one:
/// <para>1) Distribute the events equally amongst all available partitions using a round-robin approach.</para>
/// <para>2) If a partition becomes unavailable, the Event Hubs service will automatically detect it and forward the message to another available partition.</para>
/// <para>
/// If no partition is specified, the following rules are used for automatically selecting one:
/// <para>1) Distribute the events equally amongst all available partitions using a round-robin approach.</para>
/// <para>2) If a partition becomes unavailable, the Event Hubs service will automatically detect it and forward the message to another available partition.</para>
/// </para>
///
/// <para>
/// The <see cref="EventHubProducerClient" /> is safe to cache and use for the lifetime of an application, and that is best practice when the application
/// publishes events regularly or semi-regularly. The producer holds responsibility for efficient resource management, working to keep resource usage low during
/// periods of inactivity and manage health during periods of higher use. Calling either the <see cref="CloseAsync" /> or <see cref="DisposeAsync" />
/// method as the application is shutting down will ensure that network resources and other unmanaged objects are properly cleaned up.
/// </para>
/// </remarks>
///
public class EventHubProducerClient : IAsyncDisposable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -542,5 +542,50 @@ public async Task ProducerCanInitializeWithPartialPartitionOptions()
Assert.That(async () => await producer.SendAsync(EventGenerator.CreateEvents(10), new SendEventOptions { PartitionId = partition }, cancellationSource.Token), Throws.Nothing);
}
}

/// <summary>
/// Verifies that the <see cref="EventHubProducerClient" /> is able to
/// perform operations when the idempotent publishing feature is enabled.
/// </summary>
///
[Test]
public async Task ProducerIsRejectedWithPartitionOptionsForInvalidState()
{
await using (EventHubScope scope = await EventHubScope.CreateAsync(2))
{
var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName);
var options = new EventHubProducerClientOptions { EnableIdempotentPartitions = true };

var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);

var partition = default(string);
var partitionProperties = default(PartitionPublishingProperties);

// Create a producer for a small scope that will Send some events and read the properties.

await using (var initialProducer = new EventHubProducerClient(connectionString, options))
{
partition = (await initialProducer.GetPartitionIdsAsync(cancellationSource.Token)).Last();

await initialProducer.SendAsync(EventGenerator.CreateEvents(10), new SendEventOptions { PartitionId = partition }, cancellationSource.Token);
partitionProperties = await initialProducer.GetPartitionPublishingPropertiesAsync(partition);
}

// Create a new producer using the previously read properties to set options for the partition.

options.PartitionOptions.Add(partition, new PartitionPublishingOptions
{
ProducerGroupId = partitionProperties.ProducerGroupId,
OwnerLevel = partitionProperties.OwnerLevel,
StartingSequenceNumber = (partitionProperties.LastPublishedSequenceNumber - 5)
});

await using var producer = new EventHubProducerClient(connectionString, options);

Assert.That(async () => await producer.SendAsync(EventGenerator.CreateEvents(10), new SendEventOptions { PartitionId = partition }, cancellationSource.Token),
Throws.InstanceOf<EventHubsException>().And.Property("Reason").EqualTo(EventHubsException.FailureReason.InvalidClientState));
}
}
}
}