Change Feed Processor: Health Monitoring #2501

Closed

Description

Summary

Change Feed Processor consists of 3 main components:

  • The monitored container: The source of the events that are read through the Change Feed
  • The lease container: The storage of leases and state
  • The CFP host application: The application that decides what to do with the changes (user logic).

Change Feed Processor works as a background service that spins up Tasks to interact with these components. These Tasks do not run on the main context of the CFP host application.

Examples of component interactions are:

  • Acquiring leases from the lease container
  • Reading the Change Feed from the monitored container
  • Updating the leases on the lease container

Each of them could fail, and while the CFP implementation retries, there is no indication (unless the user is capturing Traces) of what is going on.

It would be beneficial to provide some way of notification on errors or events on the different levels so users can:

  • Hook logging and telemetry for informative purposes
  • Detect critical failures (for example, some container was deleted)

Why is this needed

While we do have Traces that show these events or issues, consuming them requires users to hook TraceListeners. In a .NET Framework application this can be done in app.config or web.config, and in a .NET Core application it requires initialization code. All of this is cumbersome and does not allow the user to take action on any of these events. For example, a NotFound error on a Change Feed read means the monitored container was deleted, and the user should stop the CFP instance and do something like recreate the container.
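To illustrate the kind of initialization code a .NET Core host needs today, here is a minimal sketch of attaching a custom TraceListener to a TraceSource. The source name "HypotheticalCfpSource" and the types InMemoryTraceListener and TraceSetup are assumptions made for illustration; they are not the SDK's actual trace source or API.

```csharp
using System.Collections.Generic;
using System.Diagnostics;

// Collects trace output in memory; a real host would forward it to its logger.
public sealed class InMemoryTraceListener : TraceListener
{
    public List<string> Messages { get; } = new List<string>();

    public override void Write(string message) => Messages.Add(message);

    public override void WriteLine(string message) => Messages.Add(message);
}

public static class TraceSetup
{
    // Hypothetical stand-in for the trace source the library writes to.
    public static readonly TraceSource Source =
        new TraceSource("HypotheticalCfpSource", SourceLevels.All);

    // Initialization code the host has to run before starting the processor.
    public static InMemoryTraceListener Attach()
    {
        var listener = new InMemoryTraceListener();
        Source.Listeners.Add(listener);
        return listener;
    }
}
```

A listener like this only observes text after the fact. It receives no structured information about which lease or operation failed, which is why it is hard to act on.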

Migration from V2

V2 CFP has a Health Monitor API in the form of https://docs.microsoft.com/en-us/dotnet/api/microsoft.azure.documents.changefeedprocessor.changefeedprocessorbuilder.withhealthmonitor?view=azure-dotnet

public Microsoft.Azure.Documents.ChangeFeedProcessor.ChangeFeedProcessorBuilder WithHealthMonitor (Microsoft.Azure.Documents.ChangeFeedProcessor.Monitoring.IHealthMonitor healthMonitor);

Which exposes events with this structure:

[image: structure of the events exposed by the V2 health monitor]

What type of events should we track

The relevant events are:

  • Lease acquire -> Means the instance has started processing that particular lease. Useful during rebalancing and initialization
  • Lease release -> Means the instance stopped processing the lease.
  • Processing changes -> Some customers reporting issues on CFP say that they are not receiving changes, when the actual issue is that their Handler logic is throwing unhandled exceptions. Being able to confirm that we are indeed delivering the changes would rule out issues in the library.
  • Lease updated/checkpointed -> Signals batch completion

In any of these events there could be errors, so we should be able to provide a success or an error notification.

Proposed API

Add a WithHealthMonitor API to the builder returned by GetChangeFeedProcessorBuilder:

ChangeFeedProcessor changeFeedProcessor = monitoredContainer
        .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", HandleChangesAsync)
            .WithInstanceName("consoleHost")
            .WithHealthMonitor(healthMonitor)
            .WithLeaseContainer(leaseContainer)
            .Build();

Where the HealthMonitor could have an API like:

public enum ChangeFeedProcessorEvent 
{
    AcquireLease,
    ReleaseLease,
    CheckpointLease,
    ReadingChangeFeedLease,
    ProcessingLease
}

public abstract class ChangeFeedProcessorHealthMonitor
{
    // For normal informational events happening in the context of a lease
    // ("event" is a reserved C# keyword, so the parameter is named cfpEvent)
    public virtual Task NotifyInformationAsync(ChangeFeedProcessorEvent cfpEvent, string leaseToken) => Task.CompletedTask;
    
    // For transient, non-critical errors
    public virtual Task NotifyErrorAsync(ChangeFeedProcessorEvent cfpEvent, string leaseToken, Exception error) => Task.CompletedTask;

    // For critical errors that are affecting the CFP instance
    public virtual Task NotifyCriticalAsync(ChangeFeedProcessorEvent cfpEvent, string leaseToken, Exception error) => Task.CompletedTask;
}
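As a rough sketch of how the processor could report both outcomes of a single lease operation, the block below wraps an operation and notifies the monitor on success or failure. The enum and base class are local stand-ins mirroring the proposal above. MonitoredOperation and CountingHealthMonitor are hypothetical names introduced only for this illustration and are not part of the SDK.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Stand-ins mirroring the proposed API so the sketch compiles on its own.
public enum ChangeFeedProcessorEvent
{
    AcquireLease, ReleaseLease, CheckpointLease, ReadingChangeFeedLease, ProcessingLease
}

public abstract class ChangeFeedProcessorHealthMonitor
{
    public virtual Task NotifyInformationAsync(ChangeFeedProcessorEvent cfpEvent, string leaseToken) => Task.CompletedTask;
    public virtual Task NotifyErrorAsync(ChangeFeedProcessorEvent cfpEvent, string leaseToken, Exception error) => Task.CompletedTask;
    public virtual Task NotifyCriticalAsync(ChangeFeedProcessorEvent cfpEvent, string leaseToken, Exception error) => Task.CompletedTask;
}

// Hypothetical helper: runs one lease operation and reports its outcome.
public static class MonitoredOperation
{
    public static async Task<bool> RunAsync(
        ChangeFeedProcessorHealthMonitor monitor,
        ChangeFeedProcessorEvent cfpEvent,
        string leaseToken,
        Func<Task> operation)
    {
        try
        {
            await operation();
        }
        catch (Exception error)
        {
            // The failure is reported to the monitor; returning false leaves any retry to the caller.
            await monitor.NotifyErrorAsync(cfpEvent, leaseToken, error);
            return false;
        }

        await monitor.NotifyInformationAsync(cfpEvent, leaseToken);
        return true;
    }
}

// Hypothetical monitor that counts outcomes, for example to feed telemetry.
public sealed class CountingHealthMonitor : ChangeFeedProcessorHealthMonitor
{
    private int successes;
    private int errors;

    public int Successes => successes;
    public int Errors => errors;

    public override Task NotifyInformationAsync(ChangeFeedProcessorEvent cfpEvent, string leaseToken)
    {
        Interlocked.Increment(ref successes);
        return Task.CompletedTask;
    }

    public override Task NotifyErrorAsync(ChangeFeedProcessorEvent cfpEvent, string leaseToken, Exception error)
    {
        Interlocked.Increment(ref errors);
        return Task.CompletedTask;
    }
}
```

Because every notification method has a default no-op implementation, a monitor like CountingHealthMonitor only overrides the levels it cares about.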

Impact

Such a feature would make troubleshooting issues much faster, not only issues like #1780, but any other incident where the user is currently blind and needs to enable traces to know what is going on.

Scenarios

  • As a user, I want to know when a lease is acquired by a particular Instance/machine and when it starts processing. Knowing which machine is handling it can help me debug situations when I'm not seeing changes being processed.
  • As a user, I want to react to errors that happen while processing the Change Feed, such as unhandled exceptions or user-level errors (like serialization), that are otherwise hidden by the infinite-retry mechanics of CFP. Cases where there is a CosmosException with StatusCode 404 (container was deleted), for example, are also worth identifying so I can stop the processor.
  • As a user, I want to understand how leases are rebalanced by knowing when one Instance releases a lease and another Instance acquires it, so I can track how I am using these instances (or maybe underusing them).
  • As a user, I want to know when the CFP has delivered the changes to my delegate (and it is now my responsibility to process them). When errors happen, the division between the code reading the Change Feed and the code executing the delegate is not always clear. An event that tells me that changes were delivered or that the lease was checkpointed clearly indicates where the issue could be (if I get a log that changes were delivered and then an error happened, the issue is in the delegate).
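The 404 scenario above could be handled along these lines: a monitor that raises a stop signal when a critical NotFound error arrives, so the host can stop the processor. This is only a sketch under assumptions. The enum and base class are local stand-ins for the proposed API, StandInCosmosException substitutes for the SDK's CosmosException (which exposes a StatusCode), and StopOnNotFoundMonitor is a hypothetical name.

```csharp
using System;
using System.Net;
using System.Threading;
using System.Threading.Tasks;

// Stand-ins mirroring the proposed API so the sketch compiles on its own.
public enum ChangeFeedProcessorEvent
{
    AcquireLease, ReleaseLease, CheckpointLease, ReadingChangeFeedLease, ProcessingLease
}

public abstract class ChangeFeedProcessorHealthMonitor
{
    public virtual Task NotifyInformationAsync(ChangeFeedProcessorEvent cfpEvent, string leaseToken) => Task.CompletedTask;
    public virtual Task NotifyErrorAsync(ChangeFeedProcessorEvent cfpEvent, string leaseToken, Exception error) => Task.CompletedTask;
    public virtual Task NotifyCriticalAsync(ChangeFeedProcessorEvent cfpEvent, string leaseToken, Exception error) => Task.CompletedTask;
}

// Hypothetical substitute for CosmosException, carrying only the status code.
public sealed class StandInCosmosException : Exception
{
    public StandInCosmosException(HttpStatusCode statusCode, string message) : base(message)
    {
        StatusCode = statusCode;
    }

    public HttpStatusCode StatusCode { get; }
}

// Signals the host to stop when a container appears to have been deleted.
public sealed class StopOnNotFoundMonitor : ChangeFeedProcessorHealthMonitor
{
    private readonly CancellationTokenSource stopSignal;

    public StopOnNotFoundMonitor(CancellationTokenSource stopSignal)
    {
        this.stopSignal = stopSignal;
    }

    public override Task NotifyCriticalAsync(ChangeFeedProcessorEvent cfpEvent, string leaseToken, Exception error)
    {
        // Only a NotFound status triggers the stop; other critical errors are ignored here.
        if (error is StandInCosmosException { StatusCode: HttpStatusCode.NotFound })
        {
            stopSignal.Cancel();
        }

        return Task.CompletedTask;
    }
}
```

The host would then wait on the token of that CancellationTokenSource and stop the processor once it is cancelled, before recreating the container or alerting an operator.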

Examples

string instanceName = "my-machine-name";
ChangeFeedProcessorHealthMonitor healthMonitor = new MyHealthMonitor(instanceName);

ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", HandleChangesAsync)
        .WithInstanceName(instanceName)
        .WithHealthMonitor(healthMonitor)
        .WithLeaseContainer(leaseContainer)
        .Build();

public class MyHealthMonitor : ChangeFeedProcessorHealthMonitor
{
    private readonly string instanceName;

    // The monitor cannot see the host's local variables, so the instance name is passed in.
    public MyHealthMonitor(string instanceName) => this.instanceName = instanceName;

    // For normal informational events happening in the context of a lease
    public override Task NotifyInformationAsync(ChangeFeedProcessorEvent cfpEvent, string leaseToken)
    {
        Console.WriteLine(cfpEvent switch
        {
            ChangeFeedProcessorEvent.AcquireLease => $"Instance {instanceName} started processing lease {leaseToken}",
            ChangeFeedProcessorEvent.ReleaseLease => $"Instance {instanceName} has released lease {leaseToken}",
            ChangeFeedProcessorEvent.CheckpointLease => $"Instance {instanceName} checkpointed lease {leaseToken}",
            ChangeFeedProcessorEvent.ReadingChangeFeedLease => $"Instance {instanceName} read new changes for lease {leaseToken}",
            ChangeFeedProcessorEvent.ProcessingLease => $"Instance {instanceName} has sent changes from lease {leaseToken} to delegate",
            _ => $"Instance {instanceName} reported {cfpEvent} for lease {leaseToken}",
        });
        return Task.CompletedTask;
    }

    // For transient, non-critical errors
    public override Task NotifyErrorAsync(ChangeFeedProcessorEvent cfpEvent, string leaseToken, Exception error)
    {
        Console.WriteLine(DescribeFailure(cfpEvent, leaseToken, error));
        return Task.CompletedTask;
    }

    // For critical errors that are affecting the CFP instance
    public override Task NotifyCriticalAsync(ChangeFeedProcessorEvent cfpEvent, string leaseToken, Exception error)
    {
        Console.WriteLine(DescribeFailure(cfpEvent, leaseToken, error));
        return Task.CompletedTask;
    }

    // Both error levels log the same message for a given event.
    private string DescribeFailure(ChangeFeedProcessorEvent cfpEvent, string leaseToken, Exception error) => cfpEvent switch
    {
        ChangeFeedProcessorEvent.AcquireLease => $"Instance {instanceName} failed acquiring lease {leaseToken} due to {error}",
        ChangeFeedProcessorEvent.ReleaseLease => $"Instance {instanceName} released lease {leaseToken} due to {error}",
        ChangeFeedProcessorEvent.CheckpointLease => $"Instance {instanceName} failed checkpointing lease {leaseToken} due to {error}",
        ChangeFeedProcessorEvent.ProcessingLease => $"Instance {instanceName} failed processing {leaseToken} due to {error}",
        ChangeFeedProcessorEvent.ReadingChangeFeedLease => $"Instance {instanceName} failed to read new changes for lease {leaseToken} due to {error}",
        _ => $"Instance {instanceName} reported {cfpEvent} for lease {leaseToken} due to {error}",
    };
}