-
Notifications
You must be signed in to change notification settings - Fork 295
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow user to specify MaxHistoryEvents and constraint history loading to include the latest generation only #1119
base: main
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,7 +14,6 @@ | |
namespace DurableTask.AzureStorage.Tracking | ||
{ | ||
using System; | ||
using System.Collections.Concurrent; | ||
using System.Collections.Generic; | ||
using System.Diagnostics; | ||
using System.Linq; | ||
|
@@ -226,22 +225,36 @@ public override async Task<bool> ExistsAsync() | |
|
||
return new OrchestrationHistory(historyEvents, checkpointCompletionTime, eTagValue, trackingStoreContext); | ||
} | ||
|
||
#nullable enable | ||
async Task<TableEntitiesResponseInfo<DynamicTableEntity>> GetHistoryEntitiesResponseInfoAsync(string instanceId, string expectedExecutionId, IList<string> projectionColumns, CancellationToken cancellationToken = default(CancellationToken)) | ||
{ | ||
var sanitizedInstanceId = KeySanitation.EscapePartitionKey(instanceId); | ||
string filterCondition = TableQuery.GenerateFilterCondition(PartitionKeyProperty, QueryComparisons.Equal, sanitizedInstanceId); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks like we're always overwriting this variable value with this change. To make the intent more clear, I suggest we rename this variable to to something like |
||
if (!string.IsNullOrEmpty(expectedExecutionId)) | ||
|
||
// we need to get the executionID to account for continueAsNew scenarios, where a given instanceID may have multiple histories | ||
// we can obtain this from the sentinel row, which always has the latest executionID | ||
if (string.IsNullOrWhiteSpace(expectedExecutionId)) | ||
{ | ||
// Filter down to a specific generation. | ||
var rowKeyOrExecutionId = TableQuery.CombineFilters( | ||
TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.Equal, SentinelRowKey), | ||
TableOperators.Or, | ||
TableQuery.GenerateFilterCondition("ExecutionId", QueryComparisons.Equal, expectedExecutionId)); | ||
var sentinelRowFilter = TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.Equal, SentinelRowKey); | ||
var sentinelRowForInstanceIdFilter = TableQuery.CombineFilters(filterCondition, TableOperators.And, sentinelRowFilter); | ||
|
||
filterCondition = TableQuery.CombineFilters(filterCondition, TableOperators.And, rowKeyOrExecutionId); | ||
TableQuery<DynamicTableEntity> sentinelRowQuery = new TableQuery<DynamicTableEntity>().Where(sentinelRowForInstanceIdFilter); | ||
var sentinelRowQueryResponse = await this.HistoryTable.ExecuteQueryAsync(sentinelRowQuery, cancellationToken); | ||
|
||
if (sentinelRowQueryResponse.ReturnedEntities != null && sentinelRowQueryResponse.ReturnedEntities.Count > 0) | ||
{ | ||
expectedExecutionId = sentinelRowQueryResponse.ReturnedEntities.FirstOrDefault().Properties["ExecutionId"].StringValue; | ||
} | ||
} | ||
|
||
// Filter down to a specific generation. | ||
var rowKeyOrExecutionId = TableQuery.CombineFilters( | ||
TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.Equal, SentinelRowKey), | ||
TableOperators.Or, | ||
TableQuery.GenerateFilterCondition("ExecutionId", QueryComparisons.Equal, expectedExecutionId)); | ||
|
||
filterCondition = TableQuery.CombineFilters(filterCondition, TableOperators.And, rowKeyOrExecutionId); | ||
|
||
TableQuery<DynamicTableEntity> query = new TableQuery<DynamicTableEntity>().Where(filterCondition); | ||
|
||
if (projectionColumns != null) | ||
|
@@ -250,9 +263,11 @@ public override async Task<bool> ExistsAsync() | |
} | ||
|
||
var tableEntitiesResponseInfo = await this.HistoryTable.ExecuteQueryAsync(query, cancellationToken); | ||
|
||
return tableEntitiesResponseInfo; | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. will remove this extra whitespace before merging. Did not notice it |
||
|
||
} | ||
#nullable disable | ||
|
||
async Task<IList<DynamicTableEntity>> QueryHistoryAsync(TableQuery<DynamicTableEntity> query, string instanceId, CancellationToken cancellationToken) | ||
{ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be better to add this logic to
CompleteTaskOrchestrationWorkItemAsync
rather than doing it here. That way you can terminate the orchestration without needing to load and execute the excessive history, which is required in this case. There would be some logging benefits as well since the termination happens on the same logical thread as the orchestration execution, rather than on a background thread, which is the case for history loading.