Change Feed Processor: Add support for manual checkpoint #1765

Closed

Description

Full detail of user request is on #616

TL;DR

The request is to add manual checkpoint as a configuration option to the Change Feed Processor (CFP). This feature was already available in V2 CFP through a configuration option.
The lack of this feature is a migration blocker for users who rely on it in CFP V2.

There are potentially two options, which differ in the public API:

Option 1 - Have a Context object

How will the user enable the configuration

There will be an explicit handler signature that defines the intent of the user wanting to do manual checkpoint:

ChangeFeedProcessor changeFeedProcessor = sourceContainer
            .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "myProcessor", ChangesHandlerForManualCheckpoint)
                .WithInstanceName("myInstanceName")
                .WithLeaseContainer(leaseContainer)
                .Build();

Where the delegate ChangesHandlerForManualCheckpoint would be:

        public delegate Task ChangesHandlerForManualCheckpoint<T>(
            ChangeFeedProcessorContextWithManualCheckpoint context,
            IReadOnlyCollection<T> changes,
            CancellationToken cancellationToken);

Additionally, we can expose the new context data, minus the checkpoint API, through:

        public delegate Task ChangesHandler<T>(
            ChangeFeedProcessorContext context,
            IReadOnlyCollection<T> changes,
            CancellationToken cancellationToken);

How will the user interact with the context

Similar to V2 CFP, the handler will receive a context object:

    public abstract class ChangeFeedProcessorContext
    {
        /// <summary>
        /// Gets the token representing the current lease from which the changes come.
        /// </summary>
        public abstract string LeaseToken { get; }
        
        /// <summary>
        /// Gets the diagnostics related to the service response.
        /// </summary>
        public abstract CosmosDiagnostics Diagnostics { get; }

        /// <summary>
        /// Gets the headers related to the service response that provided the changes.
        /// </summary>
        public abstract Headers Headers { get; }
    }

    public abstract class ChangeFeedProcessorContextWithManualCheckpoint : ChangeFeedProcessorContext
    {
        /// <summary>
        /// Checkpoints progress of a stream. This method is valid only if manual checkpoint was configured.
        /// The client may accept multiple change feed batches to process in parallel.
        /// Once processing of the first N documents has finished, the client can checkpoint the last batch in the contiguous run of completed batches.
        /// </summary>
        public abstract Task<(bool isSuccess, CosmosException error)> TryCheckpointAsync();
    }
  • LeaseToken helps users who send monitoring or diagnostic information identify which lease the changes come from. It also helps when sending telemetry that identifies which leases are being processed.
  • Headers gives access to all the response headers, including the SessionToken. This is useful if the user sends the changes to another system that has a different client instance. For example (a real customer scenario), once the changes are received, they are sent to a queue, where another application picks them up and uses the information to read the document. If the user is on Session consistency, those reads might fail with a 404 without the session token.
  • TryCheckpointAsync is the API to call, when manual checkpoint is enabled, once the user's custom logic has decided that it is time to checkpoint.

Can the checkpoint fail?

Yes, the call to TryCheckpointAsync can fail in the following expected scenarios:

  • The current lease has been deleted externally (this is not a common scenario but if the user manually deletes the documents in the lease store, it can happen).
  • The lease was acquired by another host, which can happen during load balancing.

How would the user call TryCheckpointAsync?

public async Task HandleChangesAsync(
    ChangeFeedProcessorContextWithManualCheckpoint context,
    IReadOnlyCollection<ToDoItem> changes,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    foreach (ToDoItem item in changes)
    {
        // put changes in some buffer using context.LeaseToken as buffer key
    }

    // buffer and expectedMax are placeholders for the user's own buffering logic
    if (buffer[context.LeaseToken].Size == expectedMax)
    {
        (bool isSuccess, CosmosException error) = await context.TryCheckpointAsync();
        if (!isSuccess)
        {
            Console.WriteLine($"Checkpoint failed for {context.LeaseToken} due to the lease being transferred to another host.");
            // log exception if desired
            throw error; // to stop the processing
        }
    }

    Console.WriteLine($"Finished handling changes for lease {context.LeaseToken}.");
}
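
The handler above leaves buffer and expectedMax undefined. As a minimal sketch of what that custom logic might look like, the LeaseBuffer class below is a hypothetical helper and not part of the SDK. It assumes the handler is never invoked concurrently for the same lease, it takes the checkpoint call as a delegate standing in for context.TryCheckpointAsync, and it uses Exception rather than CosmosException so that it compiles without the SDK.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper (not an SDK type): buffers changes per lease token and
// requests a checkpoint once a lease's buffer reaches a threshold.
public sealed class LeaseBuffer<T>
{
    private readonly ConcurrentDictionary<string, List<T>> buffers =
        new ConcurrentDictionary<string, List<T>>();
    private readonly int threshold;

    public LeaseBuffer(int threshold)
    {
        if (threshold <= 0) throw new ArgumentOutOfRangeException(nameof(threshold));
        this.threshold = threshold;
    }

    // Number of changes currently buffered for the given lease.
    public int BufferedCount(string leaseToken) =>
        this.buffers.TryGetValue(leaseToken, out List<T> buffer) ? buffer.Count : 0;

    // Adds a batch to the lease's buffer. When the buffer reaches the threshold,
    // invokes tryCheckpointAsync (a stand-in for context.TryCheckpointAsync).
    // Returns true if a checkpoint was taken and the buffer was cleared.
    // Throws if the checkpoint attempt reports failure, to stop processing.
    public async Task<bool> AddAsync(
        string leaseToken,
        IReadOnlyCollection<T> changes,
        Func<Task<(bool isSuccess, Exception error)>> tryCheckpointAsync)
    {
        // Assumption: batches for a single lease arrive sequentially, so the
        // per-lease List<T> is not accessed from two threads at once.
        List<T> buffer = this.buffers.GetOrAdd(leaseToken, _ => new List<T>());
        buffer.AddRange(changes);

        if (buffer.Count < this.threshold)
        {
            return false;
        }

        // Real code would durably process the buffered changes here,
        // before asking for the checkpoint.
        (bool isSuccess, Exception error) = await tryCheckpointAsync();
        if (!isSuccess)
        {
            throw error ?? new InvalidOperationException(
                $"Checkpoint failed for lease {leaseToken}.");
        }

        buffer.Clear();
        return true;
    }
}
```

Inside the handler this could be called as `await leaseBuffer.AddAsync(context.LeaseToken, changes, async () => { var r = await context.TryCheckpointAsync(); return (r.isSuccess, (Exception)r.error); });`. The threshold check uses a "reached or exceeded" comparison so that a batch that overshoots the threshold still triggers a checkpoint.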

Option 2 - Rely on FeedResponse

ChangeFeedProcessor changeFeedProcessor = sourceContainer
            .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "myProcessor", ChangesHandlerForManualCheckpoint)
                .WithInstanceName("myInstanceName")
                .WithLeaseContainer(leaseContainer)
                .Build();

Where the delegate ChangesHandlerForManualCheckpoint would be:

        public delegate Task ChangesHandlerForManualCheckpoint<T>(
            FeedResponse<T> feedResponse,
            string leaseToken,
            Func<Task<(bool isSuccess, CosmosException error)>> TryCheckpointAsync,
            CancellationToken cancellationToken);
  • leaseToken helps users who send monitoring or diagnostic information identify which lease the changes come from. It also helps when sending telemetry that identifies which leases are being processed.
  • FeedResponse gives access to all the response headers, the diagnostics, and the content with the documents.
  • TryCheckpointAsync is the API to call, when manual checkpoint is enabled, once the user's custom logic has decided that it is time to checkpoint.

How would the user call TryCheckpointAsync?

public async Task HandleChangesAsync(
    FeedResponse<ToDoItem> feedResponse,
    string leaseToken,
    Func<Task<(bool isSuccess, CosmosException error)>> TryCheckpointAsync,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {leaseToken}...");
    foreach (ToDoItem item in feedResponse)
    {
        // put changes in some buffer using leaseToken as buffer key
    }

    // buffer and expectedMax are placeholders for the user's own buffering logic
    if (buffer[leaseToken].Size == expectedMax)
    {
        (bool isSuccess, CosmosException error) = await TryCheckpointAsync();
        if (!isSuccess)
        {
            Console.WriteLine($"Checkpoint failed for {leaseToken} due to the lease being transferred to another host.");
            // log exception if desired
            throw error; // to stop the processing
        }
    }

    Console.WriteLine($"Finished handling changes for lease {leaseToken}.");
}