Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] feature/core-entities #960

Merged
merged 24 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
adcaf61
Bring entity logic into DurableTask.Core (first milestone) (#887)
sebastianburckhardt Aug 29, 2023
ac49392
Merge branch 'main' into feature/core-entities
sebastianburckhardt Sep 7, 2023
7eca55e
fix semantic merge conflict.
sebastianburckhardt Sep 7, 2023
ff23b97
Revise entity state and status format and access (#955)
sebastianburckhardt Sep 8, 2023
48d7d0d
Revise entity message format and external access (#956)
sebastianburckhardt Sep 8, 2023
d3e2057
make current critical section id publicly visible (#958)
sebastianburckhardt Sep 8, 2023
e043e38
remove orchestration tags from entity action (#952)
sebastianburckhardt Sep 8, 2023
73e5203
Rename OperationBatchRequest and OperationBatchResponse (#953)
sebastianburckhardt Sep 8, 2023
3f98043
Revise how entity batches are executed and handle failures (#954)
sebastianburckhardt Sep 8, 2023
3548d9f
revise operation result encoding and add more comments. (#965)
sebastianburckhardt Sep 13, 2023
e729bb6
revise entity backend properties and implement entity backend queries…
sebastianburckhardt Sep 21, 2023
8a907d0
Update versions for ADO feed (#973)
jviau Sep 21, 2023
f49f615
Add no-warn for NU5104 (#974)
jviau Sep 21, 2023
cb47d63
revise propagation path for entity parameters (#971)
sebastianburckhardt Oct 3, 2023
9b842ca
Revise entity queries (#981)
sebastianburckhardt Oct 5, 2023
f303e9d
fix bugs in tracking store implementation (#979)
sebastianburckhardt Oct 5, 2023
3ba55f4
add scheduled start time parameter to the start-new-orchestration ope…
sebastianburckhardt Oct 6, 2023
b50f47c
Merge branch 'main' into feature/core-entities
sebastianburckhardt Oct 6, 2023
42ab8c1
Revise serialization of entitymessages (#972)
sebastianburckhardt Oct 6, 2023
25c0e76
Rename includeStateless to includeTransient in entity queries (#985)
sebastianburckhardt Oct 9, 2023
da2eaf4
Rev to entities-preview.2 (#986)
jviau Oct 9, 2023
9957df7
fix null reference exception when running on older backends (#989)
sebastianburckhardt Oct 12, 2023
cc0214a
Prepare for public preview (#994)
jviau Oct 17, 2023
ca71117
Merge branch 'main' into feature/core-entities
jviau Oct 17, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Revise how entity batches are executed and handle failures (#954)
* revise task entity definition

* commit change that had been accidentally committed to a different PR.

* Apply suggestions from code review

Co-authored-by: David Justo <david.justo.1996@gmail.com>

* Apply suggestions from code review

Co-authored-by: Jacob Viau <javia@microsoft.com>

---------

Co-authored-by: David Justo <david.justo.1996@gmail.com>
Co-authored-by: Jacob Viau <javia@microsoft.com>
  • Loading branch information
3 people authored Sep 8, 2023
commit 3f98043f52656d622eaad37f3b1ca67a57b16558
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@ public class EntityBatchResult
// To ensure maximum compatibility, all properties should be public and settable by default.

/// <summary>
/// The results of executing the operations in the batch. The length of this list must match
/// the size of the batch if all messages were processed; In particular, all execution errors must be reported as a result.
/// However, this list of results can be shorter than the list of operations if
/// some suffix of the operation list was skipped, e.g. due to shutdown, send throttling, or timeouts.
/// The results of executing the operations in the batch. If there were (non-application-level) errors, the length of this list may
/// be shorter than the number of requests. In that case, <see cref="FailureDetails"/> contains the reason why not all requests
/// were processed.
/// </summary>
public List<OperationResult>? Results { get; set; }

Expand All @@ -41,5 +40,10 @@ public class EntityBatchResult
/// or null if the entity has no state (e.g. if it has been deleted).
/// </summary>
public string? EntityState { get; set; }

/// <summary>
/// Contains the failure details, if there was a failure to process all requests (fewer results were returned than requests)
/// </summary>
public FailureDetails? FailureDetails { get; set; }
}
}
2 changes: 1 addition & 1 deletion src/DurableTask.Core/Entities/TaskEntity.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@ public abstract class TaskEntity
/// <summary>
/// Execute a batch of operations on an entity.
/// </summary>
public abstract Task<EntityBatchResult> ExecuteOperationBatchAsync(EntityBatchRequest operations, EntityExecutionOptions options);
public abstract Task<EntityBatchResult> ExecuteOperationBatchAsync(EntityBatchRequest operations);
}
}
23 changes: 21 additions & 2 deletions src/DurableTask.Core/TaskEntityDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,24 @@ protected async Task<bool> OnProcessWorkItemAsync(TaskOrchestrationWorkItem work
{
// execute the user-defined operations on this entity, via the middleware
var result = await this.ExecuteViaMiddlewareAsync(workToDoNow, runtimeState.OrchestrationInstance, schedulerState.EntityState);
var operationResults = result.Results!;

// if we encountered an error, record it as the result of the operations
// so that callers are notified that the operation did not succeed.
if (result.FailureDetails != default(FailureDetails))
{
OperationResult errorResult = new OperationResult()
{
Result = null,
ErrorMessage = "entity dispatch failed",
FailureDetails = result.FailureDetails,
};

for (int i = operationResults.Count; i < workToDoNow.OperationCount; i++)
{
operationResults.Add(errorResult);
}
}

// go through all results
// for each operation that is not a signal, send a result message back to the calling orchestrator
Expand All @@ -260,7 +278,8 @@ protected async Task<bool> OnProcessWorkItemAsync(TaskOrchestrationWorkItem work

if (result.Results.Count < workToDoNow.OperationCount)
{
// some operations were not processed
// some requests were not processed (e.g. due to shutdown or timeout)
// in this case we just defer the work so it can be retried
var deferred = workToDoNow.RemoveDeferredWork(result.Results.Count);
schedulerState.PutBack(deferred);
workToDoNow.ToBeContinued(schedulerState);
Expand Down Expand Up @@ -851,7 +870,7 @@ await this.dispatchPipeline.RunAsync(dispatchContext, async _ =>
ErrorPropagationMode = this.errorPropagationMode,
};

var result = await taskEntity.ExecuteOperationBatchAsync(request, options);
var result = await taskEntity.ExecuteOperationBatchAsync(request);

dispatchContext.SetProperty(result);
});
Expand Down