Skip to content

Commit

Permalink
Merge pull request Netflix#1909 from djcass44/update-go-worker-poll
Browse files Browse the repository at this point in the history
Updated go client Polling and Ack to match documented spec
  • Loading branch information
apanicker-nflx authored Oct 13, 2020
2 parents 8adf958 + 06b7c6b commit fc37811
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 26 deletions.
4 changes: 2 additions & 2 deletions client/go/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ import (
func main() {
c := conductor.NewConductorWorker("http://localhost:8080", 1, 10000)

c.Start("task_1", sample.Task_1_Execution_Function, false)
c.Start("task_2", sample.Task_2_Execution_Function, true)
c.Start("task_1", "", sample.Task_1_Execution_Function, false)
c.Start("task_2", "mydomain", sample.Task_2_Execution_Function, true)
}

```
Expand Down
18 changes: 10 additions & 8 deletions client/go/conductorhttpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
package conductor

import (
"fmt"
"github.com/netflix/conductor/client/go/httpclient"
"strconv"
"log"
"fmt"
"strconv"
)

type ConductorHttpClient struct {
Expand Down Expand Up @@ -188,9 +188,12 @@ func (c *ConductorHttpClient) UpdateTask(taskBody string) (string, error) {
}
}

func (c *ConductorHttpClient) PollForTask(taskType string, workerid string) (string, error) {
func (c *ConductorHttpClient) PollForTask(taskType string, workerid string, domain string) (string, error) {
url := c.httpClient.MakeUrl("/tasks/poll/{taskType}", "{taskType}", taskType)
params := map[string]string{"workerid":workerid}
params := map[string]string{
"workerid": workerid,
"domain": domain,
}
outputString, err := c.httpClient.Get(url, params, nil)
if err != nil {
log.Println("Error while trying to Poll For Task taskType:", taskType, ",workerid:", workerid, err)
Expand All @@ -200,11 +203,10 @@ func (c *ConductorHttpClient) PollForTask(taskType string, workerid string) (str
}
}

func (c *ConductorHttpClient) AckTask(taskId string, workerid string) (string, error) {
func (c *ConductorHttpClient) AckTask(taskId string) (string, error) {
url := c.httpClient.MakeUrl("/tasks/{taskId}/ack", "{taskId}", taskId)
params := map[string]string{"workerid":workerid}
headers := map[string]string{"Accept":"application/json"}
outputString, err := c.httpClient.Post(url, params, headers, "")
headers := map[string]string{"Accept": "application/json"}
outputString, err := c.httpClient.Post(url, nil, headers, "")
if err != nil {
return "", err
}
Expand Down
22 changes: 13 additions & 9 deletions client/go/conductorworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,14 @@ func NewConductorWorker(baseUrl string, threadCount int, pollingInterval int) *C

func (c *ConductorWorker) Execute(t *task.Task, executeFunction func(t *task.Task) (*task.TaskResult, error)) {
taskResult, err := executeFunction(t)
if taskResult == nil {
log.Println("TaskResult cannot be nil: ", t.TaskId)
return
}
if err != nil {
log.Println("Error Executing task:", err.Error())
taskResult.Status = task.FAILED
taskResult.ReasonForIncompletion = err.Error()
taskResult.ReasonForIncompletion = err.Error()
}

taskResultJsonString, err := taskResult.ToJSONString()
Expand All @@ -59,15 +63,15 @@ func (c *ConductorWorker) Execute(t *task.Task, executeFunction func(t *task.Tas
log.Println("Error Forming TaskResult JSON body")
return
}
c.ConductorHttpClient.UpdateTask(taskResultJsonString)
_, _ = c.ConductorHttpClient.UpdateTask(taskResultJsonString)
}

func (c *ConductorWorker) PollAndExecute(taskType string, executeFunction func(t *task.Task) (*task.TaskResult, error)) {
func (c *ConductorWorker) PollAndExecute(taskType string, domain string, executeFunction func(t *task.Task) (*task.TaskResult, error)) {
for {
time.Sleep(time.Duration(c.PollingInterval) * time.Millisecond)

// Poll for Task taskType
polled, err := c.ConductorHttpClient.PollForTask(taskType, hostname)
polled, err := c.ConductorHttpClient.PollForTask(taskType, hostname, domain)
if err != nil {
log.Println("Error Polling task:", err.Error())
continue
Expand All @@ -76,7 +80,7 @@ func (c *ConductorWorker) PollAndExecute(taskType string, executeFunction func(t
log.Println("No task found for:", taskType)
continue
}

// Parse Http response into Task
parsedTask, err := task.ParseTask(polled)
if err != nil {
Expand All @@ -85,7 +89,7 @@ func (c *ConductorWorker) PollAndExecute(taskType string, executeFunction func(t
}

// Found a task, so we send an Ack
_, ackErr := c.ConductorHttpClient.AckTask(parsedTask.TaskId, hostname)
_, ackErr := c.ConductorHttpClient.AckTask(parsedTask.TaskId)
if ackErr != nil {
log.Println("Error Acking task:", ackErr.Error())
continue
Expand All @@ -96,10 +100,10 @@ func (c *ConductorWorker) PollAndExecute(taskType string, executeFunction func(t
}
}

func (c *ConductorWorker) Start(taskType string, executeFunction func(t *task.Task) (*task.TaskResult, error), wait bool) {
func (c *ConductorWorker) Start(taskType string, domain string, executeFunction func(t *task.Task) (*task.TaskResult, error), wait bool) {
log.Println("Polling for task:", taskType, "with a:", c.PollingInterval, "(ms) polling interval with", c.ThreadCount, "goroutines for task execution, with workerid as", hostname)
for i := 1; i <= c.ThreadCount; i++ {
go c.PollAndExecute(taskType, executeFunction)
go c.PollAndExecute(taskType, domain, executeFunction)
}

// wait infinitely while the go routines are running
Expand Down
11 changes: 6 additions & 5 deletions client/go/httpclient/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,27 @@
package httpclient

import (
"bytes"
"fmt"
"io/ioutil"
"log"
"net/http"
"io/ioutil"
"bytes"
"strings"
"fmt"
)

type HttpClient struct {
BaseUrl string
Headers map[string]string
PrintLogs bool
client *http.Client
}

func NewHttpClient(baseUrl string, headers map[string]string, printLogs bool) *HttpClient {
httpClient := new(HttpClient)
httpClient.BaseUrl = baseUrl
httpClient.Headers = headers
httpClient.PrintLogs = printLogs
httpClient.client = &http.Client{}
return httpClient
}

Expand Down Expand Up @@ -92,8 +94,7 @@ func (c *HttpClient) httpRequest(url string, requestType string, headers map[str
c.logSendRequest(url, requestType, body)
}

client := &http.Client{}
resp, err := client.Do(req)
resp, err := c.client.Do(req)
if err != nil {
return "", err
}
Expand Down
4 changes: 2 additions & 2 deletions client/go/startclient/startclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ import (
func main() {
c := conductor.NewConductorWorker("http://localhost:8080/api", 1, 10000)

c.Start("task_1", sample.Task_1_Execution_Function, false)
c.Start("task_2", sample.Task_2_Execution_Function, true)
c.Start("task_1", "", sample.Task_1_Execution_Function, false)
c.Start("task_2", "mydomain", sample.Task_2_Execution_Function, true)
}

0 comments on commit fc37811

Please sign in to comment.