8 changes: 8 additions & 0 deletions runner/options.go
@@ -274,6 +274,8 @@ type Options struct {
RateLimitMinute int
Probe bool
Resume bool
RetryRounds int
RetryDelay int
resumeCfg *ResumeCfg
Exclude goflags.StringSlice
HostMaxErrors int
Expand Down Expand Up @@ -530,6 +532,8 @@ func ParseOptions() *Options {
flagSet.DurationVar(&options.Delay, "delay", -1, "duration between each http request (eg: 200ms, 1s)"),
flagSet.IntVarP(&options.MaxResponseBodySizeToSave, "response-size-to-save", "rsts", math.MaxInt32, "max response size to save in bytes"),
flagSet.IntVarP(&options.MaxResponseBodySizeToRead, "response-size-to-read", "rstr", math.MaxInt32, "max response size to read in bytes"),
flagSet.IntVar(&options.RetryRounds, "retry-rounds", 0, "number of retry rounds for HTTP 429 responses (Too Many Requests)"),
flagSet.IntVar(&options.RetryDelay, "retry-delay", 500, "delay in milliseconds between retry rounds for HTTP 429 responses"),
)

flagSet.CreateGroup("cloud", "Cloud",
@@ -757,6 +761,10 @@ func (options *Options) ValidateOptions() error {
options.Threads = defaultThreads
}

if options.RetryRounds > 0 && options.RetryDelay <= 0 {
return fmt.Errorf("invalid retry-delay: must be >0 when retry-rounds=%d (got %d)", options.RetryRounds, options.RetryDelay)
}

return nil
}

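For orientation, a hedged command-line sketch of how the two new flags combine (the binary name and the -l list flag follow existing httpx conventions; the target file and values are illustrative):

    # retry targets answering HTTP 429 for up to 3 extra rounds, waiting 250 ms between rounds
    httpx -l targets.txt -retry-rounds 3 -retry-delay 250

    # rejected by ValidateOptions: retry-delay must be > 0 whenever retry-rounds is set
    httpx -l targets.txt -retry-rounds 3 -retry-delay 0
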
119 changes: 107 additions & 12 deletions runner/runner.go
@@ -23,6 +23,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"golang.org/x/exp/maps"
@@ -1257,6 +1258,9 @@ func (r *Runner) RunEnumeration() {
}(nextStep)

wg, _ := syncutil.New(syncutil.WithSize(r.options.Threads))
retryCh := make(chan retryJob)

_, drainedCh := r.retryLoop(context.Background(), retryCh, output, r.analyze)

processItem := func(k string) error {
if r.options.resumeCfg != nil {
@@ -1279,10 +1283,10 @@
for _, p := range r.options.requestURIs {
scanopts := r.scanopts.Clone()
scanopts.RequestURI = p
r.process(k, wg, r.hp, protocol, scanopts, output)
r.process(k, wg, r.hp, protocol, scanopts, output, retryCh)
}
} else {
r.process(k, wg, r.hp, protocol, &r.scanopts, output)
r.process(k, wg, r.hp, protocol, &r.scanopts, output, retryCh)
}

return nil
@@ -1299,9 +1303,10 @@
}

wg.Wait()

if r.options.RetryRounds > 0 {
<-drainedCh
}
close(output)

wgoutput.Wait()

if r.scanopts.StoreVisionReconClusters {
Expand All @@ -1323,6 +1328,70 @@ func (r *Runner) RunEnumeration() {
}
}

type analyzeFunc func(*httpx.HTTPX, string, httpx.Target, string, string, *ScanOptions) Result

func (r *Runner) retryLoop(
parent context.Context,
retryCh chan retryJob,
output chan<- Result,
analyze analyzeFunc,
) (stop func(), drained <-chan struct{}) {
var remaining atomic.Int64
ctx, cancel := context.WithCancel(parent)
drainedCh := make(chan struct{})

go func() {
defer close(retryCh)

for {
select {
case <-ctx.Done():
return
case job, ok := <-retryCh:
if !ok {
return
}
if job.attempt == 1 {
remaining.Add(1)
}

go func(j retryJob) {
if wait := time.Until(j.when); wait > 0 {
timer := time.NewTimer(wait)
select {
case <-ctx.Done():
timer.Stop()
return
case <-timer.C:
}
}

res := analyze(j.hp, j.protocol, j.target, j.method, j.origInput, j.scanopts)
output <- res

if res.StatusCode == http.StatusTooManyRequests && j.attempt < r.options.RetryRounds {
j.attempt++
j.when = time.Now().Add(time.Duration(r.options.RetryDelay) * time.Millisecond)

select {
case <-ctx.Done():
return
case retryCh <- j:
return
}
}

if remaining.Add(-1) == 0 {
close(drainedCh)
}
}(job)
}
}
}()

return func() { cancel() }, drainedCh
}

func logFilteredErrorPage(fileName, url string) {
dir := filepath.Dir(fileName)
if !fileutil.FolderExists(dir) {
@@ -1380,11 +1449,11 @@ func (r *Runner) GetScanOpts() ScanOptions {
return r.scanopts
}

func (r *Runner) Process(t string, wg *syncutil.AdaptiveWaitGroup, protocol string, scanopts *ScanOptions, output chan Result) {
r.process(t, wg, r.hp, protocol, scanopts, output)
func (r *Runner) Process(t string, wg *syncutil.AdaptiveWaitGroup, protocol string, scanopts *ScanOptions, output chan Result, retryCh chan retryJob) {
r.process(t, wg, r.hp, protocol, scanopts, output, retryCh)
}

func (r *Runner) process(t string, wg *syncutil.AdaptiveWaitGroup, hp *httpx.HTTPX, protocol string, scanopts *ScanOptions, output chan Result) {
func (r *Runner) process(t string, wg *syncutil.AdaptiveWaitGroup, hp *httpx.HTTPX, protocol string, scanopts *ScanOptions, output chan Result, retryCh chan retryJob) {
// attempts to set the workpool size to the number of threads
if r.options.Threads > 0 && wg.Size != r.options.Threads {
if err := wg.Resize(context.Background(), r.options.Threads); err != nil {
@@ -1409,15 +1478,28 @@ func (r *Runner) process(t string, wg *syncutil.AdaptiveWaitGroup, hp *httpx.HTT
defer wg.Done()
result := r.analyze(hp, protocol, target, method, t, scanopts)
output <- result
if result.StatusCode == http.StatusTooManyRequests &&
r.options.RetryRounds > 0 {
retryCh <- retryJob{
hp: hp,
protocol: protocol,
target: target,
method: method,
origInput: t,
scanopts: scanopts.Clone(),
attempt: 1,
when: time.Now().Add(time.Duration(r.options.RetryDelay) * time.Millisecond),
}
}
if scanopts.TLSProbe && result.TLSData != nil {
for _, tt := range result.TLSData.SubjectAN {
if !r.testAndSet(tt) {
continue
}
r.process(tt, wg, hp, protocol, scanopts, output)
r.process(tt, wg, hp, protocol, scanopts, output, retryCh)
}
if r.testAndSet(result.TLSData.SubjectCN) {
r.process(result.TLSData.SubjectCN, wg, hp, protocol, scanopts, output)
r.process(result.TLSData.SubjectCN, wg, hp, protocol, scanopts, output, retryCh)
}
}
if scanopts.CSPProbe && result.CSPData != nil {
Expand All @@ -1428,7 +1510,7 @@ func (r *Runner) process(t string, wg *syncutil.AdaptiveWaitGroup, hp *httpx.HTT
if !r.testAndSet(tt) {
continue
}
r.process(tt, wg, hp, protocol, scanopts, output)
r.process(tt, wg, hp, protocol, scanopts, output, retryCh)
}
}
}(target, method, prot)
@@ -1463,15 +1545,28 @@ func (r *Runner) process(t string, wg *syncutil.AdaptiveWaitGroup, hp *httpx.HTT
}
result := r.analyze(hp, protocol, target, method, t, scanopts)
output <- result
if result.StatusCode == http.StatusTooManyRequests &&
r.options.RetryRounds > 0 {
retryCh <- retryJob{
hp: hp,
protocol: protocol,
target: target,
method: method,
origInput: t,
scanopts: scanopts.Clone(),
attempt: 1,
when: time.Now().Add(time.Duration(r.options.RetryDelay) * time.Millisecond),
}
}
if scanopts.TLSProbe && result.TLSData != nil {
for _, tt := range result.TLSData.SubjectAN {
if !r.testAndSet(tt) {
continue
}
r.process(tt, wg, hp, protocol, scanopts, output)
r.process(tt, wg, hp, protocol, scanopts, output, retryCh)
}
if r.testAndSet(result.TLSData.SubjectCN) {
r.process(result.TLSData.SubjectCN, wg, hp, protocol, scanopts, output)
r.process(result.TLSData.SubjectCN, wg, hp, protocol, scanopts, output, retryCh)
}
}
}(port, target, method, wantedProtocol)
103 changes: 99 additions & 4 deletions runner/runner_test.go
@@ -1,16 +1,22 @@
package runner

import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"os"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

_ "github.com/projectdiscovery/fdmax/autofdmax"
"github.com/projectdiscovery/httpx/common/httpx"
"github.com/projectdiscovery/mapcidr/asn"
stringsutil "github.com/projectdiscovery/utils/strings"
syncutil "github.com/projectdiscovery/utils/sync"
"github.com/stretchr/testify/require"
)

@@ -227,10 +233,10 @@ func TestCreateNetworkpolicyInstance_AllowDenyFlags(t *testing.T) {
runner := &Runner{}

tests := []struct {
name string
allow []string
deny []string
testCases []struct {
name string
allow []string
deny []string
testCases []struct {
ip string
expected bool
reason string
@@ -312,3 +318,92 @@
})
}
}

func TestRunner_Process_And_RetryLoop(t *testing.T) {
var hits1, hits2 int32

// srv1: returns 429 for the first 3 requests, and 200 on the 4th request
srv1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if atomic.AddInt32(&hits1, 1) != 4 {
w.WriteHeader(http.StatusTooManyRequests)
return
}
w.WriteHeader(http.StatusOK)
}))
defer srv1.Close()

// srv2: returns 429 for the first 2 requests, and 200 on the 3rd request
srv2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if atomic.AddInt32(&hits2, 1) != 3 {
w.WriteHeader(http.StatusTooManyRequests)
return
}
w.WriteHeader(http.StatusOK)
}))
defer srv2.Close()

r, err := New(&Options{
Threads: 1,
RetryRounds: 2,
RetryDelay: 5,
Timeout: 3,
})
require.NoError(t, err)

output := make(chan Result)
retryCh := make(chan retryJob)

_, drainedCh := r.retryLoop(context.Background(), retryCh, output, r.analyze)

Comment on lines +356 to +357

💡 Verification agent

Wire up the retryLoop stop handle to prevent goroutine leaks

The test currently invokes retryLoop with a background context and discards its first return value (stop func()), so the internal goroutine never sees a cancellation signal. This can leave the loop blocked on retryCh after the test ends, leading to leaked goroutines and flakiness.

• File: runner/runner_test.go
• Lines: 356–357

Suggested change:

-    _, drainedCh := r.retryLoop(context.Background(), retryCh, output, r.analyze)
+    // Create a cancellable context and capture the stop function
+    ctx, cancel := context.WithCancel(context.Background())
+    stopRetry, drainedCh := r.retryLoop(ctx, retryCh, output, r.analyze)
+    // Ensure the loop exits and the context is cleaned up after the test
+    t.Cleanup(func() {
+        stopRetry()
+        cancel()
+    })

wg, _ := syncutil.New(syncutil.WithSize(r.options.Threads))
so := r.scanopts.Clone()
so.Methods = []string{"GET"}
so.TLSProbe = false
so.CSPProbe = false

seed := map[string]string{
"srv1": srv1.URL,
"srv2": srv2.URL,
}

var drainWG sync.WaitGroup
drainWG.Add(1)
var s1n429, s1n200, s2n429, s2n200 int
go func() {
defer drainWG.Done()
for res := range output {
switch res.StatusCode {
case http.StatusTooManyRequests:
if res.URL == srv1.URL {
s1n429++
} else {
s2n429++
}
case http.StatusOK:
if res.URL == srv1.URL {
s1n200++
} else {
s2n200++
}
}
}
}()

for _, url := range seed {
r.process(url, wg, r.hp, httpx.HTTP, so, output, retryCh)
}

wg.Wait()
<-drainedCh
close(output)
drainWG.Wait()
Comment on lines +396 to +399

🛠️ Refactor suggestion

Guard against hangs: time-bound the wait for drainedCh

If the retry loop logic regresses, <-drainedCh can block indefinitely and hang the test suite. Use a bounded wait to fail fast and surface the defect.

Apply this diff:

-    wg.Wait()
-    <-drainedCh
-    close(output)
+    wg.Wait()
+    select {
+    case <-drainedCh:
+        // drained successfully
+    case <-time.After(5 * time.Second):
+        t.Fatalf("retry loop did not drain within timeout")
+    }
+    close(output)


// Verify expected results
// srv1: should have 3x 429 responses and no 200 (never succeeds within retries)
require.Equal(t, 3, s1n429)
require.Equal(t, 0, s1n200)

// srv2: should have 2x 429 responses and 1x 200 (succeeds on 3rd attempt)
require.Equal(t, 2, s2n429)
require.Equal(t, 1, s2n200)
}
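
As a quick local check, the new test can be exercised with standard Go tooling (a hedged suggestion; -race is optional but useful for the goroutine-heavy retry path):

    go test ./runner -run TestRunner_Process_And_RetryLoop -race -v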
11 changes: 11 additions & 0 deletions runner/types.go
@@ -120,6 +120,17 @@ type Trace struct {
WroteRequest time.Time `json:"wrote_request,omitempty"`
}

type retryJob struct {
hp *httpx.HTTPX
protocol string
target httpx.Target
method string
origInput string
scanopts *ScanOptions
attempt int
when time.Time
}

// function to get dsl variables from result struct
func dslVariables() ([]string, error) {
fakeResult := Result{}