Skip to content

Commit

Permalink
add db rps to progress report
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewjdawson2016 committed Apr 7, 2020
1 parent 0323216 commit e4f6af2
Showing 1 changed file with 51 additions and 30 deletions.
81 changes: 51 additions & 30 deletions tools/cli/adminDBCommands.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,28 @@ import (
)

type (
	// CorruptionType indicates the type of corruption that was found.
	// It is declared exactly once; a second declaration of the same name in
	// this group would be a redeclaration error.
	CorruptionType string

	// VerificationResult is the result of running a verification.
	VerificationResult int
)

const (
HistoryMissing CorruptionType = "history_missing"
InvalidStartEvent = "invalid_start_event"
OpenExecutionInvalidCurrentExecution = "open_execution_invalid_current_execution"
// HistoryMissing is the CorruptionType indicating that history is missing
HistoryMissing CorruptionType = "history_missing"
// InvalidStartEvent is the CorruptionType indicating that the start event is invalid
InvalidStartEvent = "invalid_start_event"
// OpenExecutionInvalidCurrentExecution is the CorruptionType that indicates there is an orphan concrete execution
OpenExecutionInvalidCurrentExecution = "open_execution_invalid_current_execution"
)

// Possible outcomes of a single verification, numbered consecutively from
// zero in declaration order.
const (
	// VerificationResultNoCorruption (0) reports that no corruption was found.
	VerificationResultNoCorruption = VerificationResult(iota)
	// VerificationResultDetectedCorruption (1) reports that a corruption was found.
	VerificationResultDetectedCorruption
	// VerificationResultCheckFailure (2) reports that the corruption check itself failed.
	VerificationResultCheckFailure
)

Expand Down Expand Up @@ -110,9 +119,10 @@ type (

// ShardScanReport is the type that gets written to ShardScanReportFile
ShardScanReport struct {
ShardID int
Scanned *ShardScanReportExecutionsScanned
Failure *ShardScanReportFailure
ShardID int
TotalDBRequests int64
Scanned *ShardScanReportExecutionsScanned
Failure *ShardScanReportFailure
}

// ShardScanReportExecutionsScanned is the part of the ShardScanReport of executions which were scanned
Expand Down Expand Up @@ -140,6 +150,8 @@ type (
PercentageCheckFailure float64
ShardsPerHour float64
ExecutionsPerHour float64
TotalDBRequests int64
DatabaseRPS float64
}
)

Expand Down Expand Up @@ -236,7 +248,7 @@ func scanShard(
PageSize: executionsPageSize,
PageToken: token,
}
limiter.Wait(context.Background())
preconditionForDBCall(&report.TotalDBRequests, limiter)
resp, err := execStore.ListConcreteExecutions(req)
if err != nil {
report.Failure = &ShardScanReportFailure{
Expand All @@ -258,7 +270,8 @@ func scanShard(
outputFiles.ExecutionCheckFailureFile,
shardID,
limiter,
historyStore)
historyStore,
&report.TotalDBRequests)
switch historyVerificationResult {
case VerificationResultNoCorruption:
// nothing to do just keep checking other conditions
Expand Down Expand Up @@ -296,7 +309,8 @@ func scanShard(
shardID,
historyBranch,
execStore,
limiter)
limiter,
&report.TotalDBRequests)
switch currentExecutionVerificationResult {
case VerificationResultNoCorruption:
// nothing to do just keep checking other conditions
Expand All @@ -320,6 +334,7 @@ func verifyHistoryExists(
shardID int,
limiter *quotas.DynamicRateLimiter,
historyStore persistence.HistoryStore,
totalDBRequests *int64,
) (VerificationResult, *persistence.InternalReadHistoryBranchResponse, *shared.HistoryBranch) {
var branch shared.HistoryBranch
err := branchDecoder.Decode(execution.BranchToken, &branch)
Expand All @@ -342,7 +357,7 @@ func verifyHistoryExists(
ShardID: shardID,
PageSize: historyPageSize,
}
limiter.Wait(context.Background())
preconditionForDBCall(totalDBRequests, limiter)
history, err := historyStore.ReadHistoryBranch(readHistoryBranchReq)
if err != nil {
if err == gocql.ErrNotFound {
Expand All @@ -362,17 +377,16 @@ func verifyHistoryExists(
},
})
return VerificationResultDetectedCorruption, nil, nil
} else {
recordExecutionCheckFailure(executionCheckFailureFile, &ExecutionCheckFailure{
ShardID: shardID,
DomainID: execution.DomainID,
WorkflowID: execution.WorkflowID,
RunID: execution.RunID,
Note: "failed to read history branch with error other than gocql.ErrNotFond",
Details: err.Error(),
})
return VerificationResultCheckFailure, nil, nil
}
recordExecutionCheckFailure(executionCheckFailureFile, &ExecutionCheckFailure{
ShardID: shardID,
DomainID: execution.DomainID,
WorkflowID: execution.WorkflowID,
RunID: execution.RunID,
Note: "failed to read history branch with error other than gocql.ErrNotFond",
Details: err.Error(),
})
return VerificationResultCheckFailure, nil, nil
} else if history == nil || len(history.History) == 0 {
recordCorruptedWorkflow(corruptedExecutionFile, &CorruptedExecution{
ShardID: shardID,
Expand Down Expand Up @@ -459,6 +473,7 @@ func verifyCurrentExecution(
branch *shared.HistoryBranch,
execStore persistence.ExecutionStore,
limiter *quotas.DynamicRateLimiter,
totalDBRequests *int64,
) VerificationResult {
if execution.State != persistence.WorkflowStateCreated && execution.State != persistence.WorkflowStateRunning {
return VerificationResultNoCorruption
Expand All @@ -467,7 +482,7 @@ func verifyCurrentExecution(
DomainID: execution.DomainID,
WorkflowID: execution.WorkflowID,
}
limiter.Wait(context.Background())
preconditionForDBCall(totalDBRequests, limiter)
currentExecution, err := execStore.GetCurrentExecution(getCurrentExecutionRequest)
if err != nil {
switch err.(type) {
Expand Down Expand Up @@ -613,6 +628,7 @@ func writeToFile(file *os.File, message string) {

func includeShardInProgressReport(report *ShardScanReport, progressReport *ProgressReport, startTime time.Time) {
progressReport.NumberOfShardsFinished++
progressReport.TotalDBRequests += report.TotalDBRequests
if report.Failure != nil {
progressReport.NumberOfShardScanFailures++
}
Expand All @@ -622,18 +638,18 @@ func includeShardInProgressReport(report *ShardScanReport, progressReport *Progr
progressReport.ExecutionCheckFailureCount += report.Scanned.ExecutionCheckFailureCount
}

round := func(x float64) float64 {
precision := 0.05
return math.Round(x/precision) * precision
if progressReport.TotalExecutionsCount > 0 {
progressReport.PercentageCorrupted = math.Round((float64(progressReport.CorruptedExecutionsCount) * 100.0) / float64(progressReport.TotalExecutionsCount))
progressReport.PercentageCheckFailure = math.Round((float64(progressReport.ExecutionCheckFailureCount) * 100.0) / float64(progressReport.TotalExecutionsCount))
}

progressReport.PercentageCorrupted = round((float64(progressReport.CorruptedExecutionsCount) * 100.0) / float64(progressReport.TotalExecutionsCount))
progressReport.PercentageCheckFailure = round((float64(progressReport.ExecutionCheckFailureCount) * 100.0) / float64(progressReport.TotalExecutionsCount))

pastTime := time.Now().Sub(startTime)
hoursPast := float64(pastTime) / float64(time.Hour)
progressReport.ShardsPerHour = round(float64(progressReport.NumberOfShardsFinished) / hoursPast)
progressReport.ExecutionsPerHour = round(float64(progressReport.TotalExecutionsCount) / hoursPast)
progressReport.ShardsPerHour = math.Round(float64(progressReport.NumberOfShardsFinished) / hoursPast)
progressReport.ExecutionsPerHour = math.Round(float64(progressReport.TotalExecutionsCount) / hoursPast)

secondsPast := float64(pastTime) / float64(time.Second)
progressReport.DatabaseRPS = math.Round(float64(progressReport.TotalDBRequests) / secondsPast)
}

func getRateLimiter(startRPS int, targetRPS int, scaleUpSeconds int) *quotas.DynamicRateLimiter {
Expand All @@ -651,3 +667,8 @@ func getRateLimiter(startRPS int, targetRPS int, scaleUpSeconds int) *quotas.Dyn
}
return quotas.NewDynamicRateLimiter(rpsFn)
}

// preconditionForDBCall is invoked in this file immediately before each
// database request. It first adds one to the counter that totalDBRequests
// points at, then calls limiter.Wait with a background context.
//
// NOTE(review): Wait presumably blocks until the rate limiter admits the
// request — confirm against quotas.DynamicRateLimiter. The counter update is
// a plain (non-atomic) increment.
func preconditionForDBCall(totalDBRequests *int64, limiter *quotas.DynamicRateLimiter) {
	*totalDBRequests++
	limiter.Wait(context.Background())
}

0 comments on commit e4f6af2

Please sign in to comment.