Skip to content

Commit

Permalink
Add task list partition config to schema
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll committed Oct 9, 2024
1 parent f9e8453 commit e0cfaaa
Show file tree
Hide file tree
Showing 19 changed files with 889 additions and 95 deletions.
426 changes: 418 additions & 8 deletions .gen/go/sqlblobs/sqlblobs.go

Large diffs are not rendered by default.

24 changes: 16 additions & 8 deletions common/persistence/data_manager_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,14 +448,22 @@ type (

// TaskListInfo describes a state of a task list implementation.
TaskListInfo struct {
DomainID string
Name string
TaskType int
RangeID int64
AckLevel int64
Kind int
Expiry time.Time
LastUpdated time.Time
DomainID string
Name string
TaskType int
RangeID int64
AckLevel int64
Kind int
Expiry time.Time
LastUpdated time.Time
AdaptivePartitionConfig *TaskListPartitionConfig
}

// TaskListPartitionConfig represents the configuration for task list partitions.
TaskListPartitionConfig struct {
Version int64
NumReadPartitions int
NumWritePartitions int
}

// TaskInfo describes either activity or decision task
Expand Down
68 changes: 40 additions & 28 deletions common/persistence/nosql/nosql_task_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,23 @@ func (t *nosqlTaskStore) LeaseTaskList(

if selectErr != nil {
if storeShard.db.IsNotFoundError(selectErr) { // First time task list is used
var c *persistence.TaskListPartitionConfig
if request.TaskListKind != persistence.TaskListKindSticky {
c = &persistence.TaskListPartitionConfig{
Version: 0,
NumReadPartitions: 1,
NumWritePartitions: 1,
}
}
currTL = &nosqlplugin.TaskListRow{
DomainID: request.DomainID,
TaskListName: request.TaskList,
TaskListType: request.TaskType,
RangeID: initialRangeID,
TaskListKind: request.TaskListKind,
AckLevel: initialAckLevel,
LastUpdatedTime: now,
DomainID: request.DomainID,
TaskListName: request.TaskList,
TaskListType: request.TaskType,
RangeID: initialRangeID,
TaskListKind: request.TaskListKind,
AckLevel: initialAckLevel,
LastUpdatedTime: now,
AdaptivePartitionConfig: c,
}
err = storeShard.db.InsertTaskList(ctx, currTL)
} else {
Expand All @@ -141,13 +150,14 @@ func (t *nosqlTaskStore) LeaseTaskList(
currTL.RangeID++

err = storeShard.db.UpdateTaskList(ctx, &nosqlplugin.TaskListRow{
DomainID: request.DomainID,
TaskListName: request.TaskList,
TaskListType: request.TaskType,
RangeID: currTL.RangeID,
TaskListKind: currTL.TaskListKind,
AckLevel: currTL.AckLevel,
LastUpdatedTime: now,
DomainID: request.DomainID,
TaskListName: request.TaskList,
TaskListType: request.TaskType,
RangeID: currTL.RangeID,
TaskListKind: currTL.TaskListKind,
AckLevel: currTL.AckLevel,
LastUpdatedTime: now,
AdaptivePartitionConfig: currTL.AdaptivePartitionConfig,
}, currTL.RangeID-1)
}
if err != nil {
Expand All @@ -161,13 +171,14 @@ func (t *nosqlTaskStore) LeaseTaskList(
return nil, convertCommonErrors(storeShard.db, "LeaseTaskList", err)
}
tli := &persistence.TaskListInfo{
DomainID: request.DomainID,
Name: request.TaskList,
TaskType: request.TaskType,
RangeID: currTL.RangeID,
AckLevel: currTL.AckLevel,
Kind: request.TaskListKind,
LastUpdated: now,
DomainID: request.DomainID,
Name: request.TaskList,
TaskType: request.TaskType,
RangeID: currTL.RangeID,
AckLevel: currTL.AckLevel,
Kind: request.TaskListKind,
LastUpdated: now,
AdaptivePartitionConfig: currTL.AdaptivePartitionConfig,
}
return &persistence.LeaseTaskListResponse{TaskListInfo: tli}, nil
}
Expand All @@ -179,13 +190,14 @@ func (t *nosqlTaskStore) UpdateTaskList(
tli := request.TaskListInfo
var err error
taskListToUpdate := &nosqlplugin.TaskListRow{
DomainID: tli.DomainID,
TaskListName: tli.Name,
TaskListType: tli.TaskType,
RangeID: tli.RangeID,
TaskListKind: tli.Kind,
AckLevel: tli.AckLevel,
LastUpdatedTime: time.Now(),
DomainID: tli.DomainID,
TaskListName: tli.Name,
TaskListType: tli.TaskType,
RangeID: tli.RangeID,
TaskListKind: tli.Kind,
AckLevel: tli.AckLevel,
LastUpdatedTime: time.Now(),
AdaptivePartitionConfig: tli.AdaptivePartitionConfig,
}
storeShard, err := t.GetStoreShardByTaskList(tli.DomainID, tli.Name, tli.TaskType)
if err != nil {
Expand Down
36 changes: 32 additions & 4 deletions common/persistence/nosql/nosql_task_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,13 @@ func TestLeaseTaskList_selectErrNotFound(t *testing.T) {
// We then expect the tasklist to be inserted
db.EXPECT().InsertTaskList(gomock.Any(), gomock.Any()).
DoAndReturn(func(ctx context.Context, taskList *nosqlplugin.TaskListRow) error {
checkTaskListRowExpected(t, getExpectedTaskListRow(), taskList)
tl := getExpectedTaskListRow()
tl.AdaptivePartitionConfig = &persistence.TaskListPartitionConfig{
Version: 0,
NumReadPartitions: 1,
NumWritePartitions: 1,
}
checkTaskListRowExpected(t, tl, taskList)
return nil
})

Expand Down Expand Up @@ -247,7 +253,7 @@ func TestUpdateTaskList(t *testing.T) {

db.EXPECT().UpdateTaskList(gomock.Any(), gomock.Any(), int64(1)).DoAndReturn(
func(ctx context.Context, taskList *nosqlplugin.TaskListRow, previousRangeID int64) error {
checkTaskListRowExpected(t, getExpectedTaskListRow(), taskList)
checkTaskListRowExpected(t, getExpectedTaskListRowWithPartitionConfig(), taskList)
return nil
},
)
Expand All @@ -265,7 +271,7 @@ func TestUpdateTaskList_Sticky(t *testing.T) {

db.EXPECT().UpdateTaskListWithTTL(gomock.Any(), stickyTaskListTTL, gomock.Any(), int64(1)).DoAndReturn(
func(ctx context.Context, ttlSeconds int64, taskList *nosqlplugin.TaskListRow, previousRangeID int64) error {
expectedTaskList := getExpectedTaskListRow()
expectedTaskList := getExpectedTaskListRowWithPartitionConfig()
expectedTaskList.TaskListKind = int(types.TaskListKindSticky)
checkTaskListRowExpected(t, expectedTaskList, taskList)
return nil
Expand All @@ -288,7 +294,7 @@ func TestUpdateTaskList_ConditionFailure(t *testing.T) {

db.EXPECT().UpdateTaskList(gomock.Any(), gomock.Any(), int64(1)).DoAndReturn(
func(ctx context.Context, taskList *nosqlplugin.TaskListRow, previousRangeID int64) error {
checkTaskListRowExpected(t, getExpectedTaskListRow(), taskList)
checkTaskListRowExpected(t, getExpectedTaskListRowWithPartitionConfig(), taskList)
return &nosqlplugin.TaskOperationConditionFailure{Details: "test-details"}
},
)
Expand Down Expand Up @@ -484,6 +490,23 @@ func getExpectedTaskListRow() *nosqlplugin.TaskListRow {
}
}

func getExpectedTaskListRowWithPartitionConfig() *nosqlplugin.TaskListRow {
return &nosqlplugin.TaskListRow{
DomainID: TestDomainID,
TaskListName: TestTaskListName,
TaskListType: int(types.TaskListTypeDecision),
RangeID: initialRangeID,
TaskListKind: int(types.TaskListKindNormal),
AckLevel: initialAckLevel,
LastUpdatedTime: time.Now(),
AdaptivePartitionConfig: &persistence.TaskListPartitionConfig{
Version: 1,
NumReadPartitions: 2,
NumWritePartitions: 2,
},
}
}

func checkTaskListRowExpected(t *testing.T, expectedRow *nosqlplugin.TaskListRow, taskList *nosqlplugin.TaskListRow) {
// Check the duration
assert.WithinDuration(t, expectedRow.LastUpdatedTime, taskList.LastUpdatedTime, time.Second)
Expand All @@ -502,6 +525,11 @@ func getExpectedTaskListInfo() *persistence.TaskListInfo {
AckLevel: initialAckLevel,
Kind: int(types.TaskListKindNormal),
LastUpdated: time.Now(),
AdaptivePartitionConfig: &persistence.TaskListPartitionConfig{
Version: 1,
NumReadPartitions: 2,
NumWritePartitions: 2,
},
}
}

Expand Down
41 changes: 37 additions & 4 deletions common/persistence/nosql/nosqlplugin/cassandra/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,43 @@ func (db *cdb) SelectTaskList(ctx context.Context, filter *nosqlplugin.TaskListF
TaskListName: filter.TaskListName,
TaskListType: filter.TaskListType,

TaskListKind: taskListKind,
LastUpdatedTime: lastUpdatedTime,
AckLevel: ackLevel,
RangeID: rangeID,
TaskListKind: taskListKind,
LastUpdatedTime: lastUpdatedTime,
AckLevel: ackLevel,
RangeID: rangeID,
AdaptivePartitionConfig: toTaskListPartitionConfig(tlDB["adaptive_partition_config"]),
}, nil
}

func toTaskListPartitionConfig(v interface{}) *persistence.TaskListPartitionConfig {
if v == nil {
return nil
}
partition := v.(map[string]interface{})
if len(partition) == 0 {
return nil
}
version := partition["version"].(int64)
numRead := partition["num_read_partitions"].(int)
numWrite := partition["num_write_partitions"].(int)
return &persistence.TaskListPartitionConfig{
Version: version,
NumReadPartitions: numRead,
NumWritePartitions: numWrite,
}
}

func fromTaskListPartitionConfig(config *persistence.TaskListPartitionConfig) map[string]interface{} {
if config == nil {
return nil
}
return map[string]interface{}{
"version": config.Version,
"num_read_partitions": config.NumReadPartitions,
"num_write_partitions": config.NumWritePartitions,
}
}

// InsertTaskList insert a single tasklist row
// Return TaskOperationConditionFailure if the condition doesn't meet
func (db *cdb) InsertTaskList(ctx context.Context, row *nosqlplugin.TaskListRow) error {
Expand All @@ -93,6 +123,7 @@ func (db *cdb) InsertTaskList(ctx context.Context, row *nosqlplugin.TaskListRow)
0,
row.TaskListKind,
row.LastUpdatedTime,
fromTaskListPartitionConfig(row.AdaptivePartitionConfig),
).WithContext(ctx)

previous := make(map[string]interface{})
Expand All @@ -119,6 +150,7 @@ func (db *cdb) UpdateTaskList(
row.AckLevel,
row.TaskListKind,
row.LastUpdatedTime,
fromTaskListPartitionConfig(row.AdaptivePartitionConfig),
row.DomainID,
row.TaskListName,
row.TaskListType,
Expand Down Expand Up @@ -182,6 +214,7 @@ func (db *cdb) UpdateTaskListWithTTL(
row.AckLevel,
row.TaskListKind,
db.timeSrc.Now(),
fromTaskListPartitionConfig(row.AdaptivePartitionConfig),
row.DomainID,
row.TaskListName,
row.TaskListType,
Expand Down
3 changes: 2 additions & 1 deletion common/persistence/nosql/nosqlplugin/cassandra/tasks_cql.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ const (
`type: ?, ` +
`ack_level: ?, ` +
`kind: ?, ` +
`last_updated: ? ` +
`last_updated: ?, ` +
`adaptive_partition_config: ? ` +
`}`

templateTaskType = `{` +
Expand Down
47 changes: 43 additions & 4 deletions common/persistence/nosql/nosqlplugin/cassandra/tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ func TestSelectTaskList(t *testing.T) {
(*tlDB)["ack_level"] = int64(1000)
(*tlDB)["kind"] = 2
(*tlDB)["last_updated"] = now
(*tlDB)["adaptive_partition_config"] = map[string]interface{}{
"version": int64(0),
"num_read_partitions": int(1),
"num_write_partitions": int(1),
}
return nil
}).Times(1)
},
Expand All @@ -77,6 +82,11 @@ func TestSelectTaskList(t *testing.T) {
AckLevel: 1000,
RangeID: 25,
LastUpdatedTime: now,
AdaptivePartitionConfig: &persistence.TaskListPartitionConfig{
Version: 0,
NumReadPartitions: 1,
NumWritePartitions: 1,
},
},
wantQueries: []string{
`SELECT range_id, task_list FROM tasks WHERE domain_id = domain1 and task_list_name = tasklist1 and task_list_type = 1 and type = 1 and task_id = -12345`,
Expand Down Expand Up @@ -149,7 +159,7 @@ func TestInsertTaskList(t *testing.T) {
wantErr bool
}{
{
name: "successfully applied",
name: "successfully applied - nil partition_config",
row: &nosqlplugin.TaskListRow{
DomainID: "domain1",
TaskListName: "tasklist1",
Expand All @@ -168,7 +178,36 @@ func TestInsertTaskList(t *testing.T) {
wantQueries: []string{
`INSERT INTO tasks (domain_id, task_list_name, task_list_type, type, task_id, range_id, task_list ) ` +
`VALUES (domain1, tasklist1, 1, 1, -12345, 1, ` +
`{domain_id: domain1, name: tasklist1, type: 1, ack_level: 0, kind: 2, last_updated: 2024-04-01T22:08:41Z }` +
`{domain_id: domain1, name: tasklist1, type: 1, ack_level: 0, kind: 2, last_updated: 2024-04-01T22:08:41Z, adaptive_partition_config: map[] }` +
`) IF NOT EXISTS`,
},
},
{
name: "successfully applied - non-nil partition_config",
row: &nosqlplugin.TaskListRow{
DomainID: "domain1",
TaskListName: "tasklist1",
TaskListType: 1,
TaskListKind: 2,
AckLevel: 1000,
RangeID: 25,
LastUpdatedTime: ts,
AdaptivePartitionConfig: &persistence.TaskListPartitionConfig{
Version: 1,
NumReadPartitions: 1,
NumWritePartitions: 1,
},
},
queryMockFn: func(query *gocql.MockQuery) {
query.EXPECT().WithContext(gomock.Any()).Return(query).Times(1)
query.EXPECT().MapScanCAS(gomock.Any()).DoAndReturn(func(prev map[string]interface{}) (bool, error) {
return true, nil
}).Times(1)
},
wantQueries: []string{
`INSERT INTO tasks (domain_id, task_list_name, task_list_type, type, task_id, range_id, task_list ) ` +
`VALUES (domain1, tasklist1, 1, 1, -12345, 1, ` +
`{domain_id: domain1, name: tasklist1, type: 1, ack_level: 0, kind: 2, last_updated: 2024-04-01T22:08:41Z, adaptive_partition_config: map[num_read_partitions:1 num_write_partitions:1 version:1] }` +
`) IF NOT EXISTS`,
},
},
Expand Down Expand Up @@ -278,7 +317,7 @@ func TestUpdateTaskList(t *testing.T) {
}).Times(1)
},
wantQueries: []string{
`UPDATE tasks SET range_id = 25, task_list = {domain_id: domain1, name: tasklist1, type: 1, ack_level: 1000, kind: 2, last_updated: 2024-04-01T22:08:41Z } WHERE domain_id = domain1 and task_list_name = tasklist1 and task_list_type = 1 and type = 1 and task_id = -12345 IF range_id = 25`,
`UPDATE tasks SET range_id = 25, task_list = {domain_id: domain1, name: tasklist1, type: 1, ack_level: 1000, kind: 2, last_updated: 2024-04-01T22:08:41Z, adaptive_partition_config: map[] } WHERE domain_id = domain1 and task_list_name = tasklist1 and task_list_type = 1 and type = 1 and task_id = -12345 IF range_id = 25`,
},
},
{
Expand Down Expand Up @@ -386,7 +425,7 @@ func TestUpdateTaskListWithTTL(t *testing.T) {
mapExecuteBatchCASApplied: true,
wantQueries: []string{
` INSERT INTO tasks (domain_id, task_list_name, task_list_type, type, task_id ) VALUES (domain1, tasklist1, 1, 1, -12345) USING TTL 180`,
`UPDATE tasks USING TTL 180 SET range_id = 25, task_list = {domain_id: domain1, name: tasklist1, type: 1, ack_level: 1000, kind: 2, last_updated: 2024-04-01T22:08:41Z } WHERE domain_id = domain1 and task_list_name = tasklist1 and task_list_type = 1 and type = 1 and task_id = -12345 IF range_id = 25`,
`UPDATE tasks USING TTL 180 SET range_id = 25, task_list = {domain_id: domain1, name: tasklist1, type: 1, ack_level: 1000, kind: 2, last_updated: 2024-04-01T22:08:41Z, adaptive_partition_config: map[] } WHERE domain_id = domain1 and task_list_name = tasklist1 and task_list_type = 1 and type = 1 and task_id = -12345 IF range_id = 25`,
},
},
{
Expand Down
9 changes: 5 additions & 4 deletions common/persistence/nosql/nosqlplugin/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,11 @@ type (
TaskListName string
TaskListType int

RangeID int64
TaskListKind int
AckLevel int64
LastUpdatedTime time.Time
RangeID int64
TaskListKind int
AckLevel int64
LastUpdatedTime time.Time
AdaptivePartitionConfig *persistence.TaskListPartitionConfig
}

// ListTaskListResult is the result of list tasklists
Expand Down
Loading

0 comments on commit e0cfaaa

Please sign in to comment.