Skip to content

Warn on unfinished workflow handlers #294

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 148 additions & 31 deletions src/Temporalio/Worker/WorkflowInstance.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Linq;
using System.Reflection;
using System.Runtime.ExceptionServices;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -68,6 +69,7 @@ internal class WorkflowInstance : TaskScheduler, IWorkflowInstance, IWorkflowCon
private readonly Action<WorkflowInstance, Exception?> onTaskCompleted;
private readonly IReadOnlyCollection<Type>? workerLevelFailureExceptionTypes;
private readonly bool disableCompletionCommandReordering;
private readonly Handlers inProgressHandlers = new();
private WorkflowActivationCompletion? completion;
// Will be set to null after last use (i.e. when workflow actually started)
private Lazy<object?[]>? startArgs;
Expand Down Expand Up @@ -204,6 +206,9 @@ public WorkflowInstance(WorkflowInstanceDetails details)
/// </summary>
public bool TracingEventsEnabled { get; private init; }

/// <inheritdoc />
public bool AllHandlersFinished => inProgressHandlers.Count == 0;

/// <inheritdoc />
public CancellationToken CancellationToken => cancellationTokenSource.Token;

Expand Down Expand Up @@ -576,7 +581,13 @@ public WorkflowActivationCompletion Activate(WorkflowActivation act)
}

// Maybe apply workflow completion command reordering logic
ApplyCompletionCommandReordering(act, completion);
ApplyCompletionCommandReordering(act, completion, out var workflowComplete);

// Log warnings if we have completed
if (workflowComplete && !IsReplaying)
{
inProgressHandlers.WarnIfAnyLeftOver(Info.WorkflowId, logger);
}

// Unset the completion
var toReturn = completion;
Expand Down Expand Up @@ -886,6 +897,10 @@ private void ApplyDoUpdate(DoUpdate update)
// Queue it up so it can run in workflow environment
_ = QueueNewTaskAsync(() =>
{
// Make sure we have loaded the instance which may invoke the constructor thereby
// letting the constructor register update handlers at runtime
var ignored = Instance;

// Set the current update for the life of this task
CurrentUpdateInfoLocal.Value = new(Id: update.Id, Name: update.Name);

Expand Down Expand Up @@ -998,9 +1013,12 @@ private void ApplyDoUpdate(DoUpdate update)
Definition: updateDefn,
Args: argsForUpdate,
Headers: update.Headers));
var inProgress = inProgressHandlers.AddLast(new Handlers.Handler(
update.Name, update.Id, updateDefn.UnfinishedPolicy));
return task.ContinueWith(
_ =>
{
inProgressHandlers.Remove(inProgress);
// If workflow failure exception, it's an update failure. If it's some
// other exception, it's a task failure. Otherwise it's a success.
var exc = task.Exception?.InnerExceptions?.SingleOrDefault();
Expand Down Expand Up @@ -1080,6 +1098,10 @@ private void ApplyQueryWorkflow(QueryWorkflow query)
// Queue it up so it can run in workflow environment
_ = QueueNewTaskAsync(() =>
{
// Make sure we have loaded the instance which may invoke the constructor thereby
// letting the constructor register query handlers at runtime
var ignored = Instance;

var origCmdCount = completion?.Successful?.Commands?.Count;
try
{
Expand Down Expand Up @@ -1241,11 +1263,21 @@ private void ApplySignalWorkflow(SignalWorkflow signal)
return;
}

await inbound.Value.HandleSignalAsync(new(
Signal: signal.SignalName,
Definition: signalDefn,
Args: args,
Headers: signal.Headers)).ConfigureAwait(true);
// Handle signal
var inProgress = inProgressHandlers.AddLast(new Handlers.Handler(
signal.SignalName, null, signalDefn.UnfinishedPolicy));
try
{
await inbound.Value.HandleSignalAsync(new(
Signal: signal.SignalName,
Definition: signalDefn,
Args: args,
Headers: signal.Headers)).ConfigureAwait(true);
}
finally
{
inProgressHandlers.Remove(inProgress);
}
}));
}

Expand Down Expand Up @@ -1394,7 +1426,9 @@ private string GetStackTrace()
}

private void ApplyCompletionCommandReordering(
WorkflowActivation act, WorkflowActivationCompletion completion)
WorkflowActivation act,
WorkflowActivationCompletion completion,
out bool workflowComplete)
{
// In earlier versions of the SDK we allowed commands to be sent after workflow
// completion. These ended up being removed effectively making the result of the
Expand All @@ -1404,40 +1438,42 @@ private void ApplyCompletionCommandReordering(
//
// Note this only applies for successful activations that don't have completion
// reordering disabled and that are either not replaying or have the flag set.
if (completion.Successful == null || disableCompletionCommandReordering)
{
return;
}
if (IsReplaying && !act.AvailableInternalFlags.Contains((uint)WorkflowLogicFlag.ReorderWorkflowCompletion))
{
return;
}

// We know we're on a newer SDK and can move completion to the end if we need to. First,
// find the completion command.
// Find the index of the completion command
var completionCommandIndex = -1;
for (var i = completion.Successful.Commands.Count - 1; i >= 0; i--)
if (completion.Successful != null)
{
var cmd = completion.Successful.Commands[i];
// Set completion index if the command is a completion
if (cmd.CancelWorkflowExecution != null ||
cmd.CompleteWorkflowExecution != null ||
cmd.ContinueAsNewWorkflowExecution != null ||
cmd.FailWorkflowExecution != null)
for (var i = completion.Successful.Commands.Count - 1; i >= 0; i--)
{
completionCommandIndex = i;
break;
var cmd = completion.Successful.Commands[i];
// Set completion index if the command is a completion
if (cmd.CancelWorkflowExecution != null ||
cmd.CompleteWorkflowExecution != null ||
cmd.ContinueAsNewWorkflowExecution != null ||
cmd.FailWorkflowExecution != null)
{
completionCommandIndex = i;
break;
}
}
}

// If there is no completion command or it's already at the end, nothing to do
if (completionCommandIndex == -1 ||
completionCommandIndex == completion.Successful.Commands.Count - 1)
workflowComplete = completionCommandIndex >= 0;

// This only applies for successful activations that have a completion not at the end,
// don't have completion reordering disabled, and that are either not replaying or have
// the flag set.
if (completion.Successful == null ||
completionCommandIndex == -1 ||
completionCommandIndex == completion.Successful.Commands.Count - 1 ||
disableCompletionCommandReordering ||
(IsReplaying && !act.AvailableInternalFlags.Contains(
(uint)WorkflowLogicFlag.ReorderWorkflowCompletion)))
{
return;
}

// Now we know the completion is in the wrong spot, so set the SDK flag and move it
// Now we know the completion is in the wrong spot and we're on a newer SDK, so set the
// SDK flag and move it
completion.Successful.UsedInternalFlags.Add((uint)WorkflowLogicFlag.ReorderWorkflowCompletion);
var compCmd = completion.Successful.Commands[completionCommandIndex];
completion.Successful.Commands.RemoveAt(completionCommandIndex);
Expand Down Expand Up @@ -2230,5 +2266,86 @@ public override Task SignalAsync(
public override Task CancelAsync() =>
instance.outbound.Value.CancelExternalWorkflowAsync(new(Id: Id, RunId: RunId));
}

private class Handlers : LinkedList<Handlers.Handler>
{
#pragma warning disable SA1118 // We're ok w/ string literals spanning lines
private static readonly Action<ILogger, string, WarnableSignals, Exception?> SignalWarning =
LoggerMessage.Define<string, WarnableSignals>(
LogLevel.Warning,
0,
"Workflow {Id} finished while signal handlers are still running. This may " +
"have interrupted work that the signal handler was doing. You can wait for " +
"all update and signal handlers to complete by using `await " +
"Workflow.WaitConditionAsync(() => Workflow.AllHandlersFinished)`. " +
"Alternatively, if both you and the clients sending the signal are okay with " +
"interrupting running handlers when the workflow finishes, and causing " +
"clients to receive errors, then you can disable this warning via the signal " +
"handler attribute: " +
"`[WorkflowSignal(UnfinishedPolicy=HandlerUnfinishedPolicy.Abandon)]`. The " +
"following signals were unfinished (and warnings were not disabled for their " +
"handler): {Handlers}");

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't the handlers be returned as the own key and the message be constant? That way the message can be easily filtered for and any unfinished updates could be easily extracted? Basically I am asking why we can't treat this like other SDKs logs that use structured logging

Copy link
Member Author

@cretz cretz Jul 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you clarify what you mean by constant? This doesn't change if that's what you mean.

We did want to put the literal linked list of this information in the structured logger, but .NET forces linked list formatting and I am matching the Python PR by using JSON. I considered making a whole new data type, but then I would need to make the data type available as part of our public API and guarantee compatibility on it. We are not doing that in the Python PR (there is no extraction of handlers from the warning, though it could be easily done). .NET wasn't meant to add any features over the Python one, which is English message only w/ JSON appended. If we do want to expose data types for handlers we can, but I think that requires more thought and more API surface and more guarantees.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The message here has workflow ID substituted in no Workflow {Id} finished? That makes it harder to filter for occurrences of this log in a logging system since users can't just filter on a direct match.

Not sure you need to make the data type public, as long as the json format is consistent it could be parsed. You do have an UpdateInfo type maybe you could reuse that? I don't feel strongly about requiring the list to always be formatted in json, I would expect the format to be left to the logger, but if others do that's fine.

For example when we warn on unhandled signals in Go we return the list as part of a key-value. For updates I would do something similar (although we would also need to add UpdateID as well)

https://github.com/temporalio/sdk-go/blob/992d42745e67fb863a3ae4f88d4ce7e83ca54b9c/internal/internal_workflow.go#L666

Copy link
Member Author

@cretz cretz Jul 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The message here has workflow ID substituted in no Workflow {Id} finished? That makes it harder to filter for occurrences of this log in a logging system since users can't just filter on a direct match.

This is in addition to scoped values. The thing with .NET unlike other SDKs is that their scopes are often not logged. So we also added workflow ID to the message here. Arguably I should add workflow ID to all English messages from a workflow, but regardless, all workflows have a "scope" with a dictionary with this information and the logger can be configured with a format to expose this scoped information (but many don't, and it's not in there by default like it is in Go and Python).

would expect the format to be left to the logger, but if others do that's fine.

In .NET, there is not a good stable/format for list/array items. It is more important to make the English read well than to reuse .NET collection types in the structured log. So we can either do string or new data types w/ custom formatter. I think if users get to the point where they want to extract the structure out, we can expose entirely new, stable data types that reflect this structure (a list of two-field objects), but until we want to expose those data types, we can/should stick with string. And if/when we do want structured objects, we should also add them to the Python warning.

For example when we warn on unhandled signals in Go we return the list as part of a key-value. For updates I would do something similar (although we would also need to add UpdateID as well)

This is more than a list, this is a list of objects with two fields each. The question becomes whether we need a stable data structure for that. I don't think we do. For instance, when Go SDK has the code for this, you will likely need to choose between a new object to get formatting right (that users probably can't access unless we're talking exposure), or JSON formatting. Though Go may be lucky enough to have a logger that defaults to a reasonable format for lists, but .NET and others don't necessarily.

Copy link
Member Author

@cretz cretz Jul 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am going to refactor to a new data type that I just won't expose. Users won't really be able to do anything with it except via reflection (but if we decide to expose this data type later we can).

EDIT: There is now a new data type but users can't do anything with really besides reflection. We can't make it extend/implement anything useful like collection, because .NET formatting is short-circuiting that in its formatting logic.


private static readonly Action<ILogger, string, WarnableUpdates, Exception?> UpdateWarning =
LoggerMessage.Define<string, WarnableUpdates>(
Comment on lines +2289 to +2290
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could maybe combine these two warning functions generically to save on duping the boilerplate part of the message. Not a big deal.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 In this case I am matching the contents of temporalio/sdk-python#556, but can make this change generally if needed.

LogLevel.Warning,
0,
"Workflow {Id} finished while update handlers are still running. This may " +
"have interrupted work that the update handler was doing, and the client " +
"that sent the update will receive a 'workflow execution already completed' " +
"RpcException instead of the update result. You can wait for all update and " +
"signal handlers to complete by using `await " +
"Workflow.WaitConditionAsync(() => Workflow.AllHandlersFinished)`. " +
"Alternatively, if both you and the clients sending the update are okay with " +
"interrupting running handlers when the workflow finishes, and causing " +
"clients to receive errors, then you can disable this warning via the update " +
"handler attribute: " +
"`[WorkflowUpdate(UnfinishedPolicy=HandlerUnfinishedPolicy.Abandon)]`. The " +
"following updates were unfinished (and warnings were not disabled for their " +
"handler): {Handlers}");
#pragma warning restore SA1118

public void WarnIfAnyLeftOver(string id, ILogger logger)
{
var signals = this.
Where(h => h.UpdateId == null && h.UnfinishedPolicy == HandlerUnfinishedPolicy.WarnAndAbandon).
GroupBy(h => h.Name).
Select(h => (h.Key, h.Count())).
ToArray();
if (signals.Length > 0)
{
SignalWarning(logger, id, new WarnableSignals { NamesAndCounts = signals }, null);
}
var updates = this.
Where(h => h.UpdateId != null && h.UnfinishedPolicy == HandlerUnfinishedPolicy.WarnAndAbandon).
Select(h => (h.Name, h.UpdateId!)).
ToArray();
if (updates.Length > 0)
{
UpdateWarning(logger, id, new WarnableUpdates { NamesAndIds = updates }, null);
}
}

public readonly struct WarnableSignals
{
public (string, int)[] NamesAndCounts { get; init; }

public override string ToString() => JsonSerializer.Serialize(
NamesAndCounts.Select(v => new { name = v.Item1, count = v.Item2 }).ToArray());
}

public readonly struct WarnableUpdates
{
public (string, string)[] NamesAndIds { get; init; }

public override string ToString() => JsonSerializer.Serialize(
NamesAndIds.Select(v => new { name = v.Item1, id = v.Item2 }).ToArray());
}

public record Handler(
string Name,
string? UpdateId,
HandlerUnfinishedPolicy UnfinishedPolicy);
}
}
}
27 changes: 27 additions & 0 deletions src/Temporalio/Workflows/HandlerUnfinishedPolicy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
namespace Temporalio.Workflows
{
/// <summary>
/// Actions taken if a workflow terminates with running handlers.
/// </summary>
/// <remarks>
/// Policy defining actions taken when a workflow exits while update or signal handlers are
/// running. The workflow exit may be due to successful return, failure, cancellation, or
/// continue-as-new.
/// </remarks>
public enum HandlerUnfinishedPolicy
{
/// <summary>
/// Issue a warning in addition to abandoning.
/// </summary>
WarnAndAbandon,

/// <summary>
/// Abandon the handler.
/// </summary>
/// <remarks>
/// In the case of an update handler this means that the client will receive an error rather
/// than the update result.
/// </remarks>
Abandon,
}
}
5 changes: 5 additions & 0 deletions src/Temporalio/Workflows/IWorkflowContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ namespace Temporalio.Workflows
/// </summary>
internal interface IWorkflowContext
{
/// <summary>
/// Gets a value indicating whether <see cref="Workflow.AllHandlersFinished" /> is true.
/// </summary>
bool AllHandlersFinished { get; }

/// <summary>
/// Gets value for <see cref="Workflow.CancellationToken" />.
/// </summary>
Expand Down
10 changes: 10 additions & 0 deletions src/Temporalio/Workflows/Workflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@ namespace Temporalio.Workflows
/// </summary>
public static class Workflow
{
/// <summary>
/// Gets a value indicating whether all update and signal handlers have finished executing.
/// </summary>
/// <remarks>
/// Consider waiting on this condition before workflow return or continue-as-new, to prevent
/// interruption of in-progress handlers by workflow return:
/// <c>await Workflow.WaitConditionAsync(() => Workflow.AllHandlersFinished)</c>.
/// </remarks>
public static bool AllHandlersFinished => Context.AllHandlersFinished;

/// <summary>
/// Gets the cancellation token for the workflow.
/// </summary>
Expand Down
6 changes: 6 additions & 0 deletions src/Temporalio/Workflows/WorkflowSignalAttribute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,11 @@ public WorkflowSignalAttribute(string name)
/// an array of <see cref="Converters.IRawValue" />.
/// </summary>
public bool Dynamic { get; set; }

/// <summary>
/// Gets or sets the actions taken if a workflow exits with a running instance of this
/// handler. Default is <see cref="HandlerUnfinishedPolicy.WarnAndAbandon" />.
/// </summary>
public HandlerUnfinishedPolicy UnfinishedPolicy { get; set; } = HandlerUnfinishedPolicy.WarnAndAbandon;
}
}
22 changes: 18 additions & 4 deletions src/Temporalio/Workflows/WorkflowSignalDefinition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,16 @@ public class WorkflowSignalDefinition
{
private static readonly ConcurrentDictionary<MethodInfo, WorkflowSignalDefinition> Definitions = new();

private WorkflowSignalDefinition(string? name, MethodInfo? method, Delegate? del)
private WorkflowSignalDefinition(
string? name,
MethodInfo? method,
Delegate? del,
HandlerUnfinishedPolicy unfinishedPolicy)
{
Name = name;
Method = method;
Delegate = del;
UnfinishedPolicy = unfinishedPolicy;
}

/// <summary>
Expand All @@ -39,6 +44,11 @@ private WorkflowSignalDefinition(string? name, MethodInfo? method, Delegate? del
/// </summary>
internal Delegate? Delegate { get; private init; }

/// <summary>
/// Gets the unfinished policy.
/// </summary>
internal HandlerUnfinishedPolicy UnfinishedPolicy { get; private init; }

/// <summary>
/// Get a signal definition from a method or fail. The result is cached.
/// </summary>
Expand All @@ -63,12 +73,16 @@ public static WorkflowSignalDefinition FromMethod(MethodInfo method)
/// </summary>
/// <param name="name">Signal name. Null for dynamic signal.</param>
/// <param name="del">Signal delegate.</param>
/// <param name="unfinishedPolicy">Actions taken if a workflow exits with a running instance
/// of this handler.</param>
/// <returns>Signal definition.</returns>
public static WorkflowSignalDefinition CreateWithoutAttribute(
string? name, Delegate del)
string? name,
Delegate del,
HandlerUnfinishedPolicy unfinishedPolicy = HandlerUnfinishedPolicy.WarnAndAbandon)
{
AssertValid(del.Method, dynamic: name == null);
return new(name, null, del);
return new(name, null, del, unfinishedPolicy);
}

/// <summary>
Expand Down Expand Up @@ -103,7 +117,7 @@ private static WorkflowSignalDefinition CreateFromMethod(MethodInfo method)
name = name.Substring(0, name.Length - 5);
}
}
return new(name, method, null);
return new(name, method, null, attr.UnfinishedPolicy);
}

private static void AssertValid(MethodInfo method, bool dynamic)
Expand Down
Loading