
[Preview] ChangeFeedProcessor: Adds support for manual checkpoint, context, and stream #2331

Merged: 16 commits merged into master on Apr 2, 2021

Conversation

@ealsur (Member) commented Mar 23, 2021

Description

This PR builds on top of the great work that @fuocor did in #888. That PR was stale and not passing tests, but the feature it was bringing had real value. I could not cherry-pick from the fork, but acknowledgment will go to the author in the changelog notes 😄

New delegates

We previously had a single delegate to process changes:

public delegate Task ChangesHandler<T>(
	IReadOnlyCollection<T> changes,
	CancellationToken cancellationToken);

To address the requests in the linked issues, we are introducing four new delegates:

// Adds the context, similar to the existing handler
public delegate Task ChangeFeedHandler<T>(
	ChangeFeedProcessorContext context,
	IReadOnlyCollection<T> changes,
	CancellationToken cancellationToken);

// Adds support for manual checkpoint on the typed API
public delegate Task ChangeFeedHandlerWithManualCheckpoint<T>(
	ChangeFeedProcessorContext context,
	IReadOnlyCollection<T> changes,
	Func<Task<(bool isSuccess, Exception error)>> tryCheckpointAsync,
	CancellationToken cancellationToken);

// Adds support for stream handling
public delegate Task ChangeFeedStreamHandler(
	ChangeFeedProcessorContext context,
	Stream changes,
	CancellationToken cancellationToken);

// Adds support for manual checkpointing with stream handling
public delegate Task ChangeFeedStreamHandlerWithManualCheckpoint(
	ChangeFeedProcessorContext context,
	Stream changes,
	Func<Task<(bool isSuccess, Exception error)>> tryCheckpointAsync,
	CancellationToken cancellationToken);

Container API changes

public abstract ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilder<T>(
	string processorName,
	ChangeFeedHandler<T> onChangesDelegate);

public abstract ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithManualCheckpoint<T>(
	string processorName,
	ChangeFeedHandlerWithManualCheckpoint<T> onChangesDelegate);

public abstract ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilder(
	string processorName,
	ChangeFeedStreamHandler onChangesDelegate);

public abstract ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithManualCheckpoint(
	string processorName,
	ChangeFeedStreamHandlerWithManualCheckpoint onChangesDelegate);

Stream support

Stream APIs allow customers to send the data to other components without serializing it on the Change Feed Processor. This is possible through two of the new delegates. The most common one handles checkpointing for the user:

public delegate Task ChangeFeedStreamHandler(
	ChangeFeedProcessorContext context,
	Stream changes,
	CancellationToken cancellationToken);

And one where the user can manage checkpointing themselves:

public delegate Task ChangeFeedStreamHandlerWithManualCheckpoint(
	ChangeFeedProcessorContext context,
	Stream changes,
	Func<Task<(bool isSuccess, Exception error)>> tryCheckpointAsync,
	CancellationToken cancellationToken);
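
As an illustration (not part of the PR itself), here is a minimal sketch of wiring up the stream handler; the processor name, SendToDownstreamAsync, container, and leaseContainer are placeholders:

ChangeFeedProcessor processor = container
	.GetChangeFeedProcessorBuilder("myStreamProcessor",
		async (ChangeFeedProcessorContext context, Stream changes, CancellationToken token) =>
	{
		// The stream is the raw service response for this batch of changes; it can be
		// copied or forwarded as-is, with no serialization on the Change Feed Processor.
		await SendToDownstreamAsync(changes, token);
	})
	.WithInstanceName("<instance name>")
	.WithLeaseContainer(leaseContainer).Build();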

How to leverage manual checkpoint?

There isn't any extra configuration required on the ChangeFeedProcessorBuilder other than using the delegate that exposes the checkpoint logic. Choosing this delegate is an implicit opt-in to manual checkpointing.

This enables users to decide when to checkpoint the progress of a lease. Some users buffer changes and checkpoint only once the buffer commits (processes all the changes).

Either of the two delegates that expose the checkpoint mechanism allows for this, via the call to tryCheckpointAsync, like so:

ChangeFeedProcessor processor = container
	.GetChangeFeedProcessorBuilderWithManualCheckpoint("myProcessorName", 
		async (ChangeFeedProcessorContext context, IReadOnlyCollection<T> changes, Func<Task<(bool isSuccess, Exception error)>> tryCheckpointAsync, CancellationToken token) =>
	{
		// manage and process the changes

		// on some condition, trigger the checkpoint
		(bool isSuccess, Exception exception) = await tryCheckpointAsync();
		if (!isSuccess)
		{
			// log exception.ToString()
			throw exception;
		}
	})
	.WithInstanceName("<instance name>")
	.WithLeaseContainer(leaseContainer).Build();

The call can fail, so checking isSuccess is important to understand the failure. Checkpointing can fail due to load balancing (yielding a 412 Precondition Failed error), or due to other issues (the lease collection could have been deleted, for example).
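
As a rough, hypothetical sketch of the buffering pattern mentioned above (the threshold of 100, ProcessBufferAsync, and MyItem are placeholders; buffers are kept per lease via the context):

ConcurrentDictionary<string, List<MyItem>> buffers = new ConcurrentDictionary<string, List<MyItem>>();

ChangeFeedProcessor processor = container
	.GetChangeFeedProcessorBuilderWithManualCheckpoint<MyItem>("myProcessorName",
		async (ChangeFeedProcessorContext context, IReadOnlyCollection<MyItem> changes, Func<Task<(bool isSuccess, Exception error)>> tryCheckpointAsync, CancellationToken token) =>
	{
		// Accumulate per lease; changes that are not checkpointed are simply redelivered later.
		List<MyItem> buffer = buffers.GetOrAdd(context.LeaseToken, _ => new List<MyItem>());
		buffer.AddRange(changes);
		if (buffer.Count < 100)
		{
			return;
		}

		await ProcessBufferAsync(buffer, token);
		buffer.Clear();

		// Checkpoint only once the buffered work has been committed.
		(bool isSuccess, Exception exception) = await tryCheckpointAsync();
		if (!isSuccess)
		{
			// For example, a 412 when the lease was load balanced to another instance.
			throw exception;
		}
	})
	.WithInstanceName("<instance name>")
	.WithLeaseContainer(leaseContainer).Build();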

What is in the context?

    public abstract class ChangeFeedProcessorContext
    {
        /// <summary>
        /// Gets the token representative of the current lease from which the changes come from.
        /// </summary>
        public abstract string LeaseToken { get; }

        /// <summary>
        /// Gets the diagnostics related to the service response.
        /// </summary>
        public abstract CosmosDiagnostics Diagnostics { get; }

        /// <summary>
        /// Gets the headers related to the service response that provided the changes.
        /// </summary>
        public abstract Headers Headers { get; }
    }

The context includes information related to the changes delivered to the delegate: the identifier of the lease they belong to (useful when paired with the Container.ChangeFeedEstimator for monitoring), the Headers (which expose RU consumption and the SessionToken for extending the session to other components), and the Diagnostics.
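
For example, a minimal sketch of how the context could be used inside a handler (the logger and ProcessChangesAsync are placeholders):

ChangeFeedProcessor processor = container
	.GetChangeFeedProcessorBuilder<MyItem>("myProcessorName",
		async (ChangeFeedProcessorContext context, IReadOnlyCollection<MyItem> changes, CancellationToken token) =>
	{
		// LeaseToken identifies the lease, Headers.RequestCharge exposes the RU cost,
		// and Diagnostics can be logged when troubleshooting slow or failed batches.
		logger.LogInformation(
			"Lease {lease} delivered {count} changes costing {ru} RU",
			context.LeaseToken,
			changes.Count,
			context.Headers.RequestCharge);

		await ProcessChangesAsync(changes, token);
	})
	.WithInstanceName("<instance name>")
	.WithLeaseContainer(leaseContainer).Build();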

Relevant files for reviewing

The size of this PR is rather large, mainly due to the refactoring required to support Stream handlers (removing <T> from multiple classes). I also took the opportunity to clean up old code that was never used but was carried over when we migrated the code from the original CFP.

The most important files are:

  • Container.cs
  • ChangeFeedProcessorContext.cs
  • ChangeFeedObserverContextCore.cs (particularly the handling of exceptions on checkpointing and exposing the diagnostics and headers)

Type of change

  • New feature (non-breaking change which adds functionality)

Closing issues

Closes #1765
Closes #616
Closes #865
Closes #400
Closes #1122

@ealsur (Member Author) commented Mar 24, 2021

@bartelink Would love to hear your feedback on the proposed APIs in this PR

@bartelink (Contributor) commented Mar 25, 2021

Firstly, thanks so much for prioritizing this work. I'm really looking forward to getting off a parked 2018-era timebomb, having less need to manage code and prod instances of apps using two Cosmos SDKs in the same process, and being able to participate more meaningfully in providing proper feedback on the actual codebase the team is maintaining.

It seems to me that having a divide between .WithChangesHandler and WithChangesHandlerWithoutImplicitAutoCheckpointing in the wiring is the best way to minimize the number of overloads and make the contract unambiguous (I can picture people mistakenly thinking when using intellisense at speed that a tryCheckpoint argument might be a way to optionally bring forward checkpointing without the transition in semantics).

I personally think the delegates are the way to go for the API design (it should be mentioned that I mainly program in F#, so delegates may feel inordinately comfortable to me compared to the average dev). Spawning lots of types makes it much harder to grasp the lie of the land for a newcomer and makes the API docs messy (lots of documentation boilerplate), i.e. you want to convey that there are two major axes of choice:

  1. Manual vs Implicit checkpoints
  2. Stream vs strongly typed

For me the first one is pretty critical to convey, whereas the other axis is going to be fairly obvious - having overloads to pick from is a good way to manage it.

@bartelink (Contributor)

Also, thanks for taking the time to write up the overview as you normally do. I watch this repo and scan all issues, with varying degrees of attention depending on time of day and workload etc. I thought I'd raise this point with regard to terminology though...

This is actually the first time that I got what you were talking about with respect to the term Stream - I had always assumed you were building something that lets you filter by key / logical partition identifier (in my head 'stream' maps to 'logical partition'; stream is a well established term in event sourcing with this connotation, and I'd venture that the term has similar implications for someone that's been exposed to Kafka and/or the Designing Data Intensive applications book).

While Stream may make sense to you and/or be consistently used across the SDK, I'm not sure that's a universally well known term for deferring deserialization and/or propagating the document content without strongly typing and/or deserializing it first. Qualifying it by either using a term for the strong type binding/deserialization mechanism and/or one for the technique of deferring it and instead propagating the raw content might help learnability of the SDK.

@ealsur (Member Author) commented Mar 25, 2021

@bartelink Regarding your comment of:

It seems to me that having a divide between .WithChangesHandler and WithChangesHandlerWithoutImplicitAutoCheckpointing in the wiring is the best way to minimize the number of overloads and make the contract unambiguous (I can picture people mistakenly thinking when using intellisense at speed that a tryCheckpoint argument might be a way to optionally bring forward checkpointing without the transition in semantics).

What you mean is that the GetChangeFeedProcessorBuilder call that takes the delegates would be better like this:

public abstract ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilder<T>(
	string processorName,
	ChangeFeedHandler<T> onChangesDelegate);

public abstract ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithManualCheckpoint<T>(
	string processorName,
	ChangeFeedHandlerWithManualCheckpoint<T> onChangesDelegate);

public abstract ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilder(
	string processorName,
	ChangeFeedStreamHandler onChangesDelegate);

public abstract ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithManualCheckpoint(
	string processorName,
	ChangeFeedStreamHandlerWithManualCheckpoint onChangesDelegate);

This makes it explicit on the call, and IntelliSense would not offer other options.

An inline review comment (Member) was left on this part of the diff:

CancellationToken cancellationToken);

/// <summary>
/// Delegate to receive the changes within a <see cref="ChangeFeedProcessor"/> execution with manual checkpoint.

I think we need to call out more explicitly that no auto-checkpointing is done at all anymore when using this delegate

@ealsur (Member Author) replied:

Good idea, @bartelink said the same, that is why the Builder is different:

public abstract ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilder<T>(
	string processorName,
	ChangeFeedHandler<T> onChangesDelegate);

public abstract ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderWithManualCheckpoint<T>(
	string processorName,
	ChangeFeedHandlerWithManualCheckpoint<T> onChangesDelegate);

That way, the delegate can only be used if the user is explicitly calling the builder with manual checkpoint.

Is that good enough?

@FabianMeiswinkel (Member) left a comment:

LGTM - Thanks

@ealsur merged commit c5560d8 into master on Apr 2, 2021
@ealsur deleted the users/ealsur/cfpmanual branch on April 2, 2021 at 19:11
@fuocor (Contributor) commented Apr 12, 2021

Thanks for getting this done :)
I was wondering when you'll roll it out into a release.

@j82w (Contributor) commented Apr 12, 2021

There will be a new release hopefully this week or next week.

@fuocor (Contributor) commented Apr 30, 2021

I don't see it in 3.18.0 :( but found it in 3.19.0-preview :)

@ealsur (Member Author) commented May 1, 2021

@fuocor It's in the preview package, not the GA yet. 3.19.0-preview

@bartelink (Contributor) commented May 9, 2021

🤔 I'm looking for the Manual Checkpoint stuff; I didn't see it in the API diffs so didn't try for it in the 3.19.0-preview package. Now that I did, I still don't see it - am I missing something? 🤦 My VPN stopped blocking the package restore and I can see it now, I am a prize idiot, sorry!

While I have your attention though...

Firstly, kudos to all involved in writing https://docs.microsoft.com/en-us/azure/cosmos-db/how-to-migrate-from-change-feed-library

Any hints on how to port conditional parsing of Documents like this? https://github.com/jet/propulsion/blob/629ea0646a4dddfff5f9ca53e44c1a5dbdaa8379/src/Propulsion.Cosmos/EquinoxCosmosParser.fs#L39-L40

I believe if I just used <Batch>, the conversion would hard-fail in those cases where the precondition would...

In other words, for the Stream overload, what's the cleanest way to port that code (I'll stick with Newtonsoft.Json for now)? It needs to:
a) check for the presence of 4 marker fields (we want to ignore other non-relevant documents in the Container)
b) assuming they are present, trigger the normal parse (it's fine for that to throw if the body is malformed)

@ealsur (Member Author) commented May 10, 2021

@bartelink Glad the migration guide is useful. Once these new APIs are GAed, I'll update it further with the new APIs.

If you were using Document before, you can probably use JObject (same thing)? Have you tried it?
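
For what it's worth, a rough sketch of what that could look like against the Stream overload with Newtonsoft.Json. The marker field names, the Batch type, and HandleBatchAsync are placeholders, and the payload is assumed to be the change feed response shape with a Documents array:

async (ChangeFeedProcessorContext context, Stream changes, CancellationToken token) =>
{
	using (StreamReader reader = new StreamReader(changes))
	using (JsonTextReader jsonReader = new JsonTextReader(reader))
	{
		JObject response = await JObject.LoadAsync(jsonReader, token);
		foreach (JObject document in response["Documents"])
		{
			// a) only documents carrying the expected marker fields are relevant
			//    ("p", "i", "n", "e" are placeholder names here)
			bool isRelevant = document["p"] != null && document["i"] != null
				&& document["n"] != null && document["e"] != null;
			if (!isRelevant)
			{
				continue;
			}

			// b) trigger the normal parse; a malformed body throws, as intended
			Batch batch = document.ToObject<Batch>();
			await HandleBatchAsync(batch, token);
		}
	}
}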

@PradSenn

The delegate for taking a checkpoint (tryCheckpointAsync) returns a result and an exception. This contradicts the common async API model in C#, where exceptions are thrown. I don't see a reason why consumers cannot handle the exception in the normal C# style instead of having to use C-style error checking.

@ealsur (Member Author) commented May 18, 2021

@bartelink, @fuocor do you have any opinion regarding the checkpoint API similar to @PradSenn?

@bartelink (Contributor) commented May 18, 2021

The API as it is happens to meet my expectation and requirement exactly given that otherwise the very first thing I'd have to do is wrap the invocation in a try/catch.

Is there precedent elsewhere in the SDK for this? Having said that, the guidelines do suggest to default to throwing.

A final point I'd make is that this particular API, being an advanced one that is intended for people who are going to significant lengths to do explicit checkpoint management in order to achieve better performance, is not the sort where using this newer paradigm is going to cause an issue.

@ealsur (Member Author) commented May 18, 2021

@bartelink - Thanks for the insightful comment!

@PradSenn - My motivation for the design was that not all developers remember to use try/catch, while an API that explicitly says whether there was an error or not forces the developer to make a conscious decision.

Changing it to throwing is rather simple. As @bartelink says, this API is advanced enough that, whether it throws or keeps the current shape, a user should handle it with enough care.

So I guess it's something we can tackle before GA. Could you create an Issue to track it?
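
For illustration only (this is a guess at the shape being discussed, not the final API), the change would roughly turn the tuple-returning callback into a plain throwing one:

// Current preview shape: failure is reported via the tuple.
Func<Task<(bool isSuccess, Exception error)>> tryCheckpointAsync

// Discussed alternative: the callback simply throws on failure.
Func<Task> checkpointAsync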

@fuocor (Contributor) commented May 20, 2021

I agree with @bartelink.
IMO Try semantics do not throw exceptions. With that said, I don't believe the tryCheckpoint delegate is a Try in the true sense.

@ealsur (Member Author) commented May 20, 2021

@PradSenn @bartelink @fuocor - Sent #2488 to change the signature based on the feedback

@bartelink (Contributor) commented May 29, 2021

The LeaseToken in the ChangeFeedProcessorContext is described as

Gets the token representative of the current lease from which the changes come from.

For me, this does not (read: did not; had me wondering what I was missing) convey enough information when porting more complex scenarios using the V2 CFP's PartitionKeyRangeId value.

Perhaps it could refer to a shard, physical partition, segment, range or some such synonym ?

I actually think the root problem here is the name - LeaseToken made me think of Session + Token etc, not 'shard/partition' + 'identifier'/'key' - while that's probably too late to fix, it is the real answer (sorry I didn't get around to validating this earlier...)

I appreciate based on #1122 that there are reasons to leave it abstract, but perhaps the relationship could be mentioned or illustrated in https://docs.microsoft.com/en-us/azure/cosmos-db/how-to-migrate-from-change-feed-library ?

I'm also wondering if there is a replacement for the callback that one could previously use to log when a Lease is assigned?

@ealsur (Member Author) commented Jun 1, 2021

@bartelink - I will update the migration docs once the APIs GA.

We want to expand the CFP beyond the Partition restriction, so keeping it abstract is a way to ensure that. Obviously from a migration perspective, yes, the LeaseToken will be equivalent initially, but that will not be the case forever.
Conceptually the CFP generates leases, and the LeaseToken is the identifier. What that lease represents (a full partition, part of a partition, etc.) is an implementation detail.
Right now, it's an undocumented 1 partition = 1 lease, but ideally, if we let users define a custom degree of parallelization, then those leases will represent not a partition but something else (a range of partition key values).

For the callback - I think #2501 is what you are looking for.

@bartelink (Contributor) commented Jun 1, 2021

Yes, I accept your central point, but the Token part of the name is the bit that's most jarring. If it was LeaseElementId, LeaseId or something, that would be clearer (I guess the name needs to align with, but not be confusing in the context of, it being relative to a LeasePrefix). So, once again, my main point is that Token does not convey to me 'abstract portion / element / segment / partition / shard of an overall thing that's being represented'.

Yes, #2501 is the correct answer re the callback (I had read that issue but wasn't able to locate it when I was heads down porting). The text in there also does not use this token terminology - I'd again contend that there is missing terminology for 'acquired (leased) segment of a monitored container' ;)

(Also, I'm puzzling over whether the porting guide should have a 'where has the observer gone' paragraph, i.e. TL;DR: all changes get supplied to a single callback and are distinguished by the LeaseToken, and the Open/Close event hooks are replaced with #2501.)

@ealsur (Member Author) commented Jun 1, 2021

Yeah, I agree, the Token part might not have been the greatest name we could've picked; ~2 years ago it sounded good and better than PartitionId, but I see how it can be mistaken for SessionToken or be complex to wrap your head around when compared with PartitionId.

Regarding monitoring, ideally the events will also show the LeaseToken that contextually was there when the event happened. Right now it's mostly a design concept I'm playing with, mostly out of trying to help people self-diagnose issues. Something like that API + docs should be all that is needed.

@ealsur (Member Author) commented Jun 29, 2021

@bartelink question on the Open/Close of the observer. Are you currently hooking logic (other than telemetry) to those events? If so, which?

If it's just telemetry, would a non-programmatic way to access that telemetry (such as something like OpenTelemetry wired up through the SDK) be sufficient if those events were raised there?

@bartelink (Contributor) commented Jun 29, 2021

I ported the logic in jet/propulsion#113

See the below-the-fold paragraph for the real answer as to the use case that can't be achieved without an unassign hook.


In terms of the pure telemetry aspect, I literally can't replicate the logging as it was (clear events as to when a range is assigned and unassigned, being able to identify lease timeouts by seeing storms of those in the log, etc.). If the assign/unassign events were hookable, I'd definitely use those, but I would not be able to thread them into the logging context hierarchy alongside other activity per lifetime of an observer in the same manner. However, I can deal with that, especially as better-thought-out OT integration can definitely improve on logging of that kind as a source of troubleshooting context and hints.

The lack of Disposal meant I had to do some messy reworking of the consumption pipeline (slightly concealed by the fact that some preparatory work was in preceding PRs).

I run an Async pumping loop per source partition/shard/token which:
a) feeds observed batches (some of which are ahead of the checkpointed position) into the processing loop, enabling it to steal work from the future if blocked (rate limited) on other tasks
b) manages checkpointing in response to the processor reporting progress (it knows the most recent completed batch and hence can avoid doing redundant checkpointing of batches that have already been superseded)
c) periodically reports throughput, read ahead buffer state, and checkpointing status (e.g. if there is rate limiting on the aux container) to Prometheus

The lack of Disposal (unassign notifications) means I'd need to add timers to dispose buffers and/or back off if I wanted to retain the same resource consumption over time (imagine there being 100 physical partitions and each consumer being bounced around those over time).

(If there's no Disposal event, the lack of an initialization event is not a big difference - I check for initialization of the processing loop for a given assignment and log on first hit.)

However, I guess if I'd never had a Dispose hook, I'd probably have come up with a cleaner and more scalable pumping mechanism

Having said that, the above, while important and slightly painful from a porting perspective, can be worked around

(It should also be noted that it seems the V2 CFP doesn't always trigger Disposal consistently anyway, i.e. the V2 impl may have bugs)


👉 The thing that an unassigned event can uniquely enable is being able to recall work (batches read ahead and passed to the processor, but not yet completed and therefore not checkpointed) that has been optimistically passed to the processing engine, and/or partially completed. If the CFP reassigns to another node, the current node cannot purge the work and will thus be working in competition with the other node that will next be assigned the partition that got unassigned.

Any reporting of assignments/unassignments would only be useful if there was an equivalent sequencing of a) assign b) 0..n changes c) unassign - i.e. providing the info out of band does not really enable the above scenario.
