@@ -21,11 +21,10 @@ import (
21
21
"time"
22
22
23
23
"github.com/aws/aws-sdk-go/aws"
24
- "github.com/aws/aws-sdk-go/aws/awserr"
25
24
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
26
25
"github.com/gorilla/websocket"
27
26
28
- "github.com/cortexlabs/cortex/pkg/lib/errors "
27
+ awslib "github.com/cortexlabs/cortex/pkg/lib/aws "
29
28
"github.com/cortexlabs/cortex/pkg/lib/sets/strset"
30
29
"github.com/cortexlabs/cortex/pkg/operator/api/context"
31
30
"github.com/cortexlabs/cortex/pkg/operator/config"
@@ -75,7 +74,7 @@ func StreamFromCloudWatch(podCheckCancel chan struct{}, appName string, apiName
75
74
"apiName" : apiName ,
76
75
}
77
76
78
- var latestContextID string
77
+ var currentContextID string
79
78
var podTemplateHash string
80
79
var ctx * context.Context
81
80
@@ -91,21 +90,20 @@ func StreamFromCloudWatch(podCheckCancel chan struct{}, appName string, apiName
91
90
if ctx == nil {
92
91
writeString (socket , "\n deployment " + appName + "not found" )
93
92
closeSocket (socket )
94
- return
93
+ continue
95
94
}
96
95
97
96
if _ , ok := ctx .APIs [apiName ]; ! ok {
98
97
writeString (socket , "\n api " + apiName + " was not found in latest deployment" )
99
98
closeSocket (socket )
100
- return
101
- }
102
-
103
- if len (latestContextID ) != 0 && ctx .ID != latestContextID {
104
- writeString (socket , "\n a new deployment was detected, streaming logs from the latest deployment" )
99
+ continue
105
100
}
106
101
107
- if ctx .ID != latestContextID {
108
- latestContextID = ctx .ID
102
+ if ctx .ID != currentContextID {
103
+ if len (currentContextID ) != 0 {
104
+ writeString (socket , "\n a new deployment was detected, streaming logs from the latest deployment" )
105
+ }
106
+ currentContextID = ctx .ID
109
107
podSearchLabels ["workloadID" ] = ctx .APIs [apiName ].WorkloadID
110
108
podTemplateHash = ""
111
109
wrotePending = false
@@ -121,7 +119,7 @@ func StreamFromCloudWatch(podCheckCancel chan struct{}, appName string, apiName
121
119
if err != nil {
122
120
writeString (socket , err .Error ())
123
121
closeSocket (socket )
124
- return
122
+ continue
125
123
}
126
124
}
127
125
@@ -141,8 +139,7 @@ func StreamFromCloudWatch(podCheckCancel chan struct{}, appName string, apiName
141
139
})
142
140
143
141
if err != nil {
144
- awsErr := errors .Cause (err ).(awserr.Error )
145
- if awsErr .Code () == "ResourceNotFoundException" {
142
+ if awslib .CheckErrCode (err , "ResourceNotFoundException" ) {
146
143
if ! wrotePending {
147
144
writeString (socket , "pending..." )
148
145
wrotePending = true
@@ -193,10 +190,12 @@ func getPodTemplateHash(labels map[string]string) (string, error) {
193
190
}
194
191
195
192
func writeString (socket * websocket.Conn , message string ) {
193
+ socket .SetWriteDeadline (time .Now ().Add (socketWriteDeadlineWait ))
196
194
socket .WriteMessage (websocket .TextMessage , []byte (message ))
197
195
}
198
196
199
197
func closeSocket (socket * websocket.Conn ) {
198
+ socket .SetWriteDeadline (time .Now ().Add (socketWriteDeadlineWait ))
200
199
socket .WriteMessage (websocket .CloseMessage , websocket .FormatCloseMessage (websocket .CloseNormalClosure , "" ))
201
200
time .Sleep (socketCloseGracePeriod )
202
201
}
0 commit comments