Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 13 additions & 40 deletions loadgen/kitchen_sink_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,7 @@ func TestKitchenSink(t *testing.T) {
},
},
expectedUnsupportedErrs: map[cmdoptions.Language]string{
cmdoptions.LangJava: "client actions activity is not supported",
cmdoptions.LangPython: "client actions activity is not supported",
cmdoptions.LangTypeScript: "client actions activity is not supported",
cmdoptions.LangDotNet: "client actions activity is not supported",
cmdoptions.LangJava: "client actions activity is not supported",
},
historyMatcher: PartialHistoryMatcher(`WorkflowExecutionSignaled`),
},
Expand Down Expand Up @@ -219,10 +216,7 @@ func TestKitchenSink(t *testing.T) {
},
},
expectedUnsupportedErrs: map[cmdoptions.Language]string{
cmdoptions.LangJava: "client actions activity is not supported",
cmdoptions.LangPython: "client actions activity is not supported",
cmdoptions.LangTypeScript: "client actions activity is not supported",
cmdoptions.LangDotNet: "client actions activity is not supported",
cmdoptions.LangJava: "client actions activity is not supported",
},
historyMatcher: PartialHistoryMatcher(`WorkflowExecutionSignaled`),
},
Expand All @@ -249,10 +243,7 @@ func TestKitchenSink(t *testing.T) {
},
},
expectedUnsupportedErrs: map[cmdoptions.Language]string{
cmdoptions.LangJava: "client actions activity is not supported",
cmdoptions.LangPython: "client actions activity is not supported",
cmdoptions.LangTypeScript: "client actions activity is not supported",
cmdoptions.LangDotNet: "client actions activity is not supported",
cmdoptions.LangJava: "client actions activity is not supported",
},
historyMatcher: PartialHistoryMatcher(`WorkflowExecutionSignaled {"signalName":"test_signal"}`),
},
Expand Down Expand Up @@ -281,10 +272,7 @@ func TestKitchenSink(t *testing.T) {
ActivityTaskStarted
ActivityTaskCompleted`),
expectedUnsupportedErrs: map[cmdoptions.Language]string{
cmdoptions.LangJava: "client actions activity is not supported",
cmdoptions.LangPython: "client actions activity is not supported",
cmdoptions.LangTypeScript: "client actions activity is not supported",
cmdoptions.LangDotNet: "client actions activity is not supported",
cmdoptions.LangJava: "client actions activity is not supported",
},
},
{
Expand Down Expand Up @@ -315,10 +303,7 @@ func TestKitchenSink(t *testing.T) {
ActivityTaskStarted
ActivityTaskCompleted`),
expectedUnsupportedErrs: map[cmdoptions.Language]string{
cmdoptions.LangJava: "client actions activity is not supported",
cmdoptions.LangPython: "client actions activity is not supported",
cmdoptions.LangTypeScript: "client actions activity is not supported",
cmdoptions.LangDotNet: "client actions activity is not supported",
cmdoptions.LangJava: "client actions activity is not supported",
},
},
{
Expand Down Expand Up @@ -350,10 +335,7 @@ func TestKitchenSink(t *testing.T) {
...
WorkflowExecutionUpdateAccepted {"acceptedRequest":{"input":{"name":"do_actions_update"}}}`),
expectedUnsupportedErrs: map[cmdoptions.Language]string{
cmdoptions.LangJava: "client actions activity is not supported",
cmdoptions.LangPython: "client actions activity is not supported",
cmdoptions.LangTypeScript: "client actions activity is not supported",
cmdoptions.LangDotNet: "client actions activity is not supported",
cmdoptions.LangJava: "client actions activity is not supported",
},
},
{
Expand Down Expand Up @@ -384,10 +366,7 @@ func TestKitchenSink(t *testing.T) {
ActivityTaskStarted
ActivityTaskCompleted`),
expectedUnsupportedErrs: map[cmdoptions.Language]string{
cmdoptions.LangJava: "client actions activity is not supported",
cmdoptions.LangPython: "client actions activity is not supported",
cmdoptions.LangTypeScript: "client actions activity is not supported",
cmdoptions.LangDotNet: "client actions activity is not supported",
cmdoptions.LangJava: "client actions activity is not supported",
},
},
{
Expand Down Expand Up @@ -419,10 +398,7 @@ func TestKitchenSink(t *testing.T) {
},
},
expectedUnsupportedErrs: map[cmdoptions.Language]string{
cmdoptions.LangJava: "client actions activity is not supported",
cmdoptions.LangPython: "client actions activity is not supported",
cmdoptions.LangTypeScript: "client actions activity is not supported",
cmdoptions.LangDotNet: "client actions activity is not supported",
cmdoptions.LangJava: "client actions activity is not supported",
},
historyMatcher: PartialHistoryMatcher(`WorkflowExecutionUpdateCompleted`),
},
Expand All @@ -446,9 +422,9 @@ func TestKitchenSink(t *testing.T) {
},
},
expectedUnsupportedErrs: map[cmdoptions.Language]string{
cmdoptions.LangTypeScript: "client actions activity is not supported",
cmdoptions.LangDotNet: "client actions activity is not supported",
cmdoptions.LangPython: "client actions activity is not supported",
cmdoptions.LangTypeScript: "concurrent client actions are not supported",
cmdoptions.LangDotNet: "concurrent client actions are not supported",
cmdoptions.LangPython: "concurrent client actions are not supported",
cmdoptions.LangJava: "client actions activity is not supported",
},
historyMatcher: PartialHistoryMatcher(`
Expand Down Expand Up @@ -504,10 +480,7 @@ func TestKitchenSink(t *testing.T) {
},
},
expectedUnsupportedErrs: map[cmdoptions.Language]string{
cmdoptions.LangJava: "client actions activity is not supported",
cmdoptions.LangPython: "client actions activity is not supported",
cmdoptions.LangTypeScript: "client actions activity is not supported",
cmdoptions.LangDotNet: "client actions activity is not supported",
cmdoptions.LangJava: "client actions activity is not supported",
},
historyMatcher: PartialHistoryMatcher(`
WorkflowExecutionSignaled
Expand Down Expand Up @@ -695,7 +668,7 @@ func TestKitchenSink(t *testing.T) {
},
WorkflowInput: &WorkflowInput{
InitialActions: ListActionSet(
NewTimerAction(2000), // timer to keep workflow open long enough for client action
NewTimerAction(5000), // timer to keep workflow open long enough for client action
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trying to mitigate flaky tests. Better solution might be to add AllHandlersFinished but I'm not quite sure how to.

),
},
},
Expand Down
197 changes: 193 additions & 4 deletions workers/dotnet/Temporalio.Omes/ClientActionsExecutor.cs
Original file line number Diff line number Diff line change
@@ -1,16 +1,205 @@
using Temporalio.Client;
using Temporalio.Exceptions;
using Temporal.Omes.KitchenSink;
using Temporalio.Api.Enums.V1;

namespace Temporalio.Omes;

public class ClientActionsExecutor
{
private readonly ITemporalClient _client;
private readonly string _workflowType = "kitchenSink";
private readonly string _taskQueue;
private object? _workflowInput = null;
private string _runId = "";

public string? WorkflowId { get; set; }

public ClientActionsExecutor(ITemporalClient client, string workflowId, string taskQueue)
{
_client = client;
WorkflowId = workflowId;
_taskQueue = taskQueue;
}

public async Task ExecuteClientSequence(ClientSequence clientSeq)
{
foreach (var actionSet in clientSeq.ActionSets)
{
await ExecuteClientActionSet(actionSet);
}
}

private async Task ExecuteClientActionSet(ClientActionSet actionSet)
{
if (actionSet.Concurrent)
{
throw new ApplicationFailureException("concurrent client actions are not supported", "UnsupportedOperation", nonRetryable: true);
}

foreach (var action in actionSet.Actions)
{
await ExecuteClientAction(action);
}
}

private async Task ExecuteClientAction(ClientAction action)
{
if (action.DoSignal != null)
{
await ExecuteSignalAction(action.DoSignal);
}
else if (action.DoUpdate != null)
{
await ExecuteUpdateAction(action.DoUpdate);
}
else if (action.DoQuery != null)
{
await ExecuteQueryAction(action.DoQuery);
}
else if (action.NestedActions != null)
{
await ExecuteClientActionSet(action.NestedActions);
}
else
{
throw new ArgumentException("Client action must have a recognized variant");
}
}

private async Task ExecuteSignalAction(DoSignal signal)
{
string signalName;
object? signalArgs = null;

if (signal.DoSignalActions != null)
{
signalName = "do_actions_signal";
signalArgs = signal.DoSignalActions;
}
else if (signal.Custom != null)
{
signalName = signal.Custom.Name;
signalArgs = signal.Custom.Args?.ToArray();
}
else
{
throw new ArgumentException("DoSignal must have a recognizable variant");
}

var args = NormalizeArgsToArray(signalArgs);
if (signal.WithStart)
{
var options = new WorkflowOptions(id: WorkflowId!, taskQueue: _taskQueue);
options.SignalWithStart(signalName, args);

var handle = await _client.StartWorkflowAsync(
_workflowType,
NormalizeArgsToArray(_workflowInput),
options);

WorkflowId = handle.Id;
_runId = handle.RunId ?? "";
}
else
{
var handle = _client.GetWorkflowHandle(WorkflowId!);
await handle.SignalAsync(signalName, args);
}
}

// Always unsupported in this branch
public Task ExecuteClientSequence(ClientSequence _)
=> throw new ApplicationFailureException("client actions activity is not supported", "UnsupportedOperation", nonRetryable: true);
}
private async Task ExecuteUpdateAction(DoUpdate update)
{
string updateName;
object? updateArgs;
if (update.DoActions != null)
{
updateName = "do_actions_update";
updateArgs = update.DoActions;
}
else if (update.Custom != null)
{
updateName = update.Custom.Name;
updateArgs = update.Custom.Args.Count > 0 ? update.Custom.Args : null;
}
else
{
throw new ArgumentException("DoUpdate must have a recognizable variant");
}

try
{
var args = NormalizeArgsToArray(updateArgs);
if (update.WithStart)
{
var startOperation = WithStartWorkflowOperation.Create(
_workflowType,
NormalizeArgsToArray(_workflowInput),
new(id: WorkflowId!, taskQueue: _taskQueue)
{
IdConflictPolicy = WorkflowIdConflictPolicy.UseExisting
});

await _client.ExecuteUpdateWithStartWorkflowAsync(
updateName,
args,
new(startOperation));

var handle = await startOperation.GetHandleAsync();
WorkflowId = handle.Id;
_runId = handle.RunId ?? "";
}
else
{
var handle = _client.GetWorkflowHandle(WorkflowId!);
await handle.ExecuteUpdateAsync(updateName, args);
}
}
catch (Exception)
{
if (!update.FailureExpected)
{
throw;
}
}
}

private async Task ExecuteQueryAction(DoQuery query)
{
try
{
if (query.ReportState != null)
{
var handle = _client.GetWorkflowHandle(WorkflowId!);
await handle.QueryAsync<WorkflowState>("report_state", new object[] { query.ReportState });
}
else if (query.Custom != null)
{
var handle = _client.GetWorkflowHandle(WorkflowId!);
var queryArgs = query.Custom.Args.Count > 0 ? query.Custom.Args.ToArray() : null;
await handle.QueryAsync<object>(query.Custom.Name, queryArgs ?? Array.Empty<object>());
}
else
{
throw new ArgumentException("DoQuery must have a recognizable variant");
}
}
catch (Exception)
{
if (!query.FailureExpected)
{
throw;
}
}
}

private static object[] NormalizeArgsToArray(object? args)
{
return args switch
{
null => Array.Empty<object>(),
object[] array => array,
_ => new[] { args }
};
}
}
Loading
Loading