Modifies WorkItem renew logic to start running after item is fetched. #1168

Open
sig5 wants to merge 5 commits into base: main


@sig5 sig5 commented Sep 23, 2024

Fixes #1150
The summary of changes is as follows:

  1. Adds a WorkItemBase to extract common properties for Work Items.
  2. Moves the logic that renews the work item lock into WorkItemDispatcher.

Open Question:
Do we want to add a renewal policy for Tracking store items too? If not, why?

Author

sig5 commented Sep 23, 2024

@microsoft-github-policy-service agree company="Microsoft"

Author

sig5 commented Oct 4, 2024

Bumping this up for review :)

Collaborator

@davidmrdavid davidmrdavid left a comment

Sorry for the delay. Left some notes, overall the work makes sense based on what I had originally suggested here, but now that I'm reviewing the code and looking at the logic more closely, I don't think this really gets to the root problem. Sorry, I think I misunderstood the original problem.

Let me explain. In WorkItemDispatcher.DispatchAsync, line 227 holds the logic that ensures we don't exceed our concurrency limits:

if (!await this.concurrencyLock.WaitAsync(TimeSpan.FromSeconds(5)))

In line 262 we have the FetchWorkItem invocation. At that point, we have already guaranteed that we're not going over the concurrency limit, so it doesn't make a big difference whether we start renewing there or a bit later in ProcessWorkItemAsync. In both cases, we have already satisfied the check in line 227 (the .WaitAsync), so it's "too late".

I think the issue is that the FetchWorkItem call is simply pulling off messages that were already read from the queues, and it is those messages that may have already expired (because they weren't getting renewed until after we call FetchWorkItem). So it feels to me that the real solution is to guarantee that renewal starts as soon as messages are obtained from storage, in GetMessageAsync here:

public async Task<QueueMessage?> GetMessageAsync(TimeSpan visibilityTimeout, CancellationToken cancellationToken = default)
{
    QueueMessage message = await this.queueClient.ReceiveMessageAsync(visibilityTimeout, cancellationToken).DecorateFailure();
    if (message == null)
    {
        return null;
    }
    this.stats.MessagesRead.Increment();
    return message;
}

Please let me know if this doesn't make sense; I'm happy to clarify. In any case, I need to think about the design a bit more. If it ends up being too complicated, I may not want to tackle this edge case just yet. I'm happy to hear your ideas if the problem makes sense, though. Please let me know.
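
To make the idea above a bit more concrete (start renewing at receipt time rather than at dispatch time), here is a rough sketch. `LeasedMessage` and its `renewAsync` callback are hypothetical names invented for this example, not existing DurableTask types; in a real change the callback would presumably wrap whatever visibility-timeout update the storage client exposes.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical wrapper for illustration only; not part of DurableTask.
public sealed class LeasedMessage : IAsyncDisposable
{
    readonly CancellationTokenSource cts = new CancellationTokenSource();
    readonly Task renewalLoop;

    public LeasedMessage(string body, TimeSpan renewInterval, Func<CancellationToken, Task> renewAsync)
    {
        this.Body = body;
        // The loop starts as soon as the message is wrapped (i.e. right after it
        // is read from storage), not when a dispatcher later picks it up.
        this.renewalLoop = RenewLoopAsync(renewInterval, renewAsync, this.cts.Token);
    }

    public string Body { get; }

    static async Task RenewLoopAsync(TimeSpan interval, Func<CancellationToken, Task> renewAsync, CancellationToken token)
    {
        try
        {
            while (true)
            {
                await Task.Delay(interval, token);
                await renewAsync(token);
            }
        }
        catch (OperationCanceledException)
        {
            // Expected once the message is completed or abandoned.
        }
    }

    // Stops renewing. Any non-cancellation failure thrown by renewAsync
    // surfaces here; a real implementation would need a policy for that.
    public async ValueTask DisposeAsync()
    {
        this.cts.Cancel();
        await this.renewalLoop;
        this.cts.Dispose();
    }
}
```

With something like this, GetMessageAsync could hand back an already-renewing message, and whoever completes or abandons it would dispose the wrapper. Whether that ownership model fits the existing dispatcher is exactly the design question left open above.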

Comment on lines +1 to +2
<!DOCTYPE html>
<!-- saved from url=(0014)about:internet -->
Collaborator

Hmm, what is this file?
In any case, can we please delete it? It doesn't seem to be part of our codebase. I must admit I'm not super comfortable with the strangely encoded URIs referenced in here, but they may be benign.

/// <summary>
/// The datetime this orchestration work item is locked until
/// </summary>
public DateTime LockedUntilUtc;
Collaborator

Since this is a public class, I don't think we can remove types here without breaking our dependent packages, and possibly some customers. We can choose to deprecate some of these fields, but not remove them.

Collaborator

nevermind, I see this was added to WorkItemBase

/// <summary>
/// The datetime this work item is locked until
/// </summary>
public DateTime LockedUntilUtc;
Collaborator

same note as before - we should be careful not to remove fields since this is a public class, that would be a breaking change

Collaborator

nevermind, I see this was added to WorkItemBase

@@ -221,7 +221,8 @@ public ServiceBusOrchestrationService(
     "TrackingDispatcher",
     item => item == null ? string.Empty : item.InstanceId,
     FetchTrackingWorkItemAsync,
-    ProcessTrackingWorkItemAsync)
+    ProcessTrackingWorkItemAsync,
+    (T,token) => Task.CompletedTask)
Collaborator

can you please explain why this is the implementation for ServiceBus? Not sure I follow why we can just return Task.CompletedTask

@@ -635,6 +613,7 @@ static OrchestrationExecutionContext GetOrchestrationExecutionContext(Orchestrat
static TimeSpan MinRenewalInterval = TimeSpan.FromSeconds(5); // prevents excessive retries if clocks are off
static TimeSpan MaxRenewalInterval = TimeSpan.FromSeconds(30);

Collaborator

please remove this newline

     Func<T, string> workItemIdentifier,
     Func<TimeSpan, CancellationToken, Task<T>> fetchWorkItem,
-    Func<T, Task> processWorkItem)
+    Func<T, Task> processWorkItem,
+    Func<T, CancellationToken, Task> renewWorkItem)
Collaborator

Since this is a public class, we should avoid adding a new mandatory parameter; that will break existing implementations. Please make this optional, or create another constructor.
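
For reference, a minimal sketch of the "another constructor" option. `SketchDispatcher<T>` is a simplified, hypothetical stand-in for the dispatcher (the real class takes more parameters), and the no-op default for renewal is an assumption, not a decision made in this PR.

```csharp
#nullable enable
using System;
using System.Threading;
using System.Threading.Tasks;

// Simplified, hypothetical stand-in for the dispatcher class.
public class SketchDispatcher<T>
{
    readonly Func<T, Task> processWorkItem;
    readonly Func<T, CancellationToken, Task> renewWorkItem;

    // Existing signature is left untouched, so current callers keep
    // compiling and already-compiled callers keep binding to it.
    public SketchDispatcher(Func<T, Task> processWorkItem)
        : this(processWorkItem, renewWorkItem: null)
    {
    }

    // New overload for backends that want lock renewal.
    public SketchDispatcher(Func<T, Task> processWorkItem, Func<T, CancellationToken, Task>? renewWorkItem)
    {
        this.processWorkItem = processWorkItem ?? throw new ArgumentNullException(nameof(processWorkItem));
        // Assumption: when no renewal delegate is given, renewal is a no-op.
        this.renewWorkItem = renewWorkItem ?? ((item, token) => Task.CompletedTask);
    }

    public Task ProcessAsync(T item) => this.processWorkItem(item);

    public Task RenewAsync(T item, CancellationToken token) => this.renewWorkItem(item, token);
}
```

An overload is sketched here rather than a C# optional parameter because adding an optional parameter changes the existing constructor's signature: callers recompiled from source are fine, but assemblies already compiled against the old signature would fail to bind.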

Successfully merging this pull request may close these issues.

Consider starting the renewal of messages in DTFx.Core as soon as they are fetched
2 participants