
DT.AzureStorage: Runtime de-dupe support for orchestration start events #528

Merged
cgillum merged 6 commits into main on Mar 31, 2021

Conversation

@cgillum cgillum (Collaborator) commented Mar 30, 2021

The DT.AzureStorage backend has always been vulnerable to concurrent starts of orchestrations with the same instance ID. This PR finally fixes that, as well as a couple other issues found along the way.

Fixes Azure/azure-functions-durable-extension#1730 (this was the main issue being fixed)
Fixes #527 (found this while working on ^)
Fixes Azure/azure-functions-durable-extension#367 (earlier issue with the same symptoms)
Fixes Azure/azure-functions-durable-extension#1194
Fixes Azure/azure-functions-durable-extension#612

Problem

Oftentimes, users want to start an orchestration with a specific instance ID, but only if an orchestration with that ID isn't already running. To do so, they'd write code that looks like this:

// Not thread-safe: two concurrent callers can both observe "not running" and both start.
OrchestrationState existing = await client.GetOrchestrationStateAsync(instanceId);
if (existing == null || IsStopped(existing))
{
    await client.CreateOrchestrationInstanceAsync(typeof(MyOrchestration), instanceId, input);
}

The problem with this logic is that it's not thread-safe. If two threads run it at the same time, whether on the same machine or on different machines, they will create two orchestration start messages for the same instance ID. DT.AzureStorage did not handle this well: one instance might execute normally while another executes partially, leading to inconsistencies or duplicate executions.

In the past, we'd asked customers to implement their own concurrency management. With this PR, we automatically de-duplicate these start messages, so users no longer need to write distributed locks to mitigate this race condition.

Solution

For every ExecutionStarted event we dequeue from a control queue, we query the Instances table to see whether the ExecutionId field of the start message matches the same property of the corresponding Instances table record. If they match, we process the start message normally; otherwise, we discard the message as a duplicate. We can do this because updates to the Instances table are thread-safe, so the ExecutionId field is a reliable way to uniquely associate an instance with its ExecutionStarted message.

The actual implementation is a bit more complex because we enqueue the ExecutionStarted message in the control queue before updating the Instances table. As a result, by the time we read the ExecutionStarted message, the corresponding Instances table record may not exist yet, in which case we put the message back onto the queue and try processing it again later. We do this up to 10 times.

The other corner case we handle is when a completed instance is intentionally overwritten, which is something we actively support. In that case, an old ExecutionId value will exist temporarily until the new one can be written. To account for this, we also compare the timestamp of the message with the Instances.CreatedTime value to distinguish this case from the duplicate-start case.
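
To make the flow concrete, here is a rough sketch of the validation described above. It borrows type names from the codebase (MessageData, OrchestrationInstanceStatus, ExecutionStartedEvent, via the usual DurableTask.Core usings) and assumes the Instances entity exposes the new ExecutionId column; the DedupeAction enum, the method names, and the timestamp helper are illustrative only and are not the actual implementation.

enum DedupeAction { Process, Requeue, Discard }

// Illustrative decision logic for each dequeued ExecutionStarted message.
// instanceRow is the entity read from the Instances table, or null if no row exists yet.
DedupeAction ValidateExecutionStarted(MessageData message, OrchestrationInstanceStatus instanceRow)
{
    var startEvent = (ExecutionStartedEvent)message.TaskMessage.Event;

    if (instanceRow == null)
    {
        // The client enqueues the ExecutionStarted message before writing the Instances row,
        // so the row may not exist yet. Put the message back and retry; give up after 10 tries.
        return message.OriginalQueueMessage.DequeueCount >= 10
            ? DedupeAction.Discard
            : DedupeAction.Requeue;
    }

    if (instanceRow.ExecutionId == startEvent.OrchestrationInstance.ExecutionId)
    {
        // The Instances row was written for this exact execution: process normally.
        return DedupeAction.Process;
    }

    // ExecutionId mismatch: either a duplicate start, or a completed instance that is being
    // intentionally overwritten (the old ExecutionId lingers until the new row is written).
    // The real code distinguishes these cases by comparing the message's queue timestamps
    // against Instances.CreatedTime; the helper below is only a stand-in for that check.
    return LooksLikeDuplicateStart(message, instanceRow)
        ? DedupeAction.Discard
        : DedupeAction.Requeue; // likely an overwrite in flight; re-check after the table catches up
}

// Stand-in for the timestamp comparison described in the text above.
bool LooksLikeDuplicateStart(MessageData message, OrchestrationInstanceStatus instanceRow)
{
    DateTimeOffset? insertionTime = message.OriginalQueueMessage.InsertionTime;
    return insertionTime.HasValue && insertionTime.Value.UtcDateTime > instanceRow.CreatedTime;
}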

Pros

This solution is robust without introducing too much complexity.

Cons

There is now one more Azure Table I/O request for every orchestration start. This means more storage I/O and more end-to-end latency for a single orchestration execution. I tried to avoid this but couldn't come up with anything that was sufficiently robust against all kinds of corner cases. Because the new I/O is outside of the main execution path, it shouldn't have a big impact on max throughput.

As an optimization, we will try to batch multiple Azure Table I/O requests into one to minimize overhead if there are lots of concurrent orchestration starts.
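A hypothetical sketch of what that batching could look like, using the Microsoft.WindowsAzure.Storage.Table API (with the usual System.Linq usings) and assuming instancesTable is the CloudTable for the Instances table; the actual implementation may batch differently. Because the instance ID serves as the PartitionKey of the Instances table, the rows for all ExecutionStarted messages in one non-empty dequeue batch can be fetched with a single query:

// Hypothetical: fetch the Instances rows for a whole dequeue batch with one table query.
string filter = executionStartedMessages
    .Select(m => TableQuery.GenerateFilterCondition(
        "PartitionKey", QueryComparisons.Equal, m.TaskMessage.OrchestrationInstance.InstanceId))
    .Aggregate((a, b) => TableQuery.CombineFilters(a, TableOperators.Or, b));

TableQuerySegment<OrchestrationInstanceStatus> segment = await instancesTable.ExecuteQuerySegmentedAsync(
    new TableQuery<OrchestrationInstanceStatus>().Where(filter), null);
// segment.Results: one entity per instance ID that already has a row in the Instances table.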

Other alternative designs considered

I initially tried to solve this by simply de-duplicating ExecutionStarted messages in-memory when we dequeued them. However, this generally assumes that we dequeue all these messages in the same batch, which is not always the case. For example, if there are 40 duplicate start messages, then they would arrive in at least two different dequeue batches.

There are other potential solutions as well, such as keeping a list of all execution start messages over a period of time. However, doing this in memory can be fragile because 1) leases may move in which case some messages will show up on one VM and some on another, and 2) the list of instance IDs also needs to be kept in sync with orchestration status updates - for example, we need to know if an instance ID in our list has completed so we can remove it from the list. This becomes complicated quickly, so I did not pursue it.

Testing

To test this, I create 40 new instances concurrently, all with the same instance ID. I chose 40 because it's larger than 32, our dequeue batch size (other designs I had in mind didn't work across more than one dequeue batch). The test verifies that exactly one instance runs, without any duplicate activity executions.
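
For reference, a rough sketch of the shape of that duplicate-start test (illustrative only, not the actual test code; client is assumed to be a TaskHubClient and MyOrchestration a sample orchestration):

// Illustrative: fire 40 concurrent starts for the same instance ID; only one should win.
string instanceId = Guid.NewGuid().ToString("N");

Task[] startAttempts = Enumerable.Range(0, 40)
    .Select(_ => Task.Run(async () =>
    {
        try
        {
            await client.CreateOrchestrationInstanceAsync(typeof(MyOrchestration), instanceId, null);
        }
        catch (Exception)
        {
            // Some attempts may fail with a conflict; that's acceptable for this sketch.
        }
    }))
    .ToArray();

await Task.WhenAll(startAttempts);

// Expected outcome: exactly one orchestration instance runs, and each of its activities
// executes exactly once (no duplicate activity executions).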

To ensure I didn't regress anything, and to exercise the optimization I mentioned earlier, I also added a version of the test that creates 40 unique orchestrations concurrently.

NOTE: There is a lot of new test code that I borrowed from the DT.SqlServer project. It allows us to define orchestrations and activities inline as C# lambdas without writing any classes, which is very convenient. My hope is that we can use this as the basis of a reusable test library for custom DTFx backend implementations to use.

- Populate Execution ID on orchestration creation
- Populate CompletionTime on orchestration completion
@cgillum cgillum (Collaborator, Author) left a comment

Adding some comments to my own PR to hopefully make it a little easier to understand.

@ConnorMcMahon ConnorMcMahon (Contributor) left a comment

Have looked over most of the source code at this point. Overall looks good to me so far, mainly some nits.

src/DurableTask.AzureStorage/Messaging/ControlQueue.cs (outdated; resolved)
if (message.TaskMessage.Event.EventType == EventType.ExecutionStarted)
{
executionStartedMessages ??= new List<MessageData>(messages.Count);
executionStartedMessages!.Add(message);
Contributor

nit: This ! should not be required (I tested it out locally). We have guarantees on line 199 that this is no longer null.

Collaborator Author

Oops, this is a leftover from an older iteration where I tried to use LazyInitializer.EnsureInitialized. Unfortunately that method doesn't play nice with nullable reference types. I later switched to using the ??= operator and forgot to remove the ! operator. I'll fix it here and in the other places in this file.

@davidmrdavid davidmrdavid (Collaborator) left a comment

Thank you for this work, it seems like we're making good progress on these ordering race conditions! Left some comments, requests, and very many questions!

public class StressTests
{
/// <summary>
/// End-to-end test which validates a simple orchestrator function which doesn't call any activity functions.
@davidmrdavid davidmrdavid (Collaborator) commented Mar 30, 2021

Can you expand, in the comment itself, on what exactly is being validated / what the passing conditions would be?

Collaborator Author

This must be some copy/paste error. Sorry about not noticing this. I've updated the summary to actually reflect what the test does.

@@ -29,6 +29,7 @@ class OrchestrationInstanceStatus : TableEntity
public string CustomStatus { get; set; }
public DateTime CreatedTime { get; set; }
public DateTime LastUpdatedTime { get; set; }
public DateTime? CompletedTime { get; set; }
Collaborator

I assume this is something we'll want to patch in the future, right?

@@ -178,6 +183,143 @@ async Task DequeueLoop(string partitionId, ControlQueue controlQueue, Cancellati
}
}

IReadOnlyList<MessageData> FilterOutExecutionStartedForDedupeValidation(
Collaborator

I think this helper is important enough to be documented. Could we please embed a header comment outlining what this achieves and why it's needed? I realize this is in the PR itself, just asking for it to be in the codebase as well :)

Collaborator Author

Definitely. Added.


if (executionStartedMessages?.Count > 0)
{
Task.Run(async () =>
Collaborator

I think this is clear enough by itself, but I vote to err on the side of caution and commenting it anyways :)

Comment on lines 241 to 247
else if (message.OriginalQueueMessage.DequeueCount >= 10)
{
// We've tried and failed repeatedly to process this start message. Most likely it will never succeed, possibly because
// the client failed to update the Instances table after enqueuing this message. In such a case, the client would
// have observed a failure and will know to retry with a new message. Discard this one.
messagesToDiscard ??= new List<MessageData>(executionStartedMessages.Count);
messagesToDiscard!.Add(message);
Collaborator

Should we add some telemetry here to flag this edge-case in our logs?

Collaborator Author

We already emit warning logs for all discarded messages. I feel like that plus the context from other logs should be good enough. I expect that this will also be non-actionable since the customer would have already observed an exception on the client side and will have expected that this message was either never created or was discarded.

return otherMessages;
}

bool IsScheduledAfterInstanceUpdate(MessageData msg, OrchestrationState? remoteInstance)
Collaborator

Requesting a header comment clarifying what an "update" refers to in this case :)

Collaborator Author

Added. I also added more comments in the place where we call this method.

Comment on lines +294 to +299
if (remoteInstance == null)
{
// This is a new instance and we don't yet have a status record for it.
// We can't make a call for it yet.
return false;
}
Collaborator

Wondering if we should even allow null values to be passed-in as parameters. Does it make sense to just check for null-ness prior to calling this method instead?

Collaborator Author

I could do that but then it becomes a little harder to explain what null means without introducing more nesting into the code structure (making the code potentially harder to read). I'm not sure what the right trade-off is here in terms of code quality/readability.

Comment on lines 303 to 306
// The message was inserted after the Instances table was updated.
// The same machine will have generated both timestamps so time skew is not a factor.
// We know almost certainly that this is a redundant message and can be safely discarded.
return true;
Collaborator

I know I've said this in other comments, but I find the phrasing of "The messages was inserted after the Instances table was updated" to be too vague. The reason why is that the word "update" is too general: a lot of things could have been updated and, even if we knew what was updated, we wouldn't know in what way the updated value changed.

Judging by the if-statement condition, it seems that a more direct phrasing could be: "This ExecutionStarted event does not correspond to the instance's creation time, so it cannot be the original event." Could we opt for something like this?

Collaborator Author

I spent more time than I probably should have thinking about how I could rewrite this but honestly, every attempt I make feels like I'm further obscuring the logic. The problem with "This ExecutionStarted event does not correspond to the instance's creation time, so it cannot be the original event" is that it's still not clear what that actually means ("correspond" is also vague). My intent is less about giving an abstract sense of semantics and more about providing an explanation about how and why the logic is correct and that all corner cases have been considered. In order to do that, my comments are intentionally describing the mechanics. Hopefully some of the other comments that I already added are helping provide some of the additional context.

Comment on lines +308 to +315

CloudQueueMessage cloudQueueMessage = msg.OriginalQueueMessage;
if (cloudQueueMessage.DequeueCount <= 1 || !cloudQueueMessage.NextVisibleTime.HasValue)
{
// We can't use the initial insert time and instead must rely on a re-insertion time,
// which is only available to use after the first dequeue count.
return false;
}
Collaborator

Not exactly sure what this block is doing, can is the "originalQueueMessage"?

Collaborator Author

There is a typo in your comment, but I'm guessing you're asking what is "originalQueueMessage"? It's the message object that we received from the control queue using the .NET SDK. It has Azure Storage-specific metadata about dequeue counts, visible times, etc and is not specific to Durable Functions. The "MessageData" wrapper type is a Durable-specific deserialized version of this message.
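
Roughly, the relationship looks like this (illustrative only, not the actual class definition in the repo):

class MessageData
{
    // Raw Azure Storage queue message: carries DequeueCount, InsertionTime, NextVisibleTime, etc.
    public CloudQueueMessage OriginalQueueMessage { get; set; }

    // The deserialized Durable Task payload carried by that queue message.
    public TaskMessage TaskMessage { get; set; }
}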

@cgillum cgillum (Collaborator, Author) left a comment

Thanks for the feedback. See my responses below.

@@ -29,6 +29,7 @@ class OrchestrationInstanceStatus : TableEntity
public string CustomStatus { get; set; }
public DateTime CreatedTime { get; set; }
public DateTime LastUpdatedTime { get; set; }
public DateTime? CompletedTime { get; set; }
Collaborator Author

I'm not sure what you mean. This PR includes the patch/fix.

@ConnorMcMahon ConnorMcMahon (Contributor) left a comment

I have a few minor qualms about what we are actually validating with our tests, but overall, I think the PR looks solid.

Test/DurableTask.AzureStorage.Tests/StressTests.cs (outdated; resolved)
@@ -29,6 +29,7 @@ class OrchestrationInstanceStatus : TableEntity
public string CustomStatus { get; set; }
public DateTime CreatedTime { get; set; }
public DateTime LastUpdatedTime { get; set; }
public DateTime? CompletedTime { get; set; }
Contributor

In general, this is only a DTFx problem right? I don't think we expose CompletedTime in Durable Functions, which is likely how we went so long without catching this.

@ConnorMcMahon ConnorMcMahon (Contributor) left a comment

LGTM!

@cgillum cgillum merged commit d89ca58 into main Mar 31, 2021
@cgillum cgillum deleted the dedup branch March 31, 2021 22:30
ConnorMcMahon pushed a commit that referenced this pull request May 27, 2021
The key issue being fixed is that ExecutionStarted messages could be processed out of order after #528 was merged. None of the existing tests were sensitive to this; however, some of the Durable Functions tests were. To fix this, we move the deduplication of ExecutionStarted messages back onto the main thread, though there may be a perf hit.

Customers can turn off this deduplication if the performance impact is too high.
@henggua-ramsoft

This looks like an issue we are running into. How do we determine if this fix has been released yet?

@cgillum cgillum (Collaborator, Author) commented Jun 18, 2021

The fix for this has been released: https://github.com/Azure/durabletask/releases/tag/durabletask.azurestorage-v1.8.6. We've since done another release, v1.8.7, which covers another issue.
