diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/MigrationGuide.md b/sdk/eventhub/Azure.Messaging.EventHubs/MigrationGuide.md index 984ad78ebcaa..69ab56c20ae9 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs/MigrationGuide.md +++ b/sdk/eventhub/Azure.Messaging.EventHubs/MigrationGuide.md @@ -1,4 +1,4 @@ -# Guide for migrating to Azure.Messaging.EventHubs from Microsoft.Azure.EventHubs +# Guide for migrating to Azure.Messaging.EventHubs from Microsoft.Azure.EventHubs This guide is intended to assist in the migration to version 5 of the Event Hubs client library from version 4. It will focus on side-by-side comparisons for similar operations between the v5 packages, [`Azure.Messaging.EventHubs`](https://www.nuget.org/packages/Azure.Messaging.EventHubs/) and [`Azure.Messaging.EventHubs.Processor`](https://www.nuget.org/packages/Azure.Messaging.EventHubs.Processor/) and their v4 equivalents, [`Microsoft.Azure.EventHubs`](https://www.nuget.org/packages/Microsoft.Azure.EventHubs/) and [`Microsoft.Azure.EventHubs.Processor`](https://www.nuget.org/packages/Microsoft.Azure.EventHubs.Processor/). @@ -45,18 +45,31 @@ In the case of Event Hubs, the modern client libraries have packages and namespa ### Client hierarchy -In the interest of simplifying the API surface we've made two distinct -clients, rather than having a single `EventHubClient`: -* [EventHubProducerClient](https://docs.microsoft.com/en-us/dotnet/api/azure.messaging.eventhubs.eventhubproducerclient?view=azure-dotnet-preview) - for publishing messages. -* [EventHubConsumerClient](https://docs.microsoft.com/en-us/dotnet/api/azure.messaging.eventhubs.eventhubconsumerclient?view=azure-dotnet-preview) - for reading messages. +The key goal for the modern Event Hubs client library was to provide a first-class experience for developers, from early exploration of Event Hubs through real-world use. 
We wanted to simplify the API surface to focus on scenarios important to the majority of developers without losing support for those with specialized needs. To achieve this, the client hierarchy has been split into two general categories, mainstream and specialized. -The producer and consumer clients operate in the context of a specific event hub and offer operations for all partitions. Unlike the v4, the clients are not bound to a specific partition, but the methods on them have overloads to handle specific partitions if needed. +The mainstream set of clients provides an approachable onboarding experience for those new to Event Hubs with a clear step-up path to production use. The specialized set of clients is focused on high-throughput and allowing developers a higher degree of control, at the cost of more complexity in their use. This section will briefly introduce the clients in both categories, with the remainder of the migration guide focused on mainstream scenarios. -The [EventHubConsumerClient](https://docs.microsoft.com/en-us/dotnet/api/azure.messaging.eventhubs.eventhubconsumerclient?view=azure-dotnet-preview) supports reading events from a single partition and also offers an easy way to familiarize yourself with Event Hubs by reading from all partitions without the rigor and complexity that you would need in a production application. For reading events from all partitions in a production scenario, we strongly recommend using the [EventProcessorClient](https://azuresdkdocs.blob.core.windows.net/$web/dotnet/Azure.Messaging.EventHubs.Processor/5.0.0-preview.6/api/Azure.Messaging.EventHubs/Azure.Messaging.EventHubs.EventProcessorClient.html). 
+#### Mainstream -The [EventProcessorClient](https://azuresdkdocs.blob.core.windows.net/$web/dotnet/Azure.Messaging.EventHubs.Processor/5.0.0-preview.6/api/Azure.Messaging.EventHubs/Azure.Messaging.EventHubs.EventProcessorClient.html) can be found in the new [Azure.Messaging.EventHubs.Processor](https://www.nuget.org/packages/Azure.Messaging.EventHubs.Processor/) which replaces the older [Microsoft.Azure.EventHubs.Processor](https://www.nuget.org/packages/Microsoft.Azure.EventHubs.Processor/). Here we have the [EventProcessorClient](https://azuresdkdocs.blob.core.windows.net/$web/dotnet/Azure.Messaging.EventHubs.Processor/5.0.0-preview.6/api/Azure.Messaging.EventHubs/Azure.Messaging.EventHubs.EventProcessorClient.html) which is responsible for consuming events for the configured Event Hub and consumer group across all partitions. It also supports checkpointing and load balancing. Currently, only Azure Storage Blobs is supported for checkpointing. +In order to allow for a single focus and clear responsibility, the core functionality for publishing and reading events belongs to two distinct clients, rather than the single `EventHubClient` used by previous versions. The producer and consumer clients operate in the context of a specific Event Hub and offer operations for all partitions. Unlike in v4, the clients are not bound to a specific partition, but the methods on them have overloads to handle specific partitions if needed. + +- The [EventHubProducerClient](https://docs.microsoft.com/dotnet/api/azure.messaging.eventhubs.producer?view=azure-dotnet) is responsible for publishing events and supports multiple approaches for selecting the partition to which the event is associated, including automatic routing by the Event Hubs service and specifying an explicit partition. 
+ +- The [EventHubConsumerClient](https://docs.microsoft.com/dotnet/api/azure.messaging.eventhubs.consumer.eventhubconsumerclient?view=azure-dotnet) supports reading events from a single partition and also offers an easy way to familiarize yourself with Event Hubs by reading from all partitions without the rigor and complexity that you would need in a production application. For reading events from all partitions in a production scenario, we strongly recommend using the [EventProcessorClient](https://docs.microsoft.com/dotnet/api/azure.messaging.eventhubs.eventprocessorclient?view=azure-dotnet) over the `EventHubConsumerClient`. + +- The [EventProcessorClient](https://docs.microsoft.com/dotnet/api/azure.messaging.eventhubs.eventprocessorclient?view=azure-dotnet) is responsible for reading and processing events for all partitions of an Event Hub. It will collaborate with other instances for the same Event Hub and consumer group pairing to balance work between them. A high degree of fault tolerance is built-in, allowing the processor to be resilient in the face of errors. The `EventProcessorClient` can be found in the new [Azure.Messaging.EventHubs.Processor](https://www.nuget.org/packages/Azure.Messaging.EventHubs.Processor/) package which replaces the older [Microsoft.Azure.EventHubs.Processor](https://www.nuget.org/packages/Microsoft.Azure.EventHubs.Processor/) package. + + One of the key features of the `EventProcessorClient` is enabling tracking of which events have been processed by interacting with a durable storage provider. This process is commonly referred to as [checkpointing](https://docs.microsoft.com/azure/event-hubs/event-hubs-features#checkpointing) and the persisted state as a checkpoint. This version of the `EventProcessorClient` only supports Azure Storage Blobs. 
+ + **_Important note on checkpoints:_** It is unfortunate, but the [EventProcessorClient](https://docs.microsoft.com/dotnet/api/azure.messaging.eventhubs.eventprocessorclient?view=azure-dotnet) does not support legacy checkpoint data. In order to meet the goals set for unifying checkpoint data across languages for the new set of Event Hubs libraries and to make improvements to the algorithm used for managing partition ownership, breaking changes were necessary. An approach for migrating legacy checkpoints can be found in the [migration samples](#migration-samples) below. + +#### Specialized + +- The [PartitionReceiver](https://docs.microsoft.com/dotnet/api/azure.messaging.eventhubs.primitives.partitionreceiver?view=azure-dotnet) is responsible for reading events from a specific partition of an Event Hub, with a greater level of control over communication with the Event Hubs service than is offered by other event consumers. More detail on the design and philosophy for the `PartitionReceiver` can be found in the [design document](https://github.com/Azure/azure-sdk-for-net/blob/master/sdk/eventhub/Azure.Messaging.EventHubs/design/partition-receiver-proposal.md). + +- The [EventProcessor](https://docs.microsoft.com/dotnet/api/azure.messaging.eventhubs.primitives.eventprocessor-1?view=azure-dotnet) provides a base for creating a custom processor for reading and processing events for all partitions of an Event Hub. The `EventProcessor` fills a similar role as the `EventProcessorClient`, with cooperative load balancing and resiliency as its core features. However, it also offers native batch processing, the ability to customize checkpoint storage, a greater level of control over communication with the Event Hubs service, and a less opinionated API. The caveat is that this comes with additional complexity and exists as an abstract base, which needs to be extended and the core “handler” activities implemented via override. 
+ + Generally speaking, the `EventProcessorClient` was designed to provide an API familiar to users of the `EventHubConsumerClient` and offer an intuitive "step-up" experience for developers exploring Event Hubs as they advance to production scenarios. For a large portion of our library users, that covers their needs well. There's definitely a point, however, where an application requires more control to handle higher throughput or unique needs - that's where the `EventProcessor` is intended to help. More on the design and philosophy behind this can be found in the [design document](https://github.com/Azure/azure-sdk-for-net/blob/master/sdk/eventhub/Azure.Messaging.EventHubs/design/event-processor%7BT%7D-proposal.md). ### Client constructors @@ -88,7 +101,7 @@ The v4 client allowed for sending a single event or an enumerable of events, whi In v4, events could be published to a single partition using `PartitionSender`. You could also send a single event, a set of events, or a batch. In v5 only batches are supported to ensure that there are no unexpected exceptions generated during send; you can't put it in a batch if it was too large to send. -In v5, this has been consolidated into a more efficient SendAsync(EventDataBatch) method. Batching merges information from multiple events into a single publish message, reducing the amount of network communication needed vs publishing events one at a time. Events are published to a specific partition when partition id is set in [`CreateBatchOptions`](https://docs.microsoft.com/en-us/dotnet/api/azure.messaging.eventhubs.createbatchoptions?view=azure-dotnet-preview) before calling [`CreateBatchAsync(CreateBatchOptions)`](https://docs.microsoft.com/en-us/dotnet/api/azure.messaging.eventhubs.eventhubproducerclient.createbatchasync?view=azure-dotnet-preview#Azure_Messaging_EventHubs_EventHubProducerClient_CreateBatchAsync_Azure_Messaging_EventHubs_CreateBatchOptions_System_Threading_CancellationToken_). 
+In v5, this has been consolidated into a more efficient SendAsync(EventDataBatch) method. Batching merges information from multiple events into a single publish message, reducing the amount of network communication needed vs publishing events one at a time. Events are published to a specific partition when partition id is set in [`CreateBatchOptions`](https://docs.microsoft.com/dotnet/api/azure.messaging.eventhubs.producer.createbatchoptions?view=azure-dotnet) before calling [`CreateBatchAsync(CreateBatchOptions)`](https://docs.microsoft.com/dotnet/api/azure.messaging.eventhubs.producer.eventhubproducerclient.createbatchasync?view=azure-dotnet#Azure_Messaging_EventHubs_Producer_EventHubProducerClient_CreateBatchAsync_Azure_Messaging_EventHubs_Producer_CreateBatchOptions_System_Threading_CancellationToken_). The code below assumes all events fit into a single batch. For more complete example, see sample: [Publish events to specific partition](https://github.com/Azure/azure-sdk-for-net/blob/master/sdk/eventhub/Azure.Messaging.EventHubs/samples/Sample06_PublishAnEventBatchToASpecificPartition.cs). @@ -100,7 +113,8 @@ var eventHubName = "<< NAME OF THE EVENT HUB >>"; var connectionStringBuilder = new EventHubsConnectionStringBuilder(connectionString){ EntityPath = eventHubName }; var eventHubClient = EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString()); -PartitionSender partitionSender = eventHubClient.CreatePartitionSender("my-partition-id"); +var partitionSender = eventHubClient.CreatePartitionSender("my-partition-id"); + try { EventDataBatch eventBatch = partitionSender.CreateBatch(); @@ -136,7 +150,7 @@ await using (var producerClient = new EventHubProducerClient(connectionString, e In v4, events could be published to an Event Hub that allowed the service to automatically route events to an available partition. 
-In v5, automatic routing occurs when an [`EventDataBatch`](https://docs.microsoft.com/en-us/dotnet/api/azure.messaging.eventhubs.eventdatabatch?view=azure-dotnet-preview) is created using [`CreateBatchAsync()`](https://docs.microsoft.com/en-us/dotnet/api/azure.messaging.eventhubs.eventhubproducerclient.createbatchasync?view=azure-dotnet-preview#Azure_Messaging_EventHubs_EventHubProducerClient_CreateBatchAsync_System_Threading_CancellationToken_). +In v5, automatic routing occurs when an [`EventDataBatch`](https://docs.microsoft.com/dotnet/api/azure.messaging.eventhubs.producer.eventdatabatch?view=azure-dotnet) is created using [`CreateBatchAsync()`](https://docs.microsoft.com/dotnet/api/azure.messaging.eventhubs.producer.eventhubproducerclient.createbatchasync?view=azure-dotnet#Azure_Messaging_EventHubs_Producer_EventHubProducerClient_CreateBatchAsync_System_Threading_CancellationToken_). In v4: ```csharp @@ -145,6 +159,7 @@ var eventHubName = "<< NAME OF THE EVENT HUB >>"; var connectionStringBuilder = new EventHubsConnectionStringBuilder(connectionString){ EntityPath = eventHubName }; var eventHubClient = EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString()); + try { EventDataBatch eventBatch = eventHubClient.CreateBatch(); @@ -178,7 +193,7 @@ await using (var producerClient = new EventHubProducerClient(connectionString, e In v4, events could be published with a partition key. -In v5, events are published with a partition key when partition key is set in [`CreateBatchOptions`](https://docs.microsoft.com/en-us/dotnet/api/azure.messaging.eventhubs.createbatchoptions?view=azure-dotnet-preview) before calling [`CreateBatchAsync(CreateBatchOptions)`](https://docs.microsoft.com/en-us/dotnet/api/azure.messaging.eventhubs.eventhubproducerclient.createbatchasync?view=azure-dotnet-preview#Azure_Messaging_EventHubs_EventHubProducerClient_CreateBatchAsync_Azure_Messaging_EventHubs_CreateBatchOptions_System_Threading_CancellationToken_). 
+In v5, events are published with a partition key when partition key is set in [`CreateBatchOptions`](https://docs.microsoft.com/dotnet/api/azure.messaging.eventhubs.producer.createbatchoptions?view=azure-dotnet) before calling [`CreateBatchAsync(CreateBatchOptions)`](https://docs.microsoft.com/dotnet/api/azure.messaging.eventhubs.producer.eventhubproducerclient.createbatchasync?view=azure-dotnet#Azure_Messaging_EventHubs_Producer_EventHubProducerClient_CreateBatchAsync_Azure_Messaging_EventHubs_Producer_CreateBatchOptions_System_Threading_CancellationToken_). In v4: ```csharp @@ -187,6 +202,7 @@ var eventHubName = "<< NAME OF THE EVENT HUB >>"; var connectionStringBuilder = new EventHubsConnectionStringBuilder(connectionString){ EntityPath = eventHubName }; var eventHubClient = EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString()); + try { EventData eventData = new EventData(Encoding.UTF8.GetBytes("First")); @@ -228,7 +244,8 @@ var eventHubName = "<< NAME OF THE EVENT HUB >>"; var connectionStringBuilder = new EventHubsConnectionStringBuilder(connectionString){ EntityPath = eventHubName }; var eventHubClient = EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString()); -PartitionReceiver partitionReceiver = client.CreateReceiver("my-consumer-group", "my-partition-id", EventPosition.FromStart()); +var partitionReceiver = client.CreateReceiver("my-consumer-group", "my-partition-id", EventPosition.FromStart()); + try { // Gets up to 100 events or until the read timeout elapses. 
@@ -280,37 +297,42 @@ In v5, `EventProcessorClient` allows you to do the same, the development model i The following code in v4: ```csharp -private static void Main(String[] args) { +public static void Main(string[] args) +{ var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>"; var blobContainerName = "<< NAME OF THE BLOBS CONTAINER >>"; var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>"; var eventHubName = "<< NAME OF THE EVENT HUB >>"; var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>"; - - EventProcessorHost eventProcessorHost = new EventProcessorHost(eventHubName, consumerGroup, eventHubsConnectionString, storageConnectionString, blobContainerName); + var eventProcessorHost = new EventProcessorHost(eventHubName, consumerGroup, eventHubsConnectionString, storageConnectionString, blobContainerName); // Registers the Event Processor Host and starts receiving messages await eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>(); + // When you are finished processing events. await eventProcessorHost.UnregisterEventProcessorAsync(); } -public class SimpleEventProcessor implements IEventProcessor { +public class SimpleEventProcessor : IEventProcessor +{ public Task CloseAsync(PartitionContext context, CloseReason reason) { Console.WriteLine($"Processor Shutting Down. Partition '{context.PartitionId}', Reason: '{reason}'."); return Task.CompletedTask; } + public Task OpenAsync(PartitionContext context) { Console.WriteLine($"SimpleEventProcessor initialized. 
Partition: '{context.PartitionId}'"); return Task.CompletedTask; } + public Task ProcessErrorAsync(PartitionContext context, Exception error) { Console.WriteLine($"Error on Partition: {context.PartitionId}, Error: {error.Message}"); return Task.CompletedTask; } + public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages) { foreach (var eventData in messages) @@ -324,7 +346,7 @@ public class SimpleEventProcessor implements IEventProcessor { } ``` -And in v5, in order to use the `EventProcessorClient`, handlers for event processing and errors must be provided. These handlers are considered self-contained and developers are responsible for ensuring that exceptions within the handler code are accounted for. +In v5, in order to use the `EventProcessorClient`, handlers for event processing and errors must be provided. These handlers are considered self-contained and developers are responsible for ensuring that exceptions within the handler code are accounted for. ```csharp private async Task ProcessUntilCanceled(CancellationToken cancellationToken) @@ -357,20 +379,128 @@ private async Task ProcessUntilCanceled(CancellationToken cancellationToken) try { - while (!cancellationToken.IsCancellationRequested) + await Task.Delay(Timeout.Infinite, cancellationSource.Token); + await processor.StopProcessingAsync(); + } + finally + { + // To prevent leaks, the handlers should be removed when processing is complete + processor.ProcessEventAsync -= processEventHandler; + processor.ProcessErrorAsync -= processErrorHandler; + } +} +``` + +### Migrating Event Processor checkpoints + +In v4, the `EventProcessorHost` supported a model of pluggable storage providers for checkpoint data, using Azure Storage Blobs as the default. Using the Azure Storage checkpoint manager, the lease and checkpoint information is stored as a JSON blob appearing within the Azure Storage account provided to the `EventProcessorHost`. 
More details can be found in the [documentation](https://docs.microsoft.com/azure/event-hubs/event-hubs-event-processor-host#partition-ownership-tracking). + +In v5, the `EventProcessorClient` is an opinionated implementation, storing checkpoints in Azure Storage Blobs, using the blob metadata to track information. Unfortunately, the `EventProcessorClient` is unable to consume legacy checkpoints due to the differences in format, approach, and the possibility of a custom checkpoint provider having been used. + +For v5, the `EventProcessorClient` will use a default starting position to read a partition when no checkpoint was found. This default position may be set in the [`PartitionInitializingAsync`](https://docs.microsoft.com/dotnet/api/azure.messaging.eventhubs.eventprocessorclient.partitioninitializingasync?view=azure-dotnet) event. One common approach for migrating legacy checkpoints is to read the legacy checkpoint data and use it to set the default starting position for the associated partition. This example demonstrates that approach. + +Because the nature of legacy checkpoint data cannot be assumed due to custom storage provider support, the example uses a static set of checkpoint data for illustration. The format of this sample data matches that used by the default Azure Storage checkpoint manager of the `EventProcessorHost` for the body of the Blob holding checkpoint data. + +Reading Azure Storage data is not within the scope of this example, but details may be found in the Azure Storage documentation, see: [Quickstart: Azure Blob storage client library v12 for .NET](https://docs.microsoft.com/en-us/azure/storage/blobs/storage-quickstart-blobs-dotnet). 
+ +```csharp +public static async Task Main(string[] args) +{ + var blobStorageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>"; + var blobContainerName = "<< NAME OF THE BLOBS CONTAINER >>"; + var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>"; + var eventHubName = "<< NAME OF THE EVENT HUB >>"; + var consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName; + + // These are stubbed for example purposes; in an actual application, these would be + // the real handlers for processing events and errors in your specific scenario. + async Task processEventHandler(ProcessEventArgs eventArgs) => Task.CompletedTask; + async Task processErrorHandler(ProcessErrorEventArgs eventArgs) => Task.CompletedTask; + + // This handler will consider whether there is a legacy checkpoint for the partition available and, + // if so, will apply it as a default for the partition so that the processor begins at the + // same location. + var legacyCheckpoints = ReadLegacyCheckpoints(); + + Task partitionInitializingHandler(PartitionInitializingEventArgs eventArgs) + { + if (eventArgs.CancellationToken.IsCancellationRequested) + { + return Task.CompletedTask; + } + + if ((legacyCheckpoints.TryGetValue(eventArgs.PartitionId, out var offsetString)) + && (int.TryParse(offsetString, out var offset))) { - await Task.Delay(TimeSpan.FromSeconds(1)); + eventArgs.DefaultStartingPosition = EventPosition.FromOffset(offset, false); + Console.WriteLine($"Initialized partition: { eventArgs.PartitionId }"); } + return Task.CompletedTask; + } + + // Create the clients and assign the handlers. 
+ var storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName); + var processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionString, eventHubName); + + processor.PartitionInitializingAsync += partitionInitializingHandler; + processor.ProcessEventAsync += processEventHandler; + processor.ProcessErrorAsync += processErrorHandler; + + // Process until canceled. + try + { + await Task.Delay(Timeout.Infinite, cancellationSource.Token); await processor.StopProcessingAsync(); } finally { - // To prevent leaks, the handlers should be removed when processing is complete + processor.PartitionInitializingAsync -= partitionInitializingHandler; processor.ProcessEventAsync -= processEventHandler; processor.ProcessErrorAsync -= processErrorHandler; } } + +// This method is stubbed for example purposes; in an actual implementation, this +// would be responsible for reading from the legacy checkpoint store and returning +// information about the checkpoint. +// +// This example mimics the format used by the Azure Storage Blobs checkpoint manager +// included with the EventProcessorHost. If using a different checkpoint store, the +// checkpoint format may differ. 
+private Dictionary<string, string> ReadLegacyCheckpoints() +{ + var checkpointJson = @"[ + { + ""PartitionId"":""1"", + ""Owner"":""eecd42df-a253-49d1-bb04-e5f00c106cfc"", + ""Token"":""6271aadb-801f-4ec7-a011-a008808a656c"", + ""Epoch"":5, + ""Offset"":""400"", + ""SequenceNumber"":125 + }, + { + ""PartitionId"":""2"", + ""Owner"":""eecd42df-a253-49d1-bb04-e5f00c106cfc"", + ""Token"":""6271aadb-801f-4ec7-a011-a008808a656c"", + ""Epoch"":1, + ""Offset"":""78"", + ""SequenceNumber"":39 + }]"; + + return System.Text.Json.JsonSerializer + .Deserialize<Checkpoint[]>(checkpointJson) + .ToDictionary(item => item.PartitionId, item => item.Offset); +} + +// This class exists to support the example deserialization from the JSON format +// used by the Azure Blob checkpoint manager included with the EventProcessorHost; +// if using a different checkpoint store, this class may not be necessary. +private class Checkpoint +{ + public string PartitionId { get; set; } + public string Offset { get; set; } +} ``` ## Additional samples