diff --git a/host/matching_simulation_test.go b/host/matching_simulation_test.go index 4efad4c38d6..4a9aa1c375a 100644 --- a/host/matching_simulation_test.go +++ b/host/matching_simulation_test.go @@ -35,6 +35,7 @@ package host import ( "context" + "errors" "flag" "fmt" "os" @@ -50,6 +51,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "go.uber.org/yarpc" + "golang.org/x/time/rate" "github.com/uber/cadence/client/history" "github.com/uber/cadence/common/dynamicconfig" @@ -100,6 +102,8 @@ func TestMatchingSimulationSuite(t *testing.T) { dynamicconfig.MatchingForwarderMaxOutstandingTasks: getForwarderMaxOutstandingTasks(clusterConfig.MatchingConfig.SimulationConfig.ForwarderMaxOutstandingTasks), dynamicconfig.MatchingForwarderMaxRatePerSecond: getForwarderMaxRPS(clusterConfig.MatchingConfig.SimulationConfig.ForwarderMaxRatePerSecond), dynamicconfig.MatchingForwarderMaxChildrenPerNode: getForwarderMaxChildPerNode(clusterConfig.MatchingConfig.SimulationConfig.ForwarderMaxChildrenPerNode), + dynamicconfig.LocalPollWaitTime: clusterConfig.MatchingConfig.SimulationConfig.LocalPollWaitTime, + dynamicconfig.LocalTaskWaitTime: clusterConfig.MatchingConfig.SimulationConfig.LocalTaskWaitTime, } ctrl := gomock.NewController(t) @@ -195,19 +199,25 @@ func (s *MatchingSimulationSuite) TestMatchingSimulation() { startTime := time.Now() // Start task generators + rps := getTaskQPS(s.testClusterConfig.MatchingConfig.SimulationConfig.TasksPerSecond) + rateLimiter := rate.NewLimiter(rate.Limit(rps), rps) generatedTasksCounter := int32(0) lastTaskScheduleID := int32(0) numGenerators := getNumGenerators(s.testClusterConfig.MatchingConfig.SimulationConfig.NumTaskGenerators) - taskGenerateInterval := getTaskGenerateInterval(s.testClusterConfig.MatchingConfig.SimulationConfig.TaskGeneratorTickInterval) + var tasksToGenerate sync.WaitGroup + tasksToGenerate.Add(maxTasksToGenerate) var generatorWG sync.WaitGroup for i := 1; i <= numGenerators; i++ { generatorWG.Add(1) - go s.generate(ctx, matchingClient, domainID, tasklist, maxTasksToGenerate, taskGenerateInterval, &generatedTasksCounter, &lastTaskScheduleID, &generatorWG) + go s.generate(ctx, matchingClient, domainID, tasklist, maxTasksToGenerate, rateLimiter, &generatedTasksCounter, &lastTaskScheduleID, &generatorWG, &tasksToGenerate) } // Let it run until all tasks have been polled. // There's a test timeout configured in docker/buildkite/docker-compose-local-matching-simulation.yml that you // can change if your test case needs more time + s.log("Waiting until all tasks are generated") + tasksToGenerate.Wait() + generationTime := time.Now().Sub(startTime) s.log("Waiting until all tasks are received") tasksToReceive.Wait() executionTime := time.Now().Sub(startTime) @@ -227,17 +237,20 @@ func (s *MatchingSimulationSuite) TestMatchingSimulation() { // Don't change the start/end line format as it is used by scripts to parse the summary info testSummary := []string{} testSummary = append(testSummary, "Simulation Summary:") + testSummary = append(testSummary, fmt.Sprintf("Task generate Duration: %v", generationTime)) testSummary = append(testSummary, fmt.Sprintf("Simulation Duration: %v", executionTime)) testSummary = append(testSummary, fmt.Sprintf("Num of Pollers: %d", numPollers)) testSummary = append(testSummary, fmt.Sprintf("Poll Timeout: %v", pollDuration)) testSummary = append(testSummary, fmt.Sprintf("Num of Task Generators: %d", numGenerators)) - testSummary = append(testSummary, fmt.Sprintf("Task generated every: %v", taskGenerateInterval)) + testSummary = append(testSummary, fmt.Sprintf("Task generated QPS: %v", rps)) testSummary = append(testSummary, fmt.Sprintf("Num of Write Partitions: %d", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.MatchingNumTasklistWritePartitions])) testSummary = append(testSummary, fmt.Sprintf("Num of Read Partitions: %d", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.MatchingNumTasklistReadPartitions])) testSummary = append(testSummary, fmt.Sprintf("Forwarder Max Outstanding Polls: %d", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.MatchingForwarderMaxOutstandingPolls])) testSummary = append(testSummary, fmt.Sprintf("Forwarder Max Outstanding Tasks: %d", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.MatchingForwarderMaxOutstandingTasks])) testSummary = append(testSummary, fmt.Sprintf("Forwarder Max RPS: %d", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.MatchingForwarderMaxRatePerSecond])) testSummary = append(testSummary, fmt.Sprintf("Forwarder Max Children per Node: %d", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.MatchingForwarderMaxChildrenPerNode])) + testSummary = append(testSummary, fmt.Sprintf("Local Poll Wait Time: %v", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.LocalPollWaitTime])) + testSummary = append(testSummary, fmt.Sprintf("Local Task Wait Time: %v", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.LocalTaskWaitTime])) testSummary = append(testSummary, fmt.Sprintf("Tasks generated: %d", generatedTasksCounter)) testSummary = append(testSummary, fmt.Sprintf("Tasks polled: %d", polledTasksCounter)) totalPollCnt := 0 @@ -267,21 +280,25 @@ func (s *MatchingSimulationSuite) generate( matchingClient MatchingClient, domainID, tasklist string, maxTasksToGenerate int, - taskGenerateInterval time.Duration, + rateLimiter *rate.Limiter, generatedTasksCounter *int32, lastTaskScheduleID *int32, - wg *sync.WaitGroup) { + wg *sync.WaitGroup, + tasksToGenerate *sync.WaitGroup) { defer wg.Done() - t := time.NewTicker(taskGenerateInterval) - defer t.Stop() - for { select { case <-ctx.Done(): s.log("Generator done") return - case <-t.C: + default: + if err := rateLimiter.Wait(ctx); err != nil { + if !errors.Is(err, context.Canceled) { + s.T().Fatal("Rate limiter failed: ", err) + } + return + } scheduleID := int(atomic.AddInt32(lastTaskScheduleID, 1)) if scheduleID > maxTasksToGenerate { s.log("Generated %d tasks so generator will stop", maxTasksToGenerate) @@ -298,6 +315,7 @@ func (s *MatchingSimulationSuite) generate( s.log("Decision task %d added", scheduleID) atomic.AddInt32(generatedTasksCounter, 1) + tasksToGenerate.Done() } } } @@ -483,3 +501,10 @@ func getForwarderMaxChildPerNode(i int) int { } return i } + +func getTaskQPS(i int) int { + if i == 0 { + return 40 + } + return i +} diff --git a/host/onebox.go b/host/onebox.go index 2b8317057d7..1893b91f54a 100644 --- a/host/onebox.go +++ b/host/onebox.go @@ -166,8 +166,8 @@ type ( // Number of task generators defaults to 1 NumTaskGenerators int - // Each generator will produce a new task every TaskGeneratorTickInterval. Defaults to 50ms - TaskGeneratorTickInterval time.Duration + // The total QPS to generate tasks. Defaults to 40. + TasksPerSecond int // Upper limit of tasks to generate. Task generators will stop if total number of tasks generated reaches MaxTaskToGenerate during simulation // Defaults to 2k @@ -187,6 +187,12 @@ type ( // Children per node. defaults to 20 ForwarderMaxChildrenPerNode int + + // LocalPollWaitTime. defaults to 0ms. + LocalPollWaitTime time.Duration + + // LocalTaskWaitTime. defaults to 0ms. + LocalTaskWaitTime time.Duration } // CadenceParams contains everything needed to bootstrap Cadence diff --git a/host/testdata/matching_simulation_burst.yaml b/host/testdata/matching_simulation_burst.yaml index 279233ee5fd..9304ea54eb5 100644 --- a/host/testdata/matching_simulation_burst.yaml +++ b/host/testdata/matching_simulation_burst.yaml @@ -12,10 +12,10 @@ matchingconfig: tasklistreadpartitions: 2 numpollers: 10 numtaskgenerators: 2 - taskgeneratortickinterval: 10ms + taskspersecond: 200 maxtasktogenerate: 1500 - polltimeout: 5s - forwardermaxoutstandingpolls: 20 + polltimeout: 60s + forwardermaxoutstandingpolls: 1 forwardermaxoutstandingtasks: 1 forwardermaxratepersecond: 10 forwardermaxchildrenpernode: 20 diff --git a/host/testdata/matching_simulation_default.yaml b/host/testdata/matching_simulation_default.yaml index 4a0da782f2c..099cd92c408 100644 --- a/host/testdata/matching_simulation_default.yaml +++ b/host/testdata/matching_simulation_default.yaml @@ -12,12 +12,14 @@ matchingconfig: tasklistreadpartitions: 2 numpollers: 10 numtaskgenerators: 2 - taskgeneratortickinterval: 50ms + taskspersecond: 40 maxtasktogenerate: 1500 - polltimeout: 5s - forwardermaxoutstandingpolls: 20 + polltimeout: 60s + forwardermaxoutstandingpolls: 1 forwardermaxoutstandingtasks: 1 forwardermaxratepersecond: 10 forwardermaxchildrenpernode: 20 + localpollwaittime: 0ms + localtaskwaittime: 0ms workerconfig: enableasyncwfconsumer: false