-
Notifications
You must be signed in to change notification settings - Fork 295
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Modifies WorkItem renew logic to start running after item is fetched. #1168
base: main
Are you sure you want to change the base?
Conversation
@microsoft-github-policy-service agree company="Microsoft" |
Bumping this up for review :) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for the delay. Left some notes, overall the work makes sense based on what I had originally suggested here, but now that I'm reviewing the code and looking at the logic more closely, I don't think this really gets to the root problem. Sorry, I think I misunderstood the original problem.
Let me explain. In WorkItemDispatcher.DispatchAsync
, we have we in line 227 the logic for ensuring we don't exceed our concurrency limits:
if (!await this.concurrencyLock.WaitAsync(TimeSpan.FromSeconds(5)))
In line 262 we have the FetchWorkItem
invocation. At that point ,we have already guaranteed that we're not going over the concurrency limit, so it doesn't make a big difference whether we start renewing there, or a bit later in ProcessWorkItemAsync
. In both cases, we have already satisfied the check in line 227 (the .WaitAsync
). So it's "too late".
I think the issue is the FetchWorkItem
async is simply pulling of messages already read from the queues, and it is those messages that may have already expired (because they weren't getting renewe until after we call FetchWorkItem
). So it feels to me that the solution really is to guarantee the renewing is ocurring as soon as messages are obtained from storage, in GetMessageAsync
here:
durabletask/src/DurableTask.AzureStorage/Storage/Queue.cs
Lines 88 to 99 in 2795981
public async Task<QueueMessage?> GetMessageAsync(TimeSpan visibilityTimeout, CancellationToken cancellationToken = default) | |
{ | |
QueueMessage message = await this.queueClient.ReceiveMessageAsync(visibilityTimeout, cancellationToken).DecorateFailure(); | |
if (message == null) | |
{ | |
return null; | |
} | |
this.stats.MessagesRead.Increment(); | |
return message; | |
} |
Please let me know if this doesn't make sense, I'm happy to clarify. In any case, I need to think about the design a bit more, if it ends up being too complicated, I may not want to tackle this edge case just yet. I'm happy to hear your ideas if the problem makes sense though, please let me know
<!DOCTYPE html> | ||
<!-- saved from url=(0014)about:internet --> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, what is this file?
In any case, can we please delete it. Doesn't seem to be part of our codebase. I must admit I'm not super comfortable with the strangely encoded URIs referenced in here, but they may be benign.
/// <summary> | ||
/// The datetime this orchestration work item is locked until | ||
/// </summary> | ||
public DateTime LockedUntilUtc; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is a public class, I don't think we can remove types here without breaking our dependent packages, and possibly some customers. We can choose to deprecate some of these fields, but not remove them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nevermind, I see this was added to WorkItemBase
/// <summary> | ||
/// The datetime this work item is locked until | ||
/// </summary> | ||
public DateTime LockedUntilUtc; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same note as before - we should be careful not to remove fields since this is a public class, that would be a breaking change
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nevermind, I see this was added to WorkItemBase
@@ -221,7 +221,8 @@ public ServiceBusOrchestrationService( | |||
"TrackingDispatcher", | |||
item => item == null ? string.Empty : item.InstanceId, | |||
FetchTrackingWorkItemAsync, | |||
ProcessTrackingWorkItemAsync) | |||
ProcessTrackingWorkItemAsync, | |||
(T,token) => Task.CompletedTask) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you please explain why this is the implementation for ServiceBus? Not sure I follow why we can just return Task.CompletedTask
@@ -635,6 +613,7 @@ static OrchestrationExecutionContext GetOrchestrationExecutionContext(Orchestrat | |||
static TimeSpan MinRenewalInterval = TimeSpan.FromSeconds(5); // prevents excessive retries if clocks are off | |||
static TimeSpan MaxRenewalInterval = TimeSpan.FromSeconds(30); | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please remove this newline
Func<T, string> workItemIdentifier, | ||
Func<TimeSpan, CancellationToken, Task<T>> fetchWorkItem, | ||
Func<T, Task> processWorkItem) | ||
Func<T, Task> processWorkItem, | ||
Func<T, CancellationToken, Task> renewWorkItem) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since this is a public class, we should avoid adding a new manadatory parameter, that will break existing implementations. Please make this optional, or create another constructor
Fixes #1150
The summary of changes is as follows:
WorkItemBase
to extract common properties for Work Items.WorkItemDispatcher
.Open Question:
Do we want to add a renewal policy for Tracking store items too? If not, why?