From ae5331223f006330bed880208cd7b8010c271291 Mon Sep 17 00:00:00 2001 From: Ender Demirkaya Date: Mon, 14 Jun 2021 19:59:26 -0700 Subject: [PATCH] comments --- .gen/go/sqlblobs/sqlblobs.go | 638 +----------------- common/archiver/historyIterator.go | 2 +- .../cassandra/cassandraHistoryPersistence.go | 2 +- common/persistence/dataManagerInterfaces.go | 48 +- common/persistence/serialization/getters.go | 112 +-- .../persistence/serialization/interfaces.go | 7 +- .../serialization/thrift_decoder.go | 2 +- .../serialization/thrift_mapper.go | 40 +- common/persistence/sql/sqlHistoryStore.go | 2 +- common/persistence/sql/sqlShardStore.go | 39 +- .../sql/sqlplugin/mysql/execution.go | 2 +- .../sql/sqlplugin/postgres/execution.go | 2 +- idls | 2 +- .../v57/cadence/versioned/v0.5/manifest.json | 2 +- .../cadence/versioned/v0.4/manifest.json | 2 +- service/frontend/workflowHandler.go | 2 +- service/history/execution/state_rebuilder.go | 2 +- .../history/replication/task_ack_manager.go | 2 +- service/history/reset/resetter.go | 2 +- service/history/task/cross_cluster_task.go | 2 +- 20 files changed, 80 insertions(+), 832 deletions(-) diff --git a/.gen/go/sqlblobs/sqlblobs.go b/.gen/go/sqlblobs/sqlblobs.go index 2212e34ef26..54538d191da 100644 --- a/.gen/go/sqlblobs/sqlblobs.go +++ b/.gen/go/sqlblobs/sqlblobs.go @@ -2269,630 +2269,6 @@ func (v *ChildExecutionInfo) IsSetParentClosePolicy() bool { return v != nil && v.ParentClosePolicy != nil } -type CrossClusterTaskInfo struct { - DomainID []byte `json:"domainID,omitempty"` - WorkflowID *string `json:"workflowID,omitempty"` - RunID []byte `json:"runID,omitempty"` - TaskType *int16 `json:"taskType,omitempty"` - TargetDomainID []byte `json:"targetDomainID,omitempty"` - TargetWorkflowID *string `json:"targetWorkflowID,omitempty"` - TargetRunID []byte `json:"targetRunID,omitempty"` - TaskList *string `json:"taskList,omitempty"` - TargetChildWorkflowOnly *bool `json:"targetChildWorkflowOnly,omitempty"` - ScheduleID *int64 `json:"scheduleID,omitempty"` - Version *int64 `json:"version,omitempty"` - VisibilityTimestampNanos *int64 `json:"visibilityTimestampNanos,omitempty"` -} - -// ToWire translates a CrossClusterTaskInfo struct into a Thrift-level intermediate -// representation. This intermediate representation may be serialized -// into bytes using a ThriftRW protocol implementation. -// -// An error is returned if the struct or any of its fields failed to -// validate. -// -// x, err := v.ToWire() -// if err != nil { -// return err -// } -// -// if err := binaryProtocol.Encode(x, writer); err != nil { -// return err -// } -func (v *CrossClusterTaskInfo) ToWire() (wire.Value, error) { - var ( - fields [12]wire.Field - i int = 0 - w wire.Value - err error - ) - - if v.DomainID != nil { - w, err = wire.NewValueBinary(v.DomainID), error(nil) - if err != nil { - return w, err - } - fields[i] = wire.Field{ID: 10, Value: w} - i++ - } - if v.WorkflowID != nil { - w, err = wire.NewValueString(*(v.WorkflowID)), error(nil) - if err != nil { - return w, err - } - fields[i] = wire.Field{ID: 12, Value: w} - i++ - } - if v.RunID != nil { - w, err = wire.NewValueBinary(v.RunID), error(nil) - if err != nil { - return w, err - } - fields[i] = wire.Field{ID: 14, Value: w} - i++ - } - if v.TaskType != nil { - w, err = wire.NewValueI16(*(v.TaskType)), error(nil) - if err != nil { - return w, err - } - fields[i] = wire.Field{ID: 16, Value: w} - i++ - } - if v.TargetDomainID != nil { - w, err = wire.NewValueBinary(v.TargetDomainID), error(nil) - if err != nil { - return w, err - } - fields[i] = wire.Field{ID: 18, Value: w} - i++ - } - if v.TargetWorkflowID != nil { - w, err = wire.NewValueString(*(v.TargetWorkflowID)), error(nil) - if err != nil { - return w, err - } - fields[i] = wire.Field{ID: 20, Value: w} - i++ - } - if v.TargetRunID != nil { - w, err = wire.NewValueBinary(v.TargetRunID), error(nil) - if err != nil { - return w, err - } - fields[i] = wire.Field{ID: 22, Value: w} - i++ - } - if v.TaskList != nil { - w, err = wire.NewValueString(*(v.TaskList)), error(nil) - if err != nil { - return w, err - } - fields[i] = wire.Field{ID: 24, Value: w} - i++ - } - if v.TargetChildWorkflowOnly != nil { - w, err = wire.NewValueBool(*(v.TargetChildWorkflowOnly)), error(nil) - if err != nil { - return w, err - } - fields[i] = wire.Field{ID: 26, Value: w} - i++ - } - if v.ScheduleID != nil { - w, err = wire.NewValueI64(*(v.ScheduleID)), error(nil) - if err != nil { - return w, err - } - fields[i] = wire.Field{ID: 28, Value: w} - i++ - } - if v.Version != nil { - w, err = wire.NewValueI64(*(v.Version)), error(nil) - if err != nil { - return w, err - } - fields[i] = wire.Field{ID: 30, Value: w} - i++ - } - if v.VisibilityTimestampNanos != nil { - w, err = wire.NewValueI64(*(v.VisibilityTimestampNanos)), error(nil) - if err != nil { - return w, err - } - fields[i] = wire.Field{ID: 32, Value: w} - i++ - } - - return wire.NewValueStruct(wire.Struct{Fields: fields[:i]}), nil -} - -// FromWire deserializes a CrossClusterTaskInfo struct from its Thrift-level -// representation. The Thrift-level representation may be obtained -// from a ThriftRW protocol implementation. -// -// An error is returned if we were unable to build a CrossClusterTaskInfo struct -// from the provided intermediate representation. -// -// x, err := binaryProtocol.Decode(reader, wire.TStruct) -// if err != nil { -// return nil, err -// } -// -// var v CrossClusterTaskInfo -// if err := v.FromWire(x); err != nil { -// return nil, err -// } -// return &v, nil -func (v *CrossClusterTaskInfo) FromWire(w wire.Value) error { - var err error - - for _, field := range w.GetStruct().Fields { - switch field.ID { - case 10: - if field.Value.Type() == wire.TBinary { - v.DomainID, err = field.Value.GetBinary(), error(nil) - if err != nil { - return err - } - - } - case 12: - if field.Value.Type() == wire.TBinary { - var x string - x, err = field.Value.GetString(), error(nil) - v.WorkflowID = &x - if err != nil { - return err - } - - } - case 14: - if field.Value.Type() == wire.TBinary { - v.RunID, err = field.Value.GetBinary(), error(nil) - if err != nil { - return err - } - - } - case 16: - if field.Value.Type() == wire.TI16 { - var x int16 - x, err = field.Value.GetI16(), error(nil) - v.TaskType = &x - if err != nil { - return err - } - - } - case 18: - if field.Value.Type() == wire.TBinary { - v.TargetDomainID, err = field.Value.GetBinary(), error(nil) - if err != nil { - return err - } - - } - case 20: - if field.Value.Type() == wire.TBinary { - var x string - x, err = field.Value.GetString(), error(nil) - v.TargetWorkflowID = &x - if err != nil { - return err - } - - } - case 22: - if field.Value.Type() == wire.TBinary { - v.TargetRunID, err = field.Value.GetBinary(), error(nil) - if err != nil { - return err - } - - } - case 24: - if field.Value.Type() == wire.TBinary { - var x string - x, err = field.Value.GetString(), error(nil) - v.TaskList = &x - if err != nil { - return err - } - - } - case 26: - if field.Value.Type() == wire.TBool { - var x bool - x, err = field.Value.GetBool(), error(nil) - v.TargetChildWorkflowOnly = &x - if err != nil { - return err - } - - } - case 28: - if field.Value.Type() == wire.TI64 { - var x int64 - x, err = field.Value.GetI64(), error(nil) - v.ScheduleID = &x - if err != nil { - return err - } - - } - case 30: - if field.Value.Type() == wire.TI64 { - var x int64 - x, err = field.Value.GetI64(), error(nil) - v.Version = &x - if err != nil { - return err - } - - } - case 32: - if field.Value.Type() == wire.TI64 { - var x int64 - x, err = field.Value.GetI64(), error(nil) - v.VisibilityTimestampNanos = &x - if err != nil { - return err - } - - } - } - } - - return nil -} - -// String returns a readable string representation of a CrossClusterTaskInfo -// struct. -func (v *CrossClusterTaskInfo) String() string { - if v == nil { - return "" - } - - var fields [12]string - i := 0 - if v.DomainID != nil { - fields[i] = fmt.Sprintf("DomainID: %v", v.DomainID) - i++ - } - if v.WorkflowID != nil { - fields[i] = fmt.Sprintf("WorkflowID: %v", *(v.WorkflowID)) - i++ - } - if v.RunID != nil { - fields[i] = fmt.Sprintf("RunID: %v", v.RunID) - i++ - } - if v.TaskType != nil { - fields[i] = fmt.Sprintf("TaskType: %v", *(v.TaskType)) - i++ - } - if v.TargetDomainID != nil { - fields[i] = fmt.Sprintf("TargetDomainID: %v", v.TargetDomainID) - i++ - } - if v.TargetWorkflowID != nil { - fields[i] = fmt.Sprintf("TargetWorkflowID: %v", *(v.TargetWorkflowID)) - i++ - } - if v.TargetRunID != nil { - fields[i] = fmt.Sprintf("TargetRunID: %v", v.TargetRunID) - i++ - } - if v.TaskList != nil { - fields[i] = fmt.Sprintf("TaskList: %v", *(v.TaskList)) - i++ - } - if v.TargetChildWorkflowOnly != nil { - fields[i] = fmt.Sprintf("TargetChildWorkflowOnly: %v", *(v.TargetChildWorkflowOnly)) - i++ - } - if v.ScheduleID != nil { - fields[i] = fmt.Sprintf("ScheduleID: %v", *(v.ScheduleID)) - i++ - } - if v.Version != nil { - fields[i] = fmt.Sprintf("Version: %v", *(v.Version)) - i++ - } - if v.VisibilityTimestampNanos != nil { - fields[i] = fmt.Sprintf("VisibilityTimestampNanos: %v", *(v.VisibilityTimestampNanos)) - i++ - } - - return fmt.Sprintf("CrossClusterTaskInfo{%v}", strings.Join(fields[:i], ", ")) -} - -func _I16_EqualsPtr(lhs, rhs *int16) bool { - if lhs != nil && rhs != nil { - - x := *lhs - y := *rhs - return (x == y) - } - return lhs == nil && rhs == nil -} - -// Equals returns true if all the fields of this CrossClusterTaskInfo match the -// provided CrossClusterTaskInfo. -// -// This function performs a deep comparison. -func (v *CrossClusterTaskInfo) Equals(rhs *CrossClusterTaskInfo) bool { - if v == nil { - return rhs == nil - } else if rhs == nil { - return false - } - if !((v.DomainID == nil && rhs.DomainID == nil) || (v.DomainID != nil && rhs.DomainID != nil && bytes.Equal(v.DomainID, rhs.DomainID))) { - return false - } - if !_String_EqualsPtr(v.WorkflowID, rhs.WorkflowID) { - return false - } - if !((v.RunID == nil && rhs.RunID == nil) || (v.RunID != nil && rhs.RunID != nil && bytes.Equal(v.RunID, rhs.RunID))) { - return false - } - if !_I16_EqualsPtr(v.TaskType, rhs.TaskType) { - return false - } - if !((v.TargetDomainID == nil && rhs.TargetDomainID == nil) || (v.TargetDomainID != nil && rhs.TargetDomainID != nil && bytes.Equal(v.TargetDomainID, rhs.TargetDomainID))) { - return false - } - if !_String_EqualsPtr(v.TargetWorkflowID, rhs.TargetWorkflowID) { - return false - } - if !((v.TargetRunID == nil && rhs.TargetRunID == nil) || (v.TargetRunID != nil && rhs.TargetRunID != nil && bytes.Equal(v.TargetRunID, rhs.TargetRunID))) { - return false - } - if !_String_EqualsPtr(v.TaskList, rhs.TaskList) { - return false - } - if !_Bool_EqualsPtr(v.TargetChildWorkflowOnly, rhs.TargetChildWorkflowOnly) { - return false - } - if !_I64_EqualsPtr(v.ScheduleID, rhs.ScheduleID) { - return false - } - if !_I64_EqualsPtr(v.Version, rhs.Version) { - return false - } - if !_I64_EqualsPtr(v.VisibilityTimestampNanos, rhs.VisibilityTimestampNanos) { - return false - } - - return true -} - -// MarshalLogObject implements zapcore.ObjectMarshaler, enabling -// fast logging of CrossClusterTaskInfo. -func (v *CrossClusterTaskInfo) MarshalLogObject(enc zapcore.ObjectEncoder) (err error) { - if v == nil { - return nil - } - if v.DomainID != nil { - enc.AddString("domainID", base64.StdEncoding.EncodeToString(v.DomainID)) - } - if v.WorkflowID != nil { - enc.AddString("workflowID", *v.WorkflowID) - } - if v.RunID != nil { - enc.AddString("runID", base64.StdEncoding.EncodeToString(v.RunID)) - } - if v.TaskType != nil { - enc.AddInt16("taskType", *v.TaskType) - } - if v.TargetDomainID != nil { - enc.AddString("targetDomainID", base64.StdEncoding.EncodeToString(v.TargetDomainID)) - } - if v.TargetWorkflowID != nil { - enc.AddString("targetWorkflowID", *v.TargetWorkflowID) - } - if v.TargetRunID != nil { - enc.AddString("targetRunID", base64.StdEncoding.EncodeToString(v.TargetRunID)) - } - if v.TaskList != nil { - enc.AddString("taskList", *v.TaskList) - } - if v.TargetChildWorkflowOnly != nil { - enc.AddBool("targetChildWorkflowOnly", *v.TargetChildWorkflowOnly) - } - if v.ScheduleID != nil { - enc.AddInt64("scheduleID", *v.ScheduleID) - } - if v.Version != nil { - enc.AddInt64("version", *v.Version) - } - if v.VisibilityTimestampNanos != nil { - enc.AddInt64("visibilityTimestampNanos", *v.VisibilityTimestampNanos) - } - return err -} - -// GetDomainID returns the value of DomainID if it is set or its -// zero value if it is unset. -func (v *CrossClusterTaskInfo) GetDomainID() (o []byte) { - if v != nil && v.DomainID != nil { - return v.DomainID - } - - return -} - -// IsSetDomainID returns true if DomainID is not nil. -func (v *CrossClusterTaskInfo) IsSetDomainID() bool { - return v != nil && v.DomainID != nil -} - -// GetWorkflowID returns the value of WorkflowID if it is set or its -// zero value if it is unset. -func (v *CrossClusterTaskInfo) GetWorkflowID() (o string) { - if v != nil && v.WorkflowID != nil { - return *v.WorkflowID - } - - return -} - -// IsSetWorkflowID returns true if WorkflowID is not nil. -func (v *CrossClusterTaskInfo) IsSetWorkflowID() bool { - return v != nil && v.WorkflowID != nil -} - -// GetRunID returns the value of RunID if it is set or its -// zero value if it is unset. -func (v *CrossClusterTaskInfo) GetRunID() (o []byte) { - if v != nil && v.RunID != nil { - return v.RunID - } - - return -} - -// IsSetRunID returns true if RunID is not nil. -func (v *CrossClusterTaskInfo) IsSetRunID() bool { - return v != nil && v.RunID != nil -} - -// GetTaskType returns the value of TaskType if it is set or its -// zero value if it is unset. -func (v *CrossClusterTaskInfo) GetTaskType() (o int16) { - if v != nil && v.TaskType != nil { - return *v.TaskType - } - - return -} - -// IsSetTaskType returns true if TaskType is not nil. -func (v *CrossClusterTaskInfo) IsSetTaskType() bool { - return v != nil && v.TaskType != nil -} - -// GetTargetDomainID returns the value of TargetDomainID if it is set or its -// zero value if it is unset. -func (v *CrossClusterTaskInfo) GetTargetDomainID() (o []byte) { - if v != nil && v.TargetDomainID != nil { - return v.TargetDomainID - } - - return -} - -// IsSetTargetDomainID returns true if TargetDomainID is not nil. -func (v *CrossClusterTaskInfo) IsSetTargetDomainID() bool { - return v != nil && v.TargetDomainID != nil -} - -// GetTargetWorkflowID returns the value of TargetWorkflowID if it is set or its -// zero value if it is unset. -func (v *CrossClusterTaskInfo) GetTargetWorkflowID() (o string) { - if v != nil && v.TargetWorkflowID != nil { - return *v.TargetWorkflowID - } - - return -} - -// IsSetTargetWorkflowID returns true if TargetWorkflowID is not nil. -func (v *CrossClusterTaskInfo) IsSetTargetWorkflowID() bool { - return v != nil && v.TargetWorkflowID != nil -} - -// GetTargetRunID returns the value of TargetRunID if it is set or its -// zero value if it is unset. -func (v *CrossClusterTaskInfo) GetTargetRunID() (o []byte) { - if v != nil && v.TargetRunID != nil { - return v.TargetRunID - } - - return -} - -// IsSetTargetRunID returns true if TargetRunID is not nil. -func (v *CrossClusterTaskInfo) IsSetTargetRunID() bool { - return v != nil && v.TargetRunID != nil -} - -// GetTaskList returns the value of TaskList if it is set or its -// zero value if it is unset. -func (v *CrossClusterTaskInfo) GetTaskList() (o string) { - if v != nil && v.TaskList != nil { - return *v.TaskList - } - - return -} - -// IsSetTaskList returns true if TaskList is not nil. -func (v *CrossClusterTaskInfo) IsSetTaskList() bool { - return v != nil && v.TaskList != nil -} - -// GetTargetChildWorkflowOnly returns the value of TargetChildWorkflowOnly if it is set or its -// zero value if it is unset. -func (v *CrossClusterTaskInfo) GetTargetChildWorkflowOnly() (o bool) { - if v != nil && v.TargetChildWorkflowOnly != nil { - return *v.TargetChildWorkflowOnly - } - - return -} - -// IsSetTargetChildWorkflowOnly returns true if TargetChildWorkflowOnly is not nil. -func (v *CrossClusterTaskInfo) IsSetTargetChildWorkflowOnly() bool { - return v != nil && v.TargetChildWorkflowOnly != nil -} - -// GetScheduleID returns the value of ScheduleID if it is set or its -// zero value if it is unset. -func (v *CrossClusterTaskInfo) GetScheduleID() (o int64) { - if v != nil && v.ScheduleID != nil { - return *v.ScheduleID - } - - return -} - -// IsSetScheduleID returns true if ScheduleID is not nil. -func (v *CrossClusterTaskInfo) IsSetScheduleID() bool { - return v != nil && v.ScheduleID != nil -} - -// GetVersion returns the value of Version if it is set or its -// zero value if it is unset. -func (v *CrossClusterTaskInfo) GetVersion() (o int64) { - if v != nil && v.Version != nil { - return *v.Version - } - - return -} - -// IsSetVersion returns true if Version is not nil. -func (v *CrossClusterTaskInfo) IsSetVersion() bool { - return v != nil && v.Version != nil -} - -// GetVisibilityTimestampNanos returns the value of VisibilityTimestampNanos if it is set or its -// zero value if it is unset. -func (v *CrossClusterTaskInfo) GetVisibilityTimestampNanos() (o int64) { - if v != nil && v.VisibilityTimestampNanos != nil { - return *v.VisibilityTimestampNanos - } - - return -} - -// IsSetVisibilityTimestampNanos returns true if VisibilityTimestampNanos is not nil. -func (v *CrossClusterTaskInfo) IsSetVisibilityTimestampNanos() bool { - return v != nil && v.VisibilityTimestampNanos != nil -} - type DomainInfo struct { Name *string `json:"name,omitempty"` Description *string `json:"description,omitempty"` @@ -3573,6 +2949,16 @@ func (v *DomainInfo) String() string { return fmt.Sprintf("DomainInfo{%v}", strings.Join(fields[:i], ", ")) } +func _I16_EqualsPtr(lhs, rhs *int16) bool { + if lhs != nil && rhs != nil { + + x := *lhs + y := *rhs + return (x == y) + } + return lhs == nil && rhs == nil +} + func _Map_String_String_Equals(lhs, rhs map[string]string) bool { if len(lhs) != len(rhs) { return false @@ -11307,11 +10693,11 @@ var ThriftModule = &thriftreflect.ThriftModule{ Name: "sqlblobs", Package: "github.com/uber/cadence/.gen/go/sqlblobs", FilePath: "sqlblobs.thrift", - SHA1: "94de148de5d4af3921c40c7a30c305e17a3bfdc5", + SHA1: "cc9d9ebd900b26264bd1f303605fdd1d3eabb6a4", Includes: []*thriftreflect.ThriftModule{ shared.ThriftModule, }, Raw: rawIDL, } -const rawIDL = "// Copyright (c) 2017 Uber Technologies, Inc.\n//\n// Permission is hereby granted, free of charge, to any person obtaining a copy\n// of this software and associated documentation files (the \"Software\"), to deal\n// in the Software without restriction, including without limitation the rights\n// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell\n// copies of the Software, and to permit persons to whom the Software is\n// furnished to do so, subject to the following conditions:\n//\n// The above copyright notice and this permission notice shall be included in\n// all copies or substantial portions of the Software.\n//\n// THE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\n// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,\n// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE\n// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER\n// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,\n// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN\n// THE SOFTWARE.\n\nnamespace java com.uber.cadence.sqlblobs\n\ninclude \"shared.thrift\"\n\nstruct ShardInfo {\n 10: optional i32 stolenSinceRenew\n 12: optional i64 (js.type = \"Long\") updatedAtNanos\n 14: optional i64 (js.type = \"Long\") replicationAckLevel\n 16: optional i64 (js.type = \"Long\") transferAckLevel\n 18: optional i64 (js.type = \"Long\") timerAckLevelNanos\n 24: optional i64 (js.type = \"Long\") domainNotificationVersion\n 34: optional map clusterTransferAckLevel\n 36: optional map clusterTimerAckLevel\n 38: optional string owner\n 40: optional map clusterReplicationLevel\n 42: optional binary pendingFailoverMarkers\n 44: optional string pendingFailoverMarkersEncoding\n 46: optional map replicationDlqAckLevel\n 50: optional binary transferProcessingQueueStates\n 51: optional string transferProcessingQueueStatesEncoding\n 55: optional binary timerProcessingQueueStates\n 56: optional string timerProcessingQueueStatesEncoding\n 60: optional binary crossClusterProcessingQueueStates\n 61: optional string crossClusterProcessingQueueStatesEncoding\n}\n\nstruct DomainInfo {\n 10: optional string name\n 12: optional string description\n 14: optional string owner\n 16: optional i32 status\n 18: optional i16 retentionDays\n 20: optional bool emitMetric\n 22: optional string archivalBucket\n 24: optional i16 archivalStatus\n 26: optional i64 (js.type = \"Long\") configVersion\n 28: optional i64 (js.type = \"Long\") notificationVersion\n 30: optional i64 (js.type = \"Long\") failoverNotificationVersion\n 32: optional i64 (js.type = \"Long\") failoverVersion\n 34: optional string activeClusterName\n 36: optional list clusters\n 38: optional map data\n 39: optional binary badBinaries\n 40: optional string badBinariesEncoding\n 42: optional i16 historyArchivalStatus\n 44: optional string historyArchivalURI\n 46: optional i16 visibilityArchivalStatus\n 48: optional string visibilityArchivalURI\n 50: optional i64 (js.type = \"Long\") failoverEndTime\n 52: optional i64 (js.type = \"Long\") previousFailoverVersion\n 54: optional i64 (js.type = \"Long\") lastUpdatedTime\n}\n\nstruct HistoryTreeInfo {\n 10: optional i64 (js.type = \"Long\") createdTimeNanos // For fork operation to prevent race condition of leaking event data when forking branches fail. Also can be used for clean up leaked data\n 12: optional list ancestors\n 14: optional string info // For lookup back to workflow during debugging, also background cleanup when fork operation cannot finish self cleanup due to crash.\n}\n\nstruct WorkflowExecutionInfo {\n 10: optional binary parentDomainID\n 12: optional string parentWorkflowID\n 14: optional binary parentRunID\n 16: optional i64 (js.type = \"Long\") initiatedID\n 18: optional i64 (js.type = \"Long\") completionEventBatchID\n 20: optional binary completionEvent\n 22: optional string completionEventEncoding\n 24: optional string taskList\n 26: optional string workflowTypeName\n 28: optional i32 workflowTimeoutSeconds\n 30: optional i32 decisionTaskTimeoutSeconds\n 32: optional binary executionContext\n 34: optional i32 state\n 36: optional i32 closeStatus\n 38: optional i64 (js.type = \"Long\") startVersion\n 44: optional i64 (js.type = \"Long\") lastWriteEventID\n 48: optional i64 (js.type = \"Long\") lastEventTaskID\n 50: optional i64 (js.type = \"Long\") lastFirstEventID\n 52: optional i64 (js.type = \"Long\") lastProcessedEvent\n 54: optional i64 (js.type = \"Long\") startTimeNanos\n 56: optional i64 (js.type = \"Long\") lastUpdatedTimeNanos\n 58: optional i64 (js.type = \"Long\") decisionVersion\n 60: optional i64 (js.type = \"Long\") decisionScheduleID\n 62: optional i64 (js.type = \"Long\") decisionStartedID\n 64: optional i32 decisionTimeout\n 66: optional i64 (js.type = \"Long\") decisionAttempt\n 68: optional i64 (js.type = \"Long\") decisionStartedTimestampNanos\n 69: optional i64 (js.type = \"Long\") decisionScheduledTimestampNanos\n 70: optional bool cancelRequested\n 71: optional i64 (js.type = \"Long\") decisionOriginalScheduledTimestampNanos\n 72: optional string createRequestID\n 74: optional string decisionRequestID\n 76: optional string cancelRequestID\n 78: optional string stickyTaskList\n 80: optional i64 (js.type = \"Long\") stickyScheduleToStartTimeout\n 82: optional i64 (js.type = \"Long\") retryAttempt\n 84: optional i32 retryInitialIntervalSeconds\n 86: optional i32 retryMaximumIntervalSeconds\n 88: optional i32 retryMaximumAttempts\n 90: optional i32 retryExpirationSeconds\n 92: optional double retryBackoffCoefficient\n 94: optional i64 (js.type = \"Long\") retryExpirationTimeNanos\n 96: optional list retryNonRetryableErrors\n 98: optional bool hasRetryPolicy\n 100: optional string cronSchedule\n 102: optional i32 eventStoreVersion\n 104: optional binary eventBranchToken\n 106: optional i64 (js.type = \"Long\") signalCount\n 108: optional i64 (js.type = \"Long\") historySize\n 110: optional string clientLibraryVersion\n 112: optional string clientFeatureVersion\n 114: optional string clientImpl\n 115: optional binary autoResetPoints\n 116: optional string autoResetPointsEncoding\n 118: optional map searchAttributes\n 120: optional map memo\n 122: optional binary versionHistories\n 124: optional string versionHistoriesEncoding\n}\n\nstruct ActivityInfo {\n 10: optional i64 (js.type = \"Long\") version\n 12: optional i64 (js.type = \"Long\") scheduledEventBatchID\n 14: optional binary scheduledEvent\n 16: optional string scheduledEventEncoding\n 18: optional i64 (js.type = \"Long\") scheduledTimeNanos\n 20: optional i64 (js.type = \"Long\") startedID\n 22: optional binary startedEvent\n 24: optional string startedEventEncoding\n 26: optional i64 (js.type = \"Long\") startedTimeNanos\n 28: optional string activityID\n 30: optional string requestID\n 32: optional i32 scheduleToStartTimeoutSeconds\n 34: optional i32 scheduleToCloseTimeoutSeconds\n 36: optional i32 startToCloseTimeoutSeconds\n 38: optional i32 heartbeatTimeoutSeconds\n 40: optional bool cancelRequested\n 42: optional i64 (js.type = \"Long\") cancelRequestID\n 44: optional i32 timerTaskStatus\n 46: optional i32 attempt\n 48: optional string taskList\n 50: optional string startedIdentity\n 52: optional bool hasRetryPolicy\n 54: optional i32 retryInitialIntervalSeconds\n 56: optional i32 retryMaximumIntervalSeconds\n 58: optional i32 retryMaximumAttempts\n 60: optional i64 (js.type = \"Long\") retryExpirationTimeNanos\n 62: optional double retryBackoffCoefficient\n 64: optional list retryNonRetryableErrors\n 66: optional string retryLastFailureReason\n 68: optional string retryLastWorkerIdentity\n 70: optional binary retryLastFailureDetails\n}\n\nstruct ChildExecutionInfo {\n 10: optional i64 (js.type = \"Long\") version\n 12: optional i64 (js.type = \"Long\") initiatedEventBatchID\n 14: optional i64 (js.type = \"Long\") startedID\n 16: optional binary initiatedEvent\n 18: optional string initiatedEventEncoding\n 20: optional string startedWorkflowID\n 22: optional binary startedRunID\n 24: optional binary startedEvent\n 26: optional string startedEventEncoding\n 28: optional string createRequestID\n 30: optional string domainName\n 32: optional string workflowTypeName\n 35: optional i32 parentClosePolicy\n}\n\nstruct SignalInfo {\n 10: optional i64 (js.type = \"Long\") version\n 11: optional i64 (js.type = \"Long\") initiatedEventBatchID\n 12: optional string requestID\n 14: optional string name\n 16: optional binary input\n 18: optional binary control\n}\n\nstruct RequestCancelInfo {\n 10: optional i64 (js.type = \"Long\") version\n 11: optional i64 (js.type = \"Long\") initiatedEventBatchID\n 12: optional string cancelRequestID\n}\n\nstruct TimerInfo {\n 10: optional i64 (js.type = \"Long\") version\n 12: optional i64 (js.type = \"Long\") startedID\n 14: optional i64 (js.type = \"Long\") expiryTimeNanos\n // TaskID is a misleading variable, it actually serves\n // the purpose of indicating whether a timer task is\n // generated for this timer info\n 16: optional i64 (js.type = \"Long\") taskID\n}\n\nstruct TaskInfo {\n 10: optional string workflowID\n 12: optional binary runID\n 13: optional i64 (js.type = \"Long\") scheduleID\n 14: optional i64 (js.type = \"Long\") expiryTimeNanos\n 15: optional i64 (js.type = \"Long\") createdTimeNanos\n}\n\nstruct TaskListInfo {\n 10: optional i16 kind // {Normal, Sticky}\n 12: optional i64 (js.type = \"Long\") ackLevel\n 14: optional i64 (js.type = \"Long\") expiryTimeNanos\n 16: optional i64 (js.type = \"Long\") lastUpdatedNanos\n}\n\nstruct TransferTaskInfo {\n 10: optional binary domainID\n 12: optional string workflowID\n 14: optional binary runID\n 16: optional i16 taskType\n 18: optional binary targetDomainID\n 20: optional string targetWorkflowID\n 22: optional binary targetRunID\n 24: optional string taskList\n 26: optional bool targetChildWorkflowOnly\n 28: optional i64 (js.type = \"Long\") scheduleID\n 30: optional i64 (js.type = \"Long\") version\n 32: optional i64 (js.type = \"Long\") visibilityTimestampNanos\n}\n\nstruct CrossClusterTaskInfo {\n 10: optional binary domainID\n 12: optional string workflowID\n 14: optional binary runID\n 16: optional i16 taskType\n 18: optional binary targetDomainID\n 20: optional string targetWorkflowID\n 22: optional binary targetRunID\n 24: optional string taskList\n 26: optional bool targetChildWorkflowOnly\n 28: optional i64 (js.type = \"Long\") scheduleID\n 30: optional i64 (js.type = \"Long\") version\n 32: optional i64 (js.type = \"Long\") visibilityTimestampNanos\n}\n\nstruct TimerTaskInfo {\n 10: optional binary domainID\n 12: optional string workflowID\n 14: optional binary runID\n 16: optional i16 taskType\n 18: optional i16 timeoutType\n 20: optional i64 (js.type = \"Long\") version\n 22: optional i64 (js.type = \"Long\") scheduleAttempt\n 24: optional i64 (js.type = \"Long\") eventID\n}\n\nstruct ReplicationTaskInfo {\n 10: optional binary domainID\n 12: optional string workflowID\n 14: optional binary runID\n 16: optional i16 taskType\n 18: optional i64 (js.type = \"Long\") version\n 20: optional i64 (js.type = \"Long\") firstEventID\n 22: optional i64 (js.type = \"Long\") nextEventID\n 24: optional i64 (js.type = \"Long\") scheduledID\n 26: optional i32 eventStoreVersion\n 28: optional i32 newRunEventStoreVersion\n 30: optional binary branch_token\n 34: optional binary newRunBranchToken\n 38: optional i64 (js.type = \"Long\") creationTime\n}" +const rawIDL = "// Copyright (c) 2017 Uber Technologies, Inc.\n//\n// Permission is hereby granted, free of charge, to any person obtaining a copy\n// of this software and associated documentation files (the \"Software\"), to deal\n// in the Software without restriction, including without limitation the rights\n// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell\n// copies of the Software, and to permit persons to whom the Software is\n// furnished to do so, subject to the following conditions:\n//\n// The above copyright notice and this permission notice shall be included in\n// all copies or substantial portions of the Software.\n//\n// THE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\n// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,\n// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE\n// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER\n// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,\n// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN\n// THE SOFTWARE.\n\nnamespace java com.uber.cadence.sqlblobs\n\ninclude \"shared.thrift\"\n\nstruct ShardInfo {\n 10: optional i32 stolenSinceRenew\n 12: optional i64 (js.type = \"Long\") updatedAtNanos\n 14: optional i64 (js.type = \"Long\") replicationAckLevel\n 16: optional i64 (js.type = \"Long\") transferAckLevel\n 18: optional i64 (js.type = \"Long\") timerAckLevelNanos\n 24: optional i64 (js.type = \"Long\") domainNotificationVersion\n 34: optional map clusterTransferAckLevel\n 36: optional map clusterTimerAckLevel\n 38: optional string owner\n 40: optional map clusterReplicationLevel\n 42: optional binary pendingFailoverMarkers\n 44: optional string pendingFailoverMarkersEncoding\n 46: optional map replicationDlqAckLevel\n 50: optional binary transferProcessingQueueStates\n 51: optional string transferProcessingQueueStatesEncoding\n 55: optional binary timerProcessingQueueStates\n 56: optional string timerProcessingQueueStatesEncoding\n 60: optional binary crossClusterProcessingQueueStates\n 61: optional string crossClusterProcessingQueueStatesEncoding\n}\n\nstruct DomainInfo {\n 10: optional string name\n 12: optional string description\n 14: optional string owner\n 16: optional i32 status\n 18: optional i16 retentionDays\n 20: optional bool emitMetric\n 22: optional string archivalBucket\n 24: optional i16 archivalStatus\n 26: optional i64 (js.type = \"Long\") configVersion\n 28: optional i64 (js.type = \"Long\") notificationVersion\n 30: optional i64 (js.type = \"Long\") failoverNotificationVersion\n 32: optional i64 (js.type = \"Long\") failoverVersion\n 34: optional string activeClusterName\n 36: optional list clusters\n 38: optional map data\n 39: optional binary badBinaries\n 40: optional string badBinariesEncoding\n 42: optional i16 historyArchivalStatus\n 44: optional string historyArchivalURI\n 46: optional i16 visibilityArchivalStatus\n 48: optional string visibilityArchivalURI\n 50: optional i64 (js.type = \"Long\") failoverEndTime\n 52: optional i64 (js.type = \"Long\") previousFailoverVersion\n 54: optional i64 (js.type = \"Long\") lastUpdatedTime\n}\n\nstruct HistoryTreeInfo {\n 10: optional i64 (js.type = \"Long\") createdTimeNanos // For fork operation to prevent race condition of leaking event data when forking branches fail. Also can be used for clean up leaked data\n 12: optional list ancestors\n 14: optional string info // For lookup back to workflow during debugging, also background cleanup when fork operation cannot finish self cleanup due to crash.\n}\n\nstruct WorkflowExecutionInfo {\n 10: optional binary parentDomainID\n 12: optional string parentWorkflowID\n 14: optional binary parentRunID\n 16: optional i64 (js.type = \"Long\") initiatedID\n 18: optional i64 (js.type = \"Long\") completionEventBatchID\n 20: optional binary completionEvent\n 22: optional string completionEventEncoding\n 24: optional string taskList\n 26: optional string workflowTypeName\n 28: optional i32 workflowTimeoutSeconds\n 30: optional i32 decisionTaskTimeoutSeconds\n 32: optional binary executionContext\n 34: optional i32 state\n 36: optional i32 closeStatus\n 38: optional i64 (js.type = \"Long\") startVersion\n 44: optional i64 (js.type = \"Long\") lastWriteEventID\n 48: optional i64 (js.type = \"Long\") lastEventTaskID\n 50: optional i64 (js.type = \"Long\") lastFirstEventID\n 52: optional i64 (js.type = \"Long\") lastProcessedEvent\n 54: optional i64 (js.type = \"Long\") startTimeNanos\n 56: optional i64 (js.type = \"Long\") lastUpdatedTimeNanos\n 58: optional i64 (js.type = \"Long\") decisionVersion\n 60: optional i64 (js.type = \"Long\") decisionScheduleID\n 62: optional i64 (js.type = \"Long\") decisionStartedID\n 64: optional i32 decisionTimeout\n 66: optional i64 (js.type = \"Long\") decisionAttempt\n 68: optional i64 (js.type = \"Long\") decisionStartedTimestampNanos\n 69: optional i64 (js.type = \"Long\") decisionScheduledTimestampNanos\n 70: optional bool cancelRequested\n 71: optional i64 (js.type = \"Long\") decisionOriginalScheduledTimestampNanos\n 72: optional string createRequestID\n 74: optional string decisionRequestID\n 76: optional string cancelRequestID\n 78: optional string stickyTaskList\n 80: optional i64 (js.type = \"Long\") stickyScheduleToStartTimeout\n 82: optional i64 (js.type = \"Long\") retryAttempt\n 84: optional i32 retryInitialIntervalSeconds\n 86: optional i32 retryMaximumIntervalSeconds\n 88: optional i32 retryMaximumAttempts\n 90: optional i32 retryExpirationSeconds\n 92: optional double retryBackoffCoefficient\n 94: optional i64 (js.type = \"Long\") retryExpirationTimeNanos\n 96: optional list retryNonRetryableErrors\n 98: optional bool hasRetryPolicy\n 100: optional string cronSchedule\n 102: optional i32 eventStoreVersion\n 104: optional binary eventBranchToken\n 106: optional i64 (js.type = \"Long\") signalCount\n 108: optional i64 (js.type = \"Long\") historySize\n 110: optional string clientLibraryVersion\n 112: optional string clientFeatureVersion\n 114: optional string clientImpl\n 115: optional binary autoResetPoints\n 116: optional string autoResetPointsEncoding\n 118: optional map searchAttributes\n 120: optional map memo\n 122: optional binary versionHistories\n 124: optional string versionHistoriesEncoding\n}\n\nstruct ActivityInfo {\n 10: optional i64 (js.type = \"Long\") version\n 12: optional i64 (js.type = \"Long\") scheduledEventBatchID\n 14: optional binary scheduledEvent\n 16: optional string scheduledEventEncoding\n 18: optional i64 (js.type = \"Long\") scheduledTimeNanos\n 20: optional i64 (js.type = \"Long\") startedID\n 22: optional binary startedEvent\n 24: optional string startedEventEncoding\n 26: optional i64 (js.type = \"Long\") startedTimeNanos\n 28: optional string activityID\n 30: optional string requestID\n 32: optional i32 scheduleToStartTimeoutSeconds\n 34: optional i32 scheduleToCloseTimeoutSeconds\n 36: optional i32 startToCloseTimeoutSeconds\n 38: optional i32 heartbeatTimeoutSeconds\n 40: optional bool cancelRequested\n 42: optional i64 (js.type = \"Long\") cancelRequestID\n 44: optional i32 timerTaskStatus\n 46: optional i32 attempt\n 48: optional string taskList\n 50: optional string startedIdentity\n 52: optional bool hasRetryPolicy\n 54: optional i32 retryInitialIntervalSeconds\n 56: optional i32 retryMaximumIntervalSeconds\n 58: optional i32 retryMaximumAttempts\n 60: optional i64 (js.type = \"Long\") retryExpirationTimeNanos\n 62: optional double retryBackoffCoefficient\n 64: optional list retryNonRetryableErrors\n 66: optional string retryLastFailureReason\n 68: optional string retryLastWorkerIdentity\n 70: optional binary retryLastFailureDetails\n}\n\nstruct ChildExecutionInfo {\n 10: optional i64 (js.type = \"Long\") version\n 12: optional i64 (js.type = \"Long\") initiatedEventBatchID\n 14: optional i64 (js.type = \"Long\") startedID\n 16: optional binary initiatedEvent\n 18: optional string initiatedEventEncoding\n 20: optional string startedWorkflowID\n 22: optional binary startedRunID\n 24: optional binary startedEvent\n 26: optional string startedEventEncoding\n 28: optional string createRequestID\n 30: optional string domainName\n 32: optional string workflowTypeName\n 35: optional i32 parentClosePolicy\n}\n\nstruct SignalInfo {\n 10: optional i64 (js.type = \"Long\") version\n 11: optional i64 (js.type = \"Long\") initiatedEventBatchID\n 12: optional string requestID\n 14: optional string name\n 16: optional binary input\n 18: optional binary control\n}\n\nstruct RequestCancelInfo {\n 10: optional i64 (js.type = \"Long\") version\n 11: optional i64 (js.type = \"Long\") initiatedEventBatchID\n 12: optional string cancelRequestID\n}\n\nstruct TimerInfo {\n 10: optional i64 (js.type = \"Long\") version\n 12: optional i64 (js.type = \"Long\") startedID\n 14: optional i64 (js.type = \"Long\") expiryTimeNanos\n // TaskID is a misleading variable, it actually serves\n // the purpose of indicating whether a timer task is\n // generated for this timer info\n 16: optional i64 (js.type = \"Long\") taskID\n}\n\nstruct TaskInfo {\n 10: optional string workflowID\n 12: optional binary runID\n 13: optional i64 (js.type = \"Long\") scheduleID\n 14: optional i64 (js.type = \"Long\") expiryTimeNanos\n 15: optional i64 (js.type = \"Long\") createdTimeNanos\n}\n\nstruct TaskListInfo {\n 10: optional i16 kind // {Normal, Sticky}\n 12: optional i64 (js.type = \"Long\") ackLevel\n 14: optional i64 (js.type = \"Long\") expiryTimeNanos\n 16: optional i64 (js.type = \"Long\") lastUpdatedNanos\n}\n\nstruct TransferTaskInfo {\n 10: optional binary domainID\n 12: optional string workflowID\n 14: optional binary runID\n 16: optional i16 taskType\n 18: optional binary targetDomainID\n 20: optional string targetWorkflowID\n 22: optional binary targetRunID\n 24: optional string taskList\n 26: optional bool targetChildWorkflowOnly\n 28: optional i64 (js.type = \"Long\") scheduleID\n 30: optional i64 (js.type = \"Long\") version\n 32: optional i64 (js.type = \"Long\") visibilityTimestampNanos\n}\n\nstruct TimerTaskInfo {\n 10: optional binary domainID\n 12: optional string workflowID\n 14: optional binary runID\n 16: optional i16 taskType\n 18: optional i16 timeoutType\n 20: optional i64 (js.type = \"Long\") version\n 22: optional i64 (js.type = \"Long\") scheduleAttempt\n 24: optional i64 (js.type = \"Long\") eventID\n}\n\nstruct ReplicationTaskInfo {\n 10: optional binary domainID\n 12: optional string workflowID\n 14: optional binary runID\n 16: optional i16 taskType\n 18: optional i64 (js.type = \"Long\") version\n 20: optional i64 (js.type = \"Long\") firstEventID\n 22: optional i64 (js.type = \"Long\") nextEventID\n 24: optional i64 (js.type = \"Long\") scheduledID\n 26: optional i32 eventStoreVersion\n 28: optional i32 newRunEventStoreVersion\n 30: optional binary branch_token\n 34: optional binary newRunBranchToken\n 38: optional i64 (js.type = \"Long\") creationTime\n}" diff --git a/common/archiver/historyIterator.go b/common/archiver/historyIterator.go index 2620169751d..c17b8a74bf3 100644 --- a/common/archiver/historyIterator.go +++ b/common/archiver/historyIterator.go @@ -29,7 +29,7 @@ import ( "github.com/uber/cadence/common" "github.com/uber/cadence/common/persistence" - "github.com/uber/cadence/common/persistence/persistence-utils" + persistenceutils "github.com/uber/cadence/common/persistence/persistence-utils" "github.com/uber/cadence/common/types" ) diff --git a/common/persistence/cassandra/cassandraHistoryPersistence.go b/common/persistence/cassandra/cassandraHistoryPersistence.go index af3921c92c8..3c2d955bec6 100644 --- a/common/persistence/cassandra/cassandraHistoryPersistence.go +++ b/common/persistence/cassandra/cassandraHistoryPersistence.go @@ -32,7 +32,7 @@ import ( "github.com/uber/cadence/common/persistence/nosql/nosqlplugin" "github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra" "github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql" - "github.com/uber/cadence/common/persistence/persistence-utils" + persistenceutils "github.com/uber/cadence/common/persistence/persistence-utils" "github.com/uber/cadence/common/types" ) diff --git a/common/persistence/dataManagerInterfaces.go b/common/persistence/dataManagerInterfaces.go index e7575b58ba0..6cf14eda5ee 100644 --- a/common/persistence/dataManagerInterfaces.go +++ b/common/persistence/dataManagerInterfaces.go @@ -379,7 +379,10 @@ type ( } // CrossClusterTaskInfo describes a cross-cluster task - CrossClusterTaskInfo TransferTaskInfo + // Cross cluster tasks are exactly like transfer tasks so + // instead of creating another struct and duplicating the same + // logic everywhere. We reuse TransferTaskInfo + CrossClusterTaskInfo = TransferTaskInfo // ReplicationTaskInfo describes the replication task created for replication of history events ReplicationTaskInfo struct { @@ -2476,49 +2479,6 @@ func (t *TransferTaskInfo) String() string { ) } -// GetTaskID returns the task ID for cross-cluster task -func (t *CrossClusterTaskInfo) GetTaskID() int64 { - return t.TaskID -} - -// GetVersion returns the task version for cross-cluster task -func (t *CrossClusterTaskInfo) GetVersion() int64 { - return t.Version -} - -// GetTaskType returns the task type for cross-cluster task -func (t *CrossClusterTaskInfo) GetTaskType() int { - return t.TaskType -} - -// GetVisibilityTimestamp returns the task type for cross-cluster task -func (t *CrossClusterTaskInfo) GetVisibilityTimestamp() time.Time { - return t.VisibilityTimestamp -} - -// GetWorkflowID returns the workflow ID for cross-cluster task -func (t *CrossClusterTaskInfo) GetWorkflowID() string { - return t.WorkflowID -} - -// GetRunID returns the run ID for cross-cluster task -func (t *CrossClusterTaskInfo) GetRunID() string { - return t.RunID -} - -// GetDomainID returns the domain ID for cross-cluster task -func (t *CrossClusterTaskInfo) GetDomainID() string { - return t.DomainID -} - -// String returns a string representation for cross-cluster task -func (t *CrossClusterTaskInfo) String() string { - return fmt.Sprintf( - "{DomainID: %v, WorkflowID: %v, RunID: %v, TaskID: %v, TargetDomainID: %v, TargetWorkflowID %v, TargetRunID: %v, TargetChildWorkflowOnly: %v, TaskList: %v, TaskType: %v, ScheduleID: %v, Version: %v.}", - t.DomainID, t.WorkflowID, t.RunID, t.TaskID, t.TargetDomainID, t.TargetWorkflowID, t.TargetRunID, t.TargetChildWorkflowOnly, t.TaskList, t.TaskType, t.ScheduleID, t.Version, - ) -} - // GetTaskID returns the task ID for replication task func (t *ReplicationTaskInfo) GetTaskID() int64 { return t.TaskID diff --git a/common/persistence/serialization/getters.go b/common/persistence/serialization/getters.go index c5f65cdd9f2..2125f61735f 100644 --- a/common/persistence/serialization/getters.go +++ b/common/persistence/serialization/getters.go @@ -148,6 +148,22 @@ func (s *ShardInfo) GetTransferProcessingQueueStatesEncoding() (o string) { return } +// GetCrossClusterProcessingQueueStates internal sql blob getter +func (s *ShardInfo) GetCrossClusterProcessingQueueStates() (o []byte) { + if s != nil { + return s.CrossClusterProcessingQueueStates + } + return +} + +// GetCrossClusterProcessingQueueStatesEncoding internal sql blob getter +func (s *ShardInfo) GetCrossClusterProcessingQueueStatesEncoding() (o string) { + if s != nil { + return s.CrossClusterProcessingQueueStatesEncoding + } + return +} + // GetTimerProcessingQueueStates internal sql blob getter func (s *ShardInfo) GetTimerProcessingQueueStates() (o []byte) { if s != nil { @@ -1476,102 +1492,6 @@ func (t *TransferTaskInfo) GetVisibilityTimestamp() time.Time { return time.Unix(0, 0) } -// GetDomainID internal sql blob getter -func (t *CrossClusterTaskInfo) GetDomainID() (o []byte) { - if t != nil { - return t.DomainID - } - return -} - -// GetWorkflowID internal sql blob getter -func (t *CrossClusterTaskInfo) GetWorkflowID() (o string) { - if t != nil { - return t.WorkflowID - } - return -} - -// GetRunID internal sql blob getter -func (t *CrossClusterTaskInfo) GetRunID() (o []byte) { - if t != nil { - return t.RunID - } - return -} - -// GetTaskType internal sql blob getter -func (t *CrossClusterTaskInfo) GetTaskType() (o int16) { - if t != nil { - return t.TaskType - } - return -} - -// GetTargetDomainID internal sql blob getter -func (t *CrossClusterTaskInfo) GetTargetDomainID() (o []byte) { - if t != nil { - return t.TargetDomainID - } - return -} - -// GetTargetWorkflowID internal sql blob getter -func (t *CrossClusterTaskInfo) GetTargetWorkflowID() (o string) { - if t != nil { - return t.TargetWorkflowID - } - return -} - -// GetTargetRunID internal sql blob getter -func (t *CrossClusterTaskInfo) GetTargetRunID() (o []byte) { - if t != nil { - return t.TargetRunID - } - return -} - -// GetTaskList internal sql blob getter -func (t *CrossClusterTaskInfo) GetTaskList() (o string) { - if t != nil { - return t.TaskList - } - return -} - -// GetTargetChildWorkflowOnly internal sql blob getter -func (t *CrossClusterTaskInfo) GetTargetChildWorkflowOnly() (o bool) { - if t != nil { - return t.TargetChildWorkflowOnly - } - return -} - -// GetScheduleID internal sql blob getter -func (t *CrossClusterTaskInfo) GetScheduleID() (o int64) { - if t != nil { - return t.ScheduleID - } - return -} - -// GetVersion internal sql blob getter -func (t *CrossClusterTaskInfo) GetVersion() (o int64) { - if t != nil { - return t.Version - } - return -} - -// GetVisibilityTimestamp internal sql blob getter -func (t *CrossClusterTaskInfo) GetVisibilityTimestamp() time.Time { - if t != nil { - return t.VisibilityTimestamp - } - return time.Unix(0, 0) -} - // GetDomainID internal sql blob getter func (t *TimerTaskInfo) GetDomainID() (o []byte) { if t != nil && t.DomainID != nil { diff --git a/common/persistence/serialization/interfaces.go b/common/persistence/serialization/interfaces.go index 6ff86a3d4dd..2f462eeda1b 100644 --- a/common/persistence/serialization/interfaces.go +++ b/common/persistence/serialization/interfaces.go @@ -27,6 +27,7 @@ import ( "go.uber.org/thriftrw/wire" + "github.com/uber/cadence/.gen/go/sqlblobs" "github.com/uber/cadence/common" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/types" @@ -272,7 +273,11 @@ type ( } // CrossClusterTask blob in a serialization agnostic format - CrossClusterTaskInfo TransferTaskInfo + // Cross cluster tasks are exactly like transfer tasks so + // instead of creating another struct and duplicating the same + // logic everywhere. We reuse TransferTaskInfo + CrossClusterTaskInfo = TransferTaskInfo + sqlblobsCrossClusterTaskInfo = sqlblobs.TransferTaskInfo // TimerTaskInfo blob in a serialization agnostic format TimerTaskInfo struct { diff --git a/common/persistence/serialization/thrift_decoder.go b/common/persistence/serialization/thrift_decoder.go index af5437c64dd..b844493fc7e 100644 --- a/common/persistence/serialization/thrift_decoder.go +++ b/common/persistence/serialization/thrift_decoder.go @@ -136,7 +136,7 @@ func (d *thriftDecoder) transferTaskInfoFromBlob(data []byte) (*TransferTaskInfo } func (d *thriftDecoder) crossClusterTaskInfoFromBlob(data []byte) (*CrossClusterTaskInfo, error) { - result := &sqlblobs.CrossClusterTaskInfo{} + result := &sqlblobsCrossClusterTaskInfo{} if err := thriftRWDecode(data, result); err != nil { return nil, err } diff --git a/common/persistence/serialization/thrift_mapper.go b/common/persistence/serialization/thrift_mapper.go index 322f3eb1c9f..7f87c1aa848 100644 --- a/common/persistence/serialization/thrift_mapper.go +++ b/common/persistence/serialization/thrift_mapper.go @@ -620,44 +620,12 @@ func transferTaskInfoFromThrift(info *sqlblobs.TransferTaskInfo) *TransferTaskIn } } -func crossClusterTaskInfoToThrift(info *CrossClusterTaskInfo) *sqlblobs.CrossClusterTaskInfo { - if info == nil { - return nil - } - return &sqlblobs.CrossClusterTaskInfo{ - DomainID: info.DomainID, - WorkflowID: &info.WorkflowID, - RunID: info.RunID, - TaskType: &info.TaskType, - TargetDomainID: info.TargetDomainID, - TargetWorkflowID: &info.TargetWorkflowID, - TargetRunID: info.TargetRunID, - TaskList: &info.TaskList, - TargetChildWorkflowOnly: &info.TargetChildWorkflowOnly, - ScheduleID: &info.ScheduleID, - Version: &info.Version, - VisibilityTimestampNanos: timeToUnixNanoPtr(info.VisibilityTimestamp), - } +func crossClusterTaskInfoToThrift(info *CrossClusterTaskInfo) *sqlblobsCrossClusterTaskInfo { + return transferTaskInfoToThrift(info) } -func crossClusterTaskInfoFromThrift(info *sqlblobs.CrossClusterTaskInfo) *CrossClusterTaskInfo { - if info == nil { - return nil - } - return &CrossClusterTaskInfo{ - DomainID: info.DomainID, - WorkflowID: info.GetWorkflowID(), - RunID: info.RunID, - TaskType: info.GetTaskType(), - TargetDomainID: info.TargetDomainID, - TargetWorkflowID: info.GetTargetWorkflowID(), - TargetRunID: info.TargetRunID, - TaskList: info.GetTaskList(), - TargetChildWorkflowOnly: info.GetTargetChildWorkflowOnly(), - ScheduleID: info.GetScheduleID(), - Version: info.GetVersion(), - VisibilityTimestamp: timeFromUnixNano(info.GetVisibilityTimestampNanos()), - } +func crossClusterTaskInfoFromThrift(info *sqlblobsCrossClusterTaskInfo) *CrossClusterTaskInfo { + return transferTaskInfoFromThrift(info) } func timerTaskInfoToThrift(info *TimerTaskInfo) *sqlblobs.TimerTaskInfo { diff --git a/common/persistence/sql/sqlHistoryStore.go b/common/persistence/sql/sqlHistoryStore.go index e05ec87cacf..eb8074f53da 100644 --- a/common/persistence/sql/sqlHistoryStore.go +++ b/common/persistence/sql/sqlHistoryStore.go @@ -26,7 +26,7 @@ import ( "fmt" "time" - "github.com/uber/cadence/common/persistence/persistence-utils" + persistenceutils "github.com/uber/cadence/common/persistence/persistence-utils" "github.com/uber/cadence/common/persistence/serialization" "github.com/uber/cadence/common/types" diff --git a/common/persistence/sql/sqlShardStore.go b/common/persistence/sql/sqlShardStore.go index 44f46134dbe..0f9ad4a5e1e 100644 --- a/common/persistence/sql/sqlShardStore.go +++ b/common/persistence/sql/sqlShardStore.go @@ -129,6 +129,14 @@ func (m *sqlShardStore) GetShard( } } + var crossClusterPQS *persistence.DataBlob + if shardInfo.GetCrossClusterProcessingQueueStates() != nil { + crossClusterPQS = &persistence.DataBlob{ + Encoding: common.EncodingType(shardInfo.GetCrossClusterProcessingQueueStatesEncoding()), + Data: shardInfo.GetCrossClusterProcessingQueueStates(), + } + } + var timerPQS *persistence.DataBlob if shardInfo.GetTimerProcessingQueueStates() != nil { timerPQS = &persistence.DataBlob{ @@ -138,21 +146,22 @@ func (m *sqlShardStore) GetShard( } resp := &persistence.InternalGetShardResponse{ShardInfo: &persistence.InternalShardInfo{ - ShardID: int(row.ShardID), - RangeID: row.RangeID, - Owner: shardInfo.GetOwner(), - StolenSinceRenew: int(shardInfo.GetStolenSinceRenew()), - UpdatedAt: shardInfo.GetUpdatedAt(), - ReplicationAckLevel: shardInfo.GetReplicationAckLevel(), - TransferAckLevel: shardInfo.GetTransferAckLevel(), - TimerAckLevel: shardInfo.GetTimerAckLevel(), - ClusterTransferAckLevel: shardInfo.ClusterTransferAckLevel, - ClusterTimerAckLevel: timerAckLevel, - TransferProcessingQueueStates: transferPQS, - TimerProcessingQueueStates: timerPQS, - DomainNotificationVersion: shardInfo.GetDomainNotificationVersion(), - ClusterReplicationLevel: shardInfo.ClusterReplicationLevel, - ReplicationDLQAckLevel: shardInfo.ReplicationDlqAckLevel, + ShardID: int(row.ShardID), + RangeID: row.RangeID, + Owner: shardInfo.GetOwner(), + StolenSinceRenew: int(shardInfo.GetStolenSinceRenew()), + UpdatedAt: shardInfo.GetUpdatedAt(), + ReplicationAckLevel: shardInfo.GetReplicationAckLevel(), + TransferAckLevel: shardInfo.GetTransferAckLevel(), + TimerAckLevel: shardInfo.GetTimerAckLevel(), + ClusterTransferAckLevel: shardInfo.ClusterTransferAckLevel, + ClusterTimerAckLevel: timerAckLevel, + TransferProcessingQueueStates: transferPQS, + CrossClusterProcessingQueueStates: crossClusterPQS, + TimerProcessingQueueStates: timerPQS, + DomainNotificationVersion: shardInfo.GetDomainNotificationVersion(), + ClusterReplicationLevel: shardInfo.ClusterReplicationLevel, + ReplicationDLQAckLevel: shardInfo.ReplicationDlqAckLevel, }} return resp, nil diff --git a/common/persistence/sql/sqlplugin/mysql/execution.go b/common/persistence/sql/sqlplugin/mysql/execution.go index cc00c82d564..6be4c385549 100644 --- a/common/persistence/sql/sqlplugin/mysql/execution.go +++ b/common/persistence/sql/sqlplugin/mysql/execution.go @@ -92,7 +92,7 @@ FROM transfer_tasks WHERE shard_id = ? AND task_id > ? AND task_id <= ? ORDER BY deleteTransferTaskQuery = `DELETE FROM transfer_tasks WHERE shard_id = ? AND task_id = ?` rangeDeleteTransferTaskQuery = `DELETE FROM transfer_tasks WHERE shard_id = ? AND task_id > ? AND task_id <= ?` - getCrossClusterTasksQuery = `SELECT target_cluster, shard_id, task_id, data, data_encoding + getCrossClusterTasksQuery = `SELECT task_id, data, data_encoding FROM cross_cluster_tasks WHERE target_cluster = ? AND shard_id = ? AND task_id > ? AND task_id <= ? ORDER BY target_cluster, shard_id, task_id` createCrossClusterTasksQuery = `INSERT INTO cross_cluster_tasks(target_cluster, shard_id, task_id, data, data_encoding) diff --git a/common/persistence/sql/sqlplugin/postgres/execution.go b/common/persistence/sql/sqlplugin/postgres/execution.go index d1d744d2115..814a70db98f 100644 --- a/common/persistence/sql/sqlplugin/postgres/execution.go +++ b/common/persistence/sql/sqlplugin/postgres/execution.go @@ -92,7 +92,7 @@ workflow_id = :workflow_id deleteTransferTaskQuery = `DELETE FROM transfer_tasks WHERE shard_id = $1 AND task_id = $2` rangeDeleteTransferTaskQuery = `DELETE FROM transfer_tasks WHERE shard_id = $1 AND task_id > $2 AND task_id <= $3` - getCrossClusterTasksQuery = `SELECT target_cluster, shard_id, task_id, data, data_encoding + getCrossClusterTasksQuery = `SELECT task_id, data, data_encoding FROM cross_cluster_tasks WHERE target_cluster = $1 AND shard_id = $2 AND task_id > $3 AND task_id <= $4 ORDER BY target_cluster, shard_id, task_id` createCrossClusterTasksQuery = `INSERT INTO cross_cluster_tasks(target_cluster, shard_id, task_id, data, data_encoding) diff --git a/idls b/idls index 7d2d225f313..a9b1b32b74d 160000 --- a/idls +++ b/idls @@ -1 +1 @@ -Subproject commit 7d2d225f3137d3105243ee5021b4d44565735d8a +Subproject commit a9b1b32b74d43427ad650b2edfbed8a031f59ab8 diff --git a/schema/mysql/v57/cadence/versioned/v0.5/manifest.json b/schema/mysql/v57/cadence/versioned/v0.5/manifest.json index 5581e09f58f..b1a206b5c2d 100644 --- a/schema/mysql/v57/cadence/versioned/v0.5/manifest.json +++ b/schema/mysql/v57/cadence/versioned/v0.5/manifest.json @@ -1,6 +1,6 @@ { "CurrVersion": "0.5", - "MinCompatibleVersion": "0.4", + "MinCompatibleVersion": "0.5", "Description": "create cross cluster table", "SchemaUpdateCqlFiles": [ "cross_cluster_table.sql" diff --git a/schema/postgres/cadence/versioned/v0.4/manifest.json b/schema/postgres/cadence/versioned/v0.4/manifest.json index 1be5e055408..e36dc20d5a1 100644 --- a/schema/postgres/cadence/versioned/v0.4/manifest.json +++ b/schema/postgres/cadence/versioned/v0.4/manifest.json @@ -1,6 +1,6 @@ { "CurrVersion": "0.4", - "MinCompatibleVersion": "0.3", + "MinCompatibleVersion": "0.4", "Description": "create cross cluster table", "SchemaUpdateCqlFiles": [ "cross_cluster_table.sql" diff --git a/service/frontend/workflowHandler.go b/service/frontend/workflowHandler.go index 282c36221b2..09698651586 100644 --- a/service/frontend/workflowHandler.go +++ b/service/frontend/workflowHandler.go @@ -46,7 +46,7 @@ import ( "github.com/uber/cadence/common/messaging" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/persistence" - "github.com/uber/cadence/common/persistence/persistence-utils" + persistenceutils "github.com/uber/cadence/common/persistence/persistence-utils" "github.com/uber/cadence/common/quotas" "github.com/uber/cadence/common/resource" "github.com/uber/cadence/common/types" diff --git a/service/history/execution/state_rebuilder.go b/service/history/execution/state_rebuilder.go index a4ea6897eed..2b85721c399 100644 --- a/service/history/execution/state_rebuilder.go +++ b/service/history/execution/state_rebuilder.go @@ -36,7 +36,7 @@ import ( "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/persistence" - "github.com/uber/cadence/common/persistence/persistence-utils" + persistenceutils "github.com/uber/cadence/common/persistence/persistence-utils" "github.com/uber/cadence/common/types" "github.com/uber/cadence/service/history/shard" ) diff --git a/service/history/replication/task_ack_manager.go b/service/history/replication/task_ack_manager.go index bb900eda0bd..3025043618f 100644 --- a/service/history/replication/task_ack_manager.go +++ b/service/history/replication/task_ack_manager.go @@ -40,7 +40,7 @@ import ( "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/persistence" - "github.com/uber/cadence/common/persistence/persistence-utils" + persistenceutils "github.com/uber/cadence/common/persistence/persistence-utils" "github.com/uber/cadence/common/quotas" "github.com/uber/cadence/common/types" exec "github.com/uber/cadence/service/history/execution" diff --git a/service/history/reset/resetter.go b/service/history/reset/resetter.go index 47fdb53be4b..2930f716b96 100644 --- a/service/history/reset/resetter.go +++ b/service/history/reset/resetter.go @@ -33,7 +33,7 @@ import ( "github.com/uber/cadence/common/definition" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/persistence" - "github.com/uber/cadence/common/persistence/persistence-utils" + persistenceutils "github.com/uber/cadence/common/persistence/persistence-utils" "github.com/uber/cadence/common/types" "github.com/uber/cadence/service/history/execution" "github.com/uber/cadence/service/history/shard" diff --git a/service/history/task/cross_cluster_task.go b/service/history/task/cross_cluster_task.go index b29e2ca1908..d57b8a9e262 100644 --- a/service/history/task/cross_cluster_task.go +++ b/service/history/task/cross_cluster_task.go @@ -303,4 +303,4 @@ func (c *crossClusterTaskBase) GetQueueType() QueueType { func (c *crossClusterTaskBase) IsReadyForPoll() bool { return c.state == ctask.TaskStatePending && (c.processingState == processingStateInitialed || c.processingState == processingStateResponseRecorded) -} \ No newline at end of file +}