Added Stream support to ChangeFeedProcessor #888

Closed
wants to merge 34 commits into from

Contributor

@fuocor fuocor commented Oct 9, 2019

Pull Request Template

Description

Added the ability to use a Stream delegate rather than a typed collection, in order to support custom serialization and usage patterns where the items do not need to be deserialized.

Type of change

Added the set of classes needed to support the Stream delegate.

Please delete options that are not relevant.

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • This change requires a documentation update

Closing issues

#865

Assignee

Richard Fuoco

msftclas commented Oct 9, 2019

CLA assistant check
All CLA requirements met.

private ChangeFeedLeaseOptions changeFeedLeaseOptions;
private ChangeFeedProcessorOptions changeFeedProcessorOptions;
private DocumentServiceLeaseStoreManager documentServiceLeaseStoreManager;
private bool initialized = false;
Contributor

Please avoid double initialization. Opt for a private instance constructor and a public static CreateAsync method; the member can then be made readonly.

Contributor Author

@fuocor fuocor Oct 11, 2019

I'm not catching the intent. The code is a copy of ChangeFeedProcessorCore with only the delegate changing. The upstream call to the constructor is made in ContainerCore.GetChangeFeedProcessorBuilder (which is not async) and only started and stopped through the abstract base class. My reading of the original code was that the initialization was lazy.
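
For readers following the thread, the pattern the reviewer describes (a private instance constructor plus a public static CreateAsync method) might look roughly like the sketch below. `LeaseStore` and `StreamProcessor` are hypothetical stand-ins invented for illustration; they are not the SDK's types, and the PR did not necessarily adopt this shape.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for a dependency that needs asynchronous setup.
public sealed class LeaseStore
{
    public bool IsReady { get; private set; }

    public async Task InitializeAsync()
    {
        await Task.Yield(); // simulates asynchronous setup work
        this.IsReady = true;
    }
}

// Hypothetical processor: it can only be obtained fully initialized.
public sealed class StreamProcessor
{
    // readonly: assigned exactly once, in the constructor.
    private readonly LeaseStore leaseStore;

    private StreamProcessor(LeaseStore leaseStore)
    {
        this.leaseStore = leaseStore ?? throw new ArgumentNullException(nameof(leaseStore));
    }

    public bool IsReady => this.leaseStore.IsReady;

    // All asynchronous work happens here, before the instance exists,
    // so no "initialized" flag is needed afterwards.
    public static async Task<StreamProcessor> CreateAsync()
    {
        LeaseStore store = new LeaseStore();
        await store.InitializeAsync();
        return new StreamProcessor(store);
    }
}
```

As the author's reply notes, this only fits when the call site can be asynchronous; a synchronous builder method would have to keep lazy initialization instead.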

this.observerFactory = observerFactory;
}

public void ApplyBuildConfiguration(
Contributor

Add these to the static constructor to avoid extra state changes.

if (lease == null)
throw new ArgumentNullException(nameof(lease));

var changeFeedObserver = this.observerFactory.CreateObserver();
Contributor

please avoid var

Contributor Author

fixed

private readonly CheckpointFrequency checkpointFrequency;
private readonly ChangeFeedObserver observer;
private int processedDocCount;
private DateTime lastCheckpointTime = DateTime.UtcNow;
Contributor

please set this in the instance constructor instead.

Contributor Author

fixed

{
private readonly CheckpointFrequency checkpointFrequency;
private readonly ChangeFeedObserver observer;
private int processedDocCount;
Contributor

maybe a long will be more suitable?

Contributor Author

Changed...

{
private readonly PartitionCheckpointer checkpointer;

internal ChangeFeedObserverContextCore(string leaseToken)
Contributor

Avoid multiple independent instance constructors; instead, have this one call the larger one with default values.

Contributor Author

done
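
The constructor chaining requested in this thread can be sketched as follows. `LeaseContextSketch`, its `Continuation` and `MaxItemCount` members, and the default value 100 are assumptions made up for illustration; the real class takes a ResponseMessage and a PartitionCheckpointer.

```csharp
using System;

// Hypothetical context type; member names and defaults are illustrative only.
public sealed class LeaseContextSketch
{
    // The smaller constructor holds no logic of its own:
    // it delegates to the larger one with default values.
    public LeaseContextSketch(string leaseToken)
        : this(leaseToken, continuation: null, maxItemCount: 100)
    {
    }

    // Single place where validation and assignment happen.
    public LeaseContextSketch(string leaseToken, string continuation, int maxItemCount)
    {
        this.LeaseToken = leaseToken ?? throw new ArgumentNullException(nameof(leaseToken));
        this.Continuation = continuation;
        this.MaxItemCount = maxItemCount;
    }

    public string LeaseToken { get; }

    public string Continuation { get; }

    public int MaxItemCount { get; }
}
```

With this shape, a new field or validation rule only has to be added to the larger constructor.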

this.LeaseToken = leaseToken;
}

internal ChangeFeedObserverContextCore(string leaseToken, ResponseMessage feedResponse, PartitionCheckpointer checkpointer)
Contributor

Are these all allowed to be null?

Contributor Author

Good question. This is a copy of this, but it sure seems like CheckpointAsync can result in an NRE.
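
One possible way to address the NullReferenceException concern raised here is to let the context accept a null checkpointer but fail with a descriptive exception when a checkpoint is requested. This is only a sketch under assumptions: `ICheckpointer` and `ObserverContextSketch` are hypothetical names, and whether the SDK should throw or silently skip in this case is not settled in the thread.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical abstraction; the SDK's PartitionCheckpointer is not reproduced here.
public interface ICheckpointer
{
    Task CheckpointAsync(string continuation);
}

public sealed class ObserverContextSketch
{
    private readonly ICheckpointer checkpointer; // may be null when checkpointing is unavailable
    private readonly string continuation;

    public ObserverContextSketch(string leaseToken, string continuation, ICheckpointer checkpointer)
    {
        this.LeaseToken = leaseToken ?? throw new ArgumentNullException(nameof(leaseToken));
        this.continuation = continuation;
        this.checkpointer = checkpointer;
    }

    public string LeaseToken { get; }

    public Task CheckpointAsync()
    {
        if (this.checkpointer == null)
        {
            // Explicit failure instead of an accidental NullReferenceException.
            throw new InvalidOperationException("No checkpointer is available for this context.");
        }

        return this.checkpointer.CheckpointAsync(this.continuation);
    }
}
```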

PartitionCheckpointer checkpointer,
CosmosSerializer cosmosJsonSerializer)
{
this.observer = observer;
Contributor

null checks?

Contributor Author

Done, but you may want to have a look at the original FeedProcessorCore.

{
string lastContinuation = this.options.StartContinuation;

while (!cancellationToken.IsCancellationRequested)
Contributor

If cancellation is requested, won't it immediately leave this method instead of throwing the exception?

Contributor Author

@fuocor fuocor Oct 11, 2019

I'm not sure what the original intent was/is. This is a copy of FeedProcessorCore intended for use with a stream delegate rather than a typed one.
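
The difference the reviewer is pointing at can be shown with two small loops: one that checks `IsCancellationRequested` in its condition and returns normally, and one that calls `ThrowIfCancellationRequested` so the caller observes an OperationCanceledException. `CancellationSketch` and its method names are hypothetical; only the CancellationToken members are real APIs.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationSketch
{
    // Exits quietly: once cancellation is requested the loop condition
    // is false and the method returns normally, with no exception.
    public static async Task<int> PollUntilCancelledAsync(CancellationToken cancellationToken)
    {
        int iterations = 0;
        while (!cancellationToken.IsCancellationRequested)
        {
            iterations++;
            await Task.Yield();
        }

        return iterations;
    }

    // Surfaces cancellation: the returned task ends up canceled and
    // awaiting it throws OperationCanceledException.
    public static async Task PollOrThrowAsync(CancellationToken cancellationToken)
    {
        while (true)
        {
            cancellationToken.ThrowIfCancellationRequested();
            await Task.Yield();
        }
    }
}
```

Which behavior is wanted depends on whether the caller needs to distinguish "stopped because of cancellation" from "finished"; the thread leaves that question open.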

Member

ealsur commented Oct 11, 2019

This PR is duplicating a lot of the Change Feed Processor files, which makes maintaining this code in the future very hard.

A better approach would probably be:

  1. Make the base abstract Observer class already receive a Stream on ProcessChangesAsync
  2. Add a new delegate that sends the stream
  3. Have 2 implementations of the Observer, one that serializes like it currently happens in FeedProcessorCore, another that passes the Stream, each would receive a different delegate as part of their constructors
  4. Have the ChangeFeedProcessorBuilder, based on which delegate type is used, create one Observer implementation or the other.

That would reduce the amount of files changed greatly.
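
A rough sketch of the four steps proposed above: a base observer that always receives a Stream, one implementation that passes the stream through, one that deserializes it, and a factory that picks the implementation from the delegate type. Every type name here is a hypothetical stand-in rather than the SDK's API, and System.Text.Json is used only to keep the example self-contained (an assumption; the SDK uses its own CosmosSerializer).

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

// Step 1: the base observer always receives the raw Stream.
public abstract class StreamObserver
{
    public abstract Task ProcessChangesAsync(Stream stream, CancellationToken cancellationToken);
}

// Steps 2 and 3: hands the Stream to a stream delegate untouched.
public sealed class PassThroughObserver : StreamObserver
{
    private readonly Func<Stream, CancellationToken, Task> onChanges;

    public PassThroughObserver(Func<Stream, CancellationToken, Task> onChanges)
    {
        this.onChanges = onChanges ?? throw new ArgumentNullException(nameof(onChanges));
    }

    public override Task ProcessChangesAsync(Stream stream, CancellationToken cancellationToken)
    {
        return this.onChanges(stream, cancellationToken);
    }
}

// Step 3: deserializes the Stream before calling a typed delegate.
public sealed class TypedObserver<T> : StreamObserver
{
    private readonly Func<IReadOnlyCollection<T>, CancellationToken, Task> onChanges;

    public TypedObserver(Func<IReadOnlyCollection<T>, CancellationToken, Task> onChanges)
    {
        this.onChanges = onChanges ?? throw new ArgumentNullException(nameof(onChanges));
    }

    public override async Task ProcessChangesAsync(Stream stream, CancellationToken cancellationToken)
    {
        List<T> items = await JsonSerializer.DeserializeAsync<List<T>>(stream, cancellationToken: cancellationToken);
        await this.onChanges(items, cancellationToken);
    }
}

// Step 4: the builder selects the observer based on the delegate type.
public static class ObserverFactory
{
    public static StreamObserver ForStream(Func<Stream, CancellationToken, Task> onChanges)
    {
        return new PassThroughObserver(onChanges);
    }

    public static StreamObserver ForItems<T>(Func<IReadOnlyCollection<T>, CancellationToken, Task> onChanges)
    {
        return new TypedObserver<T>(onChanges);
    }
}
```

Because both observers share one base signature, the rest of the processing pipeline does not need a duplicated set of classes per delegate type.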

Member

ealsur commented Oct 11, 2019

@fuocor I wonder, if we fix the Custom Serializer support, would you still need Stream support? ref #865

Contributor Author

fuocor commented Oct 11, 2019

> @fuocor I wonder, if we fix the Custom Serializer support, would you still need Stream support? ref #865

Yes... I want to work with the batch stream

Contributor Author

fuocor commented Oct 11, 2019

> This PR is duplicating a lot of the Change Feed Processor files, which makes maintaining this code in the future very hard.
>
> A better approach would probably be:
>
>   1. Make the base abstract Observer class already receive a Stream on ProcessChangesAsync
>   2. Add a new delegate that sends the stream
>   3. Have 2 implementations of the Observer, one that serializes like it currently happens in FeedProcessorCore, another that passes the Stream, each would receive a different delegate as part of their constructors
>   4. Have the ChangeFeedProcessorBuilder, based on which delegate type is used, create one Observer implementation or the other.
>
> That would reduce the amount of files changed greatly.

I was looking at that possibility, but was trying not to be too intrusive on the existing code.
Now that I have my head around the existing code, I will update my code.

@@ -28,9 +28,9 @@ public override Task OpenAsync(ChangeFeedObserverContext context)
return Task.CompletedTask;
}

- public override Task ProcessChangesAsync(ChangeFeedObserverContext context, IReadOnlyCollection<T> docs, CancellationToken cancellationToken)
+ public async override Task ProcessChangesAsync(ChangeFeedObserverContext context, Stream stream, CancellationToken cancellationToken)
Member

I think you can still return the task here:

return this.onChanges(stream, cancellation);

Contributor Author

The call uses .ConfigureAwait(false), whose result is not a Task and so cannot be returned directly.

Member

That is my point: it was initially not using ConfigureAwait. Since this will be executed on a thread that is not the main thread, what would be the benefit of ConfigureAwait(false) here?

Contributor Author

done
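
The two options discussed in this thread can be put side by side. The sketch below assumes a hypothetical `DelegateObserverSketch` wrapping a stream delegate; it is not the PR's actual observer class.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical observer wrapping a user-supplied stream delegate.
public sealed class DelegateObserverSketch
{
    private readonly Func<Stream, CancellationToken, Task> onChanges;

    public DelegateObserverSketch(Func<Stream, CancellationToken, Task> onChanges)
    {
        this.onChanges = onChanges ?? throw new ArgumentNullException(nameof(onChanges));
    }

    // Option suggested by the reviewer: return the delegate's Task as is.
    // The method is not async, so no state machine is generated.
    public Task ProcessDirectAsync(Stream stream, CancellationToken cancellationToken)
    {
        return this.onChanges(stream, cancellationToken);
    }

    // Original option: ConfigureAwait(false) produces a ConfiguredTaskAwaitable,
    // which can be awaited but is not a Task, so the method has to be async.
    public async Task ProcessAwaitedAsync(Stream stream, CancellationToken cancellationToken)
    {
        await this.onChanges(stream, cancellationToken).ConfigureAwait(false);
    }
}
```

Both methods invoke the delegate once and complete when it completes; they differ in the extra state machine and in how an exception thrown synchronously by the delegate reaches the caller (thrown directly in the first case, captured in the returned Task in the second).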

Member

@ealsur ealsur left a comment

I like the refactoring. @kirankumarkolli, can you please take a look, since this involves public API changes?

@ealsur ealsur added breaking non-additive functional behavior changes ChangeFeed labels Oct 15, 2019

Member

ealsur commented Feb 9, 2021

/azp run

@azure-pipelines

Azure Pipelines successfully started running 2 pipeline(s).

Member

@ealsur ealsur left a comment

LGTM, left some comments regarding some test changes but overall looks fine. Please address them and let's see if the tests pass and we can get it in.

@@ -1038,7 +1069,7 @@
},
"NestedTypes": {}
},
"Microsoft.Azure.Cosmos.ContainerProperties;System.Object;IsAbstract:False;IsSealed:False;IsInterface:False;IsEnum:False;IsClass:True;IsValueType:False;IsNested:False;IsGenericType:False;IsSerializable:False": {
Member

These changes are weird and unrelated

Member

@ealsur ealsur left a comment

Let's verify the contract changes; it seems there are changes in the files that do not align with the expected API changes. @j82w, do you know what could be causing it?

Member

ealsur commented Feb 10, 2021

/azp run

@azure-pipelines

Azure Pipelines successfully started running 2 pipeline(s).

Contributor Author

fuocor commented Feb 12, 2021

Tests have been fixed

Member

ealsur commented Feb 12, 2021

/azp run

@azure-pipelines

Azure Pipelines successfully started running 2 pipeline(s).

Member

ealsur commented Mar 11, 2021

/asp run

Member

ealsur commented Mar 11, 2021

/azp run

@azure-pipelines

Azure Pipelines successfully started running 2 pipeline(s).

Member

ealsur commented Mar 23, 2021

@fuocor Tests kept failing on this PR (mainly unit tests), so I am cherry-picking your work into #2331, as we are adding new delegate signatures.

ealsur added a commit that referenced this pull request Apr 2, 2021
…ntext, and stream (#2331)

* Porting changes from #888

* Refactoring to enable manual checkpoint

* manual test

* exception test

* Adding stream with manual checkpoint

* new tests

* refactoring contract

* contract

* to Exception

* GetChangeFeedProcessorBuilderWithManualCheckpoint

Member

ealsur commented Apr 2, 2021

Closing as the changes were ported to #2331

@ealsur ealsur closed this Apr 2, 2021