Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion config/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type Settings struct {
TrackBlocks bool `json:"trackBlocks"`
TrackUserLatency bool `json:"trackUserLatency"`
Prewarm bool `json:"prewarm"`
RampUp bool `json:"rampUp"`
}

// DefaultSettings returns the default configuration values
Expand All @@ -35,6 +36,7 @@ func DefaultSettings() Settings {
TrackBlocks: false,
TrackUserLatency: false,
Prewarm: false,
RampUp: false,
}
}

Expand All @@ -52,6 +54,7 @@ func InitializeViper(cmd *cobra.Command) error {
"prewarm": "prewarm",
"trackUserLatency": "track-user-latency",
"workers": "workers",
"rampUp": "ramp-up",
}

for viperKey, flagName := range flagBindings {
Expand All @@ -72,7 +75,7 @@ func InitializeViper(cmd *cobra.Command) error {
viper.SetDefault("prewarm", defaults.Prewarm)
viper.SetDefault("trackUserLatency", defaults.TrackUserLatency)
viper.SetDefault("workers", defaults.Workers)

viper.SetDefault("rampUp", defaults.RampUp)
return nil
}

Expand Down Expand Up @@ -103,5 +106,6 @@ func ResolveSettings() Settings {
TrackBlocks: viper.GetBool("trackBlocks"),
TrackUserLatency: viper.GetBool("trackUserLatency"),
Prewarm: viper.GetBool("prewarm"),
RampUp: viper.GetBool("rampUp"),
}
}
2 changes: 2 additions & 0 deletions config/settings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func TestArgumentPrecedence(t *testing.T) {
cmd.Flags().Bool("prewarm", false, "Prewarm")
cmd.Flags().Bool("track-user-latency", false, "Track user latency")
cmd.Flags().Int("buffer-size", 0, "Buffer size")
cmd.Flags().Bool("ramp-up", false, "Ramp up loadtest")

// Parse CLI args
if len(tt.cliArgs) > 0 {
Expand Down Expand Up @@ -128,6 +129,7 @@ func TestDefaultSettings(t *testing.T) {
TrackBlocks: false,
TrackUserLatency: false,
Prewarm: false,
RampUp: false,
}

if defaults != expected {
Expand Down
29 changes: 23 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ var rootCmd = &cobra.Command{
Short: "Sei Chain Load Test v2",
Long: `A load test generator for Sei Chain.

Supports both contract and non-contract scenarios with factory
and weighted scenario selection mechanisms. Features sharded sending
Supports both contract and non-contract scenarios with factory
and weighted scenario selection mechanisms. Features sharded sending
to multiple endpoints with account pooling management.

Use --dry-run to test configuration and view transaction details
Use --dry-run to test configuration and view transaction details
without actually sending requests or deploying contracts.`,
Run: func(cmd *cobra.Command, args []string) {
if err := runLoadTest(context.Background(), cmd, args); err != nil {
Expand All @@ -63,6 +63,7 @@ func init() {
rootCmd.Flags().IntP("workers", "w", 0, "Number of workers")
rootCmd.Flags().IntP("nodes", "n", 0, "Number of nodes/endpoints to use (0 = use all)")
rootCmd.Flags().String("metricsListenAddr", "0.0.0.0:9090", "The ip:port on which to export prometheus metrics.")
rootCmd.Flags().Bool("ramp-up", false, "Ramp up loadtest")

// Initialize Viper with proper error handling
if err := config.InitializeViper(rootCmd); err != nil {
Expand Down Expand Up @@ -148,6 +149,7 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
// Create statistics collector and logger
collector := stats.NewCollector()
logger := stats.NewLogger(collector, settings.StatsInterval, settings.Debug)
var ramper *sender.Ramper

err = service.Run(ctx, func(ctx context.Context, s service.Scope) error {
// Create the generator from the config struct
Expand All @@ -159,9 +161,7 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
// Create shared rate limiter for all workers if TPS is specified
var sharedLimiter *rate.Limiter
if settings.TPS > 0 {
// Convert TPS to interval: 1/tps seconds = (1/tps) * 1e9 nanoseconds
intervalNs := int64((1.0 / settings.TPS) * 1e9)
sharedLimiter = rate.NewLimiter(rate.Every(time.Duration(intervalNs)), 1)
sharedLimiter = rate.NewLimiter(rate.Limit(settings.TPS), 1)
log.Printf("📈 Rate limiting enabled: %.2f TPS shared across all workers", settings.TPS)
} else {
// No rate limiting
Expand All @@ -184,6 +184,20 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
})
}

if settings.RampUp {
ramperBlockCollector := stats.NewBlockCollector(cfg.SeiChainID)
s.SpawnBgNamed("ramper block collector", func() error {
return ramperBlockCollector.Run(ctx, cfg.Endpoints[0])
})

ramper = sender.NewRamper(
sender.NewRampCurveStep(100, 100, 120*time.Second, 30*time.Second),
ramperBlockCollector,
sharedLimiter,
)
s.SpawnBgNamed("ramper", func() error { return ramper.Run(ctx) })
}

// Create and start user latency tracker if endpoints are available
if len(cfg.Endpoints) > 0 && settings.TrackUserLatency {
userLatencyTracker := stats.NewUserLatencyTracker(settings.StatsInterval)
Expand Down Expand Up @@ -274,6 +288,9 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
})
// Print final statistics
logger.LogFinalStats()
if settings.RampUp && ramper != nil {
ramper.LogFinalStats()
}
log.Printf("👋 Shutdown complete")
return err
}
Expand Down
27 changes: 27 additions & 0 deletions profiles/evm_transfer.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{
"chainId": 713714,
"seiChainId": "sei-chain",
"endpoints": [
"http://127.0.0.1:8545"
],
"accounts": {
"count": 5000,
"newAccountRate": 0.0
},
"scenarios": [
{
"name": "EVMTransfer",
"weight": 1
}
],
"workers": 1,
"tps": 0,
"statsInterval": "5s",
"bufferSize": 1000,
"dryRun": false,
"debug": false,
"trackReceipts": false,
"trackBlocks": false,
"trackUserLatency": false,
"prewarm": false
}
30 changes: 30 additions & 0 deletions profiles/local_docker.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"chainId": 713714,
"seiChainId": "local-docker",
"endpoints": [
"http://127.0.0.1:8545",
"http://127.0.0.1:8547",
"http://127.0.0.1:8549",
"http://127.0.0.1:8551"
],
"accounts": {
"count": 5000,
"newAccountRate": 0.0
},
"scenarios": [
{
"name": "ERC20",
"weight": 1
}
],
"workers": 1,
"tps": 0,
"statsInterval": "5s",
"bufferSize": 1000,
"dryRun": false,
"debug": false,
"trackReceipts": false,
"trackBlocks": false,
"trackUserLatency": false,
"prewarm": false
}
199 changes: 199 additions & 0 deletions sender/ramper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
package sender

import (
"context"
"errors"
"fmt"
"log"
"time"

"github.com/sei-protocol/sei-load/stats"
"github.com/sei-protocol/sei-load/utils/service"
"golang.org/x/time/rate"
)

// This will manage the ramping process for the loadtest
// Ramping loadtest will begin at the StartTps and spend LoadTime at each step, ending when we violate the chain SLO of
// 1 block per second over a given ramp period (as measured in the back half of the ramp time)
// If we successfully pass a given TPS, we will pause for PauseTime, and then start the next step.
// If we fail to pass a given TPS, we will stop the loadtest.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Write this as godoc of Ramper or package level godoc instead of inline comment?


// ErrRampTestFailedSLO is returned by Ramper.Run when the chain SLO is
// violated during a ramp step.
var ErrRampTestFailedSLO = errors.New("ramp test failed SLO")

// FormatRampStats renders a human-readable summary of the ramp run:
// the curve's progress stats followed by the block stats saved from the
// last completed SLO window.
func (r *Ramper) FormatRampStats() string {
	curveStats := r.rampCurve.GetCurveStats()
	windowStats := r.lastWindowStats.FormatBlockStats()
	return fmt.Sprintf(`
─────────────────────────────────────────
RAMP STATISTICS
─────────────────────────────────────────
Ramp Curve Stats:
%s
─────────────────────────────────────────
Window Block Stats:
%s
─────────────────────────────────────────`,
		curveStats, windowStats)
}

type Ramper struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there anything here that we want otel metrics for too? It doesn't have to happen in this PR but if the lines are small enough I would also consider adding metrics.

sharedLimiter *rate.Limiter
blockCollector stats.BlockStatsProvider
currentTps float64
startTime time.Time
rampCurve RampCurve
lastWindowStats stats.BlockStats
}

// RampCurve is a function that returns the target TPS at a given time in the ramp period
type RampCurve interface {
	// GetTPS returns the target TPS for elapsed ramp time t.
	GetTPS(t time.Duration) float64
	// GetCurveStats returns a human-readable summary of the curve's progress.
	GetCurveStats() string
}

// NewRamper builds a Ramper that adjusts sharedLimiter along rampCurve,
// using blockCollector to judge the chain SLO. The limiter is dropped
// to 1 TPS immediately so the ramp starts from a near-idle baseline.
// NOTE(review): sharedLimiter must be non-nil — a nil limiter (TPS
// disabled in settings) would panic here; confirm callers always pass one.
func NewRamper(rampCurve RampCurve, blockCollector stats.BlockStatsProvider, sharedLimiter *rate.Limiter) *Ramper {
	sharedLimiter.SetLimit(rate.Limit(1)) // reset limiter to 1
	ramper := &Ramper{
		rampCurve:      rampCurve,
		blockCollector: blockCollector,
		sharedLimiter:  sharedLimiter,
	}
	return ramper
}

// UpdateTPS recomputes the target TPS from the ramp curve for the time
// elapsed since the ramp started and applies it to the shared limiter.
func (r *Ramper) UpdateTPS() {
	elapsed := time.Since(r.startTime)
	target := r.rampCurve.GetTPS(elapsed)
	r.currentTps = target
	r.sharedLimiter.SetLimit(rate.Limit(target))
}

// LogFinalStats writes the final ramp summary to the standard logger.
func (r *Ramper) LogFinalStats() {
	summary := r.FormatRampStats()
	log.Printf("Final Ramp stats: \n%s", summary)
}

// WatchSLO will evaluate the chain SLO every 100ms using a 30 second window, and return a channel if the SLO is violated.
// The returned channel receives at most one signal (on violation) and is
// closed when the watcher goroutine exits, whether from a violation or
// from ctx cancellation.
func (r *Ramper) WatchSLO(ctx context.Context) <-chan struct{} {
	ch := make(chan struct{})
	go func() {
		// Closing ch lets receivers distinguish "watcher stopped" even if
		// no violation was ever sent.
		defer close(ch)

		log.Println("🔍 Ramping watching chain SLO with 30s windows, checking every 100ms")

		// Two separate timers: frequent SLO checks and window resets
		sloCheckTicker := time.NewTicker(100 * time.Millisecond)
		windowResetTicker := time.NewTicker(30 * time.Second)
		defer sloCheckTicker.Stop()
		defer windowResetTicker.Stop()

		// Reset window stats at the start so the first window only
		// measures blocks produced after the ramp began.
		r.blockCollector.ResetWindowStats()

		for {
			select {
			case <-ctx.Done():
				return
			case <-sloCheckTicker.C:
				// Check SLO every 100ms: p90 block time over the current
				// window must stay at or under 1s.
				p90BlockTime := r.blockCollector.GetWindowBlockTimePercentile(90)
				if p90BlockTime > 1*time.Second {
					log.Printf("❌ SLO violated: 90th percentile block time %v exceeds 1s threshold", p90BlockTime)
					// Blocking send, but guarded by ctx so the goroutine
					// cannot leak if the receiver has already gone away.
					select {
					case ch <- struct{}{}:
					case <-ctx.Done():
					}
					return
				}
			case <-windowResetTicker.C:
				// Reset window stats every 30 seconds for fresh measurements
				log.Printf("🔄 Resetting SLO window stats (30s period)")
				// save last window stats so the final report can show the
				// last fully-completed window
				r.lastWindowStats = r.blockCollector.GetWindowBlockStats()
				r.blockCollector.ResetWindowStats()
			}
		}
	}()
	return ch
}

// Run starts the ramp loop: it records the start time, launches the SLO
// watcher, and every 100ms recomputes the target TPS from the ramp
// curve. It returns ErrRampTestFailedSLO when the watcher reports a
// violation, or ctx.Err() once the context is cancelled.
func (r *Ramper) Run(ctx context.Context) error {
	return service.Run(ctx, func(ctx context.Context, s service.Scope) error {
		r.startTime = time.Now()
		sloChan := r.WatchSLO(ctx)
		tpsUpdateTicker := time.NewTicker(100 * time.Millisecond)
		// Stop the ticker on exit; it was previously leaked.
		defer tpsUpdateTicker.Stop()
		for ctx.Err() == nil {
			select {
			case <-sloChan:
				// SLO violated: drop the limiter to near-idle before
				// reporting failure so senders stop hammering the chain.
				r.sharedLimiter.SetLimit(rate.Limit(1))
				log.Printf("❌ Ramping failed to pass SLO, stopping loadtest, failure window blockstats:")
				log.Println(r.blockCollector.GetWindowBlockStats().FormatBlockStats())
				return ErrRampTestFailedSLO
			case <-tpsUpdateTicker.C:
				r.UpdateTPS()
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return ctx.Err()
	})
}

// RampCurveStep is a step-function RampCurve: each cycle holds a load
// phase at the current target TPS for LoadInterval, then a recovery
// phase of RecoveryInterval; each new cycle raises the target by
// IncrementTps.
type RampCurveStep struct {
	StartTps         float64       // target TPS of the first step
	IncrementTps     float64       // TPS added at each subsequent step
	LoadInterval     time.Duration // duration of the load phase within a cycle
	RecoveryInterval time.Duration // duration of the recovery (near-idle) phase
	Step             int           // index of the current cycle/step (mutated by GetTPS)
	CurrentTPS       float64       // target TPS of the current step (mutated by GetTPS)
}

// NewRampCurveStep returns a step curve that starts at startTps and
// grows by incrementTps per cycle, holding load for loadInterval and
// then recovering for recoveryInterval.
func NewRampCurveStep(startTps float64, incrementTps float64, loadInterval time.Duration, recoveryInterval time.Duration) *RampCurveStep {
	curve := &RampCurveStep{
		StartTps:         startTps,
		IncrementTps:     incrementTps,
		LoadInterval:     loadInterval,
		RecoveryInterval: recoveryInterval,
	}
	curve.Step = 0
	curve.CurrentTPS = startTps
	return curve
}

// GetStartTps returns the TPS target of the first ramp step.
func (r *RampCurveStep) GetStartTps() float64 {
	return r.StartTps
}

// GetIncrementTps returns the TPS added at each subsequent ramp step.
func (r *RampCurveStep) GetIncrementTps() float64 {
	return r.IncrementTps
}

// GetTPS returns the target TPS for elapsed ramp time t. During the
// recovery portion of each cycle it returns ~1 TPS; during the load
// portion it returns the step's target, advancing Step/CurrentTPS the
// first time a new cycle is entered.
// NOTE(review): this mutates receiver state without synchronization —
// assumed to be called from a single goroutine; confirm.
func (r *RampCurveStep) GetTPS(t time.Duration) float64 {
	cycle := r.LoadInterval + r.RecoveryInterval
	progress := t % cycle

	// Recovery phase: idle at 1.00 TPS (close to 0 but doesn't fully
	// block the limiter).
	if progress > r.LoadInterval {
		return 1.00
	}

	cycleIndex := int(t / cycle)
	if cycleIndex <= r.Step {
		// Still within the current step's load phase.
		return r.CurrentTPS
	}

	// Entering a new step: bump the target and remember where we are.
	r.Step = cycleIndex
	next := r.StartTps + r.IncrementTps*float64(cycleIndex)
	log.Printf("📈 Ramping to %f TPS for %v", next, r.LoadInterval)
	r.CurrentTPS = next
	return next
}

// GetCurveStats reports the highest target TPS PRIOR to the current
// step — i.e. the last fully-passed step — or a placeholder string when
// no step has completed yet.
func (r *RampCurveStep) GetCurveStats() string {
	passedStep := r.Step - 1
	if passedStep < 0 {
		return "no ramp curve stats available"
	}
	highest := r.StartTps + r.IncrementTps*float64(passedStep)
	return fmt.Sprintf("Highest Passed TPS: %.2f", highest)
}
Loading
Loading