// +build lambdabinary

package sparta

import (
	"fmt"
	"os"
	"path"
	"path/filepath"
	"runtime/pprof"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
	spartaAWS "github.com/mweagle/Sparta/aws"
	"github.com/sirupsen/logrus"
)

var (
	currentSlot   int
	stackName     string
	profileBucket string
)

const snapshotCount = 3
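
// nextUploadSlot returns the snapshot slot to use for the next upload and
// advances the counter, cycling modulo snapshotCount so that at most
// snapshotCount snapshots per profile type are retained per instance.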
func nextUploadSlot() int {
	uploadSlot := currentSlot
	currentSlot = (currentSlot + 1) % snapshotCount
	return uploadSlot
}

func init() {
	currentSlot = 0
	// These correspond to the environment variables that were published
	// into the Lambda environment by the profile decorator.
	stackName = os.Getenv(envVarStackName)
	profileBucket = os.Getenv(envVarProfileBucketName)
}
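
// profileOutputFile creates the local file that receives profile data,
// rooted in /tmp when running inside the Lambda environment, since that
// is the only writable filesystem location there.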
func profileOutputFile(basename string) (*os.File, error) {
	fileName := fmt.Sprintf("%s.%s.profile", basename, InstanceID())
	// http://docs.aws.amazon.com/lambda/latest/dg/current-supported-versions.html
	if os.Getenv("_LAMBDA_SERVER_PORT") != "" {
		fileName = filepath.Join("/tmp", fileName)
	}
	return os.Create(fileName)
}

////////////////////////////////////////////////////////////////////////////////
// Type returned from worker pool uploading profiles to S3
type uploadResult struct {
	err      error
	uploaded bool
}
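
// Error and Result satisfy the workResult interface that the worker pool
// returns from each task.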
func (ur *uploadResult) Error() error {
	return ur.err
}

func (ur *uploadResult) Result() interface{} {
	return ur.uploaded
}
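
// uploadFileTask returns a taskFunc that uploads the local profile snapshot
// at localFilePath to the stack's profile bucket, removing the local file
// once the task has run.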
func uploadFileTask(uploader *s3manager.Uploader,
	profileType string,
	uploadSlot int,
	localFilePath string,
	logger *logrus.Logger) taskFunc {
	return func() workResult {
		fileReader, fileReaderErr := os.Open(localFilePath)
		if fileReaderErr != nil {
			return &uploadResult{err: fileReaderErr}
		}
		defer fileReader.Close()
		defer os.Remove(localFilePath)
		uploadFileName := fmt.Sprintf("%d-%s", uploadSlot, path.Base(localFilePath))
		keyPath := path.Join(profileSnapshotRootKeypathForType(profileType, stackName), uploadFileName)
		uploadInput := &s3manager.UploadInput{
			Bucket: aws.String(profileBucket),
			Key:    aws.String(keyPath),
			Body:   fileReader,
		}
		uploadOutput, uploadErr := uploader.Upload(uploadInput)
		return &uploadResult{
			err:      uploadErr,
			uploaded: uploadOutput != nil,
		}
	}
}
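
// snapshotProfiles captures the requested profiles (optionally preceded by a
// CPU profile of cpuProfileDuration), uploads them to S3 via a worker pool,
// and reschedules itself for the next snapshotInterval.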
func snapshotProfiles(s3BucketArchive interface{},
	snapshotInterval time.Duration,
	cpuProfileDuration time.Duration,
	profileTypes ...string) {
	profileLogger, _ := NewLogger("")

	publishProfiles := func(cpuProfilePath string) {
		profileLogger.WithFields(logrus.Fields{
			"CPUProfilePath": cpuProfilePath,
			"Types":          profileTypes,
		}).Info("Publishing profiles")

		uploadSlot := nextUploadSlot()
		// The session the S3 Uploader will use
		sess := spartaAWS.NewSession(profileLogger)
		uploader := s3manager.NewUploader(sess)
		uploadTasks := make([]*workTask, 0)
		if cpuProfilePath != "" {
			uploadTasks = append(uploadTasks,
				newWorkTask(uploadFileTask(uploader,
					"cpu",
					uploadSlot,
					cpuProfilePath,
					profileLogger)))
		}
		for _, eachProfileType := range profileTypes {
			namedProfile := pprof.Lookup(eachProfileType)
			if namedProfile != nil {
				outputProfile, outputFileErr := profileOutputFile(eachProfileType)
				if outputFileErr != nil {
					profileLogger.WithFields(logrus.Fields{
						"Error": outputFileErr,
					}).Error("Failed to create profile output file")
				} else {
					namedProfile.WriteTo(outputProfile, 0)
					outputProfile.Close()
					uploadTasks = append(uploadTasks,
						newWorkTask(uploadFileTask(uploader,
							eachProfileType,
							uploadSlot,
							outputProfile.Name(),
							profileLogger)))
				}
			}
		}
		workerPool := newWorkerPool(uploadTasks, 32)
		workerPool.Run()
		ScheduleProfileLoop(s3BucketArchive,
			snapshotInterval,
			cpuProfileDuration,
			profileTypes...)
	}

	if cpuProfileDuration != 0 {
		outputFile, outputFileErr := profileOutputFile("cpu")
		if outputFileErr != nil {
			profileLogger.Warnf("Failed to create CPU profile file: %s",
				outputFileErr.Error())
			return
		}
		startErr := pprof.StartCPUProfile(outputFile)
		if startErr != nil {
			profileLogger.Warnf("Failed to start CPU profile: %s", startErr.Error())
		}
		profileLogger.Info("Started CPU profile")
		time.AfterFunc(cpuProfileDuration, func() {
			pprof.StopCPUProfile()
			profileLogger.Info("Stopped CPU profile")
			closeErr := outputFile.Close()
			if closeErr != nil {
				profileLogger.Warnf("Failed to close CPU profile output: %s", closeErr.Error())
			} else {
				publishProfiles(outputFile.Name())
			}
		})
	} else {
		publishProfiles("")
	}
}

// ScheduleProfileLoop installs a profiling loop that periodically pushes
// profile snapshots to S3 so that they can be fetched and inspected locally
// via a `profile` command that wraps pprof.
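//
// A minimal usage sketch (an illustrative assumption, not taken from the
// original source). In this binary build the target bucket comes from
// envVarProfileBucketName, which the profile decorator publishes at
// provision time, so the first argument is only forwarded and may be nil:
//
//	func init() {
//		// Every 5 minutes, collect 30s of CPU samples plus heap and
//		// goroutine snapshots and push them to the profile bucket.
//		ScheduleProfileLoop(nil,
//			5*time.Minute,
//			30*time.Second,
//			"heap",
//			"goroutine")
//	}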
func ScheduleProfileLoop(s3BucketArchive interface{},
	snapshotInterval time.Duration,
	cpuProfileDuration time.Duration,
	profileTypes ...string) {
	time.AfterFunc(snapshotInterval, func() {
		snapshotProfiles(s3BucketArchive, snapshotInterval, cpuProfileDuration, profileTypes...)
	})
}