-
Notifications
You must be signed in to change notification settings - Fork 38
Warn on unfinished workflow handlers #294
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,6 +6,7 @@ | |
using System.Linq; | ||
using System.Reflection; | ||
using System.Runtime.ExceptionServices; | ||
using System.Text.Json; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
using Microsoft.Extensions.Logging; | ||
|
@@ -68,6 +69,7 @@ internal class WorkflowInstance : TaskScheduler, IWorkflowInstance, IWorkflowCon | |
private readonly Action<WorkflowInstance, Exception?> onTaskCompleted; | ||
private readonly IReadOnlyCollection<Type>? workerLevelFailureExceptionTypes; | ||
private readonly bool disableCompletionCommandReordering; | ||
private readonly Handlers inProgressHandlers = new(); | ||
private WorkflowActivationCompletion? completion; | ||
// Will be set to null after last use (i.e. when workflow actually started) | ||
private Lazy<object?[]>? startArgs; | ||
|
@@ -204,6 +206,9 @@ public WorkflowInstance(WorkflowInstanceDetails details) | |
/// </summary> | ||
public bool TracingEventsEnabled { get; private init; } | ||
|
||
/// <inheritdoc /> | ||
public bool AllHandlersFinished => inProgressHandlers.Count == 0; | ||
|
||
/// <inheritdoc /> | ||
public CancellationToken CancellationToken => cancellationTokenSource.Token; | ||
|
||
|
@@ -576,7 +581,13 @@ public WorkflowActivationCompletion Activate(WorkflowActivation act) | |
} | ||
|
||
// Maybe apply workflow completion command reordering logic | ||
ApplyCompletionCommandReordering(act, completion); | ||
ApplyCompletionCommandReordering(act, completion, out var workflowComplete); | ||
|
||
// Log warnings if we have completed | ||
if (workflowComplete && !IsReplaying) | ||
{ | ||
inProgressHandlers.WarnIfAnyLeftOver(Info.WorkflowId, logger); | ||
} | ||
|
||
// Unset the completion | ||
var toReturn = completion; | ||
|
@@ -886,6 +897,10 @@ private void ApplyDoUpdate(DoUpdate update) | |
// Queue it up so it can run in workflow environment | ||
_ = QueueNewTaskAsync(() => | ||
{ | ||
// Make sure we have loaded the instance which may invoke the constructor thereby | ||
// letting the constructor register update handlers at runtime | ||
var ignored = Instance; | ||
|
||
// Set the current update for the life of this task | ||
CurrentUpdateInfoLocal.Value = new(Id: update.Id, Name: update.Name); | ||
|
||
|
@@ -998,9 +1013,12 @@ private void ApplyDoUpdate(DoUpdate update) | |
Definition: updateDefn, | ||
Args: argsForUpdate, | ||
Headers: update.Headers)); | ||
var inProgress = inProgressHandlers.AddLast(new Handlers.Handler( | ||
update.Name, update.Id, updateDefn.UnfinishedPolicy)); | ||
return task.ContinueWith( | ||
_ => | ||
{ | ||
inProgressHandlers.Remove(inProgress); | ||
// If workflow failure exception, it's an update failure. If it's some | ||
// other exception, it's a task failure. Otherwise it's a success. | ||
var exc = task.Exception?.InnerExceptions?.SingleOrDefault(); | ||
|
@@ -1080,6 +1098,10 @@ private void ApplyQueryWorkflow(QueryWorkflow query) | |
// Queue it up so it can run in workflow environment | ||
_ = QueueNewTaskAsync(() => | ||
{ | ||
// Make sure we have loaded the instance which may invoke the constructor thereby | ||
// letting the constructor register query handlers at runtime | ||
var ignored = Instance; | ||
|
||
var origCmdCount = completion?.Successful?.Commands?.Count; | ||
try | ||
{ | ||
|
@@ -1241,11 +1263,21 @@ private void ApplySignalWorkflow(SignalWorkflow signal) | |
return; | ||
} | ||
|
||
await inbound.Value.HandleSignalAsync(new( | ||
Signal: signal.SignalName, | ||
Definition: signalDefn, | ||
Args: args, | ||
Headers: signal.Headers)).ConfigureAwait(true); | ||
// Handle signal | ||
var inProgress = inProgressHandlers.AddLast(new Handlers.Handler( | ||
signal.SignalName, null, signalDefn.UnfinishedPolicy)); | ||
try | ||
{ | ||
await inbound.Value.HandleSignalAsync(new( | ||
Signal: signal.SignalName, | ||
Definition: signalDefn, | ||
Args: args, | ||
Headers: signal.Headers)).ConfigureAwait(true); | ||
} | ||
finally | ||
{ | ||
inProgressHandlers.Remove(inProgress); | ||
} | ||
})); | ||
} | ||
|
||
|
@@ -1394,7 +1426,9 @@ private string GetStackTrace() | |
} | ||
|
||
private void ApplyCompletionCommandReordering( | ||
WorkflowActivation act, WorkflowActivationCompletion completion) | ||
WorkflowActivation act, | ||
WorkflowActivationCompletion completion, | ||
out bool workflowComplete) | ||
{ | ||
// In earlier versions of the SDK we allowed commands to be sent after workflow | ||
// completion. These ended up being removed effectively making the result of the | ||
|
@@ -1404,40 +1438,42 @@ private void ApplyCompletionCommandReordering( | |
// | ||
// Note this only applies for successful activations that don't have completion | ||
// reordering disabled and that are either not replaying or have the flag set. | ||
if (completion.Successful == null || disableCompletionCommandReordering) | ||
{ | ||
return; | ||
} | ||
if (IsReplaying && !act.AvailableInternalFlags.Contains((uint)WorkflowLogicFlag.ReorderWorkflowCompletion)) | ||
{ | ||
return; | ||
} | ||
|
||
// We know we're on a newer SDK and can move completion to the end if we need to. First, | ||
// find the completion command. | ||
// Find the index of the completion command | ||
var completionCommandIndex = -1; | ||
for (var i = completion.Successful.Commands.Count - 1; i >= 0; i--) | ||
if (completion.Successful != null) | ||
{ | ||
var cmd = completion.Successful.Commands[i]; | ||
// Set completion index if the command is a completion | ||
if (cmd.CancelWorkflowExecution != null || | ||
cmd.CompleteWorkflowExecution != null || | ||
cmd.ContinueAsNewWorkflowExecution != null || | ||
cmd.FailWorkflowExecution != null) | ||
for (var i = completion.Successful.Commands.Count - 1; i >= 0; i--) | ||
{ | ||
completionCommandIndex = i; | ||
break; | ||
var cmd = completion.Successful.Commands[i]; | ||
// Set completion index if the command is a completion | ||
if (cmd.CancelWorkflowExecution != null || | ||
cmd.CompleteWorkflowExecution != null || | ||
cmd.ContinueAsNewWorkflowExecution != null || | ||
cmd.FailWorkflowExecution != null) | ||
{ | ||
completionCommandIndex = i; | ||
break; | ||
} | ||
} | ||
} | ||
|
||
// If there is no completion command or it's already at the end, nothing to do | ||
if (completionCommandIndex == -1 || | ||
completionCommandIndex == completion.Successful.Commands.Count - 1) | ||
workflowComplete = completionCommandIndex >= 0; | ||
|
||
// This only applies for successful activations that have a completion not at the end, | ||
// don't have completion reordering disabled, and that are either not replaying or have | ||
// the flag set. | ||
if (completion.Successful == null || | ||
completionCommandIndex == -1 || | ||
completionCommandIndex == completion.Successful.Commands.Count - 1 || | ||
disableCompletionCommandReordering || | ||
(IsReplaying && !act.AvailableInternalFlags.Contains( | ||
(uint)WorkflowLogicFlag.ReorderWorkflowCompletion))) | ||
{ | ||
return; | ||
} | ||
|
||
// Now we know the completion is in the wrong spot, so set the SDK flag and move it | ||
// Now we know the completion is in the wrong spot and we're on a newer SDK, so set the | ||
// SDK flag and move it | ||
completion.Successful.UsedInternalFlags.Add((uint)WorkflowLogicFlag.ReorderWorkflowCompletion); | ||
var compCmd = completion.Successful.Commands[completionCommandIndex]; | ||
completion.Successful.Commands.RemoveAt(completionCommandIndex); | ||
|
@@ -2230,5 +2266,86 @@ public override Task SignalAsync( | |
public override Task CancelAsync() => | ||
instance.outbound.Value.CancelExternalWorkflowAsync(new(Id: Id, RunId: RunId)); | ||
} | ||
|
||
private class Handlers : LinkedList<Handlers.Handler> | ||
{ | ||
#pragma warning disable SA1118 // We're ok w/ string literals spanning lines | ||
private static readonly Action<ILogger, string, WarnableSignals, Exception?> SignalWarning = | ||
LoggerMessage.Define<string, WarnableSignals>( | ||
LogLevel.Warning, | ||
0, | ||
"Workflow {Id} finished while signal handlers are still running. This may " + | ||
"have interrupted work that the signal handler was doing. You can wait for " + | ||
"all update and signal handlers to complete by using `await " + | ||
"Workflow.WaitConditionAsync(() => Workflow.AllHandlersFinished)`. " + | ||
"Alternatively, if both you and the clients sending the signal are okay with " + | ||
"interrupting running handlers when the workflow finishes, and causing " + | ||
"clients to receive errors, then you can disable this warning via the signal " + | ||
"handler attribute: " + | ||
"`[WorkflowSignal(UnfinishedPolicy=HandlerUnfinishedPolicy.Abandon)]`. The " + | ||
"following signals were unfinished (and warnings were not disabled for their " + | ||
"handler): {Handlers}"); | ||
|
||
private static readonly Action<ILogger, string, WarnableUpdates, Exception?> UpdateWarning = | ||
LoggerMessage.Define<string, WarnableUpdates>( | ||
Comment on lines
+2289
to
+2290
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could maybe combine these two warning functions generically to save on duping the boilerplate part of the message. Not a big deal. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 In this case I am matching the contents of temporalio/sdk-python#556, but can make this change generally if needed. |
||
LogLevel.Warning, | ||
0, | ||
"Workflow {Id} finished while update handlers are still running. This may " + | ||
"have interrupted work that the update handler was doing, and the client " + | ||
"that sent the update will receive a 'workflow execution already completed' " + | ||
"RpcException instead of the update result. You can wait for all update and " + | ||
"signal handlers to complete by using `await " + | ||
"Workflow.WaitConditionAsync(() => Workflow.AllHandlersFinished)`. " + | ||
"Alternatively, if both you and the clients sending the update are okay with " + | ||
"interrupting running handlers when the workflow finishes, and causing " + | ||
"clients to receive errors, then you can disable this warning via the update " + | ||
"handler attribute: " + | ||
"`[WorkflowUpdate(UnfinishedPolicy=HandlerUnfinishedPolicy.Abandon)]`. The " + | ||
"following updates were unfinished (and warnings were not disabled for their " + | ||
"handler): {Handlers}"); | ||
#pragma warning restore SA1118 | ||
|
||
public void WarnIfAnyLeftOver(string id, ILogger logger) | ||
{ | ||
var signals = this. | ||
Where(h => h.UpdateId == null && h.UnfinishedPolicy == HandlerUnfinishedPolicy.WarnAndAbandon). | ||
GroupBy(h => h.Name). | ||
Select(h => (h.Key, h.Count())). | ||
ToArray(); | ||
if (signals.Length > 0) | ||
{ | ||
SignalWarning(logger, id, new WarnableSignals { NamesAndCounts = signals }, null); | ||
} | ||
var updates = this. | ||
Where(h => h.UpdateId != null && h.UnfinishedPolicy == HandlerUnfinishedPolicy.WarnAndAbandon). | ||
Select(h => (h.Name, h.UpdateId!)). | ||
ToArray(); | ||
if (updates.Length > 0) | ||
{ | ||
UpdateWarning(logger, id, new WarnableUpdates { NamesAndIds = updates }, null); | ||
} | ||
} | ||
|
||
public readonly struct WarnableSignals | ||
{ | ||
public (string, int)[] NamesAndCounts { get; init; } | ||
|
||
public override string ToString() => JsonSerializer.Serialize( | ||
NamesAndCounts.Select(v => new { name = v.Item1, count = v.Item2 }).ToArray()); | ||
} | ||
|
||
public readonly struct WarnableUpdates | ||
{ | ||
public (string, string)[] NamesAndIds { get; init; } | ||
|
||
public override string ToString() => JsonSerializer.Serialize( | ||
NamesAndIds.Select(v => new { name = v.Item1, id = v.Item2 }).ToArray()); | ||
} | ||
|
||
public record Handler( | ||
string Name, | ||
string? UpdateId, | ||
HandlerUnfinishedPolicy UnfinishedPolicy); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
namespace Temporalio.Workflows | ||
{ | ||
/// <summary> | ||
/// Actions taken if a workflow terminates with running handlers. | ||
/// </summary> | ||
/// <remarks> | ||
/// Policy defining actions taken when a workflow exits while update or signal handlers are | ||
/// running. The workflow exit may be due to successful return, failure, cancellation, or | ||
/// continue-as-new. | ||
/// </remarks> | ||
public enum HandlerUnfinishedPolicy | ||
{ | ||
/// <summary> | ||
/// Issue a warning in addition to abandoning. | ||
/// </summary> | ||
WarnAndAbandon, | ||
|
||
/// <summary> | ||
/// Abandon the handler. | ||
/// </summary> | ||
/// <remarks> | ||
/// In the case of an update handler this means that the client will receive an error rather | ||
/// than the update result. | ||
/// </remarks> | ||
Abandon, | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't the handlers be returned as the own key and the message be constant? That way the message can be easily filtered for and any unfinished updates could be easily extracted? Basically I am asking why we can't treat this like other SDKs logs that use structured logging
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you clarify what you mean by constant? This doesn't change if that's what you mean.
We did want to put the literal linked list of this information in the structured logger, but .NET forces linked list formatting and I am matching the Python PR by using JSON. I considered making a whole new data type, but then I would need to make the data type available as part of our public API and guarantee compatibility on it. We are not doing that in the Python PR (there is no extraction of handlers from the warning, though it could be easily done). .NET wasn't meant to add any features over the Python one, which is English message only w/ JSON appended. If we do want to expose data types for handlers we can, but I think that requires more thought and more API surface and more guarantees.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The message here has workflow ID substituted in no
Workflow {Id} finished
? That makes it harder to filter for occurrences of this log in a logging system since users can't just filter on a direct match.Not sure you need to make the data type public, as long as the json format is consistent it could be parsed. You do have an
UpdateInfo
type maybe you could reuse that? I don't feel strongly about requiring the list to always be formatted in json, I would expect the format to be left to the logger, but if others do that's fine.For example when we warn on unhandled signals in Go we return the list as part of a
key-value
. For updates I would do something similar (although we would also need to add UpdateID as well)https://github.com/temporalio/sdk-go/blob/992d42745e67fb863a3ae4f88d4ce7e83ca54b9c/internal/internal_workflow.go#L666
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is in addition to scoped values. The thing with .NET unlike other SDKs is that their scopes are often not logged. So we also added workflow ID to the message here. Arguably I should add workflow ID to all English messages from a workflow, but regardless, all workflows have a "scope" with a dictionary with this information and the logger can be configured with a format to expose this scoped information (but many don't, and it's not in there by default like it is in Go and Python).
In .NET, there is not a good stable/format for list/array items. It is more important to make the English read well than to reuse .NET collection types in the structured log. So we can either do string or new data types w/ custom formatter. I think if users get to the point where they want to extract the structure out, we can expose entirely new, stable data types that reflect this structure (a list of two-field objects), but until we want to expose those data types, we can/should stick with string. And if/when we do want structured objects, we should also add them to the Python warning.
This is more than a list, this is a list of objects with two fields each. The question becomes whether we need a stable data structure for that. I don't think we do. For instance, when Go SDK has the code for this, you will likely need to choose between a new object to get formatting right (that users probably can't access unless we're talking exposure), or JSON formatting. Though Go may be lucky enough to have a logger that defaults to a reasonable format for lists, but .NET and others don't necessarily.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am going to refactor to a new data type that I just won't expose. Users won't really be able to do anything with it except via reflection (but if we decide to expose this data type later we can).
EDIT: There is now a new data type but users can't do anything with really besides reflection. We can't make it extend/implement anything useful like collection, because .NET formatting is short-circuiting that in its formatting logic.