Skip to content

Commit

Permalink
Query Failure (temporalio#6947)
Browse files Browse the repository at this point in the history
## What changed?

Attach a Failure object from query failures to the QueryFailure
serviceerror.

## Why?

Allows encryption of failure messages and stack traces.

See also temporalio/api/pull/503.
  • Loading branch information
bergundy authored Dec 19, 2024
1 parent d89f514 commit b8c1228
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 5 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ require (
go.opentelemetry.io/otel/sdk v1.31.0
go.opentelemetry.io/otel/sdk/metric v1.31.0
go.opentelemetry.io/otel/trace v1.31.0
go.temporal.io/api v1.43.0
go.temporal.io/api v1.43.1-0.20241206174056-8a5e1486fbf5
go.temporal.io/sdk v1.31.0
go.temporal.io/version v0.3.0
go.uber.org/automaxprocs v1.5.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,8 @@ go.opentelemetry.io/otel/trace v1.31.0 h1:ffjsj1aRouKewfr85U2aGagJ46+MvodynlQ1HY
go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A=
go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0=
go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8=
go.temporal.io/api v1.43.0 h1:lBhq+u5qFJqGMXwWsmg/i8qn1UA/3LCwVc88l2xUMHg=
go.temporal.io/api v1.43.0/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis=
go.temporal.io/api v1.43.1-0.20241206174056-8a5e1486fbf5 h1:/gZwoMXVPu9HouFLdNdHDgIDxtVfvuY3rqApj2ffZ20=
go.temporal.io/api v1.43.1-0.20241206174056-8a5e1486fbf5/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis=
go.temporal.io/sdk v1.31.0 h1:CLYiP0R5Sdj0gq8LyYKDDz4ccGOdJPR8wNGJU0JGwj8=
go.temporal.io/sdk v1.31.0/go.mod h1:8U8H7rF9u4Hyb4Ry9yiEls5716DHPNvVITPNkgWUwE8=
go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig=
Expand Down
2 changes: 1 addition & 1 deletion service/history/api/queryworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func Invoke(
},
}, nil
case enumspb.QUERY_RESULT_TYPE_FAILED:
return nil, serviceerror.NewQueryFailed(result.GetErrorMessage())
return nil, serviceerror.NewQueryFailedWithFailure(result.GetErrorMessage(), result.GetFailure())
default:
metrics.QueryRegistryInvalidStateCount.With(scope).Record(1)
return nil, consts.ErrQueryEnteredInvalidState
Expand Down
2 changes: 1 addition & 1 deletion service/matching/matching_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -964,7 +964,7 @@ func (e *matchingEngineImpl) QueryWorkflow(
case enumspb.QUERY_RESULT_TYPE_ANSWERED:
return &matchingservice.QueryWorkflowResponse{QueryResult: workerResponse.GetCompletedRequest().GetQueryResult()}, nil
case enumspb.QUERY_RESULT_TYPE_FAILED:
return nil, serviceerror.NewQueryFailed(workerResponse.GetCompletedRequest().GetErrorMessage())
return nil, serviceerror.NewQueryFailedWithFailure(workerResponse.GetCompletedRequest().GetErrorMessage(), workerResponse.GetCompletedRequest().GetFailure())
default:
return nil, serviceerror.NewInternal("unknown query completed type")
}
Expand Down
133 changes: 133 additions & 0 deletions tests/query_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,30 @@ import (
"context"
"errors"
"fmt"
"maps"
"slices"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
commandpb "go.temporal.io/api/command/v1"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
failurepb "go.temporal.io/api/failure/v1"
querypb "go.temporal.io/api/query/v1"
"go.temporal.io/api/serviceerror"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"
sdkclient "go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/testing/testvars"
"go.temporal.io/server/common/util"
"go.temporal.io/server/service/history/consts"
"go.temporal.io/server/tests/testcore"
)
Expand Down Expand Up @@ -357,3 +368,125 @@ func (s *QueryWorkflowSuite) TestQueryWorkflow_ClosedWithoutWorkflowTaskStarted(
s.Error(err)
s.ErrorContains(err, consts.ErrWorkflowClosedBeforeWorkflowTaskStarted.Error())
}

func (s *QueryWorkflowSuite) TestQueryWorkflow_FailurePropagated() {
ctx := testcore.NewContext()
taskQueue := testcore.RandomizeStr(s.T().Name())

workflowRun, err := s.SdkClient().ExecuteWorkflow(ctx, client.StartWorkflowOptions{TaskQueue: taskQueue}, "workflow")
s.NoError(err)

// Create a channel for errors generated in background goroutines.
errChan := make(chan error, 1)

// First query, should come in the workflow task Queries field and responded to via the RespondWorkflowTaskCompleted
// API.
// Query the workflow in the background to have the query delivered with the first workflow task in the Queries map.
go func() {
_, err := s.FrontendClient().QueryWorkflow(ctx, &workflowservice.QueryWorkflowRequest{
Namespace: s.Namespace(),
Execution: &commonpb.WorkflowExecution{
WorkflowId: workflowRun.GetID(),
},
Query: &querypb.WorkflowQuery{
QueryType: "dont-care",
},
})
errChan <- err
}()

// Hope that 3 seconds will be enough for history to record the query and attach it to the pending workflow task.
// There's really no other way to ensure that the query is included in the task unfortunately.
util.InterruptibleSleep(ctx, 3*time.Second)

task, err := s.FrontendClient().PollWorkflowTaskQueue(ctx, &workflowservice.PollWorkflowTaskQueueRequest{
Namespace: s.Namespace(),
TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue},
Identity: s.T().Name(),
})
s.NoError(err)
s.Len(task.Queries, 1)
qKey := slices.Collect(maps.Keys(task.Queries))[0]

_, err = s.FrontendClient().RespondWorkflowTaskCompleted(ctx, &workflowservice.RespondWorkflowTaskCompletedRequest{
TaskToken: task.TaskToken,
Identity: s.T().Name(),
QueryResults: map[string]*querypb.WorkflowQueryResult{
qKey: {
ResultType: enumspb.QUERY_RESULT_TYPE_FAILED,
ErrorMessage: "my error message",
Failure: &failurepb.Failure{
Message: "my failure error message",
},
},
},
Commands: []*commandpb.Command{
{
CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,
Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{},
},
},
})
s.NoError(err)

select {
case err = <-errChan:
case <-ctx.Done():
// Abort and fail the test.
s.NoError(ctx.Err())
}

var query1FailedErr *serviceerror.QueryFailed
s.ErrorAs(err, &query1FailedErr)
s.Equal("my error message", query1FailedErr.Message)
s.Equal("my failure error message", query1FailedErr.Failure.Message)

// Second query, should come in the workflow task Query field and responded to via the RespondQueryTaskCompleted
// API.
go func() {
task, err := s.FrontendClient().PollWorkflowTaskQueue(ctx, &workflowservice.PollWorkflowTaskQueueRequest{
Namespace: s.Namespace(),
TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue},
Identity: s.T().Name(),
})
if err != nil {
errChan <- err
return
}

_, err = s.FrontendClient().RespondQueryTaskCompleted(ctx, &workflowservice.RespondQueryTaskCompletedRequest{
Namespace: s.Namespace(),
TaskToken: task.TaskToken,
CompletedType: enumspb.QUERY_RESULT_TYPE_FAILED,
ErrorMessage: "my error message",
Failure: &failurepb.Failure{
Message: "my failure error message",
},
})

errChan <- err
}()

_, err = s.FrontendClient().QueryWorkflow(ctx, &workflowservice.QueryWorkflowRequest{
Namespace: s.Namespace(),
Execution: &commonpb.WorkflowExecution{
WorkflowId: workflowRun.GetID(),
},
Query: &querypb.WorkflowQuery{
QueryType: "dont-care",
},
})

var query2FailedErr *serviceerror.QueryFailed
s.ErrorAs(err, &query2FailedErr)
s.Equal("my error message", query2FailedErr.Message)
s.Equal("my failure error message", query2FailedErr.Failure.Message)

select {
case err = <-errChan:
s.NoError(err)
case <-ctx.Done():
// Abort and fail the test.
s.NoError(ctx.Err())
}
}

0 comments on commit b8c1228

Please sign in to comment.