Skip to content

Commit

Permalink
Refactor duplicate code in CLI (uber#1468)
Browse files Browse the repository at this point in the history
* Refactor CLI flags

* Refactor start and run
  • Loading branch information
vancexu authored Feb 14, 2019
1 parent 6876051 commit 97b7ea7
Show file tree
Hide file tree
Showing 3 changed files with 443 additions and 568 deletions.
244 changes: 49 additions & 195 deletions tools/cli/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,123 +45,6 @@ import (
"go.uber.org/cadence/client"
)

/**
Flags used to specify cli command line arguments
*/
const (
FlagPort = "port"
FlagUsername = "username"
FlagPassword = "password"
FlagKeyspace = "keyspace"
FlagAddress = "address"
FlagAddressWithAlias = FlagAddress + ", ad"
FlagHistoryAddress = "history_address"
FlagHistoryAddressWithAlias = FlagHistoryAddress + ", had"
FlagDomainID = "domain_id"
FlagDomain = "domain"
FlagDomainWithAlias = FlagDomain + ", do"
FlagShardID = "shard_id"
FlagShardIDWithAlias = FlagShardID + ", sid"
FlagWorkflowID = "workflow_id"
FlagWorkflowIDWithAlias = FlagWorkflowID + ", wid, w"
FlagRunID = "run_id"
FlagTreeID = "tree_id"
FlagBranchID = "branch_id"
FlagNumberOfShards = "number_of_shards"
FlagRunIDWithAlias = FlagRunID + ", rid, r"
FlagTargetCluster = "target_cluster"
FlagMinEventID = "min_event_id"
FlagMaxEventID = "max_event_id"
FlagTaskList = "tasklist"
FlagTaskListWithAlias = FlagTaskList + ", tl"
FlagTaskListType = "tasklisttype"
FlagTaskListTypeWithAlias = FlagTaskListType + ", tlt"
FlagWorkflowIDReusePolicy = "workflowidreusepolicy"
FlagWorkflowIDReusePolicyAlias = FlagWorkflowIDReusePolicy + ", wrp"
FlagCronSchedule = "cron"
FlagWorkflowType = "workflow_type"
FlagWorkflowTypeWithAlias = FlagWorkflowType + ", wt"
FlagWorkflowStatus = "status"
FlagWorkflowStatusWithAlias = FlagWorkflowStatus + ", s"
FlagExecutionTimeout = "execution_timeout"
FlagExecutionTimeoutWithAlias = FlagExecutionTimeout + ", et"
FlagDecisionTimeout = "decision_timeout"
FlagDecisionTimeoutWithAlias = FlagDecisionTimeout + ", dt"
FlagContextTimeout = "context_timeout"
FlagContextTimeoutWithAlias = FlagContextTimeout + ", ct"
FlagInput = "input"
FlagInputWithAlias = FlagInput + ", i"
FlagInputFile = "input_file"
FlagInputFileWithAlias = FlagInputFile + ", if"
FlagInputTopic = "input_topic"
FlagInputTopicWithAlias = FlagInputTopic + ", it"
FlagHostFile = "host_file"
FlagCluster = "cluster"
FlagInputCluster = "input_cluster"
FlagStartOffset = "start_offset"
FlagTopic = "topic"
FlagGroup = "group"
FlagReason = "reason"
FlagReasonWithAlias = FlagReason + ", re"
FlagOpen = "open"
FlagOpenWithAlias = FlagOpen + ", op"
FlagMore = "more"
FlagMoreWithAlias = FlagMore + ", m"
FlagPageSize = "pagesize"
FlagPageSizeWithAlias = FlagPageSize + ", ps"
FlagEarliestTime = "earliest_time"
FlagEarliestTimeWithAlias = FlagEarliestTime + ", et"
FlagLatestTime = "latest_time"
FlagLatestTimeWithAlias = FlagLatestTime + ", lt"
FlagPrintEventVersion = "print_event_version"
FlagPrintEventVersionWithAlias = FlagPrintEventVersion + ", pev"
FlagPrintFullyDetail = "print_full"
FlagPrintFullyDetailWithAlias = FlagPrintFullyDetail + ", pf"
FlagPrintRawTime = "print_raw_time"
FlagPrintRawTimeWithAlias = FlagPrintRawTime + ", prt"
FlagPrintDateTime = "print_datetime"
FlagPrintDateTimeWithAlias = FlagPrintDateTime + ", pdt"
FlagDescription = "description"
FlagDescriptionWithAlias = FlagDescription + ", desc"
FlagOwnerEmail = "owner_email"
FlagOwnerEmailWithAlias = FlagOwnerEmail + ", oe"
FlagRetentionDays = "retention"
FlagRetentionDaysWithAlias = FlagRetentionDays + ", rd"
FlagEmitMetric = "emit_metric"
FlagEmitMetricWithAlias = FlagEmitMetric + ", em"
FlagArchivalStatus = "archival_status"
FlagArchivalStatusWithAlias = FlagArchivalStatus + ", as"
FlagArchivalBucketName = "bucket"
FlagArchivalBucketNameWithAlias = FlagArchivalBucketName + ", ab"

FlagName = "name"
FlagNameWithAlias = FlagName + ", n"
FlagOutputFilename = "output_filename"
FlagOutputFilenameWithAlias = FlagOutputFilename + ", of"
FlagQueryType = "query_type"
FlagQueryTypeWithAlias = FlagQueryType + ", qt"
FlagShowDetail = "show_detail"
FlagShowDetailWithAlias = FlagShowDetail + ", sd"
FlagActiveClusterName = "active_cluster"
FlagActiveClusterNameWithAlias = FlagActiveClusterName + ", ac"
FlagClusters = "clusters"
FlagClustersWithAlias = FlagClusters + ", cl"
FlagDomainData = "domain_data"
FlagDomainDataWithAlias = FlagDomainData + ", dmd"
FlagEventID = "event_id"
FlagEventIDWithAlias = FlagEventID + ", eid"
FlagMaxFieldLength = "max_field_length"
FlagMaxFieldLengthWithAlias = FlagMaxFieldLength + ", maxl"
FlagSecurityToken = "security_token"
FlagSecurityTokenWithAlias = FlagSecurityToken + ", st"
FlagSkipErrorMode = "skip_errors"
FlagSkipErrorModeWithAlias = FlagSkipErrorMode + ", serr"
FlagHeadersMode = "headers"
FlagHeadersModeWithAlias = FlagHeadersMode + ", he"
FlagMessageType = "message_type"
FlagMessageTypeWithAlias = FlagMessageType + ", mt"
)

const (
localHostPort = "127.0.0.1:7933"

Expand Down Expand Up @@ -536,62 +419,15 @@ func showHistoryHelper(c *cli.Context, wid, rid string) {

// StartWorkflow starts a new workflow execution
func StartWorkflow(c *cli.Context) {
serviceClient := cFactory.ClientFrontendClient(c)

domain := getRequiredGlobalOption(c, FlagDomain)
taskList := getRequiredOption(c, FlagTaskList)
workflowType := getRequiredOption(c, FlagWorkflowType)
et := c.Int(FlagExecutionTimeout)
if et == 0 {
ErrorAndExit(fmt.Sprintf("Option %s format is invalid.", FlagExecutionTimeout), nil)
}
dt := c.Int(FlagDecisionTimeout)
wid := c.String(FlagWorkflowID)
if len(wid) == 0 {
wid = uuid.New()
}
reusePolicy := defaultWorkflowIDReusePolicy.Ptr()
if c.IsSet(FlagWorkflowIDReusePolicy) {
reusePolicy = getWorkflowIDReusePolicy(c.Int(FlagWorkflowIDReusePolicy))
}

input := processJSONInput(c)

tcCtx, cancel := newContext()
defer cancel()

startRequest := &s.StartWorkflowExecutionRequest{
RequestId: common.StringPtr(uuid.New()),
Domain: common.StringPtr(domain),
WorkflowId: common.StringPtr(wid),
WorkflowType: &s.WorkflowType{
Name: common.StringPtr(workflowType),
},
TaskList: &s.TaskList{
Name: common.StringPtr(taskList),
},
Input: []byte(input),
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(int32(et)),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(int32(dt)),
Identity: common.StringPtr(getCliIdentity()),
WorkflowIdReusePolicy: reusePolicy,
}

if c.IsSet(FlagCronSchedule) {
startRequest.CronSchedule = common.StringPtr(c.String(FlagCronSchedule))
}

resp, err := serviceClient.StartWorkflowExecution(tcCtx, startRequest)

if err != nil {
ErrorAndExit("Failed to create workflow.", err)
} else {
fmt.Printf("Started Workflow Id: %s, run Id: %s\n", wid, resp.GetRunId())
}
startWorkflowHelper(c, false)
}

// RunWorkflow starts a new workflow execution and print workflow progress and result
func RunWorkflow(c *cli.Context) {
startWorkflowHelper(c, true)
}

func startWorkflowHelper(c *cli.Context, shouldPrintProgress bool) {
serviceClient := cFactory.ClientFrontendClient(c)

domain := getRequiredGlobalOption(c, FlagDomain)
Expand All @@ -612,14 +448,6 @@ func RunWorkflow(c *cli.Context) {
}

input := processJSONInput(c)

contextTimeout := defaultContextTimeoutForLongPoll
if c.IsSet(FlagContextTimeout) {
contextTimeout = time.Duration(c.Int(FlagContextTimeout)) * time.Second
}
tcCtx, cancel := newContextForLongPoll(contextTimeout)
defer cancel()

startRequest := &s.StartWorkflowExecutionRequest{
RequestId: common.StringPtr(uuid.New()),
Domain: common.StringPtr(domain),
Expand All @@ -640,29 +468,55 @@ func RunWorkflow(c *cli.Context) {
startRequest.CronSchedule = common.StringPtr(c.String(FlagCronSchedule))
}

resp, err := serviceClient.StartWorkflowExecution(tcCtx, startRequest)
startFn := func() {
tcCtx, cancel := newContext()
defer cancel()
resp, err := serviceClient.StartWorkflowExecution(tcCtx, startRequest)

if err != nil {
ErrorAndExit("Failed to run workflow.", err)
if err != nil {
ErrorAndExit("Failed to create workflow.", err)
} else {
fmt.Printf("Started Workflow Id: %s, run Id: %s\n", wid, resp.GetRunId())
}
}

// print execution summary
fmt.Println(colorMagenta("Running execution:"))
table := tablewriter.NewWriter(os.Stdout)
executionData := [][]string{
{"Workflow Id", wid},
{"Run Id", resp.GetRunId()},
{"Type", workflowType},
{"Domain", domain},
{"Task List", taskList},
{"Args", truncate(input)}, // in case of large input
runFn := func() {
contextTimeout := defaultContextTimeoutForLongPoll
if c.IsSet(FlagContextTimeout) {
contextTimeout = time.Duration(c.Int(FlagContextTimeout)) * time.Second
}
tcCtx, cancel := newContextForLongPoll(contextTimeout)
defer cancel()
resp, err := serviceClient.StartWorkflowExecution(tcCtx, startRequest)

if err != nil {
ErrorAndExit("Failed to run workflow.", err)
}

// print execution summary
fmt.Println(colorMagenta("Running execution:"))
table := tablewriter.NewWriter(os.Stdout)
executionData := [][]string{
{"Workflow Id", wid},
{"Run Id", resp.GetRunId()},
{"Type", workflowType},
{"Domain", domain},
{"Task List", taskList},
{"Args", truncate(input)}, // in case of large input
}
table.SetBorder(false)
table.SetColumnSeparator(":")
table.AppendBulk(executionData) // Add Bulk Data
table.Render()

printWorkflowProgress(c, wid, resp.GetRunId())
}
table.SetBorder(false)
table.SetColumnSeparator(":")
table.AppendBulk(executionData) // Add Bulk Data
table.Render()

printWorkflowProgress(c, wid, resp.GetRunId())
if shouldPrintProgress {
runFn()
} else {
startFn()
}
}

// helper function to print workflow progress with time refresh every second
Expand Down
Loading

0 comments on commit 97b7ea7

Please sign in to comment.