Skip to content

Commit

Permalink
[CDNC-4360] Record current worker Identity on Pending activity tasks (u…
Browse files Browse the repository at this point in the history
…ber#5307)

* added startedWorkerIdentity field to pending activity

* update thrift

* update proto mapper function

* set value for StartedWorkerIdentity field

* update thrift and proto mapper functions

* update test data

* fix lint
  • Loading branch information
ketsiambaku authored Jun 8, 2023
1 parent 97526c5 commit e57c3b2
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 12 deletions.
26 changes: 14 additions & 12 deletions common/types/mapper/proto/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1930,18 +1930,19 @@ func FromPendingActivityInfo(t *types.PendingActivityInfo) *apiv1.PendingActivit
return nil
}
return &apiv1.PendingActivityInfo{
ActivityId: t.ActivityID,
ActivityType: FromActivityType(t.ActivityType),
State: FromPendingActivityState(t.State),
HeartbeatDetails: FromPayload(t.HeartbeatDetails),
LastHeartbeatTime: unixNanoToTime(t.LastHeartbeatTimestamp),
LastStartedTime: unixNanoToTime(t.LastStartedTimestamp),
Attempt: t.Attempt,
MaximumAttempts: t.MaximumAttempts,
ScheduledTime: unixNanoToTime(t.ScheduledTimestamp),
ExpirationTime: unixNanoToTime(t.ExpirationTimestamp),
LastFailure: FromFailure(t.LastFailureReason, t.LastFailureDetails),
LastWorkerIdentity: t.LastWorkerIdentity,
ActivityId: t.ActivityID,
ActivityType: FromActivityType(t.ActivityType),
State: FromPendingActivityState(t.State),
HeartbeatDetails: FromPayload(t.HeartbeatDetails),
LastHeartbeatTime: unixNanoToTime(t.LastHeartbeatTimestamp),
LastStartedTime: unixNanoToTime(t.LastStartedTimestamp),
Attempt: t.Attempt,
MaximumAttempts: t.MaximumAttempts,
ScheduledTime: unixNanoToTime(t.ScheduledTimestamp),
ExpirationTime: unixNanoToTime(t.ExpirationTimestamp),
LastFailure: FromFailure(t.LastFailureReason, t.LastFailureDetails),
LastWorkerIdentity: t.LastWorkerIdentity,
StartedWorkerIdentity: t.StartedWorkerIdentity,
}
}

Expand All @@ -1963,6 +1964,7 @@ func ToPendingActivityInfo(t *apiv1.PendingActivityInfo) *types.PendingActivityI
LastFailureReason: ToFailureReason(t.LastFailure),
LastFailureDetails: ToFailureDetails(t.LastFailure),
LastWorkerIdentity: t.LastWorkerIdentity,
StartedWorkerIdentity: t.StartedWorkerIdentity,
}
}

Expand Down
2 changes: 2 additions & 0 deletions common/types/mapper/thrift/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -3312,6 +3312,7 @@ func FromPendingActivityInfo(t *types.PendingActivityInfo) *shared.PendingActivi
LastFailureReason: t.LastFailureReason,
LastWorkerIdentity: &t.LastWorkerIdentity,
LastFailureDetails: t.LastFailureDetails,
StartedWorkerIdentity: &t.StartedWorkerIdentity,
}
}

Expand All @@ -3334,6 +3335,7 @@ func ToPendingActivityInfo(t *shared.PendingActivityInfo) *types.PendingActivity
LastFailureReason: t.LastFailureReason,
LastWorkerIdentity: t.GetLastWorkerIdentity(),
LastFailureDetails: t.LastFailureDetails,
StartedWorkerIdentity: t.GetStartedWorkerIdentity(),
}
}

Expand Down
9 changes: 9 additions & 0 deletions common/types/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -3645,6 +3645,7 @@ type PendingActivityInfo struct {
ScheduledTimestamp *int64 `json:"scheduledTimestamp,omitempty"`
ExpirationTimestamp *int64 `json:"expirationTimestamp,omitempty"`
LastFailureReason *string `json:"lastFailureReason,omitempty"`
StartedWorkerIdentity string `json:"startedWorkerIdentity,omitempty"`
LastWorkerIdentity string `json:"lastWorkerIdentity,omitempty"`
LastFailureDetails []byte `json:"lastFailureDetails,omitempty"`
}
Expand Down Expand Up @@ -3705,6 +3706,14 @@ func (v *PendingActivityInfo) GetLastFailureReason() (o string) {
return
}

// GetStartedWorkerIdentity is an internal getter (TBD...)
func (v *PendingActivityInfo) GetStartedWorkerIdentity() (o string) {
if v != nil {
return v.StartedWorkerIdentity
}
return
}

// GetLastWorkerIdentity is an internal getter (TBD...)
func (v *PendingActivityInfo) GetLastWorkerIdentity() (o string) {
if v != nil {
Expand Down
1 change: 1 addition & 0 deletions common/types/testdata/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@ var (
LastFailureReason: &FailureReason,
LastWorkerIdentity: Identity,
LastFailureDetails: FailureDetails,
StartedWorkerIdentity: Identity,
}
PendingActivityInfoArray = []*types.PendingActivityInfo{
&PendingActivityInfo,
Expand Down
3 changes: 3 additions & 0 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1722,6 +1722,9 @@ func (e *historyEngineImpl) DescribeWorkflowExecution(
if ai.LastWorkerIdentity != "" {
p.LastWorkerIdentity = ai.LastWorkerIdentity
}
if ai.StartedIdentity != "" {
p.StartedWorkerIdentity = ai.StartedIdentity
}
}
result.PendingActivities = append(result.PendingActivities, p)
}
Expand Down

0 comments on commit e57c3b2

Please sign in to comment.