Create TTL in the UpdateWorkflowExecution cycles. #5243

Merged
merged 50 commits into from
Jun 29, 2023
Changes from 48 commits
9d6a369
Adding TTL to the executions during the updateworkflow step
agautam478 Apr 28, 2023
ac5d6b4
Determining TTL duration
agautam478 Apr 28, 2023
98869d1
Adding Dynamic Config to TTL
agautam478 May 2, 2023
b25a168
Merge branch 'master' into updateworkflow
agautam478 May 2, 2023
9f03e49
Added DC and updated tests
agautam478 May 2, 2023
82b06e5
Merge branch 'master' into updateworkflow
agautam478 May 2, 2023
7b21e82
Added the Dynamic config to persistence tests
agautam478 May 2, 2023
0f2023e
Added updateworkflowTTL unit test
agautam478 May 4, 2023
68a2b02
Merge branch 'master' into updateworkflow
agautam478 May 4, 2023
1b55939
Added updateworkflowTTL unit test
agautam478 May 4, 2023
2d7a4cb
Added updateworkflowTTL unit test
agautam478 May 4, 2023
95cb954
Added updateworkflowTTL unit test
agautam478 May 4, 2023
0dd736c
Added updateworkflowTTL unit test
agautam478 May 4, 2023
d9d6c19
Merge branch 'master' into updateworkflow
agautam478 May 4, 2023
43ea9b8
Added updateworkflowTTL unit test
agautam478 May 4, 2023
db60ce0
Added updateworkflowTTL unit test
agautam478 May 4, 2023
facde95
Added updateworkflowTTL unit test
agautam478 May 4, 2023
8093616
Added updateworkflowTTL unit test
agautam478 May 4, 2023
db68b04
Updated the tests with TODO and new checks
agautam478 May 4, 2023
9444968
Updated the tests with TODO and new checks
agautam478 May 4, 2023
d8b7493
Updated the tests with a new check
agautam478 May 4, 2023
aefe10b
Merge branch 'master' into updateworkflow
agautam478 May 5, 2023
83a49f1
Merge branch 'master' into updateworkflow
agautam478 May 5, 2023
fcb483b
Merge branch 'master' into updateworkflow
agautam478 May 8, 2023
33f2cd3
Resolved nit comments
agautam478 May 8, 2023
b78b80f
Added TODO for flow change
agautam478 May 8, 2023
fb2db42
Merge branch 'master' into updateworkflow
agautam478 May 31, 2023
0302ea0
Moved the TTL from Update cycles to the workflow mutation:
agautam478 Jun 1, 2023
ab0f84c
Cleaned up the code and fixed minor bugs
agautam478 Jun 2, 2023
60b27d0
Merge branch 'master' into updateworkflow
agautam478 Jun 2, 2023
8a1933a
Merge branch 'master' into updateworkflow
agautam478 Jun 6, 2023
f4d3b50
Resolved comments and updated unit test
agautam478 Jun 6, 2023
834b1ed
Merge branch 'master' into updateworkflow
agautam478 Jun 8, 2023
229ddcb
Merge branch 'master' into updateworkflow
agautam478 Jun 9, 2023
7b6a9eb
Resolved the code comments
agautam478 Jun 13, 2023
16f7788
Added comment explaining the cron starttime
agautam478 Jun 13, 2023
8291df6
Merge branch 'master' into updateworkflow
agautam478 Jun 14, 2023
091baad
Added new unit test for TTL
agautam478 Jun 15, 2023
253d158
Added new unit test for TTL
agautam478 Jun 15, 2023
c55f175
Merge branch 'master' into updateworkflow
agautam478 Jun 15, 2023
ff30f9b
Added new unit test for TTL
agautam478 Jun 15, 2023
500aa35
Updated updateworkflow params
agautam478 Jun 15, 2023
f0ca8f1
New create and update info
agautam478 Jun 15, 2023
11517a2
Add TTL info
agautam478 Jun 15, 2023
07ec550
Add TTL check for get workflow
agautam478 Jun 15, 2023
9c89bf3
Merge branch 'master' into updateworkflow
agautam478 Jun 27, 2023
ca24c56
fixing the primary key bug
agautam478 Jun 16, 2023
401636c
Added an insert to enable TTL for the primary keys
agautam478 Jun 27, 2023
1c4ac0c
Added close status check to state check
agautam478 Jun 28, 2023
9637621
Merge branch 'master' into updateworkflow
agautam478 Jun 29, 2023
12 changes: 12 additions & 0 deletions common/dynamicconfig/constants.go
@@ -1840,6 +1840,13 @@ const (
EnableShardIDMetrics
// LastBoolKey must be the last one in this const group
LastBoolKey

// EnableExecutionTTL controls which domains are allowed to have workflow executions with a TTL
// KeyName: system.enableExecutionTTL
// Value type: Bool
// Default value: false
// Allowed filters: DomainID
EnableExecutionTTL
)

const (
@@ -3930,6 +3937,11 @@ var BoolKeys = map[BoolKey]DynamicBool{
Description: "Enable shardId metrics in persistence client",
DefaultValue: true,
},
EnableExecutionTTL: DynamicBool{
KeyName: "system.enableExecutionTTL",
Description: "EnableExecutionTTL is which domains are allowed to have workflow executions with a TTL",
DefaultValue: false,
},
}

var FloatKeys = map[FloatKey]DynamicFloat{
2 changes: 2 additions & 0 deletions common/persistence/config.go
@@ -31,6 +31,7 @@ type (
EnableCassandraAllConsistencyLevelDelete dynamicconfig.BoolPropertyFn
PersistenceSampleLoggingRate dynamicconfig.IntPropertyFn
EnableShardIDMetrics dynamicconfig.BoolPropertyFn
EnableExecutionTTL dynamicconfig.BoolPropertyFnWithDomainIDFilter
}
)

@@ -41,5 +42,6 @@ func NewDynamicConfiguration(dc *dynamicconfig.Collection) *DynamicConfiguration
EnableCassandraAllConsistencyLevelDelete: dc.GetBoolProperty(dynamicconfig.EnableCassandraAllConsistencyLevelDelete),
PersistenceSampleLoggingRate: dc.GetIntProperty(dynamicconfig.SampleLoggingRate),
EnableShardIDMetrics: dc.GetBoolProperty(dynamicconfig.EnableShardIDMetrics),
EnableExecutionTTL: dc.GetBoolPropertyFilteredByDomainID(dynamicconfig.EnableExecutionTTL),
}
}
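For readers unfamiliar with domain-filtered flags: `EnableExecutionTTL` is read as a function of the domain ID, so TTL can be switched on per domain while the default stays `false`. The sketch below shows that shape in isolation. It is an illustration only; `boolByDomainID` and `newBoolByDomainID` are hypothetical names and are not the `dynamicconfig` package's API.

```go
package main

import "fmt"

// boolByDomainID mirrors the shape of a domain-filtered boolean property:
// a function from domain ID to bool. Hypothetical stand-in type.
type boolByDomainID func(domainID string) bool

// newBoolByDomainID returns a lookup that falls back to defaultValue
// for any domain without an explicit override. Hypothetical helper.
func newBoolByDomainID(overrides map[string]bool, defaultValue bool) boolByDomainID {
	return func(domainID string) bool {
		if v, ok := overrides[domainID]; ok {
			return v
		}
		return defaultValue
	}
}

func main() {
	// Only "domain-a" opts in; every other domain keeps the default (false).
	enableExecutionTTL := newBoolByDomainID(map[string]bool{"domain-a": true}, false)
	fmt.Println(enableExecutionTTL("domain-a"), enableExecutionTTL("domain-b"))
}
```

With the default left at `false`, a domain gets a TTL only after it is explicitly opted in.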
5 changes: 3 additions & 2 deletions common/persistence/dataManagerInterfaces.go
@@ -1024,8 +1024,9 @@ type (
ReplicationTasks []Task
TimerTasks []Task

- Condition int64
- Checksum checksum.Checksum
+ Condition int64
+ TTLInSeconds int64
+ Checksum checksum.Checksum
}

// WorkflowSnapshot is used as generic workflow execution state snapshot
4 changes: 2 additions & 2 deletions common/persistence/dataStoreInterfaces.go
@@ -460,8 +460,8 @@ type (
CrossClusterTasks []Task
TimerTasks []Task
ReplicationTasks []Task

- Condition int64
+ TTLInSeconds int64
+ Condition int64

Checksum checksum.Checksum
}
7 changes: 3 additions & 4 deletions common/persistence/executionManager.go
@@ -329,7 +329,6 @@ func (m *executionManagerImpl) UpdateWorkflowExecution(
return nil, err
}
}

newRequest := &InternalUpdateWorkflowExecutionRequest{
RangeID: request.RangeID,

@@ -661,9 +660,9 @@ func (m *executionManagerImpl) SerializeWorkflowMutation(
CrossClusterTasks: input.CrossClusterTasks,
ReplicationTasks: input.ReplicationTasks,
TimerTasks: input.TimerTasks,

- Condition: input.Condition,
- Checksum: input.Checksum,
+ TTLInSeconds: input.TTLInSeconds,
+ Condition: input.Condition,
+ Checksum: input.Checksum,
}, nil
}

1 change: 0 additions & 1 deletion common/persistence/nosql/nosqlExecutionStore.go
@@ -467,7 +467,6 @@ func (d *nosqlExecutionStore) ConflictResolveWorkflowExecution(
ShardID: d.shardID,
RangeID: request.RangeID,
}

err = d.db.UpdateWorkflowExecutionWithTasks(
ctx, currentWorkflowWriteReq,
mutateExecution, insertExecution, resetExecution,
8 changes: 6 additions & 2 deletions common/persistence/nosql/nosqlExecutionStoreUtil.go
@@ -85,9 +85,10 @@ func (d *nosqlExecutionStore) prepareResetWorkflowExecutionRequestWithMapsAndEve
versionHistories := resetWorkflow.VersionHistories
nowTimestamp := time.Now()

+ // TTLInSeconds does not apply in this case, so pass the default value of 0 (no TTL).
executionRequest, err := d.prepareUpdateWorkflowExecutionTxn(
executionInfo, versionHistories, checkSum,
- nowTimestamp, lastWriteVersion,
+ nowTimestamp, lastWriteVersion, 0,
)
if err != nil {
return nil, err
@@ -127,13 +128,14 @@ func (d *nosqlExecutionStore) prepareUpdateWorkflowExecutionRequestWithMapsAndEv
) (*nosqlplugin.WorkflowExecutionRequest, error) {
executionInfo := workflowMutation.ExecutionInfo
lastWriteVersion := workflowMutation.LastWriteVersion
ttlInSeconds := workflowMutation.TTLInSeconds
checkSum := workflowMutation.Checksum
versionHistories := workflowMutation.VersionHistories
nowTimestamp := time.Now()

executionRequest, err := d.prepareUpdateWorkflowExecutionTxn(
executionInfo, versionHistories, checkSum,
- nowTimestamp, lastWriteVersion,
+ nowTimestamp, lastWriteVersion, ttlInSeconds,
)
if err != nil {
return nil, err
@@ -607,6 +609,7 @@ func (d *nosqlExecutionStore) prepareUpdateWorkflowExecutionTxn(
checksum checksum.Checksum,
nowTimestamp time.Time,
lastWriteVersion int64,
ttlInSeconds int64,
) (*nosqlplugin.WorkflowExecutionRequest, error) {
// validate workflow state & close status
if err := p.ValidateUpdateWorkflowStateCloseStatus(
@@ -636,6 +639,7 @@ func (d *nosqlExecutionStore) prepareUpdateWorkflowExecutionTxn(
VersionHistories: versionHistories,
Checksums: &checksum,
LastWriteVersion: lastWriteVersion,
TTLInSeconds: ttlInSeconds,
}, nil
}

14 changes: 12 additions & 2 deletions common/persistence/nosql/nosqlplugin/cassandra/workflowCql.go
@@ -315,8 +315,18 @@ workflow_state = ? ` +
`WHERE shard_id = ? ` +
`and type = ?`

- templateUpdateWorkflowExecutionWithVersionHistoriesQuery = `UPDATE executions ` +
- `SET execution = ` + templateWorkflowExecutionType +
+ templateUpdateWorkflowExecutionWithVersionHistoriesQueryPart1 = `INSERT INTO executions (` +
+ `domain_id, ` +
+ `run_id, ` +
+ `shard_id, ` +
+ `task_id, ` +
+ `type, ` +
+ `visibility_ts, ` +
+ `workflow_id ` +
+ `) VALUES (?, ?, ?, ?, ?, ?, ?) USING TTL ?`
+
+ templateUpdateWorkflowExecutionWithVersionHistoriesQueryPart2 = `UPDATE executions ` +
+ `USING TTL ? SET execution = ` + templateWorkflowExecutionType +
`, next_event_id = ? ` +
`, version_histories = ? ` +
`, version_histories_encoding = ? ` +
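A note on the two templates above. As far as I understand Cassandra's TTL semantics, `UPDATE ... USING TTL` attaches the TTL only to the columns it writes, while the row marker created by the original `INSERT` keeps living without one. That would explain why the change re-inserts the primary-key columns with `USING TTL` before running the update (see the commit "Added an insert to enable TTL for the primary keys"). The sketch below builds simplified versions of both statements to show where the TTL placeholder sits in each. The helper names are hypothetical and the column lists are shortened.

```go
package main

import (
	"fmt"
	"strings"
)

// insertWithTTL builds a simplified INSERT over the primary-key columns.
// The TTL placeholder comes last, after the VALUES list.
func insertWithTTL(table string, keyColumns []string) string {
	placeholders := strings.TrimSuffix(strings.Repeat("?, ", len(keyColumns)), ", ")
	return fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s) USING TTL ?",
		table, strings.Join(keyColumns, ", "), placeholders)
}

// updateWithTTL builds a simplified UPDATE. Here the TTL placeholder comes
// first, before the SET clause, so its value must also be bound first.
func updateWithTTL(table string, setColumns, whereColumns []string) string {
	return fmt.Sprintf("UPDATE %s USING TTL ? SET %s WHERE %s",
		table, assignments(setColumns, ", "), assignments(whereColumns, " and "))
}

// assignments renders "col = ?" for each column, joined by sep.
func assignments(columns []string, sep string) string {
	parts := make([]string, len(columns))
	for i, c := range columns {
		parts[i] = c + " = ?"
	}
	return strings.Join(parts, sep)
}

func main() {
	fmt.Println(insertWithTTL("executions", []string{"shard_id", "type", "domain_id"}))
	fmt.Println(updateWithTTL("executions", []string{"next_event_id"}, []string{"shard_id", "type"}))
}
```

The differing placeholder positions match the bind order in `updateWorkflowExecution`: `ttlInSeconds` is the last argument for Part1 and the first argument for Part2.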
24 changes: 20 additions & 4 deletions common/persistence/nosql/nosqlplugin/cassandra/workflowUtils.go
@@ -1078,7 +1078,6 @@ func (db *cdb) resetWorkflowExecutionAndMapsAndEventBuffer(
if execution.MapsWriteMode != nosqlplugin.WorkflowExecutionMapsWriteModeReset {
return fmt.Errorf("should only support WorkflowExecutionMapsWriteModeReset")
}

err = db.resetActivityInfos(batch, shardID, domainID, workflowID, execution.RunID, execution.ActivityInfos)
if err != nil {
return err
@@ -1168,7 +1167,6 @@ func (db *cdb) updateWorkflowExecutionAndEventBufferWithMergeAndDeleteMaps(
return err
}
}

if execution.MapsWriteMode != nosqlplugin.WorkflowExecutionMapsWriteModeUpdate {
return fmt.Errorf("should only support WorkflowExecutionMapsWriteModeUpdate")
}
@@ -1205,8 +1203,26 @@ func (db *cdb) updateWorkflowExecution(
) error {
execution.StartTimestamp = db.convertToCassandraTimestamp(execution.StartTimestamp)
execution.LastUpdatedTimestamp = db.convertToCassandraTimestamp(execution.LastUpdatedTimestamp)

- batch.Query(templateUpdateWorkflowExecutionWithVersionHistoriesQuery,
+ // Default TTL value. A TTL of 0 means no TTL is set, so the records persist until they are explicitly deleted.
+ ttlInSeconds := 0
+ // Only applies when the workflow is closing.
+ if execution.CloseStatus == 1 {

Review comment (Contributor): I think you should check state == WorkflowStateCompleted instead

+ if db.dc.EnableExecutionTTL(domainID) {
+ ttlInSeconds = int(execution.TTLInSeconds)
+ }
+ batch.Query(templateUpdateWorkflowExecutionWithVersionHistoriesQueryPart1,
+ domainID,
+ execution.RunID,
+ shardID,
+ rowTypeExecutionTaskID,
+ rowTypeExecution,
+ defaultVisibilityTimestamp,
+ workflowID,
+ ttlInSeconds,
+ )
+ }
+ batch.Query(templateUpdateWorkflowExecutionWithVersionHistoriesQueryPart2,
+ ttlInSeconds,
domainID,
workflowID,
execution.RunID,
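The gating in `updateWorkflowExecution` can be restated as a small pure function: the TTL stays 0 unless the execution is closing and the domain has `system.enableExecutionTTL` turned on. This is a sketch for discussion only. `effectiveTTL` and `closeStatusCompleted` are names made up here, and reading the literal `1` in the diff as a "completed" close status is an assumption.

```go
package main

import "fmt"

// closeStatusCompleted stands in for the literal 1 compared against in the
// diff. The name and its meaning are assumptions made for illustration.
const closeStatusCompleted = 1

// effectiveTTL returns the TTL in seconds to bind to the query.
// A result of 0 means "no TTL": the row is kept until explicitly deleted.
func effectiveTTL(closeStatus int, ttlEnabled bool, requestedSeconds int64) int {
	if closeStatus != closeStatusCompleted || !ttlEnabled {
		return 0
	}
	return int(requestedSeconds)
}

func main() {
	fmt.Println(effectiveTTL(closeStatusCompleted, true, 3600)) // closing, enabled
	fmt.Println(effectiveTTL(0, true, 3600))                    // still running
	fmt.Println(effectiveTTL(closeStatusCompleted, false, 3600)) // domain not opted in
}
```

Note that in the diff the primary-key `INSERT` is issued whenever the close-status check passes, even if the domain flag is off; in that case it is bound with a TTL of 0.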
1 change: 1 addition & 0 deletions common/persistence/nosql/nosqlplugin/types.go
@@ -39,6 +39,7 @@ type (
VersionHistories *persistence.DataBlob
Checksums *checksum.Checksum
LastWriteVersion int64
TTLInSeconds int64
// condition checking for updating execution info
PreviousNextEventIDCondition *int64

86 changes: 86 additions & 0 deletions common/persistence/persistence-tests/executionManagerTest.go
@@ -609,6 +609,92 @@ func (s *ExecutionManagerSuite) TestUpdateWorkflowExecutionStateCloseStatus() {
}
}

// TestUpdateWorkflowExecutionTTL test
func (s *ExecutionManagerSuite) TestUpdateWorkflowExecutionTTL() {
// TODO: Update the test when the TTL buffer becomes configurable.
ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)
defer cancel()
if s.ExecutionManager.GetName() != "cassandra" {
// The TTL API is only supported in Cassandra.
return
}

domainID := uuid.New()
workflowID := "update-workflow-test-with-ttl"
workflowExecution := types.WorkflowExecution{
WorkflowID: workflowID,
RunID: uuid.New(),
}
tasklist := "some random tasklist"
workflowType := "some random workflow type"
workflowTimeout := int32(10)
decisionTimeout := int32(14)
lastProcessedEventID := int64(0)
nextEventID := int64(3)
csum := s.newRandomChecksum()
versionHistory := p.NewVersionHistory([]byte{}, []*p.VersionHistoryItem{
{
EventID: nextEventID,
Version: common.EmptyVersion,
},
})
versionHistories := p.NewVersionHistories(versionHistory)

// create and update a workflow to make it completed
req := &p.CreateWorkflowExecutionRequest{
NewWorkflowSnapshot: p.WorkflowSnapshot{
ExecutionInfo: &p.WorkflowExecutionInfo{
CreateRequestID: uuid.New(),
DomainID: domainID,
WorkflowID: workflowExecution.GetWorkflowID(),
RunID: workflowExecution.GetRunID(),
FirstExecutionRunID: workflowExecution.GetRunID(),
TaskList: tasklist,
WorkflowTypeName: workflowType,
WorkflowTimeout: workflowTimeout,
DecisionStartToCloseTimeout: decisionTimeout,
NextEventID: nextEventID,
LastProcessedEvent: lastProcessedEventID,
State: p.WorkflowStateRunning,
CloseStatus: p.WorkflowCloseStatusNone,
},
ExecutionStats: &p.ExecutionStats{},
Checksum: csum,
VersionHistories: versionHistories,
},
RangeID: s.ShardInfo.RangeID,
Mode: p.CreateWorkflowModeBrandNew,
}
_, err := s.ExecutionManager.CreateWorkflowExecution(ctx, req)
s.Nil(err)
currentRunID, err := s.GetCurrentWorkflowRunID(ctx, domainID, workflowID)
s.Nil(err)
s.Equal(workflowExecution.GetRunID(), currentRunID)

info, err := s.GetWorkflowExecutionInfo(ctx, domainID, workflowExecution)
s.Nil(err)

updatedInfo := copyWorkflowExecutionInfo(info.ExecutionInfo)
updateStats := copyExecutionStats(info.ExecutionStats)
updatedInfo.State = p.WorkflowStateCompleted
updatedInfo.CloseStatus = p.WorkflowCloseStatusCompleted
_, err = s.ExecutionManager.UpdateWorkflowExecution(ctx, &p.UpdateWorkflowExecutionRequest{
UpdateWorkflowMutation: p.WorkflowMutation{
ExecutionInfo: updatedInfo,
ExecutionStats: updateStats,
Condition: nextEventID,
TTLInSeconds: 1,
VersionHistories: versionHistories,
},
RangeID: s.ShardInfo.RangeID,
Mode: p.UpdateWorkflowModeUpdateCurrent,
})
s.NoError(err)
time.Sleep(2 * time.Second)
info, err = s.GetWorkflowExecutionInfo(ctx, domainID, workflowExecution)
s.IsType(&types.EntityNotExistsError{}, err)
}

// TestUpdateWorkflowExecutionWithZombieState test
func (s *ExecutionManagerSuite) TestUpdateWorkflowExecutionWithZombieState() {
ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)
@@ -142,6 +142,7 @@ func NewTestBaseWithNoSQL(options *TestBaseOptions) TestBase {
EnableCassandraAllConsistencyLevelDelete: dynamicconfig.GetBoolPropertyFn(true),
PersistenceSampleLoggingRate: dynamicconfig.GetIntPropertyFn(100),
EnableShardIDMetrics: dynamicconfig.GetBoolPropertyFn(true),
EnableExecutionTTL: dynamicconfig.GetBoolPropertyFnFilteredByDomainID(true),
}
params := TestBaseParams{
DefaultTestCluster: testCluster,
3 changes: 3 additions & 0 deletions config/dynamicconfig/development.yaml
@@ -7,6 +7,9 @@ system.minRetentionDays:
history.EnableConsistentQueryByDomain:
- value: true
constraints: {}
system.enableExecutionTTL:
- value: true
constraints: {}
history.enableCrossClusterOperations:
- value: true
constraints: {}
3 changes: 2 additions & 1 deletion service/history/execution/context.go
@@ -45,6 +45,8 @@ import (

const (
defaultRemoteCallTimeout = 30 * time.Second
ttlBufferDays = 15
dayToSecondMultiplier = 86400
)

type conflictError struct {
Expand Down Expand Up @@ -1214,7 +1216,6 @@ func (c *contextImpl) updateWorkflowExecutionWithRetry(
resp, err = c.shard.UpdateWorkflowExecution(ctx, request)
return err
}

isRetryable := func(err error) bool {
if _, ok := err.(*persistence.TimeoutError); ok {
// timeout error is not retryable for update workflow execution
34 changes: 32 additions & 2 deletions service/history/execution/mutable_state_builder.go
@@ -4066,6 +4066,11 @@ func (e *mutableStateBuilder) CloseTransactionAsMutation(
// impact the checksum calculation
checksum := e.generateChecksum()

TTLInSeconds, err := e.calculateTTL()
if err != nil {
e.logError("TTL calculation failed")
}

workflowMutation := &persistence.WorkflowMutation{
ExecutionInfo: e.executionInfo,
VersionHistories: e.versionHistories,
@@ -4090,8 +4095,9 @@ func (e *mutableStateBuilder) CloseTransactionAsMutation(
ReplicationTasks: e.insertReplicationTasks,
TimerTasks: e.insertTimerTasks,

- Condition: e.nextEventIDInDB,
- Checksum: checksum,
+ Condition: e.nextEventIDInDB,
+ Checksum: checksum,
+ TTLInSeconds: int64(TTLInSeconds),
}

e.checksum = checksum
@@ -4807,3 +4813,27 @@ func (e *mutableStateBuilder) logDataInconsistency() {
tag.WorkflowRunID(runID),
)
}
func (e *mutableStateBuilder) calculateTTL() (int, error) {
domainID := e.executionInfo.DomainID
// Calculate the TTL for the workflow execution.

domainObj, err := e.shard.GetDomainCache().GetDomainByID(domainID)
if err != nil {
return 0, err
}
config := domainObj.GetConfig()
retention := time.Duration(config.Retention)
daysInSeconds := int((retention + ttlBufferDays) * dayToSecondMultiplier)
// A default TTL of 0 means that no TTL is attached.
TTLInSeconds := 0
startTime := e.executionInfo.StartTimestamp
// Handles cron and delayed-start workflows. For cron workflows, StartTimestamp is not set until the workflow has started,
// so the default TTL of 0 is passed down in that case. The TTL is calculated only if startTime is non-zero.
if !time.Time.IsZero(startTime) {
CalculateTTLInSeconds := int(e.executionInfo.WorkflowTimeout) - int(time.Now().Sub(startTime).Seconds()) + daysInSeconds
if CalculateTTLInSeconds >= 0 {
return CalculateTTLInSeconds, nil
}
}
return TTLInSeconds, nil
}
1 change: 1 addition & 0 deletions service/history/execution/mutable_state_builder_test.go
@@ -97,6 +97,7 @@ func (s *mutableStateSuite) SetupTest() {
s.logger = s.mockShard.GetLogger()

s.mockShard.Resource.DomainCache.EXPECT().GetDomainID(constants.TestDomainName).Return(constants.TestDomainID, nil).AnyTimes()
s.mockShard.Resource.DomainCache.EXPECT().GetDomainByID(constants.TestDomainID).Return(constants.TestLocalDomainEntry, nil).AnyTimes()

s.msBuilder = newMutableStateBuilder(s.mockShard, s.logger, constants.TestLocalDomainEntry)
}