Skip to content

Commit 6695adf

Browse files
authored
Add Workflow.RunTaskAsync and Workflow.WhenAllAsync (#313)
Fixes #303
1 parent 9ffc963 commit 6695adf

File tree

3 files changed

+92
-1
lines changed

3 files changed

+92
-1
lines changed

README.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -688,7 +688,8 @@ use `TaskScheduler.Default` implicitly (and some analyzers even encourage this).
688688
with .NET tasks inside of workflows:
689689

690690
* Do not use `Task.Run` - this uses the default scheduler and puts work on the thread pool.
691-
* Use `Task.Factory.StartNew` or instantiate the `Task` and run `Task.Start` on it.
691+
* Use `Workflow.RunTaskAsync` instead.
692+
* Can also use `Task.Factory.StartNew` with current scheduler or instantiate the `Task` and run `Task.Start` on it.
692693
* Do not use `Task.ConfigureAwait(false)` - this will not use the current context.
693694
* If you must use `Task.ConfigureAwait`, use `Task.ConfigureAwait(true)`.
694695
* There is no significant performance benefit to `Task.ConfigureAwait` in workflows anyways due to how the scheduler
@@ -701,6 +702,10 @@ with .NET tasks inside of workflows:
701702
* Use `Workflow.WhenAnyAsync` instead.
702703
* Technically this only applies to an enumerable set of tasks with results or more than 2 tasks with results. Other
703704
uses are safe. See [this issue](https://github.com/dotnet/runtime/issues/87481).
705+
* Do not use `Task.WhenAll`
706+
* Use `Workflow.WhenAllAsync` instead.
707+
* Technically `Task.WhenAll` is currently deterministic in .NET and safe, but it is better to use the wrapper to be
708+
sure.
704709
* Do not use `System.Threading.Semaphore` or `System.Threading.SemaphoreSlim` or `System.Threading.Mutex`.
705710
* Use `Temporalio.Workflows.Semaphore` or `Temporalio.Workflows.Mutex` instead.
706711
* _Technically_ `SemaphoreSlim` does work if only the async form of `WaitAsync` is used without no timeouts and

src/Temporalio/Workflows/Workflow.cs

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1021,6 +1021,39 @@ public static Guid NewGuid()
10211021
/// </remarks>
10221022
public static bool Patched(string patchId) => Context.Patch(patchId, deprecated: false);
10231023

1024+
/// <summary>
1025+
/// Workflow-safe form of <see cref="Task.Run(Func{Task}, CancellationToken)" />.
1026+
/// </summary>
1027+
/// <param name="function">The work to execute asynchronously.</param>
1028+
/// <param name="cancellationToken">A cancellation token that can be used to cancel the work
1029+
/// if it has not yet started. Defaults to <see cref="CancellationToken"/>.</param>
1030+
/// <returns>A task for the running task (but not necessarily the task that is returned
1031+
/// from the function).</returns>
1032+
public static Task RunTaskAsync(
1033+
Func<Task> function, CancellationToken? cancellationToken = null) =>
1034+
Task.Factory.StartNew(
1035+
function,
1036+
cancellationToken ?? CancellationToken,
1037+
TaskCreationOptions.None,
1038+
TaskScheduler.Current).Unwrap();
1039+
1040+
/// <summary>
1041+
/// Workflow-safe form of <see cref="Task.Run{TResult}(Func{TResult}, CancellationToken)" />.
1042+
/// </summary>
1043+
/// <typeparam name="TResult">The type of the result returned by the task.</typeparam>
1044+
/// <param name="function">The work to execute asynchronously.</param>
1045+
/// <param name="cancellationToken">A cancellation token that can be used to cancel the work
1046+
/// if it has not yet started. Defaults to <see cref="CancellationToken"/>.</param>
1047+
/// <returns>A task for the running task (but not necessarily the task that is returned
1048+
/// from the function).</returns>
1049+
public static Task<TResult> RunTaskAsync<TResult>(
1050+
Func<Task<TResult>> function, CancellationToken? cancellationToken = null) =>
1051+
Task.Factory.StartNew(
1052+
function,
1053+
cancellationToken ?? CancellationToken,
1054+
TaskCreationOptions.None,
1055+
TaskScheduler.Current).Unwrap();
1056+
10241057
/// <summary>
10251058
/// Start a child workflow via lambda invoking the run method.
10261059
/// </summary>
@@ -1220,6 +1253,44 @@ public static async Task<Task<TResult>> WhenAnyAsync<TResult>(IEnumerable<Task<T
12201253
return (Task<TResult>)task;
12211254
}
12221255

1256+
/// <summary>
1257+
/// Workflow-safe form of <see cref="Task.WhenAll(IEnumerable{Task})" /> (which just calls
1258+
/// the standard library call currently because it is already safe).
1259+
/// </summary>
1260+
/// <param name="tasks">The tasks to wait on for completion.</param>
1261+
/// <returns>A task that represents the completion of all of the supplied tasks.</returns>
1262+
public static Task WhenAllAsync(IEnumerable<Task> tasks) =>
1263+
Task.WhenAll(tasks);
1264+
1265+
/// <summary>
1266+
/// Workflow-safe form of <see cref="Task.WhenAll(Task[])" /> (which just calls the standard
1267+
/// library call currently because it is already safe).
1268+
/// </summary>
1269+
/// <param name="tasks">The tasks to wait on for completion.</param>
1270+
/// <returns>A task that represents the completion of all of the supplied tasks.</returns>
1271+
public static Task WhenAllAsync(params Task[] tasks) =>
1272+
Task.WhenAll(tasks);
1273+
1274+
/// <summary>
1275+
/// Workflow-safe form of <see cref="Task.WhenAll{TResult}(IEnumerable{Task{TResult}})" />
1276+
/// (which just calls the standard library call currently because it is already safe).
1277+
/// </summary>
1278+
/// <typeparam name="TResult">The type of the completed task..</typeparam>
1279+
/// <param name="tasks">The tasks to wait on for completion.</param>
1280+
/// <returns>A task that represents the completion of all of the supplied tasks.</returns>
1281+
public static Task<TResult[]> WhenAllAsync<TResult>(IEnumerable<Task<TResult>> tasks) =>
1282+
Task.WhenAll(tasks);
1283+
1284+
/// <summary>
1285+
/// Workflow-safe form of <see cref="Task.WhenAll{TResult}(Task{TResult}[])" /> (which just
1286+
/// calls the standard library call currently because it is already safe).
1287+
/// </summary>
1288+
/// <typeparam name="TResult">The type of the completed task..</typeparam>
1289+
/// <param name="tasks">The tasks to wait on for completion.</param>
1290+
/// <returns>A task that represents the completion of all of the supplied tasks.</returns>
1291+
public static Task<TResult[]> WhenAllAsync<TResult>(params Task<TResult>[] tasks) =>
1292+
Task.WhenAll(tasks);
1293+
12231294
/// <summary>
12241295
/// Unsafe calls that can be made in a workflow.
12251296
/// </summary>

tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,15 @@ public async Task<string> RunAsync(Scenario scenario)
242242
case Scenario.WorkflowWhenAnyWithResultThreeParam:
243243
return await await Workflow.WhenAnyAsync(
244244
Task.FromResult("done"), Task.FromResult("done"), Task.FromResult("done"));
245+
case Scenario.WorkflowWhenAll:
246+
return string.Join(string.Empty, await Workflow.WhenAllAsync(
247+
Task.FromResult("do"), Task.FromResult("ne")));
248+
case Scenario.WorkflowRunTask:
249+
return await Workflow.RunTaskAsync(async () => "done");
250+
case Scenario.WorkflowRunTaskAfterTaskStart:
251+
var runTaskStart = new Task<string>(() => "done");
252+
runTaskStart.Start();
253+
return await Workflow.RunTaskAsync(() => runTaskStart);
245254
}
246255
throw new InvalidOperationException("Unexpected completion");
247256
}
@@ -266,6 +275,9 @@ public enum Scenario
266275
// https://github.com/dotnet/runtime/issues/87481
267276
TaskWhenAnyWithResultTwoParam,
268277
WorkflowWhenAnyWithResultThreeParam,
278+
WorkflowWhenAll,
279+
WorkflowRunTask,
280+
WorkflowRunTaskAfterTaskStart,
269281
}
270282
}
271283

@@ -325,6 +337,9 @@ Task AssertScenarioSucceeds(StandardLibraryCallsWorkflow.Scenario scenario) =>
325337
await AssertScenarioSucceeds(StandardLibraryCallsWorkflow.Scenario.TaskContinueWith);
326338
await AssertScenarioSucceeds(StandardLibraryCallsWorkflow.Scenario.TaskWhenAnyWithResultTwoParam);
327339
await AssertScenarioSucceeds(StandardLibraryCallsWorkflow.Scenario.WorkflowWhenAnyWithResultThreeParam);
340+
await AssertScenarioSucceeds(StandardLibraryCallsWorkflow.Scenario.WorkflowWhenAll);
341+
await AssertScenarioSucceeds(StandardLibraryCallsWorkflow.Scenario.WorkflowRunTask);
342+
await AssertScenarioSucceeds(StandardLibraryCallsWorkflow.Scenario.WorkflowRunTaskAfterTaskStart);
328343
}
329344

330345
[Workflow]

0 commit comments

Comments
 (0)