Skip to content

Commit

Permalink
Update parameter of simulation test (uber#6226)
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll authored Aug 13, 2024
1 parent 32e3878 commit d4df0cf
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 17 deletions.
43 changes: 34 additions & 9 deletions host/matching_simulation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ package host

import (
"context"
"errors"
"flag"
"fmt"
"os"
Expand All @@ -50,6 +51,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.uber.org/yarpc"
"golang.org/x/time/rate"

"github.com/uber/cadence/client/history"
"github.com/uber/cadence/common/dynamicconfig"
Expand Down Expand Up @@ -100,6 +102,8 @@ func TestMatchingSimulationSuite(t *testing.T) {
dynamicconfig.MatchingForwarderMaxOutstandingTasks: getForwarderMaxOutstandingTasks(clusterConfig.MatchingConfig.SimulationConfig.ForwarderMaxOutstandingTasks),
dynamicconfig.MatchingForwarderMaxRatePerSecond: getForwarderMaxRPS(clusterConfig.MatchingConfig.SimulationConfig.ForwarderMaxRatePerSecond),
dynamicconfig.MatchingForwarderMaxChildrenPerNode: getForwarderMaxChildPerNode(clusterConfig.MatchingConfig.SimulationConfig.ForwarderMaxChildrenPerNode),
dynamicconfig.LocalPollWaitTime: clusterConfig.MatchingConfig.SimulationConfig.LocalPollWaitTime,
dynamicconfig.LocalTaskWaitTime: clusterConfig.MatchingConfig.SimulationConfig.LocalTaskWaitTime,
}

ctrl := gomock.NewController(t)
Expand Down Expand Up @@ -195,19 +199,25 @@ func (s *MatchingSimulationSuite) TestMatchingSimulation() {

startTime := time.Now()
// Start task generators
rps := getTaskQPS(s.testClusterConfig.MatchingConfig.SimulationConfig.TasksPerSecond)
rateLimiter := rate.NewLimiter(rate.Limit(rps), rps)
generatedTasksCounter := int32(0)
lastTaskScheduleID := int32(0)
numGenerators := getNumGenerators(s.testClusterConfig.MatchingConfig.SimulationConfig.NumTaskGenerators)
taskGenerateInterval := getTaskGenerateInterval(s.testClusterConfig.MatchingConfig.SimulationConfig.TaskGeneratorTickInterval)
var tasksToGenerate sync.WaitGroup
tasksToGenerate.Add(maxTasksToGenerate)
var generatorWG sync.WaitGroup
for i := 1; i <= numGenerators; i++ {
generatorWG.Add(1)
go s.generate(ctx, matchingClient, domainID, tasklist, maxTasksToGenerate, taskGenerateInterval, &generatedTasksCounter, &lastTaskScheduleID, &generatorWG)
go s.generate(ctx, matchingClient, domainID, tasklist, maxTasksToGenerate, rateLimiter, &generatedTasksCounter, &lastTaskScheduleID, &generatorWG, &tasksToGenerate)
}

// Let it run until all tasks have been polled.
// There's a test timeout configured in docker/buildkite/docker-compose-local-matching-simulation.yml that you
// can change if your test case needs more time
s.log("Waiting until all tasks are generated")
tasksToGenerate.Wait()
generationTime := time.Now().Sub(startTime)
s.log("Waiting until all tasks are received")
tasksToReceive.Wait()
executionTime := time.Now().Sub(startTime)
Expand All @@ -227,17 +237,20 @@ func (s *MatchingSimulationSuite) TestMatchingSimulation() {
// Don't change the start/end line format as it is used by scripts to parse the summary info
testSummary := []string{}
testSummary = append(testSummary, "Simulation Summary:")
testSummary = append(testSummary, fmt.Sprintf("Task generate Duration: %v", generationTime))
testSummary = append(testSummary, fmt.Sprintf("Simulation Duration: %v", executionTime))
testSummary = append(testSummary, fmt.Sprintf("Num of Pollers: %d", numPollers))
testSummary = append(testSummary, fmt.Sprintf("Poll Timeout: %v", pollDuration))
testSummary = append(testSummary, fmt.Sprintf("Num of Task Generators: %d", numGenerators))
testSummary = append(testSummary, fmt.Sprintf("Task generated every: %v", taskGenerateInterval))
testSummary = append(testSummary, fmt.Sprintf("Task generated QPS: %v", rps))
testSummary = append(testSummary, fmt.Sprintf("Num of Write Partitions: %d", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.MatchingNumTasklistWritePartitions]))
testSummary = append(testSummary, fmt.Sprintf("Num of Read Partitions: %d", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.MatchingNumTasklistReadPartitions]))
testSummary = append(testSummary, fmt.Sprintf("Forwarder Max Outstanding Polls: %d", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.MatchingForwarderMaxOutstandingPolls]))
testSummary = append(testSummary, fmt.Sprintf("Forwarder Max Outstanding Tasks: %d", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.MatchingForwarderMaxOutstandingTasks]))
testSummary = append(testSummary, fmt.Sprintf("Forwarder Max RPS: %d", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.MatchingForwarderMaxRatePerSecond]))
testSummary = append(testSummary, fmt.Sprintf("Forwarder Max Children per Node: %d", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.MatchingForwarderMaxChildrenPerNode]))
testSummary = append(testSummary, fmt.Sprintf("Local Poll Wait Time: %v", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.LocalPollWaitTime]))
testSummary = append(testSummary, fmt.Sprintf("Local Task Wait Time: %v", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.LocalTaskWaitTime]))
testSummary = append(testSummary, fmt.Sprintf("Tasks generated: %d", generatedTasksCounter))
testSummary = append(testSummary, fmt.Sprintf("Tasks polled: %d", polledTasksCounter))
totalPollCnt := 0
Expand Down Expand Up @@ -267,21 +280,25 @@ func (s *MatchingSimulationSuite) generate(
matchingClient MatchingClient,
domainID, tasklist string,
maxTasksToGenerate int,
taskGenerateInterval time.Duration,
rateLimiter *rate.Limiter,
generatedTasksCounter *int32,
lastTaskScheduleID *int32,
wg *sync.WaitGroup) {
wg *sync.WaitGroup,
tasksToGenerate *sync.WaitGroup) {
defer wg.Done()

t := time.NewTicker(taskGenerateInterval)
defer t.Stop()

for {
select {
case <-ctx.Done():
s.log("Generator done")
return
case <-t.C:
default:
if err := rateLimiter.Wait(ctx); err != nil {
if !errors.Is(err, context.Canceled) {
s.T().Fatal("Rate limiter failed: ", err)
}
return
}
scheduleID := int(atomic.AddInt32(lastTaskScheduleID, 1))
if scheduleID > maxTasksToGenerate {
s.log("Generated %d tasks so generator will stop", maxTasksToGenerate)
Expand All @@ -298,6 +315,7 @@ func (s *MatchingSimulationSuite) generate(

s.log("Decision task %d added", scheduleID)
atomic.AddInt32(generatedTasksCounter, 1)
tasksToGenerate.Done()
}
}
}
Expand Down Expand Up @@ -483,3 +501,10 @@ func getForwarderMaxChildPerNode(i int) int {
}
return i
}

// getTaskQPS returns the configured task-generation rate in tasks per
// second, substituting the default of 40 when the value is unset (zero).
func getTaskQPS(i int) int {
	if i != 0 {
		return i
	}
	return 40
}
10 changes: 8 additions & 2 deletions host/onebox.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@ type (
// Number of task generators defaults to 1
NumTaskGenerators int

// Each generator will produce a new task every TaskGeneratorTickInterval. Defaults to 50ms
TaskGeneratorTickInterval time.Duration
// The total QPS to generate tasks. Defaults to 40.
TasksPerSecond int

// Upper limit of tasks to generate. Task generators will stop if total number of tasks generated reaches MaxTaskToGenerate during simulation
// Defaults to 2k
Expand All @@ -187,6 +187,12 @@ type (

// Children per node. defaults to 20
ForwarderMaxChildrenPerNode int

// LocalPollWaitTime. defaults to 0ms.
LocalPollWaitTime time.Duration

// LocalTaskWaitTime. defaults to 0ms.
LocalTaskWaitTime time.Duration
}

// CadenceParams contains everything needed to bootstrap Cadence
Expand Down
6 changes: 3 additions & 3 deletions host/testdata/matching_simulation_burst.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ matchingconfig:
tasklistreadpartitions: 2
numpollers: 10
numtaskgenerators: 2
taskgeneratortickinterval: 10ms
taskspersecond: 200
maxtasktogenerate: 1500
polltimeout: 5s
forwardermaxoutstandingpolls: 20
polltimeout: 60s
forwardermaxoutstandingpolls: 1
forwardermaxoutstandingtasks: 1
forwardermaxratepersecond: 10
forwardermaxchildrenpernode: 20
Expand Down
8 changes: 5 additions & 3 deletions host/testdata/matching_simulation_default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ matchingconfig:
tasklistreadpartitions: 2
numpollers: 10
numtaskgenerators: 2
taskgeneratortickinterval: 50ms
taskspersecond: 40
maxtasktogenerate: 1500
polltimeout: 5s
forwardermaxoutstandingpolls: 20
polltimeout: 60s
forwardermaxoutstandingpolls: 1
forwardermaxoutstandingtasks: 1
forwardermaxratepersecond: 10
forwardermaxchildrenpernode: 20
localpollwaittime: 0ms
localtaskwaittime: 0ms
workerconfig:
enableasyncwfconsumer: false

0 comments on commit d4df0cf

Please sign in to comment.