forked from uber/cadence
-
Notifications
You must be signed in to change notification settings - Fork 0
/
common.go
114 lines (100 loc) · 3.92 KB
/
common.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package canary
import (
"context"
"fmt"
"time"
"go.uber.org/cadence/workflow"
"go.uber.org/zap"
)
func maxInt64(a, b int64) int64 {
if a > b {
return a
}
return b
}
func absDurationDiff(d1, d2 time.Duration) time.Duration {
if d1 > d2 {
return d1 - d2
}
return d2 - d1
}
func stringPtr(v string) *string {
return &v
}
func int32Ptr(v int32) *int32 {
return &v
}
// getContextValue retrieves and returns the value corresponding
// to the given key - panics if the key does not exist
func getContextValue(ctx context.Context, key contextKey) interface{} {
value := ctx.Value(key)
if value == nil {
panic("ctx.Value(" + key + ") returned nil")
}
return value
}
// getActivityContext retrieves and returns the activity context from the
// global context passed to the activity
func getActivityContext(ctx context.Context) *activityContext {
return getContextValue(ctx, ctxKeyActivityRuntime).(*activityContext)
}
// getActivityArchivalContext retrieves and returns the activity archival context from the
// global context passed to the activity
func getActivityArchivalContext(ctx context.Context) *activityContext {
return getContextValue(ctx, ctxKeyActivityArchivalRuntime).(*activityContext)
}
// checkWFVersionCompatibility takes a workflow.Context param and
// validates that the workflow task currently being handled
// is compatible with this version of the canary - this method
// MUST only be called within a workflow function and it MUST
// be the first line in the workflow function
// Returns an error if the version is incompatible
func checkWFVersionCompatibility(ctx workflow.Context) error {
version := workflow.GetVersion(ctx, workflowChangeID, workflowVersion, workflowVersion)
if version != workflowVersion {
workflow.GetLogger(ctx).Error("workflow version mismatch",
zap.Int("want", int(workflowVersion)), zap.Int("got", int(version)))
return fmt.Errorf("workflow version mismatch, want=%v, got=%v", workflowVersion, version)
}
return nil
}
// beginWorkflow executes the common steps involved in all the workflow functions
// It checks for workflow task version compatibility and also records the execution
// in m3. This function must be the first call in every workflow function
// Returns metrics scope on success, error on failure
func beginWorkflow(ctx workflow.Context, wfType string, scheduledTimeNanos int64) (*workflowMetricsProfile, error) {
profile := recordWorkflowStart(ctx, wfType, scheduledTimeNanos)
if err := checkWFVersionCompatibility(ctx); err != nil {
profile.scope.Counter(errIncompatibleVersion).Inc(1)
return nil, profile.end(err)
}
return profile, nil
}
func concat(first string, second string) string {
return first + "/" + second
}
func getScheduledTimeFromInputIfNonZero(ctx workflow.Context, nanos int64) int64 {
if nanos == 0 {
return workflow.Now(ctx).UnixNano()
}
return nanos
}