Description
Opened on May 25, 2021
Summary
Change Feed Processor consists of 3 main components:
- The monitored container: The source of the events, which are read through the Change Feed
- The lease container: The storage of leases and state
- The CFP host application: The application that decides what to do with the changes (user logic).
The Change Feed Processor works as a background service that spins up Tasks to interact with these components, and these Tasks do not run on the main context of the CFP host application.
Examples of component interactions are:
- Acquiring leases from the lease container
- Reading the Change Feed from the monitored container
- Updating the leases on the lease container
Each of them can fail, and while the CFP implementation retries, there is no indication of what is going on unless the user is capturing Traces.
It would be beneficial to provide a way to notify users of errors and events at these different levels so they can:
- Hook logging and telemetry for informative purposes
- Detect critical failures (for example, a container was deleted)
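To make the gap concrete, here is a minimal sketch of one lease's background loop under a simplified model where reading, delivering and checkpointing are passed in as delegates. `LeaseLoop`, `LeaseStep` and the `onStep` hook are hypothetical names, not SDK types; the sketch only shows where a notification hook could sit relative to the retry logic that otherwise keeps failures away from the host.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
// Hypothetical steps of one lease's loop (not SDK types).
public enum LeaseStep { ReadChanges, DeliverChanges, Checkpoint }
// Hypothetical, simplified model of the background loop that owns one lease.
public sealed class LeaseLoop
{
    private readonly Func<CancellationToken, Task<IReadOnlyList<string>>> readChangesAsync;
    private readonly Func<IReadOnlyList<string>, CancellationToken, Task> handlerAsync;
    private readonly Func<CancellationToken, Task> checkpointAsync;
    // Optional hook receiving (step, error); error is null on success.
    private readonly Action<LeaseStep, Exception> onStep;
    public LeaseLoop(
        Func<CancellationToken, Task<IReadOnlyList<string>>> readChangesAsync,
        Func<IReadOnlyList<string>, CancellationToken, Task> handlerAsync,
        Func<CancellationToken, Task> checkpointAsync,
        Action<LeaseStep, Exception> onStep = null)
    {
        this.readChangesAsync = readChangesAsync;
        this.handlerAsync = handlerAsync;
        this.checkpointAsync = checkpointAsync;
        this.onStep = onStep;
    }
    // Runs one read/deliver/checkpoint iteration. Returns false when a step failed;
    // the caller is expected to retry, as CFP does today.
    public async Task<bool> RunOnceAsync(CancellationToken cancellationToken)
    {
        LeaseStep step = LeaseStep.ReadChanges;
        try
        {
            IReadOnlyList<string> changes = await this.readChangesAsync(cancellationToken);
            this.onStep?.Invoke(LeaseStep.ReadChanges, null);
            if (changes.Count == 0)
            {
                return true; // Nothing new, so nothing to deliver or checkpoint.
            }
            step = LeaseStep.DeliverChanges;
            this.onStep?.Invoke(LeaseStep.DeliverChanges, null); // Changes handed to user code.
            await this.handlerAsync(changes, cancellationToken);
            step = LeaseStep.Checkpoint;
            await this.checkpointAsync(cancellationToken);
            this.onStep?.Invoke(LeaseStep.Checkpoint, null);
            return true;
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            // Without a hook, the host never learns about this failure; the caller just retries.
            this.onStep?.Invoke(step, ex);
            return false;
        }
    }
}
```

With the hook in place, a handler bug shows up as a successful "delivered" notification followed by an error attributed to the delivery step, which is exactly the distinction the scenarios below ask for.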
Why is this needed
While we do have Traces that show these events or issues, users have to hook up TraceListeners to see them. In a .NET Framework (NETFX) application this can be done in the app.config or web.config file, and in a .NET Core application it requires initialization code. Both approaches are cumbersome, and neither allows the user to take action on any of these events. For example, a NotFound error on a Change Feed read means the monitored container was deleted, and the user should stop the CFP instance and do something like recreate the container.
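For comparison, here is roughly what trace-based initialization code looks like, written as a generic `System.Diagnostics` sketch. The source name used in the usage note (`SampleCfpTraces`) is a placeholder and not the name the SDK actually traces under, and `CollectingTraceListener`/`TraceSetup` are hypothetical helpers. The point is that a listener receives only formatted text.

```csharp
using System.Collections.Generic;
using System.Diagnostics;
// Captures whatever the TraceSource emits. It receives only formatted text, never the
// exception object or status code, so reacting to a 404 means string-matching lines.
public sealed class CollectingTraceListener : TraceListener
{
    public List<string> Lines { get; } = new List<string>();
    public override void Write(string message) => this.Lines.Add(message);
    public override void WriteLine(string message) => this.Lines.Add(message);
}
public static class TraceSetup
{
    // Initialization code a .NET Core host has to run before starting the processor.
    public static CollectingTraceListener Attach(TraceSource source)
    {
        var listener = new CollectingTraceListener();
        source.Switch.Level = SourceLevels.All;
        source.Listeners.Add(listener);
        return listener;
    }
}
```

Usage would be `var listener = TraceSetup.Attach(new TraceSource("SampleCfpTraces"));`. Everything the host can do afterwards is inspect `listener.Lines`, which is why a typed notification API is proposed instead.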
Migration from V2
The V2 CFP has a Health Monitor API (https://docs.microsoft.com/en-us/dotnet/api/microsoft.azure.documents.changefeedprocessor.changefeedprocessorbuilder.withhealthmonitor?view=azure-dotnet):
public Microsoft.Azure.Documents.ChangeFeedProcessor.ChangeFeedProcessorBuilder WithHealthMonitor (Microsoft.Azure.Documents.ChangeFeedProcessor.Monitoring.IHealthMonitor healthMonitor);
It exposes events with this structure:
What type of events should we track
The relevant events are:
- Lease acquire -> Means the instance has started processing that particular lease. Useful during rebalancing and initialization
- Lease release -> Means the instance stopped processing the lease.
- Processing changes -> When reporting issues with CFP, some customers say they are not receiving changes, when the actual issue is that their handler logic is throwing unhandled exceptions. Being able to confirm that we are indeed delivering the changes would rule out issues in the library.
- Lease updated/checkpointed -> Signals batch completion
Any of these operations can fail, so we should be able to provide either a success or an error notification for each of them.
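One way to carry "success or error" in a single shape is a record whose error is null on success. This is a hypothetical model for discussion that reuses the event names from the proposed enum; `ChangeFeedProcessorNotification` is not an SDK type.

```csharp
using System;
public enum ChangeFeedProcessorEvent
{
    AcquireLease,
    ReleaseLease,
    CheckpointLease,
    ReadingChangeFeedLease,
    ProcessingLease
}
// Hypothetical notification payload: Error == null means the operation succeeded.
public sealed record ChangeFeedProcessorNotification(
    ChangeFeedProcessorEvent Event,
    string LeaseToken,
    Exception Error = null)
{
    public bool IsSuccess => this.Error is null;
    public static ChangeFeedProcessorNotification Success(ChangeFeedProcessorEvent processorEvent, string leaseToken)
        => new(processorEvent, leaseToken);
    // A failure must always carry the exception that caused it.
    public static ChangeFeedProcessorNotification Failure(ChangeFeedProcessorEvent processorEvent, string leaseToken, Exception error)
        => new(processorEvent, leaseToken, error ?? throw new ArgumentNullException(nameof(error)));
}
```

The proposal uses separate methods per severity instead of a single payload; either shape conveys the same information.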
Proposed API
Add a WithHealthMonitor API to the builder returned by GetChangeFeedProcessorBuilder:
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithHealthMonitor(healthMonitor)
.WithLeaseContainer(leaseContainer)
.Build();
Where the HealthMonitor could have an API like:
using System;
using System.Threading.Tasks;
public enum ChangeFeedProcessorEvent
{
    AcquireLease,
    ReleaseLease,
    CheckpointLease,
    ReadingChangeFeedLease,
    ProcessingLease
}
public abstract class ChangeFeedProcessorHealthMonitor
{
    // For normal informational events happening in the context of a lease
    public virtual Task NotifyInformationAsync(ChangeFeedProcessorEvent changeFeedProcessorEvent, string leaseToken) => Task.CompletedTask;
    // For transient, non-critical errors
    public virtual Task NotifyErrorAsync(ChangeFeedProcessorEvent changeFeedProcessorEvent, string leaseToken, Exception error) => Task.CompletedTask;
    // For critical errors that are affecting the CFP instance
    public virtual Task NotifyCriticalAsync(ChangeFeedProcessorEvent changeFeedProcessorEvent, string leaseToken, Exception error) => Task.CompletedTask;
}
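Because these notifications would be raised from the processor's background Tasks, a faulty monitor implementation should not be able to break lease processing. The following is a minimal sketch of how the processor side could guard each call; `HealthMonitorInvoker` is a hypothetical internal helper, and the proposed types are repeated so the snippet compiles on its own.

```csharp
using System;
using System.Threading.Tasks;
public enum ChangeFeedProcessorEvent { AcquireLease, ReleaseLease, CheckpointLease, ReadingChangeFeedLease, ProcessingLease }
public abstract class ChangeFeedProcessorHealthMonitor
{
    public virtual Task NotifyInformationAsync(ChangeFeedProcessorEvent changeFeedProcessorEvent, string leaseToken) => Task.CompletedTask;
    public virtual Task NotifyErrorAsync(ChangeFeedProcessorEvent changeFeedProcessorEvent, string leaseToken, Exception error) => Task.CompletedTask;
    public virtual Task NotifyCriticalAsync(ChangeFeedProcessorEvent changeFeedProcessorEvent, string leaseToken, Exception error) => Task.CompletedTask;
}
public static class HealthMonitorInvoker
{
    // Runs a notification and swallows anything it throws (synchronously or asynchronously),
    // so user monitoring code cannot fault the lease loop. A real implementation would
    // probably trace the swallowed exception.
    public static async Task SafeNotifyAsync(Func<Task> notification)
    {
        try
        {
            await notification().ConfigureAwait(false);
        }
        catch (Exception)
        {
            // Intentionally ignored in this sketch.
        }
    }
}
```

Inside the loop, a call would then look like `await HealthMonitorInvoker.SafeNotifyAsync(() => monitor.NotifyInformationAsync(ChangeFeedProcessorEvent.AcquireLease, leaseToken));`.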
Impact
Such a feature would make troubleshooting much faster, not only for issues like #1780 but for any incident where the user currently has no visibility and needs to enable traces to know what is going on.
Scenarios
- As a user, I want to know when a lease is acquired by a particular Instance/machine and when it starts processing. Knowing which machine is handling it can help me debug situations when I'm not seeing changes being processed.
- As a user, I want to react to errors that happen while processing the Change Feed, such as unhandled exceptions or user-level errors (like serialization) that are otherwise hidden by the infinite-retry mechanics of CFP. Cases such as a CosmosException with StatusCode 404 (container was deleted) are also important to identify so I can stop the processor.
- As a user, I want to understand how leases are rebalanced, by knowing when an instance releases a lease and another instance acquires it, so I can track how I am using my instances (or whether I am underusing them).
- As a user, I want to know when the CFP has delivered the changes to my delegate (at which point processing them is my responsibility). When errors happen, the boundary between the code reading the Change Feed and the code executing the delegate is sometimes unclear. An event that tells me that changes were delivered or that a lease was checkpointed clearly indicates where the issue could be: if I get a log that changes were delivered and then an error happened, the issue is in the delegate.
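For the rebalancing and "which machine owns this lease" scenarios, a monitor only needs to react to acquire and release events. The following is a minimal sketch against the proposed abstract class, repeated here so the snippet compiles on its own; `LeaseOwnershipMonitor` is a hypothetical user class.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
public enum ChangeFeedProcessorEvent { AcquireLease, ReleaseLease, CheckpointLease, ReadingChangeFeedLease, ProcessingLease }
public abstract class ChangeFeedProcessorHealthMonitor
{
    public virtual Task NotifyInformationAsync(ChangeFeedProcessorEvent changeFeedProcessorEvent, string leaseToken) => Task.CompletedTask;
    public virtual Task NotifyErrorAsync(ChangeFeedProcessorEvent changeFeedProcessorEvent, string leaseToken, Exception error) => Task.CompletedTask;
    public virtual Task NotifyCriticalAsync(ChangeFeedProcessorEvent changeFeedProcessorEvent, string leaseToken, Exception error) => Task.CompletedTask;
}
// Tracks which leases this instance currently owns and when it acquired them.
public sealed class LeaseOwnershipMonitor : ChangeFeedProcessorHealthMonitor
{
    // Notifications arrive from background Tasks, so use a thread-safe collection.
    private readonly ConcurrentDictionary<string, DateTimeOffset> ownedLeases = new ConcurrentDictionary<string, DateTimeOffset>();
    // Snapshot of the lease tokens currently owned, in ordinal order.
    public IReadOnlyCollection<string> OwnedLeaseTokens =>
        this.ownedLeases.Keys.OrderBy(token => token, StringComparer.Ordinal).ToList();
    public override Task NotifyInformationAsync(ChangeFeedProcessorEvent changeFeedProcessorEvent, string leaseToken)
    {
        switch (changeFeedProcessorEvent)
        {
            case ChangeFeedProcessorEvent.AcquireLease:
                this.ownedLeases[leaseToken] = DateTimeOffset.UtcNow;
                break;
            case ChangeFeedProcessorEvent.ReleaseLease:
                this.ownedLeases.TryRemove(leaseToken, out _);
                break;
        }
        return Task.CompletedTask;
    }
    // A release reported as an error still means this instance no longer owns the lease.
    public override Task NotifyErrorAsync(ChangeFeedProcessorEvent changeFeedProcessorEvent, string leaseToken, Exception error)
    {
        if (changeFeedProcessorEvent == ChangeFeedProcessorEvent.ReleaseLease)
        {
            this.ownedLeases.TryRemove(leaseToken, out _);
        }
        return Task.CompletedTask;
    }
}
```

Exposing `OwnedLeaseTokens` through a health endpoint or periodic log would show, per instance, how leases move during rebalancing.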
Examples
using System;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
string instanceName = "my-machine-name";
ChangeFeedProcessorHealthMonitor healthMonitor = new MyHealthMonitor(instanceName);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", HandleChangesAsync)
    .WithInstanceName(instanceName)
    .WithHealthMonitor(healthMonitor)
    .WithLeaseContainer(leaseContainer)
    .Build();
public class MyHealthMonitor : ChangeFeedProcessorHealthMonitor
{
    private readonly string instanceName;
    public MyHealthMonitor(string instanceName) => this.instanceName = instanceName;
    // For normal informational events happening in the context of a lease
    public override Task NotifyInformationAsync(ChangeFeedProcessorEvent changeFeedProcessorEvent, string leaseToken)
    {
        string message = changeFeedProcessorEvent switch
        {
            ChangeFeedProcessorEvent.AcquireLease => $"Instance {this.instanceName} started processing lease {leaseToken}",
            ChangeFeedProcessorEvent.ReleaseLease => $"Instance {this.instanceName} has released lease {leaseToken}",
            ChangeFeedProcessorEvent.CheckpointLease => $"Instance {this.instanceName} checkpointed lease {leaseToken}",
            ChangeFeedProcessorEvent.ReadingChangeFeedLease => $"Instance {this.instanceName} read new changes for lease {leaseToken}",
            ChangeFeedProcessorEvent.ProcessingLease => $"Instance {this.instanceName} has sent changes from lease {leaseToken} to delegate",
            _ => $"Instance {this.instanceName} received {changeFeedProcessorEvent} for lease {leaseToken}"
        };
        Console.WriteLine(message);
        return Task.CompletedTask;
    }
    // For transient, non-critical errors
    public override Task NotifyErrorAsync(ChangeFeedProcessorEvent changeFeedProcessorEvent, string leaseToken, Exception error)
    {
        Console.WriteLine(this.DescribeError(changeFeedProcessorEvent, leaseToken, error));
        return Task.CompletedTask;
    }
    // For critical errors that are affecting the CFP instance
    public override Task NotifyCriticalAsync(ChangeFeedProcessorEvent changeFeedProcessorEvent, string leaseToken, Exception error)
    {
        Console.WriteLine(this.DescribeError(changeFeedProcessorEvent, leaseToken, error));
        return Task.CompletedTask;
    }
    // Error and critical notifications share the same per-event wording
    private string DescribeError(ChangeFeedProcessorEvent changeFeedProcessorEvent, string leaseToken, Exception error) => changeFeedProcessorEvent switch
    {
        ChangeFeedProcessorEvent.AcquireLease => $"Instance {this.instanceName} failed acquiring lease {leaseToken} due to {error}",
        ChangeFeedProcessorEvent.ReleaseLease => $"Instance {this.instanceName} released lease {leaseToken} due to {error}",
        ChangeFeedProcessorEvent.CheckpointLease => $"Instance {this.instanceName} failed checkpointing lease {leaseToken} due to {error}",
        ChangeFeedProcessorEvent.ProcessingLease => $"Instance {this.instanceName} failed processing {leaseToken} due to {error}",
        ChangeFeedProcessorEvent.ReadingChangeFeedLease => $"Instance {this.instanceName} failed to read new changes for lease {leaseToken} due to {error}",
        _ => $"Instance {this.instanceName} hit {changeFeedProcessorEvent} error for lease {leaseToken} due to {error}"
    };
}
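For the scenario where a critical error such as a deleted container should stop the processor, a monitor can hand the first critical error to the host through a `TaskCompletionSource`, and the host then calls `StopAsync` on the processor. This is a sketch against the proposed types, repeated so the snippet compiles on its own; `StopSignalMonitor` is a hypothetical user class.

```csharp
using System;
using System.Threading.Tasks;
public enum ChangeFeedProcessorEvent { AcquireLease, ReleaseLease, CheckpointLease, ReadingChangeFeedLease, ProcessingLease }
public abstract class ChangeFeedProcessorHealthMonitor
{
    public virtual Task NotifyInformationAsync(ChangeFeedProcessorEvent changeFeedProcessorEvent, string leaseToken) => Task.CompletedTask;
    public virtual Task NotifyErrorAsync(ChangeFeedProcessorEvent changeFeedProcessorEvent, string leaseToken, Exception error) => Task.CompletedTask;
    public virtual Task NotifyCriticalAsync(ChangeFeedProcessorEvent changeFeedProcessorEvent, string leaseToken, Exception error) => Task.CompletedTask;
}
// Surfaces the first critical error to the host so it can stop the processor.
public sealed class StopSignalMonitor : ChangeFeedProcessorHealthMonitor
{
    // RunContinuationsAsynchronously keeps the host's continuation off the processor's background Task.
    private readonly TaskCompletionSource<Exception> criticalError =
        new TaskCompletionSource<Exception>(TaskCreationOptions.RunContinuationsAsynchronously);
    // Completes with the first critical error reported; later ones are ignored.
    public Task<Exception> CriticalErrorTask => this.criticalError.Task;
    public override Task NotifyCriticalAsync(ChangeFeedProcessorEvent changeFeedProcessorEvent, string leaseToken, Exception error)
    {
        // A real host might only react to specific errors, for example
        // error is CosmosException { StatusCode: HttpStatusCode.NotFound } (container deleted).
        this.criticalError.TrySetResult(error);
        return Task.CompletedTask;
    }
}
```

The host could then run `Exception error = await monitor.CriticalErrorTask; await changeFeedProcessor.StopAsync();` before recreating the container.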