Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add database admin scan command #3165

Merged
merged 28 commits into from
Apr 7, 2020
Merged
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
e365f7a
Start db admin operation
andrewjdawson2016 Apr 2, 2020
0796d99
add persistence api to get concrete executions
andrewjdawson2016 Apr 3, 2020
c485efa
make more progress on scanner
andrewjdawson2016 Apr 3, 2020
66ff866
finish db scan command
andrewjdawson2016 Apr 3, 2020
f5a9148
add license header
andrewjdawson2016 Apr 3, 2020
0680282
finish db scan command
andrewjdawson2016 Apr 3, 2020
cedbbb6
fix small issues
andrewjdawson2016 Apr 3, 2020
dc29b3d
remove hostSelectionPolicy from cassandra cluster config as it is alr…
andrewjdawson2016 Apr 3, 2020
31f72dc
add rate limiter, make more options configurable
andrewjdawson2016 Apr 4, 2020
0e46f1c
Add information to progress report on if shards count not finish
andrewjdawson2016 Apr 4, 2020
47e16e6
make improvements to scan
andrewjdawson2016 Apr 4, 2020
d1a5e89
run make fmt and cleanup file writes
andrewjdawson2016 Apr 6, 2020
d169736
correct bug of resetting map in db scan and use deserialize batch ins…
andrewjdawson2016 Apr 6, 2020
0f20191
treat getting empty history as a corrupted workflow
andrewjdawson2016 Apr 6, 2020
8fa70b4
remove generated *.json files
andrewjdawson2016 Apr 6, 2020
603898e
correct cql connection policy
andrewjdawson2016 Apr 6, 2020
bac6227
remove json files
andrewjdawson2016 Apr 6, 2020
256f2da
Update db admin commands
andrewjdawson2016 Apr 6, 2020
62e36a8
run make fmt
andrewjdawson2016 Apr 7, 2020
5fc7bf3
Add logic to delete empty files
andrewjdawson2016 Apr 7, 2020
ca1ca30
remove scan files
andrewjdawson2016 Apr 7, 2020
f9ae380
add check for orphan concrete mutable state
andrewjdawson2016 Apr 7, 2020
eaa9ef0
change name for current execution corruption:
andrewjdawson2016 Apr 7, 2020
b54ffd8
fix nil ptr
andrewjdawson2016 Apr 7, 2020
0323216
add rps to progress report and take shard range
andrewjdawson2016 Apr 7, 2020
e4f6af2
add db rps to progress report
andrewjdawson2016 Apr 7, 2020
f06f0ea
respond to cr feedback
andrewjdawson2016 Apr 7, 2020
aa34538
Merge branch 'master' into dbAdmin
andrewjdawson2016 Apr 7, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
make improvements to scan
  • Loading branch information
andrewjdawson2016 committed Apr 7, 2020
commit 47e16e67a0a7b6c1da239481bc2c8467fc36d52c
188 changes: 121 additions & 67 deletions tools/cli/adminDBCommands.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"github.com/gocql/gocql"
"github.com/urfave/cli"
"github.com/uber/cadence/common"

"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common/codec"
Expand All @@ -42,16 +43,18 @@ import (

const (
// historyPageSize is the PageSize sent with each ReadHistoryBranch request
// issued while verifying an execution.
historyPageSize = 50
// failedToRunCheckFilename is the name of the output file that createScanFiles
// creates for recording checks that could not be carried out.
failedToRunCheckFilename = "failedToRunCheck.json"
// startEventCorruptedFilename is the name of the output file that
// createScanFiles creates for recording executions found to be corrupted.
startEventCorruptedFilename = "startEventCorrupted.json"
)

type (
scanFiles struct {
failedToRunCheckFile *os.File
startEventCorruptFile *os.File
startEventCorruptedFile *os.File
}

// ExecutionScanEntity is the execution entity which gets written to output file from scan
ExecutionScanEntity struct {
// CorruptedExecutionEntity is a corrupted execution
CorruptedExecutionEntity struct {
ShardID int
DomainID string
WorkflowID string
Expand All @@ -60,13 +63,13 @@ type (
TreeID string
BranchID string
CloseStatus int
ScanMetadata ExecutionScanEntityMetadata
Note string
}

// ExecutionScanEntityMetadata is the metadata from scanning the execution that gets written to output file
ExecutionScanEntityMetadata struct {
Message string
ErrorMsg string
// ScanFailure is a scan failure
ScanFailure struct {
Note string
Details string
}

// ProgressReport contains metadata about the scan for all shards which have been finished
Expand All @@ -89,17 +92,20 @@ type (

// AdminDBScan is used to scan over all executions in database and detect corruptions
func AdminDBScan(c *cli.Context) {
numShards := getRequiredIntOption(c, FlagNumberOfShards)
startingRPS := getRequiredIntOption(c, FlagStartingRPS)
targetRPS := getRequiredIntOption(c, FlagRPS)
scaleUpSeconds := getRequiredIntOption(c, FlagRPSScaleUpSeconds)
scanWorkerCount := getRequiredIntOption(c, FlagGoRoutineCount)
executionsPageSize := getRequiredIntOption(c, FlagPageSize)
scanReportRate := getRequiredIntOption(c, FlagScanReportRate)
numShards := c.Int(FlagNumberOfShards)
startingRPS := c.Int(FlagStartingRPS)
targetRPS := c.Int(FlagRPS)
scaleUpSeconds := c.Int(FlagRPSScaleUpSeconds)
scanWorkerCount := c.Int(FlagGoRoutineCount)
executionsPageSize := c.Int(FlagPageSize)
scanReportRate := c.Int(FlagScanReportRate)

payloadSerializer := persistence.NewPayloadSerializer()
rateLimiter := getRateLimiter(startingRPS, targetRPS, scaleUpSeconds)
scanFiles, deferFn := createScanFiles()
session := connectToCassandra(c)
historyStore := cassp.NewHistoryV2PersistenceFromSession(session, loggerimpl.NewNopLogger())
branchDecoder := codec.NewThriftRWEncoder()
defer func() {
deferFn()
session.Close()
Expand All @@ -110,7 +116,7 @@ func AdminDBScan(c *cli.Context) {
go func(workerIdx int) {
for shardID := 0; shardID < numShards; shardID++ {
if shardID%scanWorkerCount == workerIdx {
shardReports <- scanShard(session, shardID, scanFiles, rateLimiter, executionsPageSize)
shardReports <- scanShard(session, shardID, scanFiles, rateLimiter, executionsPageSize, payloadSerializer, historyStore, branchDecoder)
}
}
}(i)
Expand All @@ -121,7 +127,11 @@ func AdminDBScan(c *cli.Context) {
report := <-shardReports
includeShardInProgressReport(report, progressReport)
if i%scanReportRate == 0 {
fmt.Printf("%+v", progressReport)
reportBytes, err := json.MarshalIndent(*progressReport, "", "\t")
if err != nil {
ErrorAndExit("failed to print progress", err)
}
fmt.Println(string(reportBytes))
}
}
}
Expand All @@ -132,13 +142,14 @@ func scanShard(
scanFiles *scanFiles,
limiter *quotas.DynamicRateLimiter,
executionsPageSize int,
payloadSerializer persistence.PayloadSerializer,
historyStore persistence.HistoryStore,
branchDecoder *codec.ThriftRWEncoder,
) *ShardReport {
execStore, err := cassp.NewWorkflowExecutionPersistence(shardID, session, loggerimpl.NewNopLogger())
if err != nil {
ErrorAndExit("failed to create execution persistence", err)
}
historyStore := cassp.NewHistoryV2PersistenceFromSession(session, loggerimpl.NewNopLogger())
branchDecoder := codec.NewThriftRWEncoder()
var token []byte
report := &ShardReport{}
isFirstIteration := true
Expand All @@ -158,61 +169,84 @@ func scanShard(
token = resp.NextPageToken

for _, e := range resp.ExecutionInfos {
report.NumberOfExecutions++
var branch shared.HistoryBranch
err := branchDecoder.Decode(e.BranchToken, &branch)
if err != nil {
report.NumberOfFailedChecks++
writeToFile(scanFiles.failedToRunCheckFile, fmt.Sprintf("failed to decode branch token: %v", err))
continue
}
readHistoryBranchReq := &persistence.InternalReadHistoryBranchRequest{
TreeID: branch.GetTreeID(),
BranchID: branch.GetBranchID(),
MinNodeID: 1,
MaxNodeID: 20,
ShardID: shardID,
PageSize: historyPageSize,
}
limiter.Wait(context.Background())
_, err = historyStore.ReadHistoryBranch(readHistoryBranchReq)
if err != nil {
if err == gocql.ErrNotFound {
metadata := ExecutionScanEntityMetadata{
Message: "Detected workflow was corrupted based on missing history",
ErrorMsg: err.Error(),
}
report.NumberOfCorruptedExecutions++
writeExecutionToFile(scanFiles.startEventCorruptFile, shardID, branch.GetTreeID(), branch.GetBranchID(), metadata, e)
} else {
metadata := ExecutionScanEntityMetadata{
Message: "Checking corruption based on start event failed",
ErrorMsg: err.Error(),
}
report.NumberOfFailedChecks++
writeExecutionToFile(scanFiles.failedToRunCheckFile, shardID, branch.GetTreeID(), branch.GetBranchID(), metadata, e)
}
}
verifyExecution(report, e, branchDecoder, scanFiles, shardID, limiter, historyStore, payloadSerializer)
}
}
return report
}

func writeToFile(file *os.File, message string) {
if _, err := file.WriteString(fmt.Sprintf("%v\r\n", message)); err != nil {
ErrorAndExit("failed to write to file", err)
// verifyExecution checks one concrete execution for a missing or invalid
// start event and records the outcome.
//
// Every call increments report.NumberOfExecutions. The outcome is then one of:
//   - check could not be run (branch token cannot be decoded, history read
//     fails with an error other than gocql.ErrNotFound, or the first event
//     cannot be deserialized): report.NumberOfFailedChecks is incremented and
//     a ScanFailure record is written to scanFiles.failedToRunCheckFile;
//   - execution is corrupted (history read returns gocql.ErrNotFound, the
//     history is empty, or the first event is not a WorkflowExecutionStarted
//     event with ID common.FirstEventID): report.NumberOfCorruptedExecutions
//     is incremented and a record is written to
//     scanFiles.startEventCorruptedFile;
//   - execution is healthy: nothing further is recorded.
//
// limiter is waited on once before the history read so that database reads
// issued by the scan are rate limited.
func verifyExecution(
	report *ShardReport,
	execution *persistence.InternalWorkflowExecutionInfo,
	branchDecoder *codec.ThriftRWEncoder,
	scanFiles *scanFiles,
	shardID int,
	limiter *quotas.DynamicRateLimiter,
	historyStore persistence.HistoryStore,
	payloadSerializer persistence.PayloadSerializer,
) {
	report.NumberOfExecutions++

	// checkFailed records that the check itself could not be completed.
	checkFailed := func(note string, err error) {
		report.NumberOfFailedChecks++
		recordScanFailure(scanFiles.failedToRunCheckFile, note, err)
	}

	var branch shared.HistoryBranch
	if err := branchDecoder.Decode(execution.BranchToken, &branch); err != nil {
		// Written through recordScanFailure so that every line in the
		// failure file is a JSON object, like the other failure records.
		checkFailed("failed to decode branch token", err)
		return
	}

	// corrupted records that the execution was positively found to be broken.
	corrupted := func(note string) {
		report.NumberOfCorruptedExecutions++
		recordCorruptedWorkflow(
			scanFiles.startEventCorruptedFile,
			shardID,
			branch.GetTreeID(),
			branch.GetBranchID(),
			note,
			execution)
	}

	readHistoryBranchReq := &persistence.InternalReadHistoryBranchRequest{
		TreeID:    branch.GetTreeID(),
		BranchID:  branch.GetBranchID(),
		MinNodeID: common.FirstEventID,
		MaxNodeID: common.EndEventID,
		ShardID:   shardID,
		PageSize:  historyPageSize,
	}
	limiter.Wait(context.Background())
	history, err := historyStore.ReadHistoryBranch(readHistoryBranchReq)
	switch {
	case err == gocql.ErrNotFound:
		corrupted("got not found error")
		return
	case err != nil:
		checkFailed("got error from read history other than not exists error", err)
		return
	case history == nil || len(history.History) == 0:
		// An execution without any history is itself a corruption; record it
		// and keep scanning instead of aborting the whole scan.
		corrupted("got empty history")
		return
	}

	firstEvent, err := payloadSerializer.DeserializeEvent(history.History[0])
	if err != nil {
		checkFailed("got error decoding history event", err)
		return
	}
	if firstEvent.GetEventId() != common.FirstEventID || firstEvent.GetEventType() != shared.EventTypeWorkflowExecutionStarted {
		corrupted("got workflow with incorrect first event")
	}
}

func writeExecutionToFile(
func recordCorruptedWorkflow(
file *os.File,
shardID int,
treeID string,
branchID string,
metadata ExecutionScanEntityMetadata,
note string,
info *persistence.InternalWorkflowExecutionInfo,
) {
exec := ExecutionScanEntity{
cee := CorruptedExecutionEntity{
ShardID: shardID,
DomainID: info.DomainID,
WorkflowID: info.WorkflowID,
Expand All @@ -221,32 +255,52 @@ func writeExecutionToFile(
TreeID: treeID,
BranchID: branchID,
CloseStatus: info.CloseStatus,
ScanMetadata: metadata,
Note: note,
}
data, err := json.Marshal(exec)
data, err := json.Marshal(cee)
if err != nil {
ErrorAndExit("failed to marshal execution", err)
ErrorAndExit("failed to marshal CorruptedExecutionEntity", err)
}
writeToFile(file, string(data))
}

// recordScanFailure appends one JSON-encoded ScanFailure line to file.
//
// note becomes the record's Note. When err is non-nil its message becomes the
// record's Details; otherwise Details is left empty. If the record cannot be
// marshalled, ErrorAndExit is called.
func recordScanFailure(file *os.File, note string, err error) {
	var details string
	if err != nil {
		details = err.Error()
	}
	encoded, marshalErr := json.Marshal(ScanFailure{Note: note, Details: details})
	if marshalErr != nil {
		ErrorAndExit("failed to marshal ScanFailure", marshalErr)
	}
	writeToFile(file, string(encoded))
}

// writeToFile writes message followed by "\r\n" to file and calls
// ErrorAndExit if the write fails.
func writeToFile(file *os.File, message string) {
	_, err := fmt.Fprintf(file, "%v\r\n", message)
	if err == nil {
		return
	}
	ErrorAndExit("failed to write to file", err)
}

func createScanFiles() (*scanFiles, func()) {
failedToRunCheckFile, err := os.Create(fmt.Sprintf("failedToRunCheck.json"))
failedToRunCheckFile, err := os.Create(fmt.Sprintf(failedToRunCheckFilename))
if err != nil {
ErrorAndExit("failed to create file", err)
}
startEventCorruptFile, err := os.Create(fmt.Sprintf("startEventCorruptFile.json"))
startEventCorruptedFile, err := os.Create(fmt.Sprintf(startEventCorruptedFilename))
if err != nil {
ErrorAndExit("failed to create file", err)
}
deferFn := func() {
failedToRunCheckFile.Close()
startEventCorruptFile.Close()
startEventCorruptedFile.Close()
}

return &scanFiles{
failedToRunCheckFile: failedToRunCheckFile,
startEventCorruptFile: startEventCorruptFile,
startEventCorruptedFile: startEventCorruptedFile,
}, deferFn
}

Expand Down