Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix replication issues with nexus operations and cancelation requests #6457

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion components/nexusoperations/executors.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,22 @@ func (e taskExecutor) executeInvocationTask(ctx context.Context, env hsm.Environ
return fmt.Errorf("failed to get namespace by ID: %w", err)
}

args, err := e.loadOperationArgs(ctx, ns, env, ref)
args, err := e.loadOperationArgs(ctx, ns, env, ref, task)
if err != nil {
return fmt.Errorf("failed to load operation args: %w", err)
}

if args.cancelRequested {
// TODO(bergundy): Properly support cancel before started. We need to transmit this intent to cancel to the
// handler because we don't know for sure that the operation hasn't been started.
return e.saveResult(ctx, env, ref, nil, &nexus.UnsuccessfulOperationError{
State: nexus.OperationStateCanceled,
Failure: nexus.Failure{
Message: "operation canceled before it was started",
},
})
}

// This happens when we accept the ScheduleNexusOperation command when the endpoint is not found in the registry as
// indicated by the EndpointNotFoundAlwaysNonRetryable dynamic config.
if args.endpointID == "" {
Expand Down Expand Up @@ -272,16 +283,21 @@ type startArgs struct {
payload *commonpb.Payload
workflowEventLink *commonpb.Link_WorkflowEvent
namespaceFailoverVersion int64
cancelRequested bool
}

func (e taskExecutor) loadOperationArgs(
ctx context.Context,
ns *namespace.Namespace,
env hsm.Environment,
ref hsm.Ref,
task InvocationTask,
) (args startArgs, err error) {
var eventToken []byte
err = env.Access(ctx, ref, hsm.AccessRead, func(node *hsm.Node) error {
if err := task.Validate(node); err != nil {
return err
}
if err := node.CheckRunning(); err != nil {
return err
}
Expand All @@ -290,6 +306,11 @@ func (e taskExecutor) loadOperationArgs(
return err
}

args.cancelRequested, err = operation.cancelRequested(node)
if err != nil {
return err
}

args.endpointName = operation.Endpoint
args.endpointID = operation.EndpointId
args.service = operation.Service
Expand Down Expand Up @@ -445,6 +466,9 @@ func handleNonRetryableStartOperationError(env hsm.Environment, node *hsm.Node,
}

func (e taskExecutor) executeBackoffTask(env hsm.Environment, node *hsm.Node, task BackoffTask) error {
if err := task.Validate(node); err != nil {
return err
}
if err := node.CheckRunning(); err != nil {
return err
}
Expand Down Expand Up @@ -583,6 +607,7 @@ func (e taskExecutor) loadArgsForCancelation(ctx context.Context, env hsm.Enviro
// Operation is already in a terminal state.
return fmt.Errorf("%w: operation already in terminal state", consts.ErrStaleReference)
}

args.service = op.Service
args.operation = op.Operation
args.operationID = op.OperationId
Expand Down
23 changes: 23 additions & 0 deletions components/nexusoperations/executors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func TestProcessInvocationTask(t *testing.T) {
name string
endpointNotFound bool
eventHasNoEndpointID bool
operationIsCanceled bool
checkStartOperationOptions func(t *testing.T, options nexus.StartOperationOptions)
onStartOperation func(ctx context.Context, service, operation string, input *nexus.LazyValue, options nexus.StartOperationOptions) (nexus.HandlerStartOperationResult[any], error)
expectedMetricOutcome string
Expand Down Expand Up @@ -352,6 +353,22 @@ func TestProcessInvocationTask(t *testing.T) {
require.Equal(t, 1, len(events))
},
},
{
name: "operation already canceled",
operationIsCanceled: true,
requestTimeout: time.Hour,
destinationDown: false,
onStartOperation: nil, // This should not be called if the operation is already canceled.
checkOutcome: func(t *testing.T, op nexusoperations.Operation, events []*historypb.HistoryEvent) {
require.Equal(t, enumsspb.NEXUS_OPERATION_STATE_CANCELED, op.State())
require.Nil(t, op.LastAttemptFailure)
require.Equal(t, 1, len(events))
require.NotNil(t, events[0].GetNexusOperationCanceledEventAttributes().Failure.Cause.GetCanceledFailureInfo())
require.Equal(t,
"operation canceled before it was started",
events[0].GetNexusOperationCanceledEventAttributes().Failure.Cause.Message)
},
},
}
for _, tc := range cases {
tc := tc
Expand Down Expand Up @@ -381,6 +398,12 @@ func TestProcessInvocationTask(t *testing.T) {
backend := &hsmtest.NodeBackend{Events: []*historypb.HistoryEvent{event}}
node := newOperationNode(t, backend, backend.Events[0])
env := fakeEnv{node}
if tc.operationIsCanceled {
op, err := hsm.MachineData[nexusoperations.Operation](node)
require.NoError(t, err)
_, err = op.Cancel(node, time.Now())
require.NoError(t, err)
}
namespaceRegistry := namespace.NewMockRegistry(ctrl)
namespaceRegistry.EXPECT().GetNamespaceByID(namespace.ID("ns-id")).Return(
namespace.NewNamespaceForTest(&persistence.NamespaceInfo{Name: "ns-name"}, nil, false, nil, 0), nil)
Expand Down
57 changes: 28 additions & 29 deletions components/nexusoperations/statemachine.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
"fmt"
"time"

"github.com/nexus-rpc/sdk-go/nexus"
enumspb "go.temporal.io/api/enums/v1"
failurepb "go.temporal.io/api/failure/v1"
historypb "go.temporal.io/api/history/v1"
Expand Down Expand Up @@ -141,10 +140,7 @@
}

// transitionTasks returns tasks that are emitted as transition outputs.
func (o Operation) transitionTasks(node *hsm.Node) ([]hsm.Task, error) {
if canceled, err := o.cancelRequested(node); canceled || err != nil {
return nil, err
}
func (o Operation) transitionTasks() ([]hsm.Task, error) {
switch o.State() { // nolint:exhaustive
case enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF:
return []hsm.Task{BackoffTask{Deadline: o.NextAttemptScheduleTime.AsTime()}}, nil
Expand All @@ -168,7 +164,7 @@
}

func (o Operation) RegenerateTasks(node *hsm.Node) ([]hsm.Task, error) {
transitionTasks, err := o.transitionTasks(node)
transitionTasks, err := o.transitionTasks()
if err != nil {
return nil, err
}
Expand All @@ -179,8 +175,8 @@
return append(transitionTasks, creationTasks...), nil
}

func (o Operation) output(node *hsm.Node) (hsm.TransitionOutput, error) {
tasks, err := o.transitionTasks(node)
func (o Operation) output() (hsm.TransitionOutput, error) {
tasks, err := o.transitionTasks()
if err != nil {
return hsm.TransitionOutput{}, err
}
Expand Down Expand Up @@ -260,7 +256,7 @@
[]enumsspb.NexusOperationState{enumsspb.NEXUS_OPERATION_STATE_UNSPECIFIED},
enumsspb.NEXUS_OPERATION_STATE_SCHEDULED,
func(op Operation, event EventScheduled) (hsm.TransitionOutput, error) {
return op.output(event.Node)
return op.output()
},
)

Expand All @@ -275,7 +271,7 @@
enumsspb.NEXUS_OPERATION_STATE_SCHEDULED,
func(op Operation, event EventRescheduled) (hsm.TransitionOutput, error) {
op.NextAttemptScheduleTime = nil
return op.output(event.Node)
return op.output()
},
)

Expand Down Expand Up @@ -304,7 +300,7 @@
},
},
}
return op.output(event.Node)
return op.output()
},
)

Expand Down Expand Up @@ -342,7 +338,7 @@
}
// Keep last attempt information as-is for debuggability when completed asynchronously.
// When used in a workflow, this machine node will be deleted from the tree after this transition.
return op.output(event.Node)
return op.output()
},
)

Expand Down Expand Up @@ -371,7 +367,7 @@
}
// Keep last attempt information as-is for debuggability when completed asynchronously.
// When used in a workflow, this machine node will be deleted from the tree after this transition.
return op.output(event.Node)
return op.output()
},
)

Expand All @@ -387,11 +383,6 @@
enumsspb.NEXUS_OPERATION_STATE_SCHEDULED,
enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF,
enumsspb.NEXUS_OPERATION_STATE_STARTED,
// Requesting cancelation of an unstarted operation transitions the machine to CANCELED and adds a
// NEXUS_OPERATION_CANCELED event.
// During replication the NEXUS_OPERATION_CANCELED event is applied when the machine is already canceled so we
// mark this as a valid transition.
enumsspb.NEXUS_OPERATION_STATE_CANCELED,
},
enumsspb.NEXUS_OPERATION_STATE_CANCELED,
func(op Operation, event EventCanceled) (hsm.TransitionOutput, error) {
Expand All @@ -406,7 +397,7 @@
}
// Keep last attempt information as-is for debuggability when completed asynchronously.
// When used in a workflow, this machine node will be deleted from the tree after this transition.
return op.output(event.Node)
return op.output()
},
)

Expand All @@ -424,7 +415,7 @@
func(op Operation, event EventStarted) (hsm.TransitionOutput, error) {
op.recordAttempt(event.Time)
op.OperationId = event.Attributes.OperationId
return op.output(event.Node)
return op.output()
},
)

Expand All @@ -443,7 +434,7 @@
func(op Operation, event EventTimedOut) (hsm.TransitionOutput, error) {
// Keep attempt information as-is for debuggability.
// When used in a workflow, this machine node will be deleted from the tree after this transition.
return op.output(event.Node)
return op.output()
},
)

Expand All @@ -457,13 +448,16 @@
// This function should be called as part of command/event handling and it should not be called more than once.
return hsm.TransitionOutput{}, err
}
// TODO(bergundy): Support cancel before started. We need to transmit this intent to cancel to the handler because
// we don't know for sure that the operation hasn't been started.
if o.State() == enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF || o.State() == enumsspb.NEXUS_OPERATION_STATE_SCHEDULED {
return handleUnsuccessfulOperationError(node, o, &nexus.UnsuccessfulOperationError{
State: nexus.OperationStateCanceled,
Failure: nexus.Failure{Message: "operation canceled before started"},
}, CompletionSourceCancelRequested)
// Operation wasn't started yet, we don't know how to cancel it ATM.
// TODO(bergundy): Support cancel-before-started.
if o.OperationId == "" {
return hsm.TransitionOutput{}, hsm.MachineTransition(child, func(c Cancelation) (hsm.TransitionOutput, error) {
return TransitionCancelationFailed.Apply(c, EventCancelationFailed{
Err: fmt.Errorf("cannot send cancelation for an unstarted operation"),

Check failure on line 456 in components/nexusoperations/statemachine.go

View workflow job for this annotation

GitHub Actions / lint

do not define dynamic errors, use wrapped static errors instead: "fmt.Errorf(\"cannot send cancelation for an unstarted operation\")" (err113)
Time: t,
Node: child,
})
})
}
return hsm.TransitionOutput{}, hsm.MachineTransition(child, func(c Cancelation) (hsm.TransitionOutput, error) {
return TransitionCancelationScheduled.Apply(c, EventCancelationScheduled{
Expand Down Expand Up @@ -674,7 +668,12 @@
}

var TransitionCancelationFailed = hsm.NewTransition(
[]enumspb.NexusOperationCancellationState{enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED},
[]enumspb.NexusOperationCancellationState{
// We can immediately transition to failed since we don't know how to send a cancelation request for an
// unstarted operation.
enumspb.NEXUS_OPERATION_CANCELLATION_STATE_UNSPECIFIED,
enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED,
},
enumspb.NEXUS_OPERATION_CANCELLATION_STATE_FAILED,
func(c Cancelation, event EventCancelationFailed) (hsm.TransitionOutput, error) {
c.recordAttempt(event.Time)
Expand Down
74 changes: 38 additions & 36 deletions components/nexusoperations/statemachine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,43 +444,26 @@ func TestCompleteExternally(t *testing.T) {
}

func TestCancel(t *testing.T) {
t.Run("before started", func(t *testing.T) {
backend := &hsmtest.NodeBackend{}
root := newOperationNode(t, backend, mustNewScheduledEvent(time.Now(), time.Hour))
op, err := hsm.MachineData[nexusoperations.Operation](root)
require.NoError(t, err)
_, err = op.Cancel(root, time.Now())
require.NoError(t, err)
require.Equal(t, enumsspb.NEXUS_OPERATION_STATE_CANCELED, op.State())
require.Equal(t, 1, len(backend.Events))
attrs := backend.Events[0].GetNexusOperationCanceledEventAttributes()
require.Equal(t, int64(1), attrs.ScheduledEventId)
require.Equal(t, "operation canceled before started", attrs.Failure.Cause.Message)
require.NotNil(t, attrs.Failure.Cause.GetCanceledFailureInfo())
})

t.Run("after started", func(t *testing.T) {
backend := &hsmtest.NodeBackend{}
root := newOperationNode(t, backend, mustNewScheduledEvent(time.Now(), time.Hour))
op, err := hsm.MachineData[nexusoperations.Operation](root)
require.NoError(t, err)
_, err = nexusoperations.TransitionStarted.Apply(op, nexusoperations.EventStarted{
Time: time.Now(),
Attributes: &historypb.NexusOperationStartedEventAttributes{
OperationId: "op-id",
},
Node: root,
})
require.NoError(t, err)
_, err = op.Cancel(root, time.Now())
require.NoError(t, err)
require.Equal(t, enumsspb.NEXUS_OPERATION_STATE_STARTED, op.State())
node, err := root.Child([]hsm.Key{nexusoperations.CancelationMachineKey})
require.NoError(t, err)
cancelation, err := hsm.MachineData[nexusoperations.Cancelation](node)
require.NoError(t, err)
require.Equal(t, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED, cancelation.State())
backend := &hsmtest.NodeBackend{}
root := newOperationNode(t, backend, mustNewScheduledEvent(time.Now(), time.Hour))
op, err := hsm.MachineData[nexusoperations.Operation](root)
require.NoError(t, err)
_, err = nexusoperations.TransitionStarted.Apply(op, nexusoperations.EventStarted{
Time: time.Now(),
Attributes: &historypb.NexusOperationStartedEventAttributes{
OperationId: "op-id",
},
Node: root,
})
require.NoError(t, err)
_, err = op.Cancel(root, time.Now())
require.NoError(t, err)
require.Equal(t, enumsspb.NEXUS_OPERATION_STATE_STARTED, op.State())
node, err := root.Child([]hsm.Key{nexusoperations.CancelationMachineKey})
require.NoError(t, err)
cancelation, err := hsm.MachineData[nexusoperations.Cancelation](node)
require.NoError(t, err)
require.Equal(t, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED, cancelation.State())
}

func TestCancelationValidTransitions(t *testing.T) {
Expand Down Expand Up @@ -593,6 +576,25 @@ func TestCancelationValidTransitions(t *testing.T) {
require.Equal(t, 0, len(out.Tasks))
}

// TestCancelationBeforeStarted requests cancelation of an operation that has only been scheduled and checks the
// resulting transition outputs and the final state of the cancelation child machine.
func TestCancelationBeforeStarted(t *testing.T) {
	// Build a freshly scheduled operation and request cancelation through a machine transition on the root node.
	backend := &hsmtest.NodeBackend{}
	root := newOperationNode(t, backend, mustNewScheduledEvent(time.Now(), 0))
	requestCancel := func(op nexusoperations.Operation) (hsm.TransitionOutput, error) {
		return op.Cancel(root, time.Now())
	}
	require.NoError(t, hsm.MachineTransition(root, requestCancel))

	// Two output entries are expected: the first carries exactly one invocation task, the second carries no tasks.
	// NOTE(review): presumably these belong to the operation machine and its cancelation child respectively — confirm.
	outputs := root.Outputs()
	require.Len(t, outputs, 2)
	firstTasks := outputs[0].Outputs[0].Tasks
	require.Len(t, firstTasks, 1)
	require.Equal(t, nexusoperations.TaskTypeInvocation, firstTasks[0].Type())
	require.Len(t, outputs[1].Outputs[0].Tasks, 0)

	// The cancelation child machine must exist and must have ended up in the FAILED state.
	cancelationPath := []hsm.Key{nexusoperations.CancelationMachineKey}
	cancelNode, err := root.Child(cancelationPath)
	require.NoError(t, err)
	c, err := hsm.MachineData[nexusoperations.Cancelation](cancelNode)
	require.NoError(t, err)
	require.Equal(t, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_FAILED, c.State())
}

func TestOperationCompareState(t *testing.T) {
reg := hsm.NewRegistry()
require.NoError(t, nexusoperations.RegisterStateMachines(reg))
Expand Down
Loading
Loading