-
Notifications
You must be signed in to change notification settings - Fork 0
Add ramping loadtests to test cluster #15
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
1b9b56b
88766b8
1802811
734e9e7
a807f66
f880ff1
6263e2c
556024b
36de4ae
990e5cd
9853836
1cce3e6
489d7ef
5babcdf
a75b137
57f0fdc
2d31e3c
c4fb96c
7d169f4
3e43977
fa8f4ec
e247088
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,27 @@ | ||
| { | ||
| "chainId": 713714, | ||
| "seiChainId": "sei-chain", | ||
| "endpoints": [ | ||
| "http://127.0.0.1:8545" | ||
| ], | ||
| "accounts": { | ||
| "count": 5000, | ||
| "newAccountRate": 0.0 | ||
| }, | ||
| "scenarios": [ | ||
| { | ||
| "name": "EVMTransfer", | ||
| "weight": 1 | ||
| } | ||
| ], | ||
| "workers": 1, | ||
| "tps": 0, | ||
| "statsInterval": "5s", | ||
| "bufferSize": 1000, | ||
| "dryRun": false, | ||
| "debug": false, | ||
| "trackReceipts": false, | ||
| "trackBlocks": false, | ||
| "trackUserLatency": false, | ||
| "prewarm": false | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,30 @@ | ||
| { | ||
| "chainId": 713714, | ||
| "seiChainId": "local-docker", | ||
| "endpoints": [ | ||
| "http://127.0.0.1:8545", | ||
| "http://127.0.0.1:8547", | ||
| "http://127.0.0.1:8549", | ||
| "http://127.0.0.1:8551" | ||
| ], | ||
| "accounts": { | ||
| "count": 5000, | ||
| "newAccountRate": 0.0 | ||
| }, | ||
| "scenarios": [ | ||
| { | ||
| "name": "ERC20", | ||
| "weight": 1 | ||
| } | ||
| ], | ||
| "workers": 1, | ||
| "tps": 0, | ||
| "statsInterval": "5s", | ||
| "bufferSize": 1000, | ||
| "dryRun": false, | ||
| "debug": false, | ||
| "trackReceipts": false, | ||
| "trackBlocks": false, | ||
| "trackUserLatency": false, | ||
| "prewarm": false | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,199 @@ | ||
| package sender | ||
|
|
||
| import ( | ||
| "context" | ||
| "errors" | ||
| "fmt" | ||
| "log" | ||
| "time" | ||
|
|
||
| "github.com/sei-protocol/sei-load/stats" | ||
| "github.com/sei-protocol/sei-load/utils/service" | ||
| "golang.org/x/time/rate" | ||
| ) | ||
|
|
||
| // This will manage the ramping process for the loadtest | ||
| // Ramping loadtest will begin at the StartTps and spend LoadTime at each step, ending when we violate the chain SLO of | ||
| // 1 block per second over a given ramp period (as measured in the back half of the ramp time) | ||
| // If we successfully pass a given TPS, we will pause for PauseTime, and then start the next step. | ||
| // If we fail to pass a given TPS, we will stop the loadtest. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Write this as godoc of |
||
|
|
||
| var ErrRampTestFailedSLO = errors.New("Ramp Test failed SLO") | ||
|
|
||
| func (r *Ramper) FormatRampStats() string { | ||
| return fmt.Sprintf(` | ||
| ───────────────────────────────────────── | ||
| RAMP STATISTICS | ||
| ───────────────────────────────────────── | ||
| Ramp Curve Stats: | ||
| %s | ||
| ───────────────────────────────────────── | ||
| Window Block Stats: | ||
| %s | ||
| ─────────────────────────────────────────`, | ||
| r.rampCurve.GetCurveStats(), r.lastWindowStats.FormatBlockStats()) | ||
| } | ||
|
|
||
// Ramper manages the ramping process for a loadtest.
//
// The ramp begins at the curve's starting TPS and spends the configured load
// time at each step. It ends when the chain SLO of 1 block per second is
// violated over a measurement window (see WatchSLO). When a step passes, the
// curve pauses for its recovery interval and then starts the next step; when
// a step fails, the loadtest stops.
type Ramper struct {
	sharedLimiter   *rate.Limiter            // limiter shared with the senders; its limit is the live TPS target
	blockCollector  stats.BlockStatsProvider // source of windowed block statistics for SLO checks
	currentTps      float64                  // most recent TPS value applied to the limiter
	startTime       time.Time                // when Run started; time origin for the ramp curve
	rampCurve       RampCurve                // maps elapsed time to target TPS
	lastWindowStats stats.BlockStats         // snapshot of the previously completed SLO window
}
|
|
||
// RampCurve maps elapsed ramp time to a target TPS.
type RampCurve interface {
	// GetTPS returns the target TPS at elapsed time t since the ramp started.
	GetTPS(t time.Duration) float64
	// GetCurveStats returns a human-readable summary of ramp progress.
	GetCurveStats() string
}
|
|
||
| func NewRamper(rampCurve RampCurve, blockCollector stats.BlockStatsProvider, sharedLimiter *rate.Limiter) *Ramper { | ||
| sharedLimiter.SetLimit(rate.Limit(1)) // reset limiter to 1 | ||
| return &Ramper{ | ||
| sharedLimiter: sharedLimiter, | ||
| blockCollector: blockCollector, | ||
| rampCurve: rampCurve, | ||
| } | ||
| } | ||
|
|
||
| func (r *Ramper) UpdateTPS() { | ||
| timeSinceStart := time.Since(r.startTime) | ||
| r.currentTps = r.rampCurve.GetTPS(timeSinceStart) | ||
| r.sharedLimiter.SetLimit(rate.Limit(r.currentTps)) | ||
| } | ||
|
|
||
| func (r *Ramper) LogFinalStats() { | ||
| log.Printf("Final Ramp stats: \n%s", r.FormatRampStats()) | ||
| } | ||
|
|
||
// WatchSLO monitors the chain SLO in a background goroutine and returns a
// channel that receives one signal if the SLO is violated.
//
// Every 100ms it reads the 90th-percentile block time over the current window
// from the block collector; if it exceeds 1s, a single signal is sent on the
// returned channel and the goroutine exits (the channel is then closed).
// Every 30s the window stats are snapshotted into r.lastWindowStats and reset
// so each evaluation covers a fresh measurement window. The goroutine also
// exits, closing the channel without a signal, when ctx is cancelled.
func (r *Ramper) WatchSLO(ctx context.Context) <-chan struct{} {
	ch := make(chan struct{})
	go func() {
		// Closing ch lets receivers distinguish shutdown from an actual
		// violation signal (a receive after close yields the zero value).
		defer close(ch)

		log.Println("🔍 Ramping watching chain SLO with 30s windows, checking every 100ms")

		// Two separate timers: frequent SLO checks and window resets
		sloCheckTicker := time.NewTicker(100 * time.Millisecond)
		windowResetTicker := time.NewTicker(30 * time.Second)
		defer sloCheckTicker.Stop()
		defer windowResetTicker.Stop()

		// Reset window stats at the start so the first window is not polluted
		// by blocks observed before the ramp began.
		r.blockCollector.ResetWindowStats()

		for {
			select {
			case <-ctx.Done():
				return
			case <-sloCheckTicker.C:
				// Check SLO every 100ms
				p90BlockTime := r.blockCollector.GetWindowBlockTimePercentile(90)
				if p90BlockTime > 1*time.Second {
					log.Printf("❌ SLO violated: 90th percentile block time %v exceeds 1s threshold", p90BlockTime)
					// Blocking send so the violation is not dropped; bail out
					// if the context is cancelled before anyone receives.
					select {
					case ch <- struct{}{}:
					case <-ctx.Done():
					}
					return
				}
			case <-windowResetTicker.C:
				// Reset window stats every 30 seconds for fresh measurements
				log.Printf("🔄 Resetting SLO window stats (30s period)")
				// save last window stats (used by FormatRampStats after the run)
				r.lastWindowStats = r.blockCollector.GetWindowBlockStats()
				r.blockCollector.ResetWindowStats()
			}
		}
	}()
	return ch
}
|
|
||
| // Start initializes and starts all workers | ||
| func (r *Ramper) Run(ctx context.Context) error { | ||
| return service.Run(ctx, func(ctx context.Context, s service.Scope) error { | ||
| // TODO: Implement ramping logic | ||
udpatil marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| r.startTime = time.Now() | ||
| sloChan := r.WatchSLO(ctx) | ||
| tpsUpdateTicker := time.NewTicker(100 * time.Millisecond) | ||
| for ctx.Err() == nil { | ||
|
|
||
| select { | ||
| case <-sloChan: | ||
| r.sharedLimiter.SetLimit(rate.Limit(1)) | ||
| log.Printf("❌ Ramping failed to pass SLO, stopping loadtest, failure window blockstats:") | ||
| log.Println(r.blockCollector.GetWindowBlockStats().FormatBlockStats()) | ||
| return ErrRampTestFailedSLO | ||
| case <-tpsUpdateTicker.C: | ||
| r.UpdateTPS() | ||
| case <-ctx.Done(): | ||
| return ctx.Err() | ||
| } | ||
| } | ||
| return ctx.Err() | ||
| }) | ||
| } | ||
|
|
||
| type RampCurveStep struct { | ||
| StartTps float64 | ||
| IncrementTps float64 | ||
| LoadInterval time.Duration | ||
| RecoveryInterval time.Duration | ||
| Step int | ||
| CurrentTPS float64 | ||
| } | ||
|
|
||
| func NewRampCurveStep(startTps float64, incrementTps float64, loadInterval time.Duration, recoveryInterval time.Duration) *RampCurveStep { | ||
| return &RampCurveStep{ | ||
| StartTps: startTps, | ||
| IncrementTps: incrementTps, | ||
| LoadInterval: loadInterval, | ||
| RecoveryInterval: recoveryInterval, | ||
| Step: 0, | ||
| CurrentTPS: startTps, | ||
| } | ||
| } | ||
|
|
||
| func (r *RampCurveStep) GetStartTps() float64 { | ||
| return r.StartTps | ||
| } | ||
|
|
||
| func (r *RampCurveStep) GetIncrementTps() float64 { | ||
| return r.IncrementTps | ||
| } | ||
|
|
||
| func (r *RampCurveStep) GetTPS(t time.Duration) float64 { | ||
| // figure out where we are in the load interval | ||
| cycleInterval := r.LoadInterval + r.RecoveryInterval | ||
| cycleProgress := t % cycleInterval | ||
|
|
||
| // if we're in the recovery interval, return 1.00 (close to 0 but doesn't fully block the limiter) | ||
| if cycleProgress > r.LoadInterval { | ||
| return 1.00 | ||
| } | ||
|
|
||
| cycleNumber := int(t / cycleInterval) | ||
|
|
||
| // this means we're in a new step, so we need to update step and TPS | ||
| if cycleNumber > r.Step { | ||
| r.Step = cycleNumber | ||
| newTps := r.StartTps + r.IncrementTps*float64(r.Step) | ||
| log.Printf("📈 Ramping to %f TPS for %v", newTps, r.LoadInterval) | ||
| r.CurrentTPS = newTps | ||
| return newTps | ||
| } | ||
|
|
||
| return r.CurrentTPS | ||
| } | ||
|
|
||
| // this should return the highest target TPS that is PRIOR to the current step | ||
| func (r *RampCurveStep) GetCurveStats() string { | ||
| step := r.Step - 1 | ||
| if step < 0 { | ||
| return "no ramp curve stats available" | ||
| } | ||
| return fmt.Sprintf("Highest Passed TPS: %.2f", r.StartTps+r.IncrementTps*float64(step)) | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.