Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 95 additions & 70 deletions api/matchingservice/v1/request_response.pb.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion chasm/lib/callback/chasm_invocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (c chasmInvocation) getHistoryRequest(
Completion: completion,
}
case *nexusrpc.OperationCompletionUnsuccessful:
apiFailure, err := commonnexus.NexusFailureToAPIFailure(op.Failure, true)
apiFailure, err := commonnexus.NexusFailureToTemporalFailure(op.Failure)
if err != nil {
return nil, fmt.Errorf("failed to convert failure type: %v", err)
}
Expand Down
247 changes: 181 additions & 66 deletions common/nexus/failure.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package nexus

import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"net/http"
"sync/atomic"

"github.com/nexus-rpc/sdk-go/nexus"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
failurepb "go.temporal.io/api/failure/v1"
nexuspb "go.temporal.io/api/nexus/v1"
"go.temporal.io/api/serviceerror"
Expand Down Expand Up @@ -52,113 +55,224 @@ var failureTypeString = string((&failurepb.Failure{}).ProtoReflect().Descriptor(

// ProtoFailureToNexusFailure converts a proto Nexus Failure to a Nexus SDK Failure.
func ProtoFailureToNexusFailure(failure *nexuspb.Failure) nexus.Failure {
return nexus.Failure{
Message: failure.GetMessage(),
Metadata: failure.GetMetadata(),
Details: failure.GetDetails(),
nf := nexus.Failure{
Message: failure.GetMessage(),
StackTrace: failure.GetStackTrace(),
Metadata: failure.GetMetadata(),
Details: failure.GetDetails(),
}
if failure.GetCause() != nil {
cause := ProtoFailureToNexusFailure(failure.GetCause())
nf.Cause = &cause
}
return nf
}

// NexusFailureToProtoFailure converts a Nexus SDK Failure to a proto Nexus Failure.
// Always returns a non-nil value.
func NexusFailureToProtoFailure(failure nexus.Failure) *nexuspb.Failure {
return &nexuspb.Failure{
Message: failure.Message,
Metadata: failure.Metadata,
Details: failure.Details,
pf := &nexuspb.Failure{
Message: failure.Message,
Metadata: failure.Metadata,
Details: failure.Details,
StackTrace: failure.StackTrace,
}
if failure.Cause != nil {
pf.Cause = NexusFailureToProtoFailure(*failure.Cause)
}
return pf
}

type serializedHandlerError struct {
Type string `json:"type,omitempty"`
RetryableOverride *bool `json:"retryableOverride,omitempty"`
// Bytes as base64 encoded string.
EncodedAttributes string `json:"encodedAttributes,omitempty"`
}

// APIFailureToNexusFailure converts an API proto Failure to a Nexus SDK Failure setting the metadata "type" field to
// the proto fullname of the temporal API Failure message.
// TemporalFailureToNexusFailure converts an API proto Failure to a Nexus SDK Failure setting the metadata "type" field to
// the proto fullname of the temporal API Failure message or the standard Nexus SDK failure types.
// Returns an error if the failure cannot be converted.
// Mutates the failure temporarily, unsetting the Message field to avoid duplicating the information in the serialized
// failure. Mutating was chosen over cloning for performance reasons since this function may be called frequently.
func APIFailureToNexusFailure(failure *failurepb.Failure) (nexus.Failure, error) {
// Unset message so it's not serialized in the details.
func TemporalFailureToNexusFailure(failure *failurepb.Failure) (nexus.Failure, error) {
var causep *nexus.Failure
if failure.GetCause() != nil {
var cause nexus.Failure
var err error
cause, err = TemporalFailureToNexusFailure(failure.GetCause())
if err != nil {
return nexus.Failure{}, err
}
causep = &cause
}

switch info := failure.GetFailureInfo().(type) {
case *failurepb.Failure_NexusHandlerFailureInfo:
var encodedAttributes string
if failure.EncodedAttributes != nil {
b, err := protojson.Marshal(failure.EncodedAttributes)
if err != nil {
return nexus.Failure{}, fmt.Errorf("failed to deserialize HandlerError attributes: %w", err)
}
encodedAttributes = base64.StdEncoding.EncodeToString(b)
}
var retryableOverride *bool
// nolint:exhaustive,revive // There are only two valid values other than unspecified.
switch info.NexusHandlerFailureInfo.GetRetryBehavior() {
case enumspb.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_RETRYABLE:
val := true
retryableOverride = &val
case enumspb.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_NON_RETRYABLE:
val := false
retryableOverride = &val
}

handlerError := serializedHandlerError{
Type: info.NexusHandlerFailureInfo.GetType(),
RetryableOverride: retryableOverride,
EncodedAttributes: encodedAttributes,
}

details, err := json.Marshal(handlerError)
if err != nil {
return nexus.Failure{}, err
}
return nexus.Failure{
Message: failure.GetMessage(),
StackTrace: failure.GetStackTrace(),
Metadata: map[string]string{
"type": "nexus.HandlerError",
},
Details: details,
Cause: causep,
}, nil
}
// Unset message and stack trace so it's not serialized in the details.
var message string
message, failure.Message = failure.Message, ""
var stackTrace string
stackTrace, failure.StackTrace = failure.StackTrace, ""

data, err := protojson.Marshal(failure)
failure.Message = message

failure.StackTrace = stackTrace
if err != nil {
return nexus.Failure{}, err
}

return nexus.Failure{
Message: failure.GetMessage(),
Message: failure.GetMessage(),
StackTrace: failure.GetStackTrace(),
Metadata: map[string]string{
"type": failureTypeString,
},
Details: data,
Cause: causep,
}, nil
}

// NexusFailureToAPIFailure converts a Nexus Failure to an API proto Failure.
// NexusFailureToTemporalFailure converts a Nexus Failure to an API proto Failure.
// If the failure metadata "type" field is set to the fullname of the temporal API Failure message, the failure is
// reconstructed using protojson.Unmarshal on the failure details field.
func NexusFailureToAPIFailure(failure nexus.Failure, retryable bool) (*failurepb.Failure, error) {
apiFailure := &failurepb.Failure{}
// reconstructed using protojson.Unmarshal on the failure details field. Otherwise, the failure is reconstructed
// based on the known Nexus SDK failure types.
// Returns an error if the failure cannot be converted.
// nolint:revive // cognitive-complexity is high but justified to keep each case together
func NexusFailureToTemporalFailure(f nexus.Failure) (*failurepb.Failure, error) {
apiFailure := &failurepb.Failure{
Message: f.Message,
StackTrace: f.StackTrace,
}

if failure.Metadata != nil && failure.Metadata["type"] == failureTypeString {
if err := protojson.Unmarshal(failure.Details, apiFailure); err != nil {
return nil, err
if f.Metadata != nil {
switch f.Metadata["type"] {
case failureTypeString:
opts := protojson.UnmarshalOptions{DiscardUnknown: true}
if err := opts.Unmarshal(f.Details, apiFailure); err != nil {
return nil, err
}
// Restore these fields as they are not included in the marshalled failure.
apiFailure.Message = f.Message
apiFailure.StackTrace = f.StackTrace
case "nexus.OperationError":
var operationError *nexus.OperationError
err := json.Unmarshal(f.Details, &operationError)
if err != nil {
return nil, fmt.Errorf("failed to deserialize OperationError: %w", err)
}
if operationError.State == nexus.OperationStateCanceled {
apiFailure.FailureInfo = &failurepb.Failure_CanceledFailureInfo{
CanceledFailureInfo: &failurepb.CanceledFailureInfo{},
}
} else {
apiFailure.FailureInfo = &failurepb.Failure_ApplicationFailureInfo{
ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{
NonRetryable: true,
Type: "OperationError",
},
}
}
case "nexus.HandlerError":
var se serializedHandlerError
err := json.Unmarshal(f.Details, &se)
if err != nil {
return nil, fmt.Errorf("failed to deserialize HandlerError: %w", err)
}
var retryBehavior enumspb.NexusHandlerErrorRetryBehavior
if se.RetryableOverride == nil {
retryBehavior = enumspb.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_UNSPECIFIED
} else if *se.RetryableOverride {
retryBehavior = enumspb.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_RETRYABLE
} else {
retryBehavior = enumspb.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_NON_RETRYABLE
}
apiFailure.FailureInfo = &failurepb.Failure_NexusHandlerFailureInfo{
NexusHandlerFailureInfo: &failurepb.NexusHandlerFailureInfo{
Type: se.Type,
RetryBehavior: retryBehavior,
},
}
if len(se.EncodedAttributes) > 0 {
decoded, err := base64.StdEncoding.DecodeString(se.EncodedAttributes)
if err != nil {
return nil, fmt.Errorf("failed to decode base64 HandlerError attributes: %w", err)
}
apiFailure.EncodedAttributes = &commonpb.Payload{}
if err := protojson.Unmarshal(decoded, apiFailure.EncodedAttributes); err != nil {
return nil, fmt.Errorf("failed to deserialize HandlerError attributes: %w", err)
}
}
default:
payloads, err := nexusFailureMetadataToPayloads(f)
if err != nil {
return nil, fmt.Errorf("failed to serialize failure: %w", err)
}
apiFailure.FailureInfo = &failurepb.Failure_ApplicationFailureInfo{
ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{
Details: payloads,
},
}
}
} else {
payloads, err := nexusFailureMetadataToPayloads(failure)
} else if len(f.Details) > 0 {
payloads, err := nexusFailureMetadataToPayloads(f)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to serialize failure: %w", err)
}
apiFailure.FailureInfo = &failurepb.Failure_ApplicationFailureInfo{
ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{
// Make up a type here, it's not part of the Nexus Failure spec.
Type: "NexusFailure",
Details: payloads,
NonRetryable: !retryable,
Details: payloads,
},
}
}
// Ensure this always gets written.
apiFailure.Message = failure.Message
return apiFailure, nil
}

func OperationErrorToTemporalFailure(opErr *nexus.OperationError) (*failurepb.Failure, error) {
var nexusFailure nexus.Failure
failureErr, ok := opErr.Cause.(*nexus.FailureError)
if ok {
nexusFailure = failureErr.Failure
} else if opErr.Cause != nil {
nexusFailure = nexus.Failure{Message: opErr.Cause.Error()}
}

// Canceled must be translated into a CanceledFailure to match the SDK expectation.
if opErr.State == nexus.OperationStateCanceled {
if nexusFailure.Metadata != nil && nexusFailure.Metadata["type"] == failureTypeString {
temporalFailure, err := NexusFailureToAPIFailure(nexusFailure, false)
if err != nil {
return nil, err
}
if temporalFailure.GetCanceledFailureInfo() != nil {
// We already have a CanceledFailure, use it.
return temporalFailure, nil
}
// Fallback to encoding the Nexus failure into a Temporal canceled failure, we expect operations that end up
// as canceled to have a CanceledFailureInfo object.
}
payloads, err := nexusFailureMetadataToPayloads(nexusFailure)
if f.Cause != nil {
var err error
apiFailure.Cause, err = NexusFailureToTemporalFailure(*f.Cause)
if err != nil {
return nil, err
}
return &failurepb.Failure{
Message: nexusFailure.Message,
FailureInfo: &failurepb.Failure_CanceledFailureInfo{
CanceledFailureInfo: &failurepb.CanceledFailureInfo{
Details: payloads,
},
},
}, nil
}

return NexusFailureToAPIFailure(nexusFailure, false)
return apiFailure, nil
}

func nexusFailureMetadataToPayloads(failure nexus.Failure) (*commonpb.Payloads, error) {
Expand All @@ -167,6 +281,7 @@ func nexusFailureMetadataToPayloads(failure nexus.Failure) (*commonpb.Payloads,
}
// Delete before serializing.
failure.Message = ""
failure.StackTrace = ""
data, err := json.Marshal(failure)
if err != nil {
return nil, err
Expand Down
Loading