@@ -5,12 +5,10 @@ import (
5
5
"bytes"
6
6
"encoding/json"
7
7
"fmt"
8
- "io"
9
8
"log"
10
9
"os"
11
10
"regexp"
12
11
"strings"
13
- "sync"
14
12
"time"
15
13
16
14
"github.com/iron-io/iron_go3/worker"
@@ -37,31 +35,32 @@ func cleanIronGeneric(output []byte) []byte {
37
35
return output
38
36
}
39
37
40
- func cleanPython27IronOutput (output string ) (string , error ) {
38
+ func cleanIronTaskIdAndTimestamp (output string ) (string , error ) {
41
39
var buf bytes.Buffer
42
- var requestId string = ""
40
+ var taskId string = ""
43
41
// expecting request id as hex of bson_id
44
42
requestIdPattern , _ := regexp .Compile ("[a-f0-9]{24}" )
45
43
scanner := bufio .NewScanner (strings .NewReader (output ))
46
44
for scanner .Scan () {
47
45
line := scanner .Text ()
48
46
49
- if requestId == "" {
47
+ if taskId == "" {
50
48
parts := strings .Fields (line )
51
49
52
50
// generic logging through logger.info, logger.warning & etc
53
51
if len (parts ) >= 3 {
54
52
requestIdCandidate := parts [2 ]
55
53
if requestIdPattern .MatchString (requestIdCandidate ) {
56
- requestId = requestIdCandidate
54
+ taskId = requestIdCandidate
57
55
}
58
56
}
59
57
}
60
58
61
- line = util .RemoveTimestampAndRequestIdFromLogLine (line , requestId )
62
-
63
- buf .WriteString (line )
64
- buf .WriteRune ('\n' )
59
+ line , isOk := util .RemoveTimestampAndRequestIdFromIronLogLine (line , taskId )
60
+ if isOk {
61
+ buf .WriteString (line )
62
+ buf .WriteRune ('\n' )
63
+ }
65
64
if err := scanner .Err (); err != nil {
66
65
return "" , err
67
66
}
@@ -70,57 +69,77 @@ func cleanPython27IronOutput(output string) (string, error) {
70
69
return buf .String (), nil
71
70
}
72
71
73
- func cleanIron (runtime string , output []byte ) ([]byte , error ) {
72
+ func cleanIron (output []byte ) ([]byte , error ) {
74
73
output = cleanIronGeneric (output )
75
- switch runtime {
76
- case "python2.7" :
77
- cleaned , err := cleanPython27IronOutput (string (output ))
78
- return []byte (cleaned ), err
79
- default :
80
- return output , nil
81
- }
74
+ cleaned , err := cleanIronTaskIdAndTimestamp (string (output ))
75
+ return []byte (cleaned ), err
82
76
}
83
77
84
- func runOnIron (w * worker.Worker , wg * sync.WaitGroup , test * util.TestDescription , result chan <- io.Reader ) {
85
- var imagePrefix string
86
- if imagePrefix = os .Getenv ("IRON_LAMBDA_TEST_IMAGE_PREFIX" ); imagePrefix == "" {
87
- log .Fatalf ("IRON_LAMBDA_TEST_IMAGE_PREFIX not set" )
88
- }
78
+ //Returns a result and a debug channels. The channels are closed on test run finalization
79
+ func runOnIron (w * worker.Worker , test * util.TestDescription ) (<- chan string , <- chan string ) {
80
+ result := make (chan string , 1 )
81
+ debug := make (chan string , 1 )
82
+ go func () {
83
+ defer close (result )
84
+ defer close (debug )
85
+ var imagePrefix string
86
+ if imagePrefix = os .Getenv ("IRON_LAMBDA_TEST_IMAGE_PREFIX" ); imagePrefix == "" {
87
+ log .Fatalf ("IRON_LAMBDA_TEST_IMAGE_PREFIX not set" )
88
+ }
89
89
90
- var output bytes.Buffer
91
- defer func () {
92
- result <- & output
93
- wg .Done ()
94
- }()
90
+ payload , _ := json .Marshal (test .Event )
91
+ timeout := time .Duration (test .Timeout ) * time .Second
95
92
96
- payload , _ := json .Marshal (test .Event )
97
- timeout := time .Duration (test .Timeout ) * time .Second
93
+ debug <- "Enqueuing the task"
94
+ taskids , err := w .TaskQueue (worker.Task {
95
+ Cluster : "internal" ,
96
+ CodeName : fmt .Sprintf ("%s/%s" , imagePrefix , test .Name ),
97
+ Payload : string (payload ),
98
+ Timeout : & timeout ,
99
+ })
98
100
99
- taskids , err := w .TaskQueue (worker.Task {
100
- Cluster : "internal" ,
101
- CodeName : fmt .Sprintf ("%s/%s" , imagePrefix , test .Name ),
102
- Payload : string (payload ),
103
- Timeout : & timeout ,
104
- })
101
+ if err != nil {
102
+ debug <- fmt .Sprintf ("Error queueing task %s" , err )
103
+ return
104
+ }
105
105
106
- if err != nil {
107
- output . WriteString ( fmt . Sprintf ( "Error queueing task %s %s" , test . Name , err ))
108
- return
109
- }
106
+ if len ( taskids ) < 1 {
107
+ debug <- "Something went wrong, empty taskids list"
108
+ return
109
+ }
110
110
111
- if len (taskids ) < 1 {
112
- output .WriteString (fmt .Sprintf ("Something went wrong, empty taskids list" , test .Name ))
113
- return
114
- }
111
+ end := time .After (timeout )
112
+ taskid := taskids [0 ]
113
+ debug <- fmt .Sprintf ("Task Id: %s" , taskid )
115
114
116
- taskid := taskids [0 ]
115
+ debug <- "Waiting for task"
116
+ select {
117
+ case <- w .WaitForTask (taskid ):
118
+ case <- end :
119
+ debug <- fmt .Sprintf ("Task timed out %s" , taskid )
120
+ return
121
+ }
117
122
118
- <- w .WaitForTask (taskid )
119
- iron_log := <- w .WaitForTaskLog (taskid )
120
- cleanedLog , err := cleanIron (test .Runtime , iron_log )
121
- if err != nil {
122
- output .WriteString (fmt .Sprintf ("Error processing a log for task %s %s" , test .Name , err ))
123
- } else {
124
- output .Write (cleanedLog )
125
- }
123
+ var iron_log []byte
124
+ debug <- "Waiting for task log"
125
+ select {
126
+ case _iron_log , wait_log_ok := <- w .WaitForTaskLog (taskid ):
127
+ if ! wait_log_ok {
128
+ debug <- fmt .Sprintf ("Something went wrong, no task log %s" , taskid )
129
+ return
130
+ }
131
+ iron_log = _iron_log
132
+ case <- end :
133
+ debug <- fmt .Sprintf ("Task timed out to get task log or the log is empty %s" , taskid )
134
+ return
135
+ }
136
+
137
+ cleanedLog , err := cleanIron (iron_log )
138
+ if err != nil {
139
+ debug <- fmt .Sprintf ("Error processing a log %s" , test .Name )
140
+ } else {
141
+ result <- string (cleanedLog )
142
+ }
143
+ }()
144
+ return result , debug
126
145
}
0 commit comments