From c4713d202a6ff18764f0ad854ce70fa77f9b7884 Mon Sep 17 00:00:00 2001 From: Max K <25259015+mkolodezny@users.noreply.github.com> Date: Thu, 5 May 2022 08:17:00 -0700 Subject: [PATCH] updated idl for activity task dispatch (#4815) * updated idl for activity task dispatch --- .gen/go/matching/matching.go | 802 +++++++++++++++++++++++-- common/types/mapper/thrift/matching.go | 34 ++ idls | 2 +- 3 files changed, 801 insertions(+), 37 deletions(-) diff --git a/.gen/go/matching/matching.go b/.gen/go/matching/matching.go index fa012e8d5b7..33b3be002e1 100644 --- a/.gen/go/matching/matching.go +++ b/.gen/go/matching/matching.go @@ -44,6 +44,696 @@ import ( shared "github.com/uber/cadence/.gen/go/shared" ) +type ActivityTaskDispatchInfo struct { + ScheduledEvent *shared.HistoryEvent `json:"scheduledEvent,omitempty"` + StartedTimestamp *int64 `json:"startedTimestamp,omitempty"` + Attempt *int64 `json:"attempt,omitempty"` + ScheduledTimestampOfThisAttempt *int64 `json:"scheduledTimestampOfThisAttempt,omitempty"` + ScheduledTimestamp *int64 `json:"scheduledTimestamp,omitempty"` + HeartbeatDetails []byte `json:"heartbeatDetails,omitempty"` + WorkflowType *shared.WorkflowType `json:"workflowType,omitempty"` + WorkflowDomain *string `json:"workflowDomain,omitempty"` +} + +// ToWire translates a ActivityTaskDispatchInfo struct into a Thrift-level intermediate +// representation. This intermediate representation may be serialized +// into bytes using a ThriftRW protocol implementation. +// +// An error is returned if the struct or any of its fields failed to +// validate. +// +// x, err := v.ToWire() +// if err != nil { +// return err +// } +// +// if err := binaryProtocol.Encode(x, writer); err != nil { +// return err +// } +func (v *ActivityTaskDispatchInfo) ToWire() (wire.Value, error) { + var ( + fields [8]wire.Field + i int = 0 + w wire.Value + err error + ) + + if v.ScheduledEvent != nil { + w, err = v.ScheduledEvent.ToWire() + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 10, Value: w} + i++ + } + if v.StartedTimestamp != nil { + w, err = wire.NewValueI64(*(v.StartedTimestamp)), error(nil) + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 20, Value: w} + i++ + } + if v.Attempt != nil { + w, err = wire.NewValueI64(*(v.Attempt)), error(nil) + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 30, Value: w} + i++ + } + if v.ScheduledTimestampOfThisAttempt != nil { + w, err = wire.NewValueI64(*(v.ScheduledTimestampOfThisAttempt)), error(nil) + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 40, Value: w} + i++ + } + if v.ScheduledTimestamp != nil { + w, err = wire.NewValueI64(*(v.ScheduledTimestamp)), error(nil) + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 50, Value: w} + i++ + } + if v.HeartbeatDetails != nil { + w, err = wire.NewValueBinary(v.HeartbeatDetails), error(nil) + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 60, Value: w} + i++ + } + if v.WorkflowType != nil { + w, err = v.WorkflowType.ToWire() + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 70, Value: w} + i++ + } + if v.WorkflowDomain != nil { + w, err = wire.NewValueString(*(v.WorkflowDomain)), error(nil) + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 80, Value: w} + i++ + } + + return wire.NewValueStruct(wire.Struct{Fields: fields[:i]}), nil +} + +func _HistoryEvent_Read(w wire.Value) (*shared.HistoryEvent, error) { + var v shared.HistoryEvent + err := v.FromWire(w) + return &v, err +} + +func _WorkflowType_Read(w wire.Value) (*shared.WorkflowType, error) { + var v shared.WorkflowType + err := v.FromWire(w) + return &v, err +} + +// FromWire deserializes a ActivityTaskDispatchInfo struct from its Thrift-level +// representation. The Thrift-level representation may be obtained +// from a ThriftRW protocol implementation. +// +// An error is returned if we were unable to build a ActivityTaskDispatchInfo struct +// from the provided intermediate representation. +// +// x, err := binaryProtocol.Decode(reader, wire.TStruct) +// if err != nil { +// return nil, err +// } +// +// var v ActivityTaskDispatchInfo +// if err := v.FromWire(x); err != nil { +// return nil, err +// } +// return &v, nil +func (v *ActivityTaskDispatchInfo) FromWire(w wire.Value) error { + var err error + + for _, field := range w.GetStruct().Fields { + switch field.ID { + case 10: + if field.Value.Type() == wire.TStruct { + v.ScheduledEvent, err = _HistoryEvent_Read(field.Value) + if err != nil { + return err + } + + } + case 20: + if field.Value.Type() == wire.TI64 { + var x int64 + x, err = field.Value.GetI64(), error(nil) + v.StartedTimestamp = &x + if err != nil { + return err + } + + } + case 30: + if field.Value.Type() == wire.TI64 { + var x int64 + x, err = field.Value.GetI64(), error(nil) + v.Attempt = &x + if err != nil { + return err + } + + } + case 40: + if field.Value.Type() == wire.TI64 { + var x int64 + x, err = field.Value.GetI64(), error(nil) + v.ScheduledTimestampOfThisAttempt = &x + if err != nil { + return err + } + + } + case 50: + if field.Value.Type() == wire.TI64 { + var x int64 + x, err = field.Value.GetI64(), error(nil) + v.ScheduledTimestamp = &x + if err != nil { + return err + } + + } + case 60: + if field.Value.Type() == wire.TBinary { + v.HeartbeatDetails, err = field.Value.GetBinary(), error(nil) + if err != nil { + return err + } + + } + case 70: + if field.Value.Type() == wire.TStruct { + v.WorkflowType, err = _WorkflowType_Read(field.Value) + if err != nil { + return err + } + + } + case 80: + if field.Value.Type() == wire.TBinary { + var x string + x, err = field.Value.GetString(), error(nil) + v.WorkflowDomain = &x + if err != nil { + return err + } + + } + } + } + + return nil +} + +// Encode serializes a ActivityTaskDispatchInfo struct directly into bytes, without going +// through an intermediary type. +// +// An error is returned if a ActivityTaskDispatchInfo struct could not be encoded. +func (v *ActivityTaskDispatchInfo) Encode(sw stream.Writer) error { + if err := sw.WriteStructBegin(); err != nil { + return err + } + + if v.ScheduledEvent != nil { + if err := sw.WriteFieldBegin(stream.FieldHeader{ID: 10, Type: wire.TStruct}); err != nil { + return err + } + if err := v.ScheduledEvent.Encode(sw); err != nil { + return err + } + if err := sw.WriteFieldEnd(); err != nil { + return err + } + } + + if v.StartedTimestamp != nil { + if err := sw.WriteFieldBegin(stream.FieldHeader{ID: 20, Type: wire.TI64}); err != nil { + return err + } + if err := sw.WriteInt64(*(v.StartedTimestamp)); err != nil { + return err + } + if err := sw.WriteFieldEnd(); err != nil { + return err + } + } + + if v.Attempt != nil { + if err := sw.WriteFieldBegin(stream.FieldHeader{ID: 30, Type: wire.TI64}); err != nil { + return err + } + if err := sw.WriteInt64(*(v.Attempt)); err != nil { + return err + } + if err := sw.WriteFieldEnd(); err != nil { + return err + } + } + + if v.ScheduledTimestampOfThisAttempt != nil { + if err := sw.WriteFieldBegin(stream.FieldHeader{ID: 40, Type: wire.TI64}); err != nil { + return err + } + if err := sw.WriteInt64(*(v.ScheduledTimestampOfThisAttempt)); err != nil { + return err + } + if err := sw.WriteFieldEnd(); err != nil { + return err + } + } + + if v.ScheduledTimestamp != nil { + if err := sw.WriteFieldBegin(stream.FieldHeader{ID: 50, Type: wire.TI64}); err != nil { + return err + } + if err := sw.WriteInt64(*(v.ScheduledTimestamp)); err != nil { + return err + } + if err := sw.WriteFieldEnd(); err != nil { + return err + } + } + + if v.HeartbeatDetails != nil { + if err := sw.WriteFieldBegin(stream.FieldHeader{ID: 60, Type: wire.TBinary}); err != nil { + return err + } + if err := sw.WriteBinary(v.HeartbeatDetails); err != nil { + return err + } + if err := sw.WriteFieldEnd(); err != nil { + return err + } + } + + if v.WorkflowType != nil { + if err := sw.WriteFieldBegin(stream.FieldHeader{ID: 70, Type: wire.TStruct}); err != nil { + return err + } + if err := v.WorkflowType.Encode(sw); err != nil { + return err + } + if err := sw.WriteFieldEnd(); err != nil { + return err + } + } + + if v.WorkflowDomain != nil { + if err := sw.WriteFieldBegin(stream.FieldHeader{ID: 80, Type: wire.TBinary}); err != nil { + return err + } + if err := sw.WriteString(*(v.WorkflowDomain)); err != nil { + return err + } + if err := sw.WriteFieldEnd(); err != nil { + return err + } + } + + return sw.WriteStructEnd() +} + +func _HistoryEvent_Decode(sr stream.Reader) (*shared.HistoryEvent, error) { + var v shared.HistoryEvent + err := v.Decode(sr) + return &v, err +} + +func _WorkflowType_Decode(sr stream.Reader) (*shared.WorkflowType, error) { + var v shared.WorkflowType + err := v.Decode(sr) + return &v, err +} + +// Decode deserializes a ActivityTaskDispatchInfo struct directly from its Thrift-level +// representation, without going through an intemediary type. +// +// An error is returned if a ActivityTaskDispatchInfo struct could not be generated from the wire +// representation. +func (v *ActivityTaskDispatchInfo) Decode(sr stream.Reader) error { + + if err := sr.ReadStructBegin(); err != nil { + return err + } + + fh, ok, err := sr.ReadFieldBegin() + if err != nil { + return err + } + + for ok { + switch { + case fh.ID == 10 && fh.Type == wire.TStruct: + v.ScheduledEvent, err = _HistoryEvent_Decode(sr) + if err != nil { + return err + } + + case fh.ID == 20 && fh.Type == wire.TI64: + var x int64 + x, err = sr.ReadInt64() + v.StartedTimestamp = &x + if err != nil { + return err + } + + case fh.ID == 30 && fh.Type == wire.TI64: + var x int64 + x, err = sr.ReadInt64() + v.Attempt = &x + if err != nil { + return err + } + + case fh.ID == 40 && fh.Type == wire.TI64: + var x int64 + x, err = sr.ReadInt64() + v.ScheduledTimestampOfThisAttempt = &x + if err != nil { + return err + } + + case fh.ID == 50 && fh.Type == wire.TI64: + var x int64 + x, err = sr.ReadInt64() + v.ScheduledTimestamp = &x + if err != nil { + return err + } + + case fh.ID == 60 && fh.Type == wire.TBinary: + v.HeartbeatDetails, err = sr.ReadBinary() + if err != nil { + return err + } + + case fh.ID == 70 && fh.Type == wire.TStruct: + v.WorkflowType, err = _WorkflowType_Decode(sr) + if err != nil { + return err + } + + case fh.ID == 80 && fh.Type == wire.TBinary: + var x string + x, err = sr.ReadString() + v.WorkflowDomain = &x + if err != nil { + return err + } + + default: + if err := sr.Skip(fh.Type); err != nil { + return err + } + } + + if err := sr.ReadFieldEnd(); err != nil { + return err + } + + if fh, ok, err = sr.ReadFieldBegin(); err != nil { + return err + } + } + + if err := sr.ReadStructEnd(); err != nil { + return err + } + + return nil +} + +// String returns a readable string representation of a ActivityTaskDispatchInfo +// struct. +func (v *ActivityTaskDispatchInfo) String() string { + if v == nil { + return "" + } + + var fields [8]string + i := 0 + if v.ScheduledEvent != nil { + fields[i] = fmt.Sprintf("ScheduledEvent: %v", v.ScheduledEvent) + i++ + } + if v.StartedTimestamp != nil { + fields[i] = fmt.Sprintf("StartedTimestamp: %v", *(v.StartedTimestamp)) + i++ + } + if v.Attempt != nil { + fields[i] = fmt.Sprintf("Attempt: %v", *(v.Attempt)) + i++ + } + if v.ScheduledTimestampOfThisAttempt != nil { + fields[i] = fmt.Sprintf("ScheduledTimestampOfThisAttempt: %v", *(v.ScheduledTimestampOfThisAttempt)) + i++ + } + if v.ScheduledTimestamp != nil { + fields[i] = fmt.Sprintf("ScheduledTimestamp: %v", *(v.ScheduledTimestamp)) + i++ + } + if v.HeartbeatDetails != nil { + fields[i] = fmt.Sprintf("HeartbeatDetails: %v", v.HeartbeatDetails) + i++ + } + if v.WorkflowType != nil { + fields[i] = fmt.Sprintf("WorkflowType: %v", v.WorkflowType) + i++ + } + if v.WorkflowDomain != nil { + fields[i] = fmt.Sprintf("WorkflowDomain: %v", *(v.WorkflowDomain)) + i++ + } + + return fmt.Sprintf("ActivityTaskDispatchInfo{%v}", strings.Join(fields[:i], ", ")) +} + +func _I64_EqualsPtr(lhs, rhs *int64) bool { + if lhs != nil && rhs != nil { + + x := *lhs + y := *rhs + return (x == y) + } + return lhs == nil && rhs == nil +} + +func _String_EqualsPtr(lhs, rhs *string) bool { + if lhs != nil && rhs != nil { + + x := *lhs + y := *rhs + return (x == y) + } + return lhs == nil && rhs == nil +} + +// Equals returns true if all the fields of this ActivityTaskDispatchInfo match the +// provided ActivityTaskDispatchInfo. +// +// This function performs a deep comparison. +func (v *ActivityTaskDispatchInfo) Equals(rhs *ActivityTaskDispatchInfo) bool { + if v == nil { + return rhs == nil + } else if rhs == nil { + return false + } + if !((v.ScheduledEvent == nil && rhs.ScheduledEvent == nil) || (v.ScheduledEvent != nil && rhs.ScheduledEvent != nil && v.ScheduledEvent.Equals(rhs.ScheduledEvent))) { + return false + } + if !_I64_EqualsPtr(v.StartedTimestamp, rhs.StartedTimestamp) { + return false + } + if !_I64_EqualsPtr(v.Attempt, rhs.Attempt) { + return false + } + if !_I64_EqualsPtr(v.ScheduledTimestampOfThisAttempt, rhs.ScheduledTimestampOfThisAttempt) { + return false + } + if !_I64_EqualsPtr(v.ScheduledTimestamp, rhs.ScheduledTimestamp) { + return false + } + if !((v.HeartbeatDetails == nil && rhs.HeartbeatDetails == nil) || (v.HeartbeatDetails != nil && rhs.HeartbeatDetails != nil && bytes.Equal(v.HeartbeatDetails, rhs.HeartbeatDetails))) { + return false + } + if !((v.WorkflowType == nil && rhs.WorkflowType == nil) || (v.WorkflowType != nil && rhs.WorkflowType != nil && v.WorkflowType.Equals(rhs.WorkflowType))) { + return false + } + if !_String_EqualsPtr(v.WorkflowDomain, rhs.WorkflowDomain) { + return false + } + + return true +} + +// MarshalLogObject implements zapcore.ObjectMarshaler, enabling +// fast logging of ActivityTaskDispatchInfo. +func (v *ActivityTaskDispatchInfo) MarshalLogObject(enc zapcore.ObjectEncoder) (err error) { + if v == nil { + return nil + } + if v.ScheduledEvent != nil { + err = multierr.Append(err, enc.AddObject("scheduledEvent", v.ScheduledEvent)) + } + if v.StartedTimestamp != nil { + enc.AddInt64("startedTimestamp", *v.StartedTimestamp) + } + if v.Attempt != nil { + enc.AddInt64("attempt", *v.Attempt) + } + if v.ScheduledTimestampOfThisAttempt != nil { + enc.AddInt64("scheduledTimestampOfThisAttempt", *v.ScheduledTimestampOfThisAttempt) + } + if v.ScheduledTimestamp != nil { + enc.AddInt64("scheduledTimestamp", *v.ScheduledTimestamp) + } + if v.HeartbeatDetails != nil { + enc.AddString("heartbeatDetails", base64.StdEncoding.EncodeToString(v.HeartbeatDetails)) + } + if v.WorkflowType != nil { + err = multierr.Append(err, enc.AddObject("workflowType", v.WorkflowType)) + } + if v.WorkflowDomain != nil { + enc.AddString("workflowDomain", *v.WorkflowDomain) + } + return err +} + +// GetScheduledEvent returns the value of ScheduledEvent if it is set or its +// zero value if it is unset. +func (v *ActivityTaskDispatchInfo) GetScheduledEvent() (o *shared.HistoryEvent) { + if v != nil && v.ScheduledEvent != nil { + return v.ScheduledEvent + } + + return +} + +// IsSetScheduledEvent returns true if ScheduledEvent is not nil. +func (v *ActivityTaskDispatchInfo) IsSetScheduledEvent() bool { + return v != nil && v.ScheduledEvent != nil +} + +// GetStartedTimestamp returns the value of StartedTimestamp if it is set or its +// zero value if it is unset. +func (v *ActivityTaskDispatchInfo) GetStartedTimestamp() (o int64) { + if v != nil && v.StartedTimestamp != nil { + return *v.StartedTimestamp + } + + return +} + +// IsSetStartedTimestamp returns true if StartedTimestamp is not nil. +func (v *ActivityTaskDispatchInfo) IsSetStartedTimestamp() bool { + return v != nil && v.StartedTimestamp != nil +} + +// GetAttempt returns the value of Attempt if it is set or its +// zero value if it is unset. +func (v *ActivityTaskDispatchInfo) GetAttempt() (o int64) { + if v != nil && v.Attempt != nil { + return *v.Attempt + } + + return +} + +// IsSetAttempt returns true if Attempt is not nil. +func (v *ActivityTaskDispatchInfo) IsSetAttempt() bool { + return v != nil && v.Attempt != nil +} + +// GetScheduledTimestampOfThisAttempt returns the value of ScheduledTimestampOfThisAttempt if it is set or its +// zero value if it is unset. +func (v *ActivityTaskDispatchInfo) GetScheduledTimestampOfThisAttempt() (o int64) { + if v != nil && v.ScheduledTimestampOfThisAttempt != nil { + return *v.ScheduledTimestampOfThisAttempt + } + + return +} + +// IsSetScheduledTimestampOfThisAttempt returns true if ScheduledTimestampOfThisAttempt is not nil. +func (v *ActivityTaskDispatchInfo) IsSetScheduledTimestampOfThisAttempt() bool { + return v != nil && v.ScheduledTimestampOfThisAttempt != nil +} + +// GetScheduledTimestamp returns the value of ScheduledTimestamp if it is set or its +// zero value if it is unset. +func (v *ActivityTaskDispatchInfo) GetScheduledTimestamp() (o int64) { + if v != nil && v.ScheduledTimestamp != nil { + return *v.ScheduledTimestamp + } + + return +} + +// IsSetScheduledTimestamp returns true if ScheduledTimestamp is not nil. +func (v *ActivityTaskDispatchInfo) IsSetScheduledTimestamp() bool { + return v != nil && v.ScheduledTimestamp != nil +} + +// GetHeartbeatDetails returns the value of HeartbeatDetails if it is set or its +// zero value if it is unset. +func (v *ActivityTaskDispatchInfo) GetHeartbeatDetails() (o []byte) { + if v != nil && v.HeartbeatDetails != nil { + return v.HeartbeatDetails + } + + return +} + +// IsSetHeartbeatDetails returns true if HeartbeatDetails is not nil. +func (v *ActivityTaskDispatchInfo) IsSetHeartbeatDetails() bool { + return v != nil && v.HeartbeatDetails != nil +} + +// GetWorkflowType returns the value of WorkflowType if it is set or its +// zero value if it is unset. +func (v *ActivityTaskDispatchInfo) GetWorkflowType() (o *shared.WorkflowType) { + if v != nil && v.WorkflowType != nil { + return v.WorkflowType + } + + return +} + +// IsSetWorkflowType returns true if WorkflowType is not nil. +func (v *ActivityTaskDispatchInfo) IsSetWorkflowType() bool { + return v != nil && v.WorkflowType != nil +} + +// GetWorkflowDomain returns the value of WorkflowDomain if it is set or its +// zero value if it is unset. +func (v *ActivityTaskDispatchInfo) GetWorkflowDomain() (o string) { + if v != nil && v.WorkflowDomain != nil { + return *v.WorkflowDomain + } + + return +} + +// IsSetWorkflowDomain returns true if WorkflowDomain is not nil. +func (v *ActivityTaskDispatchInfo) IsSetWorkflowDomain() bool { + return v != nil && v.WorkflowDomain != nil +} + type AddActivityTaskRequest struct { DomainUUID *string `json:"domainUUID,omitempty"` Execution *shared.WorkflowExecution `json:"execution,omitempty"` @@ -53,6 +743,7 @@ type AddActivityTaskRequest struct { ScheduleToStartTimeoutSeconds *int32 `json:"scheduleToStartTimeoutSeconds,omitempty"` Source *TaskSource `json:"source,omitempty"` ForwardedFrom *string `json:"forwardedFrom,omitempty"` + ActivityTaskDispatchInfo *ActivityTaskDispatchInfo `json:"activityTaskDispatchInfo,omitempty"` } // ToWire translates a AddActivityTaskRequest struct into a Thrift-level intermediate @@ -72,7 +763,7 @@ type AddActivityTaskRequest struct { // } func (v *AddActivityTaskRequest) ToWire() (wire.Value, error) { var ( - fields [8]wire.Field + fields [9]wire.Field i int = 0 w wire.Value err error @@ -142,6 +833,14 @@ func (v *AddActivityTaskRequest) ToWire() (wire.Value, error) { fields[i] = wire.Field{ID: 70, Value: w} i++ } + if v.ActivityTaskDispatchInfo != nil { + w, err = v.ActivityTaskDispatchInfo.ToWire() + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 80, Value: w} + i++ + } return wire.NewValueStruct(wire.Struct{Fields: fields[:i]}), nil } @@ -164,6 +863,12 @@ func _TaskSource_Read(w wire.Value) (TaskSource, error) { return v, err } +func _ActivityTaskDispatchInfo_Read(w wire.Value) (*ActivityTaskDispatchInfo, error) { + var v ActivityTaskDispatchInfo + err := v.FromWire(w) + return &v, err +} + // FromWire deserializes a AddActivityTaskRequest struct from its Thrift-level // representation. The Thrift-level representation may be obtained // from a ThriftRW protocol implementation. @@ -261,6 +966,14 @@ func (v *AddActivityTaskRequest) FromWire(w wire.Value) error { return err } + } + case 80: + if field.Value.Type() == wire.TStruct { + v.ActivityTaskDispatchInfo, err = _ActivityTaskDispatchInfo_Read(field.Value) + if err != nil { + return err + } + } } } @@ -373,6 +1086,18 @@ func (v *AddActivityTaskRequest) Encode(sw stream.Writer) error { } } + if v.ActivityTaskDispatchInfo != nil { + if err := sw.WriteFieldBegin(stream.FieldHeader{ID: 80, Type: wire.TStruct}); err != nil { + return err + } + if err := v.ActivityTaskDispatchInfo.Encode(sw); err != nil { + return err + } + if err := sw.WriteFieldEnd(); err != nil { + return err + } + } + return sw.WriteStructEnd() } @@ -394,6 +1119,12 @@ func _TaskSource_Decode(sr stream.Reader) (TaskSource, error) { return v, err } +func _ActivityTaskDispatchInfo_Decode(sr stream.Reader) (*ActivityTaskDispatchInfo, error) { + var v ActivityTaskDispatchInfo + err := v.Decode(sr) + return &v, err +} + // Decode deserializes a AddActivityTaskRequest struct directly from its Thrift-level // representation, without going through an intemediary type. // @@ -472,6 +1203,12 @@ func (v *AddActivityTaskRequest) Decode(sr stream.Reader) error { return err } + case fh.ID == 80 && fh.Type == wire.TStruct: + v.ActivityTaskDispatchInfo, err = _ActivityTaskDispatchInfo_Decode(sr) + if err != nil { + return err + } + default: if err := sr.Skip(fh.Type); err != nil { return err @@ -501,7 +1238,7 @@ func (v *AddActivityTaskRequest) String() string { return "" } - var fields [8]string + var fields [9]string i := 0 if v.DomainUUID != nil { fields[i] = fmt.Sprintf("DomainUUID: %v", *(v.DomainUUID)) @@ -535,28 +1272,12 @@ func (v *AddActivityTaskRequest) String() string { fields[i] = fmt.Sprintf("ForwardedFrom: %v", *(v.ForwardedFrom)) i++ } - - return fmt.Sprintf("AddActivityTaskRequest{%v}", strings.Join(fields[:i], ", ")) -} - -func _String_EqualsPtr(lhs, rhs *string) bool { - if lhs != nil && rhs != nil { - - x := *lhs - y := *rhs - return (x == y) + if v.ActivityTaskDispatchInfo != nil { + fields[i] = fmt.Sprintf("ActivityTaskDispatchInfo: %v", v.ActivityTaskDispatchInfo) + i++ } - return lhs == nil && rhs == nil -} -func _I64_EqualsPtr(lhs, rhs *int64) bool { - if lhs != nil && rhs != nil { - - x := *lhs - y := *rhs - return (x == y) - } - return lhs == nil && rhs == nil + return fmt.Sprintf("AddActivityTaskRequest{%v}", strings.Join(fields[:i], ", ")) } func _I32_EqualsPtr(lhs, rhs *int32) bool { @@ -613,6 +1334,9 @@ func (v *AddActivityTaskRequest) Equals(rhs *AddActivityTaskRequest) bool { if !_String_EqualsPtr(v.ForwardedFrom, rhs.ForwardedFrom) { return false } + if !((v.ActivityTaskDispatchInfo == nil && rhs.ActivityTaskDispatchInfo == nil) || (v.ActivityTaskDispatchInfo != nil && rhs.ActivityTaskDispatchInfo != nil && v.ActivityTaskDispatchInfo.Equals(rhs.ActivityTaskDispatchInfo))) { + return false + } return true } @@ -647,6 +1371,9 @@ func (v *AddActivityTaskRequest) MarshalLogObject(enc zapcore.ObjectEncoder) (er if v.ForwardedFrom != nil { enc.AddString("forwardedFrom", *v.ForwardedFrom) } + if v.ActivityTaskDispatchInfo != nil { + err = multierr.Append(err, enc.AddObject("activityTaskDispatchInfo", v.ActivityTaskDispatchInfo)) + } return err } @@ -770,6 +1497,21 @@ func (v *AddActivityTaskRequest) IsSetForwardedFrom() bool { return v != nil && v.ForwardedFrom != nil } +// GetActivityTaskDispatchInfo returns the value of ActivityTaskDispatchInfo if it is set or its +// zero value if it is unset. +func (v *AddActivityTaskRequest) GetActivityTaskDispatchInfo() (o *ActivityTaskDispatchInfo) { + if v != nil && v.ActivityTaskDispatchInfo != nil { + return v.ActivityTaskDispatchInfo + } + + return +} + +// IsSetActivityTaskDispatchInfo returns true if ActivityTaskDispatchInfo is not nil. +func (v *AddActivityTaskRequest) IsSetActivityTaskDispatchInfo() bool { + return v != nil && v.ActivityTaskDispatchInfo != nil +} + type AddDecisionTaskRequest struct { DomainUUID *string `json:"domainUUID,omitempty"` Execution *shared.WorkflowExecution `json:"execution,omitempty"` @@ -3347,12 +4089,6 @@ func (v *PollForDecisionTaskResponse) ToWire() (wire.Value, error) { return wire.NewValueStruct(wire.Struct{Fields: fields[:i]}), nil } -func _WorkflowType_Read(w wire.Value) (*shared.WorkflowType, error) { - var v shared.WorkflowType - err := v.FromWire(w) - return &v, err -} - func _WorkflowQuery_Read(w wire.Value) (*shared.WorkflowQuery, error) { var v shared.WorkflowQuery err := v.FromWire(w) @@ -3817,12 +4553,6 @@ func (v *PollForDecisionTaskResponse) Encode(sw stream.Writer) error { return sw.WriteStructEnd() } -func _WorkflowType_Decode(sr stream.Reader) (*shared.WorkflowType, error) { - var v shared.WorkflowType - err := v.Decode(sr) - return &v, err -} - func _WorkflowQuery_Decode(sr stream.Reader) (*shared.WorkflowQuery, error) { var v shared.WorkflowQuery err := v.Decode(sr) @@ -5550,14 +6280,14 @@ var ThriftModule = &thriftreflect.ThriftModule{ Name: "matching", Package: "github.com/uber/cadence/.gen/go/matching", FilePath: "matching.thrift", - SHA1: "dfeeaa9142345924cb1a3a7e18106dc72465efdf", + SHA1: "28de7336ec8353772c65ea1e8aed8dc6803ddae0", Includes: []*thriftreflect.ThriftModule{ shared.ThriftModule, }, Raw: rawIDL, } -const rawIDL = "// Copyright (c) 2017 Uber Technologies, Inc.\n//\n// Permission is hereby granted, free of charge, to any person obtaining a copy\n// of this software and associated documentation files (the \"Software\"), to deal\n// in the Software without restriction, including without limitation the rights\n// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell\n// copies of the Software, and to permit persons to whom the Software is\n// furnished to do so, subject to the following conditions:\n//\n// The above copyright notice and this permission notice shall be included in\n// all copies or substantial portions of the Software.\n//\n// THE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\n// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,\n// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE\n// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER\n// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,\n// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN\n// THE SOFTWARE.\n\ninclude \"shared.thrift\"\n\nnamespace java com.uber.cadence.matching\n\n// TaskSource is the source from which a task was produced\nenum TaskSource {\n HISTORY, // Task produced by history service\n DB_BACKLOG // Task produced from matching db backlog\n}\n\nstruct PollForDecisionTaskRequest {\n 10: optional string domainUUID\n 15: optional string pollerID\n 20: optional shared.PollForDecisionTaskRequest pollRequest\n 30: optional string forwardedFrom\n}\n\nstruct PollForDecisionTaskResponse {\n 10: optional binary taskToken\n 20: optional shared.WorkflowExecution workflowExecution\n 30: optional shared.WorkflowType workflowType\n 40: optional i64 (js.type = \"Long\") previousStartedEventId\n 50: optional i64 (js.type = \"Long\") startedEventId\n 51: optional i64 (js.type = \"Long\") attempt\n 60: optional i64 (js.type = \"Long\") nextEventId\n 65: optional i64 (js.type = \"Long\") backlogCountHint\n 70: optional bool stickyExecutionEnabled\n 80: optional shared.WorkflowQuery query\n 90: optional shared.TransientDecisionInfo decisionInfo\n 100: optional shared.TaskList WorkflowExecutionTaskList\n 110: optional i32 eventStoreVersion\n 120: optional binary branchToken\n 130: optional i64 (js.type = \"Long\") scheduledTimestamp\n 140: optional i64 (js.type = \"Long\") startedTimestamp\n 150: optional map queries\n}\n\nstruct PollForActivityTaskRequest {\n 10: optional string domainUUID\n 15: optional string pollerID\n 20: optional shared.PollForActivityTaskRequest pollRequest\n 30: optional string forwardedFrom\n}\n\nstruct AddDecisionTaskRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution execution\n 30: optional shared.TaskList taskList\n 40: optional i64 (js.type = \"Long\") scheduleId\n 50: optional i32 scheduleToStartTimeoutSeconds\n 59: optional TaskSource source\n 60: optional string forwardedFrom\n}\n\nstruct AddActivityTaskRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution execution\n 30: optional string sourceDomainUUID\n 40: optional shared.TaskList taskList\n 50: optional i64 (js.type = \"Long\") scheduleId\n 60: optional i32 scheduleToStartTimeoutSeconds\n 69: optional TaskSource source\n 70: optional string forwardedFrom\n}\n\nstruct QueryWorkflowRequest {\n 10: optional string domainUUID\n 20: optional shared.TaskList taskList\n 30: optional shared.QueryWorkflowRequest queryRequest\n 40: optional string forwardedFrom\n}\n\nstruct RespondQueryTaskCompletedRequest {\n 10: optional string domainUUID\n 20: optional shared.TaskList taskList\n 30: optional string taskID\n 40: optional shared.RespondQueryTaskCompletedRequest completedRequest\n}\n\nstruct CancelOutstandingPollRequest {\n 10: optional string domainUUID\n 20: optional i32 taskListType\n 30: optional shared.TaskList taskList\n 40: optional string pollerID\n}\n\nstruct DescribeTaskListRequest {\n 10: optional string domainUUID\n 20: optional shared.DescribeTaskListRequest descRequest\n}\n\nstruct ListTaskListPartitionsRequest {\n 10: optional string domain\n 20: optional shared.TaskList taskList\n}\n\n/**\n* MatchingService API is exposed to provide support for polling from long running applications.\n* Such applications are expected to have a worker which regularly polls for DecisionTask and ActivityTask. For each\n* DecisionTask, application is expected to process the history of events for that session and respond back with next\n* decisions. For each ActivityTask, application is expected to execute the actual logic for that task and respond back\n* with completion or failure.\n**/\nservice MatchingService {\n /**\n * PollForDecisionTask is called by frontend to process DecisionTask from a specific taskList. A\n * DecisionTask is dispatched to callers for active workflow executions, with pending decisions.\n **/\n PollForDecisionTaskResponse PollForDecisionTask(1: PollForDecisionTaskRequest pollRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.LimitExceededError limitExceededError,\n 4: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * PollForActivityTask is called by frontend to process ActivityTask from a specific taskList. ActivityTask\n * is dispatched to callers whenever a ScheduleTask decision is made for a workflow execution.\n **/\n shared.PollForActivityTaskResponse PollForActivityTask(1: PollForActivityTaskRequest pollRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.LimitExceededError limitExceededError,\n 4: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * AddDecisionTask is called by the history service when a decision task is scheduled, so that it can be dispatched\n * by the MatchingEngine.\n **/\n void AddDecisionTask(1: AddDecisionTaskRequest addRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.ServiceBusyError serviceBusyError,\n 4: shared.LimitExceededError limitExceededError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.RemoteSyncMatchedError remoteSyncMatchedError,\n )\n\n /**\n * AddActivityTask is called by the history service when a decision task is scheduled, so that it can be dispatched\n * by the MatchingEngine.\n **/\n void AddActivityTask(1: AddActivityTaskRequest addRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.ServiceBusyError serviceBusyError,\n 4: shared.LimitExceededError limitExceededError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.RemoteSyncMatchedError remoteSyncMatchedError,\n )\n\n /**\n * QueryWorkflow is called by frontend to query a workflow.\n **/\n shared.QueryWorkflowResponse QueryWorkflow(1: QueryWorkflowRequest queryRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: shared.QueryFailedError queryFailedError,\n 5: shared.LimitExceededError limitExceededError,\n 6: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * RespondQueryTaskCompleted is called by frontend to respond query completed.\n **/\n void RespondQueryTaskCompleted(1: RespondQueryTaskCompletedRequest request)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: shared.LimitExceededError limitExceededError,\n 5: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * CancelOutstandingPoll is called by frontend to unblock long polls on matching for zombie pollers.\n * Our rpc stack does not support context propagation, so when a client connection goes away frontend sees\n * cancellation of context for that handler, but any corresponding calls (long-poll) to matching service does not\n * see the cancellation propagated so it can unblock corresponding long-polls on its end. This results is tasks\n * being dispatched to zombie pollers in this situation. This API is added so everytime frontend makes a long-poll\n * api call to matching it passes in a pollerID and then calls this API when it detects client connection is closed\n * to unblock long polls for this poller and prevent tasks being sent to these zombie pollers.\n **/\n void CancelOutstandingPoll(1: CancelOutstandingPollRequest request)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * DescribeTaskList returns information about the target tasklist, right now this API returns the\n * pollers which polled this tasklist in last few minutes.\n **/\n shared.DescribeTaskListResponse DescribeTaskList(1: DescribeTaskListRequest request)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * GetTaskListsByDomain returns the list of all the task lists for a domainName.\n **/\n shared.GetTaskListsByDomainResponse GetTaskListsByDomain(1: shared.GetTaskListsByDomainRequest request)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * ListTaskListPartitions returns a map of partitionKey and hostAddress for a taskList\n **/\n shared.ListTaskListPartitionsResponse ListTaskListPartitions(1: ListTaskListPartitionsRequest request)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 4: shared.ServiceBusyError serviceBusyError,\n )\n}\n" +const rawIDL = "// Copyright (c) 2017 Uber Technologies, Inc.\n//\n// Permission is hereby granted, free of charge, to any person obtaining a copy\n// of this software and associated documentation files (the \"Software\"), to deal\n// in the Software without restriction, including without limitation the rights\n// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell\n// copies of the Software, and to permit persons to whom the Software is\n// furnished to do so, subject to the following conditions:\n//\n// The above copyright notice and this permission notice shall be included in\n// all copies or substantial portions of the Software.\n//\n// THE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\n// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,\n// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE\n// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER\n// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,\n// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN\n// THE SOFTWARE.\n\ninclude \"shared.thrift\"\n\nnamespace java com.uber.cadence.matching\n\n// TaskSource is the source from which a task was produced\nenum TaskSource {\n HISTORY, // Task produced by history service\n DB_BACKLOG // Task produced from matching db backlog\n}\n\nstruct PollForDecisionTaskRequest {\n 10: optional string domainUUID\n 15: optional string pollerID\n 20: optional shared.PollForDecisionTaskRequest pollRequest\n 30: optional string forwardedFrom\n}\n\nstruct PollForDecisionTaskResponse {\n 10: optional binary taskToken\n 20: optional shared.WorkflowExecution workflowExecution\n 30: optional shared.WorkflowType workflowType\n 40: optional i64 (js.type = \"Long\") previousStartedEventId\n 50: optional i64 (js.type = \"Long\") startedEventId\n 51: optional i64 (js.type = \"Long\") attempt\n 60: optional i64 (js.type = \"Long\") nextEventId\n 65: optional i64 (js.type = \"Long\") backlogCountHint\n 70: optional bool stickyExecutionEnabled\n 80: optional shared.WorkflowQuery query\n 90: optional shared.TransientDecisionInfo decisionInfo\n 100: optional shared.TaskList WorkflowExecutionTaskList\n 110: optional i32 eventStoreVersion\n 120: optional binary branchToken\n 130: optional i64 (js.type = \"Long\") scheduledTimestamp\n 140: optional i64 (js.type = \"Long\") startedTimestamp\n 150: optional map queries\n}\n\nstruct PollForActivityTaskRequest {\n 10: optional string domainUUID\n 15: optional string pollerID\n 20: optional shared.PollForActivityTaskRequest pollRequest\n 30: optional string forwardedFrom\n}\n\nstruct AddDecisionTaskRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution execution\n 30: optional shared.TaskList taskList\n 40: optional i64 (js.type = \"Long\") scheduleId\n 50: optional i32 scheduleToStartTimeoutSeconds\n 59: optional TaskSource source\n 60: optional string forwardedFrom\n}\n\nstruct AddActivityTaskRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution execution\n 30: optional string sourceDomainUUID\n 40: optional shared.TaskList taskList\n 50: optional i64 (js.type = \"Long\") scheduleId\n 60: optional i32 scheduleToStartTimeoutSeconds\n 69: optional TaskSource source\n 70: optional string forwardedFrom\n 80: optional ActivityTaskDispatchInfo activityTaskDispatchInfo\n}\n\nstruct ActivityTaskDispatchInfo {\n 10: optional shared.HistoryEvent scheduledEvent\n 20: optional i64 (js.type = \"Long\") startedTimestamp\n 30: optional i64 (js.type = \"Long\") attempt\n 40: optional i64 (js.type = \"Long\") scheduledTimestampOfThisAttempt\n 50: optional i64 (js.type = \"Long\") scheduledTimestamp\n 60: optional binary heartbeatDetails\n 70: optional shared.WorkflowType workflowType\n 80: optional string workflowDomain\n}\n\nstruct QueryWorkflowRequest {\n 10: optional string domainUUID\n 20: optional shared.TaskList taskList\n 30: optional shared.QueryWorkflowRequest queryRequest\n 40: optional string forwardedFrom\n}\n\nstruct RespondQueryTaskCompletedRequest {\n 10: optional string domainUUID\n 20: optional shared.TaskList taskList\n 30: optional string taskID\n 40: optional shared.RespondQueryTaskCompletedRequest completedRequest\n}\n\nstruct CancelOutstandingPollRequest {\n 10: optional string domainUUID\n 20: optional i32 taskListType\n 30: optional shared.TaskList taskList\n 40: optional string pollerID\n}\n\nstruct DescribeTaskListRequest {\n 10: optional string domainUUID\n 20: optional shared.DescribeTaskListRequest descRequest\n}\n\nstruct ListTaskListPartitionsRequest {\n 10: optional string domain\n 20: optional shared.TaskList taskList\n}\n\n/**\n* MatchingService API is exposed to provide support for polling from long running applications.\n* Such applications are expected to have a worker which regularly polls for DecisionTask and ActivityTask. For each\n* DecisionTask, application is expected to process the history of events for that session and respond back with next\n* decisions. For each ActivityTask, application is expected to execute the actual logic for that task and respond back\n* with completion or failure.\n**/\nservice MatchingService {\n /**\n * PollForDecisionTask is called by frontend to process DecisionTask from a specific taskList. A\n * DecisionTask is dispatched to callers for active workflow executions, with pending decisions.\n **/\n PollForDecisionTaskResponse PollForDecisionTask(1: PollForDecisionTaskRequest pollRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.LimitExceededError limitExceededError,\n 4: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * PollForActivityTask is called by frontend to process ActivityTask from a specific taskList. ActivityTask\n * is dispatched to callers whenever a ScheduleTask decision is made for a workflow execution.\n **/\n shared.PollForActivityTaskResponse PollForActivityTask(1: PollForActivityTaskRequest pollRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.LimitExceededError limitExceededError,\n 4: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * AddDecisionTask is called by the history service when a decision task is scheduled, so that it can be dispatched\n * by the MatchingEngine.\n **/\n void AddDecisionTask(1: AddDecisionTaskRequest addRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.ServiceBusyError serviceBusyError,\n 4: shared.LimitExceededError limitExceededError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.RemoteSyncMatchedError remoteSyncMatchedError,\n )\n\n /**\n * AddActivityTask is called by the history service when a decision task is scheduled, so that it can be dispatched\n * by the MatchingEngine.\n **/\n void AddActivityTask(1: AddActivityTaskRequest addRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.ServiceBusyError serviceBusyError,\n 4: shared.LimitExceededError limitExceededError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.RemoteSyncMatchedError remoteSyncMatchedError,\n )\n\n /**\n * QueryWorkflow is called by frontend to query a workflow.\n **/\n shared.QueryWorkflowResponse QueryWorkflow(1: QueryWorkflowRequest queryRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: shared.QueryFailedError queryFailedError,\n 5: shared.LimitExceededError limitExceededError,\n 6: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * RespondQueryTaskCompleted is called by frontend to respond query completed.\n **/\n void RespondQueryTaskCompleted(1: RespondQueryTaskCompletedRequest request)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: shared.LimitExceededError limitExceededError,\n 5: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * CancelOutstandingPoll is called by frontend to unblock long polls on matching for zombie pollers.\n * Our rpc stack does not support context propagation, so when a client connection goes away frontend sees\n * cancellation of context for that handler, but any corresponding calls (long-poll) to matching service does not\n * see the cancellation propagated so it can unblock corresponding long-polls on its end. This results is tasks\n * being dispatched to zombie pollers in this situation. This API is added so everytime frontend makes a long-poll\n * api call to matching it passes in a pollerID and then calls this API when it detects client connection is closed\n * to unblock long polls for this poller and prevent tasks being sent to these zombie pollers.\n **/\n void CancelOutstandingPoll(1: CancelOutstandingPollRequest request)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * DescribeTaskList returns information about the target tasklist, right now this API returns the\n * pollers which polled this tasklist in last few minutes.\n **/\n shared.DescribeTaskListResponse DescribeTaskList(1: DescribeTaskListRequest request)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * GetTaskListsByDomain returns the list of all the task lists for a domainName.\n **/\n shared.GetTaskListsByDomainResponse GetTaskListsByDomain(1: shared.GetTaskListsByDomainRequest request)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * ListTaskListPartitions returns a map of partitionKey and hostAddress for a taskList\n **/\n shared.ListTaskListPartitionsResponse ListTaskListPartitions(1: ListTaskListPartitionsRequest request)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 4: shared.ServiceBusyError serviceBusyError,\n )\n}\n" // MatchingService_AddActivityTask_Args represents the arguments for the MatchingService.AddActivityTask function. // diff --git a/common/types/mapper/thrift/matching.go b/common/types/mapper/thrift/matching.go index 27386a11b18..adbacc59188 100644 --- a/common/types/mapper/thrift/matching.go +++ b/common/types/mapper/thrift/matching.go @@ -21,6 +21,7 @@ package thrift import ( + "github.com/uber/cadence/common" "github.com/uber/cadence/common/types" "github.com/uber/cadence/.gen/go/matching" @@ -40,6 +41,7 @@ func FromAddActivityTaskRequest(t *types.AddActivityTaskRequest) *matching.AddAc ScheduleToStartTimeoutSeconds: t.ScheduleToStartTimeoutSeconds, Source: FromTaskSource(t.Source), ForwardedFrom: &t.ForwardedFrom, + ActivityTaskDispatchInfo: FromSyncMatchActivityTaskInfo(t.ActivityTaskDispatchInfo), } } @@ -57,6 +59,38 @@ func ToAddActivityTaskRequest(t *matching.AddActivityTaskRequest) *types.AddActi ScheduleToStartTimeoutSeconds: t.ScheduleToStartTimeoutSeconds, Source: ToTaskSource(t.Source), ForwardedFrom: t.GetForwardedFrom(), + ActivityTaskDispatchInfo: ToSyncMatchActivityTaskInfo(t.ActivityTaskDispatchInfo), + } +} + +func FromSyncMatchActivityTaskInfo(t *types.ActivityTaskDispatchInfo) *matching.ActivityTaskDispatchInfo { + if t == nil { + return nil + } + return &matching.ActivityTaskDispatchInfo{ + ScheduledEvent: FromHistoryEvent(t.ScheduledEvent), + StartedTimestamp: t.StartedTimestamp, + Attempt: t.Attempt, + ScheduledTimestampOfThisAttempt: t.ScheduledTimestampOfThisAttempt, + HeartbeatDetails: t.HeartbeatDetails, + WorkflowType: FromWorkflowType(t.WorkflowType), + WorkflowDomain: &t.WorkflowDomain, + } +} + +// ToRecordActivityTaskStartedResponse converts thrift RecordActivityTaskStartedResponse type to internal +func ToSyncMatchActivityTaskInfo(t *matching.ActivityTaskDispatchInfo) *types.ActivityTaskDispatchInfo { + if t == nil { + return nil + } + return &types.ActivityTaskDispatchInfo{ + ScheduledEvent: ToHistoryEvent(t.ScheduledEvent), + StartedTimestamp: t.StartedTimestamp, + Attempt: t.Attempt, + ScheduledTimestampOfThisAttempt: t.ScheduledTimestampOfThisAttempt, + HeartbeatDetails: t.HeartbeatDetails, + WorkflowType: ToWorkflowType(t.WorkflowType), + WorkflowDomain: common.StringDefault(t.WorkflowDomain), } } diff --git a/idls b/idls index 3318277f97d..7bb6b080838 160000 --- a/idls +++ b/idls @@ -1 +1 @@ -Subproject commit 3318277f97dfb50eb4bb745cbb868884f2d28df1 +Subproject commit 7bb6b08083836896e8bbfbb42af4aa8be8aeb7bb