99 "sync"
1010 "time"
1111
12+ "github.com/linuxsuren/api-testing/pkg/limit"
1213 "github.com/linuxsuren/api-testing/pkg/render"
1314 "github.com/linuxsuren/api-testing/pkg/runner"
1415 "github.com/linuxsuren/api-testing/pkg/testing"
@@ -17,10 +18,16 @@ import (
1718)
1819
1920type runOption struct {
20- pattern string
21- duration time.Duration
22- thread int64
23- context context.Context
21+ pattern string
22+ duration time.Duration
23+ requestTimeout time.Duration
24+ requestIgnoreError bool
25+ thread int64
26+ context context.Context
27+ qps int32
28+ burst int32
29+ limiter limit.RateLimiter
30+ startTime time.Time
2431}
2532
2633// CreateRunCommand returns the run command
@@ -40,13 +47,23 @@ See also https://github.com/LinuxSuRen/api-testing/tree/master/sample`,
4047 flags .StringVarP (& opt .pattern , "pattern" , "p" , "test-suite-*.yaml" ,
4148 "The file pattern which try to execute the test cases" )
4249 flags .DurationVarP (& opt .duration , "duration" , "" , 0 , "Running duration" )
50+ flags .DurationVarP (& opt .requestTimeout , "request-timeout" , "" , time .Minute , "Timeout for per request" )
51+ flags .BoolVarP (& opt .requestIgnoreError , "request-ignore-error" , "" , false , "Indicate if ignore the request error" )
4352 flags .Int64VarP (& opt .thread , "thread" , "" , 1 , "Threads of the execution" )
53+ flags .Int32VarP (& opt .qps , "qps" , "" , 5 , "QPS" )
54+ flags .Int32VarP (& opt .burst , "burst" , "" , 5 , "burst" )
4455 return
4556}
4657
4758func (o * runOption ) runE (cmd * cobra.Command , args []string ) (err error ) {
4859 var files []string
60+ o .startTime = time .Now ()
4961 o .context = cmd .Context ()
62+ o .limiter = limit .NewDefaultRateLimiter (o .qps , o .burst )
63+ defer func () {
64+ cmd .Printf ("consume: %s\n " , time .Now ().Sub (o .startTime ).String ())
65+ o .limiter .Stop ()
66+ }()
5067
5168 if files , err = filepath .Glob (o .pattern ); err == nil {
5269 for i := range files {
@@ -69,13 +86,15 @@ func (o *runOption) runSuiteWithDuration(suite string) (err error) {
6986 // make sure having a valid timer
7087 timeout = time .NewTicker (time .Second )
7188 }
72- errChannel := make (chan error , 10 )
89+ errChannel := make (chan error , 10 * o .thread )
90+ stopSingal := make (chan struct {}, 1 )
7391 var wait sync.WaitGroup
7492
7593 for ! stop {
7694 select {
7795 case <- timeout .C :
7896 stop = true
97+ stopSingal <- struct {}{}
7998 case err = <- errChannel :
8099 if err != nil {
81100 stop = true
@@ -85,32 +104,38 @@ func (o *runOption) runSuiteWithDuration(suite string) (err error) {
85104 continue
86105 }
87106 wait .Add (1 )
88- if o .duration <= 0 {
89- stop = true
90- }
91107
92- go func (ch chan error ) {
108+ go func (ch chan error , sem * semaphore.Weighted ) {
109+ now := time .Now ()
93110 defer sem .Release (1 )
94111 defer wait .Done ()
112+ defer func () {
113+ fmt .Println ("routing end with" , time .Now ().Sub (now ))
114+ }()
95115
96- ctx := getDefaultContext ()
97- ch <- runSuite (suite , ctx )
98- }(errChannel )
116+ dataContext := getDefaultContext ()
117+ ch <- o . runSuite (suite , dataContext , o . context , stopSingal )
118+ }(errChannel , sem )
99119 }
100120 }
101- err = <- errChannel
121+
122+ select {
123+ case err = <- errChannel :
124+ case <- stopSingal :
125+ }
126+
102127 wait .Wait ()
103128 return
104129}
105130
106- func runSuite (suite string , ctx map [string ]interface {}) (err error ) {
131+ func ( o * runOption ) runSuite (suite string , dataContext map [string ]interface {}, ctx context. Context , stopSingal chan struct {}) (err error ) {
107132 var testSuite * testing.TestSuite
108133 if testSuite , err = testing .Parse (suite ); err != nil {
109134 return
110135 }
111136
112137 var result string
113- if result , err = render .Render ("base api" , testSuite .API , ctx ); err == nil {
138+ if result , err = render .Render ("base api" , testSuite .API , dataContext ); err == nil {
114139 testSuite .API = result
115140 testSuite .API = strings .TrimSuffix (testSuite .API , "/" )
116141 } else {
@@ -123,12 +148,25 @@ func runSuite(suite string, ctx map[string]interface{}) (err error) {
123148 testCase .Request .API = fmt .Sprintf ("%s%s" , testSuite .API , testCase .Request .API )
124149 }
125150
126- setRelativeDir (suite , & testCase )
127151 var output interface {}
128- if output , err = runner .RunTestCase (& testCase , ctx ); err != nil {
152+ select {
153+ case <- stopSingal :
129154 return
155+ default :
156+ // reuse the API prefix
157+ if strings .HasPrefix (testCase .Request .API , "/" ) {
158+ testCase .Request .API = fmt .Sprintf ("%s%s" , testSuite .API , testCase .Request .API )
159+ }
160+
161+ setRelativeDir (suite , & testCase )
162+ o .limiter .Accept ()
163+
164+ ctxWithTimeout , _ := context .WithTimeout (ctx , o .requestTimeout )
165+ if output , err = runner .RunTestCase (& testCase , dataContext , ctxWithTimeout ); err != nil && ! o .requestIgnoreError {
166+ return
167+ }
130168 }
131- ctx [testCase .Name ] = output
169+ dataContext [testCase .Name ] = output
132170 }
133171 return
134172}
0 commit comments