Skip to content

Commit

Permalink
comments
Browse files Browse the repository at this point in the history
  • Loading branch information
demirkayaender committed Jun 15, 2021
1 parent 7ce001e commit ae53312
Show file tree
Hide file tree
Showing 20 changed files with 80 additions and 832 deletions.
638 changes: 12 additions & 626 deletions .gen/go/sqlblobs/sqlblobs.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion common/archiver/historyIterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/persistence-utils"
persistenceutils "github.com/uber/cadence/common/persistence/persistence-utils"
"github.com/uber/cadence/common/types"
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql"
"github.com/uber/cadence/common/persistence/persistence-utils"
persistenceutils "github.com/uber/cadence/common/persistence/persistence-utils"
"github.com/uber/cadence/common/types"
)

Expand Down
48 changes: 4 additions & 44 deletions common/persistence/dataManagerInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,10 @@ type (
}

// CrossClusterTaskInfo describes a cross-cluster task
CrossClusterTaskInfo TransferTaskInfo
// Cross cluster tasks are exactly like transfer tasks so
// instead of creating another struct and duplicating the same
// logic everywhere. We reuse TransferTaskInfo
CrossClusterTaskInfo = TransferTaskInfo

// ReplicationTaskInfo describes the replication task created for replication of history events
ReplicationTaskInfo struct {
Expand Down Expand Up @@ -2476,49 +2479,6 @@ func (t *TransferTaskInfo) String() string {
)
}

// GetTaskID returns the task ID for cross-cluster task
func (t *CrossClusterTaskInfo) GetTaskID() int64 {
return t.TaskID
}

// GetVersion returns the task version for cross-cluster task
func (t *CrossClusterTaskInfo) GetVersion() int64 {
return t.Version
}

// GetTaskType returns the task type for cross-cluster task
func (t *CrossClusterTaskInfo) GetTaskType() int {
return t.TaskType
}

// GetVisibilityTimestamp returns the task type for cross-cluster task
func (t *CrossClusterTaskInfo) GetVisibilityTimestamp() time.Time {
return t.VisibilityTimestamp
}

// GetWorkflowID returns the workflow ID for cross-cluster task
func (t *CrossClusterTaskInfo) GetWorkflowID() string {
return t.WorkflowID
}

// GetRunID returns the run ID for cross-cluster task
func (t *CrossClusterTaskInfo) GetRunID() string {
return t.RunID
}

// GetDomainID returns the domain ID for cross-cluster task
func (t *CrossClusterTaskInfo) GetDomainID() string {
return t.DomainID
}

// String returns a string representation for cross-cluster task
func (t *CrossClusterTaskInfo) String() string {
return fmt.Sprintf(
"{DomainID: %v, WorkflowID: %v, RunID: %v, TaskID: %v, TargetDomainID: %v, TargetWorkflowID %v, TargetRunID: %v, TargetChildWorkflowOnly: %v, TaskList: %v, TaskType: %v, ScheduleID: %v, Version: %v.}",
t.DomainID, t.WorkflowID, t.RunID, t.TaskID, t.TargetDomainID, t.TargetWorkflowID, t.TargetRunID, t.TargetChildWorkflowOnly, t.TaskList, t.TaskType, t.ScheduleID, t.Version,
)
}

// GetTaskID returns the task ID for replication task
func (t *ReplicationTaskInfo) GetTaskID() int64 {
return t.TaskID
Expand Down
112 changes: 16 additions & 96 deletions common/persistence/serialization/getters.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,22 @@ func (s *ShardInfo) GetTransferProcessingQueueStatesEncoding() (o string) {
return
}

// GetCrossClusterProcessingQueueStates internal sql blob getter
func (s *ShardInfo) GetCrossClusterProcessingQueueStates() (o []byte) {
if s != nil {
return s.CrossClusterProcessingQueueStates
}
return
}

// GetCrossClusterProcessingQueueStatesEncoding internal sql blob getter
func (s *ShardInfo) GetCrossClusterProcessingQueueStatesEncoding() (o string) {
if s != nil {
return s.CrossClusterProcessingQueueStatesEncoding
}
return
}

// GetTimerProcessingQueueStates internal sql blob getter
func (s *ShardInfo) GetTimerProcessingQueueStates() (o []byte) {
if s != nil {
Expand Down Expand Up @@ -1476,102 +1492,6 @@ func (t *TransferTaskInfo) GetVisibilityTimestamp() time.Time {
return time.Unix(0, 0)
}

// GetDomainID internal sql blob getter
func (t *CrossClusterTaskInfo) GetDomainID() (o []byte) {
if t != nil {
return t.DomainID
}
return
}

// GetWorkflowID internal sql blob getter
func (t *CrossClusterTaskInfo) GetWorkflowID() (o string) {
if t != nil {
return t.WorkflowID
}
return
}

// GetRunID internal sql blob getter
func (t *CrossClusterTaskInfo) GetRunID() (o []byte) {
if t != nil {
return t.RunID
}
return
}

// GetTaskType internal sql blob getter
func (t *CrossClusterTaskInfo) GetTaskType() (o int16) {
if t != nil {
return t.TaskType
}
return
}

// GetTargetDomainID internal sql blob getter
func (t *CrossClusterTaskInfo) GetTargetDomainID() (o []byte) {
if t != nil {
return t.TargetDomainID
}
return
}

// GetTargetWorkflowID internal sql blob getter
func (t *CrossClusterTaskInfo) GetTargetWorkflowID() (o string) {
if t != nil {
return t.TargetWorkflowID
}
return
}

// GetTargetRunID internal sql blob getter
func (t *CrossClusterTaskInfo) GetTargetRunID() (o []byte) {
if t != nil {
return t.TargetRunID
}
return
}

// GetTaskList internal sql blob getter
func (t *CrossClusterTaskInfo) GetTaskList() (o string) {
if t != nil {
return t.TaskList
}
return
}

// GetTargetChildWorkflowOnly internal sql blob getter
func (t *CrossClusterTaskInfo) GetTargetChildWorkflowOnly() (o bool) {
if t != nil {
return t.TargetChildWorkflowOnly
}
return
}

// GetScheduleID internal sql blob getter
func (t *CrossClusterTaskInfo) GetScheduleID() (o int64) {
if t != nil {
return t.ScheduleID
}
return
}

// GetVersion internal sql blob getter
func (t *CrossClusterTaskInfo) GetVersion() (o int64) {
if t != nil {
return t.Version
}
return
}

// GetVisibilityTimestamp internal sql blob getter
func (t *CrossClusterTaskInfo) GetVisibilityTimestamp() time.Time {
if t != nil {
return t.VisibilityTimestamp
}
return time.Unix(0, 0)
}

// GetDomainID internal sql blob getter
func (t *TimerTaskInfo) GetDomainID() (o []byte) {
if t != nil && t.DomainID != nil {
Expand Down
7 changes: 6 additions & 1 deletion common/persistence/serialization/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"go.uber.org/thriftrw/wire"

"github.com/uber/cadence/.gen/go/sqlblobs"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/types"
Expand Down Expand Up @@ -272,7 +273,11 @@ type (
}

// CrossClusterTask blob in a serialization agnostic format
CrossClusterTaskInfo TransferTaskInfo
// Cross cluster tasks are exactly like transfer tasks so
// instead of creating another struct and duplicating the same
// logic everywhere. We reuse TransferTaskInfo
CrossClusterTaskInfo = TransferTaskInfo
sqlblobsCrossClusterTaskInfo = sqlblobs.TransferTaskInfo

// TimerTaskInfo blob in a serialization agnostic format
TimerTaskInfo struct {
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/serialization/thrift_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (d *thriftDecoder) transferTaskInfoFromBlob(data []byte) (*TransferTaskInfo
}

func (d *thriftDecoder) crossClusterTaskInfoFromBlob(data []byte) (*CrossClusterTaskInfo, error) {
result := &sqlblobs.CrossClusterTaskInfo{}
result := &sqlblobsCrossClusterTaskInfo{}
if err := thriftRWDecode(data, result); err != nil {
return nil, err
}
Expand Down
40 changes: 4 additions & 36 deletions common/persistence/serialization/thrift_mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,44 +620,12 @@ func transferTaskInfoFromThrift(info *sqlblobs.TransferTaskInfo) *TransferTaskIn
}
}

func crossClusterTaskInfoToThrift(info *CrossClusterTaskInfo) *sqlblobs.CrossClusterTaskInfo {
if info == nil {
return nil
}
return &sqlblobs.CrossClusterTaskInfo{
DomainID: info.DomainID,
WorkflowID: &info.WorkflowID,
RunID: info.RunID,
TaskType: &info.TaskType,
TargetDomainID: info.TargetDomainID,
TargetWorkflowID: &info.TargetWorkflowID,
TargetRunID: info.TargetRunID,
TaskList: &info.TaskList,
TargetChildWorkflowOnly: &info.TargetChildWorkflowOnly,
ScheduleID: &info.ScheduleID,
Version: &info.Version,
VisibilityTimestampNanos: timeToUnixNanoPtr(info.VisibilityTimestamp),
}
func crossClusterTaskInfoToThrift(info *CrossClusterTaskInfo) *sqlblobsCrossClusterTaskInfo {
return transferTaskInfoToThrift(info)
}

func crossClusterTaskInfoFromThrift(info *sqlblobs.CrossClusterTaskInfo) *CrossClusterTaskInfo {
if info == nil {
return nil
}
return &CrossClusterTaskInfo{
DomainID: info.DomainID,
WorkflowID: info.GetWorkflowID(),
RunID: info.RunID,
TaskType: info.GetTaskType(),
TargetDomainID: info.TargetDomainID,
TargetWorkflowID: info.GetTargetWorkflowID(),
TargetRunID: info.TargetRunID,
TaskList: info.GetTaskList(),
TargetChildWorkflowOnly: info.GetTargetChildWorkflowOnly(),
ScheduleID: info.GetScheduleID(),
Version: info.GetVersion(),
VisibilityTimestamp: timeFromUnixNano(info.GetVisibilityTimestampNanos()),
}
func crossClusterTaskInfoFromThrift(info *sqlblobsCrossClusterTaskInfo) *CrossClusterTaskInfo {
return transferTaskInfoFromThrift(info)
}

func timerTaskInfoToThrift(info *TimerTaskInfo) *sqlblobs.TimerTaskInfo {
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/sql/sqlHistoryStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"fmt"
"time"

"github.com/uber/cadence/common/persistence/persistence-utils"
persistenceutils "github.com/uber/cadence/common/persistence/persistence-utils"
"github.com/uber/cadence/common/persistence/serialization"
"github.com/uber/cadence/common/types"

Expand Down
39 changes: 24 additions & 15 deletions common/persistence/sql/sqlShardStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,14 @@ func (m *sqlShardStore) GetShard(
}
}

var crossClusterPQS *persistence.DataBlob
if shardInfo.GetCrossClusterProcessingQueueStates() != nil {
crossClusterPQS = &persistence.DataBlob{
Encoding: common.EncodingType(shardInfo.GetCrossClusterProcessingQueueStatesEncoding()),
Data: shardInfo.GetCrossClusterProcessingQueueStates(),
}
}

var timerPQS *persistence.DataBlob
if shardInfo.GetTimerProcessingQueueStates() != nil {
timerPQS = &persistence.DataBlob{
Expand All @@ -138,21 +146,22 @@ func (m *sqlShardStore) GetShard(
}

resp := &persistence.InternalGetShardResponse{ShardInfo: &persistence.InternalShardInfo{
ShardID: int(row.ShardID),
RangeID: row.RangeID,
Owner: shardInfo.GetOwner(),
StolenSinceRenew: int(shardInfo.GetStolenSinceRenew()),
UpdatedAt: shardInfo.GetUpdatedAt(),
ReplicationAckLevel: shardInfo.GetReplicationAckLevel(),
TransferAckLevel: shardInfo.GetTransferAckLevel(),
TimerAckLevel: shardInfo.GetTimerAckLevel(),
ClusterTransferAckLevel: shardInfo.ClusterTransferAckLevel,
ClusterTimerAckLevel: timerAckLevel,
TransferProcessingQueueStates: transferPQS,
TimerProcessingQueueStates: timerPQS,
DomainNotificationVersion: shardInfo.GetDomainNotificationVersion(),
ClusterReplicationLevel: shardInfo.ClusterReplicationLevel,
ReplicationDLQAckLevel: shardInfo.ReplicationDlqAckLevel,
ShardID: int(row.ShardID),
RangeID: row.RangeID,
Owner: shardInfo.GetOwner(),
StolenSinceRenew: int(shardInfo.GetStolenSinceRenew()),
UpdatedAt: shardInfo.GetUpdatedAt(),
ReplicationAckLevel: shardInfo.GetReplicationAckLevel(),
TransferAckLevel: shardInfo.GetTransferAckLevel(),
TimerAckLevel: shardInfo.GetTimerAckLevel(),
ClusterTransferAckLevel: shardInfo.ClusterTransferAckLevel,
ClusterTimerAckLevel: timerAckLevel,
TransferProcessingQueueStates: transferPQS,
CrossClusterProcessingQueueStates: crossClusterPQS,
TimerProcessingQueueStates: timerPQS,
DomainNotificationVersion: shardInfo.GetDomainNotificationVersion(),
ClusterReplicationLevel: shardInfo.ClusterReplicationLevel,
ReplicationDLQAckLevel: shardInfo.ReplicationDlqAckLevel,
}}

return resp, nil
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/sql/sqlplugin/mysql/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ FROM transfer_tasks WHERE shard_id = ? AND task_id > ? AND task_id <= ? ORDER BY
deleteTransferTaskQuery = `DELETE FROM transfer_tasks WHERE shard_id = ? AND task_id = ?`
rangeDeleteTransferTaskQuery = `DELETE FROM transfer_tasks WHERE shard_id = ? AND task_id > ? AND task_id <= ?`

getCrossClusterTasksQuery = `SELECT target_cluster, shard_id, task_id, data, data_encoding
getCrossClusterTasksQuery = `SELECT task_id, data, data_encoding
FROM cross_cluster_tasks WHERE target_cluster = ? AND shard_id = ? AND task_id > ? AND task_id <= ? ORDER BY target_cluster, shard_id, task_id`

createCrossClusterTasksQuery = `INSERT INTO cross_cluster_tasks(target_cluster, shard_id, task_id, data, data_encoding)
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/sql/sqlplugin/postgres/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ workflow_id = :workflow_id
deleteTransferTaskQuery = `DELETE FROM transfer_tasks WHERE shard_id = $1 AND task_id = $2`
rangeDeleteTransferTaskQuery = `DELETE FROM transfer_tasks WHERE shard_id = $1 AND task_id > $2 AND task_id <= $3`

getCrossClusterTasksQuery = `SELECT target_cluster, shard_id, task_id, data, data_encoding
getCrossClusterTasksQuery = `SELECT task_id, data, data_encoding
FROM cross_cluster_tasks WHERE target_cluster = $1 AND shard_id = $2 AND task_id > $3 AND task_id <= $4 ORDER BY target_cluster, shard_id, task_id`

createCrossClusterTasksQuery = `INSERT INTO cross_cluster_tasks(target_cluster, shard_id, task_id, data, data_encoding)
Expand Down
2 changes: 1 addition & 1 deletion idls
Submodule idls updated 1 files
+17 −0 thrift/sqlblobs.thrift
2 changes: 1 addition & 1 deletion schema/mysql/v57/cadence/versioned/v0.5/manifest.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"CurrVersion": "0.5",
"MinCompatibleVersion": "0.4",
"MinCompatibleVersion": "0.5",
"Description": "create cross cluster table",
"SchemaUpdateCqlFiles": [
"cross_cluster_table.sql"
Expand Down
2 changes: 1 addition & 1 deletion schema/postgres/cadence/versioned/v0.4/manifest.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"CurrVersion": "0.4",
"MinCompatibleVersion": "0.3",
"MinCompatibleVersion": "0.4",
"Description": "create cross cluster table",
"SchemaUpdateCqlFiles": [
"cross_cluster_table.sql"
Expand Down
2 changes: 1 addition & 1 deletion service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import (
"github.com/uber/cadence/common/messaging"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/persistence-utils"
persistenceutils "github.com/uber/cadence/common/persistence/persistence-utils"
"github.com/uber/cadence/common/quotas"
"github.com/uber/cadence/common/resource"
"github.com/uber/cadence/common/types"
Expand Down
Loading

0 comments on commit ae53312

Please sign in to comment.