@@ -17,7 +17,6 @@ limitations under the License.
17
17
package plugin
18
18
19
19
import (
20
- "context"
21
20
"fmt"
22
21
"io"
23
22
"os/exec"
@@ -143,15 +142,15 @@ func readFromReader(reader io.ReadCloser, maxBytes int64) ([]byte, error) {
143
142
}
144
143
145
144
func (p * Plugin ) run (rule cpmtypes.CustomRule ) (exitStatus cpmtypes.Status , output string ) {
146
- var ctx context. Context
147
- var cancel context. CancelFunc
145
+ isTimeout := false
146
+ isHung := false
148
147
148
+ var timeoutDuration time.Duration
149
149
if rule .Timeout != nil && * rule .Timeout < * p .config .PluginGlobalConfig .Timeout {
150
- ctx , cancel = context . WithTimeout ( context . Background (), * rule .Timeout )
150
+ timeoutDuration = * rule .Timeout
151
151
} else {
152
- ctx , cancel = context . WithTimeout ( context . Background (), * p .config .PluginGlobalConfig .Timeout )
152
+ timeoutDuration = * p .config .PluginGlobalConfig .Timeout
153
153
}
154
- defer cancel ()
155
154
156
155
cmd := util .Exec (rule .Path , rule .Args ... )
157
156
@@ -170,37 +169,6 @@ func (p *Plugin) run(rule cpmtypes.CustomRule) (exitStatus cpmtypes.Status, outp
170
169
return cpmtypes .Unknown , "Error in starting plugin. Please check the error log"
171
170
}
172
171
173
- waitChan := make (chan struct {})
174
- defer close (waitChan )
175
-
176
- var m sync.Mutex
177
- timeout := false
178
-
179
- go func () {
180
- select {
181
- case <- ctx .Done ():
182
- if ctx .Err () == context .Canceled {
183
- return
184
- }
185
- klog .Errorf ("Error in running plugin timeout %q" , rule .Path )
186
- if cmd .Process == nil || cmd .Process .Pid == 0 {
187
- klog .Errorf ("Error in cmd.Process check %q" , rule .Path )
188
- break
189
- }
190
-
191
- m .Lock ()
192
- timeout = true
193
- m .Unlock ()
194
-
195
- err := util .Kill (cmd )
196
- if err != nil {
197
- klog .Errorf ("Error in kill process %d, %v" , cmd .Process .Pid , err )
198
- }
199
- case <- waitChan :
200
- return
201
- }
202
- }()
203
-
204
172
var (
205
173
wg sync.WaitGroup
206
174
stdout []byte
@@ -220,14 +188,46 @@ func (p *Plugin) run(rule cpmtypes.CustomRule) (exitStatus cpmtypes.Status, outp
220
188
}()
221
189
// This will wait for the reads to complete. If the execution times out, the pipes
222
190
// will be closed and the wait group unblocks.
223
- wg .Wait ()
191
+ // If the timeout is caused by the plugin process or sub-process hung due to GPU device errors or other reasons,
192
+ // wg.Wait() will be blocked forever, so we need to add a timeout to the wait group.
193
+ waitChan := make (chan struct {})
194
+ go func () {
195
+ wg .Wait ()
196
+ close (waitChan )
197
+ }()
198
+ select {
199
+ case <- waitChan :
200
+ // The reads are done.
201
+ break
202
+ case <- time .After (timeoutDuration ):
203
+ klog .Errorf ("Waiting for command output timed out when running plugin %q" , rule .Path )
204
+ isTimeout = true
205
+ err := util .Kill (cmd )
206
+ if err != nil {
207
+ klog .Errorf ("Error when killing process %d: %v" , cmd .Process .Pid , err )
208
+ } else {
209
+ klog .Infof ("Killed process %d successfully" , cmd .Process .Pid )
210
+ }
224
211
225
- if stdoutErr != nil {
212
+ // Check if the process is in D state. If it is, the process is hung and can not be killed.
213
+ // It also means that the plugin can not report the correct status, instead reports Unknown status.
214
+ // On a GPU machine, a plugin with Python script calling pynvml API may hang in D state due to some GPU device errors.
215
+ if util .IsProcessInDState (cmd .Process .Pid ) {
216
+ klog .Errorf ("Process %d is hung in D state" , cmd .Process .Pid )
217
+ isHung = true
218
+ }
219
+ }
220
+
221
+ if isHung {
222
+ return cpmtypes .Unknown , fmt .Sprintf ("Process is hung when running plugin %s" , rule .Path )
223
+ }
224
+
225
+ if ! isTimeout && stdoutErr != nil {
226
226
klog .Errorf ("Error reading stdout for plugin %q: error - %v" , rule .Path , err )
227
227
return cpmtypes .Unknown , "Error reading stdout for plugin. Please check the error log"
228
228
}
229
229
230
- if stderrErr != nil {
230
+ if ! isTimeout && stderrErr != nil {
231
231
klog .Errorf ("Error reading stderr for plugin %q: error - %v" , rule .Path , err )
232
232
return cpmtypes .Unknown , "Error reading stderr for plugin. Please check the error log"
233
233
}
@@ -239,16 +239,13 @@ func (p *Plugin) run(rule cpmtypes.CustomRule) (exitStatus cpmtypes.Status, outp
239
239
}
240
240
}
241
241
242
- // trim suffix useless bytes
243
- output = string (stdout )
244
- output = strings .TrimSpace (output )
245
-
246
- m .Lock ()
247
- cmdKilled := timeout
248
- m .Unlock ()
249
-
250
- if cmdKilled {
251
- output = fmt .Sprintf ("Timeout when running plugin %q: state - %s. output - %q" , rule .Path , cmd .ProcessState .String (), output )
242
+ stderrStr := ""
243
+ if isTimeout {
244
+ output = fmt .Sprintf ("Timeout when running plugin %q: state - %s. output - %q" , rule .Path , cmd .ProcessState .String (), "" )
245
+ } else {
246
+ // trim suffix useless bytes
247
+ output = strings .TrimSpace (string (stdout ))
248
+ stderrStr = strings .TrimSpace (string (stderr ))
252
249
}
253
250
254
251
// cut at position max_output_length if stdout is longer than max_output_length bytes
@@ -259,13 +256,13 @@ func (p *Plugin) run(rule cpmtypes.CustomRule) (exitStatus cpmtypes.Status, outp
259
256
exitCode := cmd .ProcessState .Sys ().(syscall.WaitStatus ).ExitStatus ()
260
257
switch exitCode {
261
258
case 0 :
262
- logPluginStderr (rule , string ( stderr ) , 3 )
259
+ logPluginStderr (rule , stderrStr , 3 )
263
260
return cpmtypes .OK , output
264
261
case 1 :
265
- logPluginStderr (rule , string ( stderr ) , 0 )
262
+ logPluginStderr (rule , stderrStr , 0 )
266
263
return cpmtypes .NonOK , output
267
264
default :
268
- logPluginStderr (rule , string ( stderr ) , 0 )
265
+ logPluginStderr (rule , stderrStr , 0 )
269
266
return cpmtypes .Unknown , output
270
267
}
271
268
}
0 commit comments