Skip to content

Commit

Permalink
Workflow Update: drop speculative WFT even if it had events (temporal…
Browse files Browse the repository at this point in the history
…io#6709)

## What changed?
<!-- Describe what has changed in this PR -->
Drop speculative WFT even if it had events.

## Why?
<!-- Tell your future self why have you made these changes -->
Special compatibility flag was
[added](temporalio/api#467) to
`RespondWorkflowTaskCompletedRequest`. If this flag is set to true, then
server can drop speculative WFT with update rejections, even if it
shipped events to the worker. SDK support will be added later.

## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
Added new unit tests.

## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->
No risks. Compatibility flag is used to support backward compatibility
and don't drop speculative WFT if worker doesn't support it (old go
SDKs).

## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->
Yes, updated.

## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
No.
  • Loading branch information
alexshtin authored Nov 18, 2024
1 parent 3a1b707 commit b2fb628
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 33 deletions.
5 changes: 5 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2072,6 +2072,11 @@ the number of children greater than or equal to this threshold`,
time.Minute*10,
`WorkflowTaskRetryMaxInterval is the maximum interval added to a workflow task's startToClose timeout for slowing down retry`,
)
DiscardSpeculativeWorkflowTaskMaximumEventsCount = NewGlobalIntSetting(
"history.discardSpeculativeWorkflowTaskMaximumEventsCount",
10,
`If speculative workflow task shipped more than DiscardSpeculativeWorkflowTaskMaximumEventsCount events, it can't be discarded`,
)
DefaultWorkflowTaskTimeout = NewNamespaceDurationSetting(
"history.defaultWorkflowTaskTimeout",
primitives.DefaultWorkflowTaskTimeout,
Expand Down
36 changes: 24 additions & 12 deletions docs/architecture/speculative-workflow-task.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ event.
- its `WorkflowTaskInfo.Type` is `WORKFLOW_TASK_TYPE_SPECULATIVE`

Similar to a CPU's *speculative execution* (which gives this Workflow Task its name) where a branch
execution can be thrown away, a speculative Workflow Task can be dropped as if it never existed.
execution can be thrown away, a speculative Workflow Task can be discarded as if it never existed.
The overall strategy is to optimistically assume the speculative Workflow Task will go through, but
if anything goes wrong, give up quickly and convert the speculative Workflow Task to a normal one.

Expand Down Expand Up @@ -98,23 +98,35 @@ a new speculative Workflow Task can be created after the first one is lost, but
try to complete the first one. To prevent this, `StartedTime` was added to the Workflow Task token
and if it doesn't match the start time in mutable state, the Workflow Task can't be completed.

### Persist or Drop
### Persist or Discard
While completing a speculative Workflow Task, the server makes a decision to either write the
speculative events followed by a `WorkflowTaskCompleted` event - or drop the speculative events and
make the speculative Workflow Task disappear. The latter can only happen, if the server knows that
speculative events followed by a `WorkflowTaskCompleted` event - or discard the speculative events and
make the speculative Workflow Task disappear. The latter can only happen if the server knows that
this Workflow Task didn't change the Workflow state. Currently, the conditions are
(check `skipWorkflowTaskCompletedEvent()` func):
- response doesn't have any commands,
- response has only Update rejection messages.

> #### TODO
> There is one more special case: when the speculative Workflow Task contained other events
> (e.g. activity scheduled), then it can't be dropped because they would need to be sent *again* in the
> next Workflow Task - but older SDK versions don't support receiving the same events twice. A
> compatibility flag is needed to safely allow SDKs to opt-in to this optimization.
The speculative Workflow Task can also ship other events (e.g. `ActivityTaskScheduled` or `TimerStarted`)
that were generated from previous Workflow Task commands (also known as command-events).
Unfortunately, older SDKs don't support receiving same events more
than once. If SDK supports this, it will set `DiscardSpeculativeWorkflowTaskWithEvents` flag to `true`
and the server will discard speculative Workflow Task even if it had events. These events can be shipped
multiply times if Updates keep being rejected. To prevent shipping a large set of events to the worker over
and over again, the server persists speculative Workflow Task if a number of events exceed
`DiscardSpeculativeWorkflowTaskMaximumEventsCount` threshold.

When the server decides to drop a speculative Workflow Task, it needs to communicate this decision to
the worker - because the SDK needs to roll back to a previous history event and drop all events after
> #### NOTE
> This is possible because of an important server invariant: the Workflow history can only end with:
> - Workflow Task event (Scheduled, Started, Completed, Failed, Timeout),
>
> or
> - command-event, generated from previous Workflow Task command.
> All these events don't change the Workflow state on the worker side. This invariant must not be
> broken by other features.
When the server decides to discard a speculative Workflow Task, it needs to communicate this decision to
the worker - because the SDK needs to roll back to a previous history event and discard all events after
that one. To do that, the server will set the `ResetHistoryEventId` field on the
`RespondWorkflowTaskCompletedResponse` to the mutable state's `LastCompletedWorkflowTaskStartedEventId`
(since the SDK uses `WorkflowTaskStartedEventID` as its history checkpoint).
Expand All @@ -128,7 +140,7 @@ rejection or acceptance message. If it does happen, the server will persist all
events and create a new Workflow Task as normal.

> #### NOTE
> This is a design decision, which could be changed later: instead, the server could drop the
> This is a design decision, which could be changed later: instead, the server could discard the
> speculative Workflow Task when it heartbeats and create a new speculative Workflow Task. No
> new events would be added to the history - but heartbeats would not be visible anymore.
Expand Down
2 changes: 1 addition & 1 deletion docs/architecture/workflow-update.md
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ Update to the worker. This Workflow Task is always speculative, unless there is
already-scheduled-but-not-yet-started Workflow Task present.

Later, when handling a worker response in the `RespondWorkflowTaskCompleted` API handler, the server
might write or drop events for this Workflow Task. Read
might write or discard events for this Workflow Task. Read
[Speculative Workflow Tasks](./speculative-workflow-task.md) for more details.

### Lifecycle Stage
Expand Down
134 changes: 131 additions & 3 deletions service/history/api/respondworkflowtaskcompleted/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ import (
"context"
"errors"
"maps"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
commandpb "go.temporal.io/api/command/v1"
enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
protocolpb "go.temporal.io/api/protocol/v1"
Expand Down Expand Up @@ -159,7 +161,6 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() {

createWrittenHistoryCh := func(expectedUpdateWorkflowExecutionCalls int) <-chan []*historypb.HistoryEvent {
writtenHistoryCh := make(chan []*historypb.HistoryEvent, expectedUpdateWorkflowExecutionCalls)
var historyEvents []*historypb.HistoryEvent
s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, request *persistence.UpdateWorkflowExecutionRequest) (*persistence.UpdateWorkflowExecutionResponse, error) {
var wfEvents []*persistence.WorkflowEvents
if len(request.UpdateWorkflowEvents) > 0 {
Expand All @@ -168,6 +169,7 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() {
wfEvents = request.NewWorkflowEvents
}

var historyEvents []*historypb.HistoryEvent
for _, uwe := range wfEvents {
for _, event := range uwe.Events {
historyEvents = append(historyEvents, event)
Expand All @@ -187,10 +189,13 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() {
wfContext := s.createStartedWorkflow(tv)
writtenHistoryCh := createWrittenHistoryCh(1)

_, err := wfContext.LoadMutableState(context.Background(), s.workflowTaskCompletedHandler.shardContext)
s.NoError(err)

updRequestMsg, upd, serializedTaskToken := s.createSentUpdate(tv, "1", wfContext)
s.NotNil(upd)

_, err := s.workflowTaskCompletedHandler.Invoke(context.Background(), &historyservice.RespondWorkflowTaskCompletedRequest{
_, err = s.workflowTaskCompletedHandler.Invoke(context.Background(), &historyservice.RespondWorkflowTaskCompletedRequest{
NamespaceId: tv.NamespaceID().String(),
CompleteRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{
TaskToken: serializedTaskToken,
Expand Down Expand Up @@ -338,6 +343,129 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() {
7 WorkflowTaskScheduled
8 WorkflowTaskStarted`, <-writtenHistoryCh)
})

s.Run("Discard speculative WFT with events", func() {
tv := testvars.New(s.T())
tv = tv.WithRunID(tv.Any().RunID())
s.mockNamespaceCache.EXPECT().GetNamespaceByID(tv.NamespaceID()).Return(tv.Namespace(), nil).AnyTimes()
wfContext := s.createStartedWorkflow(tv)
// Expect only 2 calls to UpdateWorkflowExecution: for timer started and timer fired events but not Update or WFT events.
writtenHistoryCh := createWrittenHistoryCh(2)
ms, err := wfContext.LoadMutableState(context.Background(), s.workflowTaskCompletedHandler.shardContext)
s.NoError(err)

_, _, err = ms.AddTimerStartedEvent(
1,
&commandpb.StartTimerCommandAttributes{
TimerId: tv.TimerID(),
StartToFireTimeout: tv.InfiniteTimeout(),
},
)
s.NoError(err)
err = wfContext.UpdateWorkflowExecutionAsActive(context.Background(), s.workflowTaskCompletedHandler.shardContext)
s.NoError(err)

s.EqualHistoryEvents(`
2 TimerStarted
`, <-writtenHistoryCh)

updRequestMsg, upd, serializedTaskToken := s.createSentUpdate(tv, "1", wfContext)
s.NotNil(upd)

_, err = s.workflowTaskCompletedHandler.Invoke(context.Background(), &historyservice.RespondWorkflowTaskCompletedRequest{
NamespaceId: tv.NamespaceID().String(),
CompleteRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{
TaskToken: serializedTaskToken,
Messages: s.UpdateRejectMessages(tv, updRequestMsg, "1"),
Identity: tv.Any().String(),
Capabilities: &workflowservice.RespondWorkflowTaskCompletedRequest_Capabilities{
DiscardSpeculativeWorkflowTaskWithEvents: true,
},
},
})
s.NoError(err)

updStatus, err := upd.WaitLifecycleStage(context.Background(), enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_UNSPECIFIED, time.Duration(0))
s.NoError(err)
s.Equal(enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED.String(), updStatus.Stage.String())
s.Equal("rejection-of-"+tv.UpdateID("1"), updStatus.Outcome.GetFailure().GetMessage())

ms, err = wfContext.LoadMutableState(context.Background(), s.workflowTaskCompletedHandler.shardContext)
s.NoError(err)
_, err = ms.AddTimerFiredEvent(tv.TimerID())
s.NoError(err)
err = wfContext.UpdateWorkflowExecutionAsActive(context.Background(), s.workflowTaskCompletedHandler.shardContext)
s.NoError(err)

s.EqualHistoryEvents(`
3 TimerFired // No WFT events in between 2 and 3.
`, <-writtenHistoryCh)
})

s.Run("Do not discard speculative WFT with more than 10 events", func() {
tv := testvars.New(s.T())
tv = tv.WithRunID(tv.Any().RunID())
s.mockNamespaceCache.EXPECT().GetNamespaceByID(tv.NamespaceID()).Return(tv.Namespace(), nil).AnyTimes()
wfContext := s.createStartedWorkflow(tv)
// Expect 2 calls to UpdateWorkflowExecution: for timer started and WFT events.
writtenHistoryCh := createWrittenHistoryCh(2)
ms, err := wfContext.LoadMutableState(context.Background(), s.workflowTaskCompletedHandler.shardContext)
s.NoError(err)

for i := 0; i < 11; i++ {
_, _, err = ms.AddTimerStartedEvent(
1,
&commandpb.StartTimerCommandAttributes{
TimerId: tv.TimerID(strconv.Itoa(i)),
StartToFireTimeout: tv.InfiniteTimeout(),
},
)
s.NoError(err)
}
err = wfContext.UpdateWorkflowExecutionAsActive(context.Background(), s.workflowTaskCompletedHandler.shardContext)
s.NoError(err)

s.EqualHistoryEvents(`
2 TimerStarted
3 TimerStarted
4 TimerStarted
5 TimerStarted
6 TimerStarted
7 TimerStarted
8 TimerStarted
9 TimerStarted
10 TimerStarted
11 TimerStarted
12 TimerStarted
`, <-writtenHistoryCh)

updRequestMsg, upd, serializedTaskToken := s.createSentUpdate(tv, "1", wfContext)
s.NotNil(upd)

_, err = s.workflowTaskCompletedHandler.Invoke(context.Background(), &historyservice.RespondWorkflowTaskCompletedRequest{
NamespaceId: tv.NamespaceID().String(),
CompleteRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{
TaskToken: serializedTaskToken,
Messages: s.UpdateRejectMessages(tv, updRequestMsg, "1"),
Identity: tv.Any().String(),
Capabilities: &workflowservice.RespondWorkflowTaskCompletedRequest_Capabilities{
DiscardSpeculativeWorkflowTaskWithEvents: true,
},
},
})
s.NoError(err)

updStatus, err := upd.WaitLifecycleStage(context.Background(), enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_UNSPECIFIED, time.Duration(0))
s.NoError(err)
s.Equal(enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED.String(), updStatus.Stage.String())
s.Equal("rejection-of-"+tv.UpdateID("1"), updStatus.Outcome.GetFailure().GetMessage())

s.EqualHistoryEvents(`
13 WorkflowTaskScheduled // WFT events were created even if it was a rejection (because number of events > 10).
14 WorkflowTaskStarted
15 WorkflowTaskCompleted
`, <-writtenHistoryCh)
})
}

func (s *WorkflowTaskCompletedHandlerSuite) TestHandleBufferedQueries() {
Expand Down Expand Up @@ -420,7 +548,7 @@ func (s *WorkflowTaskCompletedHandlerSuite) createStartedWorkflow(tv *testvars.T
WorkflowExecutionTimeout: durationpb.New(tv.InfiniteTimeout().AsDuration()),
WorkflowRunTimeout: durationpb.New(tv.InfiniteTimeout().AsDuration()),
WorkflowTaskTimeout: durationpb.New(tv.InfiniteTimeout().AsDuration()),
Identity: tv.Any().String(),
Identity: tv.ClientIdentity(),
}

_, _ = ms.AddWorkflowExecutionStartedEvent(
Expand Down
18 changes: 10 additions & 8 deletions service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,10 @@ type Config struct {
DefaultWorkflowTaskTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
// WorkflowTaskHeartbeatTimeout is to timeout behavior of: RespondWorkflowTaskComplete with ForceCreateNewWorkflowTask == true
// without any commands or messages. After this timeout workflow task will be scheduled to another worker(by clear stickyness).
WorkflowTaskHeartbeatTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
WorkflowTaskCriticalAttempts dynamicconfig.IntPropertyFn
WorkflowTaskRetryMaxInterval dynamicconfig.DurationPropertyFn
WorkflowTaskHeartbeatTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
WorkflowTaskCriticalAttempts dynamicconfig.IntPropertyFn
WorkflowTaskRetryMaxInterval dynamicconfig.DurationPropertyFn
DiscardSpeculativeWorkflowTaskMaximumEventsCount dynamicconfig.IntPropertyFn

// The following is used by the new RPC replication stack
ReplicationTaskApplyTimeout dynamicconfig.DurationPropertyFn
Expand Down Expand Up @@ -586,11 +587,12 @@ func NewConfig(
ThrottledLogRPS: dynamicconfig.HistoryThrottledLogRPS.Get(dc),
EnableStickyQuery: dynamicconfig.EnableStickyQuery.Get(dc),

DefaultActivityRetryPolicy: dynamicconfig.DefaultActivityRetryPolicy.Get(dc),
DefaultWorkflowRetryPolicy: dynamicconfig.DefaultWorkflowRetryPolicy.Get(dc),
WorkflowTaskHeartbeatTimeout: dynamicconfig.WorkflowTaskHeartbeatTimeout.Get(dc),
WorkflowTaskCriticalAttempts: dynamicconfig.WorkflowTaskCriticalAttempts.Get(dc),
WorkflowTaskRetryMaxInterval: dynamicconfig.WorkflowTaskRetryMaxInterval.Get(dc),
DefaultActivityRetryPolicy: dynamicconfig.DefaultActivityRetryPolicy.Get(dc),
DefaultWorkflowRetryPolicy: dynamicconfig.DefaultWorkflowRetryPolicy.Get(dc),
WorkflowTaskHeartbeatTimeout: dynamicconfig.WorkflowTaskHeartbeatTimeout.Get(dc),
WorkflowTaskCriticalAttempts: dynamicconfig.WorkflowTaskCriticalAttempts.Get(dc),
WorkflowTaskRetryMaxInterval: dynamicconfig.WorkflowTaskRetryMaxInterval.Get(dc),
DiscardSpeculativeWorkflowTaskMaximumEventsCount: dynamicconfig.DiscardSpeculativeWorkflowTaskMaximumEventsCount.Get(dc),

ReplicationTaskApplyTimeout: dynamicconfig.ReplicationTaskApplyTimeout.Get(dc),
ReplicationTaskFetcherParallelism: dynamicconfig.ReplicationTaskFetcherParallelism.Get(dc),
Expand Down
36 changes: 27 additions & 9 deletions service/history/workflow/workflow_task_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,28 +624,46 @@ func (m *workflowTaskStateMachine) skipWorkflowTaskCompletedEvent(workflowTaskTy

if request.GetForceCreateNewWorkflowTask() {
// If ForceCreateNewWorkflowTask is set to true, then this is a heartbeat response.
// New WT will be created as Normal and WorkflowTaskCompletedEvent for this WT is also must be written.
// In the future, if we decide not to write heartbeat of speculative WT to the history, this check should be removed,
// and extra logic should be added to create next WT as Speculative. Currently, new heartbeat WT is always created as Normal.
// New WFT will be created as Normal and WorkflowTaskCompletedEvent for this WFT is also must be written.
// In the future, if we decide not to write heartbeat of speculative WFT to the history, this check should be removed,
// and extra logic should be added to create next WFT as Speculative. Currently, new heartbeat WFT is always created as Normal.
metrics.SpeculativeWorkflowTaskCommits.With(m.metricsHandler).Record(1,
metrics.ReasonTag("force_create_task"))
return false
}

// Speculative WFT can be dropped only if there are no events after previous WFTCompleted event,
// Speculative WFT that has only Update rejection messages should be discarded (this function returns `true`).
// If speculative WFT also shipped events to the worker and was discarded, then
// next WFT will ship these events again. Unfortunately, old SDKs don't support receiving same events more than once.
// If SDK supports this, it will set DiscardSpeculativeWorkflowTaskWithEvents to `true`
// and server can discard speculative WFT even if it had events.

// Otherwise, server needs to determinate if there were events on this speculative WFT,
// i.e. last event in the history is WFTCompleted event.
// It is guaranteed that WFTStarted event is followed by WFTCompleted event and history tail might look like:
// previous WFTStarted
// previous WFTCompleted
// --> NextEventID points here because it doesn't move for speculative WFT.
// In this case difference between NextEventID and LastCompletedWorkflowTaskStartedEventId is 2.
// If there are other events after WFTCompleted event, then difference is > 2 and speculative WFT can't be dropped.
if m.ms.GetNextEventID() != m.ms.GetLastCompletedWorkflowTaskStartedEventId()+2 {
// In this case, the difference between NextEventID and LastCompletedWorkflowTaskStartedEventId is 2.
// If there are other events after WFTCompleted event, then the difference is > 2 and speculative WFT can't be discarded.
if !request.GetCapabilities().GetDiscardSpeculativeWorkflowTaskWithEvents() &&
m.ms.GetNextEventID() > m.ms.GetLastCompletedWorkflowTaskStartedEventId()+2 {
metrics.SpeculativeWorkflowTaskCommits.With(m.metricsHandler).Record(1,
metrics.ReasonTag("interleaved_events"))
return false
}

// Even if worker supports receiving same events more than once,
// server still writes speculative WFT if it had too many events.
// This is to prevent shipping a big set of events to the worker over and over again,
// in case if Updates are constantly rejected.
if request.GetCapabilities().GetDiscardSpeculativeWorkflowTaskWithEvents() &&
m.ms.GetNextEventID() > m.ms.GetLastCompletedWorkflowTaskStartedEventId()+2+int64(m.ms.config.DiscardSpeculativeWorkflowTaskMaximumEventsCount()) {
metrics.SpeculativeWorkflowTaskCommits.With(m.metricsHandler).Record(1,
metrics.ReasonTag("too_many_interleaved_events"))
return false
}

for _, message := range request.Messages {
if !message.GetBody().MessageIs((*updatepb.Rejection)(nil)) {
metrics.SpeculativeWorkflowTaskCommits.With(m.metricsHandler).Record(1,
Expand All @@ -654,9 +672,9 @@ func (m *workflowTaskStateMachine) skipWorkflowTaskCompletedEvent(workflowTaskTy
}
}

// Speculative WT can be dropped when response contains only rejection messages.
// Speculative WFT can be discarded when response contains only rejection messages.
// Empty messages list is equivalent to only rejection messages because server will reject all sent updates (if any).
//

// TODO: We should perform a shard ownership check here to prevent the case where the entire speculative workflow task
// is done on a stale mutable state and the fact that mutable state is stale caused workflow update requests to be rejected.
// NOTE: The AssertShardOwnership persistence API is not implemented in the repo.
Expand Down

0 comments on commit b2fb628

Please sign in to comment.