From e4f6af2f46ee15c8439f4d32797942b06d535646 Mon Sep 17 00:00:00 2001 From: Andrew Dawson Date: Tue, 7 Apr 2020 12:00:34 -0700 Subject: [PATCH] add db rps to progress report --- tools/cli/adminDBCommands.go | 81 +++++++++++++++++++++++------------- 1 file changed, 51 insertions(+), 30 deletions(-) diff --git a/tools/cli/adminDBCommands.go b/tools/cli/adminDBCommands.go index f4002524b10..72554c40096 100644 --- a/tools/cli/adminDBCommands.go +++ b/tools/cli/adminDBCommands.go @@ -43,19 +43,28 @@ import ( ) type ( - CorruptionType string + // CorruptionType indicates the type of corruption that was found + CorruptionType string + + // VerificationResult is the result of running a verification VerificationResult int ) const ( - HistoryMissing CorruptionType = "history_missing" - InvalidStartEvent = "invalid_start_event" - OpenExecutionInvalidCurrentExecution = "open_execution_invalid_current_execution" + // HistoryMissing is the CorruptionType indicating that history is missing + HistoryMissing CorruptionType = "history_missing" + // InvalidStartEvent is the CorruptionType indicating that the start event is invalid + InvalidStartEvent = "invalid_start_event" + // OpenExecutionInvalidCurrentExecution is the CorruptionType that indicates there is an orphan concrete execution + OpenExecutionInvalidCurrentExecution = "open_execution_invalid_current_execution" ) const ( + // VerificationResultNoCorruption indicates that no corruption was found VerificationResultNoCorruption VerificationResult = iota + // VerificationResultDetectedCorruption indicates a corruption was found VerificationResultDetectedCorruption + // VerificationResultCheckFailure indicates there was a failure to check corruption VerificationResultCheckFailure ) @@ -110,9 +119,10 @@ type ( // ShardScanReport is the type that gets written to ShardScanReportFile ShardScanReport struct { - ShardID int - Scanned *ShardScanReportExecutionsScanned - Failure *ShardScanReportFailure + ShardID int + TotalDBRequests int64 + Scanned *ShardScanReportExecutionsScanned + Failure *ShardScanReportFailure } // ShardScanReportExecutionsScanned is the part of the ShardScanReport of executions which were scanned @@ -140,6 +150,8 @@ type ( PercentageCheckFailure float64 ShardsPerHour float64 ExecutionsPerHour float64 + TotalDBRequests int64 + DatabaseRPS float64 } ) @@ -236,7 +248,7 @@ func scanShard( PageSize: executionsPageSize, PageToken: token, } - limiter.Wait(context.Background()) + preconditionForDBCall(&report.TotalDBRequests, limiter) resp, err := execStore.ListConcreteExecutions(req) if err != nil { report.Failure = &ShardScanReportFailure{ @@ -258,7 +270,8 @@ func scanShard( outputFiles.ExecutionCheckFailureFile, shardID, limiter, - historyStore) + historyStore, + &report.TotalDBRequests) switch historyVerificationResult { case VerificationResultNoCorruption: // nothing to do just keep checking other conditions @@ -296,7 +309,8 @@ func scanShard( shardID, historyBranch, execStore, - limiter) + limiter, + &report.TotalDBRequests) switch currentExecutionVerificationResult { case VerificationResultNoCorruption: // nothing to do just keep checking other conditions @@ -320,6 +334,7 @@ func verifyHistoryExists( shardID int, limiter *quotas.DynamicRateLimiter, historyStore persistence.HistoryStore, + totalDBRequests *int64, ) (VerificationResult, *persistence.InternalReadHistoryBranchResponse, *shared.HistoryBranch) { var branch shared.HistoryBranch err := branchDecoder.Decode(execution.BranchToken, &branch) @@ -342,7 +357,7 @@ func verifyHistoryExists( ShardID: shardID, PageSize: historyPageSize, } - limiter.Wait(context.Background()) + preconditionForDBCall(totalDBRequests, limiter) history, err := historyStore.ReadHistoryBranch(readHistoryBranchReq) if err != nil { if err == gocql.ErrNotFound { @@ -362,17 +377,16 @@ func verifyHistoryExists( }, }) return VerificationResultDetectedCorruption, nil, nil - } else { - recordExecutionCheckFailure(executionCheckFailureFile, &ExecutionCheckFailure{ - ShardID: shardID, - DomainID: execution.DomainID, - WorkflowID: execution.WorkflowID, - RunID: execution.RunID, - Note: "failed to read history branch with error other than gocql.ErrNotFond", - Details: err.Error(), - }) - return VerificationResultCheckFailure, nil, nil } + recordExecutionCheckFailure(executionCheckFailureFile, &ExecutionCheckFailure{ + ShardID: shardID, + DomainID: execution.DomainID, + WorkflowID: execution.WorkflowID, + RunID: execution.RunID, + Note: "failed to read history branch with error other than gocql.ErrNotFond", + Details: err.Error(), + }) + return VerificationResultCheckFailure, nil, nil } else if history == nil || len(history.History) == 0 { recordCorruptedWorkflow(corruptedExecutionFile, &CorruptedExecution{ ShardID: shardID, @@ -459,6 +473,7 @@ func verifyCurrentExecution( branch *shared.HistoryBranch, execStore persistence.ExecutionStore, limiter *quotas.DynamicRateLimiter, + totalDBRequests *int64, ) VerificationResult { if execution.State != persistence.WorkflowStateCreated && execution.State != persistence.WorkflowStateRunning { return VerificationResultNoCorruption @@ -467,7 +482,7 @@ func verifyCurrentExecution( DomainID: execution.DomainID, WorkflowID: execution.WorkflowID, } - limiter.Wait(context.Background()) + preconditionForDBCall(totalDBRequests, limiter) currentExecution, err := execStore.GetCurrentExecution(getCurrentExecutionRequest) if err != nil { switch err.(type) { @@ -613,6 +628,7 @@ func writeToFile(file *os.File, message string) { func includeShardInProgressReport(report *ShardScanReport, progressReport *ProgressReport, startTime time.Time) { progressReport.NumberOfShardsFinished++ + progressReport.TotalDBRequests += report.TotalDBRequests if report.Failure != nil { progressReport.NumberOfShardScanFailures++ } @@ -622,18 +638,18 @@ func includeShardInProgressReport(report *ShardScanReport, progressReport *Progr progressReport.ExecutionCheckFailureCount += report.Scanned.ExecutionCheckFailureCount } - round := func(x float64) float64 { - precision := 0.05 - return math.Round(x/precision) * precision + if progressReport.TotalExecutionsCount > 0 { + progressReport.PercentageCorrupted = math.Round((float64(progressReport.CorruptedExecutionsCount) * 100.0) / float64(progressReport.TotalExecutionsCount)) + progressReport.PercentageCheckFailure = math.Round((float64(progressReport.ExecutionCheckFailureCount) * 100.0) / float64(progressReport.TotalExecutionsCount)) } - progressReport.PercentageCorrupted = round((float64(progressReport.CorruptedExecutionsCount) * 100.0) / float64(progressReport.TotalExecutionsCount)) - progressReport.PercentageCheckFailure = round((float64(progressReport.ExecutionCheckFailureCount) * 100.0) / float64(progressReport.TotalExecutionsCount)) - pastTime := time.Now().Sub(startTime) hoursPast := float64(pastTime) / float64(time.Hour) - progressReport.ShardsPerHour = round(float64(progressReport.NumberOfShardsFinished) / hoursPast) - progressReport.ExecutionsPerHour = round(float64(progressReport.TotalExecutionsCount) / hoursPast) + progressReport.ShardsPerHour = math.Round(float64(progressReport.NumberOfShardsFinished) / hoursPast) + progressReport.ExecutionsPerHour = math.Round(float64(progressReport.TotalExecutionsCount) / hoursPast) + + secondsPast := float64(pastTime) / float64(time.Second) + progressReport.DatabaseRPS = math.Round(float64(progressReport.TotalDBRequests) / secondsPast) } func getRateLimiter(startRPS int, targetRPS int, scaleUpSeconds int) *quotas.DynamicRateLimiter { @@ -651,3 +667,8 @@ func getRateLimiter(startRPS int, targetRPS int, scaleUpSeconds int) *quotas.Dyn } return quotas.NewDynamicRateLimiter(rpsFn) } + +func preconditionForDBCall(totalDBRequests *int64, limiter *quotas.DynamicRateLimiter) { + *totalDBRequests = *totalDBRequests + 1 + limiter.Wait(context.Background()) +}