Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/docker/dynamic-config-custom.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,7 @@ component.callbacks.allowedAddresses:
- value:
- Pattern: "*"
AllowInsecure: true
component.nexusoperations.recordCancelRequestCompletionEvents:
- value: true
frontend.activityAPIsEnabled:
- value: true
3 changes: 2 additions & 1 deletion internal/cmd/build/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (b *builder) integrationTest() error {
if *devServerFlag {
devServer, err := testsuite.StartDevServer(context.Background(), testsuite.DevServerOptions{
CachedDownload: testsuite.CachedDownload{
Version: "v1.3.1-priority.0",
Version: "v1.3.1-nexus-cancellation.0",
},
ClientOptions: &client.Options{
HostPort: "127.0.0.1:7233",
Expand Down Expand Up @@ -146,6 +146,7 @@ func (b *builder) integrationTest() error {
"--http-port", "7243", // Nexus tests use the HTTP port directly
"--dynamic-config-value", `component.callbacks.allowedAddresses=[{"Pattern":"*","AllowInsecure":true}]`, // SDK tests use arbitrary callback URLs, permit that on the server
"--dynamic-config-value", `system.refreshNexusEndpointsMinWait="0s"`, // Make Nexus tests faster
"--dynamic-config-value", `component.nexusoperations.recordCancelRequestCompletionEvents=true`, // Defaults to false until after OSS 1.28 is released
},
})
if err != nil {
Expand Down
44 changes: 36 additions & 8 deletions internal/internal_command_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ type (
// requestCancelNexusOperationStateMachine is the state machine for the RequestCancelNexusOperation command.
// Valid transitions:
// commandStateCreated -> commandStateCommandSent
// commandStateCommandSent - (NexusOperationCancelRequested) -> commandStateCompleted
// commandStateCommandSent - (NexusOperationCancelRequested) -> commandStateInitiated
// commandStateInitiated - (NexusOperationCancelRequest(Completed|Failed)) -> commandStateCompleted
requestCancelNexusOperationStateMachine struct {
*commandStateMachineBase
attributes *commandpb.RequestCancelNexusOperationCommandAttributes
Expand Down Expand Up @@ -945,7 +946,7 @@ func (sm *nexusOperationStateMachine) handleCompletionEvent() {
commandStateStarted:
sm.moveState(commandStateCompleted, eventCompletion)
default:
sm.failStateTransition(eventStarted)
sm.failStateTransition(eventCompletion)
}
}

Expand Down Expand Up @@ -979,12 +980,22 @@ func (d *requestCancelNexusOperationStateMachine) getCommand() *commandpb.Comman
}
}

func (d *requestCancelNexusOperationStateMachine) handleInitiatedEvent() {
switch d.state {
case commandStateCommandSent:
d.moveState(commandStateInitiated, eventInitiated)
default:
d.failStateTransition(eventInitiated)
}
}

func (d *requestCancelNexusOperationStateMachine) handleCompletionEvent() {
if d.state != commandStateCommandSent && d.state != commandStateCreated {
switch d.state {
case commandStateCreated, commandStateInitiated:
d.moveState(commandStateCompleted, eventCompletion)
default:
d.failStateTransition(eventCompletion)
return
}
d.moveState(commandStateCompleted, eventCompletion)
}

func newCommandsHelper() *commandsHelper {
Expand Down Expand Up @@ -1222,9 +1233,26 @@ func (h *commandsHelper) handleNexusOperationCompleted(scheduledEventID int64) c
return command
}

func (h *commandsHelper) handleNexusOperationCancelRequested(scheduledEventID int64) {
command := h.getCommand(makeCommandID(commandTypeRequestCancelNexusOperation, strconv.FormatInt(scheduledEventID, 10)))
command.handleCompletionEvent()
func (h *commandsHelper) handleNexusOperationCancelRequested(scheduledEventID int64) commandStateMachine {
seq, ok := h.scheduledEventIDToNexusSeq[scheduledEventID]
if !ok {
panicIllegalState(fmt.Sprintf("[TMPRL1100] unable to find nexus operation state machine for event ID: %v", scheduledEventID))
}
command := h.getCommand(makeCommandID(commandTypeNexusOperation, strconv.FormatInt(seq, 10)))
sm := command.(*nexusOperationStateMachine)
sm.cancelation.handleInitiatedEvent()
return command
}

func (h *commandsHelper) handleNexusOperationCancelRequestDelivered(scheduledEventID int64) commandStateMachine {
seq, ok := h.scheduledEventIDToNexusSeq[scheduledEventID]
if !ok {
panicIllegalState(fmt.Sprintf("[TMPRL1100] unable to find nexus operation state machine for event ID: %v", scheduledEventID))
}
command := h.getCommand(makeCommandID(commandTypeNexusOperation, strconv.FormatInt(seq, 10)))
sm := command.(*nexusOperationStateMachine)
sm.cancelation.handleCompletionEvent()
return command
}

func (h *commandsHelper) requestCancelNexusOperation(seq int64) commandStateMachine {
Expand Down
71 changes: 70 additions & 1 deletion internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type (
scheduledNexusOperation struct {
startedCallback func(token string, err error)
completedCallback func(result *commonpb.Payload, err error)
cancellationType NexusOperationCancellationType
endpoint string
service string
operation string
Expand Down Expand Up @@ -625,6 +626,7 @@ func (wc *workflowEnvironmentImpl) ExecuteNexusOperation(params executeNexusOper
command.setData(&scheduledNexusOperation{
startedCallback: startedHandler,
completedCallback: callback,
cancellationType: params.options.CancellationType,
endpoint: params.client.Endpoint(),
service: params.client.Service(),
operation: params.operation,
Expand Down Expand Up @@ -1334,7 +1336,10 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent(
enumspb.EVENT_TYPE_NEXUS_OPERATION_TIMED_OUT:
err = weh.handleNexusOperationCompleted(event)
case enumspb.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED:
weh.commandsHelper.handleNexusOperationCancelRequested(event.GetNexusOperationCancelRequestedEventAttributes().GetScheduledEventId())
err = weh.handleNexusOperationCancelRequested(event)
case enumspb.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_COMPLETED,
enumspb.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_FAILED:
err = weh.handleNexusOperationCancelRequestDelivered(event)

default:
if event.WorkerMayIgnore {
Expand Down Expand Up @@ -1960,6 +1965,70 @@ func (weh *workflowExecutionEventHandlerImpl) handleNexusOperationCompleted(even
return nil
}

func (weh *workflowExecutionEventHandlerImpl) handleNexusOperationCancelRequested(event *historypb.HistoryEvent) error {
attrs := event.GetNexusOperationCancelRequestedEventAttributes()
scheduledEventId := attrs.GetScheduledEventId()

command := weh.commandsHelper.handleNexusOperationCancelRequested(scheduledEventId)
state := command.getData().(*scheduledNexusOperation)
err := ErrCanceled
if state.cancellationType == NexusOperationCancellationTypeTryCancel {
if state.startedCallback != nil {
state.startedCallback("", err)
state.startedCallback = nil
}
if state.completedCallback != nil {
state.completedCallback(nil, err)
state.completedCallback = nil
}
}
return nil
}

func (weh *workflowExecutionEventHandlerImpl) handleNexusOperationCancelRequestDelivered(event *historypb.HistoryEvent) error {
var scheduledEventID int64
var failure *failurepb.Failure

switch event.EventType {
case enumspb.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_COMPLETED:
attrs := event.GetNexusOperationCancelRequestCompletedEventAttributes()
scheduledEventID = attrs.GetScheduledEventId()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this to be safe for use, it is imperative that temporalio/api#564 not be released before temporalio/api#572. If there is an API version that has the first and not the second, this code can have issues.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like unfortunately that did happen. I will update this PR to handle that case.

Copy link
Member

@cretz cretz Apr 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this is the pain of frequently releasing things before we know how they will be used, we get into a situation where we now have 1.47.0 that we will have to support.

case enumspb.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_FAILED:
attrs := event.GetNexusOperationCancelRequestFailedEventAttributes()
scheduledEventID = attrs.GetScheduledEventId()
failure = attrs.GetFailure()
default:
// This is only called internally and should never happen.
panic(fmt.Errorf("invalid event type, not a Nexus Operation cancel request resolution: %v", event.EventType))
}

if scheduledEventID == 0 {
// API version 1.47.0 was released without the ScheduledEventID field on these events, so if we got this event
// without that field populated, then just ignore and fall back to default WaitCompleted behavior.
return nil
}

command := weh.commandsHelper.handleNexusOperationCancelRequestDelivered(scheduledEventID)
state := command.getData().(*scheduledNexusOperation)
err := ErrCanceled
if failure != nil {
err = weh.failureConverter.FailureToError(failure)
}

if state.cancellationType == NexusOperationCancellationTypeWaitRequested {
if state.startedCallback != nil {
state.startedCallback("", err)
state.startedCallback = nil
}
if state.completedCallback != nil {
state.completedCallback(nil, err)
state.completedCallback = nil
}
}

return nil
}

func (weh *workflowExecutionEventHandlerImpl) handleUpsertWorkflowSearchAttributes(event *historypb.HistoryEvent) {
weh.updateWorkflowInfoWithSearchAttributes(event.GetUpsertWorkflowSearchAttributesEventAttributes().SearchAttributes)
}
Expand Down
61 changes: 51 additions & 10 deletions internal/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,30 @@ const (
VersioningBehaviorAutoUpgrade
)

// NexusOperationCancellationType specifies what action should be taken for a Nexus operation when the
// caller is cancelled.
//
// Exposed as: [go.temporal.io/sdk/workflow.NexusOperationCancellationType]
type NexusOperationCancellationType int

const (
// NexusOperationCancellationTypeUnspecified - Nexus operation cancellation type is unknown.
NexusOperationCancellationTypeUnspecified NexusOperationCancellationType = iota

// NexusOperationCancellationTypeAbandon - Do not request cancellation of the Nexus operation.
NexusOperationCancellationTypeAbandon

// NexusOperationCancellationTypeTryCancel - Initiate a cancellation request for the Nexus operation and immediately report cancellation
// to the caller.
NexusOperationCancellationTypeTryCancel

// NexusOperationCancellationTypeWaitRequested - Request cancellation of the Nexus operation and wait for confirmation that the request was received.
NexusOperationCancellationTypeWaitRequested

// NexusOperationCancellationTypeWaitCompleted - Wait for the Nexus operation to complete. Default.
NexusOperationCancellationTypeWaitCompleted
)

var (
errWorkflowIDNotSet = errors.New("workflowId is not set")
errLocalActivityParamsBadRequest = errors.New("missing local activity parameters through context, check LocalActivityOptions")
Expand Down Expand Up @@ -2045,7 +2069,7 @@ func (wc *workflowEnvironmentInterceptor) MutableSideEffect(ctx Context, id stri

// DefaultVersion is a version returned by GetVersion for code that wasn't versioned before
//
// Exposed as: [go.temporal.io/sdk/workflow.Version], [go.temporal.io/sdk/workflow.DefaultVersion]
// Exposed as: [go.temporal.io/sdk/workflow.DefaultVersion], [go.temporal.io/sdk/workflow.Version]
const DefaultVersion Version = -1

// TemporalChangeVersion is used as search attributes key to find workflows with specific change version.
Expand Down Expand Up @@ -2481,7 +2505,7 @@ func WithHeartbeatTimeout(ctx Context, d time.Duration) Context {
return ctx1
}

// WithWaitForCancellation adds wait for the cacellation to the copy of the context.
// WithWaitForCancellation adds wait for the cancellation to the copy of the context.
//
// Exposed as: [go.temporal.io/sdk/workflow.WithWaitForCancellation]
func WithWaitForCancellation(ctx Context, wait bool) Context {
Expand Down Expand Up @@ -2608,6 +2632,11 @@ type NexusOperationOptions struct {
// Optional: defaults to the maximum allowed by the Temporal server.
ScheduleToCloseTimeout time.Duration

// CancellationType - Indicates what action should be taken when the caller is cancelled.
//
// Optional: defaults to NexusOperationCancellationTypeWaitCompleted.
CancellationType NexusOperationCancellationType

// Summary is a single-line fixed summary for this Nexus Operation that will appear in UI/CLI. This can be
// in single-line Temporal Markdown format.
//
Expand Down Expand Up @@ -2726,6 +2755,10 @@ func (wc *workflowEnvironmentInterceptor) prepareNexusOperationParams(ctx Contex
return executeNexusOperationParams{}, err
}

if input.Options.CancellationType == NexusOperationCancellationTypeUnspecified {
input.Options.CancellationType = NexusOperationCancellationTypeWaitCompleted
}

return executeNexusOperationParams{
client: input.Client,
operation: operationName,
Expand All @@ -2743,7 +2776,7 @@ func (wc *workflowEnvironmentInterceptor) ExecuteNexusOperation(ctx Context, inp
executionFuture: executionFuture.(*futureImpl),
}

// Immediately return if the context has an error without spawning the child workflow
// Immediately return if the context has an error without spawning the Nexus operation.
if ctx.Err() != nil {
executionSettable.Set(nil, ctx.Err())
mainSettable.Set(nil, ctx.Err())
Expand Down Expand Up @@ -2781,13 +2814,21 @@ func (wc *workflowEnvironmentInterceptor) ExecuteNexusOperation(ctx Context, inp
cancellationCallback.fn = func(v any, _ bool) bool {
assertNotInReadOnlyStateCancellation(ctx)
if ctx.Err() == ErrCanceled && !mainFuture.IsReady() {
// Go back to the top of the interception chain.
getWorkflowOutboundInterceptor(ctx).RequestCancelNexusOperation(ctx, RequestCancelNexusOperationInput{
Client: input.Client,
Operation: input.Operation,
Token: operationToken,
seq: seq,
})
if input.Options.CancellationType == NexusOperationCancellationTypeAbandon {
// Caller has indicated we should not send the cancel request, so just mark futures as done.
mainSettable.Set(nil, ErrCanceled)
if !executionFuture.IsReady() {
executionSettable.Set(nil, ErrCanceled)
}
} else {
// Go back to the top of the interception chain.
getWorkflowOutboundInterceptor(ctx).RequestCancelNexusOperation(ctx, RequestCancelNexusOperationInput{
Client: input.Client,
Operation: input.Operation,
Token: operationToken,
seq: seq,
})
}
}
return false
}
Expand Down
8 changes: 6 additions & 2 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,8 +612,12 @@ func (ts *IntegrationTestSuite) TestActivityPause() {
ts.Len(desc.GetPendingActivities(), 1)
ts.Equal(desc.GetPendingActivities()[0].GetActivityType().GetName(), "ActivityToBePaused")
ts.Equal(desc.GetPendingActivities()[0].GetAttempt(), int32(1))
// TODO: Update when https://github.com/temporalio/temporal/pull/7572 is released
ts.Nil(desc.GetPendingActivities()[0].GetLastFailure())
if os.Getenv("DISABLE_SERVER_1_27_TESTS") == "1" {
ts.Nil(desc.GetPendingActivities()[0].GetLastFailure())
} else {
ts.NotNil(desc.GetPendingActivities()[0].GetLastFailure())
ts.Equal(desc.GetPendingActivities()[0].GetLastFailure().GetMessage(), "activity paused")
}
ts.True(desc.GetPendingActivities()[0].GetPaused())
}

Expand Down
Loading
Loading