From fb60a96a6b229b99343693d01311a5179c8787f8 Mon Sep 17 00:00:00 2001 From: Django Cass Date: Sat, 3 Oct 2020 17:55:55 +1000 Subject: [PATCH] Added support for domain-polling Updated PollForTask and Ack task to match API spec Minor code cleanup --- client/go/README.md | 4 +- client/go/conductorhttpclient.go | 561 +++++++++++++-------------- client/go/conductorworker.go | 22 +- client/go/httpclient/httpclient.go | 235 +++++------ client/go/startclient/startclient.go | 4 +- 5 files changed, 415 insertions(+), 411 deletions(-) diff --git a/client/go/README.md b/client/go/README.md index f54e51212e..93b1381561 100644 --- a/client/go/README.md +++ b/client/go/README.md @@ -99,8 +99,8 @@ import ( func main() { c := conductor.NewConductorWorker("http://localhost:8080", 1, 10000) - c.Start("task_1", sample.Task_1_Execution_Function, false) - c.Start("task_2", sample.Task_2_Execution_Function, true) + c.Start("task_1", "", sample.Task_1_Execution_Function, false) + c.Start("task_2", "mydomain", sample.Task_2_Execution_Function, true) } ``` diff --git a/client/go/conductorhttpclient.go b/client/go/conductorhttpclient.go index 428a9989b4..67f0c0dd86 100644 --- a/client/go/conductorhttpclient.go +++ b/client/go/conductorhttpclient.go @@ -14,386 +14,385 @@ package conductor import ( - "github.com/netflix/conductor/client/go/httpclient" - "strconv" - "log" - "fmt" + "fmt" + "github.com/netflix/conductor/client/go/httpclient" + "log" + "strconv" ) type ConductorHttpClient struct { - httpClient *httpclient.HttpClient + httpClient *httpclient.HttpClient } func NewConductorHttpClient(baseUrl string) *ConductorHttpClient { - conductorClient := new(ConductorHttpClient) - headers := map[string]string{"Content-Type": "application/json", "Accept": "application/json"} - httpClient := httpclient.NewHttpClient(baseUrl, headers, true) - conductorClient.httpClient = httpClient - return conductorClient + conductorClient := new(ConductorHttpClient) + headers := map[string]string{"Content-Type": "application/json", "Accept": "application/json"} + httpClient := httpclient.NewHttpClient(baseUrl, headers, true) + conductorClient.httpClient = httpClient + return conductorClient } - /**********************/ /* Metadata Functions */ /**********************/ func (c *ConductorHttpClient) GetWorkflowDef(workflowName string, version int) (string, error) { - url := c.httpClient.MakeUrl("/metadata/workflow/{workflowName}", "{workflowName}", workflowName) - versionString := "1" - - // Set default version as 1 - if version > 0 { - versionString = strconv.Itoa(version) - } - params := map[string]string{"version":versionString} - outputString, err := c.httpClient.Get(url, params, nil) - if err != nil { - log.Println("Error while trying to Get Workflow Definition", err) - return "", err - } else { - return outputString, nil - } + url := c.httpClient.MakeUrl("/metadata/workflow/{workflowName}", "{workflowName}", workflowName) + versionString := "1" + + // Set default version as 1 + if version > 0 { + versionString = strconv.Itoa(version) + } + params := map[string]string{"version": versionString} + outputString, err := c.httpClient.Get(url, params, nil) + if err != nil { + log.Println("Error while trying to Get Workflow Definition", err) + return "", err + } else { + return outputString, nil + } } func (c *ConductorHttpClient) CreateWorkflowDef(workflowDefBody string) (string, error) { - url := c.httpClient.MakeUrl("/metadata/workflow") - outputString, err := c.httpClient.Post(url, nil, nil, workflowDefBody) - if err != nil { - log.Println("Error while trying to Create Workflow Definition", err) - return "", err - } else { - return outputString, nil - } + url := c.httpClient.MakeUrl("/metadata/workflow") + outputString, err := c.httpClient.Post(url, nil, nil, workflowDefBody) + if err != nil { + log.Println("Error while trying to Create Workflow Definition", err) + return "", err + } else { + return outputString, nil + } } func (c *ConductorHttpClient) UpdateWorkflowDefs(workflowDefsBody string) (string, error) { - url := c.httpClient.MakeUrl("/metadata/workflow") - outputString, err := c.httpClient.Put(url, nil, nil, workflowDefsBody) - if err != nil { - log.Println("Error while trying to Update Workflow Definitions", err) - return "", err - } else { - return outputString, nil - } + url := c.httpClient.MakeUrl("/metadata/workflow") + outputString, err := c.httpClient.Put(url, nil, nil, workflowDefsBody) + if err != nil { + log.Println("Error while trying to Update Workflow Definitions", err) + return "", err + } else { + return outputString, nil + } } func (c *ConductorHttpClient) GetAllWorkflowDefs() (string, error) { - url := c.httpClient.MakeUrl("/metadata/workflow") - outputString, err := c.httpClient.Get(url, nil, nil) - if err != nil { - log.Println("Error while trying to Get All Workflow Definitions", err) - return "", err - } else { - return outputString, nil - } + url := c.httpClient.MakeUrl("/metadata/workflow") + outputString, err := c.httpClient.Get(url, nil, nil) + if err != nil { + log.Println("Error while trying to Get All Workflow Definitions", err) + return "", err + } else { + return outputString, nil + } } func (c *ConductorHttpClient) UnRegisterWorkflowDef(workflowDefName string, version int) (string, error) { - versionString := "" + versionString := "" - versionString = strconv.Itoa(version) + versionString = strconv.Itoa(version) - url := c.httpClient.MakeUrl("/metadata/workflow/{workflowDefName}/{version}", "{workflowDefName}", - workflowDefName, "{version}", versionString) + url := c.httpClient.MakeUrl("/metadata/workflow/{workflowDefName}/{version}", "{workflowDefName}", + workflowDefName, "{version}", versionString) - outputString, err := c.httpClient.Delete(url, nil, nil, "") + outputString, err := c.httpClient.Delete(url, nil, nil, "") - if err != nil { - log.Println("Error while trying to Unregister Workflow Definition", workflowDefName, err) - return "", err - } else { - return outputString, nil - } + if err != nil { + log.Println("Error while trying to Unregister Workflow Definition", workflowDefName, err) + return "", err + } else { + return outputString, nil + } } func (c *ConductorHttpClient) GetTaskDef(taskDefName string) (string, error) { - url := c.httpClient.MakeUrl("/metadata/taskdefs/{taskDefName}", "{taskDefName}", taskDefName) - outputString, err := c.httpClient.Get(url, nil, nil) - if err != nil { - log.Println("Error while trying to Get Task Definition", err) - return "", err - } else { - return outputString, nil - } + url := c.httpClient.MakeUrl("/metadata/taskdefs/{taskDefName}", "{taskDefName}", taskDefName) + outputString, err := c.httpClient.Get(url, nil, nil) + if err != nil { + log.Println("Error while trying to Get Task Definition", err) + return "", err + } else { + return outputString, nil + } } func (c *ConductorHttpClient) RegisterTaskDefs(taskDefsMeta string) (string, error) { - url := c.httpClient.MakeUrl("/metadata/taskdefs") - outputString, err := c.httpClient.Post(url, nil, nil, taskDefsMeta) - if err != nil { - log.Println("Error while trying to Register Task Definitions", err) - return "", err - } else { - return outputString, nil - } + url := c.httpClient.MakeUrl("/metadata/taskdefs") + outputString, err := c.httpClient.Post(url, nil, nil, taskDefsMeta) + if err != nil { + log.Println("Error while trying to Register Task Definitions", err) + return "", err + } else { + return outputString, nil + } } func (c *ConductorHttpClient) UpdateTaskDef(taskDefMeta string) (string, error) { - url := c.httpClient.MakeUrl("/metadata/taskdefs") - outputString, err := c.httpClient.Put(url, nil, nil, taskDefMeta) - if err != nil { - log.Println("Error while trying to Update Task Definition", err) - return "", err - } else { - return outputString, nil - } + url := c.httpClient.MakeUrl("/metadata/taskdefs") + outputString, err := c.httpClient.Put(url, nil, nil, taskDefMeta) + if err != nil { + log.Println("Error while trying to Update Task Definition", err) + return "", err + } else { + return outputString, nil + } } func (c *ConductorHttpClient) UnRegisterTaskDef(taskDefName string) (string, error) { - url := c.httpClient.MakeUrl("/metadata/taskdefs/{taskDefName}", "{taskDefName}", taskDefName) - outputString, err := c.httpClient.Delete(url, nil, nil, "") - if err != nil { - log.Println("Error while trying to Unregister Task Definition", taskDefName, err) - return "", err - } else { - return outputString, nil - } + url := c.httpClient.MakeUrl("/metadata/taskdefs/{taskDefName}", "{taskDefName}", taskDefName) + outputString, err := c.httpClient.Delete(url, nil, nil, "") + if err != nil { + log.Println("Error while trying to Unregister Task Definition", taskDefName, err) + return "", err + } else { + return outputString, nil + } } func (c *ConductorHttpClient) GetAllTaskDefs() (string, error) { - url := c.httpClient.MakeUrl("/metadata/taskdefs") - outputString, err := c.httpClient.Get(url, nil, nil) - if err != nil { - log.Println("Error while trying to Get All Task Definitions", err) - return "", err - } else { - return outputString, nil - } + url := c.httpClient.MakeUrl("/metadata/taskdefs") + outputString, err := c.httpClient.Get(url, nil, nil) + if err != nil { + log.Println("Error while trying to Get All Task Definitions", err) + return "", err + } else { + return outputString, nil + } } - /**********************/ /* Task Functions */ /**********************/ func (c *ConductorHttpClient) GetTask(taskId string) (string, error) { - url := c.httpClient.MakeUrl("/tasks/{taskId}", "{taskId}", taskId) - outputString, err := c.httpClient.Get(url, nil, nil) - if err != nil { - log.Println("Error while trying to Get Task", taskId, err) - return "", err - } else { - return outputString, nil - } + url := c.httpClient.MakeUrl("/tasks/{taskId}", "{taskId}", taskId) + outputString, err := c.httpClient.Get(url, nil, nil) + if err != nil { + log.Println("Error while trying to Get Task", taskId, err) + return "", err + } else { + return outputString, nil + } } func (c *ConductorHttpClient) UpdateTask(taskBody string) (string, error) { - url := c.httpClient.MakeUrl("/tasks") - outputString, err := c.httpClient.Post(url, nil, nil, taskBody) - if err != nil { - log.Println("Error while trying to Update Task", err) - return "", err - } else { - return outputString, nil - } + url := c.httpClient.MakeUrl("/tasks") + outputString, err := c.httpClient.Post(url, nil, nil, taskBody) + if err != nil { + log.Println("Error while trying to Update Task", err) + return "", err + } else { + return outputString, nil + } } -func (c *ConductorHttpClient) PollForTask(taskType string, workerid string) (string, error) { - url := c.httpClient.MakeUrl("/tasks/poll/{taskType}", "{taskType}", taskType) - params := map[string]string{"workerid":workerid} - outputString, err := c.httpClient.Get(url, params, nil) - if err != nil { - log.Println("Error while trying to Poll For Task taskType:", taskType, ",workerid:", workerid, err) - return "", err - } else { - return outputString, nil - } +func (c *ConductorHttpClient) PollForTask(taskType string, workerid string, domain string) (string, error) { + url := c.httpClient.MakeUrl("/tasks/poll/{taskType}", "{taskType}", taskType) + params := map[string]string{ + "workerid": workerid, + "domain": domain, + } + outputString, err := c.httpClient.Get(url, params, nil) + if err != nil { + log.Println("Error while trying to Poll For Task taskType:", taskType, ",workerid:", workerid, err) + return "", err + } else { + return outputString, nil + } } -func (c *ConductorHttpClient) AckTask(taskId string, workerid string) (string, error) { - url := c.httpClient.MakeUrl("/tasks/{taskId}/ack", "{taskId}", taskId) - params := map[string]string{"workerid":workerid} - headers := map[string]string{"Accept":"application/json"} - outputString, err := c.httpClient.Post(url, params, headers, "") - if err != nil { - return "", err - } - if outputString != "true" { - return "", fmt.Errorf("Task id: %s has already been Acked", taskId) - } - return outputString, nil +func (c *ConductorHttpClient) AckTask(taskId string) (string, error) { + url := c.httpClient.MakeUrl("/tasks/{taskId}/ack", "{taskId}", taskId) + headers := map[string]string{"Accept": "application/json"} + outputString, err := c.httpClient.Post(url, nil, headers, "") + if err != nil { + return "", err + } + if outputString != "true" { + return "", fmt.Errorf("Task id: %s has already been Acked", taskId) + } + return outputString, nil } func (c *ConductorHttpClient) GetAllTasksInQueue() (string, error) { - url := c.httpClient.MakeUrl("/tasks/queue/all") - outputString, err := c.httpClient.Get(url, nil, nil) - if err != nil { - log.Println("Error while trying to Get All Tasks in Queue", err) - return "", err - } else { - return outputString, nil - } + url := c.httpClient.MakeUrl("/tasks/queue/all") + outputString, err := c.httpClient.Get(url, nil, nil) + if err != nil { + log.Println("Error while trying to Get All Tasks in Queue", err) + return "", err + } else { + return outputString, nil + } } func (c *ConductorHttpClient) RemoveTaskFromQueue(taskType string, taskId string) (string, error) { - url := c.httpClient.MakeUrl("/tasks/queue/{taskType}/{taskId}", "{taskType}", taskType, "{taskId}", taskId) - outputString, err := c.httpClient.Delete(url, nil, nil, "") - if err != nil { - log.Println("Error while trying to Delete Task taskType:", taskType, ",taskId:", taskId, err) - return "", err - } else { - return outputString, nil - } + url := c.httpClient.MakeUrl("/tasks/queue/{taskType}/{taskId}", "{taskType}", taskType, "{taskId}", taskId) + outputString, err := c.httpClient.Delete(url, nil, nil, "") + if err != nil { + log.Println("Error while trying to Delete Task taskType:", taskType, ",taskId:", taskId, err) + return "", err + } else { + return outputString, nil + } } func (c *ConductorHttpClient) GetTaskQueueSizes(taskNames string) (string, error) { - url := c.httpClient.MakeUrl("/tasks/queue/sizes") - outputString, err := c.httpClient.Post(url, nil, nil, taskNames) - if err != nil { - log.Println("Error while trying to Get Task Queue Sizes", err) - return "", err - } else { - return outputString, nil - } + url := c.httpClient.MakeUrl("/tasks/queue/sizes") + outputString, err := c.httpClient.Post(url, nil, nil, taskNames) + if err != nil { + log.Println("Error while trying to Get Task Queue Sizes", err) + return "", err + } else { + return outputString, nil + } } - /**********************/ /* Workflow Functions */ /**********************/ func (c *ConductorHttpClient) GetWorkflow(workflowId string, includeTasks bool) (string, error) { - url := c.httpClient.MakeUrl("/workflow/{workflowId}", "{workflowId}", workflowId) - includeTasksString := "false" - if includeTasks { - includeTasksString = "true" - } - params := map[string]string{"includeTasks":includeTasksString} - outputString, err := c.httpClient.Get(url, params, nil) - if err != nil { - log.Println("Error while trying to Get Workflow", workflowId, err) - return "", err - } else { - return outputString, nil - } + url := c.httpClient.MakeUrl("/workflow/{workflowId}", "{workflowId}", workflowId) + includeTasksString := "false" + if includeTasks { + includeTasksString = "true" + } + params := map[string]string{"includeTasks": includeTasksString} + outputString, err := c.httpClient.Get(url, params, nil) + if err != nil { + log.Println("Error while trying to Get Workflow", workflowId, err) + return "", err + } else { + return outputString, nil + } } func (c *ConductorHttpClient) GetRunningWorkflows(workflowName string, version int, startTime float64, endTime float64) (string, error) { - url := c.httpClient.MakeUrl("/workflow/running/{workflowName}", "{workflowName}", workflowName) - versionString := "1" - // Set default version as 1 - if version > 0 { - versionString = strconv.Itoa(version) - } - params := map[string]string{"version":versionString} - if startTime != 0 { - params["startTime"] = strconv.FormatFloat(startTime, 'f', -1, 64) - } - if endTime != 0 { - params["endTime"] = strconv.FormatFloat(endTime, 'f', -1, 64) - } - - outputString, err := c.httpClient.Get(url, params, nil) - if err != nil { - log.Println("Error while trying to Get Running Workflows", workflowName, err) - return "", err - } else { - return outputString, nil - } + url := c.httpClient.MakeUrl("/workflow/running/{workflowName}", "{workflowName}", workflowName) + versionString := "1" + // Set default version as 1 + if version > 0 { + versionString = strconv.Itoa(version) + } + params := map[string]string{"version": versionString} + if startTime != 0 { + params["startTime"] = strconv.FormatFloat(startTime, 'f', -1, 64) + } + if endTime != 0 { + params["endTime"] = strconv.FormatFloat(endTime, 'f', -1, 64) + } + + outputString, err := c.httpClient.Get(url, params, nil) + if err != nil { + log.Println("Error while trying to Get Running Workflows", workflowName, err) + return "", err + } else { + return outputString, nil + } } func (c *ConductorHttpClient) StartWorkflow(workflowName string, version int, correlationId string, inputJson string) (string, error) { - url := c.httpClient.MakeUrl("/workflow/{workflowName}", "{workflowName}", workflowName) - - params := make(map[string]string) - if version > 0 { - params["version"] = strconv.Itoa(version) - } - - if correlationId != "" { - params["correlationId"] = correlationId - } - - if inputJson == "" { - inputJson = "{}" - } - - headers := map[string]string{"Accept":"text/plain"} - - outputString, err := c.httpClient.Post(url, params, headers, inputJson) - if err != nil { - log.Println("Error while trying to Start Workflow", workflowName, err) - return "", err - } else { - return outputString, nil - } + url := c.httpClient.MakeUrl("/workflow/{workflowName}", "{workflowName}", workflowName) + + params := make(map[string]string) + if version > 0 { + params["version"] = strconv.Itoa(version) + } + + if correlationId != "" { + params["correlationId"] = correlationId + } + + if inputJson == "" { + inputJson = "{}" + } + + headers := map[string]string{"Accept": "text/plain"} + + outputString, err := c.httpClient.Post(url, params, headers, inputJson) + if err != nil { + log.Println("Error while trying to Start Workflow", workflowName, err) + return "", err + } else { + return outputString, nil + } } func (c *ConductorHttpClient) TerminateWorkflow(workflowId string, reason string) (string, error) { - url := c.httpClient.MakeUrl("/workflow/{workflowId}", "{workflowId}", workflowId) + url := c.httpClient.MakeUrl("/workflow/{workflowId}", "{workflowId}", workflowId) - params := make(map[string]string) + params := make(map[string]string) - if reason != "" { - params["reason"] = reason - } + if reason != "" { + params["reason"] = reason + } - outputString, err := c.httpClient.Delete(url, params, nil, "") - if err != nil { - log.Println("Error while trying to Terminate Workflow", workflowId, err) - return "", err - } else { - return outputString, nil - } + outputString, err := c.httpClient.Delete(url, params, nil, "") + if err != nil { + log.Println("Error while trying to Terminate Workflow", workflowId, err) + return "", err + } else { + return outputString, nil + } } func (c *ConductorHttpClient) PauseWorkflow(workflowId string) (string, error) { - url := c.httpClient.MakeUrl("/workflow/{workflowId}/pause", "{workflowId}", workflowId) - outputString, err := c.httpClient.Put(url, nil, nil, "") - if err != nil { - log.Println("Error while trying to Pause Workflow", workflowId, err) - return "", err - } else { - return outputString, nil - } + url := c.httpClient.MakeUrl("/workflow/{workflowId}/pause", "{workflowId}", workflowId) + outputString, err := c.httpClient.Put(url, nil, nil, "") + if err != nil { + log.Println("Error while trying to Pause Workflow", workflowId, err) + return "", err + } else { + return outputString, nil + } } func (c *ConductorHttpClient) ResumeWorkflow(workflowId string) (string, error) { - url := c.httpClient.MakeUrl("/workflow/{workflowId}/resume", "{workflowId}", workflowId) - outputString, err := c.httpClient.Put(url, nil, nil, "") - if err != nil { - log.Println("Error while trying to Resume Workflow", workflowId, err) - return "", err - } else { - return outputString, nil - } + url := c.httpClient.MakeUrl("/workflow/{workflowId}/resume", "{workflowId}", workflowId) + outputString, err := c.httpClient.Put(url, nil, nil, "") + if err != nil { + log.Println("Error while trying to Resume Workflow", workflowId, err) + return "", err + } else { + return outputString, nil + } } func (c *ConductorHttpClient) SkipTaskFromWorkflow(workflowId string, taskReferenceName string, skipTaskRequestBody string) (string, error) { - url := c.httpClient.MakeUrl("/workflow/{workflowId}/skiptask/{taskReferenceName}", "{workflowId}", workflowId, "{taskReferenceName}", taskReferenceName) - - outputString, err := c.httpClient.Put(url, nil, nil, skipTaskRequestBody) - if err != nil { - log.Println("Error while trying to Skip Task From Workflow", workflowId, err) - return "", err - } else { - return outputString, nil - } + url := c.httpClient.MakeUrl("/workflow/{workflowId}/skiptask/{taskReferenceName}", "{workflowId}", workflowId, "{taskReferenceName}", taskReferenceName) + + outputString, err := c.httpClient.Put(url, nil, nil, skipTaskRequestBody) + if err != nil { + log.Println("Error while trying to Skip Task From Workflow", workflowId, err) + return "", err + } else { + return outputString, nil + } } func (c *ConductorHttpClient) RerunWorkflow(workflowId string, rerunWorkflowRequest string) (string, error) { - url := c.httpClient.MakeUrl("/workflow/{workflowId}/rerun", "{workflowId}", workflowId) - if rerunWorkflowRequest == "" { - rerunWorkflowRequest = "{}" - } - - outputString, err := c.httpClient.Post(url, nil, nil, rerunWorkflowRequest) - if err != nil { - log.Println("Error while trying to Rerun Workflow", workflowId, err) - return "", err - } else { - return outputString, nil - } + url := c.httpClient.MakeUrl("/workflow/{workflowId}/rerun", "{workflowId}", workflowId) + if rerunWorkflowRequest == "" { + rerunWorkflowRequest = "{}" + } + + outputString, err := c.httpClient.Post(url, nil, nil, rerunWorkflowRequest) + if err != nil { + log.Println("Error while trying to Rerun Workflow", workflowId, err) + return "", err + } else { + return outputString, nil + } } func (c *ConductorHttpClient) RestartWorkflow(workflowId string) (string, error) { - url := c.httpClient.MakeUrl("/workflow/{workflowId}/restart", "{workflowId}", workflowId) - - outputString, err := c.httpClient.Post(url, nil, nil, "") - if err != nil { - log.Println("Error while trying to Restart Completed Workflow", workflowId, err) - return "", err - } else { - return outputString, nil - } + url := c.httpClient.MakeUrl("/workflow/{workflowId}/restart", "{workflowId}", workflowId) + + outputString, err := c.httpClient.Post(url, nil, nil, "") + if err != nil { + log.Println("Error while trying to Restart Completed Workflow", workflowId, err) + return "", err + } else { + return outputString, nil + } } diff --git a/client/go/conductorworker.go b/client/go/conductorworker.go index c465705c40..64cefc5af4 100644 --- a/client/go/conductorworker.go +++ b/client/go/conductorworker.go @@ -47,10 +47,14 @@ func NewConductorWorker(baseUrl string, threadCount int, pollingInterval int) *C func (c *ConductorWorker) Execute(t *task.Task, executeFunction func(t *task.Task) (*task.TaskResult, error)) { taskResult, err := executeFunction(t) + if taskResult == nil { + log.Println("TaskResult cannot be nil: ", t.TaskId) + return + } if err != nil { log.Println("Error Executing task:", err.Error()) taskResult.Status = task.FAILED - taskResult.ReasonForIncompletion = err.Error() + taskResult.ReasonForIncompletion = err.Error() } taskResultJsonString, err := taskResult.ToJSONString() @@ -59,15 +63,15 @@ func (c *ConductorWorker) Execute(t *task.Task, executeFunction func(t *task.Tas log.Println("Error Forming TaskResult JSON body") return } - c.ConductorHttpClient.UpdateTask(taskResultJsonString) + _, _ = c.ConductorHttpClient.UpdateTask(taskResultJsonString) } -func (c *ConductorWorker) PollAndExecute(taskType string, executeFunction func(t *task.Task) (*task.TaskResult, error)) { +func (c *ConductorWorker) PollAndExecute(taskType string, domain string, executeFunction func(t *task.Task) (*task.TaskResult, error)) { for { time.Sleep(time.Duration(c.PollingInterval) * time.Millisecond) - + // Poll for Task taskType - polled, err := c.ConductorHttpClient.PollForTask(taskType, hostname) + polled, err := c.ConductorHttpClient.PollForTask(taskType, hostname, domain) if err != nil { log.Println("Error Polling task:", err.Error()) continue @@ -76,7 +80,7 @@ func (c *ConductorWorker) PollAndExecute(taskType string, executeFunction func(t log.Println("No task found for:", taskType) continue } - + // Parse Http response into Task parsedTask, err := task.ParseTask(polled) if err != nil { @@ -85,7 +89,7 @@ func (c *ConductorWorker) PollAndExecute(taskType string, executeFunction func(t } // Found a task, so we send an Ack - _, ackErr := c.ConductorHttpClient.AckTask(parsedTask.TaskId, hostname) + _, ackErr := c.ConductorHttpClient.AckTask(parsedTask.TaskId) if ackErr != nil { log.Println("Error Acking task:", ackErr.Error()) continue @@ -96,10 +100,10 @@ func (c *ConductorWorker) PollAndExecute(taskType string, executeFunction func(t } } -func (c *ConductorWorker) Start(taskType string, executeFunction func(t *task.Task) (*task.TaskResult, error), wait bool) { +func (c *ConductorWorker) Start(taskType string, domain string, executeFunction func(t *task.Task) (*task.TaskResult, error), wait bool) { log.Println("Polling for task:", taskType, "with a:", c.PollingInterval, "(ms) polling interval with", c.ThreadCount, "goroutines for task execution, with workerid as", hostname) for i := 1; i <= c.ThreadCount; i++ { - go c.PollAndExecute(taskType, executeFunction) + go c.PollAndExecute(taskType, domain, executeFunction) } // wait infinitely while the go routines are running diff --git a/client/go/httpclient/httpclient.go b/client/go/httpclient/httpclient.go index 646488091c..8f104469ff 100644 --- a/client/go/httpclient/httpclient.go +++ b/client/go/httpclient/httpclient.go @@ -14,154 +14,155 @@ package httpclient import ( - "log" - "net/http" - "io/ioutil" - "bytes" - "strings" - "fmt" + "bytes" + "fmt" + "io/ioutil" + "log" + "net/http" + "strings" ) type HttpClient struct { - BaseUrl string - Headers map[string]string - PrintLogs bool + BaseUrl string + Headers map[string]string + PrintLogs bool + client *http.Client } func NewHttpClient(baseUrl string, headers map[string]string, printLogs bool) *HttpClient { - httpClient := new(HttpClient) - httpClient.BaseUrl = baseUrl - httpClient.Headers = headers - httpClient.PrintLogs = printLogs - return httpClient + httpClient := new(HttpClient) + httpClient.BaseUrl = baseUrl + httpClient.Headers = headers + httpClient.PrintLogs = printLogs + httpClient.client = &http.Client{} + return httpClient } func (c *HttpClient) logSendRequest(url string, requestType string, body string) { - log.Println("Sending [", requestType, "] request to Server (", url, "):") - log.Println("Body:") - log.Println(body) + log.Println("Sending [", requestType, "] request to Server (", url, "):") + log.Println("Body:") + log.Println(body) } func (c *HttpClient) logResponse(statusCode string, response string) { - log.Println("Received response from Server (", c.BaseUrl, "):") - log.Println("Status: ", statusCode) - log.Println("Response:") - log.Println(response) + log.Println("Received response from Server (", c.BaseUrl, "):") + log.Println("Status: ", statusCode) + log.Println("Response:") + log.Println(response) } func genParamString(paramMap map[string]string) string { - if paramMap == nil || len(paramMap) == 0 { - return "" - } - - output := "?" - for key, value := range paramMap { - output += key - output += "=" - output += value - output += "&" - } - return output + if paramMap == nil || len(paramMap) == 0 { + return "" + } + + output := "?" + for key, value := range paramMap { + output += key + output += "=" + output += value + output += "&" + } + return output } func (c *HttpClient) httpRequest(url string, requestType string, headers map[string]string, body string) (string, error) { - var req *http.Request - var err error - - if requestType == "GET" { - req, err = http.NewRequest(requestType, url, nil) - } else { - var bodyStr = []byte(body) - req, err = http.NewRequest(requestType, url, bytes.NewBuffer(bodyStr)) - } - - if err != nil { - return "", err - } - // Default Headers - for key, value := range c.Headers { - req.Header.Set(key, value) - } - - // Custom Headers - for key, value := range headers { - req.Header.Set(key, value) - } - - if c.PrintLogs { - c.logSendRequest(url, requestType, body) - } - - client := &http.Client{} - resp, err := client.Do(req) - if err != nil { - return "", err - } - - // If successful HTTP call, but Client/Server error, we return error - if resp.StatusCode >= 400 && resp.StatusCode < 500 { - return "", fmt.Errorf("%d Http Client Error for url: %s", resp.StatusCode, url) - } - if resp.StatusCode >= 500 && resp.StatusCode < 600 { - return "", fmt.Errorf("%d Http Server Error for url: %s", resp.StatusCode, url) - } - - defer resp.Body.Close() - response, err := ioutil.ReadAll(resp.Body) - responseString := string(response) - if err != nil { - log.Println("ERROR reading response for URL: ", url) - return "", err - } - - if c.PrintLogs { - c.logResponse(resp.Status, responseString) - } - return responseString, nil + var req *http.Request + var err error + + if requestType == "GET" { + req, err = http.NewRequest(requestType, url, nil) + } else { + var bodyStr = []byte(body) + req, err = http.NewRequest(requestType, url, bytes.NewBuffer(bodyStr)) + } + + if err != nil { + return "", err + } + // Default Headers + for key, value := range c.Headers { + req.Header.Set(key, value) + } + + // Custom Headers + for key, value := range headers { + req.Header.Set(key, value) + } + + if c.PrintLogs { + c.logSendRequest(url, requestType, body) + } + + resp, err := c.client.Do(req) + if err != nil { + return "", err + } + + // If successful HTTP call, but Client/Server error, we return error + if resp.StatusCode >= 400 && resp.StatusCode < 500 { + return "", fmt.Errorf("%d Http Client Error for url: %s", resp.StatusCode, url) + } + if resp.StatusCode >= 500 && resp.StatusCode < 600 { + return "", fmt.Errorf("%d Http Server Error for url: %s", resp.StatusCode, url) + } + + defer resp.Body.Close() + response, err := ioutil.ReadAll(resp.Body) + responseString := string(response) + if err != nil { + log.Println("ERROR reading response for URL: ", url) + return "", err + } + + if c.PrintLogs { + c.logResponse(resp.Status, responseString) + } + return responseString, nil } func (c *HttpClient) Get(url string, queryParamsMap map[string]string, headers map[string]string) (string, error) { - urlString := url + genParamString(queryParamsMap) - resp, err := c.httpRequest(urlString, "GET", headers, "") - if err != nil { - log.Println("Http GET Error for URL: ", urlString) - return "", err - } - return resp, nil + urlString := url + genParamString(queryParamsMap) + resp, err := c.httpRequest(urlString, "GET", headers, "") + if err != nil { + log.Println("Http GET Error for URL: ", urlString) + return "", err + } + return resp, nil } func (c *HttpClient) Put(url string, queryParamsMap map[string]string, headers map[string]string, body string) (string, error) { - urlString := url + genParamString(queryParamsMap) - resp, err := c.httpRequest(urlString, "PUT", headers, body) - if err != nil { - log.Println("Http PUT Error for URL: ", urlString, ) - return "", err - } - return resp, nil + urlString := url + genParamString(queryParamsMap) + resp, err := c.httpRequest(urlString, "PUT", headers, body) + if err != nil { + log.Println("Http PUT Error for URL: ", urlString) + return "", err + } + return resp, nil } func (c *HttpClient) Post(url string, queryParamsMap map[string]string, headers map[string]string, body string) (string, error) { - urlString := url + genParamString(queryParamsMap) - resp, err := c.httpRequest(urlString, "POST", headers, body) - if err != nil { - log.Println("Http POST Error for URL: ", urlString) - return "", err - } - return resp, nil + urlString := url + genParamString(queryParamsMap) + resp, err := c.httpRequest(urlString, "POST", headers, body) + if err != nil { + log.Println("Http POST Error for URL: ", urlString) + return "", err + } + return resp, nil } func (c *HttpClient) Delete(url string, queryParamsMap map[string]string, headers map[string]string, body string) (string, error) { - urlString := url + genParamString(queryParamsMap) - resp, err := c.httpRequest(urlString, "DELETE", headers, body) - if err != nil { - log.Println("Http DELETE Error for URL: ", urlString) - return "", err - } - return resp, nil + urlString := url + genParamString(queryParamsMap) + resp, err := c.httpRequest(urlString, "DELETE", headers, body) + if err != nil { + log.Println("Http DELETE Error for URL: ", urlString) + return "", err + } + return resp, nil } func (c *HttpClient) MakeUrl(path string, args ...string) string { - url := c.BaseUrl - r := strings.NewReplacer(args...) - return url + r.Replace(path) + url := c.BaseUrl + r := strings.NewReplacer(args...) + return url + r.Replace(path) } diff --git a/client/go/startclient/startclient.go b/client/go/startclient/startclient.go index aabe5d43e7..ca22ed8765 100644 --- a/client/go/startclient/startclient.go +++ b/client/go/startclient/startclient.go @@ -21,6 +21,6 @@ import ( func main() { c := conductor.NewConductorWorker("http://localhost:8080/api", 1, 10000) - c.Start("task_1", sample.Task_1_Execution_Function, false) - c.Start("task_2", sample.Task_2_Execution_Function, true) + c.Start("task_1", "", sample.Task_1_Execution_Function, false) + c.Start("task_2", "mydomain", sample.Task_2_Execution_Function, true) }