Skip to content

Commit

Permalink
Add support of update time for Cassandra, Mysql and Postgres (#4971)
Browse files Browse the repository at this point in the history
* Add support of update time for Cassandra, Mysql and Postgres (#2567)

* Fix integration test

* Fix one persistence test method

* Fix mysql syntax error

* Fix cassandra unit test

* Add comment to explain unit of update timestamp
  • Loading branch information
neil-xie authored Sep 9, 2022
1 parent 5fd0127 commit 60f7b13
Show file tree
Hide file tree
Showing 26 changed files with 101 additions and 31 deletions.
1 change: 1 addition & 0 deletions common/persistence/dataStoreInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,7 @@ type (
TaskList string
IsCron bool
NumClusters int16
UpdateTimestamp time.Time
SearchAttributes map[string][]byte
}

Expand Down
1 change: 1 addition & 0 deletions common/persistence/dataVisibilityManagerInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type (
TaskList string
IsCron bool
NumClusters int16
UpdateTimestamp int64 // unit is unix nano, consistent with start/execution timestamp, same in other requests
SearchAttributes map[string][]byte
}

Expand Down
2 changes: 2 additions & 0 deletions common/persistence/nosql/nosqlVisibilityStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func (v *nosqlVisibilityStore) RecordWorkflowExecutionStarted(
TaskList: request.TaskList,
IsCron: request.IsCron,
NumClusters: request.NumClusters,
UpdateTime: request.UpdateTimestamp,
},
})
if err != nil {
Expand Down Expand Up @@ -119,6 +120,7 @@ func (v *nosqlVisibilityStore) RecordWorkflowExecutionClosed(
Status: &request.Status,
CloseTime: request.CloseTimestamp,
HistoryLength: request.HistoryLength,
UpdateTime: request.UpdateTimestamp,
},
})

Expand Down
30 changes: 20 additions & 10 deletions common/persistence/nosql/nosqlplugin/cassandra/visibility.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,17 @@ const (

const (
///////////////// Open Executions /////////////////
openExecutionsColumnsForSelect = " workflow_id, run_id, start_time, execution_time, workflow_type_name, memo, encoding, task_list, is_cron, num_clusters "
openExecutionsColumnsForSelect = " workflow_id, run_id, start_time, execution_time, workflow_type_name, memo, encoding, task_list, is_cron, num_clusters, update_time "

openExecutionsColumnsForInsert = "(domain_id, domain_partition, " + openExecutionsColumnsForSelect + ")"

templateCreateWorkflowExecutionStartedWithTTL = `INSERT INTO open_executions ` +
openExecutionsColumnsForInsert +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) using TTL ?`
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) using TTL ?`

templateCreateWorkflowExecutionStarted = `INSERT INTO open_executions` +
openExecutionsColumnsForInsert +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`

templateDeleteWorkflowExecutionStarted = `DELETE FROM open_executions ` +
`WHERE domain_id = ? ` +
Expand Down Expand Up @@ -96,25 +96,25 @@ const (
`and run_id = ? `

///////////////// Closed Executions /////////////////
closedExecutionColumnsForSelect = " workflow_id, run_id, start_time, execution_time, close_time, workflow_type_name, status, history_length, memo, encoding, task_list, is_cron, num_clusters "
closedExecutionColumnsForSelect = " workflow_id, run_id, start_time, execution_time, close_time, workflow_type_name, status, history_length, memo, encoding, task_list, is_cron, num_clusters, update_time "

closedExecutionColumnsForInsert = "(domain_id, domain_partition, " + closedExecutionColumnsForSelect + ")"

templateCreateWorkflowExecutionClosedWithTTL = `INSERT INTO closed_executions ` +
closedExecutionColumnsForInsert +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) using TTL ?`
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) using TTL ?`

templateCreateWorkflowExecutionClosed = `INSERT INTO closed_executions ` +
closedExecutionColumnsForInsert +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`

templateCreateWorkflowExecutionClosedWithTTLV2 = `INSERT INTO closed_executions_v2 ` +
closedExecutionColumnsForInsert +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) using TTL ?`
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) using TTL ?`

templateCreateWorkflowExecutionClosedV2 = `INSERT INTO closed_executions_v2 ` +
closedExecutionColumnsForInsert +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`

templateGetClosedWorkflowExecutions = `SELECT ` + closedExecutionColumnsForSelect +
`FROM closed_executions ` +
Expand Down Expand Up @@ -204,6 +204,7 @@ func (db *cdb) InsertVisibility(ctx context.Context, ttlSeconds int64, row *nosq
row.TaskList,
row.IsCron,
row.NumClusters,
row.UpdateTime,
).WithContext(ctx)
} else {
query = db.session.Query(templateCreateWorkflowExecutionStartedWithTTL,
Expand All @@ -219,6 +220,7 @@ func (db *cdb) InsertVisibility(ctx context.Context, ttlSeconds int64, row *nosq
row.TaskList,
row.IsCron,
row.NumClusters,
row.UpdateTime,
ttlSeconds,
).WithContext(ctx)
}
Expand Down Expand Up @@ -262,6 +264,7 @@ func (db *cdb) UpdateVisibility(ctx context.Context, ttlSeconds int64, row *nosq
row.TaskList,
row.IsCron,
row.NumClusters,
row.UpdateTime,
)
// duplicate write to v2 to order by close time
batch.Query(templateCreateWorkflowExecutionClosedV2,
Expand All @@ -280,6 +283,7 @@ func (db *cdb) UpdateVisibility(ctx context.Context, ttlSeconds int64, row *nosq
row.TaskList,
row.IsCron,
row.NumClusters,
row.UpdateTime,
)
} else {
batch.Query(templateCreateWorkflowExecutionClosedWithTTL,
Expand All @@ -298,6 +302,7 @@ func (db *cdb) UpdateVisibility(ctx context.Context, ttlSeconds int64, row *nosq
row.TaskList,
row.IsCron,
row.NumClusters,
row.UpdateTime,
ttlSeconds,
)
// duplicate write to v2 to order by close time
Expand All @@ -317,6 +322,7 @@ func (db *cdb) UpdateVisibility(ctx context.Context, ttlSeconds int64, row *nosq
row.TaskList,
row.IsCron,
row.NumClusters,
row.UpdateTime,
ttlSeconds,
)
}
Expand Down Expand Up @@ -683,7 +689,8 @@ func readOpenWorkflowExecutionRecord(
var taskList string
var isCron bool
var numClusters int16
if iter.Scan(&workflowID, &runID, &startTime, &executionTime, &typeName, &memo, &encoding, &taskList, &isCron, &numClusters) {
var updateTime time.Time
if iter.Scan(&workflowID, &runID, &startTime, &executionTime, &typeName, &memo, &encoding, &taskList, &isCron, &numClusters, &updateTime) {
record := &persistence.InternalVisibilityWorkflowExecutionInfo{
WorkflowID: workflowID,
RunID: runID,
Expand All @@ -694,6 +701,7 @@ func readOpenWorkflowExecutionRecord(
TaskList: taskList,
IsCron: isCron,
NumClusters: numClusters,
UpdateTime: updateTime,
}
return record, true
}
Expand All @@ -716,7 +724,8 @@ func readClosedWorkflowExecutionRecord(
var taskList string
var isCron bool
var numClusters int16
if iter.Scan(&workflowID, &runID, &startTime, &executionTime, &closeTime, &typeName, &status, &historyLength, &memo, &encoding, &taskList, &isCron, &numClusters) {
var updateTime time.Time
if iter.Scan(&workflowID, &runID, &startTime, &executionTime, &closeTime, &typeName, &status, &historyLength, &memo, &encoding, &taskList, &isCron, &numClusters, &updateTime) {
record := &persistence.InternalVisibilityWorkflowExecutionInfo{
WorkflowID: workflowID,
RunID: runID,
Expand All @@ -730,6 +739,7 @@ func readClosedWorkflowExecutionRecord(
TaskList: taskList,
IsCron: isCron,
NumClusters: numClusters,
UpdateTime: updateTime,
}
return record, true
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func (s *DBVisibilityPersistenceSuite) TestBasicVisibility() {
Execution: workflowExecution,
WorkflowTypeName: "visibility-workflow",
StartTimestamp: startTime,
UpdateTimestamp: 0,
}
err0 := s.VisibilityMgr.RecordWorkflowExecutionStarted(ctx, startReq)
s.Nil(err0)
Expand All @@ -139,6 +140,7 @@ func (s *DBVisibilityPersistenceSuite) TestBasicVisibility() {
WorkflowTypeName: "visibility-workflow",
StartTimestamp: startTime,
CloseTimestamp: time.Now().UnixNano(),
UpdateTimestamp: time.Now().UnixNano(),
HistoryLength: 5,
}
err2 := s.VisibilityMgr.RecordWorkflowExecutionClosed(ctx, closeReq)
Expand Down
3 changes: 3 additions & 0 deletions common/persistence/sql/sqlVisibilityStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func (s *sqlVisibilityStore) RecordWorkflowExecutionStarted(
Encoding: string(request.Memo.GetEncoding()),
IsCron: request.IsCron,
NumClusters: request.NumClusters,
UpdateTime: request.UpdateTimestamp,
})

if err != nil {
Expand Down Expand Up @@ -104,6 +105,7 @@ func (s *sqlVisibilityStore) RecordWorkflowExecutionClosed(
Encoding: string(request.Memo.GetEncoding()),
IsCron: request.IsCron,
NumClusters: request.NumClusters,
UpdateTime: request.UpdateTimestamp,
})
if err != nil {
return convertCommonErrors(s.db, "RecordWorkflowExecutionClosed", "", err)
Expand Down Expand Up @@ -334,6 +336,7 @@ func (s *sqlVisibilityStore) rowToInfo(row *sqlplugin.VisibilityRow) *p.Internal
IsCron: row.IsCron,
NumClusters: row.NumClusters,
Memo: p.NewDataBlob(row.Memo, common.EncodingType(row.Encoding)),
UpdateTime: row.UpdateTime,
}
if row.CloseStatus != nil {
status := workflow.WorkflowExecutionCloseStatus(*row.CloseStatus)
Expand Down
1 change: 1 addition & 0 deletions common/persistence/sql/sqlplugin/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,7 @@ type (
Encoding string
IsCron bool
NumClusters int16
UpdateTime time.Time
}

// VisibilityFilter contains the column names within executions_visibility table that
Expand Down
18 changes: 10 additions & 8 deletions common/persistence/sql/sqlplugin/mysql/visibility.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ import (

const (
templateCreateWorkflowExecutionStarted = `INSERT IGNORE INTO executions_visibility (` +
`domain_id, workflow_id, run_id, start_time, execution_time, workflow_type_name, memo, encoding, is_cron, num_clusters) ` +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
`domain_id, workflow_id, run_id, start_time, execution_time, workflow_type_name, memo, encoding, is_cron, num_clusters, update_time) ` +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`

templateCreateWorkflowExecutionClosed = `REPLACE INTO executions_visibility (` +
`domain_id, workflow_id, run_id, start_time, execution_time, workflow_type_name, close_time, close_status, history_length, memo, encoding, is_cron, num_clusters) ` +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
`domain_id, workflow_id, run_id, start_time, execution_time, workflow_type_name, close_time, close_status, history_length, memo, encoding, is_cron, num_clusters, update_time) ` +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`

// RunID condition is needed for correct pagination
templateConditions = ` AND domain_id = ?
Expand All @@ -46,7 +46,7 @@ const (
ORDER BY start_time DESC, run_id
LIMIT ?`

templateOpenFieldNames = `workflow_id, run_id, start_time, execution_time, workflow_type_name, memo, encoding, is_cron`
templateOpenFieldNames = `workflow_id, run_id, start_time, execution_time, workflow_type_name, memo, encoding, is_cron, update_time`
templateOpenSelect = `SELECT ` + templateOpenFieldNames + ` FROM executions_visibility WHERE close_status IS NULL `

templateClosedSelect = `SELECT ` + templateOpenFieldNames + `, close_time, close_status, history_length
Expand All @@ -66,7 +66,7 @@ const (

templateGetClosedWorkflowExecutionsByStatus = templateClosedSelect + `AND close_status = ?` + templateConditions

templateGetClosedWorkflowExecution = `SELECT workflow_id, run_id, start_time, execution_time, memo, encoding, close_time, workflow_type_name, close_status, history_length, is_cron
templateGetClosedWorkflowExecution = `SELECT workflow_id, run_id, start_time, execution_time, memo, encoding, close_time, workflow_type_name, close_status, history_length, is_cron, update_time
FROM executions_visibility
WHERE domain_id = ? AND close_status IS NOT NULL
AND run_id = ?`
Expand All @@ -93,7 +93,8 @@ func (mdb *db) InsertIntoVisibility(ctx context.Context, row *sqlplugin.Visibili
row.Memo,
row.Encoding,
row.IsCron,
row.NumClusters)
row.NumClusters,
row.UpdateTime)
}

// ReplaceIntoVisibility replaces an existing row if it exist or creates a new row in visibility table
Expand All @@ -118,7 +119,8 @@ func (mdb *db) ReplaceIntoVisibility(ctx context.Context, row *sqlplugin.Visibil
row.Memo,
row.Encoding,
row.IsCron,
row.NumClusters)
row.NumClusters,
row.UpdateTime)
default:
return nil, errCloseParams
}
Expand Down
21 changes: 12 additions & 9 deletions common/persistence/sql/sqlplugin/postgres/visibility.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ import (

const (
templateCreateWorkflowExecutionStarted = `INSERT INTO executions_visibility (` +
`domain_id, workflow_id, run_id, start_time, execution_time, workflow_type_name, memo, encoding, is_cron, num_clusters) ` +
`VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
`domain_id, workflow_id, run_id, start_time, execution_time, workflow_type_name, memo, encoding, is_cron, num_clusters, update_time) ` +
`VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT (domain_id, run_id) DO NOTHING`

templateCreateWorkflowExecutionClosed = `INSERT INTO executions_visibility (` +
`domain_id, workflow_id, run_id, start_time, execution_time, workflow_type_name, close_time, close_status, history_length, memo, encoding, is_cron, num_clusters) ` +
`VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
`domain_id, workflow_id, run_id, start_time, execution_time, workflow_type_name, close_time, close_status, history_length, memo, encoding, is_cron, num_clusters, update_time) ` +
`VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
ON CONFLICT (domain_id, run_id) DO UPDATE
SET workflow_id = excluded.workflow_id,
start_time = excluded.start_time,
Expand All @@ -50,7 +50,8 @@ const (
memo = excluded.memo,
encoding = excluded.encoding,
is_cron = excluded.is_cron,
num_clusters = excluded.num_clusters`
num_clusters = excluded.num_clusters,
update_time = excluded.update_time`

// RunID condition is needed for correct pagination
templateConditions1 = ` AND domain_id = $1
Expand All @@ -67,7 +68,7 @@ const (
ORDER BY start_time DESC, run_id
LIMIT $7`

templateOpenFieldNames = `workflow_id, run_id, start_time, execution_time, workflow_type_name, memo, encoding, is_cron`
templateOpenFieldNames = `workflow_id, run_id, start_time, execution_time, workflow_type_name, memo, encoding, is_cron, update_time`
templateOpenSelect = `SELECT ` + templateOpenFieldNames + ` FROM executions_visibility WHERE close_status IS NULL `

templateClosedSelect = `SELECT ` + templateOpenFieldNames + `, close_time, close_status, history_length
Expand All @@ -87,7 +88,7 @@ const (

templateGetClosedWorkflowExecutionsByStatus = templateClosedSelect + `AND close_status = $1` + templateConditions2

templateGetClosedWorkflowExecution = `SELECT workflow_id, run_id, start_time, execution_time, memo, encoding, close_time, workflow_type_name, close_status, history_length, is_cron
templateGetClosedWorkflowExecution = `SELECT workflow_id, run_id, start_time, execution_time, memo, encoding, close_time, workflow_type_name, close_status, history_length, is_cron, update_time
FROM executions_visibility
WHERE domain_id = $1 AND close_status IS NOT NULL
AND run_id = $2`
Expand All @@ -112,7 +113,8 @@ func (pdb *db) InsertIntoVisibility(ctx context.Context, row *sqlplugin.Visibili
row.Memo,
row.Encoding,
row.IsCron,
row.NumClusters)
row.NumClusters,
row.UpdateTime)
}

// ReplaceIntoVisibility replaces an existing row if it exist or creates a new row in visibility table
Expand All @@ -135,7 +137,8 @@ func (pdb *db) ReplaceIntoVisibility(ctx context.Context, row *sqlplugin.Visibil
row.Memo,
row.Encoding,
row.IsCron,
row.NumClusters)
row.NumClusters,
row.UpdateTime)
default:
return nil, errCloseParams
}
Expand Down
1 change: 1 addition & 0 deletions common/persistence/visibilitySingleManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func (v *visibilityManagerImpl) RecordWorkflowExecutionStarted(
IsCron: request.IsCron,
NumClusters: request.NumClusters,
Memo: v.serializeMemo(request.Memo, request.DomainUUID, request.Execution.GetWorkflowID(), request.Execution.GetRunID()),
UpdateTimestamp: time.Unix(0, request.UpdateTimestamp),
SearchAttributes: request.SearchAttributes,
}
return v.persistence.RecordWorkflowExecutionStarted(ctx, req)
Expand Down
2 changes: 1 addition & 1 deletion schema/cassandra/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@ package cassandra
const Version = "0.33"

// VisibilityVersion is the Cassandra visibility database release version
const VisibilityVersion = "0.7"
const VisibilityVersion = "0.8"
3 changes: 3 additions & 0 deletions schema/cassandra/visibility/schema.cql
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ CREATE TABLE open_executions (
task_list text,
is_cron boolean,
num_clusters int,
update_time timestamp,
PRIMARY KEY ((domain_id, domain_partition), start_time, run_id)
) WITH CLUSTERING ORDER BY (start_time DESC)
AND COMPACTION = {
Expand Down Expand Up @@ -39,6 +40,7 @@ CREATE TABLE closed_executions (
task_list text,
is_cron boolean,
num_clusters int,
update_time timestamp,
PRIMARY KEY ((domain_id, domain_partition), start_time, run_id)
) WITH CLUSTERING ORDER BY (start_time DESC)
AND COMPACTION = {
Expand Down Expand Up @@ -68,6 +70,7 @@ CREATE TABLE closed_executions_v2 (
task_list text,
is_cron boolean,
num_clusters int,
update_time timestamp,
PRIMARY KEY ((domain_id, domain_partition), close_time, run_id)
) WITH CLUSTERING ORDER BY (close_time DESC)
AND COMPACTION = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ALTER TABLE open_executions ADD update_time timestamp;
ALTER TABLE closed_executions ADD update_time timestamp;
ALTER TABLE closed_executions_v2 ADD update_time timestamp;
8 changes: 8 additions & 0 deletions schema/cassandra/visibility/versioned/v0.8/manifest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"CurrVersion": "0.8",
"MinCompatibleVersion": "0.8",
"Description": "add update_time field to visibility",
"SchemaUpdateCqlFiles": [
"add_update_time.cql"
]
}
1 change: 1 addition & 0 deletions schema/mysql/v57/visibility/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ CREATE TABLE executions_visibility (
task_list VARCHAR(255) DEFAULT '' NOT NULL,
is_cron BOOLEAN DEFAULT false NOT NULL,
num_clusters INT NULL,
update_time DATETIME(6) NULL,

PRIMARY KEY (domain_id, run_id)
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE executions_visibility ADD update_time DATETIME(6) NULL;
Loading

0 comments on commit 60f7b13

Please sign in to comment.