Skip to content

Commit

Permalink
Add FirstExecutionRunID to mutable state (uber#5031)
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll authored Nov 23, 2022
1 parent 1a52714 commit d249434
Show file tree
Hide file tree
Showing 26 changed files with 587 additions and 304 deletions.
160 changes: 144 additions & 16 deletions .gen/go/shared/shared.go

Large diffs are not rendered by default.

68 changes: 64 additions & 4 deletions .gen/go/sqlblobs/sqlblobs.go

Large diffs are not rendered by default.

282 changes: 142 additions & 140 deletions .gen/proto/history/v1/service.pb.yarpc.go

Large diffs are not rendered by default.

282 changes: 142 additions & 140 deletions .gen/proto/matching/v1/service.pb.yarpc.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions common/persistence/dataManagerInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ type (
DomainID string
WorkflowID string
RunID string
FirstExecutionRunID string
ParentDomainID string
ParentWorkflowID string
ParentRunID string
Expand Down
1 change: 1 addition & 0 deletions common/persistence/dataStoreInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ type (
DomainID string
WorkflowID string
RunID string
FirstExecutionRunID string
ParentDomainID string
ParentWorkflowID string
ParentRunID string
Expand Down
2 changes: 2 additions & 0 deletions common/persistence/executionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func (m *executionManagerImpl) DeserializeExecutionInfo(
DomainID: info.DomainID,
WorkflowID: info.WorkflowID,
RunID: info.RunID,
FirstExecutionRunID: info.FirstExecutionRunID,
ParentDomainID: info.ParentDomainID,
ParentWorkflowID: info.ParentWorkflowID,
ParentRunID: info.ParentRunID,
Expand Down Expand Up @@ -459,6 +460,7 @@ func (m *executionManagerImpl) SerializeExecutionInfo(
DomainID: info.DomainID,
WorkflowID: info.WorkflowID,
RunID: info.RunID,
FirstExecutionRunID: info.FirstExecutionRunID,
ParentDomainID: info.ParentDomainID,
ParentWorkflowID: info.ParentWorkflowID,
ParentRunID: info.ParentRunID,
Expand Down
10 changes: 10 additions & 0 deletions common/persistence/nosql/nosqlExecutionStoreUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,11 @@ func (d *nosqlExecutionStore) prepareUpdateWorkflowExecutionTxn(
executionInfo.InitiatedID = emptyInitiatedID
}

// TODO: remove this logic once all workflows before 0.26.x are completed
if executionInfo.FirstExecutionRunID == "" {
executionInfo.FirstExecutionRunID = emptyRunID
}

executionInfo.CompletionEvent = executionInfo.CompletionEvent.ToNilSafeDataBlob()
executionInfo.AutoResetPoints = executionInfo.AutoResetPoints.ToNilSafeDataBlob()
// TODO also need to set the start / current / last write version
Expand Down Expand Up @@ -655,6 +660,11 @@ func (d *nosqlExecutionStore) prepareCreateWorkflowExecutionTxn(
executionInfo.InitiatedID = emptyInitiatedID
}

// TODO: remove this logic once all workflows before 0.26.x are completed
if executionInfo.FirstExecutionRunID == "" {
executionInfo.FirstExecutionRunID = emptyRunID
}

if executionInfo.StartTimestamp.IsZero() {
executionInfo.StartTimestamp = nowTimestamp
d.logger.Error("Workflow startTimestamp not set, fallback to now",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const (
`domain_id: ?, ` +
`workflow_id: ?, ` +
`run_id: ?, ` +
`first_run_id: ?, ` +
`parent_domain_id: ?, ` +
`parent_workflow_id: ?, ` +
`parent_run_id: ?, ` +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ func parseWorkflowExecutionInfo(
info.WorkflowID = v.(string)
case "run_id":
info.RunID = v.(gocql.UUID).String()
case "first_run_id":
info.FirstExecutionRunID = v.(gocql.UUID).String()
if info.FirstExecutionRunID == _emptyUUID.String() {
// for backward compatibility, the gocql library doesn't handle the null uuid correectly https://github.com/gocql/gocql/blob/master/marshal.go#L1807
info.FirstExecutionRunID = ""
} else if info.FirstExecutionRunID == emptyRunID {
info.FirstExecutionRunID = ""
}
case "parent_domain_id":
info.ParentDomainID = v.(gocql.UUID).String()
if info.ParentDomainID == emptyDomainID {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1210,6 +1210,7 @@ func (db *cdb) updateWorkflowExecution(
domainID,
workflowID,
execution.RunID,
execution.FirstExecutionRunID,
execution.ParentDomainID,
execution.ParentWorkflowID,
execution.ParentRunID,
Expand Down Expand Up @@ -1304,6 +1305,7 @@ func (db *cdb) createWorkflowExecution(
domainID,
workflowID,
execution.RunID,
execution.FirstExecutionRunID,
execution.ParentDomainID,
execution.ParentWorkflowID,
execution.ParentRunID,
Expand Down
24 changes: 24 additions & 0 deletions common/persistence/persistence-tests/executionManagerTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func (s *ExecutionManagerSuite) TestCreateWorkflowExecutionDeDup() {
DomainID: domainID,
WorkflowID: workflowID,
RunID: runID,
FirstExecutionRunID: runID,
TaskList: tasklist,
WorkflowTypeName: workflowType,
WorkflowTimeout: workflowTimeout,
Expand Down Expand Up @@ -239,6 +240,7 @@ func (s *ExecutionManagerSuite) TestCreateWorkflowExecutionStateCloseStatus() {
}
req.NewWorkflowSnapshot.ExecutionInfo.WorkflowID = workflowExecutionStatusCreated.GetWorkflowID()
req.NewWorkflowSnapshot.ExecutionInfo.RunID = workflowExecutionStatusCreated.GetRunID()
req.NewWorkflowSnapshot.ExecutionInfo.FirstExecutionRunID = workflowExecutionStatusCreated.GetRunID()
req.NewWorkflowSnapshot.ExecutionInfo.State = p.WorkflowStateCreated
for _, invalidCloseStatus := range invalidCloseStatuses {
req.NewWorkflowSnapshot.ExecutionInfo.CloseStatus = invalidCloseStatus
Expand All @@ -260,6 +262,7 @@ func (s *ExecutionManagerSuite) TestCreateWorkflowExecutionStateCloseStatus() {
}
req.NewWorkflowSnapshot.ExecutionInfo.WorkflowID = workflowExecutionStatusRunning.GetWorkflowID()
req.NewWorkflowSnapshot.ExecutionInfo.RunID = workflowExecutionStatusRunning.GetRunID()
req.NewWorkflowSnapshot.ExecutionInfo.FirstExecutionRunID = workflowExecutionStatusRunning.GetRunID()
req.NewWorkflowSnapshot.ExecutionInfo.State = p.WorkflowStateRunning
for _, invalidCloseStatus := range invalidCloseStatuses {
req.NewWorkflowSnapshot.ExecutionInfo.CloseStatus = invalidCloseStatus
Expand All @@ -281,6 +284,7 @@ func (s *ExecutionManagerSuite) TestCreateWorkflowExecutionStateCloseStatus() {
}
req.NewWorkflowSnapshot.ExecutionInfo.WorkflowID = workflowExecutionStatusCompleted.GetWorkflowID()
req.NewWorkflowSnapshot.ExecutionInfo.RunID = workflowExecutionStatusCompleted.GetRunID()
req.NewWorkflowSnapshot.ExecutionInfo.FirstExecutionRunID = workflowExecutionStatusCompleted.GetRunID()
req.NewWorkflowSnapshot.ExecutionInfo.State = p.WorkflowStateCompleted
for _, invalidCloseStatus := range invalidCloseStatuses {
req.NewWorkflowSnapshot.ExecutionInfo.CloseStatus = invalidCloseStatus
Expand All @@ -300,6 +304,7 @@ func (s *ExecutionManagerSuite) TestCreateWorkflowExecutionStateCloseStatus() {
req.Mode = p.CreateWorkflowModeZombie
req.NewWorkflowSnapshot.ExecutionInfo.WorkflowID = workflowExecutionStatusZombie.GetWorkflowID()
req.NewWorkflowSnapshot.ExecutionInfo.RunID = workflowExecutionStatusZombie.GetRunID()
req.NewWorkflowSnapshot.ExecutionInfo.FirstExecutionRunID = workflowExecutionStatusZombie.GetRunID()
req.NewWorkflowSnapshot.ExecutionInfo.State = p.WorkflowStateZombie
for _, invalidCloseStatus := range invalidCloseStatuses {
req.NewWorkflowSnapshot.ExecutionInfo.CloseStatus = invalidCloseStatus
Expand Down Expand Up @@ -349,6 +354,7 @@ func (s *ExecutionManagerSuite) TestCreateWorkflowExecutionWithZombieState() {
DomainID: domainID,
WorkflowID: workflowID,
RunID: workflowExecutionZombie1.GetRunID(),
FirstExecutionRunID: workflowExecutionZombie1.GetRunID(),
TaskList: tasklist,
WorkflowTypeName: workflowType,
WorkflowTimeout: workflowTimeout,
Expand All @@ -375,6 +381,7 @@ func (s *ExecutionManagerSuite) TestCreateWorkflowExecutionWithZombieState() {
RunID: uuid.New(),
}
req.NewWorkflowSnapshot.ExecutionInfo.RunID = workflowExecutionRunning.GetRunID()
req.NewWorkflowSnapshot.ExecutionInfo.FirstExecutionRunID = workflowExecutionRunning.GetRunID()
req.Mode = p.CreateWorkflowModeBrandNew
req.NewWorkflowSnapshot.ExecutionInfo.State = p.WorkflowStateRunning
req.NewWorkflowSnapshot.ExecutionInfo.CloseStatus = p.WorkflowCloseStatusNone
Expand All @@ -389,6 +396,7 @@ func (s *ExecutionManagerSuite) TestCreateWorkflowExecutionWithZombieState() {
RunID: uuid.New(),
}
req.NewWorkflowSnapshot.ExecutionInfo.RunID = workflowExecutionZombie.GetRunID()
req.NewWorkflowSnapshot.ExecutionInfo.FirstExecutionRunID = workflowExecutionZombie.GetRunID()
req.Mode = p.CreateWorkflowModeZombie
req.NewWorkflowSnapshot.ExecutionInfo.State = p.WorkflowStateZombie
req.NewWorkflowSnapshot.ExecutionInfo.CloseStatus = p.WorkflowCloseStatusNone
Expand Down Expand Up @@ -445,6 +453,7 @@ func (s *ExecutionManagerSuite) TestUpdateWorkflowExecutionStateCloseStatus() {
DomainID: domainID,
WorkflowID: workflowExecution.GetWorkflowID(),
RunID: workflowExecution.GetRunID(),
FirstExecutionRunID: workflowExecution.GetRunID(),
TaskList: tasklist,
WorkflowTypeName: workflowType,
WorkflowTimeout: workflowTimeout,
Expand Down Expand Up @@ -553,6 +562,7 @@ func (s *ExecutionManagerSuite) TestUpdateWorkflowExecutionStateCloseStatus() {
}
req.NewWorkflowSnapshot.ExecutionInfo.WorkflowID = workflowExecutionRunning.GetWorkflowID()
req.NewWorkflowSnapshot.ExecutionInfo.RunID = workflowExecutionRunning.GetRunID()
req.NewWorkflowSnapshot.ExecutionInfo.FirstExecutionRunID = workflowExecutionRunning.GetRunID()
req.Mode = p.CreateWorkflowModeWorkflowIDReuse
req.PreviousRunID = workflowExecution.GetRunID()
req.PreviousLastWriteVersion = common.EmptyVersion
Expand Down Expand Up @@ -633,6 +643,7 @@ func (s *ExecutionManagerSuite) TestUpdateWorkflowExecutionWithZombieState() {
DomainID: domainID,
WorkflowID: workflowExecution.GetWorkflowID(),
RunID: workflowExecution.GetRunID(),
FirstExecutionRunID: workflowExecution.GetRunID(),
TaskList: tasklist,
WorkflowTypeName: workflowType,
WorkflowTimeout: workflowTimeout,
Expand Down Expand Up @@ -700,6 +711,7 @@ func (s *ExecutionManagerSuite) TestUpdateWorkflowExecutionWithZombieState() {
csum = checksum.Checksum{} // set checksum to nil
req.NewWorkflowSnapshot.ExecutionInfo.WorkflowID = workflowExecutionRunning.GetWorkflowID()
req.NewWorkflowSnapshot.ExecutionInfo.RunID = workflowExecutionRunning.GetRunID()
req.NewWorkflowSnapshot.ExecutionInfo.FirstExecutionRunID = workflowExecutionRunning.GetRunID()
req.Mode = p.CreateWorkflowModeWorkflowIDReuse
req.PreviousRunID = workflowExecution.GetRunID()
req.PreviousLastWriteVersion = common.EmptyVersion
Expand Down Expand Up @@ -856,6 +868,7 @@ func (s *ExecutionManagerSuite) TestCreateWorkflowExecutionBrandNew() {
DomainID: domainID,
WorkflowID: workflowExecution.GetWorkflowID(),
RunID: workflowExecution.GetRunID(),
FirstExecutionRunID: workflowExecution.GetRunID(),
TaskList: tasklist,
WorkflowTypeName: workflowType,
WorkflowTimeout: workflowTimeout,
Expand Down Expand Up @@ -919,6 +932,7 @@ func (s *ExecutionManagerSuite) TestUpsertWorkflowActivity() {
DomainID: domainID,
WorkflowID: workflowExecution.GetWorkflowID(),
RunID: workflowExecution.GetRunID(),
FirstExecutionRunID: workflowExecution.GetRunID(),
TaskList: tasklist,
WorkflowTypeName: workflowType,
WorkflowTimeout: workflowTimeout,
Expand Down Expand Up @@ -1058,6 +1072,7 @@ func (s *ExecutionManagerSuite) TestCreateWorkflowExecutionRunIDReuseWithoutRepl
DomainID: domainID,
WorkflowID: newExecution.GetWorkflowID(),
RunID: newExecution.GetRunID(),
FirstExecutionRunID: newExecution.GetRunID(),
TaskList: tasklist,
WorkflowTypeName: workflowType,
WorkflowTimeout: workflowTimeout,
Expand Down Expand Up @@ -1181,6 +1196,7 @@ func (s *ExecutionManagerSuite) TestPersistenceStartWorkflow() {
DomainID: domainID,
WorkflowID: workflowExecution.GetWorkflowID(),
RunID: workflowExecution.GetRunID(),
FirstExecutionRunID: workflowExecution.GetRunID(),
TaskList: "queue1",
WorkflowTypeName: "workflow_type_test",
WorkflowTimeout: 20,
Expand Down Expand Up @@ -1305,6 +1321,7 @@ func (s *ExecutionManagerSuite) TestGetWorkflow() {
},
Mode: p.CreateWorkflowModeBrandNew,
}
createReq.NewWorkflowSnapshot.ExecutionInfo.FirstExecutionRunID = createReq.NewWorkflowSnapshot.ExecutionInfo.RunID

createResp, err := s.ExecutionManager.CreateWorkflowExecution(ctx, createReq)
s.NoError(err)
Expand Down Expand Up @@ -4412,6 +4429,7 @@ func (s *ExecutionManagerSuite) TestConflictResolveWorkflowExecutionWithTransact
DomainID: domainID,
WorkflowID: workflowExecutionReset.GetWorkflowID(),
RunID: workflowExecutionReset.GetRunID(),
FirstExecutionRunID: workflowExecutionReset.GetRunID(),
ParentDomainID: uuid.New(),
ParentWorkflowID: "some random parent workflow ID",
ParentRunID: uuid.New(),
Expand Down Expand Up @@ -4583,6 +4601,7 @@ func (s *ExecutionManagerSuite) TestConflictResolveWorkflowExecutionWithTransact
DomainID: domainID,
WorkflowID: workflowExecutionReset.GetWorkflowID(),
RunID: workflowExecutionReset.GetRunID(),
FirstExecutionRunID: workflowExecutionReset.GetRunID(),
ParentDomainID: uuid.New(),
ParentWorkflowID: "some random parent workflow ID",
ParentRunID: uuid.New(),
Expand Down Expand Up @@ -4734,6 +4753,7 @@ func (s *ExecutionManagerSuite) TestConflictResolveWorkflowExecutionWithTransact
DomainID: domainID,
WorkflowID: workflowExecutionReset.GetWorkflowID(),
RunID: workflowExecutionReset.GetRunID(),
FirstExecutionRunID: workflowExecutionReset.GetRunID(),
ParentDomainID: uuid.New(),
ParentWorkflowID: "some random parent workflow ID",
ParentRunID: uuid.New(),
Expand Down Expand Up @@ -4853,6 +4873,7 @@ func (s *ExecutionManagerSuite) TestConflictResolveWorkflowExecutionWithTransact
DomainID: domainID,
WorkflowID: workflowExecutionReset.GetWorkflowID(),
RunID: workflowExecutionReset.GetRunID(),
FirstExecutionRunID: workflowExecutionReset.GetRunID(),
ParentDomainID: uuid.New(),
ParentWorkflowID: "some random parent workflow ID",
ParentRunID: uuid.New(),
Expand Down Expand Up @@ -5011,6 +5032,7 @@ func (s *ExecutionManagerSuite) TestConflictResolveWorkflowExecutionWithTransact
DomainID: domainID,
WorkflowID: workflowExecutionReset.GetWorkflowID(),
RunID: workflowExecutionReset.GetRunID(),
FirstExecutionRunID: workflowExecutionReset.GetRunID(),
ParentDomainID: uuid.New(),
ParentWorkflowID: "some random parent workflow ID",
ParentRunID: uuid.New(),
Expand Down Expand Up @@ -5153,6 +5175,7 @@ func (s *ExecutionManagerSuite) TestConflictResolveWorkflowExecutionWithTransact
DomainID: domainID,
WorkflowID: workflowExecutionReset.GetWorkflowID(),
RunID: workflowExecutionReset.GetRunID(),
FirstExecutionRunID: workflowExecutionReset.GetRunID(),
ParentDomainID: uuid.New(),
ParentWorkflowID: "some random parent workflow ID",
ParentRunID: uuid.New(),
Expand Down Expand Up @@ -5328,6 +5351,7 @@ func copyWorkflowExecutionInfo(sourceInfo *p.WorkflowExecutionInfo) *p.WorkflowE
DomainID: sourceInfo.DomainID,
WorkflowID: sourceInfo.WorkflowID,
RunID: sourceInfo.RunID,
FirstExecutionRunID: sourceInfo.FirstExecutionRunID,
ParentDomainID: sourceInfo.ParentDomainID,
ParentWorkflowID: sourceInfo.ParentWorkflowID,
ParentRunID: sourceInfo.ParentRunID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ func (s *ExecutionManagerSuiteForEventsV2) TestWorkflowCreation() {
DomainID: domainID,
WorkflowID: workflowExecution.GetWorkflowID(),
RunID: workflowExecution.GetRunID(),
FirstExecutionRunID: workflowExecution.GetRunID(),
TaskList: "taskList",
WorkflowTypeName: "wType",
WorkflowTimeout: 20,
Expand Down Expand Up @@ -231,6 +232,7 @@ func (s *ExecutionManagerSuiteForEventsV2) TestWorkflowCreationWithVersionHistor
DomainID: domainID,
WorkflowID: workflowExecution.GetWorkflowID(),
RunID: workflowExecution.GetRunID(),
FirstExecutionRunID: workflowExecution.GetRunID(),
TaskList: "taskList",
WorkflowTypeName: "wType",
WorkflowTimeout: 20,
Expand Down Expand Up @@ -363,6 +365,7 @@ func (s *ExecutionManagerSuiteForEventsV2) TestContinueAsNew() {
DomainID: updatedInfo.DomainID,
WorkflowID: newWorkflowExecution.GetWorkflowID(),
RunID: newWorkflowExecution.GetRunID(),
FirstExecutionRunID: updatedInfo.FirstExecutionRunID,
TaskList: updatedInfo.TaskList,
WorkflowTypeName: updatedInfo.WorkflowTypeName,
WorkflowTimeout: updatedInfo.WorkflowTimeout,
Expand Down
3 changes: 3 additions & 0 deletions common/persistence/persistence-tests/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ func (s *TestBase) CreateWorkflowExecutionWithBranchToken(
DomainID: domainID,
WorkflowID: workflowExecution.GetWorkflowID(),
RunID: workflowExecution.GetRunID(),
FirstExecutionRunID: workflowExecution.GetRunID(),
TaskList: taskList,
WorkflowTypeName: wType,
WorkflowTimeout: wTimeout,
Expand Down Expand Up @@ -410,6 +411,7 @@ func (s *TestBase) CreateChildWorkflowExecution(ctx context.Context, domainID st
DomainID: domainID,
WorkflowID: workflowExecution.GetWorkflowID(),
RunID: workflowExecution.GetRunID(),
FirstExecutionRunID: workflowExecution.GetRunID(),
ParentDomainID: parentDomainID,
ParentWorkflowID: parentExecution.GetWorkflowID(),
ParentRunID: parentExecution.GetRunID(),
Expand Down Expand Up @@ -532,6 +534,7 @@ func (s *TestBase) ContinueAsNewExecution(
DomainID: updatedInfo.DomainID,
WorkflowID: newExecution.GetWorkflowID(),
RunID: newExecution.GetRunID(),
FirstExecutionRunID: updatedInfo.FirstExecutionRunID,
TaskList: updatedInfo.TaskList,
WorkflowTypeName: updatedInfo.WorkflowTypeName,
WorkflowTimeout: updatedInfo.WorkflowTimeout,
Expand Down
8 changes: 8 additions & 0 deletions common/persistence/serialization/getters.go
Original file line number Diff line number Diff line change
Expand Up @@ -868,6 +868,14 @@ func (w *WorkflowExecutionInfo) GetHasRetryPolicy() (o bool) {
return
}

// GetFirstExecutionRunID internal sql blob getter
func (w *WorkflowExecutionInfo) GetFirstExecutionRunID() (o []byte) {
if w != nil {
return w.FirstExecutionRunID
}
return
}

// GetVersion internal sql blob getter
func (a *ActivityInfo) GetVersion() (o int64) {
if a != nil {
Expand Down
1 change: 1 addition & 0 deletions common/persistence/serialization/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ type (
Memo map[string][]byte
VersionHistories []byte
VersionHistoriesEncoding string
FirstExecutionRunID UUID
}

// ActivityInfo blob in a serialization agnostic format
Expand Down
Loading

0 comments on commit d249434

Please sign in to comment.