Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions api/persistence/v1/executions.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 45 additions & 1 deletion components/nexusoperations/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (d CancelRequestedEventDefinition) Type() enumspb.EventType {

func (d CancelRequestedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error {
_, err := transitionOperation(root, event, func(node *hsm.Node, o Operation) (hsm.TransitionOutput, error) {
return o.Cancel(node, event.EventTime.AsTime())
return o.Cancel(node, event.EventTime.AsTime(), event.EventId)
})

return err
Expand All @@ -79,6 +79,44 @@ func (d CancelRequestedEventDefinition) CherryPick(root *hsm.Node, event *histor
return hsm.ErrNotCherryPickable
}

type CancelRequestCompletedEventDefinition struct{}

func (d CancelRequestCompletedEventDefinition) IsWorkflowTaskTrigger() bool {
return false
}

func (d CancelRequestCompletedEventDefinition) Type() enumspb.EventType {
return enumspb.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_COMPLETED
}

func (d CancelRequestCompletedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error {
return nil
}

func (d CancelRequestCompletedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, _ map[enumspb.ResetReapplyExcludeType]struct{}) error {
// We never cherry pick command events, and instead allow user logic to reschedule those commands.
return hsm.ErrNotCherryPickable
}

type CancelRequestFailedEventDefinition struct{}

func (d CancelRequestFailedEventDefinition) IsWorkflowTaskTrigger() bool {
return false
}

func (d CancelRequestFailedEventDefinition) Type() enumspb.EventType {
return enumspb.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_FAILED
}

func (d CancelRequestFailedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error {
return nil
}

func (d CancelRequestFailedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, _ map[enumspb.ResetReapplyExcludeType]struct{}) error {
// We never cherry pick command events, and instead allow user logic to reschedule those commands.
return hsm.ErrNotCherryPickable
}

type StartedEventDefinition struct{}

func (d StartedEventDefinition) IsWorkflowTaskTrigger() bool {
Expand Down Expand Up @@ -239,6 +277,12 @@ func RegisterEventDefinitions(reg *hsm.Registry) error {
if err := reg.RegisterEventDefinition(CancelRequestedEventDefinition{}); err != nil {
return err
}
if err := reg.RegisterEventDefinition(CancelRequestCompletedEventDefinition{}); err != nil {
return err
}
if err := reg.RegisterEventDefinition(CancelRequestFailedEventDefinition{}); err != nil {
return err
}
if err := reg.RegisterEventDefinition(StartedEventDefinition{}); err != nil {
return err
}
Expand Down
21 changes: 21 additions & 0 deletions components/nexusoperations/executors.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,17 @@ func (e taskExecutor) saveCancelationResult(ctx context.Context, env hsm.Environ
if err != nil {
return hsm.TransitionOutput{}, err
}
n.AddHistoryEvent(enumspb.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_FAILED, func(e *historypb.HistoryEvent) {
// nolint:revive // We must mutate here even if the linter doesn't like it.
e.Attributes = &historypb.HistoryEvent_NexusOperationCancelRequestFailedEventAttributes{
NexusOperationCancelRequestFailedEventAttributes: &historypb.NexusOperationCancelRequestFailedEventAttributes{
RequestedEventId: c.RequestedEventId,
Failure: failure,
},
}
// nolint:revive // We must mutate here even if the linter doesn't like it.
e.WorkerMayIgnore = true // For compatibility with older SDKs.
})
if !isRetryable {
return TransitionCancelationFailed.Apply(c, EventCancelationFailed{
Time: env.Now(),
Expand All @@ -680,6 +691,16 @@ func (e taskExecutor) saveCancelationResult(ctx context.Context, env hsm.Environ
// Cancelation request transmitted successfully.
// The operation is not yet canceled and may ignore our request, the outcome will be known via the
// completion callback.
n.AddHistoryEvent(enumspb.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_COMPLETED, func(e *historypb.HistoryEvent) {
// nolint:revive // We must mutate here even if the linter doesn't like it.
e.Attributes = &historypb.HistoryEvent_NexusOperationCancelRequestCompletedEventAttributes{
NexusOperationCancelRequestCompletedEventAttributes: &historypb.NexusOperationCancelRequestCompletedEventAttributes{
RequestedEventId: c.RequestedEventId,
},
}
// nolint:revive // We must mutate here even if the linter doesn't like it.
e.WorkerMayIgnore = true // For compatibility with older SDKs.
})
return TransitionCancelationSucceeded.Apply(c, EventCancelationSucceeded{
Time: env.Now(),
Node: n,
Expand Down
8 changes: 4 additions & 4 deletions components/nexusoperations/executors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ func TestProcessInvocationTask(t *testing.T) {
if tc.cancelBeforeStart {
op, err := hsm.MachineData[nexusoperations.Operation](node)
require.NoError(t, err)
_, err = op.Cancel(node, time.Now())
_, err = op.Cancel(node, time.Now(), 0)
require.NoError(t, err)
c, err := op.Cancelation(node)
require.NoError(t, err)
Expand Down Expand Up @@ -759,7 +759,7 @@ func TestProcessCancelationTask(t *testing.T) {
Node: node,
})
require.NoError(t, err)
_, err = op.Cancel(node, time.Now())
_, err = op.Cancel(node, time.Now(), 0)
require.NoError(t, err)
node, err = node.Child([]hsm.Key{nexusoperations.CancelationMachineKey})
require.NoError(t, err)
Expand Down Expand Up @@ -866,7 +866,7 @@ func TestProcessCancelationTask_OperationCompleted(t *testing.T) {
Node: node,
})
require.NoError(t, err)
_, err = op.Cancel(node, time.Now())
_, err = op.Cancel(node, time.Now(), 0)
require.NoError(t, err)
_, err = nexusoperations.TransitionSucceeded.Apply(op, nexusoperations.EventSucceeded{
Node: node,
Expand Down Expand Up @@ -925,7 +925,7 @@ func TestProcessCancelationBackoffTask(t *testing.T) {
Node: node,
})
require.NoError(t, err)
_, err = op.Cancel(node, time.Now())
_, err = op.Cancel(node, time.Now(), 0)
require.NoError(t, err)

node, err = node.Child([]hsm.Key{nexusoperations.CancelationMachineKey})
Expand Down
2 changes: 1 addition & 1 deletion components/nexusoperations/statemachine.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ var TransitionTimedOut = hsm.NewTransition(
// machine will stay in UNSPECIFIED state. If the Operation is in STARTED state, then transition the
// Cancelation machine to the SCHEDULED state. Otherwise, the Cancelation machine will wait the
// Operation machine transition to the STARTED state.
func (o Operation) Cancel(node *hsm.Node, t time.Time) (hsm.TransitionOutput, error) {
func (o Operation) Cancel(node *hsm.Node, t time.Time, requestedEventID int64) (hsm.TransitionOutput, error) {
child, err := node.AddChild(CancelationMachineKey, Cancelation{
NexusOperationCancellationInfo: &persistencespb.NexusOperationCancellationInfo{},
})
Expand Down
6 changes: 3 additions & 3 deletions components/nexusoperations/statemachine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ func TestCancel(t *testing.T) {
Node: root,
})
require.NoError(t, err)
_, err = op.Cancel(root, time.Now())
_, err = op.Cancel(root, time.Now(), 0)
require.NoError(t, err)
require.Equal(t, enumsspb.NEXUS_OPERATION_STATE_STARTED, op.State())
node, err := root.Child([]hsm.Key{nexusoperations.CancelationMachineKey})
Expand All @@ -478,7 +478,7 @@ func TestCancelationValidTransitions(t *testing.T) {
})
}))
require.NoError(t, hsm.MachineTransition(root, func(op nexusoperations.Operation) (hsm.TransitionOutput, error) {
return op.Cancel(root, time.Now())
return op.Cancel(root, time.Now(), 0)
}))
node, err := root.Child([]hsm.Key{nexusoperations.CancelationMachineKey})
require.NoError(t, err)
Expand Down Expand Up @@ -577,7 +577,7 @@ func TestCancelationBeforeStarted(t *testing.T) {
backend := &hsmtest.NodeBackend{}
root := newOperationNode(t, backend, mustNewScheduledEvent(time.Now(), 0))
require.NoError(t, hsm.MachineTransition(root, func(op nexusoperations.Operation) (hsm.TransitionOutput, error) {
return op.Cancel(root, time.Now())
return op.Cancel(root, time.Now(), 0)
}))
opLog, err := root.Parent.OpLog()
require.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,9 @@ message NexusOperationCancellationInfo {
temporal.api.failure.v1.Failure last_attempt_failure = 5;
// The time when the next attempt is scheduled.
google.protobuf.Timestamp next_attempt_schedule_time = 6;

// The event ID of the NEXUS_OPERATION_CANCEL_REQUESTED event for this cancelation.
int64 requested_event_id = 7;
}

// ResetChildInfo contains the state and actions to be performed on children when a parent workflow resumes after reset.
Expand Down
24 changes: 23 additions & 1 deletion tests/nexus_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func (s *NexusWorkflowTestSuite) TestNexusOperationCancelation() {
taskQueue := testcore.RandomizeStr(s.T().Name())
endpointName := testcore.RandomizedNexusEndpoint(s.T().Name())

errSent := false
h := nexustest.Handler{
OnStartOperation: func(ctx context.Context, service, operation string, input *nexus.LazyValue, options nexus.StartOperationOptions) (nexus.HandlerStartOperationResult[any], error) {
if service != "service" {
Expand All @@ -89,6 +90,11 @@ func (s *NexusWorkflowTestSuite) TestNexusOperationCancelation() {
return &nexus.HandlerStartOperationResultAsync{OperationToken: "test"}, nil
},
OnCancelOperation: func(ctx context.Context, service, operation, token string, options nexus.CancelOperationOptions) error {
if !errSent {
// Fail cancel request once to test NexusOperationCancelRequestFailed event is recorded and request is retried.
errSent = true
return nexus.HandlerErrorf(nexus.HandlerErrorTypeInternal, "intentional cancel error for test")
}
return nil
},
}
Expand Down Expand Up @@ -203,11 +209,18 @@ func (s *NexusWorkflowTestSuite) TestNexusOperationCancelation() {
assert.Equal(t, "operation", op.Operation)
assert.Equal(t, enumspb.PENDING_NEXUS_OPERATION_STATE_STARTED, op.State)
assert.Equal(t, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SUCCEEDED, op.CancellationInfo.State)

}, time.Second*10, time.Millisecond*30)

err = s.SdkClient().TerminateWorkflow(ctx, run.GetID(), run.GetRunID(), "test")
s.NoError(err)

hist := s.GetHistory(s.Namespace().String(), &commonpb.WorkflowExecution{
WorkflowId: run.GetID(),
RunId: run.GetRunID(),
})
s.ContainsHistoryEvents(`
NexusOperationCancelRequestFailed
NexusOperationCancelRequestCompleted`, hist)
}

func (s *NexusWorkflowTestSuite) TestNexusOperationSyncCompletion() {
Expand Down Expand Up @@ -1514,6 +1527,15 @@ func (s *NexusWorkflowTestSuite) TestNexusOperationCancelBeforeStarted_Cancelati
// Terminate the workflow for good measure.
err = s.SdkClient().TerminateWorkflow(ctx, run.GetID(), run.GetRunID(), "test")
s.NoError(err)

// Assert that we did not send a cancel request until after the operation was started.
hist := s.GetHistory(s.Namespace().String(), &commonpb.WorkflowExecution{
WorkflowId: run.GetID(),
RunId: run.GetRunID(),
})
s.ContainsHistoryEvents(`
NexusOperationCancelRequested
NexusOperationStarted`, hist)
}

func (s *NexusWorkflowTestSuite) TestNexusOperationAsyncCompletionAfterReset() {
Expand Down
Loading